-
Notifications
You must be signed in to change notification settings - Fork 257
KAFKA-438: Create DeleteOneTombstoneBusinessKeyStrategy.java #175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy; | ||
import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy; | ||
|
||
public class DeleteOneTombstoneBusinessKeyStrategy implements WriteModelStrategy, Configurable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up opting to just create a new write model instead of updating DeleteOneBusinessKeyStrategy, the risk would be if we add the ability to start deleting on key values when tombstone events are sent, documents might start being deleted when they weren't in previous versions.
|
||
@Override | ||
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) { | ||
BsonDocument kd = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not needed until after the if (isPartialId)
statement. The advantage of moving this down is that we would be able to support generating the business key from both the message key and value with the PartialKeyStrategy
and PartialValueStrategy
strategies respectively.
Let's add some tests around this as well.
EDIT: Not sure if the name of this write model would still make sense in that case though 🤔
} | ||
|
||
private static final BsonDocument KEY_DOC = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[opt] just a personal preference, so feel free to ignore. Thoughts on defining this directly within the test that uses it to improve readability? Same goes for BUSINESS_KEY_FLATTENED_KEY_IDS_FILTER
.
return new DeleteOneModel<>(businessKey); | ||
} | ||
|
||
return new DeleteOneModel<>(kd.containsKey(ID_FIELD) ? kd : new BsonDocument(ID_FIELD, kd)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] this is a bit odd to me. I see we do something similar in the DeleteOneDefaultStrategy
, but that's only with the DefaultIdFieldStrategy
, which seems to only be used for testing.
Would it make more sense to always create the delete filter from the value returned by idStrategy.generateId()
? I was thinking about something along these lines:
BsonDocument businessKey = idStrategy.generateId(document, null).asDocument();
return new DeleteOneModel<>(businessKey.isDocument() ? businessKey : new BsonDocument(ID_FIELD, businessKey));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I agree, originally I had this logic throwing an error saying something along the lines of "DeleteOneTombstoneBusinessKeyStrategy expects PartialKeyStrategy to be defined"
This logic would let us remove defining that kd variable.
|
||
if (isPartialId) { | ||
BsonDocument businessKey = idStrategy.generateId(document, null).asDocument(); | ||
businessKey = flattenKeys(businessKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] any ideas on why we flatten the keys only with a partial id strategy? Would it make sense to do the same with a FullKeyStrategy
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did this to maintain consistency with the other *BusinessKeyStrategy
s, I guess technically they could always just use PartialKeyStrategy and just define all of the fields.
public void configure(final MongoSinkTopicConfig configuration) { | ||
idStrategy = configuration.getIdStrategy(); | ||
isPartialId = | ||
idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because this is specifically for tombstone events I'm going to remove PartialValueStrategy
throw new ConnectException( | ||
"DeleteOneTombstoneBusinessKeyStrategy expects PartialKeyStrategy to be defined"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@arahmanan this is what I had originally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! just an optional comment.
new DataException( | ||
"Could not build the WriteModel,the key document was missing unexpectedly")); | ||
|
||
if (isPartialId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[opt] thoughts on returning early for the error case instead? Also, is isPartialId
still needed? Could we directly check if (idStrategy instanceof PartialKeyStrategy)
?
No description provided.