Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -336,4 +336,4 @@ This product includes code from Apache Flink.

Copyright: 1999-2022 The Apache Software Foundation.
Home page: https://flink.apache.org/
License: https://www.apache.org/licenses/LICENSE-2.0
License: https://www.apache.org/licenses/LICENSE-2.0
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class TestAssumeRoleAwsClientFactory {

@BeforeEach
public void before() {
roleName = UUID.randomUUID().toString();
roleName = "integrationtest-role-" + UUID.randomUUID().toString();
iam =
IamClient.builder()
.region(Region.AWS_GLOBAL)
Expand Down Expand Up @@ -99,7 +99,7 @@ public void before() {
assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, response.role().arn());
assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX + "key1", "value1");
assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX + "key2", "value2");
policyName = UUID.randomUUID().toString();
policyName = "integrationtest-policy-" + UUID.randomUUID().toString();
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ protected AwsClientProperties awsClientProperties() {
private StsClient sts() {
return StsClient.builder()
.applyMutation(httpClientProperties::applyHttpClientConfigurations)
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsProperties::applyStsEndpointConfigurations)
.build();
}

Expand Down
29 changes: 29 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import software.amazon.awssdk.services.glue.GlueClientBuilder;
import software.amazon.awssdk.services.kms.model.DataKeySpec;
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;
import software.amazon.awssdk.services.sts.StsClientBuilder;

