Skip to content

Commit cee6025

Browse files
committed
NIFI-4199: Added ProxyConfigurationService to ElasticsearchHttp processors
- ElasticsearchHttp processors now support SOCKS proxy, too - Added proxy support to PutElasticsearchHttpRecord - Moved more common property descriptors to AbstractElasticsearchHttpProcessor and just return static unmodifiable property descriptor list at each implementation processors
1 parent d56d364 commit cee6025

File tree

7 files changed

+53
-76
lines changed

7 files changed

+53
-76
lines changed

nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ language governing permissions and limitations under the License. -->
5656
<groupId>org.apache.nifi</groupId>
5757
<artifactId>nifi-record</artifactId>
5858
</dependency>
59+
<dependency>
60+
<groupId>org.apache.nifi</groupId>
61+
<artifactId>nifi-proxy-configuration-api</artifactId>
62+
</dependency>
5963
<dependency>
6064
<groupId>org.apache.commons</groupId>
6165
<artifactId>commons-lang3</artifactId>

nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,14 @@
3333
import org.apache.nifi.processor.ProcessContext;
3434
import org.apache.nifi.processor.exception.ProcessException;
3535
import org.apache.nifi.processor.util.StandardValidators;
36+
import org.apache.nifi.proxy.ProxyConfiguration;
37+
import org.apache.nifi.proxy.ProxyConfigurationService;
3638
import org.apache.nifi.ssl.SSLContextService;
3739
import org.apache.nifi.util.StringUtils;
3840

3941
import javax.net.ssl.SSLContext;
4042
import java.io.IOException;
4143
import java.io.InputStream;
42-
import java.net.InetSocketAddress;
4344
import java.net.Proxy;
4445
import java.net.URL;
4546
import java.util.ArrayList;
@@ -136,25 +137,23 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String proper
136137
.build();
137138
}
138139

139-
private static final List<PropertyDescriptor> propertyDescriptors;
140+
protected static final List<PropertyDescriptor> COMMON_PROPERTY_DESCRIPTORS;
140141

141142
static {
142143
final List<PropertyDescriptor> properties = new ArrayList<>();
143144
properties.add(ES_URL);
145+
properties.add(PROP_SSL_CONTEXT_SERVICE);
146+
properties.add(USERNAME);
147+
properties.add(PASSWORD);
148+
properties.add(CONNECT_TIMEOUT);
149+
properties.add(RESPONSE_TIMEOUT);
150+
properties.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
144151
properties.add(PROXY_HOST);
145152
properties.add(PROXY_PORT);
146153
properties.add(PROXY_USERNAME);
147154
properties.add(PROXY_PASSWORD);
148-
properties.add(RESPONSE_TIMEOUT);
149155

150-
propertyDescriptors = Collections.unmodifiableList(properties);
151-
}
152-
153-
@Override
154-
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
155-
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
156-
properties.addAll(propertyDescriptors);
157-
return properties;
156+
COMMON_PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties);
158157
}
159158

160159
@Override
@@ -164,28 +163,39 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE
164163
OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
165164

166165
// Add a proxy if set
167-
final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
168-
final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
169-
if (proxyHost != null && proxyPort != null) {
170-
final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
166+
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
167+
final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
168+
final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
169+
if (proxyHost != null && proxyPort != null) {
170+
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
171+
componentProxyConfig.setProxyType(Proxy.Type.HTTP);
172+
componentProxyConfig.setProxyServerHost(proxyHost);
173+
componentProxyConfig.setProxyServerPort(proxyPort);
174+
componentProxyConfig.setProxyUserName(context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue());
175+
componentProxyConfig.setProxyUserPassword(context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue());
176+
return componentProxyConfig;
177+
}
178+
return ProxyConfiguration.DIRECT_CONFIGURATION;
179+
});
180+
181+
if (!Proxy.Type.DIRECT.equals(proxyConfig.getProxyType())) {
182+
final Proxy proxy = proxyConfig.createProxy();
171183
okHttpClient.proxy(proxy);
172-
}
173184

174-
final String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
175-
final String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
176-
177-
if (proxyUsername != null && proxyPassword != null){
178-
okHttpClient.proxyAuthenticator(new Authenticator() {
179-
@Override
180-
public Request authenticate(Route route, Response response) throws IOException {
181-
final String credential=Credentials.basic(proxyUsername, proxyPassword);
182-
return response.request().newBuilder()
183-
.header("Proxy-Authorization", credential)
184-
.build();
185-
}
186-
});
185+
if (proxyConfig.hasCredential()){
186+
okHttpClient.proxyAuthenticator(new Authenticator() {
187+
@Override
188+
public Request authenticate(Route route, Response response) throws IOException {
189+
final String credential=Credentials.basic(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
190+
return response.request().newBuilder()
191+
.header("Proxy-Authorization", credential)
192+
.build();
193+
}
194+
});
195+
}
187196
}
188197

198+
189199
// Set timeouts
190200
okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
191201
okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
@@ -208,6 +218,7 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
208218
results.add(new ValidationResult.Builder()
209219
.valid(false)
210220
.explanation("Proxy Host and Proxy Port must be both set or empty")
221+
.subject("Proxy server configuration")
211222
.build());
212223
}
213224
return results;

nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
147147
_rels.add(REL_NOT_FOUND);
148148
relationships = Collections.unmodifiableSet(_rels);
149149

150-
final List<PropertyDescriptor> descriptors = new ArrayList<>();
151-
descriptors.add(ES_URL);
152-
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
153-
descriptors.add(USERNAME);
154-
descriptors.add(PASSWORD);
155-
descriptors.add(CONNECT_TIMEOUT);
156-
descriptors.add(RESPONSE_TIMEOUT);
150+
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
157151
descriptors.add(DOC_ID);
158152
descriptors.add(INDEX);
159153
descriptors.add(TYPE);
@@ -169,9 +163,7 @@ public Set<Relationship> getRelationships() {
169163

170164
@Override
171165
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
172-
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
173-
properties.addAll(propertyDescriptors);
174-
return properties;
166+
return propertyDescriptors;
175167
}
176168

177169
@OnScheduled

nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
150150
_rels.add(REL_RETRY);
151151
relationships = Collections.unmodifiableSet(_rels);
152152

153-
final List<PropertyDescriptor> descriptors = new ArrayList<>();
154-
descriptors.add(ES_URL);
155-
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
156-
descriptors.add(USERNAME);
157-
descriptors.add(PASSWORD);
158-
descriptors.add(CONNECT_TIMEOUT);
159-
descriptors.add(RESPONSE_TIMEOUT);
153+
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
160154
descriptors.add(ID_ATTRIBUTE);
161155
descriptors.add(INDEX);
162156
descriptors.add(TYPE);
@@ -174,9 +168,7 @@ public Set<Relationship> getRelationships() {
174168

175169
@Override
176170
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
177-
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
178-
properties.addAll(propertyDescriptors);
179-
return properties;
171+
return propertyDescriptors;
180172
}
181173

182174
@Override

nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
189189
_rels.add(REL_RETRY);
190190
relationships = Collections.unmodifiableSet(_rels);
191191

192-
final List<PropertyDescriptor> descriptors = new ArrayList<>();
193-
descriptors.add(ES_URL);
194-
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
195-
descriptors.add(USERNAME);
196-
descriptors.add(PASSWORD);
197-
descriptors.add(CONNECT_TIMEOUT);
198-
descriptors.add(RESPONSE_TIMEOUT);
192+
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
199193
descriptors.add(RECORD_READER);
200194
descriptors.add(ID_RECORD_PATH);
201195
descriptors.add(INDEX);

nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -230,13 +230,7 @@ public enum QueryInfoRouteStrategy {
230230
private QueryInfoRouteStrategy queryInfoRouteStrategy = QueryInfoRouteStrategy.NEVER;
231231

232232
static {
233-
final List<PropertyDescriptor> descriptors = new ArrayList<>();
234-
descriptors.add(ES_URL);
235-
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
236-
descriptors.add(USERNAME);
237-
descriptors.add(PASSWORD);
238-
descriptors.add(CONNECT_TIMEOUT);
239-
descriptors.add(RESPONSE_TIMEOUT);
233+
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
240234
descriptors.add(QUERY);
241235
descriptors.add(PAGE_SIZE);
242236
descriptors.add(INDEX);
@@ -257,9 +251,7 @@ public Set<Relationship> getRelationships() {
257251

258252
@Override
259253
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
260-
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
261-
properties.addAll(propertyDescriptors);
262-
return properties;
254+
return propertyDescriptors;
263255
}
264256

265257
@OnScheduled

nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
182182
_rels.add(REL_FAILURE);
183183
relationships = Collections.unmodifiableSet(_rels);
184184

185-
final List<PropertyDescriptor> descriptors = new ArrayList<>();
186-
descriptors.add(ES_URL);
187-
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
188-
descriptors.add(USERNAME);
189-
descriptors.add(PASSWORD);
190-
descriptors.add(CONNECT_TIMEOUT);
191-
descriptors.add(RESPONSE_TIMEOUT);
185+
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
192186
descriptors.add(QUERY);
193187
descriptors.add(SCROLL_DURATION);
194188
descriptors.add(PAGE_SIZE);
@@ -207,9 +201,7 @@ public Set<Relationship> getRelationships() {
207201

208202
@Override
209203
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
210-
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
211-
properties.addAll(propertyDescriptors);
212-
return properties;
204+
return propertyDescriptors;
213205
}
214206

215207
@OnScheduled

0 commit comments

Comments
 (0)