From ba3c7511c79344e03381c572f575cf6779d873cc Mon Sep 17 00:00:00 2001 From: Alexander Patrikalakis Date: Sun, 23 Apr 2017 04:05:24 +0900 Subject: [PATCH 1/2] Update SDK. Allow creating destination if it does not exist. --- .gitignore | 5 + README.md | 223 +++++++++------ pom.xml | 29 +- .../dynamodb/bootstrap/CommandLineArgs.java | 56 ++-- .../bootstrap/CommandLineInterface.java | 257 ++++++++++++------ .../bootstrap/DynamoDBBootstrapWorker.java | 15 +- .../dynamodb/bootstrap/DynamoDBConsumer.java | 6 +- .../bootstrap/DynamoDBConsumerWorker.java | 5 +- .../dynamodb/bootstrap/DynamoDBTableScan.java | 6 +- .../dynamodb/bootstrap/ScanSegmentWorker.java | 6 +- .../TransferDataFromOneTableToAnother.java | 38 +++ ...ansferDataFromOneTableToBlockingQueue.java | 38 +++ 12 files changed, 448 insertions(+), 236 deletions(-) create mode 100644 .gitignore create mode 100644 src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java create mode 100644 src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5a7dc41 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/.idea +/target +/*.iml +.*~ +*~ diff --git a/README.md b/README.md index a4fb283..656e127 100644 --- a/README.md +++ b/README.md @@ -1,79 +1,127 @@ # DynamoDB Import Export Tool -The DynamoDB Import Export Tool is designed to perform parallel scans on the source table, store scan results in a queue, then consume the queue by writing the items asynchronously to a destination table. +The DynamoDB Import Export Tool is designed to perform parallel scans on the source table, +store scan results in a queue, then consume the queue by writing the items asynchronously to a destination table. 
## Requirements ## * Maven * JRE 1.7+ -* Pre-existing source and destination DynamoDB tables +* Pre-existing source DynamoDB tables. The destination table is optional in the CLI; you can choose to create the +destination table if it does not exist. ## Running as an executable - -1. Build the library: - -``` - mvn install +1. Build the library with `mvn install`. This produces the target jar in the target/ directory. +The CLI's usage follows with required parameters marked by asterisks. + +```bash + --consistentScan + Use this flag to use strongly consistent scan. If the flag is not used + it will default to eventually consistent scan + Default: false + --createDestination + Create destination table if it does not exist + Default: false + --copyStreamSpecificationWhenCreating + Use the source table stream specification for the destination table + during its creation. + Default: false + --destinationEndpoint + Endpoint of the destination table + * --destinationSigningRegion + Signing region for the destination endpoint + * --destinationTable + Name of the destination table + --help + Display usage information + --maxWriteThreads + Number of max threads to write to destination table + Default: 1024 + * --readThroughputRatio + Percentage of total read throughput to scan the source table + Default: 0.0 + --section + Section number to scan when running multiple programs concurrently [0, + 1... totalSections-1] + Default: 0 + --sourceEndpoint + Endpoint of the source table + * --sourceSigningRegion + Signing region for the source endpoint + * --sourceTable + Name of the source table + --totalSections + Total number of sections to divide the scan into + Default: 1 + * --writeThroughputRatio + Percentage of total write throughput to write the destination table + Default: 0.0 ``` -2. 
This produces the target jar in the target/ directory, to start the replication process: - -java -jar dynamodb-import-export-tool.jar - ---destinationEndpoint // the DynamoDB endpoint where the destination table is located. - ---destinationTable // the destination table to write to. - ---sourceEndpoint // the endpoint where the source table is located. - ---sourceTable // the source table to read from. - ---readThroughputRatio // the ratio of read throughput to consume from the source table. - ---writeThroughputRatio // the ratio of write throughput to consume from the destination table. - ---maxWriteThreads // (Optional, default=128 * Available_Processors) Maximum number of write threads to create. - ---totalSections // (Optional, default=1) Total number of sections to split the bootstrap into. Each application will only scan and write one section. - ---section // (Optional, default=0) section to read and write. Only will scan this one section of all sections, [0...totalSections-1]. - ---consistentScan // (Optional, default=false) indicates whether consistent scan should be used when reading from the source table. +2. An example command you can use on one EC2 host to copy from one table `foo` in `us-east-1` to a new table +called `bar` in `us-east-2` follows. + +```bash +java -jar target/dynamodb-import-export-tool-1.1.0.jar \ +--sourceSigningRegion us-east-1 \ +--sourceTable foo \ +--destinationSigningRegion us-east-2 \ +--destinationTable bar \ +--readThroughputRatio 1 \ +--writeThroughputRatio 1 +``` -> **NOTE**: To split the replication process across multiple machines, simply use the totalSections & section command line arguments, where each machine will run one section out of [0 ... totalSections-1]. +> **NOTE**: To split the replication process across multiple machines, simply use the totalSections & section +command line arguments, where each machine will run one section out of [0 ... totalSections-1]. 
## Using the API +Find some examples of how to use the Import-Export tool's API below. +The first demonstrates how to use the API to copy data from one DynamoDB table to another. +The second demonstrates how to enqueue the data in a DynamoDB table in a +`BlockingQueueConsumer` in memory. ### 1. Transfer Data from One DynamoDB Table to Another DynamoDB Table -The below example will read from "mySourceTable" at 100 reads per second, using 4 threads. And it will write to "myDestinationTable" at 50 writes per second, using 8 threads. -Both tables are located at "dynamodb.us-west-1.amazonaws.com". (to transfer to a different region, create 2 AmazonDynamoDBClients +The below example will read from "mySourceTable" at 100 reads per second, using four threads. +And it will write to "myDestinationTable" at 50 writes per second, using eight threads. +Both tables are located at "dynamodb.us-west-1.amazonaws.com". +To transfer to a different region, create two AmazonDynamoDBClients with different endpoints to pass into the DynamoDBBootstrapWorker and the DynamoDBConsumer. ```java -AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider()); -client.setEndpoint("dynamodb.us-west-1.amazonaws.com"); - -DynamoDBBootstrapWorker worker = null; - -try { - // 100.0 read operations per second. 4 threads to scan the table. - worker = new DynamoDBBootstrapWorker(client, - 100.0, "mySourceTable", 4); -} catch (NullReadCapacityException e) { - LOGGER.error("The DynamoDB source table returned a null read capacity.", e); - System.exit(1); -} - - // 50.0 write operations per second. 8 threads to scan the table. 
-DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable", 50.0, Executors.newFixedThreadPool(8)); - -try { - worker.pipe(consumer); -} catch (ExecutionException e) { - LOGGER.error("Encountered exception when executing transfer.", e); - System.exit(1); -} catch (InterruptedException e){ - LOGGER.error("Interrupted when executing transfer.", e); - System.exit(1); +import com.amazonaws.dynamodb.bootstrap.DynamoDBBootstrapWorker; +import com.amazonaws.dynamodb.bootstrap.DynamoDBConsumer; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +class TransferDataFromOneTableToAnother { + public static void main(String[] args) { + AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() + .withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); + DynamoDBBootstrapWorker worker = null; + try { + // 100.0 read operations per second. 4 threads to scan the table. + worker = new DynamoDBBootstrapWorker(client, + 100.0, "mySourceTable", 4); + } catch (NullReadCapacityException e) { + System.err.println("The DynamoDB source table returned a null read capacity."); + System.exit(1); + } + // 50.0 write operations per second. 8 threads to scan the table. + DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable", 50.0, + Executors.newFixedThreadPool(8)); + try { + worker.pipe(consumer); + } catch (ExecutionException e) { + System.err.println("Encountered exception when executing transfer: " + e.getMessage()); + System.exit(1); + } catch (InterruptedException e){ + System.err.println("Interrupted when executing transfer: " + e.getMessage()); + System.exit(1); + } + } } ``` @@ -85,29 +133,40 @@ the DynamoDB entries but does not have a setup application for it. 
They can just to then process the new entries. ```java -AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider()); -client.setEndpoint("dynamodb.us-west-1.amazonaws.com"); - -DynamoDBBootstrapWorker worker = null; - -try { - // 100.0 read operations per second. 4 threads to scan the table. - worker = new DynamoDBBootstrapWorker(client, - 100.0, "mySourceTable", 4); -} catch (NullReadCapacityException e) { - LOGGER.error("The DynamoDB source table returned a null read capacity.", e); - System.exit(1); -} - -BlockingQueueConsumer consumer = new BlockingQueueConsumer(8); - -try { - worker.pipe(consumer); -} catch (ExecutionException e) { - LOGGER.error("Encountered exception when executing transfer.", e); - System.exit(1); -} catch (InterruptedException e){ - LOGGER.error("Interrupted when executing transfer.", e); - System.exit(1); +import com.amazonaws.dynamodb.bootstrap.BlockingQueueConsumer; +import com.amazonaws.dynamodb.bootstrap.DynamoDBBootstrapWorker; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; + +import java.util.concurrent.ExecutionException; + +class TransferDataFromOneTableToBlockingQueue { + public static void main(String[] args) { + AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() + .withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); + + DynamoDBBootstrapWorker worker = null; + + try { + // 100.0 read operations per second. 4 threads to scan the table. 
+ worker = new DynamoDBBootstrapWorker(client, 100.0, "mySourceTable", 4); + } catch (NullReadCapacityException e) { + System.err.println("The DynamoDB source table returned a null read capacity."); + System.exit(1); + } + + BlockingQueueConsumer consumer = new BlockingQueueConsumer(8); + + try { + worker.pipe(consumer); + } catch (ExecutionException e) { + System.err.println("Encountered exception when executing transfer: " + e.getMessage()); + System.exit(1); + } catch (InterruptedException e){ + System.err.println("Interrupted when executing transfer: " + e.getMessage()); + System.exit(1); + } + } } ``` diff --git a/pom.xml b/pom.xml index a0360bb..930ee26 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,7 @@ 4.0.0 com.amazonaws - 1.0.1 + 1.1.0 dynamodb-import-export-tool jar DynamoDB Import Export Tool @@ -11,14 +11,17 @@ https://github.com/awslabs/dynamodb-import-export-tool.git - 1.10.10 - 1.6.2 - 1.48 - 15.0 + 1.7 + 1.11.123 + 1.6.6 + 1.69 + 21.0 1.2.17 - 3.2 + 3.4 1.2 - 2.4.1 + 3.0.0 + 3.0 + 1.6 true @@ -84,6 +87,11 @@ log4j ${log4j.core.version} + + org.projectlombok + lombok + 1.16.14 + org.powermock powermock-module-junit4 @@ -109,14 +117,15 @@ maven-compiler-plugin - 1.7 - 1.7 + ${jdk.version} + ${jdk.version} - 3.0 + ${maven.compiler.version} org.apache.maven.plugins maven-gpg-plugin + ${maven.gpg.version} sign-artifacts diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java index 1b41a91..fd73b6f 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java @@ -16,11 +16,13 @@ import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; import com.beust.jcommander.Parameter; +import lombok.Getter; /** * This class contains the parameters to input when executing the program from * command line. 
*/ +@Getter public class CommandLineArgs { public static final String HELP = "--help"; @Parameter(names = HELP, description = "Display usage information", help = true) @@ -31,82 +33,58 @@ public boolean getHelp() { } public static final String SOURCE_ENDPOINT = "--sourceEndpoint"; - @Parameter(names = SOURCE_ENDPOINT, description = "Endpoint of the source table", required = true) + @Parameter(names = SOURCE_ENDPOINT, description = "Endpoint of the source table") private String sourceEndpoint; - public String getSourceEndpoint() { - return sourceEndpoint; - } + public static final String SOURCE_SIGNING_REGION = "--sourceSigningRegion"; + @Parameter(names = SOURCE_SIGNING_REGION, description = "Signing region for the source endpoint", required = true) + private String sourceSigningRegion; public static final String SOURCE_TABLE = "--sourceTable"; @Parameter(names = SOURCE_TABLE, description = "Name of the source table", required = true) private String sourceTable; - public String getSourceTable() { - return sourceTable; - } - public static final String DESTINATION_ENDPOINT = "--destinationEndpoint"; - @Parameter(names = DESTINATION_ENDPOINT, description = "Endpoint of the destination table", required = true) + @Parameter(names = DESTINATION_ENDPOINT, description = "Endpoint of the destination table") private String destinationEndpoint; - public String getDestinationEndpoint() { - return destinationEndpoint; - } + public static final String DESTINATION_SIGNING_REGION = "--destinationSigningRegion"; + @Parameter(names = DESTINATION_SIGNING_REGION, description = "Signing region for the destination endpoint", required = true) + private String destinationSigningRegion; public static final String DESTINATION_TABLE = "--destinationTable"; @Parameter(names = DESTINATION_TABLE, description = "Name of the destination table", required = true) private String destinationTable; - public String getDestinationTable() { - return destinationTable; - } + public static final String 
CREATE_DESTINATION_TABLE_IF_MISSING = "--createDestination"; + @Parameter(names = CREATE_DESTINATION_TABLE_IF_MISSING, description = "Create destination table if it does not exist") + private boolean createDestinationTableIfMissing; + + public static final String COPY_STREAM_SPECIFICATION_WHEN_CREATING = "--copyStreamSpecificationWhenCreating"; + @Parameter(names = COPY_STREAM_SPECIFICATION_WHEN_CREATING, description = "Use the source table stream specification for the destination table during its creation.") + private boolean copyStreamSpecification; public static final String READ_THROUGHPUT_RATIO = "--readThroughputRatio"; @Parameter(names = READ_THROUGHPUT_RATIO, description = "Percentage of total read throughput to scan the source table", required = true) private double readThroughputRatio; - public double getReadThroughputRatio() { - return readThroughputRatio; - } - public static final String WRITE_THROUGHPUT_RATIO = "--writeThroughputRatio"; @Parameter(names = WRITE_THROUGHPUT_RATIO, description = "Percentage of total write throughput to write the destination table", required = true) private double writeThroughputRatio; - public double getWriteThroughputRatio() { - return writeThroughputRatio; - } - public static final String MAX_WRITE_THREADS = "--maxWriteThreads"; @Parameter(names = MAX_WRITE_THREADS, description = "Number of max threads to write to destination table", required = false) private int maxWriteThreads = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_MAX_POOL_SIZE; - public int getMaxWriteThreads() { - return maxWriteThreads; - } - public static final String TOTAL_SECTIONS = "--totalSections"; @Parameter(names = TOTAL_SECTIONS, description = "Total number of sections to divide the scan into", required = false) private int totalSections = 1; - public int getTotalSections() { - return totalSections; - } - public static final String SECTION = "--section"; @Parameter(names = SECTION, description = "Section number to scan when running multiple 
programs concurrently [0, 1... totalSections-1]", required = false) private int section = 0; - - public int getSection() { - return section; - } public static final String CONSISTENT_SCAN = "--consistentScan"; @Parameter(names = CONSISTENT_SCAN, description = "Use this flag to use strongly consistent scan. If the flag is not used it will default to eventually consistent scan") private boolean consistentScan = false; - - public boolean getConsistentScan() { - return consistentScan; - } } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java index 67639fc..2be82f3 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java @@ -14,22 +14,33 @@ */ package com.amazonaws.dynamodb.bootstrap; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.model.*; +import com.amazonaws.waiters.WaiterParameters; +import com.amazonaws.waiters.WaiterTimedOutException; +import com.amazonaws.waiters.WaiterUnrecoverableException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import lombok.extern.log4j.Log4j; import com.amazonaws.ClientConfiguration; import 
com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; import com.amazonaws.dynamodb.bootstrap.exception.SectionOutOfRangeException; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.beust.jcommander.JCommander; import com.beust.jcommander.ParameterException; @@ -37,13 +48,148 @@ * The interface that parses the arguments, and begins to transfer data from one * DynamoDB table to another */ +@Log4j public class CommandLineInterface { - /** - * Logger for the DynamoDBBootstrapWorker. - */ - private static final Logger LOGGER = LogManager - .getLogger(CommandLineInterface.class); + public static final String ENCOUNTERED_EXCEPTION_WHEN_EXECUTING_TRANSFER = "Encountered exception when executing transfer."; + + static AwsClientBuilder.EndpointConfiguration createEndpointConfiguration(Region region, Optional endpoint, String endpointPrefix) { + return new AwsClientBuilder.EndpointConfiguration(endpoint.or("https://" + region.getServiceEndpoint(endpointPrefix)), region.getName()); + } + + private final AwsClientBuilder.EndpointConfiguration sourceEndpointConfiguration; + private final String sourceTable; + private final AwsClientBuilder.EndpointConfiguration destinationEndpointConfiguration; + private final String destinationTable; + private final double readThroughputRatio; + private final double writeThroughputRatio; + private final int maxWriteThreads; + private final boolean isConsistentScan; + private final boolean createDestinationTableIfMissing; + private final boolean copyStreamSpecification; + private final int sectionNumber; + private final int totalSections; + + private CommandLineInterface(final CommandLineArgs params) { + sourceEndpointConfiguration = 
createEndpointConfiguration(Region.getRegion(Regions.fromName(params.getSourceSigningRegion())), + Optional.fromNullable(params.getSourceEndpoint()), AmazonDynamoDB.ENDPOINT_PREFIX); + sourceTable = params.getSourceTable(); + destinationEndpointConfiguration = createEndpointConfiguration(Region.getRegion(Regions.fromName(params.getDestinationSigningRegion())), + Optional.fromNullable(params.getDestinationEndpoint()), AmazonDynamoDB.ENDPOINT_PREFIX); + destinationTable = params.getDestinationTable(); + readThroughputRatio = params.getReadThroughputRatio(); + writeThroughputRatio = params.getWriteThroughputRatio(); + maxWriteThreads = params.getMaxWriteThreads(); + isConsistentScan = params.isConsistentScan(); + createDestinationTableIfMissing = params.isCreateDestinationTableIfMissing(); + copyStreamSpecification = params.isCopyStreamSpecification(); + sectionNumber = params.getSection(); + totalSections = params.getTotalSections(); + } + + static List convertGlobalSecondaryIndexDescriptions(List list) { + final List result = new ArrayList<>(list.size()); + for (GlobalSecondaryIndexDescription description : list) { + result.add(new GlobalSecondaryIndex() + .withIndexName(description.getIndexName()) + .withKeySchema(description.getKeySchema()) + .withProjection(description.getProjection()) + .withProvisionedThroughput(getProvisionedThroughputFromDescription(description.getProvisionedThroughput()))); + } + return result; + } + + static List convertLocalSecondaryIndexDescriptions(List list) { + final List result = new ArrayList<>(list.size()); + for (LocalSecondaryIndexDescription description : list) { + result.add(new LocalSecondaryIndex() + .withIndexName(description.getIndexName()) + .withKeySchema(description.getKeySchema()) + .withProjection(description.getProjection())); + } + return result; + } + + @VisibleForTesting + static CreateTableRequest convertTableDescriptionToCreateTableRequest(TableDescription description, + String newTableName, + boolean 
copyStreamSpecification) { + List gsiDesc = description.getGlobalSecondaryIndexes(); + List gsi = gsiDesc == null ? null : convertGlobalSecondaryIndexDescriptions(gsiDesc); + List lsiDesc = description.getLocalSecondaryIndexes(); + List lsi = lsiDesc == null ? null : convertLocalSecondaryIndexDescriptions(lsiDesc); + ProvisionedThroughput pt = getProvisionedThroughputFromDescription(description.getProvisionedThroughput()); + CreateTableRequest ctr = new CreateTableRequest() + .withTableName(newTableName) + .withProvisionedThroughput(pt) + .withAttributeDefinitions(description.getAttributeDefinitions()) + .withKeySchema(description.getKeySchema()) + .withGlobalSecondaryIndexes(gsi) + .withLocalSecondaryIndexes(lsi); + if (copyStreamSpecification) { + ctr.withStreamSpecification(description.getStreamSpecification()); + } + return ctr; + } + + private static ProvisionedThroughput getProvisionedThroughputFromDescription(ProvisionedThroughputDescription description) { + return new ProvisionedThroughput(description.getReadCapacityUnits(), description.getWriteCapacityUnits()); + } + + private void bootstrapTable() throws InterruptedException, ExecutionException, SectionOutOfRangeException { + final ClientConfiguration config = new ClientConfiguration().withMaxConnections(BootstrapConstants.MAX_CONN_SIZE); + + final DefaultAWSCredentialsProviderChain credentials = new DefaultAWSCredentialsProviderChain(); + final AmazonDynamoDB sourceClient = AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration(sourceEndpointConfiguration) + .withCredentials(credentials) + .withClientConfiguration(config) + .build(); + final AmazonDynamoDB destinationClient = AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration(destinationEndpointConfiguration) + .withCredentials(credentials) + .withClientConfiguration(config) + .build(); + + final TableDescription readTableDescription = sourceClient.describeTable(sourceTable).getTable(); + try { + 
destinationClient.describeTable(destinationTable).getTable(); + } catch(ResourceNotFoundException e) { + if(!createDestinationTableIfMissing) { + throw new IllegalArgumentException("Destination table " + destinationTable + " did not exist", e); + } + try { + destinationClient.createTable(convertTableDescriptionToCreateTableRequest(readTableDescription, + destinationTable, copyStreamSpecification)); + destinationClient.waiters().tableExists().run(new WaiterParameters<>(new DescribeTableRequest(destinationTable))); + } catch(WaiterUnrecoverableException | WaiterTimedOutException | AmazonServiceException ase) { + throw new IllegalArgumentException("Unable to create destination table", ase); + } + } + + final TableDescription writeTableDescription = destinationClient.describeTable(destinationTable).getTable(); + + final int numSegments; + try { + numSegments = DynamoDBBootstrapWorker.estimateNumberOfSegments(readTableDescription); + } catch (NullReadCapacityException e) { + throw new IllegalStateException("All tables should have a read capacity set", e); + } + + final double readThroughput = calculateThroughput(readTableDescription, readThroughputRatio, true); + final double writeThroughput = calculateThroughput(writeTableDescription, writeThroughputRatio, false); + + final ExecutorService sourceExec = getThreadPool(numSegments); + final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(sourceClient, readThroughput, sourceTable, + sourceExec, sectionNumber, totalSections, numSegments, isConsistentScan); + + final ExecutorService destinationExec = getThreadPool(maxWriteThreads); + final DynamoDBConsumer consumer = new DynamoDBConsumer(destinationClient, destinationTable, writeThroughput, destinationExec); + + log.info("Starting transfer."); + worker.pipe(consumer); + log.info("Finished transfer."); + } /** * Main class to begin transferring data from one DynamoDB table to another @@ -59,8 +205,7 @@ public static void main(String[] args) { // parse given 
arguments cmd.parse(args); } catch (ParameterException e) { - LOGGER.error(e); - JCommander.getConsole().println(e.getMessage()); + log.error(e); cmd.usage(); System.exit(1); } @@ -68,65 +213,24 @@ public static void main(String[] args) { // show usage information if help flag exists if (params.getHelp()) { cmd.usage(); - return; + System.exit(0); } - final String sourceEndpoint = params.getSourceEndpoint(); - final String destinationEndpoint = params.getDestinationEndpoint(); - final String destinationTable = params.getDestinationTable(); - final String sourceTable = params.getSourceTable(); - final double readThroughputRatio = params.getReadThroughputRatio(); - final double writeThroughputRatio = params.getWriteThroughputRatio(); - final int maxWriteThreads = params.getMaxWriteThreads(); - final boolean consistentScan = params.getConsistentScan(); - - final ClientConfiguration sourceConfig = new ClientConfiguration().withMaxConnections(BootstrapConstants.MAX_CONN_SIZE); - final ClientConfiguration destinationConfig = new ClientConfiguration().withMaxConnections(BootstrapConstants.MAX_CONN_SIZE); - - final AmazonDynamoDBClient sourceClient = new AmazonDynamoDBClient( - new DefaultAWSCredentialsProviderChain(), sourceConfig); - final AmazonDynamoDBClient destinationClient = new AmazonDynamoDBClient( - new DefaultAWSCredentialsProviderChain(), destinationConfig); - sourceClient.setEndpoint(sourceEndpoint); - destinationClient.setEndpoint(destinationEndpoint); - - TableDescription readTableDescription = sourceClient.describeTable( - sourceTable).getTable(); - TableDescription writeTableDescription = destinationClient - .describeTable(destinationTable).getTable(); - int numSegments = 10; - try { - numSegments = DynamoDBBootstrapWorker - .getNumberOfSegments(readTableDescription); - } catch (NullReadCapacityException e) { - LOGGER.warn("Number of segments not specified - defaulting to " - + numSegments, e); - } - - final double readThroughput = 
calculateThroughput(readTableDescription, - readThroughputRatio, true); - final double writeThroughput = calculateThroughput( - writeTableDescription, writeThroughputRatio, false); + final CommandLineInterface cli = new CommandLineInterface(params); try { - ExecutorService sourceExec = getSourceThreadPool(numSegments); - ExecutorService destinationExec = getDestinationThreadPool(maxWriteThreads); - DynamoDBConsumer consumer = new DynamoDBConsumer(destinationClient, - destinationTable, writeThroughput, destinationExec); - - final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker( - sourceClient, readThroughput, sourceTable, sourceExec, - params.getSection(), params.getTotalSections(), numSegments, consistentScan); - - LOGGER.info("Starting transfer..."); - worker.pipe(consumer); - LOGGER.info("Finished Copying Table."); - } catch (ExecutionException e) { - LOGGER.error("Encountered exception when executing transfer.", e); + cli.bootstrapTable(); } catch (InterruptedException e) { - LOGGER.error("Interrupted when executing transfer.", e); + log.error("Interrupted when executing transfer.", e); + System.exit(1); + } catch (ExecutionException e) { + log.error(ENCOUNTERED_EXCEPTION_WHEN_EXECUTING_TRANSFER, e); System.exit(1); } catch (SectionOutOfRangeException e) { - LOGGER.error("Invalid section parameter", e); + log.error("Invalid section parameter", e); + System.exit(1); + } catch (Exception e) { + log.error(ENCOUNTERED_EXCEPTION_WHEN_EXECUTING_TRANSFER, e); + System.exit(1); } } @@ -138,17 +242,15 @@ private static double calculateThroughput( TableDescription tableDescription, double throughputRatio, boolean read) { if (read) { - return tableDescription.getProvisionedThroughput() - .getReadCapacityUnits() * throughputRatio; + return tableDescription.getProvisionedThroughput().getReadCapacityUnits() * throughputRatio; } - return tableDescription.getProvisionedThroughput() - .getWriteCapacityUnits() * throughputRatio; + return 
tableDescription.getProvisionedThroughput().getWriteCapacityUnits() * throughputRatio; } /** * Returns the thread pool for the destination DynamoDB table. */ - private static ExecutorService getDestinationThreadPool(int maxWriteThreads) { + private static ExecutorService getThreadPool(int maxWriteThreads) { int corePoolSize = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE; if (corePoolSize > maxWriteThreads) { corePoolSize = maxWriteThreads - 1; @@ -161,21 +263,4 @@ private static ExecutorService getDestinationThreadPool(int maxWriteThreads) { return exec; } - /** - * Returns the thread pool for the source DynamoDB table. - */ - private static ExecutorService getSourceThreadPool(int numSegments) { - int corePoolSize = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE; - if (corePoolSize > numSegments) { - corePoolSize = numSegments - 1; - } - - final long keepAlive = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_KEEP_ALIVE; - ExecutorService exec = new ThreadPoolExecutor(corePoolSize, - numSegments, keepAlive, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue(numSegments), - new ThreadPoolExecutor.CallerRunsPolicy()); - return exec; - } - } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java index 10278ef..5aced79 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java @@ -21,7 +21,7 @@ import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; import com.amazonaws.dynamodb.bootstrap.exception.SectionOutOfRangeException; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; import 
com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity; import com.amazonaws.services.dynamodbv2.model.ScanRequest; @@ -32,7 +32,7 @@ * consumer to accept the results. */ public class DynamoDBBootstrapWorker extends AbstractLogProvider { - private final AmazonDynamoDBClient client; + private final AmazonDynamoDB client; private final double rateLimit; private final String tableName; private final int numSegments; @@ -46,7 +46,7 @@ public class DynamoDBBootstrapWorker extends AbstractLogProvider { * * @throws Exception */ - public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, + public DynamoDBBootstrapWorker(AmazonDynamoDB client, double rateLimit, String tableName, ExecutorService exec, int section, int totalSections, int numSegments, boolean consistentScan) throws SectionOutOfRangeException { @@ -74,7 +74,7 @@ public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, * * @throws Exception */ - public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, + public DynamoDBBootstrapWorker(AmazonDynamoDB client, double rateLimit, String tableName, int numThreads) throws NullReadCapacityException { this.client = client; @@ -86,7 +86,7 @@ public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, this.totalSections = 1; this.consistentScan = false; - this.numSegments = getNumberOfSegments(description); + this.numSegments = estimateNumberOfSegments(description); int numProcessors = Runtime.getRuntime().availableProcessors() * 4; if (numProcessors > numThreads) { numThreads = numProcessors; @@ -132,7 +132,7 @@ public void pipe(final AbstractLogConsumer consumer) * @throws NullReadCapacityException * if the table returns a null readCapacity units. 
*/ - public static int getNumberOfSegments(TableDescription description) + public static int estimateNumberOfSegments(TableDescription description) throws NullReadCapacityException { ProvisionedThroughputDescription provisionedThroughput = description .getProvisionedThroughput(); @@ -148,8 +148,7 @@ public static int getNumberOfSegments(TableDescription description) "Cannot scan with a null readCapacity provisioned throughput"); } double throughput = (readCapacity + 3 * writeCapacity) / 3000.0; - return (int) (10 * Math.max(Math.ceil(throughput), - Math.ceil(tableSizeInGigabytes) / 10)); + return 10 * ((int) Math.max(Math.ceil(throughput), Math.ceil(tableSizeInGigabytes) / 10)); } } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java index a5bfa6c..77f2e1a 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java @@ -23,6 +23,7 @@ import java.util.concurrent.Future; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; @@ -38,15 +39,14 @@ */ public class DynamoDBConsumer extends AbstractLogConsumer { - private final AmazonDynamoDBClient client; + private final AmazonDynamoDB client; private final String tableName; private final RateLimiter rateLimiter; /** * Class to consume logs and write them to a DynamoDB table. 
*/ - public DynamoDBConsumer(AmazonDynamoDBClient client, String tableName, - double rateLimit, ExecutorService exec) { + public DynamoDBConsumer(AmazonDynamoDB client, String tableName, double rateLimit, ExecutorService exec) { this.client = client; this.tableName = tableName; this.rateLimiter = RateLimiter.create(rateLimit); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java index 432ddc9..ec29f55 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java @@ -21,6 +21,7 @@ import java.util.concurrent.Callable; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult; @@ -33,7 +34,7 @@ */ public class DynamoDBConsumerWorker implements Callable { - private final AmazonDynamoDBClient client; + private final AmazonDynamoDB client; private final RateLimiter rateLimiter; private long exponentialBackoffTime; private BatchWriteItemRequest batch; @@ -45,7 +46,7 @@ public class DynamoDBConsumerWorker implements Callable { * off until it succeeds. 
*/ public DynamoDBConsumerWorker(BatchWriteItemRequest batchWriteItemRequest, - AmazonDynamoDBClient client, RateLimiter rateLimiter, + AmazonDynamoDB client, RateLimiter rateLimiter, String tableName) { this.batch = batchWriteItemRequest; this.client = client; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java index 7ebe248..6c4481f 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java @@ -16,7 +16,7 @@ import java.util.concurrent.Executor; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.ScanRequest; import com.google.common.util.concurrent.RateLimiter; @@ -26,12 +26,12 @@ public class DynamoDBTableScan { private final RateLimiter rateLimiter; - private final AmazonDynamoDBClient client; + private final AmazonDynamoDB client; /** * Initializes the RateLimiter and sets the AmazonDynamoDBClient. 
*/ - public DynamoDBTableScan(double rateLimit, AmazonDynamoDBClient client) { + public DynamoDBTableScan(double rateLimit, AmazonDynamoDB client) { rateLimiter = RateLimiter.create(rateLimit); this.client = client; } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java index c496e75..d783c1c 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java @@ -17,7 +17,7 @@ import java.util.concurrent.Callable; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.ConsumedCapacity; import com.amazonaws.services.dynamodbv2.model.ScanRequest; import com.amazonaws.services.dynamodbv2.model.ScanResult; @@ -34,10 +34,10 @@ public class ScanSegmentWorker implements Callable { private boolean hasNext; private int lastConsumedCapacity; private long exponentialBackoffTime; - private final AmazonDynamoDBClient client; + private final AmazonDynamoDB client; private final RateLimiter rateLimiter; - ScanSegmentWorker(final AmazonDynamoDBClient client, + ScanSegmentWorker(final AmazonDynamoDB client, final RateLimiter rateLimiter, ScanRequest request) { this.request = request; this.client = client; diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java new file mode 100644 index 0000000..b688dbd --- /dev/null +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java @@ -0,0 +1,38 @@ +package 
com.amazonaws.dynamodb.bootstrap.com.amazonaws.dynamodb.bootstrap.example; + +import com.amazonaws.dynamodb.bootstrap.DynamoDBBootstrapWorker; +import com.amazonaws.dynamodb.bootstrap.DynamoDBConsumer; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +class TransferDataFromOneTableToAnother { + public static void main(String[] args) { + AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() + .withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); + DynamoDBBootstrapWorker worker = null; + try { + // 100.0 read operations per second. 4 threads to scan the table. + worker = new DynamoDBBootstrapWorker(client, + 100.0, "mySourceTable", 4); + } catch (NullReadCapacityException e) { + System.err.println("The DynamoDB source table returned a null read capacity."); + System.exit(1); + } + // 50.0 write operations per second. 8 threads to scan the table. 
+ DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable", 50.0, + Executors.newFixedThreadPool(8)); + try { + worker.pipe(consumer); + } catch (ExecutionException e) { + System.err.println("Encountered exception when executing transfer: " + e.getMessage()); + System.exit(1); + } catch (InterruptedException e){ + System.err.println("Interrupted when executing transfer: " + e.getMessage()); + System.exit(1); + } + } +} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java new file mode 100644 index 0000000..b4adb58 --- /dev/null +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java @@ -0,0 +1,38 @@ +package com.amazonaws.dynamodb.bootstrap.com.amazonaws.dynamodb.bootstrap.example; + +import com.amazonaws.dynamodb.bootstrap.BlockingQueueConsumer; +import com.amazonaws.dynamodb.bootstrap.DynamoDBBootstrapWorker; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; + +import java.util.concurrent.ExecutionException; + +class TransferDataFromOneTableToBlockingQueue { + public static void main(String[] args) { + AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() + .withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); + + DynamoDBBootstrapWorker worker = null; + + try { + // 100.0 read operations per second. 4 threads to scan the table. 
+ worker = new DynamoDBBootstrapWorker(client, 100.0, "mySourceTable", 4); + } catch (NullReadCapacityException e) { + System.err.println("The DynamoDB source table returned a null read capacity."); + System.exit(1); + } + + BlockingQueueConsumer consumer = new BlockingQueueConsumer(8); + + try { + worker.pipe(consumer); + } catch (ExecutionException e) { + System.err.println("Encountered exception when executing transfer: " + e.getMessage()); + System.exit(1); + } catch (InterruptedException e){ + System.err.println("Interrupted when executing transfer: " + e.getMessage()); + System.exit(1); + } + } +} From eafbb4c9b2a267dc54bf657ad914cfa19ba87bb0 Mon Sep 17 00:00:00 2001 From: Alexander Patrikalakis Date: Sun, 23 Apr 2017 23:05:24 +0900 Subject: [PATCH 2/2] moved classes into different packages. upgraded SDK. tested table create request factory --- README.md | 75 +++--- pom.xml | 2 +- .../bootstrap/AbstractLogProvider.java | 19 +- .../bootstrap/AttributeValueMixIn.java | 50 ---- .../dynamodb/bootstrap/CommandLineArgs.java | 21 +- .../bootstrap/CommandLineInterface.java | 153 +++++------- .../dynamodb/bootstrap/DynamoDBTableScan.java | 28 +-- .../bootstrap/ParallelScanExecutor.java | 19 +- .../bootstrap/SegmentedScanResult.java | 1 - ...criptionToCreateTableRequestConverter.java | 93 ++++++++ .../constants/BootstrapConstants.java | 10 +- .../{ => consumer}/AbstractLogConsumer.java | 25 +- .../{ => consumer}/BlockingQueueConsumer.java | 9 +- .../{ => consumer}/DynamoDBConsumer.java | 24 +- .../bootstrap/items/AttributeValueMixIn.java | 88 +++++++ .../{ => items}/DynamoDBEntryWithSize.java | 2 +- .../{ => items}/ItemSizeCalculator.java | 14 +- .../{ => worker}/BlockingQueueWorker.java | 20 +- .../{ => worker}/DynamoDBBootstrapWorker.java | 60 ++--- .../{ => worker}/DynamoDBConsumerWorker.java | 12 +- .../{ => worker}/ScanSegmentWorker.java | 20 +- .../bootstrap/CommandLineInterfaceTests.java | 21 ++ .../bootstrap/DynamoDBTableScanTest.java | 26 +- 
.../bootstrap/SegmentedScanResultTest.java | 7 +- ...tionToCreateTableRequestConverterTest.java | 222 ++++++++++++++++++ .../TransferDataFromOneTableToAnother.java | 38 --- ...ansferDataFromOneTableToBlockingQueue.java | 38 --- .../BlockingQueueConsumerTest.java | 18 +- .../{ => consumer}/DynamoDBConsumerTest.java | 19 +- .../TransferDataFromOneTableToAnother.java | 43 ++++ ...ansferDataFromOneTableToBlockingQueue.java | 41 ++++ .../{ => items}/AttributeValueMixInTest.java | 14 +- .../{ => worker}/BlockingQueueWorkerTest.java | 29 +-- .../DynamoDBBootstrapWorkerTest.java | 22 +- 34 files changed, 786 insertions(+), 497 deletions(-) delete mode 100644 src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java create mode 100644 src/main/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverter.java rename src/main/java/com/amazonaws/dynamodb/bootstrap/{ => consumer}/AbstractLogConsumer.java (78%) rename src/main/java/com/amazonaws/dynamodb/bootstrap/{ => consumer}/BlockingQueueConsumer.java (89%) rename src/main/java/com/amazonaws/dynamodb/bootstrap/{ => consumer}/DynamoDBConsumer.java (82%) create mode 100644 src/main/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixIn.java rename src/main/java/com/amazonaws/dynamodb/bootstrap/{ => items}/DynamoDBEntryWithSize.java (95%) rename src/main/java/com/amazonaws/dynamodb/bootstrap/{ => items}/ItemSizeCalculator.java (96%) rename src/main/java/com/amazonaws/dynamodb/bootstrap/{ => worker}/BlockingQueueWorker.java (82%) rename src/main/java/com/amazonaws/dynamodb/bootstrap/{ => worker}/DynamoDBBootstrapWorker.java (73%) rename src/main/java/com/amazonaws/dynamodb/bootstrap/{ => worker}/DynamoDBConsumerWorker.java (92%) rename src/main/java/com/amazonaws/dynamodb/bootstrap/{ => worker}/ScanSegmentWorker.java (83%) create mode 100644 src/test/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterfaceTests.java create mode 100644 
src/test/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverterTest.java delete mode 100644 src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java delete mode 100644 src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java rename src/test/java/com/amazonaws/dynamodb/bootstrap/{ => consumer}/BlockingQueueConsumerTest.java (83%) rename src/test/java/com/amazonaws/dynamodb/bootstrap/{ => consumer}/DynamoDBConsumerTest.java (82%) create mode 100644 src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java create mode 100644 src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java rename src/test/java/com/amazonaws/dynamodb/bootstrap/{ => items}/AttributeValueMixInTest.java (87%) rename src/test/java/com/amazonaws/dynamodb/bootstrap/{ => worker}/BlockingQueueWorkerTest.java (80%) rename src/test/java/com/amazonaws/dynamodb/bootstrap/{ => worker}/DynamoDBBootstrapWorkerTest.java (88%) diff --git a/README.md b/README.md index 656e127..d5ba5b1 100644 --- a/README.md +++ b/README.md @@ -17,21 +17,31 @@ The CLI's usage follows with required parameters marked by asterisks. Use this flag to use strongly consistent scan. If the flag is not used it will default to eventually consistent scan Default: false - --createDestination - Create destination table if it does not exist - Default: false --copyStreamSpecificationWhenCreating Use the source table stream specification for the destination table during its creation. 
Default: false + --createAllGsi + Create all GSI in destination table + Default: false + --createAllLsi + Create all LSI in destination table + Default: false + --createDestination + Create destination table if it does not exist + Default: false --destinationEndpoint Endpoint of the destination table - * --destinationRegion + * --destinationSigningRegion Signing region for the destination endpoint * --destinationTable Name of the destination table --help Display usage information + --includeGsi + Include the following GSI in the destination table + --includeLsi + Include the following LSI in the destination table --maxWriteThreads Number of max threads to write to destination table Default: 1024 @@ -44,7 +54,7 @@ The CLI's usage follows with required parameters marked by asterisks. Default: 0 --sourceEndpoint Endpoint of the source table - * --sourceRegion + * --sourceSigningRegion Signing region for the source endpoint * --sourceTable Name of the source table @@ -87,8 +97,8 @@ To transfer to a different region, create two AmazonDynamoDBClients with different endpoints to pass into the DynamoDBBootstrapWorker and the DynamoDBConsumer. 
```java
-import com.amazonaws.dynamodb.bootstrap.DynamoDBBootstrapWorker;
-import com.amazonaws.dynamodb.bootstrap.DynamoDBConsumer;
+import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBBootstrapWorker;
+import com.amazonaws.dynamodb.bootstrap.consumer.DynamoDBConsumer;
 import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
@@ -98,28 +108,22 @@ import java.util.concurrent.Executors;
 
 class TransferDataFromOneTableToAnother {
     public static void main(String[] args) {
-        AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard()
+        final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard()
                 .withRegion(com.amazonaws.regions.Regions.US_WEST_1).build();
-        DynamoDBBootstrapWorker worker = null;
         try {
             // 100.0 read operations per second. 4 threads to scan the table.
-            worker = new DynamoDBBootstrapWorker(client,
+            final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(client,
                     100.0, "mySourceTable", 4);
+            // 50.0 write operations per second. 8 threads to scan the table.
+            final DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable",
+                    50.0, Executors.newFixedThreadPool(8));
+            worker.pipe(consumer);
         } catch (NullReadCapacityException e) {
             System.err.println("The DynamoDB source table returned a null read capacity.");
             System.exit(1);
-        }
-        // 50.0 write operations per second. 8 threads to scan the table. 
- DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable", 50.0, - Executors.newFixedThreadPool(8)); - try { - worker.pipe(consumer); - } catch (ExecutionException e) { + } catch (ExecutionException | InterruptedException e) { System.err.println("Encountered exception when executing transfer: " + e.getMessage()); System.exit(1); - } catch (InterruptedException e){ - System.err.println("Interrupted when executing transfer: " + e.getMessage()); - System.exit(1); } } } @@ -128,13 +132,15 @@ class TransferDataFromOneTableToAnother { ### 2. Transfer Data From one DynamoDB Table to a Blocking Queue. -The below example will read from a DynamoDB table and export to an array blocking queue. This is useful for when another application would like to consume -the DynamoDB entries but does not have a setup application for it. They can just retrieve the queue (consumer.getQueue()) and then continually pop() from it +The below example will read from a DynamoDB table and export to an array blocking queue. +This is useful for when another application would like to consume +the DynamoDB entries but does not have a setup application for it. +They can just retrieve the queue (consumer.getQueue()) and then continually pop() from it to then process the new entries. 
```java
-import com.amazonaws.dynamodb.bootstrap.BlockingQueueConsumer;
-import com.amazonaws.dynamodb.bootstrap.DynamoDBBootstrapWorker;
+import com.amazonaws.dynamodb.bootstrap.consumer.BlockingQueueConsumer;
+import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBBootstrapWorker;
 import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
@@ -143,29 +149,20 @@ import java.util.concurrent.ExecutionException;
 
 class TransferDataFromOneTableToBlockingQueue {
     public static void main(String[] args) {
-        AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard()
+        final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard()
                 .withRegion(com.amazonaws.regions.Regions.US_WEST_1).build();
-
-        DynamoDBBootstrapWorker worker = null;
-
         try {
             // 100.0 read operations per second. 4 threads to scan the table.
-            worker = new DynamoDBBootstrapWorker(client, 100.0, "mySourceTable", 4);
+            final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(client, 100.0,
+                    "mySourceTable", 4);
+            final BlockingQueueConsumer consumer = new BlockingQueueConsumer(8);
+            worker.pipe(consumer);
         } catch (NullReadCapacityException e) {
             System.err.println("The DynamoDB source table returned a null read capacity.");
             System.exit(1);
-        }
-
-        BlockingQueueConsumer consumer = new BlockingQueueConsumer(8);
-
-        try {
-            worker.pipe(consumer);
-        } catch (ExecutionException e) {
+        } catch (ExecutionException | InterruptedException e) {
             System.err.println("Encountered exception when executing transfer: " + e.getMessage());
             System.exit(1);
-        } catch (InterruptedException e){
-            System.err.println("Interrupted when executing transfer: " + e.getMessage());
-            System.exit(1);
         }
     }
 }
diff --git a/pom.xml b/pom.xml
index 930ee26..eb60c2d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
         https://github.com/awslabs/dynamodb-import-export-tool.git
 
-        1.7
+        1.8
         1.11.123
         1.6.6
         1.69
diff --git 
a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java index 40c4883..80a03b5 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java @@ -22,6 +22,7 @@ import org.apache.log4j.Logger; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; +import com.amazonaws.dynamodb.bootstrap.consumer.AbstractLogConsumer; /** * Abstract class to send inputs from a source to a consumer. @@ -31,8 +32,7 @@ public abstract class AbstractLogProvider { /** * Logger for the DynamoDBBootstrapWorker. */ - private static final Logger LOGGER = LogManager - .getLogger(AbstractLogProvider.class); + private static final Logger LOGGER = LogManager.getLogger(AbstractLogProvider.class); protected ExecutorService threadPool; @@ -40,16 +40,14 @@ public abstract class AbstractLogProvider { * Begins to read log results and transfer them to the consumer who will * write the results. */ - public abstract void pipe(final AbstractLogConsumer consumer) - throws ExecutionException, InterruptedException; + public abstract void pipe(final AbstractLogConsumer consumer) throws ExecutionException, InterruptedException; /** * Shuts the thread pool down. - * - * @param - * If true, this method waits for the threads in the pool to - * finish. If false, this thread pool shuts down without - * finishing their current tasks. + * + * @param If true, this method waits for the threads in the pool to + * finish. If false, this thread pool shuts down without + * finishing their current tasks. 
*/ public void shutdown(boolean awaitTermination) { if (awaitTermination) { @@ -61,8 +59,7 @@ public void shutdown(boolean awaitTermination) { } } catch (InterruptedException e) { interrupted = true; - LOGGER.warn("Threadpool was interrupted when trying to shutdown: " - + e.getMessage()); + LOGGER.warn("Threadpool was interrupted when trying to shutdown: " + e.getMessage()); } finally { if (interrupted) Thread.currentThread().interrupt(); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java deleted file mode 100644 index 31c2c6e..0000000 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package com.amazonaws.dynamodb.bootstrap; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import com.amazonaws.services.dynamodbv2.model.AttributeValue; -import com.fasterxml.jackson.annotation.JsonProperty; - - -/** - * Mixin for attribute values to stay all capital when mapping them as strings. 
- * - */ -public abstract class AttributeValueMixIn { - @JsonProperty("S") public abstract String getS(); - @JsonProperty("S") public abstract void setS(String s); - @JsonProperty("N") public abstract String getN(); - @JsonProperty("N") public abstract void setN(String n); - @JsonProperty("B") public abstract ByteBuffer getB(); - @JsonProperty("B") public abstract void setB(ByteBuffer b); - @JsonProperty("NULL") public abstract Boolean isNULL(); - @JsonProperty("NULL") public abstract void setNULL(Boolean nU); - @JsonProperty("BOOL") public abstract Boolean getBOOL(); - @JsonProperty("BOOL") public abstract void setBOOL(Boolean bO); - @JsonProperty("SS") public abstract List getSS(); - @JsonProperty("SS") public abstract void setSS(List sS); - @JsonProperty("NS") public abstract List getNS(); - @JsonProperty("NS") public abstract void setNS(List nS); - @JsonProperty("BS") public abstract List getBS(); - @JsonProperty("BS") public abstract void setBS(List bS); - @JsonProperty("M") public abstract Map getM(); - @JsonProperty("M") public abstract void setM(Map val); - @JsonProperty("L") public abstract List getL(); - @JsonProperty("L") public abstract void setL(List val); -} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java index fd73b6f..66f2c76 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java @@ -14,8 +14,11 @@ */ package com.amazonaws.dynamodb.bootstrap; +import java.util.List; + import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; import com.beust.jcommander.Parameter; + import lombok.Getter; /** @@ -60,6 +63,22 @@ public boolean getHelp() { @Parameter(names = CREATE_DESTINATION_TABLE_IF_MISSING, description = "Create destination table if it does not exist") private boolean createDestinationTableIfMissing; + public 
static final String CREATE_ALL_LSI = "--createAllLsi"; + @Parameter(names = CREATE_ALL_LSI, description = "Create all LSI in destination table") + private boolean createAllLsi; + + public static final String CREATE_ALL_GSI = "--createAllGsi"; + @Parameter(names = CREATE_ALL_GSI, description = "Create all GSI in destination table") + private boolean createAllGsi; + + public static final String INCLUDE_LSI = "--includeLsi"; + @Parameter(names = INCLUDE_LSI, description = "Include the following LSI in the destination table") + private List includeLsi; + + public static final String INCLUDE_GSI = "--includeGsi"; + @Parameter(names = INCLUDE_GSI, description = "Include the following GSI in the destination table") + private List includeGsi; + public static final String COPY_STREAM_SPECIFICATION_WHEN_CREATING = "--copyStreamSpecificationWhenCreating"; @Parameter(names = COPY_STREAM_SPECIFICATION_WHEN_CREATING, description = "Use the source table stream specification for the destination table during its creation.") private boolean copyStreamSpecification; @@ -83,7 +102,7 @@ public boolean getHelp() { public static final String SECTION = "--section"; @Parameter(names = SECTION, description = "Section number to scan when running multiple programs concurrently [0, 1... totalSections-1]", required = false) private int section = 0; - + public static final String CONSISTENT_SCAN = "--consistentScan"; @Parameter(names = CONSISTENT_SCAN, description = "Use this flag to use strongly consistent scan. 
If the flag is not used it will default to eventually consistent scan") private boolean consistentScan = false; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java index 2be82f3..64fc187 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java @@ -14,8 +14,9 @@ */ package com.amazonaws.dynamodb.bootstrap; -import java.util.ArrayList; -import java.util.List; +import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -23,26 +24,30 @@ import java.util.concurrent.TimeUnit; import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; +import com.amazonaws.dynamodb.bootstrap.consumer.DynamoDBConsumer; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.dynamodb.bootstrap.exception.SectionOutOfRangeException; +import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBBootstrapWorker; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; -import com.amazonaws.services.dynamodbv2.model.*; +import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest; +import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; +import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.amazonaws.waiters.WaiterParameters; import com.amazonaws.waiters.WaiterTimedOutException; 
import com.amazonaws.waiters.WaiterUnrecoverableException; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import lombok.extern.log4j.Log4j; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; -import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; -import com.amazonaws.dynamodb.bootstrap.exception.SectionOutOfRangeException; import com.beust.jcommander.JCommander; import com.beust.jcommander.ParameterException; +import com.google.common.base.Preconditions; + +import lombok.NonNull; +import lombok.extern.log4j.Log4j; /** * The interface that parses the arguments, and begins to transfer data from one @@ -54,115 +59,80 @@ public class CommandLineInterface { public static final String ENCOUNTERED_EXCEPTION_WHEN_EXECUTING_TRANSFER = "Encountered exception when executing transfer."; static AwsClientBuilder.EndpointConfiguration createEndpointConfiguration(Region region, Optional endpoint, String endpointPrefix) { - return new AwsClientBuilder.EndpointConfiguration(endpoint.or("https://" + region.getServiceEndpoint(endpointPrefix)), region.getName()); + return new AwsClientBuilder.EndpointConfiguration(endpoint.orElse("https://" + region.getServiceEndpoint(endpointPrefix)), region.getName()); } + @NonNull private final AwsClientBuilder.EndpointConfiguration sourceEndpointConfiguration; + @NonNull private final String sourceTable; + @NonNull private final AwsClientBuilder.EndpointConfiguration destinationEndpointConfiguration; + @NonNull private final String destinationTable; private final double readThroughputRatio; private final double writeThroughputRatio; private final int maxWriteThreads; private final boolean isConsistentScan; private final boolean createDestinationTableIfMissing; + private final boolean createAllGsi; + private final boolean createAllLsi; + @NonNull + 
private final SortedSet includeGsi; + @NonNull + private final SortedSet includeLsi; private final boolean copyStreamSpecification; private final int sectionNumber; private final int totalSections; private CommandLineInterface(final CommandLineArgs params) { - sourceEndpointConfiguration = createEndpointConfiguration(Region.getRegion(Regions.fromName(params.getSourceSigningRegion())), - Optional.fromNullable(params.getSourceEndpoint()), AmazonDynamoDB.ENDPOINT_PREFIX); + sourceEndpointConfiguration = + createEndpointConfiguration(Region.getRegion(Regions.fromName(params.getSourceSigningRegion())), Optional.ofNullable(params.getSourceEndpoint()), + AmazonDynamoDB.ENDPOINT_PREFIX); sourceTable = params.getSourceTable(); - destinationEndpointConfiguration = createEndpointConfiguration(Region.getRegion(Regions.fromName(params.getDestinationSigningRegion())), - Optional.fromNullable(params.getDestinationEndpoint()), AmazonDynamoDB.ENDPOINT_PREFIX); + destinationEndpointConfiguration = + createEndpointConfiguration(Region.getRegion(Regions.fromName(params.getDestinationSigningRegion())), Optional.ofNullable(params.getDestinationEndpoint()), + AmazonDynamoDB.ENDPOINT_PREFIX); destinationTable = params.getDestinationTable(); readThroughputRatio = params.getReadThroughputRatio(); writeThroughputRatio = params.getWriteThroughputRatio(); maxWriteThreads = params.getMaxWriteThreads(); isConsistentScan = params.isConsistentScan(); createDestinationTableIfMissing = params.isCreateDestinationTableIfMissing(); + createAllGsi = params.isCreateAllGsi(); + createAllLsi = params.isCreateAllLsi(); + includeLsi = new TreeSet<>(params.getIncludeLsi()); + Preconditions.checkArgument(includeLsi.size() == params.getIncludeLsi().size(), "list of LSI names must be unique"); + includeGsi = new TreeSet<>(params.getIncludeGsi()); + Preconditions.checkArgument(includeGsi.size() == params.getIncludeGsi().size(), "list of GSI names must be unique"); copyStreamSpecification = 
params.isCopyStreamSpecification(); sectionNumber = params.getSection(); totalSections = params.getTotalSections(); } - static List convertGlobalSecondaryIndexDescriptions(List list) { - final List result = new ArrayList<>(list.size()); - for (GlobalSecondaryIndexDescription description : list) { - result.add(new GlobalSecondaryIndex() - .withIndexName(description.getIndexName()) - .withKeySchema(description.getKeySchema()) - .withProjection(description.getProjection()) - .withProvisionedThroughput(getProvisionedThroughputFromDescription(description.getProvisionedThroughput()))); - } - return result; - } - - static List convertLocalSecondaryIndexDescriptions(List list) { - final List result = new ArrayList<>(list.size()); - for (LocalSecondaryIndexDescription description : list) { - result.add(new LocalSecondaryIndex() - .withIndexName(description.getIndexName()) - .withKeySchema(description.getKeySchema()) - .withProjection(description.getProjection())); - } - return result; - } - - @VisibleForTesting - static CreateTableRequest convertTableDescriptionToCreateTableRequest(TableDescription description, - String newTableName, - boolean copyStreamSpecification) { - List gsiDesc = description.getGlobalSecondaryIndexes(); - List gsi = gsiDesc == null ? null : convertGlobalSecondaryIndexDescriptions(gsiDesc); - List lsiDesc = description.getLocalSecondaryIndexes(); - List lsi = lsiDesc == null ? 
null : convertLocalSecondaryIndexDescriptions(lsiDesc); - ProvisionedThroughput pt = getProvisionedThroughputFromDescription(description.getProvisionedThroughput()); - CreateTableRequest ctr = new CreateTableRequest() - .withTableName(newTableName) - .withProvisionedThroughput(pt) - .withAttributeDefinitions(description.getAttributeDefinitions()) - .withKeySchema(description.getKeySchema()) - .withGlobalSecondaryIndexes(gsi) - .withLocalSecondaryIndexes(lsi); - if (copyStreamSpecification) { - ctr.withStreamSpecification(description.getStreamSpecification()); - } - return ctr; - } - - private static ProvisionedThroughput getProvisionedThroughputFromDescription(ProvisionedThroughputDescription description) { - return new ProvisionedThroughput(description.getReadCapacityUnits(), description.getWriteCapacityUnits()); - } - private void bootstrapTable() throws InterruptedException, ExecutionException, SectionOutOfRangeException { final ClientConfiguration config = new ClientConfiguration().withMaxConnections(BootstrapConstants.MAX_CONN_SIZE); final DefaultAWSCredentialsProviderChain credentials = new DefaultAWSCredentialsProviderChain(); - final AmazonDynamoDB sourceClient = AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration(sourceEndpointConfiguration) - .withCredentials(credentials) - .withClientConfiguration(config) - .build(); - final AmazonDynamoDB destinationClient = AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration(destinationEndpointConfiguration) - .withCredentials(credentials) - .withClientConfiguration(config) - .build(); + final AmazonDynamoDB sourceClient = + AmazonDynamoDBClientBuilder.standard().withEndpointConfiguration(sourceEndpointConfiguration).withCredentials(credentials).withClientConfiguration(config).build(); + final AmazonDynamoDB destinationClient = + 
AmazonDynamoDBClientBuilder.standard().withEndpointConfiguration(destinationEndpointConfiguration).withCredentials(credentials).withClientConfiguration(config).build(); final TableDescription readTableDescription = sourceClient.describeTable(sourceTable).getTable(); try { destinationClient.describeTable(destinationTable).getTable(); - } catch(ResourceNotFoundException e) { - if(!createDestinationTableIfMissing) { + } catch (ResourceNotFoundException e) { + if (!createDestinationTableIfMissing) { throw new IllegalArgumentException("Destination table " + destinationTable + " did not exist", e); } try { - destinationClient.createTable(convertTableDescriptionToCreateTableRequest(readTableDescription, - destinationTable, copyStreamSpecification)); + final TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().copyStreamSpecification(copyStreamSpecification).newTableName(destinationTable) + .createAllGsi(createAllGsi).gsiToInclude(includeGsi).createAllLsi(createAllLsi).lsiToInclude(includeLsi).build(); + destinationClient.createTable(converter.apply(readTableDescription)); destinationClient.waiters().tableExists().run(new WaiterParameters<>(new DescribeTableRequest(destinationTable))); - } catch(WaiterUnrecoverableException | WaiterTimedOutException | AmazonServiceException ase) { + } catch (WaiterUnrecoverableException | WaiterTimedOutException | AmazonServiceException ase) { throw new IllegalArgumentException("Unable to create destination table", ase); } } @@ -180,8 +150,8 @@ private void bootstrapTable() throws InterruptedException, ExecutionException, S final double writeThroughput = calculateThroughput(writeTableDescription, writeThroughputRatio, false); final ExecutorService sourceExec = getThreadPool(numSegments); - final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(sourceClient, readThroughput, sourceTable, - sourceExec, sectionNumber, totalSections, numSegments, isConsistentScan); + 
final DynamoDBBootstrapWorker worker = + new DynamoDBBootstrapWorker(sourceClient, readThroughput, sourceTable, sourceExec, sectionNumber, totalSections, numSegments, isConsistentScan); final ExecutorService destinationExec = getThreadPool(maxWriteThreads); final DynamoDBConsumer consumer = new DynamoDBConsumer(destinationClient, destinationTable, writeThroughput, destinationExec); @@ -194,7 +164,7 @@ private void bootstrapTable() throws InterruptedException, ExecutionException, S /** * Main class to begin transferring data from one DynamoDB table to another * DynamoDB table. - * + * * @param args */ public static void main(String[] args) { @@ -222,13 +192,10 @@ public static void main(String[] args) { } catch (InterruptedException e) { log.error("Interrupted when executing transfer.", e); System.exit(1); - } catch (ExecutionException e) { - log.error(ENCOUNTERED_EXCEPTION_WHEN_EXECUTING_TRANSFER, e); - System.exit(1); } catch (SectionOutOfRangeException e) { log.error("Invalid section parameter", e); System.exit(1); - } catch (Exception e) { + } catch (Exception e) { //coalesces ExecutionException log.error(ENCOUNTERED_EXCEPTION_WHEN_EXECUTING_TRANSFER, e); System.exit(1); } @@ -238,9 +205,7 @@ public static void main(String[] args) { * returns the provisioned throughput based on the input ratio and the * specified DynamoDB table provisioned throughput. 
*/ - private static double calculateThroughput( - TableDescription tableDescription, double throughputRatio, - boolean read) { + private static double calculateThroughput(TableDescription tableDescription, double throughputRatio, boolean read) { if (read) { return tableDescription.getProvisionedThroughput().getReadCapacityUnits() * throughputRatio; } @@ -256,10 +221,8 @@ private static ExecutorService getThreadPool(int maxWriteThreads) { corePoolSize = maxWriteThreads - 1; } final long keepAlive = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_KEEP_ALIVE; - ExecutorService exec = new ThreadPoolExecutor(corePoolSize, - maxWriteThreads, keepAlive, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue(maxWriteThreads), - new ThreadPoolExecutor.CallerRunsPolicy()); + ExecutorService exec = new ThreadPoolExecutor(corePoolSize, maxWriteThreads, keepAlive, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(maxWriteThreads), + new ThreadPoolExecutor.CallerRunsPolicy()); return exec; } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java index 6c4481f..1ede697 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java @@ -16,6 +16,7 @@ import java.util.concurrent.Executor; +import com.amazonaws.dynamodb.bootstrap.worker.ScanSegmentWorker; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.ScanRequest; import com.google.common.util.concurrent.RateLimiter; @@ -39,19 +40,15 @@ public DynamoDBTableScan(double rateLimit, AmazonDynamoDB client) { /** * This function copies a scan request for the number of segments and then * adds those workers to the executor service to begin scanning. - * + * * @param totalSections * @param section - * * @return the parallel scan executor to grab results - * when a segment is finished. 
+ * when a segment is finished. */ - public ParallelScanExecutor getParallelScanCompletionService( - ScanRequest initialRequest, int numSegments, Executor executor, - int section, int totalSections) { + public ParallelScanExecutor getParallelScanCompletionService(ScanRequest initialRequest, int numSegments, Executor executor, int section, int totalSections) { final int segments = Math.max(1, numSegments); - final ParallelScanExecutor completion = new ParallelScanExecutor( - executor, segments); + final ParallelScanExecutor completion = new ParallelScanExecutor(executor, segments); int sectionSize = segments / totalSections; int start = sectionSize * section; @@ -61,22 +58,15 @@ public ParallelScanExecutor getParallelScanCompletionService( } for (int segment = start; segment < end; segment++) { - ScanRequest scanSegment = copyScanRequest(initialRequest) - .withTotalSegments(segments).withSegment(segment); - completion.addWorker(new ScanSegmentWorker(this.client, - this.rateLimiter, scanSegment), segment); + ScanRequest scanSegment = copyScanRequest(initialRequest).withTotalSegments(segments).withSegment(segment); + completion.addWorker(new ScanSegmentWorker(this.client, this.rateLimiter, scanSegment), segment); } return completion; } public ScanRequest copyScanRequest(ScanRequest request) { - return new ScanRequest() - .withTableName(request.getTableName()) - .withTotalSegments(request.getTotalSegments()) - .withSegment(request.getSegment()) - .withReturnConsumedCapacity(request.getReturnConsumedCapacity()) - .withLimit(request.getLimit()) - .withConsistentRead(request.getConsistentRead()); + return new ScanRequest().withTableName(request.getTableName()).withTotalSegments(request.getTotalSegments()).withSegment(request.getSegment()) + .withReturnConsumedCapacity(request.getReturnConsumedCapacity()).withLimit(request.getLimit()).withConsistentRead(request.getConsistentRead()); } } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java 
b/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java index 3424221..01c33a5 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java @@ -15,16 +15,17 @@ package com.amazonaws.dynamodb.bootstrap; import java.util.BitSet; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; + +import com.amazonaws.dynamodb.bootstrap.worker.ScanSegmentWorker; /** * This class executes multiple scan requests on one segment of a table in * series, as a runnable. Instances meant to be used as tasks of the worker * thread pool for parallel scans. - * */ public class ParallelScanExecutor { private final BitSet finished; @@ -44,8 +45,7 @@ public ParallelScanExecutor(Executor executor, int segments) { public void finishSegment(int segment) { synchronized (finished) { if (segment > finished.size()) { - throw new IllegalArgumentException( - "Invalid segment passed to finishSegment"); + throw new IllegalArgumentException("Invalid segment passed to finishSegment"); } finished.set(segment); } @@ -63,15 +63,12 @@ public boolean finished() { /** * This method gets a segmentedScanResult and submits the next scan request * for that segment, if there is one. - * + * * @return the next available ScanResult - * @throws ExecutionException - * if one of the segment pages threw while executing - * @throws InterruptedException - * if one of the segment pages was interrupted while executing. + * @throws ExecutionException if one of the segment pages threw while executing + * @throws InterruptedException if one of the segment pages was interrupted while executing. 
*/ - public SegmentedScanResult grab() throws ExecutionException, - InterruptedException { + public SegmentedScanResult grab() throws ExecutionException, InterruptedException { Future ret = exec.take(); int segment = ret.get().getSegment(); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java index 6f5b356..cbdfb43 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java @@ -18,7 +18,6 @@ /** * Encapsulates segment number in scan result - * */ public class SegmentedScanResult { private final ScanResult result; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverter.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverter.java new file mode 100644 index 0000000..5fdfa20 --- /dev/null +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverter.java @@ -0,0 +1,93 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package com.amazonaws.dynamodb.bootstrap; + +import java.util.List; +import java.util.SortedSet; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex; +import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndex; +import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; +import com.amazonaws.services.dynamodbv2.model.TableDescription; + +import lombok.Builder; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +/** + * Created by amcp on 2017/04/23. + */ +@RequiredArgsConstructor +@Builder +public class TableDescriptionToCreateTableRequestConverter implements Function { + + @NonNull + private final String newTableName; + private final boolean createAllGsi; + private final boolean createAllLsi; + private final boolean copyStreamSpecification; + @NonNull + private final SortedSet gsiToInclude; + @NonNull + private final SortedSet lsiToInclude; + + static ProvisionedThroughput getProvisionedThroughputFromDescription(ProvisionedThroughputDescription description) { + return new ProvisionedThroughput(description.getReadCapacityUnits(), description.getWriteCapacityUnits()); + } + + private static GlobalSecondaryIndex convertGlobalSecondaryIndexDescription(GlobalSecondaryIndexDescription d) { + return new GlobalSecondaryIndex().withIndexName(d.getIndexName()).withKeySchema(d.getKeySchema()).withProjection(d.getProjection()) + .withProvisionedThroughput(getProvisionedThroughputFromDescription(d.getProvisionedThroughput())); + } + + private static LocalSecondaryIndex convertLocalSecondaryIndexDescription(LocalSecondaryIndexDescription d) { 
+ return new LocalSecondaryIndex().withIndexName(d.getIndexName()).withKeySchema(d.getKeySchema()).withProjection(d.getProjection()); + } + + @Override + public CreateTableRequest apply(TableDescription description) { + final List lsiDesc = description.getLocalSecondaryIndexes(); + final List lsi; + if (lsiDesc == null || (!createAllLsi && lsiToInclude.isEmpty())) { + lsi = null; + } else { + lsi = lsiDesc.stream().filter(l -> createAllLsi || lsiToInclude.contains(l.getIndexName())) + .map(TableDescriptionToCreateTableRequestConverter::convertLocalSecondaryIndexDescription).collect(Collectors.toList()); + } + + final List gsiDesc = description.getGlobalSecondaryIndexes(); + final List gsi; + if (gsiDesc == null || (!createAllGsi && gsiToInclude.isEmpty())) { + gsi = null; + } else { + gsi = gsiDesc.stream().filter(g -> createAllGsi || gsiToInclude.contains(g.getIndexName())) + .map(TableDescriptionToCreateTableRequestConverter::convertGlobalSecondaryIndexDescription).collect(Collectors.toList()); + } + + ProvisionedThroughput pt = getProvisionedThroughputFromDescription(description.getProvisionedThroughput()); + CreateTableRequest ctr = new CreateTableRequest().withTableName(newTableName).withProvisionedThroughput(pt).withAttributeDefinitions(description.getAttributeDefinitions()) + .withKeySchema(description.getKeySchema()).withGlobalSecondaryIndexes(gsi).withLocalSecondaryIndexes(lsi); + if (copyStreamSpecification) { + ctr.withStreamSpecification(description.getStreamSpecification()); + } + return ctr; + } +} diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java index abcc53f..37e1dbe 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java @@ -34,14 +34,12 @@ public class BootstrapConstants { /** * Max ThreadPool size 
for the ExecutorService to use. */ - public static final int DYNAMODB_CLIENT_EXECUTOR_MAX_POOL_SIZE = Runtime - .getRuntime().availableProcessors() * 128; + public static final int DYNAMODB_CLIENT_EXECUTOR_MAX_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 128; /** * Core pool size of a default thread pool. */ - public static final int DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE = Runtime - .getRuntime().availableProcessors() * 4; + public static final int DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 4; /** * Amount of time in milliseconds to keep the ExecutorService alive for @@ -89,12 +87,12 @@ public class BootstrapConstants { * Max number of bytes in a DynamoDB number attribute. */ public static final int MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21; - + /** * Number of bytes for an item being read with strongly consistent reads */ public static final int STRONGLY_CONSISTENT_READ_ITEM_SIZE = 4 * 1024; - + /** * Number of bytes for an item being read with eventually consistent reads */ diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogConsumer.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/AbstractLogConsumer.java similarity index 78% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogConsumer.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/AbstractLogConsumer.java index 960a59d..299ac98 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogConsumer.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/AbstractLogConsumer.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.consumer; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -22,6 +22,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; /** @@ -36,26 +37,23 @@ public abstract class AbstractLogConsumer { /** * Logger for the DynamoDBBootstrapWorker. */ - private static final Logger LOGGER = LogManager - .getLogger(AbstractLogConsumer.class); + private static final Logger LOGGER = LogManager.getLogger(AbstractLogConsumer.class); /** * Writes the result of a scan to another endpoint asynchronously. Will call * getWorker to determine what job to submit with the result. - * - * @param - * the SegmentedScanResult to asynchronously write to another - * endpoint. + * + * @param the SegmentedScanResult to asynchronously write to another + * endpoint. */ public abstract Future writeResult(SegmentedScanResult result); /** * Shuts the thread pool down. - * - * @param - * If true, this method waits for the threads in the pool to - * finish. If false, this thread pool shuts down without - * finishing their current tasks. + * + * @param If true, this method waits for the threads in the pool to + * finish. If false, this thread pool shuts down without + * finishing their current tasks. 
*/ public void shutdown(boolean awaitTermination) { if (awaitTermination) { @@ -67,8 +65,7 @@ public void shutdown(boolean awaitTermination) { } } catch (InterruptedException e) { interrupted = true; - LOGGER.warn("Threadpool was interrupted when trying to shutdown: " - + e.getMessage()); + LOGGER.warn("Threadpool was interrupted when trying to shutdown: " + e.getMessage()); } finally { if (interrupted) Thread.currentThread().interrupt(); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumer.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumer.java similarity index 89% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumer.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumer.java index 11cb995..d7eb255 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumer.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumer.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.consumer; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -20,7 +20,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import com.amazonaws.dynamodb.bootstrap.DynamoDBEntryWithSize; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; +import com.amazonaws.dynamodb.bootstrap.items.DynamoDBEntryWithSize; +import com.amazonaws.dynamodb.bootstrap.worker.BlockingQueueWorker; /** * This class implements ILogConsumer, and when called to writeResult, it will @@ -48,8 +50,7 @@ public Future writeResult(SegmentedScanResult result) { try { jobSubmission = exec.submit(new BlockingQueueWorker(queue, result)); } catch (NullPointerException npe) { - throw new NullPointerException( - "Thread pool not initialized for LogStashExecutor"); + throw new NullPointerException("Thread pool not initialized for LogStashExecutor"); } return jobSubmission; } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumer.java similarity index 82% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumer.java index 77f2e1a..e75b6e1 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumer.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.consumer; import java.util.Iterator; import java.util.LinkedList; @@ -22,9 +22,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; +import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBConsumerWorker; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; import com.amazonaws.services.dynamodbv2.model.PutRequest; @@ -62,17 +63,13 @@ public DynamoDBConsumer(AmazonDynamoDB client, String tableName, double rateLimi @Override public Future writeResult(SegmentedScanResult result) { Future jobSubmission = null; - List batches = splitResultIntoBatches( - result.getScanResult(), tableName); + List batches = splitResultIntoBatches(result.getScanResult(), tableName); Iterator batchesIterator = batches.iterator(); while (batchesIterator.hasNext()) { try { - jobSubmission = exec - .submit(new DynamoDBConsumerWorker(batchesIterator - .next(), client, rateLimiter, tableName)); + jobSubmission = exec.submit(new DynamoDBConsumerWorker(batchesIterator.next(), client, rateLimiter, tableName)); } catch (NullPointerException npe) { - throw new NullPointerException( - "Thread pool not initialized for LogStashExecutor"); + throw new NullPointerException("Thread pool not initialized for LogStashExecutor"); } } return jobSubmission; @@ -82,13 +79,11 @@ public Future writeResult(SegmentedScanResult result) { * Splits up a ScanResult into a list of BatchWriteItemRequests of size 25 * items or less each. 
*/ - public static List splitResultIntoBatches( - ScanResult result, String tableName) { + public static List splitResultIntoBatches(ScanResult result, String tableName) { List batches = new LinkedList(); Iterator> it = result.getItems().iterator(); - BatchWriteItemRequest req = new BatchWriteItemRequest() - .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); + BatchWriteItemRequest req = new BatchWriteItemRequest().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); List writeRequests = new LinkedList(); int i = 0; while (it.hasNext()) { @@ -99,8 +94,7 @@ public static List splitResultIntoBatches( if (i == BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM) { req.addRequestItemsEntry(tableName, writeRequests); batches.add(req); - req = new BatchWriteItemRequest() - .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); + req = new BatchWriteItemRequest().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); writeRequests = new LinkedList(); i = 0; } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixIn.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixIn.java new file mode 100644 index 0000000..10243c2 --- /dev/null +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixIn.java @@ -0,0 +1,88 @@ +/* + * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package com.amazonaws.dynamodb.bootstrap.items; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.fasterxml.jackson.annotation.JsonProperty; + + +/** + * Mixin for attribute values to stay all capital when mapping them as strings. + */ +public abstract class AttributeValueMixIn { + @JsonProperty("S") + public abstract String getS(); + + @JsonProperty("S") + public abstract void setS(String s); + + @JsonProperty("N") + public abstract String getN(); + + @JsonProperty("N") + public abstract void setN(String n); + + @JsonProperty("B") + public abstract ByteBuffer getB(); + + @JsonProperty("B") + public abstract void setB(ByteBuffer b); + + @JsonProperty("NULL") + public abstract Boolean isNULL(); + + @JsonProperty("NULL") + public abstract void setNULL(Boolean nU); + + @JsonProperty("BOOL") + public abstract Boolean getBOOL(); + + @JsonProperty("BOOL") + public abstract void setBOOL(Boolean bO); + + @JsonProperty("SS") + public abstract List getSS(); + + @JsonProperty("SS") + public abstract void setSS(List sS); + + @JsonProperty("NS") + public abstract List getNS(); + + @JsonProperty("NS") + public abstract void setNS(List nS); + + @JsonProperty("BS") + public abstract List getBS(); + + @JsonProperty("BS") + public abstract void setBS(List bS); + + @JsonProperty("M") + public abstract Map getM(); + + @JsonProperty("M") + public abstract void setM(Map val); + + @JsonProperty("L") + public abstract List getL(); + + @JsonProperty("L") + public abstract void setL(List val); +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBEntryWithSize.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/DynamoDBEntryWithSize.java similarity index 95% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBEntryWithSize.java rename to 
src/main/java/com/amazonaws/dynamodb/bootstrap/items/DynamoDBEntryWithSize.java index 98ee9ea..53e69cc 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBEntryWithSize.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/DynamoDBEntryWithSize.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.items; import java.util.Map; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/ItemSizeCalculator.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/ItemSizeCalculator.java similarity index 96% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/ItemSizeCalculator.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/items/ItemSizeCalculator.java index c965548..a9af6fe 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/ItemSizeCalculator.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/ItemSizeCalculator.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.items; import java.nio.ByteBuffer; import java.util.Iterator; @@ -25,7 +25,6 @@ /** * Class used to calculate the size of a DynamoDB item in bytes. 
- * */ public class ItemSizeCalculator { @@ -47,17 +46,19 @@ public static int calculateItemSizeInBytes(Map item) { } return size; } - + public static int calculateScanResultSizeInBytes(ScanResult result) { final Iterator> it = result.getItems().iterator(); int totalBytes = 0; - while(it.hasNext()){ + while (it.hasNext()) { totalBytes += calculateItemSizeInBytes(it.next()); } return totalBytes; } - /** Calculate attribute value size */ + /** + * Calculate attribute value size + */ private static int calculateAttributeSizeInBytes(AttributeValue value) { int attrValSize = 0; if (value == null) { @@ -98,8 +99,7 @@ private static int calculateAttributeSizeInBytes(AttributeValue value) { } else if (value.getNULL() != null) { attrValSize += 1; } else if (value.getM() != null) { - for (Map.Entry entry : value.getM() - .entrySet()) { + for (Map.Entry entry : value.getM().entrySet()) { attrValSize += entry.getKey().getBytes(BootstrapConstants.UTF8).length; attrValSize += calculateAttributeSizeInBytes(entry.getValue()); attrValSize += BootstrapConstants.BASE_LOGICAL_SIZE_OF_NESTED_TYPES; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorker.java similarity index 82% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorker.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorker.java index b819042..19fa175 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorker.java @@ -12,17 +12,20 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; +import com.amazonaws.dynamodb.bootstrap.items.DynamoDBEntryWithSize; +import com.amazonaws.dynamodb.bootstrap.items.ItemSizeCalculator; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.ScanResult; @@ -36,14 +39,12 @@ public class BlockingQueueWorker implements Callable { /** * Logger for the LogStashQueueWorker. */ - private static final Logger LOGGER = LogManager - .getLogger(BlockingQueueWorker.class); + private static final Logger LOGGER = LogManager.getLogger(BlockingQueueWorker.class); private final BlockingQueue queue; private final SegmentedScanResult result; - public BlockingQueueWorker(BlockingQueue queue, - SegmentedScanResult result) { + public BlockingQueueWorker(BlockingQueue queue, SegmentedScanResult result) { this.queue = queue; this.result = result; } @@ -58,14 +59,11 @@ public Void call() { do { try { Map item = it.next(); - DynamoDBEntryWithSize entryWithSize = new DynamoDBEntryWithSize( - item, - ItemSizeCalculator.calculateItemSizeInBytes(item)); + DynamoDBEntryWithSize entryWithSize = new DynamoDBEntryWithSize(item, ItemSizeCalculator.calculateItemSizeInBytes(item)); queue.put(entryWithSize); } catch (InterruptedException e) { interrupted = true; - LOGGER.warn("interrupted when writing item to queue: " - + e.getMessage()); + LOGGER.warn("interrupted when writing item to queue: " + e.getMessage()); } } while (it.hasNext()); } finally { diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java 
b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorker.java similarity index 73% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorker.java index 5aced79..a74bb58 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorker.java @@ -12,13 +12,18 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.amazonaws.dynamodb.bootstrap.AbstractLogProvider; +import com.amazonaws.dynamodb.bootstrap.DynamoDBTableScan; +import com.amazonaws.dynamodb.bootstrap.ParallelScanExecutor; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; +import com.amazonaws.dynamodb.bootstrap.consumer.AbstractLogConsumer; import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; import com.amazonaws.dynamodb.bootstrap.exception.SectionOutOfRangeException; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; @@ -43,16 +48,13 @@ public class DynamoDBBootstrapWorker extends AbstractLogProvider { /** * Creates the DynamoDBBootstrapWorker, calculates the number of segments a * table should have, and creates a thread pool to prepare to scan. 
- * + * * @throws Exception */ - public DynamoDBBootstrapWorker(AmazonDynamoDB client, - double rateLimit, String tableName, ExecutorService exec, - int section, int totalSections, int numSegments, - boolean consistentScan) throws SectionOutOfRangeException { + public DynamoDBBootstrapWorker(AmazonDynamoDB client, double rateLimit, String tableName, ExecutorService exec, int section, int totalSections, int numSegments, + boolean consistentScan) throws SectionOutOfRangeException { if (section > totalSections - 1 || section < 0) { - throw new SectionOutOfRangeException( - "Section of scan must be within [0...totalSections-1]"); + throw new SectionOutOfRangeException("Section of scan must be within [0...totalSections-1]"); } this.client = client; @@ -71,17 +73,14 @@ public DynamoDBBootstrapWorker(AmazonDynamoDB client, * Creates the DynamoDBBootstrapWorker, calculates the number of segments a * table should have, and creates a thread pool to prepare to scan using an * eventually consistent scan. - * + * * @throws Exception */ - public DynamoDBBootstrapWorker(AmazonDynamoDB client, - double rateLimit, String tableName, int numThreads) - throws NullReadCapacityException { + public DynamoDBBootstrapWorker(AmazonDynamoDB client, double rateLimit, String tableName, int numThreads) throws NullReadCapacityException { this.client = client; this.rateLimit = rateLimit; this.tableName = tableName; - TableDescription description = client.describeTable(tableName) - .getTable(); + TableDescription description = client.describeTable(tableName).getTable(); this.section = 0; this.totalSections = 1; this.consistentScan = false; @@ -98,19 +97,13 @@ public DynamoDBBootstrapWorker(AmazonDynamoDB client, * Begins to pipe the log results by parallel scanning the table and the * consumer writing the results. 
*/ - public void pipe(final AbstractLogConsumer consumer) - throws ExecutionException, InterruptedException { - final DynamoDBTableScan scanner = new DynamoDBTableScan(rateLimit, - client); + public void pipe(final AbstractLogConsumer consumer) throws ExecutionException, InterruptedException { + final DynamoDBTableScan scanner = new DynamoDBTableScan(rateLimit, client); - final ScanRequest request = new ScanRequest().withTableName(tableName) - .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL) - .withLimit(BootstrapConstants.SCAN_LIMIT) - .withConsistentRead(consistentScan); + final ScanRequest request = new ScanRequest().withTableName(tableName).withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withLimit(BootstrapConstants.SCAN_LIMIT) + .withConsistentRead(consistentScan); - final ParallelScanExecutor scanService = scanner - .getParallelScanCompletionService(request, numSegments, - threadPool, section, totalSections); + final ParallelScanExecutor scanService = scanner.getParallelScanCompletionService(request, numSegments, threadPool, section, totalSections); while (!scanService.finished()) { SegmentedScanResult result = scanService.grab(); @@ -128,24 +121,19 @@ public void pipe(final AbstractLogConsumer consumer) * table, which should need many more segments in order to scan the table * fast enough in parallel so that one worker does not finish long before * other workers. - * - * @throws NullReadCapacityException - * if the table returns a null readCapacity units. + * + * @throws NullReadCapacityException if the table returns a null readCapacity units. 
*/ - public static int estimateNumberOfSegments(TableDescription description) - throws NullReadCapacityException { - ProvisionedThroughputDescription provisionedThroughput = description - .getProvisionedThroughput(); - double tableSizeInGigabytes = Math.ceil(description.getTableSizeBytes() - / BootstrapConstants.GIGABYTE); + public static int estimateNumberOfSegments(TableDescription description) throws NullReadCapacityException { + ProvisionedThroughputDescription provisionedThroughput = description.getProvisionedThroughput(); + double tableSizeInGigabytes = Math.ceil(description.getTableSizeBytes() / BootstrapConstants.GIGABYTE); Long readCapacity = provisionedThroughput.getReadCapacityUnits(); Long writeCapacity = provisionedThroughput.getWriteCapacityUnits(); if (writeCapacity == null) { writeCapacity = 1L; } if (readCapacity == null) { - throw new NullReadCapacityException( - "Cannot scan with a null readCapacity provisioned throughput"); + throw new NullReadCapacityException("Cannot scan with a null readCapacity provisioned throughput"); } double throughput = (readCapacity + 3 * writeCapacity) / 3000.0; return 10 * ((int) Math.max(Math.ceil(throughput), Math.ceil(tableSizeInGigabytes) / 10)); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBConsumerWorker.java similarity index 92% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBConsumerWorker.java index ec29f55..9e43e89 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBConsumerWorker.java @@ -12,17 +12,16 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; import java.util.Iterator; -import java.util.List; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult; import com.amazonaws.services.dynamodbv2.model.ConsumedCapacity; @@ -45,9 +44,7 @@ public class DynamoDBConsumerWorker implements Callable { * table. If the write returns unprocessed items it will exponentially back * off until it succeeds. */ - public DynamoDBConsumerWorker(BatchWriteItemRequest batchWriteItemRequest, - AmazonDynamoDB client, RateLimiter rateLimiter, - String tableName) { + public DynamoDBConsumerWorker(BatchWriteItemRequest batchWriteItemRequest, AmazonDynamoDB client, RateLimiter rateLimiter, String tableName) { this.batch = batchWriteItemRequest; this.client = client; this.rateLimiter = rateLimiter; @@ -85,8 +82,7 @@ public List runWithBackoff(BatchWriteItemRequest req) { do { writeItemResult = client.batchWriteItem(req); unprocessedItems = writeItemResult.getUnprocessedItems(); - consumedCapacities - .addAll(writeItemResult.getConsumedCapacity()); + consumedCapacities.addAll(writeItemResult.getConsumedCapacity()); if (unprocessedItems != null) { req.setRequestItems(unprocessedItems); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/ScanSegmentWorker.java similarity index 83% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/worker/ScanSegmentWorker.java index d783c1c..dce9f46 100644 --- 
a/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/ScanSegmentWorker.java @@ -12,11 +12,13 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; import java.util.concurrent.Callable; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; +import com.amazonaws.dynamodb.bootstrap.items.ItemSizeCalculator; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.ConsumedCapacity; import com.amazonaws.services.dynamodbv2.model.ScanRequest; @@ -27,7 +29,6 @@ * This class executes multiple scan requests on one segment of a table in * series, as a runnable. Instances meant to be used as tasks of the worker * thread pool for parallel scans. 
- * */ public class ScanSegmentWorker implements Callable { private final ScanRequest request; @@ -37,8 +38,7 @@ public class ScanSegmentWorker implements Callable { private final AmazonDynamoDB client; private final RateLimiter rateLimiter; - ScanSegmentWorker(final AmazonDynamoDB client, - final RateLimiter rateLimiter, ScanRequest request) { + public ScanSegmentWorker(final AmazonDynamoDB client, final RateLimiter rateLimiter, ScanRequest request) { this.request = request; this.client = client; this.rateLimiter = rateLimiter; @@ -59,20 +59,16 @@ public SegmentedScanResult call() { final ConsumedCapacity cc = result.getConsumedCapacity(); if (cc != null && cc.getCapacityUnits() != null) { - lastConsumedCapacity = result.getConsumedCapacity() - .getCapacityUnits().intValue(); + lastConsumedCapacity = result.getConsumedCapacity().getCapacityUnits().intValue(); } else if (result.getScannedCount() != null && result.getCount() != null) { final boolean isConsistent = request.getConsistentRead(); - int itemSize = isConsistent ? BootstrapConstants.STRONGLY_CONSISTENT_READ_ITEM_SIZE - : BootstrapConstants.EVENTUALLY_CONSISTENT_READ_ITEM_SIZE; + int itemSize = isConsistent ? 
BootstrapConstants.STRONGLY_CONSISTENT_READ_ITEM_SIZE : BootstrapConstants.EVENTUALLY_CONSISTENT_READ_ITEM_SIZE; - lastConsumedCapacity = (result.getScannedCount() / (int) Math.max(1.0, result.getCount())) - * (ItemSizeCalculator.calculateScanResultSizeInBytes(result) / itemSize); + lastConsumedCapacity = (result.getScannedCount() / (int) Math.max(1.0, result.getCount())) * (ItemSizeCalculator.calculateScanResultSizeInBytes(result) / itemSize); } - if (result.getLastEvaluatedKey() != null - && !result.getLastEvaluatedKey().isEmpty()) { + if (result.getLastEvaluatedKey() != null && !result.getLastEvaluatedKey().isEmpty()) { hasNext = true; request.setExclusiveStartKey(result.getLastEvaluatedKey()); } else { diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterfaceTests.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterfaceTests.java new file mode 100644 index 0000000..4ef152b --- /dev/null +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterfaceTests.java @@ -0,0 +1,21 @@ +/* + * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.dynamodb.bootstrap; + +/** + * Created by amcp on 2017/04/23. 
+ */ +public class CommandLineInterfaceTests { +} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java index 2572606..7f1940f 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java @@ -31,24 +31,23 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import com.amazonaws.dynamodb.bootstrap.worker.ScanSegmentWorker; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.model.ScanRequest; import com.google.common.util.concurrent.RateLimiter; /** * Unit Tests for DynamoDBTableScan - * */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ RateLimiter.class, DynamoDBTableScan.class }) +@PrepareForTest({RateLimiter.class, DynamoDBTableScan.class}) @PowerMockIgnore("javax.management.*") public class DynamoDBTableScanTest { private static String tableName = "testTableName"; private static Integer totalSegments = 1; private static Integer segment = 0; - private static ScanRequest req = new ScanRequest().withTableName(tableName) - .withTotalSegments(totalSegments).withSegment(segment); + private static ScanRequest req = new ScanRequest().withTableName(tableName).withTotalSegments(totalSegments).withSegment(segment); private double rateLimit = 12.3; /** @@ -56,8 +55,7 @@ public class DynamoDBTableScanTest { * make sure it creates the correct number of segments */ @Test - public void testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments() - throws Exception { + public void testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments() throws Exception { int segments = 0; ExecutorService mockExec = createMock(ExecutorService.class); mockStatic(RateLimiter.class); @@ -70,22 +68,16 @@ public void 
testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments( ParallelScanExecutor mockScanExecutor = createMock(ParallelScanExecutor.class); ScanSegmentWorker mockSegmentWorker = createMock(ScanSegmentWorker.class); - expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, req) - .andReturn(mockSegmentWorker); - expectNew(ParallelScanExecutor.class, mockExec, 1).andReturn( - mockScanExecutor); + expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, req).andReturn(mockSegmentWorker); + expectNew(ParallelScanExecutor.class, mockExec, 1).andReturn(mockScanExecutor); mockScanExecutor.addWorker(mockSegmentWorker, 0); int segments2 = 3; - ScanRequest testReq = scanner.copyScanRequest(req).withTotalSegments( - segments2); - expectNew(ParallelScanExecutor.class, mockExec, segments2).andReturn( - mockScanExecutor); + ScanRequest testReq = scanner.copyScanRequest(req).withTotalSegments(segments2); + expectNew(ParallelScanExecutor.class, mockExec, segments2).andReturn(mockScanExecutor); for (int i = 0; i < segments2; i++) { - expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, - scanner.copyScanRequest(testReq).withSegment(i)).andReturn( - mockSegmentWorker); + expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, scanner.copyScanRequest(testReq).withSegment(i)).andReturn(mockSegmentWorker); mockScanExecutor.addWorker(mockSegmentWorker, i); } diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResultTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResultTest.java index f11b209..ebf9896 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResultTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResultTest.java @@ -14,11 +14,11 @@ */ package com.amazonaws.dynamodb.bootstrap; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import org.junit.Test; -import 
com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.services.dynamodbv2.model.ScanResult; /** @@ -33,8 +33,7 @@ public class SegmentedScanResultTest { public void test() { ScanResult result = new ScanResult(); int numSegments = 3; - SegmentedScanResult segmentedScanResult = new SegmentedScanResult( - result, numSegments); + SegmentedScanResult segmentedScanResult = new SegmentedScanResult(result, numSegments); assertSame(result, segmentedScanResult.getScanResult()); assertEquals(numSegments, segmentedScanResult.getSegment()); diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverterTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverterTest.java new file mode 100644 index 0000000..7b53ea1 --- /dev/null +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverterTest.java @@ -0,0 +1,222 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package com.amazonaws.dynamodb.bootstrap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.junit.Test; + +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.Projection; +import com.amazonaws.services.dynamodbv2.model.ProjectionType; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; +import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; +import com.amazonaws.services.dynamodbv2.model.StreamSpecification; +import com.amazonaws.services.dynamodbv2.model.StreamViewType; +import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.google.common.collect.Lists; + +/** + * Created by amcp on 2017/04/23. 
+ */ +public class TableDescriptionToCreateTableRequestConverterTest { + + public static final String ORDER_ID = "order_id"; + public static final String CUSTOMER_ID = "customer_id"; + public static final String INPUT_TABLE = "InputTable"; + public static final String OUTPUT_TABLE = "OutputTable"; + public static final ProvisionedThroughput DEFAULT_PT = new ProvisionedThroughput(1L, 4L); + + + static class NoLsiNoGsi { + static final List attributeDefinitionList = + Lists.newArrayList(new AttributeDefinition(ORDER_ID, ScalarAttributeType.S), new AttributeDefinition(CUSTOMER_ID, ScalarAttributeType.S)); + static final List keySchemata = Lists.newArrayList(new KeySchemaElement(CUSTOMER_ID, KeyType.HASH), new KeySchemaElement(ORDER_ID, KeyType.RANGE)); + static final ProvisionedThroughputDescription pt = new ProvisionedThroughputDescription().withReadCapacityUnits(1L).withWriteCapacityUnits(4L); + static final TableDescription description = + new TableDescription().withAttributeDefinitions(NoLsiNoGsi.attributeDefinitionList).withKeySchema(NoLsiNoGsi.keySchemata).withProvisionedThroughput(NoLsiNoGsi.pt) + .withTableName(INPUT_TABLE); + } + + @Test + public void apply_whenNoGsiButIncludeAllGsi_andNoLsiButIncludeAllLsi_andNoStreamButIncludeStream() { + TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().gsiToInclude(new TreeSet<>()).lsiToInclude(new TreeSet<>()).newTableName(OUTPUT_TABLE).createAllGsi(true) + .createAllLsi(true).copyStreamSpecification(true).build(); + CreateTableRequest ctr = converter.apply(NoLsiNoGsi.description); + assertEquals(OUTPUT_TABLE, ctr.getTableName()); + assertEquals(DEFAULT_PT, ctr.getProvisionedThroughput()); + assertEquals(NoLsiNoGsi.attributeDefinitionList, NoLsiNoGsi.description.getAttributeDefinitions()); + assertEquals(NoLsiNoGsi.keySchemata, NoLsiNoGsi.description.getKeySchema()); + assertNull(ctr.getLocalSecondaryIndexes()); + 
assertNull(ctr.getGlobalSecondaryIndexes()); + assertNull(ctr.getStreamSpecification()); + } + + @Test + public void apply_whenNoGsiButDontIncludeAllGsi_andNoLsiButDontIncludeAllLsi_andNoStreamButDontIncludeStream() { + TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().gsiToInclude(new TreeSet<>()).lsiToInclude(new TreeSet<>()).newTableName(OUTPUT_TABLE).createAllGsi(false) + .createAllLsi(false).copyStreamSpecification(false).build(); + CreateTableRequest ctr = converter.apply(NoLsiNoGsi.description); + assertEquals(OUTPUT_TABLE, ctr.getTableName()); + assertEquals(DEFAULT_PT, ctr.getProvisionedThroughput()); + assertEquals(NoLsiNoGsi.attributeDefinitionList, NoLsiNoGsi.description.getAttributeDefinitions()); + assertEquals(NoLsiNoGsi.keySchemata, NoLsiNoGsi.description.getKeySchema()); + assertNull(ctr.getLocalSecondaryIndexes()); + assertNull(ctr.getGlobalSecondaryIndexes()); + assertNull(ctr.getStreamSpecification()); + } + + private static final String GSI_NAME_ONE = "gsi1"; + private static final String GSI_NAME_TWO = "gsi2"; + private static final String LSI_NAME_ONE = "lsi1"; + private static final String LSI_NAME_TWO = "lsi2"; + private static final String ORDER_TS_MILLIS = "order_ts_millis"; + private static final String ORDER_DATE = "order_date"; + + + static class TwoLsiTwoGsiStream { + static final List attributeDefinitionList = Lists + .newArrayList(new AttributeDefinition(ORDER_ID, ScalarAttributeType.S), new AttributeDefinition(CUSTOMER_ID, ScalarAttributeType.S), + new AttributeDefinition(ORDER_DATE, ScalarAttributeType.N), new AttributeDefinition(ORDER_TS_MILLIS, ScalarAttributeType.N)); + static final List baseKeySchema = Lists.newArrayList(new KeySchemaElement(CUSTOMER_ID, KeyType.HASH), new KeySchemaElement(ORDER_ID, KeyType.RANGE)); + static final List gsiKeySchema = Lists.newArrayList(new KeySchemaElement(ORDER_DATE, KeyType.HASH), new KeySchemaElement(ORDER_TS_MILLIS, 
KeyType.RANGE)); + static final List lsiKeySchema = + Lists.newArrayList(new KeySchemaElement(CUSTOMER_ID, KeyType.HASH), new KeySchemaElement(ORDER_TS_MILLIS, KeyType.RANGE)); + static final ProvisionedThroughputDescription pt = new ProvisionedThroughputDescription().withReadCapacityUnits(1L).withWriteCapacityUnits(4L); + static final StreamSpecification streamSpecification = new StreamSpecification().withStreamEnabled(true).withStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES); + static final TableDescription description = new TableDescription().withAttributeDefinitions(attributeDefinitionList).withKeySchema(baseKeySchema) + .withGlobalSecondaryIndexes(new GlobalSecondaryIndexDescription().withIndexName(GSI_NAME_ONE).withKeySchema(gsiKeySchema).withProvisionedThroughput(pt) + .withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY)), + new GlobalSecondaryIndexDescription().withIndexName(GSI_NAME_TWO).withKeySchema(gsiKeySchema).withProvisionedThroughput(pt) + .withProjection(new Projection().withProjectionType(ProjectionType.ALL))).withLocalSecondaryIndexes( + new LocalSecondaryIndexDescription().withIndexName(LSI_NAME_ONE).withKeySchema(lsiKeySchema) + .withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY)), + new LocalSecondaryIndexDescription().withIndexName(LSI_NAME_TWO).withKeySchema(lsiKeySchema) + .withProjection(new Projection().withProjectionType(ProjectionType.ALL))).withStreamSpecification(streamSpecification).withProvisionedThroughput(pt) + .withTableName(INPUT_TABLE); + } + + @Test + public void apply_whenTwoGsiAndIncludeAllGsi_andTwoLsiAndIncludeAllLsi_andStreamAndIncludeStream() { + TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().gsiToInclude(new TreeSet<>()).lsiToInclude(new TreeSet<>()).newTableName(OUTPUT_TABLE).createAllGsi(true) + .createAllLsi(true).copyStreamSpecification(true).build(); + CreateTableRequest ctr = 
converter.apply(TwoLsiTwoGsiStream.description); + assertEquals(OUTPUT_TABLE, ctr.getTableName()); + assertEquals(DEFAULT_PT, ctr.getProvisionedThroughput()); + assertEquals(TwoLsiTwoGsiStream.attributeDefinitionList, TwoLsiTwoGsiStream.description.getAttributeDefinitions()); + assertEquals(TwoLsiTwoGsiStream.baseKeySchema, TwoLsiTwoGsiStream.description.getKeySchema()); + + //LSI + assertNotNull(ctr.getLocalSecondaryIndexes()); + assertFalse(ctr.getLocalSecondaryIndexes().isEmpty()); + assertEquals(2, ctr.getLocalSecondaryIndexes().size()); + assertEquals(TwoLsiTwoGsiStream.lsiKeySchema, ctr.getLocalSecondaryIndexes().get(0).getKeySchema()); + assertEquals(ProjectionType.KEYS_ONLY.toString(), ctr.getLocalSecondaryIndexes().get(0).getProjection().getProjectionType()); + assertEquals(LSI_NAME_ONE, ctr.getLocalSecondaryIndexes().get(0).getIndexName()); + assertEquals(TwoLsiTwoGsiStream.lsiKeySchema, ctr.getLocalSecondaryIndexes().get(1).getKeySchema()); + assertEquals(ProjectionType.ALL.toString(), ctr.getLocalSecondaryIndexes().get(1).getProjection().getProjectionType()); + assertEquals(LSI_NAME_TWO, ctr.getLocalSecondaryIndexes().get(1).getIndexName()); + + //GSI + assertNotNull(ctr.getGlobalSecondaryIndexes()); + assertFalse(ctr.getGlobalSecondaryIndexes().isEmpty()); + assertEquals(2, ctr.getGlobalSecondaryIndexes().size()); + assertEquals(TwoLsiTwoGsiStream.gsiKeySchema, ctr.getGlobalSecondaryIndexes().get(0).getKeySchema()); + assertEquals(ProjectionType.KEYS_ONLY.toString(), ctr.getGlobalSecondaryIndexes().get(0).getProjection().getProjectionType()); + assertEquals(GSI_NAME_ONE, ctr.getGlobalSecondaryIndexes().get(0).getIndexName()); + assertEquals(DEFAULT_PT, ctr.getGlobalSecondaryIndexes().get(0).getProvisionedThroughput()); + assertEquals(TwoLsiTwoGsiStream.gsiKeySchema, ctr.getGlobalSecondaryIndexes().get(1).getKeySchema()); + assertEquals(ProjectionType.ALL.toString(), ctr.getGlobalSecondaryIndexes().get(1).getProjection().getProjectionType()); + 
assertEquals(GSI_NAME_TWO, ctr.getGlobalSecondaryIndexes().get(1).getIndexName()); + assertEquals(DEFAULT_PT, ctr.getGlobalSecondaryIndexes().get(1).getProvisionedThroughput()); + + //STREAM SPECIFICATION + assertNotNull(ctr.getStreamSpecification()); + assertTrue(ctr.getStreamSpecification().getStreamEnabled()); + assertEquals(StreamViewType.NEW_AND_OLD_IMAGES.toString(), ctr.getStreamSpecification().getStreamViewType()); + } + + @Test + public void apply_whenTwoGsiAndDontIncludeAllGsi_andTwoLsiAndDontIncludeAllLsi_andStreamAndDontIncludeStream() { + TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().gsiToInclude(new TreeSet<>()).lsiToInclude(new TreeSet<>()).newTableName(OUTPUT_TABLE).createAllGsi(false) + .createAllLsi(false).copyStreamSpecification(false).build(); + CreateTableRequest ctr = converter.apply(TwoLsiTwoGsiStream.description); + assertEquals(OUTPUT_TABLE, ctr.getTableName()); + assertEquals(DEFAULT_PT, ctr.getProvisionedThroughput()); + assertEquals(TwoLsiTwoGsiStream.attributeDefinitionList, TwoLsiTwoGsiStream.description.getAttributeDefinitions()); + assertEquals(TwoLsiTwoGsiStream.baseKeySchema, TwoLsiTwoGsiStream.description.getKeySchema()); + + //LSI + assertNull(ctr.getLocalSecondaryIndexes()); + + //GSI + assertNull(ctr.getGlobalSecondaryIndexes()); + + //STREAM SPECIFICATION + assertNull(ctr.getStreamSpecification()); + } + + @Test + public void apply_whenTwoGsiAndDontIncludeAllGsiButIncludeOne_andTwoLsiAndDontIncludeAllLsiButIncludeOne_andStreamAndDontIncludeStream() { + SortedSet gsiToInclude = new TreeSet<>(); + gsiToInclude.add(GSI_NAME_TWO); + SortedSet lsiToInclude = new TreeSet<>(); + lsiToInclude.add(LSI_NAME_TWO); + TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().gsiToInclude(gsiToInclude).lsiToInclude(lsiToInclude).newTableName(OUTPUT_TABLE).createAllGsi(false) + 
.createAllLsi(false).copyStreamSpecification(false).build(); + CreateTableRequest ctr = converter.apply(TwoLsiTwoGsiStream.description); + assertEquals(OUTPUT_TABLE, ctr.getTableName()); + assertEquals(DEFAULT_PT, ctr.getProvisionedThroughput()); + assertEquals(TwoLsiTwoGsiStream.attributeDefinitionList, TwoLsiTwoGsiStream.description.getAttributeDefinitions()); + assertEquals(TwoLsiTwoGsiStream.baseKeySchema, TwoLsiTwoGsiStream.description.getKeySchema()); + + //LSI + assertNotNull(ctr.getLocalSecondaryIndexes()); + assertFalse(ctr.getLocalSecondaryIndexes().isEmpty()); + assertEquals(TwoLsiTwoGsiStream.lsiKeySchema, ctr.getLocalSecondaryIndexes().get(0).getKeySchema()); + assertEquals(ProjectionType.ALL.toString(), ctr.getLocalSecondaryIndexes().get(0).getProjection().getProjectionType()); + assertEquals(LSI_NAME_TWO, ctr.getLocalSecondaryIndexes().get(0).getIndexName()); + + //GSI + assertNotNull(ctr.getGlobalSecondaryIndexes()); + assertFalse(ctr.getGlobalSecondaryIndexes().isEmpty()); + assertEquals(TwoLsiTwoGsiStream.gsiKeySchema, ctr.getGlobalSecondaryIndexes().get(0).getKeySchema()); + assertEquals(ProjectionType.ALL.toString(), ctr.getGlobalSecondaryIndexes().get(0).getProjection().getProjectionType()); + assertEquals(GSI_NAME_TWO, ctr.getGlobalSecondaryIndexes().get(0).getIndexName()); + assertEquals(DEFAULT_PT, ctr.getGlobalSecondaryIndexes().get(0).getProvisionedThroughput()); + + //STREAM SPECIFICATION + assertNull(ctr.getStreamSpecification()); + } +} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java deleted file mode 100644 index b688dbd..0000000 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java +++ /dev/null @@ -1,38 +0,0 @@ -package 
com.amazonaws.dynamodb.bootstrap.com.amazonaws.dynamodb.bootstrap.example; - -import com.amazonaws.dynamodb.bootstrap.DynamoDBBootstrapWorker; -import com.amazonaws.dynamodb.bootstrap.DynamoDBConsumer; -import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; - -class TransferDataFromOneTableToAnother { - public static void main(String[] args) { - AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() - .withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); - DynamoDBBootstrapWorker worker = null; - try { - // 100.0 read operations per second. 4 threads to scan the table. - worker = new DynamoDBBootstrapWorker(client, - 100.0, "mySourceTable", 4); - } catch (NullReadCapacityException e) { - System.err.println("The DynamoDB source table returned a null read capacity."); - System.exit(1); - } - // 50.0 write operations per second. 8 threads to scan the table. 
- DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable", 50.0, - Executors.newFixedThreadPool(8)); - try { - worker.pipe(consumer); - } catch (ExecutionException e) { - System.err.println("Encountered exception when executing transfer: " + e.getMessage()); - System.exit(1); - } catch (InterruptedException e){ - System.err.println("Interrupted when executing transfer: " + e.getMessage()); - System.exit(1); - } - } -} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java deleted file mode 100644 index b4adb58..0000000 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.amazonaws.dynamodb.bootstrap.com.amazonaws.dynamodb.bootstrap.example; - -import com.amazonaws.dynamodb.bootstrap.BlockingQueueConsumer; -import com.amazonaws.dynamodb.bootstrap.DynamoDBBootstrapWorker; -import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; - -import java.util.concurrent.ExecutionException; - -class TransferDataFromOneTableToBlockingQueue { - public static void main(String[] args) { - AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() - .withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); - - DynamoDBBootstrapWorker worker = null; - - try { - // 100.0 read operations per second. 4 threads to scan the table. 
- worker = new DynamoDBBootstrapWorker(client, 100.0, "mySourceTable", 4); - } catch (NullReadCapacityException e) { - System.err.println("The DynamoDB source table returned a null read capacity."); - System.exit(1); - } - - BlockingQueueConsumer consumer = new BlockingQueueConsumer(8); - - try { - worker.pipe(consumer); - } catch (ExecutionException e) { - System.err.println("Encountered exception when executing transfer: " + e.getMessage()); - System.exit(1); - } catch (InterruptedException e){ - System.err.println("Interrupted when executing transfer: " + e.getMessage()); - System.exit(1); - } - } -} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumerTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumerTest.java similarity index 83% rename from src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumerTest.java rename to src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumerTest.java index e8e2bb5..f17ad71 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumerTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumerTest.java @@ -12,9 +12,14 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.consumer; -import static org.junit.Assert.*; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.powermock.api.easymock.PowerMock.createMock; +import static org.powermock.api.easymock.PowerMock.mockStatic; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; @@ -26,14 +31,10 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import static org.powermock.api.easymock.PowerMock.*; -import static org.easymock.EasyMock.expect; - -import com.amazonaws.dynamodb.bootstrap.BlockingQueueConsumer; +import com.amazonaws.dynamodb.bootstrap.items.DynamoDBEntryWithSize; /** * Unit Tests for LogStashExecutor - * */ @RunWith(PowerMockRunner.class) @PrepareForTest(BlockingQueueConsumer.class) @@ -52,8 +53,7 @@ public void testInitializeAndShutdown() { mockStatic(Executors.class); ExecutorService mockThreadPool = createMock(ExecutorService.class); - expect(Executors.newFixedThreadPool(totalThreads)).andReturn( - mockThreadPool); + expect(Executors.newFixedThreadPool(totalThreads)).andReturn(mockThreadPool); BlockingQueue queue = logExec.getQueue(); diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumerTest.java similarity index 82% rename from src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerTest.java rename to src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumerTest.java index 9ae7113..c438b94 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumerTest.java @@ -12,23 +12,24 @@ * express or implied. 
See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.consumer; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verifyAll; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import static org.powermock.api.easymock.PowerMock.*; - import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; @@ -36,7 +37,6 @@ /** * Unit tests for DynamoDBConsumerWorker - * */ @RunWith(PowerMockRunner.class) @PrepareForTest(DynamoDBConsumer.class) @@ -56,8 +56,7 @@ public void splitResultIntoBatchesTest() { List> items = new LinkedList>(); for (int i = 0; i < numItems; i++) { Map sampleScanResult = new HashMap(); - sampleScanResult.put("key", new AttributeValue("attribute value " - + i)); + sampleScanResult.put("key", new AttributeValue("attribute value " + i)); items.add(sampleScanResult); } scanResult.setItems(items); @@ -65,10 +64,8 @@ public void splitResultIntoBatchesTest() { SegmentedScanResult result = new SegmentedScanResult(scanResult, 0); replayAll(); - List batches = DynamoDBConsumer - .splitResultIntoBatches(result.getScanResult(), tableName); - assertEquals(Math.ceil(numItems / BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM), - batches.size(), 0.0); + List batches = 
DynamoDBConsumer.splitResultIntoBatches(result.getScanResult(), tableName); + assertEquals(Math.ceil(numItems / BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM), batches.size(), 0.0); verifyAll(); } diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java new file mode 100644 index 0000000..354618d --- /dev/null +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.dynamodb.bootstrap.example; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +import com.amazonaws.dynamodb.bootstrap.consumer.DynamoDBConsumer; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBBootstrapWorker; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; + +class TransferDataFromOneTableToAnother { + public static void main(String[] args) { + final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); + try { + // 100.0 read operations per second. 4 threads to scan the table. 
+ final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(client, 100.0, "mySourceTable", 4); + // 50.0 write operations per second. 8 threads to write to the table. + final DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable", 50.0, Executors.newFixedThreadPool(8)); + worker.pipe(consumer); + } catch (NullReadCapacityException e) { + System.err.println("The DynamoDB source table returned a null read capacity."); + System.exit(1); + } catch (ExecutionException | InterruptedException e) { + System.err.println("Encountered exception when executing transfer: " + e.getMessage()); + System.exit(1); + } + } +} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java new file mode 100644 index 0000000..c5bb967 --- /dev/null +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java @@ -0,0 +1,41 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package com.amazonaws.dynamodb.bootstrap.example; + +import java.util.concurrent.ExecutionException; + +import com.amazonaws.dynamodb.bootstrap.consumer.BlockingQueueConsumer; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBBootstrapWorker; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; + +class TransferDataFromOneTableToBlockingQueue { + public static void main(String[] args) { + final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); + try { + // 100.0 read operations per second. 4 threads to scan the table. + final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(client, 100.0, "mySourceTable", 4); + final BlockingQueueConsumer consumer = new BlockingQueueConsumer(8); + worker.pipe(consumer); + } catch (NullReadCapacityException e) { + System.err.println("The DynamoDB source table returned a null read capacity."); + System.exit(1); + } catch (ExecutionException | InterruptedException e) { + System.err.println("Encountered exception when executing transfer: " + e.getMessage()); + System.exit(1); + } + } +} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixInTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixInTest.java similarity index 87% rename from src/test/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixInTest.java rename to src/test/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixInTest.java index 86230fb..a7da451 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixInTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixInTest.java @@ -12,9 +12,9 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.items; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; import java.util.HashMap; import java.util.LinkedList; @@ -30,7 +30,6 @@ /** * Unit test for AttributeValueMixIn - * */ public class AttributeValueMixInTest { @@ -57,17 +56,14 @@ public void testReturnsCapitalSWithMixin() throws JsonProcessingException { ObjectMapper mapperWith = new ObjectMapper(); mapperWith.setSerializationInclusion(Include.NON_NULL); - mapperWith.addMixInAnnotations(AttributeValue.class, - AttributeValueMixIn.class); + mapperWith.addMixIn(AttributeValue.class, AttributeValueMixIn.class); - String withMixIn = mapperWith.writeValueAsString(sampleScanResult() - .get(0)); + String withMixIn = mapperWith.writeValueAsString(sampleScanResult().get(0)); ObjectMapper mapperWithout = new ObjectMapper(); mapperWithout.setSerializationInclusion(Include.NON_NULL); - String withoutMixIn = mapperWithout - .writeValueAsString(sampleScanResult().get(0)); + String withoutMixIn = mapperWithout.writeValueAsString(sampleScanResult().get(0)); assertTrue(withMixIn.contains(capitalS)); assertTrue(withoutMixIn.contains(lowercaseS)); diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorkerTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorkerTest.java similarity index 80% rename from src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorkerTest.java rename to src/test/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorkerTest.java index ee2d2a0..95ce49d 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorkerTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorkerTest.java @@ -12,9 +12,14 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; -import static org.junit.Assert.*; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.powermock.api.easymock.PowerMock.createMock; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verifyAll; import java.util.HashMap; import java.util.LinkedList; @@ -29,40 +34,32 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import static org.powermock.api.easymock.PowerMock.*; -import static org.easymock.EasyMock.expect; - -import com.amazonaws.dynamodb.bootstrap.BlockingQueueWorker; import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; +import com.amazonaws.dynamodb.bootstrap.items.DynamoDBEntryWithSize; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.ScanResult; /** * Unit Tests for LogStashQueueWorker - * */ @RunWith(PowerMockRunner.class) @PrepareForTest(BlockingQueueWorker.class) @PowerMockIgnore("javax.management.*") public class BlockingQueueWorkerTest { - + /** * Test the initialization of a BlockingQueueWorker and make sure it places the items in the queue when called. 
*/ @Test public void testInitializationAndCall() { ScanResult mockResult = createMock(ScanResult.class); - SegmentedScanResult segmentedScanResult = new SegmentedScanResult( - mockResult, 0); - BlockingQueue queue = new ArrayBlockingQueue( - 20); - BlockingQueueWorker callable = new BlockingQueueWorker(queue, - segmentedScanResult); + SegmentedScanResult segmentedScanResult = new SegmentedScanResult(mockResult, 0); + BlockingQueue queue = new ArrayBlockingQueue(20); + BlockingQueueWorker callable = new BlockingQueueWorker(queue, segmentedScanResult); List> items = new LinkedList>(); Map sampleScanResult = new HashMap(); - sampleScanResult.put("sample key", new AttributeValue( - "sample attribute value")); + sampleScanResult.put("sample key", new AttributeValue("sample attribute value")); items.add(sampleScanResult); expect(mockResult.getItems()).andReturn(items); diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorkerTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorkerTest.java similarity index 88% rename from src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorkerTest.java rename to src/test/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorkerTest.java index 3e91fe9..453431b 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorkerTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorkerTest.java @@ -12,11 +12,13 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertTrue; import static org.powermock.api.easymock.PowerMock.createMock; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verifyAll; import java.util.concurrent.ThreadPoolExecutor; @@ -26,14 +28,11 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import static org.powermock.api.easymock.PowerMock.*; - import com.amazonaws.dynamodb.bootstrap.exception.SectionOutOfRangeException; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; /** * Unit Tests for DynamoDBBootstrapWorker - * */ @RunWith(PowerMockRunner.class) @PrepareForTest(DynamoDBBootstrapWorker.class) @@ -62,12 +61,11 @@ public void testInitialization() throws Exception { replayAll(); - new DynamoDBBootstrapWorker(mockClient, rateLimit, tableName, - mockThreadPool, 0, 1, 10, false); + new DynamoDBBootstrapWorker(mockClient, rateLimit, tableName, mockThreadPool, 0, 1, 10, false); verifyAll(); } - + /** * Test the initialization of a DynamoDBBootstrapWorker with an invalid section. 
*/ @@ -78,10 +76,9 @@ public void testInitializationInvalidSection() throws Exception { replayAll(); boolean exceptionThrown = false; - try{ - new DynamoDBBootstrapWorker(mockClient, rateLimit, tableName, - mockThreadPool, 1, 1, 10, false); - }catch (SectionOutOfRangeException e){ + try { + new DynamoDBBootstrapWorker(mockClient, rateLimit, tableName, mockThreadPool, 1, 1, 10, false); + } catch (SectionOutOfRangeException e) { exceptionThrown = true; } @@ -99,8 +96,7 @@ public void testShutdownWithoutWaiting() throws Exception { expect(mockThreadPool.shutdownNow()).andReturn(null); replayAll(); - DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker( - mockClient, rateLimit, tableName, mockThreadPool, 0, 1, 10, false); + DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(mockClient, rateLimit, tableName, mockThreadPool, 0, 1, 10, false); worker.shutdown(false); verifyAll();