Skip to content

Commit 367261a

Browse files
committed
NIFI-4199: Add ProxyConfigService to AWS processors
- Applied ProxyConfigService to S3 processors - Added proxy support to following processors: - PutKinesisFirehose, PutKinesisStream - PutDynamoDB, DeleteDynamoDB, GetDynamoDB - PutKinesisStream - All AWS processors support HTTP proxy now
1 parent 90fe326 commit 367261a

File tree

16 files changed

+79
-26
lines changed

16 files changed

+79
-26
lines changed

nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@
7676
<artifactId>nifi-utils</artifactId>
7777
<version>1.7.0-SNAPSHOT</version>
7878
</dependency>
79+
<dependency>
80+
<groupId>org.apache.nifi</groupId>
81+
<artifactId>nifi-proxy-configuration-api</artifactId>
82+
</dependency>
7983
</dependencies>
8084

8185
</project>

nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.amazonaws.regions.Regions;
2929
import java.io.File;
3030
import java.io.IOException;
31+
import java.net.Proxy;
3132
import java.util.ArrayList;
3233
import java.util.Arrays;
3334
import java.util.Collection;
@@ -52,6 +53,7 @@
5253
import org.apache.nifi.processor.util.StandardValidators;
5354
import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
5455
import org.apache.nifi.processors.aws.regions.AWSRegions;
56+
import org.apache.nifi.proxy.ProxyConfiguration;
5557
import org.apache.nifi.ssl.SSLContextService;
5658

5759
/**
@@ -185,6 +187,8 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va
185187
problems.add(new ValidationResult.Builder().input("Proxy Host Port").valid(false).explanation("Both proxy host and port must be set").build());
186188
}
187189

190+
ProxyConfiguration.validateProxyType(validationContext, problems, Proxy.Type.HTTP);
191+
188192
return problems;
189193
}
190194

@@ -209,18 +213,31 @@ protected ClientConfiguration createConfiguration(final ProcessContext context)
209213
}
210214
}
211215

212-
if (context.getProperty(PROXY_HOST).isSet()) {
213-
String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
214-
config.setProxyHost(proxyHost);
215-
Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger();
216-
config.setProxyPort(proxyPort);
217-
}
216+
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
217+
if (context.getProperty(PROXY_HOST).isSet()) {
218+
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
219+
String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
220+
Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger();
221+
String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
222+
String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
223+
componentProxyConfig.setProxyType(Proxy.Type.HTTP);
224+
componentProxyConfig.setProxyServerHost(proxyHost);
225+
componentProxyConfig.setProxyServerPort(proxyPort);
226+
componentProxyConfig.setProxyUserName(proxyUsername);
227+
componentProxyConfig.setProxyUserPassword(proxyPassword);
228+
return componentProxyConfig;
229+
}
230+
return ProxyConfiguration.DIRECT_CONFIGURATION;
231+
});
218232

219-
if (context.getProperty(PROXY_USERNAME).isSet()) {
220-
String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
221-
config.setProxyUsername(proxyUsername);
222-
String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
223-
config.setProxyPassword(proxyPassword);
233+
if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
234+
config.setProxyHost(proxyConfig.getProxyServerHost());
235+
config.setProxyPort(proxyConfig.getProxyServerPort());
236+
237+
if (proxyConfig.hasCredential()) {
238+
config.setProxyUsername(proxyConfig.getProxyUserName());
239+
config.setProxyPassword(proxyConfig.getProxyUserPassword());
240+
}
224241
}
225242

226243
return config;

nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
4747
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
4848

49+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
50+
4951
@SupportsBatching
5052
@SeeAlso({GetDynamoDB.class, PutDynamoDB.class})
5153
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -74,7 +76,8 @@ public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor {
7476
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
7577
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
7678
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
77-
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
79+
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE,
80+
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
7881

7982
@Override
8083
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
5252
import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
5353

54+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
55+
5456
@SupportsBatching
5557
@SeeAlso({DeleteDynamoDB.class, PutDynamoDB.class})
5658
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -80,7 +82,8 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
8082
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
8183
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
8284
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
83-
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
85+
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE,
86+
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
8487

8588
public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
8689
.description("FlowFiles are routed to not found relationship if key not found in the table").build();

nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
5353
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
5454

55+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
56+
5557
@SupportsBatching
5658
@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class})
5759
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -84,7 +86,8 @@ public class PutDynamoDB extends AbstractWriteDynamoDBProcessor {
8486
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
8587
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
8688
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, DOCUMENT_CHARSET, BATCH_SIZE,
87-
REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
89+
REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE,
90+
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
8891

8992
/**
9093
* Dyamodb max item size limit 400 kb

nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
4545
import com.amazonaws.services.kinesisfirehose.model.Record;
4646

47+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
48+
4749
@SupportsBatching
4850
@InputRequirement(Requirement.INPUT_REQUIRED)
4951
@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
@@ -72,7 +74,7 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
7274

7375
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
7476
Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
75-
PROXY_HOST, PROXY_HOST_PORT, ENDPOINT_OVERRIDE));
77+
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE));
7678

7779
/**
7880
* Max buffer size 1 MB

nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import com.amazonaws.services.kinesis.model.PutRecordsResult;
4949
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
5050

51+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
52+
5153
@SupportsBatching
5254
@InputRequirement(Requirement.INPUT_REQUIRED)
5355
@Tags({"amazon", "aws", "kinesis", "put", "stream"})
@@ -80,7 +82,7 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
8082

8183
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
8284
Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
83-
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT, ENDPOINT_OVERRIDE));
85+
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE));
8486

8587
/** A random number generator for cases where partition key is not available */
8688
protected Random randomParitionKeyGenerator = new Random();

nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import com.amazonaws.services.lambda.model.UnsupportedMediaTypeException;
5353
import com.amazonaws.util.Base64;
5454

55+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
56+
5557
@InputRequirement(Requirement.INPUT_REQUIRED)
5658
@Tags({"amazon", "aws", "lambda", "put"})
5759
@CapabilityDescription("Sends the contents to a specified Amazon Lamba Function. "
@@ -127,8 +129,8 @@ public class PutLambda extends AbstractAWSLambdaProcessor {
127129
public static final long MAX_REQUEST_SIZE = 6 * 1000 * 1000;
128130

129131
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
130-
Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT
131-
));
132+
Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
133+
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
132134

133135
@Override
134136
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.apache.nifi.processor.ProcessSession;
4040
import org.apache.nifi.processor.util.StandardValidators;
4141

42+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
43+
4244

4345
@SupportsBatching
4446
@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
@@ -59,7 +61,7 @@ public class DeleteS3Object extends AbstractS3Processor {
5961
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
6062
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID,
6163
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
62-
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
64+
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
6365

6466
@Override
6567
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import com.amazonaws.services.s3.model.ObjectMetadata;
4747
import com.amazonaws.services.s3.model.S3Object;
4848

49+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
50+
4951
@SupportsBatching
5052
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
5153
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -76,7 +78,7 @@ public class FetchS3Object extends AbstractS3Processor {
7678

7779
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
7880
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
79-
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
81+
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
8082

8183
@Override
8284
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

0 commit comments

Comments
 (0)