Skip to content

Commit cb66f1f

Browse files
authored
Merge branch 'main' into http179
2 parents ddb0d3d + 9604080 commit cb66f1f

File tree

7 files changed

+22
-48
lines changed

7 files changed

+22
-48
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
## [Unreleased]
44

5+
56
- Ability to specify http versions for http lookups.
7+
- Amend to not log HTTP request response and header values by default.
8+
- Added http 2 support.
69

710
## [0.22.0] - 2025-10-03
811

README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -427,10 +427,9 @@ that implements interface `HttpPostRequestCallbackFactory<HttpRequest>` to creat
427427
of class `CustomHttpSinkPostRequestCallbackFactory` in `resources/META-INF/services/org.apache.flink.table.factories.Factory` file
428428
and then reference identifier `rest-sink-logger` in the HttpSink DDL property field `gid.connector.http.sink.request-callback`.
429429

430-
A default implementation that logs those pairs as *INFO* level logs using Slf4j
431-
([Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java))
432-
is provided.
433-
430+
A default implementation that logs those pairs as *INFO* level logs using Slf4j ([Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java)) is provided.
431+
If you would like to log more http content (that maybe contain sensitive information), then you can provide a customized version
432+
of this callback; for inspiration on how to customize in this way, look back in the git history of this file.
434433

435434
- Http Lookup Source processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the
436435
behaviour of the additional stage of processing done by Table Function API by implementing

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,9 @@ private HttpRowDataWrapper processHttpResponse(
210210
boolean isError) throws IOException {
211211

212212
this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());
213-
214213
var responseBody = response.body();
215214

216-
log.debug("Received status code [{}] for RestTableSource request with Server response body [{}] ",
217-
response.statusCode(), responseBody);
215+
log.debug("Received status code [{}] for RestTableSource request", response.statusCode());
218216
if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) {
219217
return HttpRowDataWrapper.builder()
220218
.data(Collections.emptyList())

src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import java.time.Duration;
77
import java.util.Arrays;
88
import java.util.Map;
9-
import java.util.stream.Collectors;
109

1110
import lombok.extern.slf4j.Slf4j;
1211
import org.apache.flink.annotation.VisibleForTesting;
@@ -63,11 +62,6 @@ public RequestFactoryBase(
6362
);
6463

6564
this.headersAndValues = HttpHeaderUtils.toHeaderAndValueArray(headerMap);
66-
67-
log.debug("RequestFactoryBase headersAndValues: " +
68-
Arrays.stream(headersAndValues)
69-
.map(Object::toString)
70-
.collect(Collectors.joining(",")));
7165
this.httpRequestTimeOutSeconds = Integer.parseInt(
7266
options.getProperties().getProperty(
7367
HttpConnectorConfigConstants.LOOKUP_HTTP_TIMEOUT_SECONDS,

src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,11 @@
22

33
import java.net.http.HttpRequest;
44
import java.net.http.HttpResponse;
5-
import java.util.List;
65
import java.util.Map;
7-
import java.util.Map.Entry;
8-
import java.util.StringJoiner;
96

107
import lombok.extern.slf4j.Slf4j;
118

129
import com.getindata.connectors.http.HttpPostRequestCallback;
13-
import com.getindata.connectors.http.internal.utils.ConfigUtils;
1410

1511
/**
1612
* A {@link HttpPostRequestCallback} that logs pairs of request and response as <i>INFO</i> level
@@ -31,40 +27,26 @@ public void call(
3127
Map<String, String> headerMap) {
3228

3329
HttpRequest httpRequest = requestEntry.getHttpRequest();
34-
StringJoiner headers = new StringJoiner(";");
35-
36-
for (Entry<String, List<String>> reqHeaders : httpRequest.headers().map().entrySet()) {
37-
StringJoiner values = new StringJoiner(";");
38-
for (String value : reqHeaders.getValue()) {
39-
values.add(value);
40-
}
41-
String header = reqHeaders.getKey() + ": [" + values + "]";
42-
headers.add(header);
43-
}
4430

4531
if (response == null) {
4632
log.warn("Null Http response for request " + httpRequest.uri().toString());
4733

4834
log.info(
4935
"Got response for a request.\n Request:\n URL: {}\n " +
50-
"Method: {}\n Headers: {}\n Params/Body: {}\nResponse: null",
36+
"Method: {}\n Params/Body: {}\nResponse: null",
5137
httpRequest.uri().toString(),
5238
httpRequest.method(),
53-
headers,
5439
requestEntry.getLookupQueryInfo()
5540
);
5641
} else {
5742
log.info(
5843
"Got response for a request.\n Request:\n URL: {}\n " +
59-
"Method: {}\n Headers: {}\n Params/Body: {}\nResponse: {}\n Body: {}",
44+
"Method: {}\n Params/Body: {}\nResponse status code: {}\n",
6045
httpRequest.uri().toString(),
6146
httpRequest.method(),
62-
headers,
6347
requestEntry.getLookupQueryInfo(),
64-
response,
65-
response.body().replaceAll(ConfigUtils.UNIVERSAL_NEW_LINE_REGEXP, "")
48+
response.statusCode()
6649
);
6750
}
68-
6951
}
7052
}

src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
package com.getindata.connectors.http.internal.table.sink;
22

33
import java.net.http.HttpResponse;
4-
import java.nio.charset.StandardCharsets;
54
import java.util.Map;
6-
import java.util.stream.Collectors;
75

86
import lombok.extern.slf4j.Slf4j;
97

108
import com.getindata.connectors.http.HttpPostRequestCallback;
119
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
12-
import com.getindata.connectors.http.internal.utils.ConfigUtils;
1310

1411
/**
1512
* A {@link HttpPostRequestCallback} that logs pairs of request and response as <i>INFO</i> level
16-
* logs using <i>Slf4j</i>.
13+
* logs using <i>Slf4j</i>. As the request/response body or header might contain sensitive information,
14+
* we do not log those values.
1715
*
1816
* <p>Serving as a default implementation of {@link HttpPostRequestCallback} for
1917
* the {@link HttpDynamicSink}.
@@ -28,25 +26,23 @@ public void call(
2826
String endpointUrl,
2927
Map<String, String> headerMap) {
3028

31-
String requestBody = requestEntry.getElements().stream()
32-
.map(element -> new String(element, StandardCharsets.UTF_8))
33-
.collect(Collectors.joining());
29+
// Uncomment if you want to see the requestBody in the log
30+
//String requestBody = requestEntry.getElements().stream()
31+
// .map(element -> new String(element, StandardCharsets.UTF_8))
32+
// .collect(Collectors.joining());
3433

3534
if (response == null) {
3635
log.info(
3736
"Got response for a request.\n Request:\n " +
38-
"Method: {}\n Body: {}\n Response: null",
39-
requestEntry.getMethod(),
40-
requestBody
37+
"Method: {}\n Response: null",
38+
requestEntry.getMethod()
4139
);
4240
} else {
4341
log.info(
4442
"Got response for a request.\n Request:\n " +
45-
"Method: {}\n Body: {}\n Response: {}\n Body: {}",
43+
"Method: {}\n Response status code: {}\n ",
4644
requestEntry.method,
47-
requestBody,
48-
response,
49-
response.body().replaceAll(ConfigUtils.UNIVERSAL_NEW_LINE_REGEXP, "")
45+
response.statusCode()
5046
);
5147
}
5248
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
org.slf4j.simpleLogger.defaultLogLevel=INFO
2+
org.slf4j.simpleLogger.log.com.getindata.connectors.http.internal.table.lookup.RequestAndResponseLogger=DEBUG

0 commit comments

Comments
 (0)