Skip to content

Commit 49ac83b

Browse files
add support for accessing cloud/restricted environments (#18)
1 parent cd78be4 commit 49ac83b

12 files changed

+449
-18
lines changed

docs/Elasticsearch-batchsink.md

+20-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ with a stream batch source and Elasticsearch as a sink.
1414

1515
Configuration
1616
-------------
17-
**referenceName:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc.
17+
**Reference Name:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc.
1818

1919
**es.host:** The hostname and port for the Elasticsearch instance. (Macro-enabled)
2020

@@ -27,6 +27,9 @@ exist, it will be created. (Macro-enabled)
2727
**es.idField:** The field that will determine the id for the document; it should match a fieldname
2828
in the Structured Record of the input. (Macro-enabled)
2929

30+
**Additional Properties:** Additional properties to use with the es-hadoop client when writing the data,
31+
documented at [elastic.co](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html).
32+
(Macro-enabled)
3033

3134
Example
3235
-------
@@ -44,3 +47,19 @@ in the record. Each run, the documents will be updated if they are still present
4447
"es.idField": "id"
4548
}
4649
}
50+
51+
This example connects to Elasticsearch, which is running in a remote restricted environment (e.g. elastic.co),
52+
and writes the data to the specified index (megacorp) and type (employee). The data is indexed using the id field
53+
in the record. On each run, the documents will be updated if they are still present in the source:
54+
55+
{
56+
"name": "Elasticsearch",
57+
"type": "batchsink",
58+
"properties": {
59+
"es.host": "https://remote.region.gcp.cloud.es.io:9243",
60+
"es.index": "megacorp",
61+
"es.type": "employee",
62+
"es.idField": "id",
63+
"additionalProperties": "es.net.http.auth.user=username\nes.net.http.auth.pass=password\nes.nodes.wan.only=true"
64+
}
65+
}

docs/Elasticsearch-batchsource.md

+27-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ in an index and type from Elasticsearch and store the data in an HBase table.
1313

1414
Configuration
1515
-------------
16-
**referenceName:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.
16+
**Reference Name:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.
1717

1818
**es.host:** The hostname and port for the Elasticsearch instance. (Macro-enabled)
1919

@@ -26,6 +26,9 @@ see Elasticsearch for additional query examples. (Macro-enabled)
2626

2727
**schema:** The schema or mapping of the data in Elasticsearch.
2828

29+
**Additional Properties:** Additional properties to use with the es-hadoop client when reading the data,
30+
documented at [elastic.co](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html).
31+
(Macro-enabled)
2932

