From 6efc44144e7a90daa82e29a0ab78efe462b722c3 Mon Sep 17 00:00:00 2001 From: Anatolii Popov Date: Mon, 13 Jan 2025 19:23:05 +0000 Subject: [PATCH 1/7] Support for static credentials for AWS Glue Catalog As of now, the upstream implementation does not support default AWS StaticCredentialsProvider for AWS Glue, since it expected the create method to accept Map instead of an instance of AwsCredentials class here https://github.com/apache/iceberg/blob/ff813445916bfd6ec1cc30a02b02f8bade7a26f6/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java#L239. This commit provides an adapted implementation of StaticCredentialsProvider that accepts Map as expected by Iceberg. --- LICENSE | 2 +- .../aws/StaticCredentialsProvider.java | 64 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 aws/src/main/java/org/apache/iceberg/aws/StaticCredentialsProvider.java diff --git a/LICENSE b/LICENSE index 80cfd3652e69..d78102089314 100644 --- a/LICENSE +++ b/LICENSE @@ -336,4 +336,4 @@ This product includes code from Apache Flink. Copyright: 1999-2022 The Apache Software Foundation. Home page: https://flink.apache.org/ -License: https://www.apache.org/licenses/LICENSE-2.0 \ No newline at end of file +License: https://www.apache.org/licenses/LICENSE-2.0 diff --git a/aws/src/main/java/org/apache/iceberg/aws/StaticCredentialsProvider.java b/aws/src/main/java/org/apache/iceberg/aws/StaticCredentialsProvider.java new file mode 100644 index 000000000000..2a080c4ecf32 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/StaticCredentialsProvider.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws; + +import java.util.Map; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.utils.Validate; + +/** + * An implementation of {@link AwsCredentialsProvider} that returns a set implementation of {@link + * AwsCredentials}. + * + *

