Skip to content

Commit

Permalink
move CRT creation logic to client factory
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmarsuhail committed Mar 7, 2025
1 parent 04f83c6 commit f72e64a
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public class DefaultS3ClientFactory extends Configured
protected static final Logger LOG =
LoggerFactory.getLogger(DefaultS3ClientFactory.class);

/**
* A one-off log stating that the S3 CRT client is enabled.
*/
private static final LogExactlyOnce LOG_S3_CRT_ENABLED = new LogExactlyOnce(LOG);


/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
Expand Down Expand Up @@ -118,7 +123,17 @@ public S3Client createS3Client(
}

@Override
public S3AsyncClient createS3AsyncClient(
public S3AsyncClient createS3AsyncClient(final URI uri,
final S3ClientCreationParameters parameters) throws IOException {
// Unless the creation parameters enable CRT, build the Java-based async client.
if (!parameters.isCrtEnabled()) {
return createJavaAsyncClient(uri, parameters);
}
// CRT requested: note it through the log-exactly-once wrapper, then build the CRT client.
LOG_S3_CRT_ENABLED.info("The S3 CRT client is enabled");
return createS3CrtAsyncClient(uri, parameters);
}

public S3AsyncClient createJavaAsyncClient(
final URI uri,
final S3ClientCreationParameters parameters) throws IOException {

Expand Down Expand Up @@ -148,8 +163,7 @@ public S3AsyncClient createS3AsyncClient(
return s3AsyncClientBuilder.build();
}

@Override
public S3AsyncClient createS3CrtClient(URI uri, S3ClientCreationParameters parameters)
private S3AsyncClient createS3CrtAsyncClient(URI uri, S3ClientCreationParameters parameters)
throws IOException {
Configuration conf = getConf();
String bucket = uri.getHost();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ S3AsyncClient createS3AsyncClient(URI uri,
S3ClientCreationParameters parameters) throws IOException;


/**
* Create an {@link S3AsyncClient} for the CRT code path.
* NOTE(review): the CRT-specific behaviour is implied by the method name only;
* confirm the exact contract against the implementing factory.
*
* @param uri URI handed to the factory.
* @param parameters client creation parameters.
* @return the async client.
* @throws IOException declared for creation failures; exact conditions to be confirmed.
*/
S3AsyncClient createS3CrtClient(URI uri, S3ClientCreationParameters parameters) throws IOException;

/**
* Creates a new {@link S3TransferManager}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ public class ClientManagerImpl

public static final Logger LOG = LoggerFactory.getLogger(ClientManagerImpl.class);

/**
* A one-off log stating that the S3 CRT client is enabled.
*/
private static final LogExactlyOnce LOG_S3_CRT_ENABLED = new LogExactlyOnce(LOG);

/**
* Client factory to invoke.
Expand Down Expand Up @@ -148,15 +144,9 @@ private CallableRaisingIOE<S3Client> createS3Client() {
* @return a callable which will create the client.
*/
private CallableRaisingIOE<S3AsyncClient> createAsyncClient() {
return trackDurationOfOperation(durationTrackerFactory, STORE_CLIENT_CREATION.getSymbol(),
() -> {
if (clientCreationParameters.isCrtEnabled()) {
LOG_S3_CRT_ENABLED.info("S3 CRT client is enabled");
return clientFactory.createS3CrtClient(getUri(), clientCreationParameters);
} else {
return clientFactory.createS3AsyncClient(getUri(), clientCreationParameters);
}
});
return trackDurationOfOperation(durationTrackerFactory,
STORE_CLIENT_CREATION.getSymbol(),
() -> clientFactory.createS3AsyncClient(getUri(), clientCreationParameters));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1775,10 +1775,9 @@ enhanced connection pool management, and can provide higher transfer from and to
ability to split PUT requests into MPU, and GETs into multiple byte-ranged GETs. More information
can be found [here](https://aws.amazon.com/blogs/developer/introducing-crt-based-s3-client-and-the-s3-transfer-manager-in-the-aws-sdk-for-java-2-x/).

While S3A does not benefit directly from all these features, since S3A implements its own
MPU when writing files, the enhanced connection pool management can help when making
many parallel GET requests to ensure load is evenly distributed across S3, as happens in the
Analytics input stream.
Using the CRT client helps ensure that load is evenly distributed across S3 when making many
parallel GET requests, as happens in the Analytics input stream, due to its enhanced connection

Check failure on line 1779 in hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md#L1779

blanks: end of line
pool management.

The CRT client can be enabled as follows:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,6 @@ public S3AsyncClient createS3AsyncClient(URI uri, final S3ClientCreationParamete
return s3;
}

/**
* Supplies a mock in place of a real CRT async client.
* Neither {@code uri} nor {@code parameters} is consulted.
*/
@Override
public S3AsyncClient createS3CrtClient(URI uri, S3ClientCreationParameters parameters)
throws IOException {
// No local needed: hand the freshly created mock straight back.
return mock(S3AsyncClient.class);
}

@Override
public S3TransferManager createS3TransferManager(S3AsyncClient s3AsyncClient) {
S3TransferManager tm = mock(S3TransferManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,6 @@ public S3AsyncClient createS3AsyncClient(final URI uri,
return asyncClient;
}

/**
* Counts the request, runs the launcher, then returns the existing async client.
* Neither {@code uri} nor {@code parameters} is read here.
*/
@Override
public S3AsyncClient createS3CrtClient(URI uri, S3ClientCreationParameters parameters)
throws IOException {
// Bump the creation counter first, so the request is counted before the launcher runs.
asyncClientCreationCount.incrementAndGet();
// NOTE(review): launcher is defined outside this view; presumably a test hook that
// may delay or fail the creation - confirm against its declaration.
launcher.apply();
// Returns the already-held asyncClient reference; nothing is constructed here.
return asyncClient;
}

@Override
public S3TransferManager createS3TransferManager(final S3AsyncClient s3AsyncClient) {
transferManagerCreationCount.incrementAndGet();
Expand Down

0 comments on commit f72e64a

Please sign in to comment.