3033
Example
3134
-------
@@ -50,3 +53,26 @@ All data from the index will be read on each run:
5053
{\"name\":\"age\",\"type\":\"int\"}]}"
5154
}
5255
}
56+
57+
This example connects to Elasticsearch, which is running in a remote restricted environment (e.g. elastic.co),
58+
and reads the records in the specified index (*megacorp*) and type (*employee*) that match the query, which
59+
(in this case) selects all records. All data from the index will be read on each run:
60+
61+
{
62+
"name": "Elasticsearch",
63+
"type": "batchsource",
64+
"properties": {
65+
"es.host": "https://remote.region.gcp.cloud.es.io:9243",
66+
"es.index": "megacorp",
67+
"es.type": "employee",
68+
"query": "?q=*",
69+
"schema": "{
70+
\"type\":\"record\",
71+
\"name\":\"etlSchemaBody\",
72+
\"fields\":[
73+
{\"name\":\"id\",\"type\":\"long\"},
74+
{\"name\":\"name\",\"type\":\"string\"},
75+
{\"name\":\"age\",\"type\":\"int\"}]}",
76+
"additionalProperties": "es.net.http.auth.user=username\nes.net.http.auth.pass=password\nes.nodes.wan.only=true"
77+
}
78+
}

pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@
8484
<hadoop.version>2.3.0</hadoop.version>
8585
<junit.version>4.11</junit.version>
8686
<hydrator.version>2.3.0-SNAPSHOT</hydrator.version>
87-
<es.version>7.5.2</es.version>
88-
<es-hadoop.version>7.5.2</es-hadoop.version>
87+
<es.version>7.6.2</es.version>
88+
<es-hadoop.version>7.6.2</es-hadoop.version>
8989
<slf4j.version>1.7.5</slf4j.version>
9090
<netty.version>4.0.30.Final</netty.version>
9191
<log4j.version>2.7</log4j.version>

src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java

+58-6
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,29 @@
2222
import io.cdap.cdap.api.annotation.Name;
2323
import io.cdap.cdap.etl.api.FailureCollector;
2424
import io.cdap.plugin.common.IdUtils;
25+
import io.cdap.plugin.common.KeyValueListParser;
2526
import io.cdap.plugin.common.ReferencePluginConfig;
2627

28+
import java.util.Arrays;
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
import javax.annotation.Nullable;
32+
2733
/**
2834
* Basic config class for Elasticsearch plugin.
2935
*/
3036
public abstract class BaseElasticsearchConfig extends ReferencePluginConfig {
37+
3138
public static final String INDEX_NAME = "es.index";
3239
public static final String TYPE_NAME = "es.type";
3340
public static final String HOST = "es.host";
41+
public static final String ADDITIONAL_PROPERTIES = "additionalProperties";
3442

3543
private static final String HOST_DESCRIPTION = "The hostname and port for the Elasticsearch instance; " +
36-
"for example, localhost:9200.";
44+
"for example, localhost:9200 or https://remote.region.gcp.cloud.es.io:9243.";
3745
private static final String INDEX_DESCRIPTION = "The name of the index to query.";
3846
private static final String TYPE_DESCRIPTION = "The name of the type where the data is stored.";
47+
private static final String ADDITIONAL_PROPERTIES_DESCRIPTION = "Additional client properties for ES-Hadoop";
3948

4049
@Name(HOST)
4150
@Description(HOST_DESCRIPTION)
@@ -52,11 +61,19 @@ public abstract class BaseElasticsearchConfig extends ReferencePluginConfig {
5261
@Macro
5362
private final String type;
5463

55-
public BaseElasticsearchConfig(String referenceName, String hostname, String index, String type) {
64+
@Nullable
65+
@Name(ADDITIONAL_PROPERTIES)
66+
@Description(ADDITIONAL_PROPERTIES_DESCRIPTION)
67+
@Macro
68+
private final String additionalProperties;
69+
70+
/**
 * Creates a config from explicit values.
 *
 * @param referenceName passed to the {@link ReferencePluginConfig} constructor
 * @param hostname value stored as the Elasticsearch host setting
 * @param index value stored as the index name
 * @param type value stored as the type name
 * @param additionalProperties raw additional-properties string; the backing field is declared
 *                             {@code @Nullable}, so null is accepted
 */
public BaseElasticsearchConfig(String referenceName, String hostname, String index, String type,
71+
String additionalProperties) {
5672
super(referenceName);
5773
this.hostname = hostname;
5874
this.index = index;
5975
this.type = type;
76+
this.additionalProperties = additionalProperties;
6077
}
6178

6279
public String getHostname() {
@@ -75,6 +92,28 @@ public String getResource() {
7592
return String.format("%s/%s", index, type);
7693
}
7794

95+
/**
 * Returns the additional-properties string exactly as it was supplied to this config.
 *
 * @return the unparsed string, or null when none was set
 */
@Nullable
96+
public String getAdditionalProperties() {
97+
return additionalProperties;
98+
}
99+
100+
/**
 * Parses the raw additional-properties string into a key/value map.
 *
 * <p>Returns an empty map when the string is null or blank. Otherwise the string is handed to a
 * {@link KeyValueListParser} constructed with "\n" and "=" as its two delimiters, and every pair it
 * yields is stored with both key and value trimmed. A key that occurs more than once keeps the value
 * stored last, because entries are written with {@link Map#put}.
 *
 * <p>NOTE(review): how the parser treats a value that itself contains '=' (for example a password) is
 * not visible here; confirm against KeyValueListParser before relying on such values.
 *
 * @return a new mutable map of property name to property value; never null
 * @throws IllegalArgumentException if a parsed key is empty after trimming
 */
public Map<String, String> getAdditionalPropertiesMap() {
101+
Map<String, String> propertiesMap = new HashMap<>();
102+
if (additionalProperties == null || additionalProperties.trim().isEmpty()) {
103+
return propertiesMap;
104+
}
105+
106+
KeyValueListParser parser = new KeyValueListParser("\n", "=");
107+
parser.parse(additionalProperties).forEach(kv -> {
108+
if (kv.getKey().trim().isEmpty()) {
109+
throw new IllegalArgumentException("Key should not be empty");
110+
} else {
111+
propertiesMap.put(kv.getKey().trim(), kv.getValue().trim());
112+
}
113+
});
114+
return propertiesMap;
115+
}
116+
78117
public void validate(FailureCollector collector) {
79118
IdUtils.validateReferenceName(referenceName, collector);
80119

@@ -93,20 +132,33 @@ public void validate(FailureCollector collector) {
93132
if (!containsMacro(TYPE_NAME) && Strings.isNullOrEmpty(type)) {
94133
collector.addFailure("Type must be specified.", null).withConfigProperty(TYPE_NAME);
95134
}
135+
136+
if (!containsMacro(ADDITIONAL_PROPERTIES)) {
137+
try {
138+
getAdditionalPropertiesMap();
139+
} catch (Exception e) {
140+
collector.addFailure("Additional properties must be a valid KV map", null)
141+
.withConfigProperty(ADDITIONAL_PROPERTIES).withStacktrace(e.getStackTrace());
142+
}
143+
}
96144
}
97145

98146
private void validateHost(FailureCollector collector) {
99147
String[] hostParts = hostname.split(":");
100148

101149
// Elasticsearch Hadoop does not support IPV6 https://github.com/elastic/elasticsearch-hadoop/issues/1105
102-
if (hostParts.length != 2) {
150+
// Accept 2 parts (host:port) or 3 parts (scheme://host:port); the 3-part form must start with http or https
151+
if ((hostParts.length < 2) || (hostParts.length > 3) || (hostParts.length == 3
152+
&& !(hostParts[0].equalsIgnoreCase("https") || hostParts[0].equalsIgnoreCase("http")))) {
153+
103154
collector.addFailure(
104155
"Invalid format of hostname",
105-
"Hostname and port must be specified for the Elasticsearch instance, for example: 'localhost:9200'"
156+
"Hostname and port must be specified for the Elasticsearch instance, " +
157+
"for example: 'localhost:9200' or https://remote.region.gcp.cloud.es.io:9243"
106158
).withConfigProperty(HOST);
107159
} else {
108-
String host = hostParts[0];
109-
String port = hostParts[1];
160+
String host = String.join(":", Arrays.asList(hostParts).subList(0, hostParts.length - 1));
161+
String port = hostParts[hostParts.length - 1];
110162

111163
if (host.isEmpty()) {
112164
collector.addFailure("Host should not be empty.", null)

src/main/java/io/cdap/plugin/elastic/sink/BatchElasticsearchSink.java

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public void prepareRun(BatchSinkContext context) throws IOException {
8383
conf.set("es.resource.write", config.getResource());
8484
conf.set("es.input.json", "yes");
8585
conf.set("es.mapping.id", config.getIdField());
86+
config.getAdditionalPropertiesMap().forEach((k, v) -> conf.set(k, v));
8687

8788
context.addOutput(Output.of(config.referenceName, new SinkOutputFormatProvider(EsOutputFormat.class, conf)));
8889
}

src/main/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfig.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,14 @@ public class ElasticsearchSinkConfig extends BaseElasticsearchConfig {
3737
@Macro
3838
private final String idField;
3939

40-
public ElasticsearchSinkConfig(String referenceName, String hostname, String index, String type, String idField) {
41-
super(referenceName, hostname, index, type);
40+
public ElasticsearchSinkConfig(String referenceName, String hostname, String index, String type, String idField,
41+
String additionalProperties) {
42+
super(referenceName, hostname, index, type, additionalProperties);
4243
this.idField = idField;
4344
}
4445

4546
private ElasticsearchSinkConfig(Builder builder) {
46-
super(builder.referenceName, builder.hostname, builder.index, builder.type);
47+
super(builder.referenceName, builder.hostname, builder.index, builder.type, builder.additionalProperties);
4748
idField = builder.idField;
4849
}
4950

@@ -58,6 +59,7 @@ public static Builder newBuilder(ElasticsearchSinkConfig copy) {
5859
builder.index = copy.getIndex();
5960
builder.type = copy.getType();
6061
builder.idField = copy.getIdField();
62+
builder.additionalProperties = copy.getAdditionalProperties();
6163
return builder;
6264
}
6365

@@ -82,6 +84,7 @@ public static final class Builder {
8284
private String index;
8385
private String type;
8486
private String idField;
87+
private String additionalProperties;
8588

8689
private Builder() {
8790
}
@@ -111,6 +114,11 @@ public Builder setIdField(String idField) {
111114
return this;
112115
}
113116

117+
/**
 * Sets the raw additional-properties string that the built sink config will carry.
 *
 * @param additionalProperties value to store on this builder
 * @return this builder, to allow call chaining
 */
public Builder setAdditionalProperties(String additionalProperties) {
118+
this.additionalProperties = additionalProperties;
119+
return this;
120+
}
121+
114122
public ElasticsearchSinkConfig build() {
115123
return new ElasticsearchSinkConfig(this);
116124
}

src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSource.java

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
8989
conf.set("es.nodes", config.getHostname());
9090
conf.set("es.resource.read", config.getResource());
9191
conf.set("es.query", config.getQuery());
92+
config.getAdditionalPropertiesMap().forEach((k, v) -> conf.set(k, v));
9293
job.setMapOutputKeyClass(Text.class);
9394
job.setMapOutputValueClass(MapWritable.class);
9495
context.setInput(Input.of(config.referenceName, new SourceInputFormatProvider(EsInputFormat.class, conf)));

src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfig.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ public class ElasticsearchSourceConfig extends BaseElasticsearchConfig {
4646
private final String schema;
4747

4848
public ElasticsearchSourceConfig(String referenceName, String hostname, String index, String type, String query,
49-
String schema) {
50-
super(referenceName, hostname, index, type);
49+
String schema, String additionalProperties) {
50+
super(referenceName, hostname, index, type, additionalProperties);
5151
this.schema = schema;
5252
this.query = query;
5353
}
5454

5555
private ElasticsearchSourceConfig(Builder builder) {
56-
super(builder.referenceName, builder.hostname, builder.index, builder.type);
56+
super(builder.referenceName, builder.hostname, builder.index, builder.type, builder.additionalProperties);
5757
query = builder.query;
5858
schema = builder.schema;
5959
}
@@ -70,6 +70,7 @@ public static Builder newBuilder(ElasticsearchSourceConfig copy) {
7070
builder.type = copy.getType();
7171
builder.query = copy.getQuery();
7272
builder.schema = copy.getSchema();
73+
builder.additionalProperties = copy.getAdditionalProperties();
7374
return builder;
7475
}
7576

@@ -118,6 +119,7 @@ public static final class Builder {
118119
private String type;
119120
private String query;
120121
private String schema;
122+
private String additionalProperties;
121123

122124
private Builder() {
123125
}
@@ -152,6 +154,11 @@ public Builder setSchema(String schema) {
152154
return this;
153155
}
154156

157+
/**
 * Sets the raw additional-properties string that the built source config will carry.
 *
 * @param additionalProperties value to store on this builder
 * @return this builder, to allow call chaining
 */
public Builder setAdditionalProperties(String additionalProperties) {
158+
this.additionalProperties = additionalProperties;
159+
return this;
160+
}
161+
155162
public ElasticsearchSourceConfig build() {
156163
return new ElasticsearchSourceConfig(this);
157164
}

0 commit comments

Comments
 (0)