Skip to content

Commit d56d364

Browse files
committed
NIFI-4199: Add ProxyConfigurationService to SFTP processors
- Fixed check style issue - Use the same proxy related PropertyDescriptors from FTPTransfer and SFTPTransfer - Dropped FlowFile EL evaluation support to make it align with other processors spec, Now it supports VARIABLE_REGISTRY - Added ProxyConfigurationService to SFTP processors - Added SOCKS proxy support to SFTP processors
1 parent 36e96c5 commit d56d364

File tree

7 files changed

+75
-67
lines changed

7 files changed

+75
-67
lines changed

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
import org.apache.nifi.annotation.documentation.Tags;
3030
import org.apache.nifi.components.PropertyDescriptor;
3131
import org.apache.nifi.processor.ProcessContext;
32+
import org.apache.nifi.processors.standard.util.FTPTransfer;
3233
import org.apache.nifi.processors.standard.util.FileTransfer;
3334
import org.apache.nifi.processors.standard.util.SFTPTransfer;
35+
import org.apache.nifi.proxy.ProxyConfigurationService;
3436

3537
// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
3638
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -76,10 +78,12 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
7678
properties.add(SFTPTransfer.HOST_KEY_FILE);
7779
properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
7880
properties.add(SFTPTransfer.USE_COMPRESSION);
79-
properties.add(SFTPTransfer.PROXY_HOST);
80-
properties.add(SFTPTransfer.PROXY_PORT);
81-
properties.add(SFTPTransfer.PROXY_USERNAME);
82-
properties.add(SFTPTransfer.PROXY_PASSWORD);
81+
properties.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
82+
properties.add(FTPTransfer.PROXY_TYPE);
83+
properties.add(FTPTransfer.PROXY_HOST);
84+
properties.add(FTPTransfer.PROXY_PORT);
85+
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
86+
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
8387
return properties;
8488
}
8589

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
import org.apache.nifi.components.ValidationResult;
3434
import org.apache.nifi.processor.ProcessContext;
3535
import org.apache.nifi.processor.ProcessorInitializationContext;
36+
import org.apache.nifi.processors.standard.util.FTPTransfer;
3637
import org.apache.nifi.processors.standard.util.FileTransfer;
3738
import org.apache.nifi.processors.standard.util.SFTPTransfer;
39+
import org.apache.nifi.proxy.ProxyConfigurationService;
3840

3941
@InputRequirement(Requirement.INPUT_FORBIDDEN)
4042
@Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
@@ -81,10 +83,12 @@ protected void init(final ProcessorInitializationContext context) {
8183
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
8284
properties.add(SFTPTransfer.USE_COMPRESSION);
8385
properties.add(SFTPTransfer.USE_NATURAL_ORDERING);
84-
properties.add(SFTPTransfer.PROXY_HOST);
85-
properties.add(SFTPTransfer.PROXY_PORT);
86-
properties.add(SFTPTransfer.PROXY_USERNAME);
87-
properties.add(SFTPTransfer.PROXY_PASSWORD);
86+
properties.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
87+
properties.add(FTPTransfer.PROXY_TYPE);
88+
properties.add(FTPTransfer.PROXY_HOST);
89+
properties.add(FTPTransfer.PROXY_PORT);
90+
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
91+
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
8892
this.properties = Collections.unmodifiableList(properties);
8993
}
9094

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
import org.apache.nifi.components.PropertyDescriptor;
3333
import org.apache.nifi.components.state.Scope;
3434
import org.apache.nifi.processor.ProcessContext;
35+
import org.apache.nifi.processors.standard.util.FTPTransfer;
3536
import org.apache.nifi.processors.standard.util.FileTransfer;
3637
import org.apache.nifi.processors.standard.util.SFTPTransfer;
38+
import org.apache.nifi.proxy.ProxyConfigurationService;
3739

3840
@TriggerSerially
3941
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -83,10 +85,12 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
8385
properties.add(SFTPTransfer.DATA_TIMEOUT);
8486
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
8587
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
86-
properties.add(SFTPTransfer.PROXY_HOST);
87-
properties.add(SFTPTransfer.PROXY_PORT);
88-
properties.add(SFTPTransfer.PROXY_USERNAME);
89-
properties.add(SFTPTransfer.PROXY_PASSWORD);
88+
properties.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
89+
properties.add(FTPTransfer.PROXY_TYPE);
90+
properties.add(FTPTransfer.PROXY_HOST);
91+
properties.add(FTPTransfer.PROXY_PORT);
92+
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
93+
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
9094
return properties;
9195
}
9296

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
import org.apache.nifi.components.PropertyDescriptor;
3131
import org.apache.nifi.processor.ProcessContext;
3232
import org.apache.nifi.processor.ProcessorInitializationContext;
33+
import org.apache.nifi.processors.standard.util.FTPTransfer;
3334
import org.apache.nifi.processors.standard.util.SFTPTransfer;
35+
import org.apache.nifi.proxy.ProxyConfigurationService;
3436

