diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java new file mode 100644 index 000000000..bd9d58417 --- /dev/null +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneTombstoneBusinessKeyStrategy.java @@ -0,0 +1,61 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.kafka.connect.sink.writemodel.strategy; + +import static com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelHelper.flattenKeys; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; + +import org.bson.BsonDocument; + +import com.mongodb.client.model.DeleteOneModel; +import com.mongodb.client.model.WriteModel; + +import com.mongodb.kafka.connect.sink.Configurable; +import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig; +import com.mongodb.kafka.connect.sink.converter.SinkDocument; +import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy; +import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy; + +public class DeleteOneTombstoneBusinessKeyStrategy implements WriteModelStrategy, Configurable { + private IdStrategy idStrategy; + + @Override + public WriteModel createWriteModel(final SinkDocument document) { + document + .getKeyDoc() + .orElseThrow( + () -> + new DataException( + "Could not build the WriteModel,the key document was missing unexpectedly")); + + if (!(idStrategy instanceof PartialKeyStrategy)) { + throw new ConnectException( + "DeleteOneTombstoneBusinessKeyStrategy expects PartialKeyStrategy to be defined"); + } + + BsonDocument businessKey = idStrategy.generateId(document, null).asDocument(); + businessKey = flattenKeys(businessKey); + return new DeleteOneModel<>(businessKey); + } + + @Override + public void configure(final MongoSinkTopicConfig configuration) { + idStrategy = configuration.getIdStrategy(); + } +} diff --git a/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java b/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java index cb3e20f04..e8b3c4f2a 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java @@ -108,6 +108,7 @@ import com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy; +import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneTombstoneBusinessKeyStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy; @@ -712,6 +713,9 @@ Collection testGetSingleValidWriteModelStrategy() { UpdateOneBusinessKeyTimestampStrategy.class.getName(), UpdateOneBusinessKeyTimestampStrategy.class); put(DeleteOneBusinessKeyStrategy.class.getName(), DeleteOneBusinessKeyStrategy.class); + put( + DeleteOneTombstoneBusinessKeyStrategy.class.getName(), + DeleteOneTombstoneBusinessKeyStrategy.class); } }; @@ -796,6 +800,9 @@ Collection testGetSingleValidDeleteWriteModelStrategy() { UpdateOneBusinessKeyTimestampStrategy.class.getName(), UpdateOneBusinessKeyTimestampStrategy.class); put(DeleteOneBusinessKeyStrategy.class.getName(), DeleteOneBusinessKeyStrategy.class); + put( + DeleteOneTombstoneBusinessKeyStrategy.class.getName(), + DeleteOneTombstoneBusinessKeyStrategy.class); } }; diff --git a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java index ec48d913b..6ccff2924 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java @@ -29,6 +29,7 @@ import java.util.Map; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -68,6 +69,10 @@ class WriteModelStrategyTest { private static final DeleteOneBusinessKeyStrategy DELETE_ONE_BUSINESS_KEY_STRATEGY = new DeleteOneBusinessKeyStrategy(); private static final DeleteOneBusinessKeyStrategy DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY; + private static final DeleteOneTombstoneBusinessKeyStrategy + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY = new DeleteOneTombstoneBusinessKeyStrategy(); + private static final DeleteOneTombstoneBusinessKeyStrategy + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY; private static final SinkDocument SINK_DOCUMENT_NULL_VALUE = new SinkDocument(new BsonDocument(), null); private static final SinkDocument SINK_DOCUMENT_NULL_KEY = @@ -81,6 +86,9 @@ class WriteModelStrategyTest { MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_CONFIG, PartialKeyStrategy.class.getName()); configMap.put( MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_PARTIAL_KEY_PROJECTION_TYPE_CONFIG, "AllowList"); + configMap.put( + MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_PARTIAL_KEY_PROJECTION_LIST_CONFIG, + "a.a1,b.b1,b.b2"); MongoSinkTopicConfig partialKeyConfig = new MongoSinkConfig(configMap).getMongoSinkTopicConfig(TEST_TOPIC); @@ -93,6 +101,9 @@ class WriteModelStrategyTest { UPDATE_ONE_BUSINESS_KEY_TIMESTAMPS_PARTIAL_STRATEGY.configure(partialKeyConfig); DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY = new DeleteOneBusinessKeyStrategy(); DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.configure(partialKeyConfig); + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY = + new DeleteOneTombstoneBusinessKeyStrategy(); + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.configure(partialKeyConfig); } private static final BsonDocument VALUE_DOC = @@ -379,7 +390,7 @@ void testUpdateOneBusinessKeyTimestampsStrategyPartialWithValidSinkDocument() { @Test @DisplayName( - "when sink document is valid for UpdateOneBusinessKeyTimestampStrategy then correct UpdateOneModel") + "when sink document is valid for DeleteOneBusinessKeyTimestampStrategy then correct DeleteOneModel") void testDeleteOneBusinessKeyStrategyWithValidSinkDocument() { WriteModel result = DELETE_ONE_BUSINESS_KEY_STRATEGY.createWriteModel( @@ -392,7 +403,7 @@ void testDeleteOneBusinessKeyStrategyWithValidSinkDocument() { @Test @DisplayName( - "when sink document is valid for UpdateOneBusinessKeyTimestampStrategy with partial id strategy then correct UpdateOneModel") + "when sink document is valid for DeleteOneBusinessKeyTimestampStrategy with partial id strategy then correct DeleteOneModel") void testDeleteOneBusinessKeyStrategyStrategyPartialWithValidSinkDocument() { WriteModel result = DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( @@ -403,6 +414,22 @@ void testDeleteOneBusinessKeyStrategyStrategyPartialWithValidSinkDocument() { assertEquals(BUSINESS_KEY_FLATTENED_FILTER, writeModel.getFilter()); } + @Test + @DisplayName( + "when sink document is valid for DeleteOneTombstoneBusinessKeyTimestampStrategy with partial id strategy then correct DeleteOneModel") + void testDeleteOneTombstoneBusinessKeyStrategyStrategyPartialWithValidSinkDocument() { + BsonDocument keyDoc = + BsonDocument.parse( + "{_id: {a: {a1: 0}, b: {b1: 0, b2: 0}}, a: {a1: 0}, b: {b1: 0, b2: 0, c1: 0}}"); + WriteModel result = + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( + new SinkDocument(keyDoc, null)); + assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel"); + + DeleteOneModel writeModel = (DeleteOneModel) result; + assertEquals(BsonDocument.parse("{'a.a1': 0, 'b.b1': 0, 'b.b2': 0}"), writeModel.getFilter()); + } + @Test @DisplayName("Test handling empty or missing sink document data") void testIEmptyOrMissingSinkDocumentData() { @@ -493,7 +520,24 @@ void testIEmptyOrMissingSinkDocumentData() { assertThrows( DataException.class, () -> - DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( - SINK_DOCUMENT_EMPTY))); + DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel(SINK_DOCUMENT_EMPTY)), + () -> + assertThrows( + DataException.class, + () -> + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( + SINK_DOCUMENT_NULL_KEY)), + () -> + assertThrows( + ConnectException.class, + () -> + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_STRATEGY.createWriteModel( + SINK_DOCUMENT_EMPTY)), + () -> + assertThrows( + DataException.class, + () -> + DELETE_ONE_TOMBSTONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( + SINK_DOCUMENT_NULL_KEY))); } }