Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## [Unreleased]

- allow format options to be applied to the http response decoding.
- change deserialize method so it can work with Flink 2

## [0.24.0] - 2025-11-26

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.StringUtils;

Expand Down Expand Up @@ -268,24 +269,52 @@ private Collection<RowData> deserialize(String responseBody) throws IOException
}
}

private List<RowData> deserializeSingleValue(byte[] rawBytes) throws IOException {
return Optional.ofNullable(responseBodyDecoder.deserialize(rawBytes))
.map(Collections::singletonList)
.orElse(Collections.emptyList());
@VisibleForTesting
List<RowData> deserializeSingleValue(byte[] rawBytes) throws IOException {
List<RowData> result = new ArrayList<>();
responseBodyDecoder.deserialize(rawBytes, createRowDataCollector(result));
return result;
}

@VisibleForTesting
Collector<RowData> createRowDataCollector(List<RowData> result) {
return new RowDataCollector(result);
}

/**
* A simple collector implementation that adds RowData records to a list.
*/
@VisibleForTesting
static class RowDataCollector implements Collector<RowData> {
private final List<RowData> result;

RowDataCollector(List<RowData> result) {
this.result = result;
}

@Override
public void collect(RowData record) {
result.add(record);
}

@Override
public void close() {
// No-op - nothing to clean up
}
}

private List<RowData> deserializeArray(byte[] rawBytes) throws IOException {
@VisibleForTesting
List<RowData> deserializeArray(byte[] rawBytes) throws IOException {
List<JsonNode> rawObjects =
objectMapper.readValue(rawBytes, new TypeReference<>() {
});
List<RowData> result = new ArrayList<>();
for (JsonNode rawObject : rawObjects) {
if (!(rawObject instanceof NullNode)) {
RowData deserialized =
responseBodyDecoder.deserialize(rawObject.toString().getBytes());
// deserialize() returns null if deserialization fails
if (deserialized != null) {
result.add(deserialized);
List<RowData> deserialized = deserializeSingleValue(rawObject.toString().getBytes());
// deserialize() may return empty list if deserialization fails
if (deserialized != null && !deserialized.isEmpty()) {
result.addAll(deserialized);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
Expand All @@ -15,6 +16,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ConfigurationException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -224,4 +226,255 @@ public void shouldBuildClientWithHeaders() throws ConfigurationException {
);
assertPropertyArray(headersAndValues, "Access-Control-Allow-Origin", "*");
}

@Test
public void shouldCollectRowDataInCollector() throws ConfigurationException {
// GIVEN
List<RowData> result = new ArrayList<>();
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
httpClient,
decoder,
options,
new GetRequestFactory(
new GenericGetQueryCreator(lookupRow),
headerPreprocessor,
options
)
);

Collector<RowData> collector = client.createRowDataCollector(result);

RowData row1 = GenericRowData.of(StringData.fromString("test1"));
RowData row2 = GenericRowData.of(StringData.fromString("test2"));

// WHEN
collector.collect(row1);
collector.collect(row2);

// THEN
assertThat(result).hasSize(2);
assertThat(result.get(0)).isEqualTo(row1);
assertThat(result.get(1)).isEqualTo(row2);
}

@Test
public void shouldCallCloseOnRowDataCollectorWithoutException() throws ConfigurationException {
// GIVEN
List<RowData> result = new ArrayList<>();
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
httpClient,
decoder,
options,
new GetRequestFactory(
new GenericGetQueryCreator(lookupRow),
headerPreprocessor,
options
)
);

Collector<RowData> collector = client.createRowDataCollector(result);
collector.collect(GenericRowData.of(StringData.fromString("test")));

// WHEN - close should not throw any exception
collector.close();

// THEN
assertThat(result).hasSize(1);
}

@Test
public void shouldHandleEmptyCollectionInRowDataCollector() throws ConfigurationException {
// GIVEN
List<RowData> result = new ArrayList<>();
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
httpClient,
decoder,
options,
new GetRequestFactory(
new GenericGetQueryCreator(lookupRow),
headerPreprocessor,
options
)
);

Collector<RowData> collector = client.createRowDataCollector(result);

// WHEN - close without collecting anything
collector.close();

// THEN
assertThat(result).isEmpty();
}

@Test
public void shouldDeserializeArrayWithValidObjects() throws Exception {
// GIVEN
DeserializationSchema<RowData> mockDecoder = new DeserializationSchema<RowData>() {
@Override
public RowData deserialize(byte[] message) throws IOException {
return null;
}

@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
String msg = new String(message);
if (msg.contains("value1")) {
out.collect(GenericRowData.of(StringData.fromString("row1")));
} else if (msg.contains("value2")) {
out.collect(GenericRowData.of(StringData.fromString("row2")));
}
out.close();
}

@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
}

@Override
public org.apache.flink.api.common.typeinfo.TypeInformation<RowData> getProducedType() {
return null;
}
};

Properties properties = new Properties();
properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, "array");

HttpLookupConfig lookupConfig = HttpLookupConfig.builder()
.url(BASE_URL)
.properties(properties)
.build();

JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
httpClient,
mockDecoder,
lookupConfig,
new GetRequestFactory(
new GenericGetQueryCreator(lookupRow),
headerPreprocessor,
lookupConfig
)
);

