Skip to content

Commit e5d95d4

Browse files
committed
http202 Amend deserialize to be Flink version 2 compatible
Signed-off-by: davidradl <[email protected]>
1 parent 00e5f5f commit e5d95d4

File tree

3 files changed

+293
-10
lines changed

3 files changed

+293
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## [Unreleased]
44

55
- allow format options to be applied to the http response decoding.
6+
- change deserialize method so it can work with Flink 2
67

78
## [0.24.0] - 2025-11-26
89

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.configuration.ReadableConfig;
2626
import org.apache.flink.table.data.RowData;
2727
import org.apache.flink.table.functions.FunctionContext;
28+
import org.apache.flink.util.Collector;
2829
import org.apache.flink.util.ConfigurationException;
2930
import org.apache.flink.util.StringUtils;
3031

@@ -268,24 +269,52 @@ private Collection<RowData> deserialize(String responseBody) throws IOException
268269
}
269270
}
270271

271-
private List<RowData> deserializeSingleValue(byte[] rawBytes) throws IOException {
272-
return Optional.ofNullable(responseBodyDecoder.deserialize(rawBytes))
273-
.map(Collections::singletonList)
274-
.orElse(Collections.emptyList());
272+
@VisibleForTesting
273+
List<RowData> deserializeSingleValue(byte[] rawBytes) throws IOException {
274+
List<RowData> result = new ArrayList<>();
275+
responseBodyDecoder.deserialize(rawBytes, createRowDataCollector(result));
276+
return result;
277+
}
278+
279+
@VisibleForTesting
280+
Collector<RowData> createRowDataCollector(List<RowData> result) {
281+
return new RowDataCollector(result);
282+
}
283+
284+
/**
285+
* A simple collector implementation that adds RowData records to a list.
286+
*/
287+
@VisibleForTesting
288+
static class RowDataCollector implements Collector<RowData> {
289+
private final List<RowData> result;
290+
291+
RowDataCollector(List<RowData> result) {
292+
this.result = result;
293+
}
294+
295+
@Override
296+
public void collect(RowData record) {
297+
result.add(record);
298+
}
299+
300+
@Override
301+
public void close() {
302+
// No-op - nothing to clean up
303+
}
275304
}
276305

277-
private List<RowData> deserializeArray(byte[] rawBytes) throws IOException {
306+
@VisibleForTesting
307+
List<RowData> deserializeArray(byte[] rawBytes) throws IOException {
278308
List<JsonNode> rawObjects =
279309
objectMapper.readValue(rawBytes, new TypeReference<>() {
280310
});
281311
List<RowData> result = new ArrayList<>();
282312
for (JsonNode rawObject : rawObjects) {
283313
if (!(rawObject instanceof NullNode)) {
284-
RowData deserialized =
285-
responseBodyDecoder.deserialize(rawObject.toString().getBytes());
286-
// deserialize() returns null if deserialization fails
287-
if (deserialized != null) {
288-
result.add(deserialized);
314+
List<RowData> deserialized = deserializeSingleValue(rawObject.toString().getBytes());
315+
// deserialize() may return empty list if deserialization fails
316+
if (deserialized != null && !deserialized.isEmpty()) {
317+
result.addAll(deserialized);
289318
}
290319
}
291320
}

src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

3+
import java.io.IOException;
34
import java.net.URI;
45
import java.net.http.HttpClient;
56
import java.net.http.HttpRequest;
@@ -15,6 +16,7 @@
1516
import org.apache.flink.table.data.StringData;
1617
import org.apache.flink.table.factories.DynamicTableFactory;
1718
import org.apache.flink.table.types.DataType;
19+
import org.apache.flink.util.Collector;
1820
import org.apache.flink.util.ConfigurationException;
1921
import org.junit.jupiter.api.BeforeEach;
2022
import org.junit.jupiter.api.Test;
@@ -224,4 +226,255 @@ public void shouldBuildClientWithHeaders() throws ConfigurationException {
224226
);
225227
assertPropertyArray(headersAndValues, "Access-Control-Allow-Origin", "*");
226228
}
229+
230+
@Test
231+
public void shouldCollectRowDataInCollector() throws ConfigurationException {
232+
// GIVEN
233+
List<RowData> result = new ArrayList<>();
234+
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
235+
httpClient,
236+
decoder,
237+
options,
238+
new GetRequestFactory(
239+
new GenericGetQueryCreator(lookupRow),
240+
headerPreprocessor,
241+
options
242+
)
243+
);
244+
245+
Collector<RowData> collector = client.createRowDataCollector(result);
246+
247+
RowData row1 = GenericRowData.of(StringData.fromString("test1"));
248+
RowData row2 = GenericRowData.of(StringData.fromString("test2"));
249+
250+
// WHEN
251+
collector.collect(row1);
252+
collector.collect(row2);
253+
254+
// THEN
255+
assertThat(result).hasSize(2);
256+
assertThat(result.get(0)).isEqualTo(row1);
257+
assertThat(result.get(1)).isEqualTo(row2);
258+
}
259+
260+
@Test
261+
public void shouldCallCloseOnRowDataCollectorWithoutException() throws ConfigurationException {
262+
// GIVEN
263+
List<RowData> result = new ArrayList<>();
264+
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
265+
httpClient,
266+
decoder,
267+
options,
268+
new GetRequestFactory(
269+
new GenericGetQueryCreator(lookupRow),
270+
headerPreprocessor,
271+
options
272+
)
273+
);
274+
275+
Collector<RowData> collector = client.createRowDataCollector(result);
276+
collector.collect(GenericRowData.of(StringData.fromString("test")));
277+
278+
// WHEN - close should not throw any exception
279+
collector.close();
280+
281+
// THEN
282+
assertThat(result).hasSize(1);
283+
}
284+
285+
@Test
286+
public void shouldHandleEmptyCollectionInRowDataCollector() throws ConfigurationException {
287+
// GIVEN
288+
List<RowData> result = new ArrayList<>();
289+
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
290+
httpClient,
291+
decoder,
292+
options,
293+
new GetRequestFactory(
294+
new GenericGetQueryCreator(lookupRow),
295+
headerPreprocessor,
296+
options
297+
)
298+
);
299+
300+
Collector<RowData> collector = client.createRowDataCollector(result);
301+
302+
// WHEN - close without collecting anything
303+
collector.close();
304+
305+
// THEN
306+
assertThat(result).isEmpty();
307+
}
308+
309+
@Test
310+
public void shouldDeserializeArrayWithValidObjects() throws Exception {
311+
// GIVEN
312+
DeserializationSchema<RowData> mockDecoder = new DeserializationSchema<RowData>() {
313+
@Override
314+
public RowData deserialize(byte[] message) throws IOException {
315+
return null;
316+
}
317+
318+
@Override
319+
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
320+
String msg = new String(message);
321+
if (msg.contains("value1")) {
322+
out.collect(GenericRowData.of(StringData.fromString("row1")));
323+
} else if (msg.contains("value2")) {
324+
out.collect(GenericRowData.of(StringData.fromString("row2")));
325+
}
326+
out.close();
327+
}
328+
329+
@Override
330+
public boolean isEndOfStream(RowData nextElement) {
331+
return false;
332+
}
333+
334+
@Override
335+
public org.apache.flink.api.common.typeinfo.TypeInformation<RowData> getProducedType() {
336+
return null;
337+
}
338+
};
339+
340+
Properties properties = new Properties();
341+
properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, "array");
342+
343+
HttpLookupConfig lookupConfig = HttpLookupConfig.builder()
344+
.url(BASE_URL)
345+
.properties(properties)
346+
.build();
347+
348+
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
349+
httpClient,
350+
mockDecoder,
351+
lookupConfig,
352+
new GetRequestFactory(
353+
new GenericGetQueryCreator(lookupRow),
354+
headerPreprocessor,
355+
lookupConfig
356+
)
357+
);
358+
359+
// WHEN
360+
String jsonArray = "[{\"key\":\"value1\"},{\"key\":\"value2\"}]";
361+
List<RowData> result = client.deserializeArray(jsonArray.getBytes());
362+
363+
// THEN
364+
assertThat(result).isNotNull();
365+
assertThat(result).hasSize(2);
366+
}
367+
368+
@Test
369+
public void shouldHandleNullNodesInArray() throws Exception {
370+
// GIVEN
371+
DeserializationSchema<RowData> mockDecoder = new DeserializationSchema<RowData>() {
372+
@Override
373+
public RowData deserialize(byte[] message) throws IOException {
374+
return null;
375+
}
376+
377+
@Override
378+
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
379+
out.collect(GenericRowData.of(StringData.fromString("valid")));
380+
out.close();
381+
}
382+
383+
@Override
384+
public boolean isEndOfStream(RowData nextElement) {
385+
return false;
386+
}
387+
388+
@Override
389+
public org.apache.flink.api.common.typeinfo.TypeInformation<RowData> getProducedType() {
390+
return null;
391+
}
392+
};
393+
394+
Properties properties = new Properties();
395+
properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, "array");
396+
397+
HttpLookupConfig lookupConfig = HttpLookupConfig.builder()
398+
.url(BASE_URL)
399+
.properties(properties)
400+
.build();
401+
402+
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
403+
httpClient,
404+
mockDecoder,
405+
lookupConfig,
406+
new GetRequestFactory(
407+
new GenericGetQueryCreator(lookupRow),
408+
headerPreprocessor,
409+
lookupConfig
410+
)
411+
);
412+
413+
// WHEN
414+
String jsonArray = "[{\"key\":\"value1\"},null,{\"key\":\"value2\"}]";
415+
List<RowData> result = client.deserializeArray(jsonArray.getBytes());
416+
417+
// THEN - null nodes should be skipped
418+
assertThat(result).isNotNull();
419+
assertThat(result).hasSize(2);
420+
}
421+
422+
@Test
423+
public void shouldHandleEmptyDeserializationInArray() throws Exception {
424+
// GIVEN
425+
DeserializationSchema<RowData> mockDecoder = new DeserializationSchema<RowData>() {
426+
@Override
427+
public RowData deserialize(byte[] message) throws IOException {
428+
return null;
429+
}
430+
431+
@Override
432+
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
433+
String msg = new String(message);
434+
// Only collect for specific messages, return empty for others
435+
if (msg.contains("\"status\":\"valid\"")) {
436+
out.collect(GenericRowData.of(StringData.fromString("data")));
437+
}
438+
// Don't collect anything for other messages
439+
out.close();
440+
}
441+
442+
@Override
443+
public boolean isEndOfStream(RowData nextElement) {
444+
return false;
445+
}
446+
447+
@Override
448+
public org.apache.flink.api.common.typeinfo.TypeInformation<RowData> getProducedType() {
449+
return null;
450+
}
451+
};
452+
453+
Properties properties = new Properties();
454+
properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, "array");
455+
456+
HttpLookupConfig lookupConfig = HttpLookupConfig.builder()
457+
.url(BASE_URL)
458+
.properties(properties)
459+
.build();
460+
461+
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
462+
httpClient,
463+
mockDecoder,
464+
lookupConfig,
465+
new GetRequestFactory(
466+
new GenericGetQueryCreator(lookupRow),
467+
headerPreprocessor,
468+
lookupConfig
469+
)
470+
);
471+
472+
// WHEN
473+
String jsonArray = "[{\"status\":\"invalid\"},{\"status\":\"valid\"},{\"status\":\"invalid\"}]";
474+
List<RowData> result = client.deserializeArray(jsonArray.getBytes());
475+
476+
// THEN - only valid deserialization should be included
477+
assertThat(result).isNotNull();
478+
assertThat(result).hasSize(1);
479+
}
227480
}

0 commit comments

Comments
 (0)