Skip to content

Commit c6b8471

Browse files
committed
fix-issue-434 To add proxy related settings to splunk connector
Signed-off-by: Wang, Shu <[email protected]>
1 parent 11e3498 commit c6b8471

File tree

12 files changed

+132
-12
lines changed

12 files changed

+132
-12
lines changed

README.md

+4
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ Use the below schema to configure Splunk Connect for Kafka
119119
"splunk.hec.ack.poll.threads": "<number of threads used to poll event acks>",
120120
"splunk.hec.ssl.validate.certs": "<true|false>",
121121
"splunk.hec.http.keepalive": "<true|false>",
122+
"splunk.hec.http.proxy.host": "<the http proxy host name>",
123+
"splunk.hec.http.proxy.port": "<the http proxy port number>",
122124
"splunk.hec.max.http.connection.per.channel": "<max number of http connections per channel>",
123125
"splunk.hec.total.channels": "<total number of channels>",
124126
"splunk.hec.max.batch.size": "<max number of kafka records post in one batch>",
@@ -167,6 +169,8 @@ Use the below schema to configure Splunk Connect for Kafka
167169
| `splunk.validation.disable` | Disable validating splunk configurations before creating task. | `false` |
168170
| `splunk.hec.ssl.validate.certs` | Valid settings are `true` or `false`. Enables or disables HTTPS certification validation. |`true`|
169171
| `splunk.hec.http.keepalive` | Valid settings are `true` or `false`. Enables or disables HTTP connection keep-alive. |`true`|
172+
| `splunk.hec.http.proxy.host` | This setting is the http proxy server hostname. Configure it to use connector level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS. | `""` |
173+
| `splunk.hec.http.proxy.port` | This setting is the http proxy server port. Configure it to use connector level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS. | `0` |
170174
| `splunk.hec.max.http.connection.per.channel` | Controls how many HTTP connections will be created and cached in the HTTP pool for one HEC channel. |`2`|
171175
| `splunk.hec.total.channels` | Controls the total channels created to perform HEC event POSTs. See the Load balancer section for more details. |`2`|
172176
| `splunk.hec.max.batch.size` | Maximum batch size when posting events to Splunk. The size is the actual number of Kafka events, and not byte size. |`500`|

src/main/java/com/splunk/hecclient/Hec.java

+4
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
283283
return new HttpClientBuilder().setDisableSSLCertVerification(config.getDisableSSLCertVerification())
284284
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
285285
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
286+
.setHttpProxyHost(config.getHttpProxyHost())
287+
.setHttpProxyPort(config.getHttpProxyPort())
286288
.build();
287289
}
288290

