1919import com .fasterxml .jackson .databind .ObjectMapper ;
2020import com .jsoniter .any .Any ;
2121import org .apache .arrow .memory .RootAllocator ;
22+ import org .apache .arrow .vector .FieldVector ;
2223import org .apache .arrow .vector .VectorSchemaRoot ;
2324import org .apache .arrow .vector .ipc .ArrowStreamReader ;
2425import org .apache .arrow .vector .types .pojo .Field ;
@@ -306,6 +307,7 @@ private String parseMultipartResponse(HttpResponse<byte[]> response) throws IOEx
306307 result .add (out .toString (StandardCharsets .UTF_8 ));
307308 } else if (partContentType .toLowerCase ().equals ("application/vnd.apache.arrow.stream" )) {
308309 result .add (parseArrowResponse (out ));
310+ out .close ();
309311 } else {
310312 throw new HttpError (statusCode , String .format ("unknown part content type: %s" , partContentType ));
311313 }
@@ -317,27 +319,43 @@ private String parseMultipartResponse(HttpResponse<byte[]> response) throws IOEx
317319 }
318320
319321 private String parseArrowResponse (ByteArrayOutputStream out ) throws IOException {
320- Map < String , String > result = new HashMap <>();
322+ List < Object > output = new ArrayList <>();
321323
322- RootAllocator allocator = new RootAllocator (Integer .MAX_VALUE );
323324 ByteArrayInputStream in = new ByteArrayInputStream (out .toByteArray ());
324- ArrowStreamReader reader = new ArrowStreamReader (in , allocator );
325-
326- VectorSchemaRoot readBatch = reader .getVectorSchemaRoot ();
327-
328- for (Field f : readBatch .getSchema ().getFields ()) {
329- result .put (f .getName (), String .valueOf (readBatch .getVector (f )));
330- }
331-
332- while (reader .loadNextBatch ()) {
333- readBatch = reader .getVectorSchemaRoot ();
334-
335- for (Field f : readBatch .getSchema ().getFields ()) {
336- result .put (f .getName (), String .valueOf (readBatch .getVector (f )));
325+ List <FieldVector > fieldVectors = null ;
326+ RootAllocator allocator = new RootAllocator (Long .MAX_VALUE );
327+ try (ArrowStreamReader arrowStreamReader = new ArrowStreamReader (in , allocator )){
328+
329+ VectorSchemaRoot root = arrowStreamReader .getVectorSchemaRoot ();
330+ fieldVectors = root .getFieldVectors ();
331+
332+ while (arrowStreamReader .loadNextBatch ()) {
333+ for (FieldVector fieldVector : fieldVectors ) {
334+ Map <String , List <Object >> col = new HashMap <>();
335+
336+ col .put (fieldVector .getField ().getName (), new ArrayList <>());
337+ for (int i = 0 ; i < fieldVector .getValueCount (); i ++) {
338+ col .get (fieldVector .getField ().getName ()).add (fieldVector .getObject (i ));
339+ }
340+ output .add (col );
341+ }
342+ }
343+ } finally {
344+ if (in != null ) {
345+ in .close ();
337346 }
347+ if (fieldVectors != null ) {
348+ for (FieldVector fieldVector : fieldVectors ) {
349+ if (fieldVector != null ) {
350+ fieldVector .close ();
351+ }
352+ }
353+ }
354+ allocator .close ();
338355 }
356+
339357 // serialize map to json string
340- return new ObjectMapper ().writeValueAsString (result );
358+ return new ObjectMapper ().writeValueAsString (output );
341359 }
342360
343361 static void printRequest (HttpRequest request ) {
0 commit comments