3537
@SupportsBatching
3638
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -70,10 +72,12 @@ protected void init(final ProcessorInitializationContext context) {
7072
properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
7173
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
7274
properties.add(SFTPTransfer.USE_COMPRESSION);
73-
properties.add(SFTPTransfer.PROXY_HOST);
74-
properties.add(SFTPTransfer.PROXY_PORT);
75-
properties.add(SFTPTransfer.PROXY_USERNAME);
76-
properties.add(SFTPTransfer.PROXY_PASSWORD);
75+
properties.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
76+
properties.add(FTPTransfer.PROXY_TYPE);
77+
properties.add(FTPTransfer.PROXY_HOST);
78+
properties.add(FTPTransfer.PROXY_PORT);
79+
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
80+
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
7781
this.properties = Collections.unmodifiableList(properties);
7882
}
7983

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@
3232
import java.util.List;
3333
import java.util.Locale;
3434
import java.util.concurrent.TimeUnit;
35+
import java.util.function.Supplier;
3536
import java.util.regex.Pattern;
3637

3738
import org.apache.commons.net.ftp.FTPClient;
3839
import org.apache.commons.net.ftp.FTPFile;
3940
import org.apache.commons.net.ftp.FTPHTTPClient;
4041
import org.apache.commons.net.ftp.FTPReply;
4142
import org.apache.nifi.components.PropertyDescriptor;
43+
import org.apache.nifi.context.PropertyContext;
4244
import org.apache.nifi.expression.ExpressionLanguageScope;
4345
import org.apache.nifi.flowfile.FlowFile;
4446
import org.apache.nifi.logging.ComponentLog;
@@ -89,22 +91,26 @@ public class FTPTransfer implements FileTransfer {
8991
.name("Proxy Host")
9092
.description("The fully qualified hostname or IP address of the proxy server")
9193
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
94+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
9295
.build();
9396
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
9497
.name("Proxy Port")
9598
.description("The port of the proxy server")
9699
.addValidator(StandardValidators.PORT_VALIDATOR)
100+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
97101
.build();
98102
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
99103
.name("Http Proxy Username")
100104
.description("Http Proxy Username")
101105
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
106+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
102107
.required(false)
103108
.build();
104109
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
105110
.name("Http Proxy Password")
106111
.description("Http Proxy Password")
107112
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
113+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
108114
.required(false)
109115
.sensitive(true)
110116
.build();
@@ -523,15 +529,7 @@ private FTPClient getClient(final FlowFile flowFile) throws IOException {
523529
}
524530
}
525531

526-
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, () -> {
527-
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
528-
componentProxyConfig.setProxyType(Proxy.Type.valueOf(ctx.getProperty(PROXY_TYPE).getValue()));
529-
componentProxyConfig.setProxyServerHost(ctx.getProperty(PROXY_HOST).getValue());
530-
componentProxyConfig.setProxyServerPort(ctx.getProperty(PROXY_PORT).asInteger());
531-
componentProxyConfig.setProxyUserName(ctx.getProperty(HTTP_PROXY_USERNAME).getValue());
532-
componentProxyConfig.setProxyUserPassword(ctx.getProperty(HTTP_PROXY_PASSWORD).getValue());
533-
return componentProxyConfig;
534-
});
532+
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx));
535533

536534
final Proxy.Type proxyType = proxyConfig.getProxyType();
537535
final String proxyHost = proxyConfig.getProxyServerHost();
@@ -639,4 +637,17 @@ protected int numberPermissions(String perms) {
639637
}
640638
return number;
641639
}
640+
641+
public static Supplier<ProxyConfiguration> createComponentProxyConfigSupplier(final PropertyContext ctx) {
642+
return () -> {
643+
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
644+
componentProxyConfig.setProxyType(Proxy.Type.valueOf(ctx.getProperty(PROXY_TYPE).getValue()));
645+
componentProxyConfig.setProxyServerHost(ctx.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue());
646+
componentProxyConfig.setProxyServerPort(ctx.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger());
647+
componentProxyConfig.setProxyUserName(ctx.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue());
648+
componentProxyConfig.setProxyUserPassword(ctx.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue());
649+
return componentProxyConfig;
650+
};
651+
}
652+
642653
}

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,4 +297,5 @@ public void setTransferMode(final String val) {
297297
}
298298

