2525import org .apache .flink .configuration .ReadableConfig ;
2626import org .apache .flink .table .data .RowData ;
2727import org .apache .flink .table .functions .FunctionContext ;
28+ import org .apache .flink .util .Collector ;
2829import org .apache .flink .util .ConfigurationException ;
2930import org .apache .flink .util .StringUtils ;
3031
@@ -269,9 +270,19 @@ private Collection<RowData> deserialize(String responseBody) throws IOException
269270 }
270271
271272 private List <RowData > deserializeSingleValue (byte [] rawBytes ) throws IOException {
272- return Optional .ofNullable (responseBodyDecoder .deserialize (rawBytes ))
273- .map (Collections ::singletonList )
274- .orElse (Collections .emptyList ());
273+ List <RowData > result = new ArrayList <>();
274+ responseBodyDecoder .deserialize (rawBytes , new Collector <>() {
275+ @ Override
276+ public void collect (RowData record ) {
277+ result .add (record );
278+ }
279+
280+ @ Override
281+ public void close () {
282+ // No-op
283+ }
284+ });
285+ return result ;
275286 }
276287
277288 private List <RowData > deserializeArray (byte [] rawBytes ) throws IOException {
@@ -281,11 +292,10 @@ private List<RowData> deserializeArray(byte[] rawBytes) throws IOException {
281292 List <RowData > result = new ArrayList <>();
282293 for (JsonNode rawObject : rawObjects ) {
283294 if (!(rawObject instanceof NullNode )) {
284- RowData deserialized =
285- responseBodyDecoder .deserialize (rawObject .toString ().getBytes ());
286- // deserialize() returns null if deserialization fails
287- if (deserialized != null ) {
288- result .add (deserialized );
295+ List <RowData > deserialized = deserializeSingleValue (rawObject .toString ().getBytes ());
296+ // deserialize() may return empty list if deserialization fails
297+ if (deserialized != null && !deserialized .isEmpty ()) {
298+ result .addAll (deserialized );
289299 }
290300 }
291301 }
0 commit comments