public class AwsProperties implements Serializable {

Expand Down Expand Up @@ -153,6 +154,14 @@ public class AwsProperties implements Serializable {
*/
public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";

/**
* Used by {@link AssumeRoleAwsClientFactory}. Optional URL to use as the STS endpoint.
*
* <p>For more details, see
* https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html#sts-endpoints
*/
public static final String CLIENT_ASSUME_ROLE_STS_ENDPOINT = "client.sts.endpoint";

/**
* Used by {@link AssumeRoleAwsClientFactory}. Optional session name used to assume an IAM role.
*
Expand Down Expand Up @@ -225,6 +234,7 @@ public class AwsProperties implements Serializable {
private final String clientAssumeRoleExternalId;
private final int clientAssumeRoleTimeoutSec;
private final String clientAssumeRoleRegion;
private final String clientAssumeRoleStsEndpoint;
private final String clientAssumeRoleSessionName;
private final String clientCredentialsProvider;
private final Map<String, String> clientCredentialsProviderProperties;
Expand Down Expand Up @@ -253,6 +263,7 @@ public AwsProperties() {
this.clientAssumeRoleTimeoutSec = CLIENT_ASSUME_ROLE_TIMEOUT_SEC_DEFAULT;
this.clientAssumeRoleExternalId = null;
this.clientAssumeRoleRegion = null;
this.clientAssumeRoleStsEndpoint = null;
this.clientAssumeRoleSessionName = null;
this.clientCredentialsProvider = null;
this.clientCredentialsProviderProperties = null;
Expand Down Expand Up @@ -281,6 +292,7 @@ public AwsProperties(Map<String, String> properties) {
properties, CLIENT_ASSUME_ROLE_TIMEOUT_SEC, CLIENT_ASSUME_ROLE_TIMEOUT_SEC_DEFAULT);
this.clientAssumeRoleExternalId = properties.get(CLIENT_ASSUME_ROLE_EXTERNAL_ID);
this.clientAssumeRoleRegion = properties.get(CLIENT_ASSUME_ROLE_REGION);
this.clientAssumeRoleStsEndpoint = properties.get(CLIENT_ASSUME_ROLE_STS_ENDPOINT);
this.clientAssumeRoleSessionName = properties.get(CLIENT_ASSUME_ROLE_SESSION_NAME);
this.clientCredentialsProvider =
properties.get(AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER);
Expand Down Expand Up @@ -341,6 +353,10 @@ public String clientAssumeRoleRegion() {
return clientAssumeRoleRegion;
}

public String clientAssumeRoleStsEndpoint() {
return clientAssumeRoleStsEndpoint;
}

public String clientAssumeRoleSessionName() {
return clientAssumeRoleSessionName;
}
Expand Down Expand Up @@ -411,6 +427,19 @@ public <T extends DynamoDbClientBuilder> void applyDynamoDbEndpointConfiguration
configureEndpoint(builder, dynamoDbEndpoint);
}

/**
* Override the endpoint for an STS client.
*
* <p>Sample usage:
*
* <pre>
* StsClient.builder().applyMutation(awsProperties::applyStsEndpointConfigurations)
* </pre>
*/
public <T extends StsClientBuilder> void applyStsEndpointConfigurations(T builder) {
configureEndpoint(builder, clientAssumeRoleStsEndpoint);
}

public Region restSigningRegion() {
if (restSigningRegion == null) {
this.restSigningRegion = DefaultAwsRegionProviderChain.builder().build().getRegion().id();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws;

import java.util.Map;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.utils.Validate;

/**
* An implementation of {@link AwsCredentialsProvider} that returns a set implementation of {@link
* AwsCredentials}.
*
* <p>This code delegates to the {@link
* software.amazon.awssdk.auth.credentials.StaticCredentialsProvider} which has the same
* implementation but is missing the @link {@link #create(Map)} factory method
*/
@SdkPublicApi
public final class StaticCredentialsProvider implements AwsCredentialsProvider {
private static final String ACCESS_KEY_ID = "access-key-id";
private static final String SECRET_ACCESS_KEY = "secret-access-key";
private final software.amazon.awssdk.auth.credentials.StaticCredentialsProvider inner;

private StaticCredentialsProvider(Map<String, String> credentials) {
Validate.notNull(credentials, "Credentials must not be null.");
this.inner =
software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create(
AwsBasicCredentials.create(
credentials.get(ACCESS_KEY_ID), credentials.get(SECRET_ACCESS_KEY)));
}

/** Create a credentials provider that always returns the provided set of credentials. */
public static StaticCredentialsProvider create(Map<String, String> credentials) {
return new StaticCredentialsProvider(credentials);
}

@Override
public AwsCredentials resolveCredentials() {
return inner.resolveCredentials();
}

@Override
public String toString() {
return inner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.Set;
Expand All @@ -42,6 +43,8 @@
import org.apache.hc.core5.util.TimeValue;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Defines an exponential HTTP request retry strategy and provides the same characteristics as the
Expand Down Expand Up @@ -79,7 +82,11 @@
* {@link #getRetryInterval(HttpResponse, int, HttpContext)} to achieve exponential backoff.
*/
class ExponentialHttpRequestRetryStrategy implements HttpRequestRetryStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(ExponentialHttpRequestRetryStrategy.class);

private final int maxRetries;
private final Set<Class<? extends IOException>> retriableExceptions;
private final Set<Class<? extends IOException>> nonRetriableExceptions;
private final Set<Integer> retriableCodes;
private final Set<Integer> idempotentRetriableCodes;
Expand All @@ -97,6 +104,7 @@ class ExponentialHttpRequestRetryStrategy implements HttpRequestRetryStrategy {
HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_GATEWAY_TIMEOUT,
HttpStatus.SC_REQUEST_TIMEOUT);
this.retriableExceptions = ImmutableSet.of(SocketTimeoutException.class);
this.nonRetriableExceptions =
ImmutableSet.of(
InterruptedIOException.class,
Expand All @@ -115,7 +123,10 @@ public boolean retryRequest(
return false;
}

if (nonRetriableExceptions.contains(exception.getClass())) {
if (retriableExceptions.contains(exception.getClass())) {
LOG.info("AIVEN: Socket timed out after {}/{} retries", execCount, maxRetries);
// Skip the non-retriable tests if it's explicitly retriable.
} else if (nonRetriableExceptions.contains(exception.getClass())) {
return false;
} else {
for (Class<? extends IOException> rejectException : nonRetriableExceptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,20 @@ public void basicRetry() {
}

@Test
public void noRetryOnConnectTimeout() {
public void noRetryOnInterruptedIO() {
HttpGet request = new HttpGet("/");

assertThat(retryStrategy.retryRequest(request, new SocketTimeoutException(), 1, null))
assertThat(retryStrategy.retryRequest(request, new InterruptedIOException(), 1, null))
.isFalse();
}

@Test
public void retryOnSocketTimeout() {
HttpGet request = new HttpGet("/");

assertThat(retryStrategy.retryRequest(request, new SocketTimeoutException(), 1, null)).isTrue();
}

@Test
public void noRetryOnConnect() {
HttpGet request = new HttpGet("/");
Expand Down
45 changes: 40 additions & 5 deletions gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import java.io.Serializable;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand All @@ -42,6 +44,9 @@ public class GCPProperties implements Serializable {
public static final String GCS_CHANNEL_READ_CHUNK_SIZE = "gcs.channel.read.chunk-size-bytes";
public static final String GCS_CHANNEL_WRITE_CHUNK_SIZE = "gcs.channel.write.chunk-size-bytes";

public static final String GCP_CREDENTIALS_PATH_PROPERTY = "gcp.auth.credentials-path";
public static final String GCP_CREDENTIALS_JSON_PROPERTY = "gcp.auth.credentials-json";

public static final String GCS_OAUTH2_TOKEN = "gcs.oauth2.token";
public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at";
// Boolean to explicitly configure "no authentication" for testing purposes using a GCS emulator
Expand Down Expand Up @@ -80,6 +85,8 @@ public class GCPProperties implements Serializable {
private Date gcsOAuth2TokenExpiresAt;
private String gcsOauth2RefreshCredentialsEndpoint;
private boolean gcsOauth2RefreshCredentialsEnabled;
private String gcsCredentialsPath;
private String gcsCredentialsJson;

private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;

Expand Down Expand Up @@ -111,19 +118,39 @@ public GCPProperties(Map<String, String> properties) {
gcsOAuth2TokenExpiresAt =
new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
}

gcsOauth2RefreshCredentialsEndpoint =
RESTUtil.resolveEndpoint(
properties.get(CatalogProperties.URI),
properties.get(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT));
gcsOauth2RefreshCredentialsEnabled =
PropertyUtil.propertyAsBoolean(properties, GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED, true);

gcsNoAuth = Boolean.parseBoolean(properties.getOrDefault(GCS_NO_AUTH, "false"));

gcsCredentialsPath = properties.get(GCP_CREDENTIALS_PATH_PROPERTY);
gcsCredentialsJson = properties.get(GCP_CREDENTIALS_JSON_PROPERTY);

// Get the list of authentication properties that were specified. Only one should be present.
List<String> authKeysPresent =
Map.of(
GCS_NO_AUTH,
gcsNoAuth,
GCS_OAUTH2_TOKEN,
gcsOAuth2Token != null,
GCP_CREDENTIALS_PATH_PROPERTY,
gcsCredentialsPath != null,
GCP_CREDENTIALS_JSON_PROPERTY,
gcsCredentialsJson != null)
.entrySet()
.stream()
.filter(Map.Entry::getValue)
.map(Map.Entry::getKey)
.sorted(String.CASE_INSENSITIVE_ORDER)
.collect(Collectors.toList());
Preconditions.checkState(
!(gcsOAuth2Token != null && gcsNoAuth),
"Invalid auth settings: must not configure %s and %s",
GCS_NO_AUTH,
GCS_OAUTH2_TOKEN);
authKeysPresent.size() < 2,
"Invalid auth settings: must not configure %s",
String.join(", ", authKeysPresent));

gcsDeleteBatchSize =
PropertyUtil.propertyAsInt(
Expand Down Expand Up @@ -170,6 +197,14 @@ public boolean noAuth() {
return gcsNoAuth;
}

public Optional<String> credentialsPath() {
return Optional.ofNullable(gcsCredentialsPath);
}

public Optional<String> credentialsJson() {
return Optional.ofNullable(gcsCredentialsJson);
}

public Optional<Date> oauth2TokenExpiresAt() {
return Optional.ofNullable(gcsOAuth2TokenExpiresAt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.iceberg.gcp.auth;

import com.google.auth.oauth2.GoogleCredentials;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.catalog.SessionCatalog;
Expand All @@ -43,8 +45,9 @@
* <p>This manager can be configured with properties such as:
*
* <ul>
* <li>{@code gcp.auth.credentials-path}: Path to a service account JSON key file. If not set,
* Application Default Credentials will be used.
* <li>{@code gcp.auth.credentials-path}: Path to a service account JSON key file.
* <li>{@code gcp.auth.credentials-json}: Embedded JSON key file. If neither are set, Application
* Default Credentials will be used.
* <li>{@code gcp.auth.scopes}: Comma-separated list of OAuth scopes to request. Defaults to
* "https://www.googleapis.com/auth/cloud-platform".
* </ul>
Expand All @@ -54,6 +57,7 @@ public class GoogleAuthManager implements AuthManager {
private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
public static final String DEFAULT_SCOPES = "https://www.googleapis.com/auth/cloud-platform";
public static final String GCP_CREDENTIALS_PATH_PROPERTY = "gcp.auth.credentials-path";
public static final String GCP_CREDENTIALS_JSON_PROPERTY = "gcp.auth.credentials-json";
public static final String GCP_SCOPES_PROPERTY = "gcp.auth.scopes";
private final String name;

Expand All @@ -74,6 +78,7 @@ private void initialize(Map<String, String> properties) {
}

String credentialsPath = properties.get(GCP_CREDENTIALS_PATH_PROPERTY);
String credentialsJson = properties.get(GCP_CREDENTIALS_JSON_PROPERTY);
String scopesString = properties.getOrDefault(GCP_SCOPES_PROPERTY, DEFAULT_SCOPES);
List<String> scopes =
Strings.isNullOrEmpty(scopesString)
Expand All @@ -86,6 +91,12 @@ private void initialize(Map<String, String> properties) {
try (FileInputStream credentialsStream = new FileInputStream(credentialsPath)) {
this.credentials = GoogleCredentials.fromStream(credentialsStream).createScoped(scopes);
}
} else if (credentialsJson != null && !credentialsJson.isEmpty()) {
LOG.info("Using embedded Google credentials from configuration");
try (ByteArrayInputStream credentialsStream =
new ByteArrayInputStream(credentialsJson.getBytes(StandardCharsets.UTF_8))) {
this.credentials = GoogleCredentials.fromStream(credentialsStream).createScoped(scopes);
}
} else {
LOG.info("Using Application Default Credentials with scopes: {}", scopesString);
this.credentials = GoogleCredentials.getApplicationDefault().createScoped(scopes);
Expand Down
Loading
Loading