Skip to content

Commit

Permalink
run AAL test wit CRT
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmarsuhail committed Mar 7, 2025
1 parent b6d5fa8 commit 6cf66d8
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 0 deletions.
15 changes: 15 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
<!-- Stream Type -->
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
<!-- S3 CRT Client -->
<fs.s3a.crt.enabled>${fs.s3a.crt.enabled}</fs.s3a.crt.enabled>
</systemPropertyVariables>
</configuration>
</plugin>
Expand Down Expand Up @@ -324,6 +326,19 @@
</properties>
</profile>

<!-- Use the S3 CRT client -->
<profile>
<id>crt</id>
<activation>
<property>
<name>crt</name>
</property>
</activation>
<properties>
<fs.s3a.crt.enabled>true</fs.s3a.crt.enabled>
</properties>
</profile>

</profiles>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.hadoop.fs.s3a.S3ClientFactory;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
Expand All @@ -59,6 +60,11 @@ public class ClientManagerImpl

public static final Logger LOG = LoggerFactory.getLogger(ClientManagerImpl.class);

/**
* A one-off log stating whether S3 CRT client is enabled
*/
private static final LogExactlyOnce LOG_S3_CRT_ENABLED = new LogExactlyOnce(LOG);

/**
* Client factory to invoke.
*/
Expand Down Expand Up @@ -145,6 +151,7 @@ private CallableRaisingIOE<S3AsyncClient> createAsyncClient() {
return trackDurationOfOperation(durationTrackerFactory, STORE_CLIENT_CREATION.getSymbol(),
() -> {
if (clientCreationParameters.isCrtEnabled()) {
LOG_S3_CRT_ENABLED.info("S3 CRT client is enabled");
return clientFactory.createS3CrtClient(getUri(), clientCreationParameters);
} else {
return clientFactory.createS3AsyncClient(getUri(), clientCreationParameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -36,6 +38,8 @@
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
Expand All @@ -44,6 +48,7 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.CRT_CLIENT_ENABLED;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
Expand All @@ -61,14 +66,31 @@
* parquet files.
*
*/
@RunWith(Parameterized.class)
public class ITestS3AAnalyticsAcceleratorStreamReading extends AbstractS3ATestBase {

private static final String PHYSICAL_IO_PREFIX = "physicalio";
private static final String LOGICAL_IO_PREFIX = "logicalio";

/**
* Parameterization.
*/
@Parameterized.Parameters(name = "crtEnabled={0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{true},
{false},
});
}

private final boolean isCRTEnabled;

private Path externalTestFile;

public ITestS3AAnalyticsAcceleratorStreamReading(final boolean isCRTEnabled) {
this.isCRTEnabled = isCRTEnabled;
}

@Before
public void setUp() throws Exception {
super.setup();
Expand All @@ -78,6 +100,10 @@ public void setUp() throws Exception {
@Override
public Configuration createConfiguration() {
Configuration configuration = super.createConfiguration();

removeBaseAndBucketOverrides(configuration, CRT_CLIENT_ENABLED);
configuration.setBoolean(CRT_CLIENT_ENABLED, isCRTEnabled);

enableAnalyticsAccelerator(configuration);
return configuration;
}
Expand Down

0 comments on commit 6cf66d8

Please sign in to comment.