Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;

public sealed interface AzureAuth
permits AzureAuthAccessKey, AzureAuthDefault, AzureAuthOauth
permits AzureAuthAccessKey, AzureAuthDefault, AzureAuthOauth, AzureVendedAuth
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing this new auth means that with this PR,

return switch (azureAuth) {
case AzureAuthOauth _ -> oauth2PresignedUri(location, ttl, Optional.empty());
case AzureAuthAccessKey _ -> accessKeyPresignedUri(location, ttl, Optional.empty());
default -> throw new UnsupportedOperationException("Unsupported azure auth: " + azureAuth);
};

throws.

Maybe this is fine for now, as I see it's not supported even for default auth?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also highlighting that I'm not adding this type to

public enum AuthType
{
ACCESS_KEY,
OAUTH,
DEFAULT,
}

I think that this should be an internal auth type that cannot enabled by config. Though I appreciate this design of an internal, IRC-centred auth may be controversial.

{
void setAuth(String storageAccount, BlobContainerClientBuilder builder);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.azure;

public final class AzureFileSystemConstants
{
/**
* Internal property enabling {@link AzureVendedAuth} on the filesystem when set to true.
*/
public static final String EXTRA_USE_VENDED_TOKEN = "internal$use_vended_token";

/**
* Internal prefix for SAS token property keys, mapping storage accounts to their SAS tokens.
*/
public static final String EXTRA_SAS_TOKEN_PROPERTY_PREFIX = "internal$account_sas$";

private AzureFileSystemConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void destroy()
@Override
public TrinoFileSystem create(ConnectorIdentity identity)
{
return new AzureFileSystem(httpClient, concurrencyPolicy, uploadExecutor, tracingOptions, auth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize, multipart);
return new AzureFileSystem(httpClient, concurrencyPolicy, uploadExecutor, tracingOptions, withVendedAuth(identity, auth), endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize, multipart);
}

public static HttpClient createAzureHttpClient(ConnectionProvider connectionProvider, EventLoopGroup eventLoopGroup, HttpClientOptions clientOptions)
Expand All @@ -143,4 +143,13 @@ public static HttpClient createAzureHttpClient(ConnectionProvider connectionProv
.eventLoopGroup(eventLoopGroup)
.build();
}

private static AzureAuth withVendedAuth(ConnectorIdentity identity, AzureAuth defaultAuth)
{
if (identity.getExtraCredentials().containsKey(AzureFileSystemConstants.EXTRA_USE_VENDED_TOKEN) &&
identity.getExtraCredentials().get(AzureFileSystemConstants.EXTRA_USE_VENDED_TOKEN).equalsIgnoreCase("true")) {
return new AzureVendedAuth(identity.getExtraCredentials(), defaultAuth);
}
return defaultAuth;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.azure;

import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;

import java.util.Map;
import java.util.Optional;

public final class AzureVendedAuth
implements AzureAuth
{
private final Map<String, String> sasTokens;
private final AzureAuth fallbackAuth;

public AzureVendedAuth(Map<String, String> sasTokens, AzureAuth fallbackAuth)
{
this.sasTokens = sasTokens;
this.fallbackAuth = fallbackAuth;
}

@Override
public void setAuth(String storageAccount, BlobContainerClientBuilder builder)
{
getSasToken(storageAccount)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.ifPresentOrElse(builder::sasToken, () -> fallbackAuth.setAuth(storageAccount, builder));
}

@Override
public void setAuth(String storageAccount, DataLakeServiceClientBuilder builder)
{
getSasToken(storageAccount)
.ifPresentOrElse(builder::sasToken, () -> fallbackAuth.setAuth(storageAccount, builder));
}

public Optional<String> getSasToken(String storageAccount)
{
return Optional.ofNullable(sasTokens.get(AzureFileSystemConstants.EXTRA_SAS_TOKEN_PROPERTY_PREFIX + storageAccount));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.iceberg.IcebergFileSystemFactory;
import io.trino.spi.security.ConnectorIdentity;
import org.apache.iceberg.util.PropertyUtil;

import java.util.Map;
import java.util.Optional;

import static io.trino.filesystem.azure.AzureFileSystemConstants.EXTRA_SAS_TOKEN_PROPERTY_PREFIX;
import static io.trino.filesystem.azure.AzureFileSystemConstants.EXTRA_USE_VENDED_TOKEN;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;
Expand All @@ -34,6 +38,8 @@ public class IcebergRestCatalogFileSystemFactory
private static final String VENDED_S3_SECRET_KEY = "s3.secret-access-key";
private static final String VENDED_S3_SESSION_TOKEN = "s3.session-token";

private static final String VENDED_ADLS_SAS_TOKEN_PREFIX = "adls.sas-token.";

private final TrinoFileSystemFactory fileSystemFactory;
private final boolean vendedCredentialsEnabled;

Expand All @@ -47,25 +53,56 @@ public IcebergRestCatalogFileSystemFactory(TrinoFileSystemFactory fileSystemFact
@Override
public TrinoFileSystem create(ConnectorIdentity identity, Map<String, String> fileIoProperties)
{
if (vendedCredentialsEnabled &&
fileIoProperties.containsKey(VENDED_S3_ACCESS_KEY) &&
if (vendedCredentialsEnabled) {
return fileSystemFactory.create(
getVendedS3Identity(identity, fileIoProperties)
.or(() -> getVendedAzureIdentity(identity, fileIoProperties))
.orElse(identity));
}

return fileSystemFactory.create(identity);
}

private static Optional<ConnectorIdentity> getVendedS3Identity(ConnectorIdentity identity, Map<String, String> fileIoProperties)
{
if (fileIoProperties.containsKey(VENDED_S3_ACCESS_KEY) &&
fileIoProperties.containsKey(VENDED_S3_SECRET_KEY) &&
fileIoProperties.containsKey(VENDED_S3_SESSION_TOKEN)) {
// Do not include original credentials as they should not be used in vended mode
ConnectorIdentity identityWithExtraCredentials = ConnectorIdentity.forUser(identity.getUser())
.withGroups(identity.getGroups())
.withPrincipal(identity.getPrincipal())
.withEnabledSystemRoles(identity.getEnabledSystemRoles())
.withConnectorRole(identity.getConnectorRole())
.withExtraCredentials(ImmutableMap.<String, String>builder()
.put(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY, fileIoProperties.get(VENDED_S3_ACCESS_KEY))
.put(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY, fileIoProperties.get(VENDED_S3_SECRET_KEY))
.put(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY, fileIoProperties.get(VENDED_S3_SESSION_TOKEN))
.buildOrThrow())
.build();
return fileSystemFactory.create(identityWithExtraCredentials);
return Optional.of(getVendedIdentity(identity, ImmutableMap.<String, String>builder()
.put(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY, fileIoProperties.get(VENDED_S3_ACCESS_KEY))
.put(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY, fileIoProperties.get(VENDED_S3_SECRET_KEY))
.put(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY, fileIoProperties.get(VENDED_S3_SESSION_TOKEN))
.buildOrThrow()));
}
return Optional.empty();
}

return fileSystemFactory.create(identity);
private static Optional<ConnectorIdentity> getVendedAzureIdentity(ConnectorIdentity identity, Map<String, String> fileIoProperties)
{
ImmutableMap.Builder<String, String> azureCredentialBuilder = ImmutableMap.builder();
PropertyUtil.propertiesWithPrefix(fileIoProperties, VENDED_ADLS_SAS_TOKEN_PREFIX)
.forEach((host, token) -> {
String storageAccount = host.contains(".") ? host.substring(0, host.indexOf('.')) : host;

if (!storageAccount.isEmpty() && !token.isEmpty()) {
azureCredentialBuilder.put(EXTRA_SAS_TOKEN_PROPERTY_PREFIX + storageAccount, token);
azureCredentialBuilder.put(EXTRA_USE_VENDED_TOKEN, "true");
}
});

Map<String, String> azureCredentials = azureCredentialBuilder.buildKeepingLast();
return azureCredentials.isEmpty() ? Optional.empty() : Optional.of(getVendedIdentity(identity, azureCredentials));
}

private static ConnectorIdentity getVendedIdentity(ConnectorIdentity identity, Map<String, String> extraCredentials)
{
// Do not include original credentials as they should not be used in vended mode
return ConnectorIdentity.forUser(identity.getUser())
.withGroups(identity.getGroups())
.withPrincipal(identity.getPrincipal())
.withEnabledSystemRoles(identity.getEnabledSystemRoles())
.withConnectorRole(identity.getConnectorRole())
.withExtraCredentials(extraCredentials)
.build();
}
}
Loading