KAFKA-19826: Implement coordinator adaptive batch linger time #20780
NB: This refers to the CoordinatorRuntimeBuilder's default append linger time, not the config-level default.
    private final Deque<CoordinatorEvent> queue;
    private boolean inEvent;

    public DirectEventProcessor() {
        this.queue = new LinkedList<>();
        this.inEvent = false;
    }
We can now enqueue flush events while in the middle of another event. The flush event must be run after the current event, so we have to introduce a queue here.
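The re-entrancy handling described in this comment can be sketched roughly as follows. This is a minimal, standalone illustration and not the code from the PR: `QueueingDirectProcessor`, `enqueueLast` and the use of `Runnable` in place of `CoordinatorEvent` are assumptions made to keep the example self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified stand-in for a test-only "direct" event processor.
final class QueueingDirectProcessor {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private boolean inEvent = false;

    // Runs the event immediately, unless another event is already running.
    // In that case the event is only queued, and the outer call runs it
    // once the current event has returned.
    void enqueueLast(Runnable event) {
        queue.addLast(event);
        if (inEvent) {
            return; // the outer invocation drains the queue
        }
        inEvent = true;
        try {
            Runnable next;
            while ((next = queue.pollFirst()) != null) {
                next.run();
            }
        } finally {
            inEvent = false;
        }
    }
}

final class QueueingDirectProcessorDemo {
    // Enqueues a "flush" from inside a running "write" event and records the order.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        QueueingDirectProcessor processor = new QueueingDirectProcessor();
        processor.enqueueLast(() -> {
            trace.add("write:start");
            processor.enqueueLast(() -> trace.add("flush"));
            trace.add("write:end");
        });
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [write:start, write:end, flush]
    }
}
```

Without the `inEvent` guard, the nested call would run the flush in the middle of the write event, which is the ordering the queue is meant to prevent.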
    .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
    .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
    .withSerializer(new StringSerializer())
    .withAppendLingerMs(0)
Many of the tests in this file run with an adaptive linger time now, except for those with an explicit linger time.
I updated a handful of tests using the ManualEventProcessor to use an append linger time of 0 (preserving the existing behavior) to avoid test churn.
The changes are a little long, so it may be easier to review this PR commit by commit, at least for the first 5 commits.
FrankYang0529
left a comment
Overall LGTM. I left some minor comments.
    public Builder<S, U> withAppendLingerMs(int appendLingerMs) {
        this.appendLingerMs = OptionalInt.of(appendLingerMs);
        return this;
    }
Judging from the references, this method is only used by test code. Should we remove it and update the test cases to use the OptionalInt one?
Sure, we can remove the method.
    if (appendLingerMs == null)
        appendLingerMs = OptionalInt.empty();
In the production code, the appendLingerMs input cannot be null. How about defaulting appendLingerMs to OptionalInt.empty() and removing the appendLingerMs == null check?
I was following the existing pattern in the builder. We don't set a default value for logPrefix, logContext and compression and initialize them in build().
    if (appendLingerMs.isPresent() && appendLingerMs.getAsInt() < -1)
        throw new IllegalArgumentException("AppendLinger must be empty or >= 0");
If the appendLingerMs is -1, it will be input as OptionalInt.empty(). How about checking the value is >= 0 here?
Good catch! The check is supposed to reject -1.
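For illustration, the agreed behavior (a config value of -1 becomes `OptionalInt.empty()`, and the builder rejects any present negative value, including -1) might look like the sketch below. `AppendLingerSketch`, `fromConfig` and `validate` are hypothetical names chosen for this example; only the error message and the `OptionalInt` representation come from the snippets in this thread.

```java
import java.util.OptionalInt;

// Hypothetical helper; not the actual Kafka classes.
final class AppendLingerSketch {
    // Config layer: -1 selects the adaptive linger time, represented as "empty".
    static OptionalInt fromConfig(int configuredLingerMs) {
        return configuredLingerMs == -1
            ? OptionalInt.empty()
            : OptionalInt.of(configuredLingerMs);
    }

    // Builder layer: a present value must be >= 0, so -1 is rejected here.
    static OptionalInt validate(OptionalInt appendLingerMs) {
        if (appendLingerMs.isPresent() && appendLingerMs.getAsInt() < 0)
            throw new IllegalArgumentException("AppendLinger must be empty or >= 0");
        return appendLingerMs;
    }

    public static void main(String[] args) {
        System.out.println(validate(fromConfig(-1)).isPresent()); // prints false
        System.out.println(validate(fromConfig(5)).getAsInt());   // prints 5
    }
}
```

With this split, -1 is meaningful only at the config level; by the time a value reaches the builder it is either empty or a non-negative number of milliseconds.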
@FrankYang0529 Thanks for the review! I updated the code. I also added some tests for the config logic.
TaiJuWu
left a comment
LGTM.
dajac
left a comment
@squah-confluent Thanks for the patch. I left a few minor comments for consideration.
      ConfigDef.ValidList.in(false, Group.GroupType.documentValidValues()), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
      .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, GROUP_COORDINATOR_NUM_THREADS_DOC)
    - .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
    + .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(-1), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
I wonder whether we could strengthen the validation here because -0.5 is not a valid value for instance. Would it be possible to accept -1 or atLeast(0)?
Thanks for the review. I tested -0.5 and it's already disallowed:
org.apache.kafka.common.config.ConfigException: Invalid value -0.5 for configuration group.coordinator.append.linger.ms: Not a number of type INT
at app//org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:776)
at app//org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:531)
at app//org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:524)
at app//org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:118)
at app//org.apache.kafka.server.config.AbstractKafkaConfig.<init>(AbstractKafkaConfig.java:78)
      .define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
      .define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
    - .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
    + .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(-1), MEDIUM, APPEND_LINGER_MS_DOC)
ditto about the validation.
dajac
left a comment
lgtm, thanks.
@squah-confluent There are some checkstyle errors. Could you please fix them?
dajac
left a comment
lgtm
Add support for an adaptive batch linger time in the group and share
coordinators. When an adaptive batch linger time is enabled, we no
longer create a timer to flush the current batch. Instead, we append a
flush operation at the end of the event queue so that any currently
queued operations are naturally collected into the batch.
To avoid double flushing from hitting the maximum batch size or
transactional writes, we number batches with an epoch to check whether
the batch has already been flushed.
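As a rough illustration of the mechanism described above, the sketch below queues a flush behind the already-queued writes when a batch is opened, and uses a batch epoch to skip a flush whose batch has already been written out. It is a toy model rather than the coordinator runtime: `AdaptiveBatcher`, its methods, the `String` records and the record-count batch limit are all assumptions made for the example, and transactional writes are not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of an event queue with adaptive batching.
final class AdaptiveBatcher {
    private final Deque<Runnable> eventQueue = new ArrayDeque<>();
    private final int maxBatchSize;
    private List<String> currentBatch = null;
    private int batchEpoch = 0;

    // Batches in the order they were flushed, exposed for inspection.
    final List<List<String>> flushed = new ArrayList<>();

    AdaptiveBatcher(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Queues a write event; nothing runs until drain() is called.
    void scheduleWrite(String record) {
        eventQueue.addLast(() -> append(record));
    }

    private void append(String record) {
        if (currentBatch == null) {
            currentBatch = new ArrayList<>();
            final int epoch = ++batchEpoch;
            // Instead of starting a linger timer, queue the flush behind
            // every event that is already waiting in the queue.
            eventQueue.addLast(() -> flushIfCurrent(epoch));
        }
        currentBatch.add(record);
        if (currentBatch.size() >= maxBatchSize) {
            flush(); // early flush; the queued flush for this epoch is now stale
        }
    }

    // Flushes only if the batch this event was created for is still open.
    private void flushIfCurrent(int expectedEpoch) {
        if (currentBatch != null && batchEpoch == expectedEpoch) {
            flush();
        }
    }

    private void flush() {
        flushed.add(currentBatch);
        currentBatch = null;
    }

    // Runs queued events in order until the queue is empty.
    void drain() {
        Runnable event;
        while ((event = eventQueue.pollFirst()) != null) {
            event.run();
        }
    }
}

final class AdaptiveBatcherDemo {
    public static void main(String[] args) {
        AdaptiveBatcher batcher = new AdaptiveBatcher(3);
        for (String record : List.of("a", "b", "c", "d")) {
            batcher.scheduleWrite(record);
        }
        batcher.drain();
        System.out.println(batcher.flushed); // prints [[a, b, c], [d]]
    }
}
```

In the demo, the first batch is flushed early when it reaches three records, so the flush event queued for epoch 1 finds epoch 2 open and does nothing; the record `d` is flushed by the event queued for its own epoch.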
The group.coordinator.append.linger.ms and
share.coordinator.append.linger.ms configs are extended to allow -1, to
specify an adaptive append linger time. The default for these configs is
also updated to -1.
Reviewers: PoAn Yang, TaiJuWu, David Jacot