@@ -295,6 +297,8 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
295297
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
296298
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
297299
.setSslContext(context)
300+
.setHttpProxyHost(config.getHttpProxyHost())
301+
.setHttpProxyPort(config.getHttpProxyPort())
298302
.build();
299303
}
300304
else {

src/main/java/com/splunk/hecclient/HecConfig.java

+20
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public final class HecConfig {
4141
private String kerberosKeytabPath;
4242
private int concurrentHecQueueCapacity = 100;
4343
private Boolean autoExtractTimestamp;
44+
private String httpProxyHost;
45+
private int httpProxyPort = 0;
4446

4547
public HecConfig(List<String> uris, String token) {
4648
this.uris = uris;
@@ -63,6 +65,14 @@ public boolean getHttpKeepAlive() {
6365
return httpKeepAlive;
6466
}
6567

68+
public String getHttpProxyHost() {
69+
return httpProxyHost;
70+
}
71+
72+
public int getHttpProxyPort() {
73+
return httpProxyPort;
74+
}
75+
6676
public int getSocketTimeout() {
6777
return socketTimeout;
6878
}
@@ -127,6 +137,16 @@ public HecConfig setHttpKeepAlive(boolean keepAlive) {
127137
return this;
128138
}
129139

140+
public HecConfig setHttpProxyHost(final String httpProxyHost) {
141+
this.httpProxyHost = httpProxyHost;
142+
return this;
143+
}
144+
145+
public HecConfig setHttpProxyPort(final int httpProxyPort) {
146+
this.httpProxyPort = httpProxyPort;
147+
return this;
148+
}
149+
130150
public HecConfig setSocketTimeout(int timeout /*seconds*/) {
131151
socketTimeout = timeout;
132152
return this;

src/main/java/com/splunk/hecclient/HttpClientBuilder.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
import java.security.KeyStoreException;
2020
import java.security.NoSuchAlgorithmException;
2121
import java.security.Principal;
22+
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.http.HttpHost;
2225
import org.apache.http.auth.AuthSchemeProvider;
2326
import org.apache.http.auth.AuthScope;
2427
import org.apache.http.auth.Credentials;
@@ -50,6 +53,8 @@ public final class HttpClientBuilder {
5053
private int socketTimeout = 60; // in seconds
5154
private int socketSendBufferSize = 8 * 1024 * 1024; // in bytes
5255
private boolean disableSSLCertVerification = false;
56+
private String httpProxyHost;
57+
private int httpProxyPort;
5358
private SSLContext sslContext = null;
5459

5560
public HttpClientBuilder setMaxConnectionPoolSizePerDestination(int connections) {
@@ -77,6 +82,16 @@ public HttpClientBuilder setDisableSSLCertVerification(boolean disableVerificati
7782
return this;
7883
}
7984

85+
public HttpClientBuilder setHttpProxyHost(final String httpProxyHost) {
86+
this.httpProxyHost = httpProxyHost;
87+
return this;
88+
}
89+
90+
public HttpClientBuilder setHttpProxyPort(final int httpProxyPort) {
91+
this.httpProxyPort = httpProxyPort;
92+
return this;
93+
}
94+
8095
public HttpClientBuilder setSslContext(SSLContext context) {
8196
this.sslContext = context;
8297
return this;
@@ -88,9 +103,12 @@ public CloseableHttpClient build() {
88103
.setSndBufSize(socketSendBufferSize)
89104
.setSoTimeout(socketTimeout * 1000)
90105
.build();
91-
RequestConfig requestConfig = RequestConfig.custom()
92-
.setCookieSpec(CookieSpecs.STANDARD)
93-
.build();
106+
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
107+
.setCookieSpec(CookieSpecs.STANDARD);
108+
if (StringUtils.isNotEmpty(this.httpProxyHost) && this.httpProxyPort != 0) {
109+
requestConfigBuilder.setProxy(new HttpHost(this.httpProxyHost, this.httpProxyPort));
110+
}
111+
RequestConfig requestConfig = requestConfigBuilder.build();
94112

95113
return HttpClients.custom()
96114
.useSystemProperties()

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
5151
static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel";
5252
static final String MAX_BATCH_SIZE_CONF = "splunk.hec.max.batch.size"; // record count
5353
static final String HTTP_KEEPALIVE_CONF = "splunk.hec.http.keepalive";
54+
static final String HTTP_PROXY_HOST_CONF = "splunk.hec.http.proxy.host";
55+
static final String HTTP_PROXY_PORT_CONF = "splunk.hec.http.proxy.port";
5456
static final String HEC_THREDS_CONF = "splunk.hec.threads";
5557
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
5658
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
@@ -128,6 +130,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
128130
+ "Kafka events not the byte size. By default, this is set to 100.";
129131
static final String HTTP_KEEPALIVE_DOC = "Valid settings are true or false. Enables or disables HTTP connection "
130132
+ "keep-alive. By default, this is set to true";
133+
static final String HTTP_PROXY_HOST_DOC = "This setting is the http proxy server hostname. Configure it to use connector "
134+
+ "level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS.";
135+
static final String HTTP_PROXY_PORT_DOC = "This setting is the http proxy server port. Configure it to use connector "
136+
+ "level proxy when connecting to HEC endpoint, otherwise, it'll use JVM level proxy setting in JVM_OPTS.";
131137
static final String HEC_THREADS_DOC = "Controls how many threads are spawned to do data injection via HEC in a single "
132138
+ "connector task. By default, this is set to 1.";
133139
static final String SOCKET_TIMEOUT_DOC = "Max duration in seconds to read / write data to network before internal TCP "
@@ -225,6 +231,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
225231
final int maxHttpConnPerChannel;
226232
final int maxBatchSize;
227233
final boolean httpKeepAlive;
234+
final String httpProxyHost;
235+
final int httpProxyPort;
228236
final int numberOfThreads;
229237
final int socketTimeout;
230238
final boolean validateCertificates;
@@ -280,6 +288,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
280288
sourcetypes = getString(SOURCETYPE_CONF);
281289
sources = getString(SOURCE_CONF);
282290
httpKeepAlive = getBoolean(HTTP_KEEPALIVE_CONF);
291+
httpProxyHost = getString(HTTP_PROXY_HOST_CONF);
292+
httpProxyPort = getInt(HTTP_PROXY_PORT_CONF);
283293
validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF);
284294
trustStorePath = getString(SSL_TRUSTSTORE_PATH_CONF);
285295
hasTrustStorePath = StringUtils.isNotBlank(trustStorePath);
@@ -330,7 +340,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
330340
autoExtractTimestamp = getBoolean(AUTO_EXTRACT_TIMESTAMP_CONF);
331341
}
332342

333-
343+
334344
public static ConfigDef conf() {
335345
return new ConfigDef()
336346
.define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC)
@@ -341,6 +351,8 @@ public static ConfigDef conf() {
341351
.define(SOURCETYPE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCETYPE_DOC)
342352
.define(SOURCE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCE_DOC)
343353
.define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC)
354+
.define(HTTP_PROXY_HOST_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, HTTP_PROXY_HOST_DOC)
355+
.define(HTTP_PROXY_PORT_CONF, ConfigDef.Type.INT, 0, ConfigDef.Importance.HIGH, HTTP_PROXY_PORT_DOC)
344356
.define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC)
345357
.define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
346358
.define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC)
@@ -392,6 +404,8 @@ public HecConfig getHecConfig() {
392404
.setTotalChannels(totalHecChannels)
393405
.setEventBatchTimeout(eventBatchTimeout)
394406
.setHttpKeepAlive(httpKeepAlive)
407+
.setHttpProxyHost(httpProxyHost)
408+
.setHttpProxyPort(httpProxyPort)
395409
.setAckPollInterval(ackPollInterval)
396410
.setlbPollInterval(lbPollInterval)
397411
.setAckPollThreads(ackPollThreads)
@@ -424,6 +438,8 @@ public String toString() {
424438
+ "headerSupport:" + headerSupport + ", "
425439
+ "headerCustom:" + headerCustom + ", "
426440
+ "httpKeepAlive:" + httpKeepAlive + ", "
441+
+ "httpProxyHost:" + httpProxyHost + ", "
442+
+ "httpProxyPort:" + httpProxyPort + ", "
427443
+ "validateCertificates:" + validateCertificates + ", "
428444
+ "trustStorePath:" + trustStorePath + ", "
429445
+ "trustStoreType:" + trustStoreType + ", "

src/test/java/com/splunk/hecclient/HecConfigTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public void getterSetter() {
3636
config.setAckPollInterval(1)
3737
.setDisableSSLCertVerification(true)
3838
.setHttpKeepAlive(false)
39+
.setHttpProxyHost("test.host")
40+
.setHttpProxyPort(8080)
3941
.setSocketSendBufferSize(2)
4042
.setSocketTimeout(3)
4143
.setMaxHttpConnectionPerChannel(4)
@@ -60,6 +62,8 @@ public void getterSetter() {
6062
Assert.assertEquals(5, config.getTotalChannels());
6163
Assert.assertEquals(6, config.getAckPollThreads());
6264
Assert.assertEquals(7, config.getEventBatchTimeout());
65+
Assert.assertEquals("test.host", config.getHttpProxyHost());
66+
Assert.assertEquals(8080, config.getHttpProxyPort());
6367
Assert.assertEquals("test", config.getTrustStorePath());
6468
Assert.assertEquals("PKCS12", config.getTrustStoreType());
6569
Assert.assertEquals("pass", config.getTrustStorePassword());

src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,27 @@ public void buildSecureDefault() {
4444
.build();
4545
Assert.assertNotNull(client);
4646
}
47+
48+
@Test
49+
public void buildHttpProxy() {
50+
HttpClientBuilder builder = new HttpClientBuilder();
51+
CloseableHttpClient client = builder.setMaxConnectionPoolSizePerDestination(1)
52+
.setHttpProxyHost("rest.host")
53+
.setHttpProxyPort(8080)
54+
.build();
55+
Assert.assertNotNull(client);
56+
builder = new HttpClientBuilder();
57+
client = builder.setMaxConnectionPoolSizePerDestination(1)
58+
.setHttpProxyPort(8080)
59+
.build();
60+
Assert.assertNotNull(client);
61+
builder = new HttpClientBuilder();
62+
client = builder.setMaxConnectionPoolSizePerDestination(1)
63+
.setHttpProxyHost("rest.host")
64+
.build();
65+
Assert.assertNotNull(client);
66+
}
67+
4768
@Test
4869
public void buildSecureCustomKeystore() {
4970
HttpClientBuilder builder = new HttpClientBuilder();

src/test/java/com/splunk/kafka/connect/ConfigProfile.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public class ConfigProfile {
1414
private String sourcetypes;
1515
private String sources;
1616
private boolean httpKeepAlive;
17+
private String httpProxyHost;
18+
private int httpProxyPort;
1719
private boolean validateCertificates;
1820
private boolean hasTrustStorePath;
1921
private String trustStorePath;
@@ -75,6 +77,8 @@ public ConfigProfile buildProfileDefault() {
7577
this.sourcetypes = "";
7678
this.sources = "";
7779
this.httpKeepAlive = true;
80+
this.httpProxyHost = "proxy.host";
81+
this.httpProxyPort = 8080;
7882
this.validateCertificates = true;
7983
this.hasTrustStorePath = true;
8084
this.trustStorePath = "./src/test/resources/keystoretest.jks";
@@ -311,6 +315,22 @@ public void setHttpKeepAlive(boolean httpKeepAlive) {
311315
this.httpKeepAlive = httpKeepAlive;
312316
}
313317

318+
public int getHttpProxyPort() {
319+
return httpProxyPort;
320+
}
321+
322+
public String getHttpProxyHost() {
323+
return httpProxyHost;
324+
}
325+
326+
public void setHttpProxyHost(final String httpProxyHost) {
327+
this.httpProxyHost = httpProxyHost;
328+
}
329+
330+
public void setHttpProxyPort(final int httpProxyPort) {
331+
this.httpProxyPort = httpProxyPort;
332+
}
333+
314334
public boolean isValidateCertificates() {
315335
return validateCertificates;
316336
}
@@ -472,6 +492,6 @@ public void setHeaderHost(String headerHost) {
472492
}
473493

474494
@Override public String toString() {
475-
return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStoreType='" + trustStoreType + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}';
495+
return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", httpProxyHost=" + httpProxyHost + ", httpProxyPort=" + httpProxyPort + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", " + "trustStoreType='" + trustStoreType + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}';
476496
}
477497
}

src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java

+11
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,17 @@ public void testSpecialCharLineBreaker() {
284284
Assert.assertEquals("\t", connectorConfig.lineBreaker);
285285
}
286286

287+
@Test
288+
public void testHttpProxy() {
289+
UnitUtil uu = new UnitUtil(0);
290+
Map<String, String> taskConfig = uu.createTaskConfig();
291+
SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(taskConfig);
292+
HecConfig config = connectorConfig.getHecConfig();
293+
294+
Assert.assertEquals("proxy.host", config.getHttpProxyHost());
295+
Assert.assertEquals(8080, config.getHttpProxyPort());
296+
}
297+
287298
@Test
288299
public void toStr() {
289300
UnitUtil uu = new UnitUtil(0);

src/test/java/com/splunk/kafka/connect/UnitUtil.java

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public Map<String, String> createTaskConfig() {
4141
config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, configProfile.getSourcetypes());
4242
config.put(SplunkSinkConnectorConfig.SOURCE_CONF, configProfile.getSources());
4343
config.put(SplunkSinkConnectorConfig.HTTP_KEEPALIVE_CONF, String.valueOf(configProfile.isHttpKeepAlive()));
44+
config.put(SplunkSinkConnectorConfig.HTTP_PROXY_HOST_CONF, configProfile.getHttpProxyHost());
45+
config.put(SplunkSinkConnectorConfig.HTTP_PROXY_PORT_CONF, String.valueOf(configProfile.getHttpProxyPort()));
4446
config.put(SplunkSinkConnectorConfig.SSL_VALIDATE_CERTIFICATES_CONF, String.valueOf(configProfile.isValidateCertificates()));
4547

4648
if(configProfile.getTrustStorePath() != null ) {

0 commit comments

Comments
 (0)