From 836a11cf1760ef487c35842b5ad4b69cf0b3ae3f Mon Sep 17 00:00:00 2001 From: Josep Anguera Peralta Date: Mon, 11 Mar 2024 21:06:28 +0100 Subject: [PATCH] Stop using SDK v1 When using this library it pulls the AWS SDK v1 because of the dependency declared on STS. But it seems is not actually used. This fixes [issue #274][1]. [1]: https://github.com/awslabs/aws-glue-schema-registry/issues/274 --- examples/pom.xml | 14 ++- .../kds/PutRecordGetRecordExample.java | 112 +++++++++--------- pom.xml | 6 - serializer-deserializer/pom.xml | 5 - 4 files changed, 70 insertions(+), 67 deletions(-) diff --git a/examples/pom.xml b/examples/pom.xml index dfd3f90c..f210e96a 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -61,8 +61,18 @@ - com.amazonaws - aws-java-sdk-kinesis + software.amazon.kinesis + amazon-kinesis-client + + + + + + com.google.protobuf + protobuf-java + + + compile com.fasterxml.jackson.dataformat diff --git a/examples/src/main/java/com/amazonaws/services/schemaregistry/examples/kds/PutRecordGetRecordExample.java b/examples/src/main/java/com/amazonaws/services/schemaregistry/examples/kds/PutRecordGetRecordExample.java index f20fab85..9698cb7f 100644 --- a/examples/src/main/java/com/amazonaws/services/schemaregistry/examples/kds/PutRecordGetRecordExample.java +++ b/examples/src/main/java/com/amazonaws/services/schemaregistry/examples/kds/PutRecordGetRecordExample.java @@ -14,19 +14,6 @@ */ package com.amazonaws.services.schemaregistry.examples.kds; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.PutRecordsRequest; -import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; -import com.amazonaws.services.kinesis.model.PutRecordsResult; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.schemaregistry.common.Schema; import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration; import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer; @@ -34,6 +21,17 @@ import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializer; import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl; import com.amazonaws.services.schemaregistry.utils.AVROUtils; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; @@ -49,19 +47,22 @@ import org.joda.time.DateTime; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.glue.model.DataFormat; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.logging.Logger; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; /** * This is an example of how to use Glue Schema Registry (GSR) with Kinesis Data Streams Get / Put Record APIs. @@ -70,7 +71,7 @@ */ public class PutRecordGetRecordExample { private static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc"; - private static AmazonKinesis kinesisClient; + private static KinesisClient kinesisClient; private static final Logger LOGGER = Logger.getLogger(PutRecordGetRecordExample.class.getSimpleName()); private static AwsCredentialsProvider awsCredentialsProvider = DefaultCredentialsProvider @@ -97,7 +98,7 @@ public static void main(final String[] args) throws Exception { int numOfRecords = Integer.parseInt(cmd.getOptionValue("numRecords", "10")); //Kinesis data streams client initialization. - kinesisClient = AmazonKinesisClientBuilder.standard().withRegion(regionName).build(); + kinesisClient = KinesisClient.builder().region(Region.of(regionName)).build(); //Glue Schema Registry serializer initialization for the producer. glueSchemaRegistrySerializer = @@ -129,38 +130,41 @@ public static void main(final String[] args) throws Exception { private static void getRecordsWithSchema(String streamName, Date timestamp) throws IOException { //Standard Kinesis code to getRecords from a Kinesis Data Stream. String shardIterator; - DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); + DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() + .streamName(streamName) + .build(); List shards = new ArrayList<>(); - DescribeStreamResult streamRes; + DescribeStreamResponse streamRes; do { streamRes = kinesisClient.describeStream(describeStreamRequest); - shards.addAll(streamRes.getStreamDescription().getShards()); + shards.addAll(streamRes.streamDescription().shards()); if (shards.size() > 0) { - shards.get(shards.size() - 1).getShardId(); + shards.get(shards.size() - 1).shardId(); } - } while (streamRes.getStreamDescription().getHasMoreShards()); + } while (streamRes.streamDescription().hasMoreShards()); - GetShardIteratorRequest itReq = new GetShardIteratorRequest(); - itReq.setStreamName(streamName); - itReq.setShardId(shards.get(0).getShardId()); - itReq.setTimestamp(timestamp); - itReq.setShardIteratorType("AT_TIMESTAMP"); + GetShardIteratorRequest itReq = GetShardIteratorRequest.builder() + .streamName(streamName) + .shardId(shards.get(0).shardId()) + .timestamp(timestamp.toInstant()) + .shardIteratorType(ShardIteratorType.AT_TIMESTAMP) + .build(); - GetShardIteratorResult shardIteratorResult = kinesisClient.getShardIterator(itReq); - shardIterator = shardIteratorResult.getShardIterator(); + GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq); + shardIterator = shardIteratorResult.shardIterator(); // Create new GetRecordsRequest with existing shardIterator. - GetRecordsRequest recordsRequest = new GetRecordsRequest(); - recordsRequest.setShardIterator(shardIterator); - recordsRequest.setLimit(1000); + GetRecordsRequest recordsRequest = GetRecordsRequest.builder() + .shardIterator(shardIterator) + .limit(1000) + .build(); - GetRecordsResult result = kinesisClient.getRecords(recordsRequest); + GetRecordsResponse result = kinesisClient.getRecords(recordsRequest); - for (Record record : result.getRecords()) { - ByteBuffer recordAsByteBuffer = record.getData(); + for (Record record : result.records()) { + ByteBuffer recordAsByteBuffer = record.data().asByteBuffer(); GenericRecord decodedRecord = decodeRecord(recordAsByteBuffer); LOGGER.info("Decoded Record: " + decodedRecord); } @@ -168,8 +172,8 @@ private static void getRecordsWithSchema(String streamName, Date timestamp) thro private static void putRecordsWithSchema(String streamName, int numOfRecords, Schema gsrSchema, Date timestamp) { //Standard Kinesis code to putRecords into a Kinesis Data Stream. - PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); - putRecordsRequest.setStreamName(streamName); + PutRecordsRequest.Builder putRecordsRequest = PutRecordsRequest.builder(); + putRecordsRequest.streamName(streamName); List recordsRequestEntries = new ArrayList<>(); @@ -177,17 +181,17 @@ private static void putRecordsWithSchema(String streamName, int numOfRecords, Sc for (int i = 0; i < numOfRecords; i++) { GenericRecord record = (GenericRecord) getTestRecord(i); byte[] recordWithSchema = encodeRecord(record, streamName, gsrSchema); - PutRecordsRequestEntry entry = new PutRecordsRequestEntry(); - entry.setData(ByteBuffer.wrap(recordWithSchema)); - entry.setPartitionKey(String.valueOf(timestamp.toInstant() + PutRecordsRequestEntry.Builder entry = PutRecordsRequestEntry.builder(); + entry.data(SdkBytes.fromByteBuffer(ByteBuffer.wrap(recordWithSchema))); + entry.partitionKey(String.valueOf(timestamp.toInstant() .toEpochMilli())); - recordsRequestEntries.add(entry); + recordsRequestEntries.add(entry.build()); } - putRecordsRequest.setRecords(recordsRequestEntries); + putRecordsRequest.records(recordsRequestEntries); - PutRecordsResult putRecordResult = kinesisClient.putRecords(putRecordsRequest); + PutRecordsResponse putRecordResult = kinesisClient.putRecords(putRecordsRequest.build()); LOGGER.info("Successfully put records: " + putRecordResult); } diff --git a/pom.xml b/pom.xml index b1afa22f..2918b185 100644 --- a/pom.xml +++ b/pom.xml @@ -81,7 +81,6 @@ UTF-8 software.amazon.glue 2.22.12 - 1.12.660 2.12 3.6.1 1.11.3 @@ -252,11 +251,6 @@ everit-json-schema ${everit.json.schema.version} - - com.amazonaws - aws-java-sdk-kinesis - ${aws.sdk.v1.version} - com.fasterxml.jackson.dataformat jackson-dataformat-cbor diff --git a/serializer-deserializer/pom.xml b/serializer-deserializer/pom.xml index 12716b75..d8216251 100644 --- a/serializer-deserializer/pom.xml +++ b/serializer-deserializer/pom.xml @@ -60,11 +60,6 @@ - - com.amazonaws - aws-java-sdk-sts - ${aws.sdk.v1.version} - software.amazon.awssdk sts