Glue Schema Registry schema replication converter #352

Open
wants to merge 49 commits into base: master

49 commits
a724b16
Created MM2 Converter and Added Unit Tests
Jul 26, 2023
f26a158
Modified Converter Based on First CR Feedback
Jul 26, 2023
7c365e2
Modified Converter Based on First CR Feedback
Jul 26, 2023
0ef7d03
Added TODOs
Aug 18, 2023
b562512
Merge pull request #283 from JC-CJ2222/new_cross-region-replication-m…
YangSan0622 Sep 11, 2023
e4c45b3
Created MM2 Converter and Added Unit Tests
Jul 26, 2023
f095ead
Modified Converter Based on First CR Feedback
Jul 26, 2023
2f30def
Modified Converter Based on First CR Feedback
Jul 26, 2023
03f9328
Added TODOs
Aug 18, 2023
f5f0788
Change version in pom
Jun 12, 2024
07afed0
Added 2nd kafka and ZK in docker compose service file
Jun 13, 2024
0c93896
Added mm2 connector files for standalone run
Jun 13, 2024
d86bfcd
Added code for scenario where message doesn't have schema in it.
Jun 14, 2024
fc55cb6
Merge remote-tracking branch 'upstream/schema-replication-converter' …
Jun 14, 2024
e4d4003
Merge code
Jun 14, 2024
a23c8a9
Updated mm2 configs
Jun 14, 2024
13db222
Unit test to show when data doesn't have schema, it still gets replic…
Jun 14, 2024
925ebdd
Fixed some issues in the compose file and added mm2
Jun 15, 2024
66a847d
Dockerfile and mm2 config for mm2 container
Jun 15, 2024
0aa94f1
Added integration tests for schema replication
Jun 18, 2024
f296fd0
Added integration tests for schema replication
Jun 18, 2024
3bd91a0
loading aws credentials from host to docker
Jun 18, 2024
569d4b3
Configuring default aws profile
Jun 18, 2024
6f2bcc4
Renamed class to AWSGlueCrossRegionSchemaReplicationConverter
Jul 3, 2024
8c84686
Increased the sleep time
Jul 3, 2024
110b792
Source schema compatibility mode is persisted in target schema
Jul 8, 2024
dc9c91e
MM2 will pickup topics prefixed with SchemaReplicationTests only
Jul 9, 2024
f0c0210
mongo JAR file deleted as its created during integration tests
Jul 9, 2024
d4211a4
TESTS: Fixed Tests for SchemaReplicationConverter after adding _V2 ve…
Jul 9, 2024
efc21ea
TESTS: Added tests for Serializer-Deserializer module after adding _V…
Jul 9, 2024
837fc97
TESTS: Added tests for schema-registry-common module after adding _V2…
Jul 9, 2024
d68448a
Removed TODO comment for issue #294 as its already working as expected
Jul 9, 2024
c4d2a40
Issue Fix: Order of schema versions in the source are honoured in the…
Jul 11, 2024
8cec19e
Added tests for GlueSchemaRegistryConfiguration and AWSGlueCrossRegio…
Jul 11, 2024
5a492dd
Added tests for SchemaByDefinitionFetcher changes
Jul 11, 2024
d948e2f
Added tests for AWSSchemaRegistryClient changes
Jul 12, 2024
e1c515c
Added an integration test with the schema replication converter and f…
Jul 12, 2024
7698435
Deleted wget-log as its unnecessary
Jul 12, 2024
6b7f1aa
Code Cleanup: Moved the logic of createSchemaAndRegisterAllSchemaVers…
Aug 20, 2024
e0e46ed
Code cleanup and added cache for schema replication converter for ini…
Aug 21, 2024
c2841f7
Code Cleanup: Moved schema replication related GSR configurations to …
Aug 21, 2024
4e80dd6
Code Cleanup: Moved schema replication related GSR configurations to …
Aug 21, 2024
49c04c8
Code Cleanup: Moved schema replication related constants to the conve…
Aug 21, 2024
91b3757
Code refactorings to improve performance of schema replication
Sep 6, 2024
b1a81f7
Added unit test for schema replication and removed schema-replication…
Dec 16, 2024
1adc2b3
Removed redundant code
Dec 16, 2024
cd4a900
Code Refactorings to cleanup AWSSchemaRegistryClient
Dec 17, 2024
8a335aa
mockito library updated
Dec 17, 2024
3928403
Fixed checkstyle issues
Dec 17, 2024
3 changes: 2 additions & 1 deletion .gitignore
@@ -10,4 +10,5 @@
**/*.project
**/*.classpath
**/*.factorypath
-**/dependency-reduced-pom.xml
+**/dependency-reduced-pom.xml
+/integration-tests/*

@@ -42,11 +42,15 @@
import software.amazon.awssdk.services.glue.model.DataFormat;
import software.amazon.awssdk.services.glue.model.GetSchemaByDefinitionRequest;
import software.amazon.awssdk.services.glue.model.GetSchemaByDefinitionResponse;
import software.amazon.awssdk.services.glue.model.GetSchemaRequest;
import software.amazon.awssdk.services.glue.model.GetSchemaResponse;
import software.amazon.awssdk.services.glue.model.GetSchemaVersionRequest;
import software.amazon.awssdk.services.glue.model.GetSchemaVersionResponse;
import software.amazon.awssdk.services.glue.model.GetTagsRequest;
import software.amazon.awssdk.services.glue.model.GetTagsResponse;
import software.amazon.awssdk.services.glue.model.GlueRequest;
import software.amazon.awssdk.services.glue.model.ListSchemaVersionsRequest;
import software.amazon.awssdk.services.glue.model.ListSchemaVersionsResponse;
import software.amazon.awssdk.services.glue.model.MetadataKeyValuePair;
import software.amazon.awssdk.services.glue.model.PutSchemaVersionMetadataRequest;
import software.amazon.awssdk.services.glue.model.PutSchemaVersionMetadataResponse;
@@ -56,9 +60,12 @@
import software.amazon.awssdk.services.glue.model.RegisterSchemaVersionResponse;
import software.amazon.awssdk.services.glue.model.RegistryId;
import software.amazon.awssdk.services.glue.model.SchemaId;
import software.amazon.awssdk.services.glue.model.SchemaVersionListItem;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.UUID;
@@ -180,7 +187,7 @@ public GetSchemaVersionResponse getSchemaVersionResponse(@NonNull String schemaV
        return schemaVersionResponse;
    }

-    private GetSchemaVersionRequest getSchemaVersionRequest(String schemaVersionId) {
+    public GetSchemaVersionRequest getSchemaVersionRequest(String schemaVersionId) {
        GetSchemaVersionRequest getSchemaVersionRequest = GetSchemaVersionRequest.builder()
                .schemaVersionId(schemaVersionId).build();
        return getSchemaVersionRequest;
@@ -193,6 +200,43 @@ private void validateSchemaVersionResponse(GetSchemaVersionResponse schemaVersio
        }
    }

    /**
     * Get Schema by passing the schema id.
     * @param schemaId Schema Id
     * @return the schema corresponding to the schema id passed
     * @throws AWSSchemaRegistryException on any error while fetching the schema,
     *         including when the service returns no schema for the schema id
     */
    public GetSchemaResponse getSchemaResponse(@NonNull SchemaId schemaId)
            throws AWSSchemaRegistryException {
        GetSchemaResponse schemaResponse = null;

        try {
            schemaResponse = client.getSchema(getSchemaRequest(schemaId));
            validateSchemaResponse(schemaResponse, schemaId);
        } catch (Exception e) {
            String errorMessage = String.format("Failed to get schema Id = %s", schemaId);
            throw new AWSSchemaRegistryException(errorMessage, e);
        }

        return schemaResponse;
    }

    private GetSchemaRequest getSchemaRequest(SchemaId schemaId) {
        GetSchemaRequest getSchemaRequest = GetSchemaRequest.builder()
                .schemaId(schemaId)
                .build();
        return getSchemaRequest;
    }

    private void validateSchemaResponse(GetSchemaResponse schemaResponse, SchemaId schemaId) {
        if (schemaResponse == null) {
            String message = String.format("Schema is not present for the schema id = %s", schemaId);
            throw new AWSSchemaRegistryException(message);
        }
    }

    private UUID returnSchemaVersionIdIfAvailable(GetSchemaByDefinitionResponse response) {
        if (response.schemaVersionId() != null
                && response.statusAsString().equals(AWSSchemaRegistryConstants.SchemaVersionStatus.AVAILABLE.toString())) {
@@ -266,6 +310,53 @@ public UUID createSchema(String schemaName,
        return schemaVersionId;
    }

    /**
     * List all versions of the schema.
     * @param schemaName Schema Name
     * @return List of schema versions
     * @throws AWSSchemaRegistryException on any error while listing the schema versions
     */
    public List<SchemaVersionListItem> getSchemaVersions(String schemaName) {
        ListSchemaVersionsRequest listSchemaVersionsRequest = getListSchemaVersionsRequest(schemaName);
        List<SchemaVersionListItem> schemaVersionList = new ArrayList<>();
        boolean done = false;
        try {
            while (!done) {
                //Get list of schema versions from source registry
                ListSchemaVersionsResponse listSchemaVersionsResponse = client.listSchemaVersions(listSchemaVersionsRequest);
                //Accumulate this page; assigning the list here would discard earlier pages
                schemaVersionList.addAll(listSchemaVersionsResponse.schemas());

                //Keep paginating till the end
                if (listSchemaVersionsResponse.nextToken() == null) {
                    done = true;
                } else {
                    //Create the request object to get next set of results using the nextToken
                    listSchemaVersionsRequest = getListSchemaVersionsRequest(schemaName, listSchemaVersionsResponse.nextToken());
                }
            }
        } catch (Exception e) {
            String errorMessage = String.format("Failed to get schema version for schema = %s", schemaName);
            throw new AWSSchemaRegistryException(errorMessage, e);
        }

        return schemaVersionList;
    }

    private ListSchemaVersionsRequest getListSchemaVersionsRequest(String schemaName, String nextToken) {
        return ListSchemaVersionsRequest
                .builder()
                .nextToken(nextToken)
                .schemaId(SchemaId.builder().schemaName(schemaName).registryName(glueSchemaRegistryConfiguration.getRegistryName()).build())
                .build();
    }

    private ListSchemaVersionsRequest getListSchemaVersionsRequest(String schemaName) {
        return ListSchemaVersionsRequest
                .builder()
                .schemaId(SchemaId.builder().schemaName(schemaName).registryName(glueSchemaRegistryConfiguration.getRegistryName()).build())
                .build();
    }
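
Review note: a token-based listing such as getSchemaVersions only returns every version if each page is appended to the result and the loop stops once no next token comes back. The following is a minimal sketch of that pattern, written without the AWS SDK so it runs on its own. The names PaginationSketch, Page, collectAll and fakeFetch are hypothetical and are not part of this PR; Page merely stands in for a list response that carries items and a nextToken.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class PaginationSketch {

    /** Hypothetical stand-in for one page of a list response. */
    static final class Page {
        final List<String> items;
        final String nextToken; // null when there are no more pages

        Page(List<String> items, String nextToken) {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    /** Collects the items of every page, following nextToken until it is null. */
    static List<String> collectAll(Function<String, Page> fetchPage) {
        List<String> all = new ArrayList<>();
        String token = null; // the first request carries no token
        do {
            Page page = fetchPage.apply(token);
            all.addAll(page.items); // append; reassigning would drop earlier pages
            token = page.nextToken;
        } while (token != null);
        return all;
    }

    /** Hypothetical two-page source used only for demonstration. */
    static Page fakeFetch(String token) {
        if (token == null) {
            return new Page(Arrays.asList("v1", "v2"), "page-2");
        }
        return new Page(Arrays.asList("v3"), null);
    }

    public static void main(String[] args) {
        // prints [v1, v2, v3]
        System.out.println(collectAll(PaginationSketch::fakeFetch));
    }
}
```

A unit test along these lines, with a mocked client returning two pages, would probably be enough to guard the real method against returning only the last page.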

/**
* Register the schema and return schema version Id once it is available.
* @param schemaDefinition Schema Definition

@@ -323,7 +323,7 @@ private void validateAndSetJacksonDeserializationFeatures(Map<String, ?> configs
        }
    }

-    private boolean isPresent(Map<String, ?> configs,
+    public boolean isPresent(Map<String, ?> configs,
                             String key) {
        if (!GlueSchemaRegistryUtils.getInstance()
                .checkIfPresentInMap(configs, key)) {
40 changes: 40 additions & 0 deletions cross-region-replication-converter/.gitignore
@@ -0,0 +1,40 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

resources/

### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store