diff --git a/LICENSE b/LICENSE
index 80cfd3652e69..d78102089314 100644
--- a/LICENSE
+++ b/LICENSE
@@ -336,4 +336,4 @@ This product includes code from Apache Flink.
Copyright: 1999-2022 The Apache Software Foundation.
Home page: https://flink.apache.org/
-License: https://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
+License: https://www.apache.org/licenses/LICENSE-2.0
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java b/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java
index bb4605d63150..84d4762bb4e5 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java
@@ -66,7 +66,7 @@ public class TestAssumeRoleAwsClientFactory {
@BeforeEach
public void before() {
- roleName = UUID.randomUUID().toString();
+ roleName = "integrationtest-role-" + UUID.randomUUID().toString();
iam =
IamClient.builder()
.region(Region.AWS_GLOBAL)
@@ -99,7 +99,7 @@ public void before() {
assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, response.role().arn());
assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX + "key1", "value1");
assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX + "key2", "value2");
- policyName = UUID.randomUUID().toString();
+ policyName = "integrationtest-policy-" + UUID.randomUUID().toString();
}
@AfterEach
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
index 59a4d8d3ac38..3e797aa0503e 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
@@ -161,6 +161,8 @@ protected AwsClientProperties awsClientProperties() {
private StsClient sts() {
return StsClient.builder()
.applyMutation(httpClientProperties::applyHttpClientConfigurations)
+ .applyMutation(awsClientProperties::applyClientRegionConfiguration)
+ .applyMutation(awsProperties::applyStsEndpointConfigurations)
.build();
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 62d541da0c54..3cc7d0bfbcac 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -43,6 +43,7 @@
import software.amazon.awssdk.services.glue.GlueClientBuilder;
import software.amazon.awssdk.services.kms.model.DataKeySpec;
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
public class AwsProperties implements Serializable {
@@ -153,6 +154,14 @@ public class AwsProperties implements Serializable {
*/
public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region";
+  /**
+   * Used by {@link AssumeRoleAwsClientFactory}. Optional URL to use as the STS endpoint.
+   *
+   * <p>For more details, see
+   * https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html#sts-endpoints
+   */
+ public static final String CLIENT_ASSUME_ROLE_STS_ENDPOINT = "client.sts.endpoint";
+
/**
* Used by {@link AssumeRoleAwsClientFactory}. Optional session name used to assume an IAM role.
*
@@ -225,6 +234,7 @@ public class AwsProperties implements Serializable {
private final String clientAssumeRoleExternalId;
private final int clientAssumeRoleTimeoutSec;
private final String clientAssumeRoleRegion;
+ private final String clientAssumeRoleStsEndpoint;
private final String clientAssumeRoleSessionName;
private final String clientCredentialsProvider;
private final Map clientCredentialsProviderProperties;
@@ -253,6 +263,7 @@ public AwsProperties() {
this.clientAssumeRoleTimeoutSec = CLIENT_ASSUME_ROLE_TIMEOUT_SEC_DEFAULT;
this.clientAssumeRoleExternalId = null;
this.clientAssumeRoleRegion = null;
+ this.clientAssumeRoleStsEndpoint = null;
this.clientAssumeRoleSessionName = null;
this.clientCredentialsProvider = null;
this.clientCredentialsProviderProperties = null;
@@ -281,6 +292,7 @@ public AwsProperties(Map properties) {
properties, CLIENT_ASSUME_ROLE_TIMEOUT_SEC, CLIENT_ASSUME_ROLE_TIMEOUT_SEC_DEFAULT);
this.clientAssumeRoleExternalId = properties.get(CLIENT_ASSUME_ROLE_EXTERNAL_ID);
this.clientAssumeRoleRegion = properties.get(CLIENT_ASSUME_ROLE_REGION);
+ this.clientAssumeRoleStsEndpoint = properties.get(CLIENT_ASSUME_ROLE_STS_ENDPOINT);
this.clientAssumeRoleSessionName = properties.get(CLIENT_ASSUME_ROLE_SESSION_NAME);
this.clientCredentialsProvider =
properties.get(AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER);
@@ -341,6 +353,10 @@ public String clientAssumeRoleRegion() {
return clientAssumeRoleRegion;
}
+ public String clientAssumeRoleStsEndpoint() {
+ return clientAssumeRoleStsEndpoint;
+ }
+
public String clientAssumeRoleSessionName() {
return clientAssumeRoleSessionName;
}
@@ -411,6 +427,19 @@ public void applyDynamoDbEndpointConfiguration
configureEndpoint(builder, dynamoDbEndpoint);
}
+  /**
+   * Override the endpoint for an STS client.
+   *
+   * <p>Sample usage:
+   *
+   * <pre>
+   *     StsClient.builder().applyMutation(awsProperties::applyStsEndpointConfigurations)
+   * </pre>
+   */
+  public <T extends StsClientBuilder> void applyStsEndpointConfigurations(T builder) {
+    configureEndpoint(builder, clientAssumeRoleStsEndpoint);
+  }
+
public Region restSigningRegion() {
if (restSigningRegion == null) {
this.restSigningRegion = DefaultAwsRegionProviderChain.builder().build().getRegion().id();
diff --git a/aws/src/main/java/org/apache/iceberg/aws/StaticCredentialsProvider.java b/aws/src/main/java/org/apache/iceberg/aws/StaticCredentialsProvider.java
new file mode 100644
index 000000000000..2a080c4ecf32
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/StaticCredentialsProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.util.Map;
+import software.amazon.awssdk.annotations.SdkPublicApi;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.utils.Validate;
+
+/**
+ * An implementation of {@link AwsCredentialsProvider} that returns a set implementation of {@link
+ * AwsCredentials}.
+ *
+ * <p>This code delegates to the {@link
+ * software.amazon.awssdk.auth.credentials.StaticCredentialsProvider} which has the same
+ * implementation but is missing the {@link #create(Map)} factory method.
+ */
+@SdkPublicApi
+public final class StaticCredentialsProvider implements AwsCredentialsProvider {
+  private static final String ACCESS_KEY_ID = "access-key-id";
+  private static final String SECRET_ACCESS_KEY = "secret-access-key";
+  private final software.amazon.awssdk.auth.credentials.StaticCredentialsProvider inner;
+
+  private StaticCredentialsProvider(Map<String, String> credentials) {
+    Validate.notNull(credentials, "Credentials must not be null.");
+    this.inner =
+        software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create(
+            AwsBasicCredentials.create(
+                credentials.get(ACCESS_KEY_ID), credentials.get(SECRET_ACCESS_KEY)));
+  }
+
+  /** Create a provider from {@code access-key-id} and {@code secret-access-key} map entries. */
+  public static StaticCredentialsProvider create(Map<String, String> credentials) {
+    return new StaticCredentialsProvider(credentials);
+  }
+
+  @Override
+  public AwsCredentials resolveCredentials() {
+    return inner.resolveCredentials();
+  }
+
+  @Override
+  public String toString() {
+    return inner.toString();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java b/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java
index e9591585d523..95573897df6e 100644
--- a/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java
@@ -22,6 +22,7 @@
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
+import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.Set;
@@ -42,6 +43,8 @@
import org.apache.hc.core5.util.TimeValue;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Defines an exponential HTTP request retry strategy and provides the same characteristics as the
@@ -79,7 +82,11 @@
* {@link #getRetryInterval(HttpResponse, int, HttpContext)} to achieve exponential backoff.
*/
class ExponentialHttpRequestRetryStrategy implements HttpRequestRetryStrategy {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ExponentialHttpRequestRetryStrategy.class);
+
private final int maxRetries;
+ private final Set> retriableExceptions;
private final Set> nonRetriableExceptions;
private final Set retriableCodes;
private final Set idempotentRetriableCodes;
@@ -97,6 +104,7 @@ class ExponentialHttpRequestRetryStrategy implements HttpRequestRetryStrategy {
HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_GATEWAY_TIMEOUT,
HttpStatus.SC_REQUEST_TIMEOUT);
+ this.retriableExceptions = ImmutableSet.of(SocketTimeoutException.class);
this.nonRetriableExceptions =
ImmutableSet.of(
InterruptedIOException.class,
@@ -115,7 +123,10 @@ public boolean retryRequest(
return false;
}
- if (nonRetriableExceptions.contains(exception.getClass())) {
+ if (retriableExceptions.contains(exception.getClass())) {
+ LOG.info("AIVEN: Socket timed out after {}/{} retries", execCount, maxRetries);
+ // Skip the non-retriable tests if it's explicitly retriable.
+ } else if (nonRetriableExceptions.contains(exception.getClass())) {
return false;
} else {
for (Class extends IOException> rejectException : nonRetriableExceptions) {
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java b/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java
index 0954e3837b0f..8821b4b805ca 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java
@@ -92,13 +92,20 @@ public void basicRetry() {
}
@Test
- public void noRetryOnConnectTimeout() {
+ public void noRetryOnInterruptedIO() {
HttpGet request = new HttpGet("/");
- assertThat(retryStrategy.retryRequest(request, new SocketTimeoutException(), 1, null))
+ assertThat(retryStrategy.retryRequest(request, new InterruptedIOException(), 1, null))
.isFalse();
}
+ @Test
+ public void retryOnSocketTimeout() {
+ HttpGet request = new HttpGet("/");
+
+ assertThat(retryStrategy.retryRequest(request, new SocketTimeoutException(), 1, null)).isTrue();
+ }
+
@Test
public void noRetryOnConnect() {
HttpGet request = new HttpGet("/");
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
index d91601125c74..9d9928abc4bd 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -20,8 +20,10 @@
import java.io.Serializable;
import java.util.Date;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -42,6 +44,9 @@ public class GCPProperties implements Serializable {
public static final String GCS_CHANNEL_READ_CHUNK_SIZE = "gcs.channel.read.chunk-size-bytes";
public static final String GCS_CHANNEL_WRITE_CHUNK_SIZE = "gcs.channel.write.chunk-size-bytes";
+ public static final String GCP_CREDENTIALS_PATH_PROPERTY = "gcp.auth.credentials-path";
+ public static final String GCP_CREDENTIALS_JSON_PROPERTY = "gcp.auth.credentials-json";
+
public static final String GCS_OAUTH2_TOKEN = "gcs.oauth2.token";
public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at";
// Boolean to explicitly configure "no authentication" for testing purposes using a GCS emulator
@@ -80,6 +85,8 @@ public class GCPProperties implements Serializable {
private Date gcsOAuth2TokenExpiresAt;
private String gcsOauth2RefreshCredentialsEndpoint;
private boolean gcsOauth2RefreshCredentialsEnabled;
+ private String gcsCredentialsPath;
+ private String gcsCredentialsJson;
private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;
@@ -111,19 +118,39 @@ public GCPProperties(Map properties) {
gcsOAuth2TokenExpiresAt =
new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
}
-
gcsOauth2RefreshCredentialsEndpoint =
RESTUtil.resolveEndpoint(
properties.get(CatalogProperties.URI),
properties.get(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT));
gcsOauth2RefreshCredentialsEnabled =
PropertyUtil.propertyAsBoolean(properties, GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED, true);
+
gcsNoAuth = Boolean.parseBoolean(properties.getOrDefault(GCS_NO_AUTH, "false"));
+
+ gcsCredentialsPath = properties.get(GCP_CREDENTIALS_PATH_PROPERTY);
+ gcsCredentialsJson = properties.get(GCP_CREDENTIALS_JSON_PROPERTY);
+
+ // Get the list of authentication properties that were specified. Only one should be present.
+ List authKeysPresent =
+ Map.of(
+ GCS_NO_AUTH,
+ gcsNoAuth,
+ GCS_OAUTH2_TOKEN,
+ gcsOAuth2Token != null,
+ GCP_CREDENTIALS_PATH_PROPERTY,
+ gcsCredentialsPath != null,
+ GCP_CREDENTIALS_JSON_PROPERTY,
+ gcsCredentialsJson != null)
+ .entrySet()
+ .stream()
+ .filter(Map.Entry::getValue)
+ .map(Map.Entry::getKey)
+ .sorted(String.CASE_INSENSITIVE_ORDER)
+ .collect(Collectors.toList());
Preconditions.checkState(
- !(gcsOAuth2Token != null && gcsNoAuth),
- "Invalid auth settings: must not configure %s and %s",
- GCS_NO_AUTH,
- GCS_OAUTH2_TOKEN);
+ authKeysPresent.size() < 2,
+ "Invalid auth settings: must not configure %s",
+ String.join(", ", authKeysPresent));
gcsDeleteBatchSize =
PropertyUtil.propertyAsInt(
@@ -170,6 +197,14 @@ public boolean noAuth() {
return gcsNoAuth;
}
+  public Optional<String> credentialsPath() {
+    return Optional.ofNullable(gcsCredentialsPath);
+  }
+
+  public Optional<String> credentialsJson() {
+    return Optional.ofNullable(gcsCredentialsJson);
+  }
+
public Optional oauth2TokenExpiresAt() {
return Optional.ofNullable(gcsOAuth2TokenExpiresAt);
}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java b/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java
index a1d2b539ab16..7acc2f4f04e5 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java
@@ -19,9 +19,11 @@
package org.apache.iceberg.gcp.auth;
import com.google.auth.oauth2.GoogleCredentials;
+import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.catalog.SessionCatalog;
@@ -43,8 +45,9 @@
* This manager can be configured with properties such as:
*
  * <ul>
- *   <li>{@code gcp.auth.credentials-path}: Path to a service account JSON key file. If not set,
- *       Application Default Credentials will be used.
+ *   <li>{@code gcp.auth.credentials-path}: Path to a service account JSON key file.
+ *   <li>{@code gcp.auth.credentials-json}: Embedded JSON key file. If neither is set, Application
+ *       Default Credentials will be used.
  *   <li>{@code gcp.auth.scopes}: Comma-separated list of OAuth scopes to request. Defaults to
  *       "https://www.googleapis.com/auth/cloud-platform".
  * </ul>
@@ -54,6 +57,7 @@ public class GoogleAuthManager implements AuthManager {
private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
public static final String DEFAULT_SCOPES = "https://www.googleapis.com/auth/cloud-platform";
public static final String GCP_CREDENTIALS_PATH_PROPERTY = "gcp.auth.credentials-path";
+ public static final String GCP_CREDENTIALS_JSON_PROPERTY = "gcp.auth.credentials-json";
public static final String GCP_SCOPES_PROPERTY = "gcp.auth.scopes";
private final String name;
@@ -74,6 +78,7 @@ private void initialize(Map properties) {
}
String credentialsPath = properties.get(GCP_CREDENTIALS_PATH_PROPERTY);
+ String credentialsJson = properties.get(GCP_CREDENTIALS_JSON_PROPERTY);
String scopesString = properties.getOrDefault(GCP_SCOPES_PROPERTY, DEFAULT_SCOPES);
List scopes =
Strings.isNullOrEmpty(scopesString)
@@ -86,6 +91,12 @@ private void initialize(Map properties) {
try (FileInputStream credentialsStream = new FileInputStream(credentialsPath)) {
this.credentials = GoogleCredentials.fromStream(credentialsStream).createScoped(scopes);
}
+ } else if (credentialsJson != null && !credentialsJson.isEmpty()) {
+ LOG.info("Using embedded Google credentials from configuration");
+ try (ByteArrayInputStream credentialsStream =
+ new ByteArrayInputStream(credentialsJson.getBytes(StandardCharsets.UTF_8))) {
+ this.credentials = GoogleCredentials.fromStream(credentialsStream).createScoped(scopes);
+ }
} else {
LOG.info("Using Application Default Credentials with scopes: {}", scopesString);
this.credentials = GoogleCredentials.getApplicationDefault().createScoped(scopes);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
index e9db60b149da..b050a469af77 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
@@ -19,11 +19,16 @@
package org.apache.iceberg.gcp.gcs;
import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.services.storage.StorageScopes;
+import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.NoCredentials;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.gcp.GCPAuthUtils;
@@ -33,8 +38,11 @@
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class PrefixedStorage implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(PrefixedStorage.class);
private static final String GCS_FILE_IO_USER_AGENT = "gcsfileio/" + EnvironmentContext.get();
private final String storagePrefix;
private final GCPProperties gcpProperties;
@@ -69,10 +77,42 @@ class PrefixedStorage implements AutoCloseable {
// See javadoc of com.google.auth.oauth2.GoogleCredentials.getApplicationDefault()
if (gcpProperties.noAuth()) {
// Explicitly allow "no credentials" for testing purposes
+ LOG.info("Using no credentials for prefix {} (for testing)", storagePrefix);
builder.setCredentials(NoCredentials.getInstance());
}
+ if (gcpProperties.credentialsPath().isPresent()) {
+ LOG.info(
+ "Using Google credentials from path: {} for prefix {}",
+ gcpProperties.credentialsPath().get(),
+ storagePrefix);
+ try (FileInputStream credentialsStream =
+ new FileInputStream(gcpProperties.credentialsPath().get())) {
+ builder.setCredentials(
+ GoogleCredentials.fromStream(credentialsStream)
+ .createScoped(StorageScopes.DEVSTORAGE_READ_WRITE));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+    // Inline JSON credentials: decoded as UTF-8, scoped to StorageScopes.DEVSTORAGE_READ_WRITE.
+    if (gcpProperties.credentialsJson().isPresent()) {
+      LOG.info(
+          "Using embedded Google credentials from configuration for prefix {}", storagePrefix);
+      try (ByteArrayInputStream credentialsStream =
+          new ByteArrayInputStream(
+              gcpProperties.credentialsJson().get().getBytes(StandardCharsets.UTF_8))) {
+        builder.setCredentials(
+            GoogleCredentials.fromStream(credentialsStream)
+                .createScoped(StorageScopes.DEVSTORAGE_READ_WRITE));
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
if (gcpProperties.oauth2Token().isPresent()) {
+ LOG.info("Using OAuth2 token from configuration for prefix {}", storagePrefix);
this.closeableGroup = new CloseableGroup();
builder.setCredentials(
GCPAuthUtils.oauth2CredentialsFromGcpProperties(gcpProperties, closeableGroup));
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/TestGCPProperties.java b/gcp/src/test/java/org/apache/iceberg/gcp/TestGCPProperties.java
index 0ec2183fc355..9fae47e3b41e 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/TestGCPProperties.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/TestGCPProperties.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.gcp;
+import static org.apache.iceberg.gcp.GCPProperties.GCP_CREDENTIALS_JSON_PROPERTY;
+import static org.apache.iceberg.gcp.GCPProperties.GCP_CREDENTIALS_PATH_PROPERTY;
import static org.apache.iceberg.gcp.GCPProperties.GCS_NO_AUTH;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT;
@@ -31,16 +33,34 @@
public class TestGCPProperties {
@Test
- public void testOAuthWithNoAuth() {
+ public void testIncompatibleAuth() {
assertThatIllegalStateException()
.isThrownBy(
() ->
new GCPProperties(ImmutableMap.of(GCS_OAUTH2_TOKEN, "oauth", GCS_NO_AUTH, "true")))
.withMessage(
String.format(
- "Invalid auth settings: must not configure %s and %s",
- GCS_NO_AUTH, GCS_OAUTH2_TOKEN));
+ "Invalid auth settings: must not configure %s, %s", GCS_NO_AUTH, GCS_OAUTH2_TOKEN));
+
+ assertThatIllegalStateException()
+ .isThrownBy(
+ () ->
+ new GCPProperties(
+ ImmutableMap.of(
+ GCS_OAUTH2_TOKEN,
+ "oauth",
+ GCP_CREDENTIALS_PATH_PROPERTY,
+ "/path",
+ GCP_CREDENTIALS_JSON_PROPERTY,
+ "{}")))
+ .withMessage(
+ String.format(
+ "Invalid auth settings: must not configure %s, %s, %s",
+ GCP_CREDENTIALS_JSON_PROPERTY, GCP_CREDENTIALS_PATH_PROPERTY, GCS_OAUTH2_TOKEN));
+ }
+ @Test
+ public void testOAuthWithNoAuth() {
GCPProperties gcpProperties =
new GCPProperties(ImmutableMap.of(GCS_OAUTH2_TOKEN, "oauth", GCS_NO_AUTH, "false"));
assertThat(gcpProperties.noAuth()).isFalse();
diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java
index 2a1ded6cd8a1..dbd7358fb36b 100644
--- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java
+++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java
@@ -42,6 +42,7 @@ public class TestContext {
public static final ObjectMapper MAPPER = new ObjectMapper();
public static final int CONNECT_PORT = 8083;
+ public static final String CONTROL_TOPIC_BOOTSTRAP_SERVERS = "kafka:9092";
private static final int MINIO_PORT = 9000;
private static final int CATALOG_PORT = 8181;
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
index 9650ce16270c..10efbb236de2 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
@@ -18,19 +18,14 @@
*/
package org.apache.iceberg.connect;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -43,13 +38,9 @@
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class IcebergSinkConfig extends AbstractConfig {
- private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkConfig.class.getName());
-
public static final String INTERNAL_TRANSACTIONAL_SUFFIX_PROP =
"iceberg.coordinator.transactional.suffix";
private static final String ROUTE_REGEX = "route-regex";
@@ -93,7 +84,6 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String NAME_PROP = "name";
private static final String TASK_ID = "task.id";
- private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers";
private static final String DEFAULT_CATALOG_NAME = "iceberg";
private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg";
@@ -254,8 +244,7 @@ public IcebergSinkConfig(Map originalProps) {
this.catalogProps = PropertyUtil.propertiesWithPrefix(originalProps, CATALOG_PROP_PREFIX);
this.hadoopProps = PropertyUtil.propertiesWithPrefix(originalProps, HADOOP_PROP_PREFIX);
- this.kafkaProps = Maps.newHashMap(loadWorkerProps());
- kafkaProps.putAll(PropertyUtil.propertiesWithPrefix(originalProps, KAFKA_PROP_PREFIX));
+ this.kafkaProps = PropertyUtil.propertiesWithPrefix(originalProps, KAFKA_PROP_PREFIX);
this.autoCreateProps =
PropertyUtil.propertiesWithPrefix(originalProps, AUTO_CREATE_PROP_PREFIX);
@@ -457,39 +446,4 @@ static boolean checkClassName(String className) {
return (className.matches(".*\\.ConnectDistributed.*")
|| className.matches(".*\\.ConnectStandalone.*"));
}
-
- /**
- * This method attempts to load the Kafka Connect worker properties, which are not exposed to
- * connectors. It does this by parsing the Java command used to launch the worker, extracting the
- * name of the properties file, and then loading the file.
- * The sink uses these properties, if available, when initializing its internal Kafka clients. By
- * doing this, Kafka-related properties only need to be set in the worker properties and do not
- * need to be duplicated in the sink config.
- * If the worker properties cannot be loaded, then Kafka-related properties must be set via the
- * `iceberg.kafka.*` sink configs.
- *
- * @return The Kafka Connect worker properties
- */
- private Map loadWorkerProps() {
- String javaCmd = System.getProperty("sun.java.command");
- if (javaCmd != null && !javaCmd.isEmpty()) {
- List args = Splitter.on(' ').splitToList(javaCmd);
- if (args.size() > 1 && checkClassName(args.get(0))) {
- Properties result = new Properties();
- try (InputStream in = Files.newInputStream(Paths.get(args.get(1)))) {
- result.load(in);
- // sanity check that this is the config we want
- if (result.containsKey(BOOTSTRAP_SERVERS_PROP)) {
- return Maps.fromProperties(result);
- }
- } catch (Exception e) {
- // NO-OP
- }
- }
- }
- LOG.info(
- "Worker properties not loaded, using only {}* properties for Kafka clients",
- KAFKA_PROP_PREFIX);
- return ImmutableMap.of();
- }
}