// WHEN
String jsonArray = "[{\"key\":\"value1\"},{\"key\":\"value2\"}]";
List<RowData> result = client.deserializeArray(jsonArray.getBytes());

// THEN
assertThat(result).isNotNull();
assertThat(result).hasSize(2);
}

@Test
public void shouldHandleNullNodesInArray() throws Exception {
// GIVEN
DeserializationSchema<RowData> mockDecoder = new DeserializationSchema<RowData>() {
@Override
public RowData deserialize(byte[] message) throws IOException {
return null;
}

@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
out.collect(GenericRowData.of(StringData.fromString("valid")));
out.close();
}

@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
}

@Override
public org.apache.flink.api.common.typeinfo.TypeInformation<RowData> getProducedType() {
return null;
}
};

Properties properties = new Properties();
properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, "array");

HttpLookupConfig lookupConfig = HttpLookupConfig.builder()
.url(BASE_URL)
.properties(properties)
.build();

JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
httpClient,
mockDecoder,
lookupConfig,
new GetRequestFactory(
new GenericGetQueryCreator(lookupRow),
headerPreprocessor,
lookupConfig
)
);

// WHEN
String jsonArray = "[{\"key\":\"value1\"},null,{\"key\":\"value2\"}]";
List<RowData> result = client.deserializeArray(jsonArray.getBytes());

// THEN - null nodes should be skipped
assertThat(result).isNotNull();
assertThat(result).hasSize(2);
}

@Test
public void shouldHandleEmptyDeserializationInArray() throws Exception {
// GIVEN
DeserializationSchema<RowData> mockDecoder = new DeserializationSchema<RowData>() {
@Override
public RowData deserialize(byte[] message) throws IOException {
return null;
}

@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
String msg = new String(message);
// Only collect for specific messages, return empty for others
if (msg.contains("\"status\":\"valid\"")) {
out.collect(GenericRowData.of(StringData.fromString("data")));
}
// Don't collect anything for other messages
out.close();
}

@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
}

@Override
public org.apache.flink.api.common.typeinfo.TypeInformation<RowData> getProducedType() {
return null;
}
};

Properties properties = new Properties();
properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, "array");

HttpLookupConfig lookupConfig = HttpLookupConfig.builder()
.url(BASE_URL)
.properties(properties)
.build();

JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
httpClient,
mockDecoder,
lookupConfig,
new GetRequestFactory(
new GenericGetQueryCreator(lookupRow),
headerPreprocessor,
lookupConfig
)
);

// WHEN
String jsonArray = "[{\"status\":\"invalid\"},{\"status\":\"valid\"},{\"status\":\"invalid\"}]";
List<RowData> result = client.deserializeArray(jsonArray.getBytes());

// THEN - only valid deserialization should be included
assertThat(result).isNotNull();
assertThat(result).hasSize(1);
}
}
Loading