From 0e05b1fe9878705a376d1d7b11945e2a2ea59eb2 Mon Sep 17 00:00:00 2001 From: Calvin Thomas Nix Date: Thu, 10 Apr 2025 11:46:39 -0400 Subject: [PATCH 1/6] KAFKA-438: Create DeleteOneTombstoneBusinessKeyStrategy.java --- ...DeleteOneTombstoneBusinessKeyStrategy.java | 72 +++++++++++++++++++ .../connect/sink/MongoSinkConfigTest.java | 7 ++ .../strategy/WriteModelStrategyTest.java | 67 ++++++++++++++++- 3 files changed, 143 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java new file mode 100644 index 000000000..a92a3fbdc --- /dev/null +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java @@ -0,0 +1,72 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.kafka.connect.sink.writemodel.strategy; + +import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.ID_FIELD; +import static com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelHelper.flattenKeys; + +import org.apache.kafka.connect.errors.DataException; + +import org.bson.BsonDocument; +import org.bson.BsonValue; + +import com.mongodb.client.model.DeleteOneModel; +import com.mongodb.client.model.WriteModel; + +import com.mongodb.kafka.connect.sink.Configurable; +import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig; +import com.mongodb.kafka.connect.sink.converter.SinkDocument; +import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy; +import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy; +import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy; + +public class DeleteOneTombstoneBusinessKeyStrategy implements WriteModelStrategy, Configurable { + + private boolean isPartialId = false; + + @Override + public WriteModel createWriteModel(final SinkDocument document) { + BsonDocument vk = + document + .getKeyDoc() + .orElseThrow( + () -> + new DataException( + "Could not build the WriteModel,the key document was missing unexpectedly")); + + BsonValue idValue = vk.get(ID_FIELD); + if (idValue == null || !idValue.isDocument()) { + throw new DataException( + "Could not build the WriteModel, the key document does not contain an _id field of" + + " type BsonDocument which holds the business key fields."); + } + + BsonDocument businessKey = idValue.asDocument(); + vk.remove(ID_FIELD); + if (isPartialId) { + businessKey = flattenKeys(businessKey); + } + return new DeleteOneModel<>(businessKey); + } + + @Override + public void configure(final MongoSinkTopicConfig configuration) { + IdStrategy idStrategy = configuration.getIdStrategy(); + isPartialId = + idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy; + } +} diff --git a/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java b/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java index cb3e20f04..e8b3c4f2a 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java @@ -108,6 +108,7 @@ import com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy; +import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneTombstoneBusinessKeyStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy; @@ -712,6 +713,9 @@ Collection testGetSingleValidWriteModelStrategy() { UpdateOneBusinessKeyTimestampStrategy.class.getName(), UpdateOneBusinessKeyTimestampStrategy.class); put(DeleteOneBusinessKeyStrategy.class.getName(), DeleteOneBusinessKeyStrategy.class); + put( + DeleteOneTombstoneBusinessKeyStrategy.class.getName(), + DeleteOneTombstoneBusinessKeyStrategy.class); } }; @@ -796,6 +800,9 @@ Collection testGetSingleValidDeleteWriteModelStrategy() { UpdateOneBusinessKeyTimestampStrategy.class.getName(), UpdateOneBusinessKeyTimestampStrategy.class); put(DeleteOneBusinessKeyStrategy.class.getName(), DeleteOneBusinessKeyStrategy.class); + put( + DeleteOneTombstoneBusinessKeyStrategy.class.getName(), + DeleteOneTombstoneBusinessKeyStrategy.class); } }; diff --git a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java index ec48d913b..a45ccebca 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java @@ -68,6 +68,10 @@ class WriteModelStrategyTest { private static final DeleteOneBusinessKeyStrategy DELETE_ONE_BUSINESS_KEY_STRATEGY = new DeleteOneBusinessKeyStrategy(); private static final DeleteOneBusinessKeyStrategy DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY; + private static final DeleteOneTombstoneBusinessKeyStrategy + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY = new DeleteOneTombstoneBusinessKeyStrategy(); + private static final DeleteOneTombstoneBusinessKeyStrategy + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY; private static final SinkDocument SINK_DOCUMENT_NULL_VALUE = new SinkDocument(new BsonDocument(), null); private static final SinkDocument SINK_DOCUMENT_NULL_KEY = @@ -93,8 +97,15 @@ class WriteModelStrategyTest { UPDATE_ONE_BUSINESS_KEY_TIMESTAMPS_PARTIAL_STRATEGY.configure(partialKeyConfig); DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY = new DeleteOneBusinessKeyStrategy(); DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.configure(partialKeyConfig); + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY = + new DeleteOneTombstoneBusinessKeyStrategy(); + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.configure(partialKeyConfig); } + private static final BsonDocument KEY_DOC = + BsonDocument.parse( + "{_id: {a: {a1: 1}, b: {b1: 1, b2: 1}}, a: {a1: 0}, b: {b1: 0, b2: 0, c1: 0}}"); + private static final BsonDocument VALUE_DOC = BsonDocument.parse( "{_id: {a: {a1: 1}, b: {b1: 1, b2: 1}}, a: {a1: 1}, b: {b1: 1, b2: 1, c1: 1}}"); @@ -379,7 +390,7 @@ void testUpdateOneBusinessKeyTimestampsStrategyPartialWithValidSinkDocument() { @Test @DisplayName( - "when sink document is valid for UpdateOneBusinessKeyTimestampStrategy then correct UpdateOneModel") + "when sink document is valid for DeleteOneBusinessKeyTimestampStrategy then correct DeleteOneModel") void testDeleteOneBusinessKeyStrategyWithValidSinkDocument() { WriteModel result = DELETE_ONE_BUSINESS_KEY_STRATEGY.createWriteModel( @@ -392,7 +403,7 @@ void testDeleteOneBusinessKeyStrategyWithValidSinkDocument() { @Test @DisplayName( - "when sink document is valid for UpdateOneBusinessKeyTimestampStrategy with partial id strategy then correct UpdateOneModel") + "when sink document is valid for DeleteOneBusinessKeyTimestampStrategy with partial id strategy then correct DeleteOneModel") void testDeleteOneBusinessKeyStrategyStrategyPartialWithValidSinkDocument() { WriteModel result = DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( @@ -403,6 +414,33 @@ void testDeleteOneBusinessKeyStrategyStrategyPartialWithValidSinkDocument() { assertEquals(BUSINESS_KEY_FLATTENED_FILTER, writeModel.getFilter()); } + @Test + @DisplayName( + "when sink document is valid for DeleteOneTombstoneBusinessKeyTimestampStrategy then correct DeleteOneModel") + void testDeleteOneTombstoneBusinessKeyStrategyWithValidSinkDocument() { + + WriteModel result = + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( + new SinkDocument(KEY_DOC.clone(), null)); + assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel"); + + DeleteOneModel writeModel = (DeleteOneModel) result; + assertEquals(KEY_DOC.get("_id"), writeModel.getFilter()); + } + + @Test + @DisplayName( + "when sink document is valid for DeleteOneTombstoneBusinessKeyTimestampStrategy with partial id strategy then correct DeleteOneModel") + void testDeleteOneTombstoneBusinessKeyStrategyStrategyPartialWithValidSinkDocument() { + WriteModel result = + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( + new SinkDocument(KEY_DOC.clone(), null)); + assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel"); + + DeleteOneModel writeModel = (DeleteOneModel) result; + assertEquals(BUSINESS_KEY_FLATTENED_FILTER, writeModel.getFilter()); + } + @Test @DisplayName("Test handling empty or missing sink document data") void testIEmptyOrMissingSinkDocumentData() { @@ -493,7 +531,30 @@ void testIEmptyOrMissingSinkDocumentData() { assertThrows( DataException.class, () -> - DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( + DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel(SINK_DOCUMENT_EMPTY)), + () -> + assertThrows( + DataException.class, + () -> + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( + SINK_DOCUMENT_NULL_KEY)), + () -> + assertThrows( + DataException.class, + () -> + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( + SINK_DOCUMENT_EMPTY)), + () -> + assertThrows( + DataException.class, + () -> + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( + SINK_DOCUMENT_NULL_KEY)), + () -> + assertThrows( + DataException.class, + () -> + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( SINK_DOCUMENT_EMPTY))); } } From a657014a2096118c2a1c9d54856cdf9ceccd4ca3 Mon Sep 17 00:00:00 2001 From: Calvin Thomas Nix Date: Thu, 10 Apr 2025 14:20:36 -0400 Subject: [PATCH 2/6] base DeleteOneTombstoneBusinessKeyStrategy on DeleteOneDefaultStrategy --- ...DeleteOneTombstoneBusinessKeyStrategy.java | 21 ++++++---------- .../strategy/WriteModelStrategyTest.java | 24 +++++++------------ 2 files changed, 15 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java index a92a3fbdc..e47cfb5f5 100644 --- a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java @@ -22,7 +22,6 @@ import org.apache.kafka.connect.errors.DataException; import org.bson.BsonDocument; -import org.bson.BsonValue; import com.mongodb.client.model.DeleteOneModel; import com.mongodb.client.model.WriteModel; @@ -35,12 +34,12 @@ import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy; public class DeleteOneTombstoneBusinessKeyStrategy implements WriteModelStrategy, Configurable { - + private IdStrategy idStrategy; private boolean isPartialId = false; @Override public WriteModel createWriteModel(final SinkDocument document) { - BsonDocument vk = + BsonDocument kd = document .getKeyDoc() .orElseThrow( @@ -48,24 +47,18 @@ public WriteModel createWriteModel(final SinkDocument document) { new DataException( "Could not build the WriteModel,the key document was missing unexpectedly")); - BsonValue idValue = vk.get(ID_FIELD); - if (idValue == null || !idValue.isDocument()) { - throw new DataException( - "Could not build the WriteModel, the key document does not contain an _id field of" - + " type BsonDocument which holds the business key fields."); - } - - BsonDocument businessKey = idValue.asDocument(); - vk.remove(ID_FIELD); if (isPartialId) { + BsonDocument businessKey = idStrategy.generateId(document, null).asDocument(); businessKey = flattenKeys(businessKey); + return new DeleteOneModel<>(businessKey); } - return new DeleteOneModel<>(businessKey); + + return new DeleteOneModel<>(kd.containsKey(ID_FIELD) ? kd : new BsonDocument(ID_FIELD, kd)); } @Override public void configure(final MongoSinkTopicConfig configuration) { - IdStrategy idStrategy = configuration.getIdStrategy(); + idStrategy = configuration.getIdStrategy(); isPartialId = idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy; } diff --git a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java index a45ccebca..5240e3a5f 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java @@ -85,6 +85,9 @@ class WriteModelStrategyTest { MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_CONFIG, PartialKeyStrategy.class.getName()); configMap.put( MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_PARTIAL_KEY_PROJECTION_TYPE_CONFIG, "AllowList"); + configMap.put( + MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_PARTIAL_KEY_PROJECTION_LIST_CONFIG, + "a.a1,b.b1,b.b2"); MongoSinkTopicConfig partialKeyConfig = new MongoSinkConfig(configMap).getMongoSinkTopicConfig(TEST_TOPIC); @@ -104,7 +107,7 @@ class WriteModelStrategyTest { private static final BsonDocument KEY_DOC = BsonDocument.parse( - "{_id: {a: {a1: 1}, b: {b1: 1, b2: 1}}, a: {a1: 0}, b: {b1: 0, b2: 0, c1: 0}}"); + "{_id: {a: {a1: 1}, b: {b1: 1, b2: 1}}, a: {a1: 1}, b: {b1: 1, b2: 1, c1: 1}}"); private static final BsonDocument VALUE_DOC = BsonDocument.parse( @@ -418,14 +421,15 @@ void testDeleteOneBusinessKeyStrategyStrategyPartialWithValidSinkDocument() { @DisplayName( "when sink document is valid for DeleteOneTombstoneBusinessKeyTimestampStrategy then correct DeleteOneModel") void testDeleteOneTombstoneBusinessKeyStrategyWithValidSinkDocument() { + BsonDocument keyDoc = BsonDocument.parse("{id: 1234}"); WriteModel result = DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( - new SinkDocument(KEY_DOC.clone(), null)); + new SinkDocument(keyDoc.clone(), null)); assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel"); DeleteOneModel writeModel = (DeleteOneModel) result; - assertEquals(KEY_DOC.get("_id"), writeModel.getFilter()); + assertEquals(new BsonDocument("_id", keyDoc), writeModel.getFilter()); } @Test @@ -538,23 +542,11 @@ void testIEmptyOrMissingSinkDocumentData() { () -> DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( SINK_DOCUMENT_NULL_KEY)), - () -> - assertThrows( - DataException.class, - () -> - DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( - SINK_DOCUMENT_EMPTY)), - () -> - assertThrows( - DataException.class, - () -> - DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( - SINK_DOCUMENT_NULL_KEY)), () -> assertThrows( DataException.class, () -> DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( - SINK_DOCUMENT_EMPTY))); + SINK_DOCUMENT_NULL_KEY))); } } From 430ec0e24fd11f1584d412b638265b83e23676fa Mon Sep 17 00:00:00 2001 From: Calvin Thomas Nix Date: Thu, 10 Apr 2025 14:24:17 -0400 Subject: [PATCH 3/6] adjust tests --- .../sink/writemodel/strategy/WriteModelStrategyTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java index 5240e3a5f..660105e61 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java @@ -107,7 +107,7 @@ class WriteModelStrategyTest { private static final BsonDocument KEY_DOC = BsonDocument.parse( - "{_id: {a: {a1: 1}, b: {b1: 1, b2: 1}}, a: {a1: 1}, b: {b1: 1, b2: 1, c1: 1}}"); + "{_id: {a: {a1: 0}, b: {b1: 0, b2: 0}}, a: {a1: 0}, b: {b1: 0, b2: 0, c1: 0}}"); private static final BsonDocument VALUE_DOC = BsonDocument.parse( @@ -125,6 +125,9 @@ class WriteModelStrategyTest { private static final BsonDocument BUSINESS_KEY_FLATTENED_FILTER = BsonDocument.parse("{'a.a1': 1, 'b.b1': 1, 'b.b2': 1}"); + private static final BsonDocument BUSINESS_KEY_FLATTENED_KEY_IDS_FILTER = + BsonDocument.parse("{'a.a1': 0, 'b.b1': 0, 'b.b2': 0}"); + @Test @DisplayName("Ensure default write model strategy sets the expected WriteModelStrategy") void testDefaultWriteModelStrategy() { @@ -442,7 +445,7 @@ void testDeleteOneTombstoneBusinessKeyStrategyStrategyPartialWithValidSinkDocume assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel"); DeleteOneModel writeModel = (DeleteOneModel) result; - assertEquals(BUSINESS_KEY_FLATTENED_FILTER, writeModel.getFilter()); + assertEquals(BUSINESS_KEY_FLATTENED_KEY_IDS_FILTER, writeModel.getFilter()); } @Test From 9f379a31a1f751f01a451d3a7cf69e0ab326cb79 Mon Sep 17 00:00:00 2001 From: Calvin Thomas Nix Date: Fri, 11 Apr 2025 09:34:03 -0400 Subject: [PATCH 4/6] throw exception if not configured as expected --- ...DeleteOneTombstoneBusinessKeyStrategy.java | 22 +++++------ .../strategy/WriteModelStrategyTest.java | 37 +++++++------------ 2 files changed, 23 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java index e47cfb5f5..70576be3d 100644 --- a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java @@ -16,9 +16,9 @@ package com.mongodb.kafka.connect.sink.writemodel.strategy; -import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.ID_FIELD; import static com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelHelper.flattenKeys; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; import org.bson.BsonDocument; @@ -31,7 +31,6 @@ import com.mongodb.kafka.connect.sink.converter.SinkDocument; import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy; import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy; -import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy; public class DeleteOneTombstoneBusinessKeyStrategy implements WriteModelStrategy, Configurable { private IdStrategy idStrategy; @@ -39,13 +38,12 @@ public class DeleteOneTombstoneBusinessKeyStrategy implements WriteModelStrategy @Override public WriteModel createWriteModel(final SinkDocument document) { - BsonDocument kd = - document - .getKeyDoc() - .orElseThrow( - () -> - new DataException( - "Could not build the WriteModel,the key document was missing unexpectedly")); + document + .getKeyDoc() + .orElseThrow( + () -> + new DataException( + "Could not build the WriteModel,the key document was missing unexpectedly")); if (isPartialId) { BsonDocument businessKey = idStrategy.generateId(document, null).asDocument(); @@ -53,13 +51,13 @@ public WriteModel createWriteModel(final SinkDocument document) { return new DeleteOneModel<>(businessKey); } - return new DeleteOneModel<>(kd.containsKey(ID_FIELD) ? kd : new BsonDocument(ID_FIELD, kd)); + throw new ConnectException( + "DeleteOneTombstoneBusinessKeyStrategy expects PartialKeyStrategy to be defined"); } @Override public void configure(final MongoSinkTopicConfig configuration) { idStrategy = configuration.getIdStrategy(); - isPartialId = - idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy; + isPartialId = idStrategy instanceof PartialKeyStrategy; } } diff --git a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java index 660105e61..c59e2102d 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java @@ -29,6 +29,7 @@ import java.util.Map; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -105,10 +106,6 @@ class WriteModelStrategyTest { DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.configure(partialKeyConfig); } - private static final BsonDocument KEY_DOC = - BsonDocument.parse( - "{_id: {a: {a1: 0}, b: {b1: 0, b2: 0}}, a: {a1: 0}, b: {b1: 0, b2: 0, c1: 0}}"); - private static final BsonDocument VALUE_DOC = BsonDocument.parse( "{_id: {a: {a1: 1}, b: {b1: 1, b2: 1}}, a: {a1: 1}, b: {b1: 1, b2: 1, c1: 1}}"); @@ -125,9 +122,6 @@ class WriteModelStrategyTest { private static final BsonDocument BUSINESS_KEY_FLATTENED_FILTER = BsonDocument.parse("{'a.a1': 1, 'b.b1': 1, 'b.b2': 1}"); - private static final BsonDocument BUSINESS_KEY_FLATTENED_KEY_IDS_FILTER = - BsonDocument.parse("{'a.a1': 0, 'b.b1': 0, 'b.b2': 0}"); - @Test @DisplayName("Ensure default write model strategy sets the expected WriteModelStrategy") void testDefaultWriteModelStrategy() { @@ -420,32 +414,21 @@ void testDeleteOneBusinessKeyStrategyStrategyPartialWithValidSinkDocument() { assertEquals(BUSINESS_KEY_FLATTENED_FILTER, writeModel.getFilter()); } - @Test - @DisplayName( - "when sink document is valid for DeleteOneTombstoneBusinessKeyTimestampStrategy then correct DeleteOneModel") - void testDeleteOneTombstoneBusinessKeyStrategyWithValidSinkDocument() { - BsonDocument keyDoc = BsonDocument.parse("{id: 1234}"); - - WriteModel result = - DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( - new SinkDocument(keyDoc.clone(), null)); - assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel"); - - DeleteOneModel writeModel = (DeleteOneModel) result; - assertEquals(new BsonDocument("_id", keyDoc), writeModel.getFilter()); - } - @Test @DisplayName( "when sink document is valid for DeleteOneTombstoneBusinessKeyTimestampStrategy with partial id strategy then correct DeleteOneModel") void testDeleteOneTombstoneBusinessKeyStrategyStrategyPartialWithValidSinkDocument() { + BsonDocument keyDoc = + BsonDocument.parse( + "{_id: {a: {a1: 0}, b: {b1: 0, b2: 0}}, a: {a1: 0}, b: {b1: 0, b2: 0, c1: 0}}"); + ; WriteModel result = DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( - new SinkDocument(KEY_DOC.clone(), null)); + new SinkDocument(keyDoc, null)); assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel"); DeleteOneModel writeModel = (DeleteOneModel) result; - assertEquals(BUSINESS_KEY_FLATTENED_KEY_IDS_FILTER, writeModel.getFilter()); + assertEquals(BsonDocument.parse("{'a.a1': 0, 'b.b1': 0, 'b.b2': 0}"), writeModel.getFilter()); } @Test @@ -545,6 +528,12 @@ void testIEmptyOrMissingSinkDocumentData() { () -> DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( SINK_DOCUMENT_NULL_KEY)), + () -> + assertThrows( + ConnectException.class, + () -> + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( + SINK_DOCUMENT_EMPTY)), () -> assertThrows( DataException.class, From c41427431d3d6fc44469727c9b31a87524ce818f Mon Sep 17 00:00:00 2001 From: Calvin Thomas Nix Date: Fri, 11 Apr 2025 09:51:07 -0400 Subject: [PATCH 5/6] fix lint --- .../connect/sink/writemodel/strategy/WriteModelStrategyTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java index c59e2102d..6ccff2924 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java @@ -421,7 +421,6 @@ void testDeleteOneTombstoneBusinessKeyStrategyStrategyPartialWithValidSinkDocume BsonDocument keyDoc = BsonDocument.parse( "{_id: {a: {a1: 0}, b: {b1: 0, b2: 0}}, a: {a1: 0}, b: {b1: 0, b2: 0, c1: 0}}"); - ; WriteModel result = DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( new SinkDocument(keyDoc, null)); From ce84329ad6362a4d265e255b88f4403e81af9668 Mon Sep 17 00:00:00 2001 From: Calvin Thomas Nix Date: Fri, 11 Apr 2025 14:19:55 -0400 Subject: [PATCH 6/6] clean up code --- .../DeleteOneTombstoneBusinessKeyStrategy.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java index 70576be3d..bd9d58417 100644 --- a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java @@ -34,7 +34,6 @@ public class DeleteOneTombstoneBusinessKeyStrategy implements WriteModelStrategy, Configurable { private IdStrategy idStrategy; - private boolean isPartialId = false; @Override public WriteModel createWriteModel(final SinkDocument document) { @@ -45,19 +44,18 @@ public WriteModel createWriteModel(final SinkDocument document) { new DataException( "Could not build the WriteModel,the key document was missing unexpectedly")); - if (isPartialId) { - BsonDocument businessKey = idStrategy.generateId(document, null).asDocument(); - businessKey = flattenKeys(businessKey); - return new DeleteOneModel<>(businessKey); + if (!(idStrategy instanceof PartialKeyStrategy)) { + throw new ConnectException( + "DeleteOneTombstoneBusinessKeyStrategy expects PartialKeyStrategy to be defined"); } - throw new ConnectException( - "DeleteOneTombstoneBusinessKeyStrategy expects PartialKeyStrategy to be defined"); + BsonDocument businessKey = idStrategy.generateId(document, null).asDocument(); + businessKey = flattenKeys(businessKey); + return new DeleteOneModel<>(businessKey); } @Override public void configure(final MongoSinkTopicConfig configuration) { idStrategy = configuration.getIdStrategy(); - isPartialId = idStrategy instanceof PartialKeyStrategy; } }