299299
}
300+
300301
}

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java

Lines changed: 22 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.regex.Pattern;
3535

36+
import com.jcraft.jsch.ProxySOCKS5;
3637
import org.apache.nifi.components.PropertyDescriptor;
3738
import org.apache.nifi.components.PropertyValue;
3839
import org.apache.nifi.expression.ExpressionLanguageScope;
3940
import org.apache.nifi.flowfile.FlowFile;
4041
import org.apache.nifi.logging.ComponentLog;
4142
import org.apache.nifi.processor.ProcessContext;
4243
import org.apache.nifi.processor.util.StandardValidators;
44+
import org.apache.nifi.proxy.ProxyConfiguration;
4345
import org.slf4j.LoggerFactory;
4446

4547
import com.jcraft.jsch.ChannelSftp;
@@ -51,6 +53,8 @@
5153
import com.jcraft.jsch.Session;
5254
import com.jcraft.jsch.SftpException;
5355

56+
import static org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
57+
5458
public class SFTPTransfer implements FileTransfer {
5559

5660
public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder()
@@ -96,38 +100,6 @@ public class SFTPTransfer implements FileTransfer {
96100
.defaultValue("true")
97101
.required(true)
98102
.build();
99-
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
100-
.name("PROXY_HOST")
101-
.displayName("Proxy Host")
102-
.description("The fully qualified hostname or IP address of the proxy server")
103-
.expressionLanguageSupported(true)
104-
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
105-
.build();
106-
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
107-
.name("PROXY_PORT")
108-
.displayName("Proxy Port")
109-
.description("The port of the proxy server")
110-
.expressionLanguageSupported(true)
111-
.addValidator(StandardValidators.PORT_VALIDATOR)
112-
.build();
113-
public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder()
114-
.name("PROXY_USERNAME")
115-
.displayName("Proxy Username")
116-
.description("Proxy Username")
117-
.expressionLanguageSupported(true)
118-
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
119-
.required(false)
120-
.build();
121-
public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder()
122-
.name("PROXY_PASSWORD")
123-
.displayName("Proxy Password")
124-
.description("Proxy Password")
125-
.expressionLanguageSupported(true)
126-
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
127-
.required(false)
128-
.sensitive(true)
129-
.build();
130-
131103

132104

133105
/**
@@ -452,16 +424,24 @@ protected ChannelSftp getChannel(final FlowFile flowFile) throws IOException {
452424
ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
453425
ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
454426

455-
if (ctx.getProperty(PROXY_HOST).evaluateAttributeExpressions(flowFile).isSet()) {
456-
final ProxyHTTP proxy = new ProxyHTTP(
457-
ctx.getProperty(PROXY_HOST).evaluateAttributeExpressions(flowFile).getValue(),
458-
ctx.getProperty(PROXY_PORT).evaluateAttributeExpressions(flowFile).asInteger()
459-
);
460-
// Check if Username is set and populate the proxy accordingly
461-
if (ctx.getProperty(PROXY_USERNAME).evaluateAttributeExpressions(flowFile).isSet()) {
462-
proxy.setUserPasswd(ctx.getProperty(PROXY_USERNAME).evaluateAttributeExpressions(flowFile).getValue(), ctx.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions(flowFile).getValue());
463-
}
464-
session.setProxy(proxy);
427+
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx));
428+
switch (proxyConfig.getProxyType()) {
429+
case HTTP:
430+
final ProxyHTTP proxyHTTP = new ProxyHTTP(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort());
431+
// Check if Username is set and populate the proxy accordingly
432+
if (proxyConfig.hasCredential()) {
433+
proxyHTTP.setUserPasswd(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
434+
}
435+
session.setProxy(proxyHTTP);
436+
break;
437+
case SOCKS:
438+
final ProxySOCKS5 proxySOCKS5 = new ProxySOCKS5(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort());
439+
if (proxyConfig.hasCredential()) {
440+
proxySOCKS5.setUserPasswd(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
441+
}
442+
session.setProxy(proxySOCKS5);
443+
break;
444+
465445
}
466446

467447
final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue();

0 commit comments

Comments
 (0)