This code delegates to the {@link + * software.amazon.awssdk.auth.credentials.StaticCredentialsProvider} which has the same + * implementation but is missing the @link {@link #create(Map)} factory method + */ +@SdkPublicApi +public final class StaticCredentialsProvider implements AwsCredentialsProvider { + private static final String ACCESS_KEY_ID = "access-key-id"; + private static final String SECRET_ACCESS_KEY = "secret-access-key"; + private final software.amazon.awssdk.auth.credentials.StaticCredentialsProvider inner; + + private StaticCredentialsProvider(Map credentials) { + Validate.notNull(credentials, "Credentials must not be null."); + this.inner = + software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create( + AwsBasicCredentials.create( + credentials.get(ACCESS_KEY_ID), credentials.get(SECRET_ACCESS_KEY))); + } + + /** Create a credentials provider that always returns the provided set of credentials. */ + public static StaticCredentialsProvider create(Map credentials) { + return new StaticCredentialsProvider(credentials); + } + + @Override + public AwsCredentials resolveCredentials() { + return inner.resolveCredentials(); + } + + @Override + public String toString() { + return inner.toString(); + } +} From e96c4c51b03355fc1be0c82e7bd13116315da432 Mon Sep 17 00:00:00 2001 From: Anatolii Popov Date: Fri, 31 Jan 2025 08:31:23 +0000 Subject: [PATCH 2/7] Removing worker.properties loading from IcebergSinkConfig Worker properties parsing is breaking a lot of abstractions from Kafka Connect perspective and exposing to the connector the properties that should not be exposed. More over it breaks config providers support. The client configuration for the control topic should be specified in the connector configuration, and it should be connector's responsibility to provide the necessary configs. --- .../apache/iceberg/connect/TestContext.java | 1 + .../iceberg/connect/IcebergSinkConfig.java | 48 +------------------ 2 files changed, 2 insertions(+), 47 deletions(-) diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java index 2a1ded6cd8a1..dbd7358fb36b 100644 --- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestContext.java @@ -42,6 +42,7 @@ public class TestContext { public static final ObjectMapper MAPPER = new ObjectMapper(); public static final int CONNECT_PORT = 8083; + public static final String CONTROL_TOPIC_BOOTSTRAP_SERVERS = "kafka:9092"; private static final int MINIO_PORT = 9000; private static final int CATALOG_PORT = 8181; diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9650ce16270c..10efbb236de2 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -18,19 +18,14 @@ */ package org.apache.iceberg.connect; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -43,13 +38,9 @@ import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class IcebergSinkConfig extends AbstractConfig { - private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkConfig.class.getName()); - public static final String INTERNAL_TRANSACTIONAL_SUFFIX_PROP = "iceberg.coordinator.transactional.suffix"; private static final String ROUTE_REGEX = "route-regex"; @@ -93,7 +84,6 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String NAME_PROP = "name"; private static final String TASK_ID = "task.id"; - private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers"; private static final String DEFAULT_CATALOG_NAME = "iceberg"; private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg"; @@ -254,8 +244,7 @@ public IcebergSinkConfig(Map originalProps) { this.catalogProps = PropertyUtil.propertiesWithPrefix(originalProps, CATALOG_PROP_PREFIX); this.hadoopProps = PropertyUtil.propertiesWithPrefix(originalProps, HADOOP_PROP_PREFIX); - this.kafkaProps = Maps.newHashMap(loadWorkerProps()); - kafkaProps.putAll(PropertyUtil.propertiesWithPrefix(originalProps, KAFKA_PROP_PREFIX)); + this.kafkaProps = PropertyUtil.propertiesWithPrefix(originalProps, KAFKA_PROP_PREFIX); this.autoCreateProps = PropertyUtil.propertiesWithPrefix(originalProps, AUTO_CREATE_PROP_PREFIX); @@ -457,39 +446,4 @@ static boolean checkClassName(String className) { return (className.matches(".*\\.ConnectDistributed.*") || className.matches(".*\\.ConnectStandalone.*")); } - - /** - * This method attempts to load the Kafka Connect worker properties, which are not exposed to - * connectors. It does this by parsing the Java command used to launch the worker, extracting the - * name of the properties file, and then loading the file.
- * The sink uses these properties, if available, when initializing its internal Kafka clients. By - * doing this, Kafka-related properties only need to be set in the worker properties and do not - * need to be duplicated in the sink config.
- * If the worker properties cannot be loaded, then Kafka-related properties must be set via the - * `iceberg.kafka.*` sink configs. - * - * @return The Kafka Connect worker properties - */ - private Map loadWorkerProps() { - String javaCmd = System.getProperty("sun.java.command"); - if (javaCmd != null && !javaCmd.isEmpty()) { - List args = Splitter.on(' ').splitToList(javaCmd); - if (args.size() > 1 && checkClassName(args.get(0))) { - Properties result = new Properties(); - try (InputStream in = Files.newInputStream(Paths.get(args.get(1)))) { - result.load(in); - // sanity check that this is the config we want - if (result.containsKey(BOOTSTRAP_SERVERS_PROP)) { - return Maps.fromProperties(result); - } - } catch (Exception e) { - // NO-OP - } - } - } - LOG.info( - "Worker properties not loaded, using only {}* properties for Kafka clients", - KAFKA_PROP_PREFIX); - return ImmutableMap.of(); - } } From 6bb4c493f563f0211c0746e97fe72ecc63c85790 Mon Sep 17 00:00:00 2001 From: Jonas Keeling Date: Wed, 16 Jul 2025 13:21:45 +0200 Subject: [PATCH 3/7] AWS: use aws client region for StsClient if available (#10) --- .../java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java | 1 + 1 file changed, 1 insertion(+) diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java index 59a4d8d3ac38..28de3bedf60a 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java @@ -161,6 +161,7 @@ protected AwsClientProperties awsClientProperties() { private StsClient sts() { return StsClient.builder() .applyMutation(httpClientProperties::applyHttpClientConfigurations) + .applyMutation(awsClientProperties::applyClientRegionConfiguration) .build(); } From 45db6cb07de792cfa12f9feee1d43e93381c49c9 Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Wed, 18 Jun 2025 17:40:58 +0200 Subject: [PATCH 4/7] AWS: Add STS endpoint override Refactor STS endpoint configuration --- .../aws/TestAssumeRoleAwsClientFactory.java | 4 +-- .../aws/AssumeRoleAwsClientFactory.java | 1 + .../org/apache/iceberg/aws/AwsProperties.java | 29 +++++++++++++++++++ 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java b/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java index bb4605d63150..84d4762bb4e5 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java @@ -66,7 +66,7 @@ public class TestAssumeRoleAwsClientFactory { @BeforeEach public void before() { - roleName = UUID.randomUUID().toString(); + roleName = "integrationtest-role-" + UUID.randomUUID().toString(); iam = IamClient.builder() .region(Region.AWS_GLOBAL) @@ -99,7 +99,7 @@ public void before() { assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, response.role().arn()); assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX + "key1", "value1"); assumeRoleProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX + "key2", "value2"); - policyName = UUID.randomUUID().toString(); + policyName = "integrationtest-policy-" + UUID.randomUUID().toString(); } @AfterEach diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java index 28de3bedf60a..3e797aa0503e 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java @@ -162,6 +162,7 @@ private StsClient sts() { return StsClient.builder() .applyMutation(httpClientProperties::applyHttpClientConfigurations) .applyMutation(awsClientProperties::applyClientRegionConfiguration) + .applyMutation(awsProperties::applyStsEndpointConfigurations) .build(); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java index 62d541da0c54..3cc7d0bfbcac 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java @@ -43,6 +43,7 @@ import software.amazon.awssdk.services.glue.GlueClientBuilder; import software.amazon.awssdk.services.kms.model.DataKeySpec; import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec; +import software.amazon.awssdk.services.sts.StsClientBuilder; public class AwsProperties implements Serializable { @@ -153,6 +154,14 @@ public class AwsProperties implements Serializable { */ public static final String CLIENT_ASSUME_ROLE_REGION = "client.assume-role.region"; + /** + * Used by {@link AssumeRoleAwsClientFactory}. Optional URL to use as the STS endpoint. + * + *

For more details, see + * https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html#sts-endpoints + */ + public static final String CLIENT_ASSUME_ROLE_STS_ENDPOINT = "client.sts.endpoint"; + /** * Used by {@link AssumeRoleAwsClientFactory}. Optional session name used to assume an IAM role. * @@ -225,6 +234,7 @@ public class AwsProperties implements Serializable { private final String clientAssumeRoleExternalId; private final int clientAssumeRoleTimeoutSec; private final String clientAssumeRoleRegion; + private final String clientAssumeRoleStsEndpoint; private final String clientAssumeRoleSessionName; private final String clientCredentialsProvider; private final Map clientCredentialsProviderProperties; @@ -253,6 +263,7 @@ public AwsProperties() { this.clientAssumeRoleTimeoutSec = CLIENT_ASSUME_ROLE_TIMEOUT_SEC_DEFAULT; this.clientAssumeRoleExternalId = null; this.clientAssumeRoleRegion = null; + this.clientAssumeRoleStsEndpoint = null; this.clientAssumeRoleSessionName = null; this.clientCredentialsProvider = null; this.clientCredentialsProviderProperties = null; @@ -281,6 +292,7 @@ public AwsProperties(Map properties) { properties, CLIENT_ASSUME_ROLE_TIMEOUT_SEC, CLIENT_ASSUME_ROLE_TIMEOUT_SEC_DEFAULT); this.clientAssumeRoleExternalId = properties.get(CLIENT_ASSUME_ROLE_EXTERNAL_ID); this.clientAssumeRoleRegion = properties.get(CLIENT_ASSUME_ROLE_REGION); + this.clientAssumeRoleStsEndpoint = properties.get(CLIENT_ASSUME_ROLE_STS_ENDPOINT); this.clientAssumeRoleSessionName = properties.get(CLIENT_ASSUME_ROLE_SESSION_NAME); this.clientCredentialsProvider = properties.get(AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER); @@ -341,6 +353,10 @@ public String clientAssumeRoleRegion() { return clientAssumeRoleRegion; } + public String clientAssumeRoleStsEndpoint() { + return clientAssumeRoleStsEndpoint; + } + public String clientAssumeRoleSessionName() { return clientAssumeRoleSessionName; } @@ -411,6 +427,19 @@ public void applyDynamoDbEndpointConfiguration configureEndpoint(builder, dynamoDbEndpoint); } + /** + * Override the endpoint for an STS client. + * + *

Sample usage: + * + *

+   *     StsClient.builder().applyMutation(awsProperties::applyStsEndpointConfigurations)
+   * 
+ */ + public void applyStsEndpointConfigurations(T builder) { + configureEndpoint(builder, clientAssumeRoleStsEndpoint); + } + public Region restSigningRegion() { if (restSigningRegion == null) { this.restSigningRegion = DefaultAwsRegionProviderChain.builder().build().getRegion().id(); From b4171cf985595327d6da0af808abb7039c9c918d Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Mon, 27 Oct 2025 14:12:15 +0100 Subject: [PATCH 5/7] GCP: Pass in JSON credentials string via config --- .../org/apache/iceberg/gcp/GCPProperties.java | 45 ++++++++++++++++--- .../iceberg/gcp/auth/GoogleAuthManager.java | 15 ++++++- .../iceberg/gcp/gcs/PrefixedStorage.java | 40 +++++++++++++++++ .../apache/iceberg/gcp/TestGCPProperties.java | 26 +++++++++-- 4 files changed, 116 insertions(+), 10 deletions(-) diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java index d91601125c74..9d9928abc4bd 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java @@ -20,8 +20,10 @@ import java.io.Serializable; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -42,6 +44,9 @@ public class GCPProperties implements Serializable { public static final String GCS_CHANNEL_READ_CHUNK_SIZE = "gcs.channel.read.chunk-size-bytes"; public static final String GCS_CHANNEL_WRITE_CHUNK_SIZE = "gcs.channel.write.chunk-size-bytes"; + public static final String GCP_CREDENTIALS_PATH_PROPERTY = "gcp.auth.credentials-path"; + public static final String GCP_CREDENTIALS_JSON_PROPERTY = "gcp.auth.credentials-json"; + public static final String GCS_OAUTH2_TOKEN = "gcs.oauth2.token"; public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at"; // Boolean to explicitly configure "no authentication" for testing purposes using a GCS emulator @@ -80,6 +85,8 @@ public class GCPProperties implements Serializable { private Date gcsOAuth2TokenExpiresAt; private String gcsOauth2RefreshCredentialsEndpoint; private boolean gcsOauth2RefreshCredentialsEnabled; + private String gcsCredentialsPath; + private String gcsCredentialsJson; private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT; @@ -111,19 +118,39 @@ public GCPProperties(Map properties) { gcsOAuth2TokenExpiresAt = new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT))); } - gcsOauth2RefreshCredentialsEndpoint = RESTUtil.resolveEndpoint( properties.get(CatalogProperties.URI), properties.get(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT)); gcsOauth2RefreshCredentialsEnabled = PropertyUtil.propertyAsBoolean(properties, GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED, true); + gcsNoAuth = Boolean.parseBoolean(properties.getOrDefault(GCS_NO_AUTH, "false")); + + gcsCredentialsPath = properties.get(GCP_CREDENTIALS_PATH_PROPERTY); + gcsCredentialsJson = properties.get(GCP_CREDENTIALS_JSON_PROPERTY); + + // Get the list of authentication properties that were specified. Only one should be present. + List authKeysPresent = + Map.of( + GCS_NO_AUTH, + gcsNoAuth, + GCS_OAUTH2_TOKEN, + gcsOAuth2Token != null, + GCP_CREDENTIALS_PATH_PROPERTY, + gcsCredentialsPath != null, + GCP_CREDENTIALS_JSON_PROPERTY, + gcsCredentialsJson != null) + .entrySet() + .stream() + .filter(Map.Entry::getValue) + .map(Map.Entry::getKey) + .sorted(String.CASE_INSENSITIVE_ORDER) + .collect(Collectors.toList()); Preconditions.checkState( - !(gcsOAuth2Token != null && gcsNoAuth), - "Invalid auth settings: must not configure %s and %s", - GCS_NO_AUTH, - GCS_OAUTH2_TOKEN); + authKeysPresent.size() < 2, + "Invalid auth settings: must not configure %s", + String.join(", ", authKeysPresent)); gcsDeleteBatchSize = PropertyUtil.propertyAsInt( @@ -170,6 +197,14 @@ public boolean noAuth() { return gcsNoAuth; } + public Optional credentialsPath() { + return Optional.ofNullable(gcsCredentialsPath); + } + + public Optional credentialsJson() { + return Optional.ofNullable(gcsCredentialsJson); + } + public Optional oauth2TokenExpiresAt() { return Optional.ofNullable(gcsOAuth2TokenExpiresAt); } diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java b/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java index a1d2b539ab16..7acc2f4f04e5 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java @@ -19,9 +19,11 @@ package org.apache.iceberg.gcp.auth; import com.google.auth.oauth2.GoogleCredentials; +import java.io.ByteArrayInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import org.apache.iceberg.catalog.SessionCatalog; @@ -43,8 +45,9 @@ *

This manager can be configured with properties such as: * *

    - *
  • {@code gcp.auth.credentials-path}: Path to a service account JSON key file. If not set, - * Application Default Credentials will be used. + *
  • {@code gcp.auth.credentials-path}: Path to a service account JSON key file. + *
  • {@code gcp.auth.credentials-json}: Embedded JSON key file. If neither are set, Application + * Default Credentials will be used. *
  • {@code gcp.auth.scopes}: Comma-separated list of OAuth scopes to request. Defaults to * "https://www.googleapis.com/auth/cloud-platform". *
@@ -54,6 +57,7 @@ public class GoogleAuthManager implements AuthManager { private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings(); public static final String DEFAULT_SCOPES = "https://www.googleapis.com/auth/cloud-platform"; public static final String GCP_CREDENTIALS_PATH_PROPERTY = "gcp.auth.credentials-path"; + public static final String GCP_CREDENTIALS_JSON_PROPERTY = "gcp.auth.credentials-json"; public static final String GCP_SCOPES_PROPERTY = "gcp.auth.scopes"; private final String name; @@ -74,6 +78,7 @@ private void initialize(Map properties) { } String credentialsPath = properties.get(GCP_CREDENTIALS_PATH_PROPERTY); + String credentialsJson = properties.get(GCP_CREDENTIALS_JSON_PROPERTY); String scopesString = properties.getOrDefault(GCP_SCOPES_PROPERTY, DEFAULT_SCOPES); List scopes = Strings.isNullOrEmpty(scopesString) @@ -86,6 +91,12 @@ private void initialize(Map properties) { try (FileInputStream credentialsStream = new FileInputStream(credentialsPath)) { this.credentials = GoogleCredentials.fromStream(credentialsStream).createScoped(scopes); } + } else if (credentialsJson != null && !credentialsJson.isEmpty()) { + LOG.info("Using embedded Google credentials from configuration"); + try (ByteArrayInputStream credentialsStream = + new ByteArrayInputStream(credentialsJson.getBytes(StandardCharsets.UTF_8))) { + this.credentials = GoogleCredentials.fromStream(credentialsStream).createScoped(scopes); + } } else { LOG.info("Using Application Default Credentials with scopes: {}", scopesString); this.credentials = GoogleCredentials.getApplicationDefault().createScoped(scopes); diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java index e9db60b149da..b050a469af77 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java @@ -19,11 +19,16 @@ package org.apache.iceberg.gcp.gcs; import com.google.api.gax.rpc.FixedHeaderProvider; +import com.google.api.services.storage.StorageScopes; +import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.NoCredentials; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; import java.util.Map; import org.apache.iceberg.EnvironmentContext; import org.apache.iceberg.gcp.GCPAuthUtils; @@ -33,8 +38,11 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.util.SerializableSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class PrefixedStorage implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(PrefixedStorage.class); private static final String GCS_FILE_IO_USER_AGENT = "gcsfileio/" + EnvironmentContext.get(); private final String storagePrefix; private final GCPProperties gcpProperties; @@ -69,10 +77,42 @@ class PrefixedStorage implements AutoCloseable { // See javadoc of com.google.auth.oauth2.GoogleCredentials.getApplicationDefault() if (gcpProperties.noAuth()) { // Explicitly allow "no credentials" for testing purposes + LOG.info("Using no credentials for prefix {} (for testing)", storagePrefix); builder.setCredentials(NoCredentials.getInstance()); } + if (gcpProperties.credentialsPath().isPresent()) { + LOG.info( + "Using Google credentials from path: {} for prefix {}", + gcpProperties.credentialsPath().get(), + storagePrefix); + try (FileInputStream credentialsStream = + new FileInputStream(gcpProperties.credentialsPath().get())) { + builder.setCredentials( + GoogleCredentials.fromStream(credentialsStream) + .createScoped(StorageScopes.DEVSTORAGE_READ_WRITE)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + if (gcpProperties.credentialsJson().isPresent()) { + LOG.info( + "Using embedded Google credentials from configuration for prefix{}", + storagePrefix); + try (ByteArrayInputStream credentialsStream = + new ByteArrayInputStream( + gcpProperties.credentialsJson().get().getBytes(StandardCharsets.UTF_8))) { + builder.setCredentials( + GoogleCredentials.fromStream(credentialsStream) + .createScoped(StorageScopes.DEVSTORAGE_READ_WRITE)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + if (gcpProperties.oauth2Token().isPresent()) { + LOG.info("Using OAuth2 token from configuration for prefix {}", storagePrefix); this.closeableGroup = new CloseableGroup(); builder.setCredentials( GCPAuthUtils.oauth2CredentialsFromGcpProperties(gcpProperties, closeableGroup)); diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/TestGCPProperties.java b/gcp/src/test/java/org/apache/iceberg/gcp/TestGCPProperties.java index 0ec2183fc355..9fae47e3b41e 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/TestGCPProperties.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/TestGCPProperties.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.gcp; +import static org.apache.iceberg.gcp.GCPProperties.GCP_CREDENTIALS_JSON_PROPERTY; +import static org.apache.iceberg.gcp.GCPProperties.GCP_CREDENTIALS_PATH_PROPERTY; import static org.apache.iceberg.gcp.GCPProperties.GCS_NO_AUTH; import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED; import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT; @@ -31,16 +33,34 @@ public class TestGCPProperties { @Test - public void testOAuthWithNoAuth() { + public void testIncompatibleAuth() { assertThatIllegalStateException() .isThrownBy( () -> new GCPProperties(ImmutableMap.of(GCS_OAUTH2_TOKEN, "oauth", GCS_NO_AUTH, "true"))) .withMessage( String.format( - "Invalid auth settings: must not configure %s and %s", - GCS_NO_AUTH, GCS_OAUTH2_TOKEN)); + "Invalid auth settings: must not configure %s, %s", GCS_NO_AUTH, GCS_OAUTH2_TOKEN)); + + assertThatIllegalStateException() + .isThrownBy( + () -> + new GCPProperties( + ImmutableMap.of( + GCS_OAUTH2_TOKEN, + "oauth", + GCP_CREDENTIALS_PATH_PROPERTY, + "/path", + GCP_CREDENTIALS_JSON_PROPERTY, + "{}"))) + .withMessage( + String.format( + "Invalid auth settings: must not configure %s, %s, %s", + GCP_CREDENTIALS_JSON_PROPERTY, GCP_CREDENTIALS_PATH_PROPERTY, GCS_OAUTH2_TOKEN)); + } + @Test + public void testOAuthWithNoAuth() { GCPProperties gcpProperties = new GCPProperties(ImmutableMap.of(GCS_OAUTH2_TOKEN, "oauth", GCS_NO_AUTH, "false")); assertThat(gcpProperties.noAuth()).isFalse(); From 1fa485e947e17a42e1bc8c307c260d358296d5dd Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Fri, 28 Nov 2025 15:19:10 +0100 Subject: [PATCH 6/7] Core: Allow retries on socket timeouts to REST --- .../rest/ExponentialHttpRequestRetryStrategy.java | 7 ++++++- .../rest/TestExponentialHttpRequestRetryStrategy.java | 11 +++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java b/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java index e9591585d523..c744154d7e08 100644 --- a/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java +++ b/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java @@ -22,6 +22,7 @@ import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.NoRouteToHostException; +import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.time.Instant; import java.util.Set; @@ -80,6 +81,7 @@ */ class ExponentialHttpRequestRetryStrategy implements HttpRequestRetryStrategy { private final int maxRetries; + private final Set> retriableExceptions; private final Set> nonRetriableExceptions; private final Set retriableCodes; private final Set idempotentRetriableCodes; @@ -97,6 +99,7 @@ class ExponentialHttpRequestRetryStrategy implements HttpRequestRetryStrategy { HttpStatus.SC_BAD_GATEWAY, HttpStatus.SC_GATEWAY_TIMEOUT, HttpStatus.SC_REQUEST_TIMEOUT); + this.retriableExceptions = ImmutableSet.of(SocketTimeoutException.class); this.nonRetriableExceptions = ImmutableSet.of( InterruptedIOException.class, @@ -115,7 +118,9 @@ public boolean retryRequest( return false; } - if (nonRetriableExceptions.contains(exception.getClass())) { + if (retriableExceptions.contains(exception.getClass())) { + // Skip the non-retriable tests if it's explicitly retriable. + } else if (nonRetriableExceptions.contains(exception.getClass())) { return false; } else { for (Class rejectException : nonRetriableExceptions) { diff --git a/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java b/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java index 0954e3837b0f..8821b4b805ca 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java @@ -92,13 +92,20 @@ public void basicRetry() { } @Test - public void noRetryOnConnectTimeout() { + public void noRetryOnInterruptedIO() { HttpGet request = new HttpGet("/"); - assertThat(retryStrategy.retryRequest(request, new SocketTimeoutException(), 1, null)) + assertThat(retryStrategy.retryRequest(request, new InterruptedIOException(), 1, null)) .isFalse(); } + @Test + public void retryOnSocketTimeout() { + HttpGet request = new HttpGet("/"); + + assertThat(retryStrategy.retryRequest(request, new SocketTimeoutException(), 1, null)).isTrue(); + } + @Test public void noRetryOnConnect() { HttpGet request = new HttpGet("/"); From 4114cb99a56153264c62283201843a2f49068bae Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Fri, 28 Nov 2025 16:43:26 +0100 Subject: [PATCH 7/7] Extra logging for REST retries This is optional and hasn't been contributed upstream. --- .../iceberg/rest/ExponentialHttpRequestRetryStrategy.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java b/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java index c744154d7e08..95573897df6e 100644 --- a/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java +++ b/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java @@ -43,6 +43,8 @@ import org.apache.hc.core5.util.TimeValue; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Defines an exponential HTTP request retry strategy and provides the same characteristics as the @@ -80,6 +82,9 @@ * {@link #getRetryInterval(HttpResponse, int, HttpContext)} to achieve exponential backoff. */ class ExponentialHttpRequestRetryStrategy implements HttpRequestRetryStrategy { + private static final Logger LOG = + LoggerFactory.getLogger(ExponentialHttpRequestRetryStrategy.class); + private final int maxRetries; private final Set> retriableExceptions; private final Set> nonRetriableExceptions; @@ -119,6 +124,7 @@ public boolean retryRequest( } if (retriableExceptions.contains(exception.getClass())) { + LOG.info("AIVEN: Socket timed out after {}/{} retries", execCount, maxRetries); // Skip the non-retriable tests if it's explicitly retriable. } else if (nonRetriableExceptions.contains(exception.getClass())) { return false;