Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
cc35591
KAFKA-19588: Reduce number of events generated in AsyncKafkaConsumer.…
kirktrue Aug 10, 2025
0c4dff6
Updates
kirktrue Aug 10, 2025
d45011c
Updated naming
kirktrue Aug 15, 2025
9d1a934
Made AutoCommitState a top-level class and refactoring to support
kirktrue Aug 17, 2025
4242127
[WIP] Clean up
kirktrue Aug 17, 2025
e26b71c
Revert changes to CommitRequestManager to fix failures in CommitReque…
kirktrue Aug 17, 2025
92e6cfe
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 18, 2025
197cff4
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 19, 2025
e4b983d
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 25, 2025
4cca2d8
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 26, 2025
0e28d8c
Remove redundant autoCommitState timer update in poll
kirktrue Aug 26, 2025
1ea59a1
Updates for clarity
kirktrue Aug 27, 2025
b26d7cf
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 27, 2025
a06ca40
Refactoring to make the intention a little more obvious and in keepin…
kirktrue Aug 27, 2025
d6fe1fd
Reverting unnecessary whitespace diffs
kirktrue Aug 27, 2025
3643e3e
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 28, 2025
278b06b
Optimization to send only one PollEvent per poll() does not work, nee…
kirktrue Aug 28, 2025
d95cc22
Merge branch 'trunk' into KAFKA-19588-avoid-poll-event
kirktrue Aug 28, 2025
9cf406f
Updated AbstractMembershipManager comments related to use of SharedAu…
kirktrue Aug 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public class ConsumerConfig extends AbstractConfig {
*/
public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
private static final String AUTO_COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if <code>enable.auto.commit</code> is set to <code>true</code>.";
public static final int DEFAULT_AUTO_COMMIT_INTERVAL_MS = 5000;

/**
* <code>partition.assignment.strategy</code>
Expand Down Expand Up @@ -462,7 +463,7 @@ public class ConsumerConfig extends AbstractConfig {
ENABLE_AUTO_COMMIT_DOC)
.define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
Type.INT,
5000,
DEFAULT_AUTO_COMMIT_INTERVAL_MS,
atLeast(0),
Importance.LOW,
AUTO_COMMIT_INTERVAL_MS_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
*/
private final ConsumerMetadata metadata;

/**
* Keeps track of the auto-commit state.
*
* <p/>
*
* <em>Note</em>: per its class name, this state is <em>shared</em> with the application thread, so care must be
* taken to evaluate how it's used elsewhere when updating related logic.
*/
protected final SharedAutoCommitState autoCommitState;

/**
* Logger.
*/
Expand Down Expand Up @@ -137,10 +147,17 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl

/**
* If there is a reconciliation running (triggering commit, callbacks) for the
* assignmentReadyToReconcile. This will be true if {@link #maybeReconcile(boolean)} has been triggered
* after receiving a heartbeat response, or a metadata update.
* assignmentReadyToReconcile. {@link SharedReconciliationState#isInProgress()} will be true if
* {@link #maybeReconcile(boolean)} has been triggered after receiving a heartbeat response, or a metadata update.
* Calling code should generally favor {@link #reconciliationInProgress()} for its clarity over direct use of
* this state.
*
* <p/>
*
* <em>Note</em>: per its class name, this state is <em>shared</em> with the application thread, so care must be
* taken to evaluate how it's used elsewhere when updating related logic.
*/
private boolean reconciliationInProgress;
private final SharedReconciliationState reconciliationState;

/**
* True if a reconciliation is in progress and the member rejoined the group since the start
Expand Down Expand Up @@ -192,8 +209,6 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
*/
private boolean isPollTimerExpired;

private final boolean autoCommitEnabled;

/**
* Indicate the operation on consumer group membership that the consumer will perform when leaving the group.
* The property should remain {@code GroupMembershipOperation.DEFAULT} until the consumer is closing.
Expand All @@ -208,7 +223,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
Logger log,
Time time,
RebalanceMetricsManager metricsManager,
boolean autoCommitEnabled) {
SharedConsumerState sharedConsumerState) {
this.groupId = groupId;
this.state = MemberState.UNSUBSCRIBED;
this.subscriptions = subscriptions;
Expand All @@ -220,7 +235,8 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
this.stateUpdatesListeners = new ArrayList<>();
this.time = time;
this.metricsManager = metricsManager;
this.autoCommitEnabled = autoCommitEnabled;
this.autoCommitState = sharedConsumerState.autoCommitState();
this.reconciliationState = sharedConsumerState.reconciliationState();
}

/**
Expand Down Expand Up @@ -530,7 +546,7 @@ public void transitionToJoining() {
"the member is in FATAL state");
return;
}
if (reconciliationInProgress) {
if (reconciliationInProgress()) {
rejoinedWhileReconciliationInProgress = true;
}
resetEpoch();
Expand Down Expand Up @@ -830,7 +846,7 @@ public void maybeReconcile(boolean canCommit) {
"current assignment.");
return;
}
if (reconciliationInProgress) {
if (reconciliationInProgress()) {
log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. " +
"Assignment {} will be handled in the next reconciliation loop.", currentTargetAssignment);
return;
Expand All @@ -850,7 +866,7 @@ public void maybeReconcile(boolean canCommit) {
return;
}

if (autoCommitEnabled && !canCommit) return;
if (autoCommitState.isAutoCommitEnabled() && !canCommit) return;
markReconciliationInProgress();

// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
Expand Down Expand Up @@ -963,7 +979,7 @@ private void revokeAndAssign(LocalAssignment resolvedAssignment,
log.error("Reconciliation failed.", error);
markReconciliationCompleted();
} else {
if (reconciliationInProgress && !maybeAbortReconciliation()) {
if (reconciliationInProgress() && !maybeAbortReconciliation()) {
currentAssignment = resolvedAssignment;

signalReconciliationCompleting();
Expand Down Expand Up @@ -1034,15 +1050,15 @@ protected CompletableFuture<Void> signalPartitionsLost(Set<TopicPartition> parti
* Visible for testing.
*/
void markReconciliationInProgress() {
reconciliationInProgress = true;
reconciliationState.setInProgress(true);
rejoinedWhileReconciliationInProgress = false;
}

/**
* Visible for testing.
*/
void markReconciliationCompleted() {
reconciliationInProgress = false;
reconciliationState.setInProgress(false);
rejoinedWhileReconciliationInProgress = false;
}

Expand Down Expand Up @@ -1372,7 +1388,7 @@ Map<Uuid, SortedSet<Integer>> topicPartitionsAwaitingReconciliation() {
* by a call to {@link #maybeReconcile(boolean)}. Visible for testing.
*/
boolean reconciliationInProgress() {
return reconciliationInProgress;
return reconciliationState.isInProgress();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ private StreamsRebalanceListener streamsRebalanceListener() {
private final long retryBackoffMs;
private final int requestTimeoutMs;
private final Duration defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private final SharedConsumerState sharedConsumerState;
private volatile boolean closed = false;
// Init value is needed to avoid NPE in case of exception raised in the constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
Expand Down Expand Up @@ -422,7 +422,6 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
GroupRebalanceConfig.ProtocolType.CONSUMER
);
this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
LogContext logContext = createLogContext(config, groupRebalanceConfig);
this.backgroundEventQueue = backgroundEventQueue;
this.log = logContext.logger(getClass());
Expand Down Expand Up @@ -477,6 +476,13 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
);
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig));
this.sharedConsumerState = new SharedConsumerState(
SharedAutoCommitState.newInstance(
requireNonNull(logContext),
requireNonNull(config),
requireNonNull(time)
)
);
final Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time,
logContext,
backgroundEventHandler,
Expand All @@ -492,7 +498,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
metrics,
offsetCommitCallbackInvoker,
memberStateListener,
streamsRebalanceData
streamsRebalanceData,
sharedConsumerState
);
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
metadata,
Expand Down Expand Up @@ -563,7 +570,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
int requestTimeoutMs,
int defaultApiTimeoutMs,
String groupId,
boolean autoCommitEnabled) {
SharedConsumerState sharedConsumerState) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = clientId;
Expand All @@ -586,7 +593,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
this.applicationEventHandler = applicationEventHandler;
this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
this.sharedConsumerState = sharedConsumerState;
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue,
Expand All @@ -606,7 +613,6 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.fetchBuffer = new FetchBuffer(logContext);
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
this.time = time;
Expand Down Expand Up @@ -662,6 +668,13 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
kafkaConsumerMetrics
);
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
this.sharedConsumerState = new SharedConsumerState(
SharedAutoCommitState.newInstance(
requireNonNull(logContext),
requireNonNull(config),
requireNonNull(time)
)
);
Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(
time,
logContext,
Expand All @@ -678,7 +691,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
metrics,
offsetCommitCallbackInvoker,
memberStateListener,
Optional.empty()
Optional.empty(),
sharedConsumerState
);
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext,
Expand Down Expand Up @@ -859,15 +873,9 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}


do {
PollEvent event = new PollEvent(timer.currentTimeMs());
// Make sure to let the background thread know that we are still polling.
// This will trigger async auto-commits of consumed positions when hitting
// the interval time or reconciling new assignments
applicationEventHandler.add(event);
// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
// retrieve the positions to commit before proceeding with fetching new records
ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis());
sendPollEvent(timer);

// We must not allow wake-ups between polling for fetches and returning the records.
// If the polled fetches are not empty the consumed position has already been updated in the polling
Expand Down Expand Up @@ -903,6 +911,29 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
}
}

private void sendPollEvent(Timer timer) {
long currentTimeMs = timer.currentTimeMs();

// Make sure to let the background thread know that we are still polling.
PollEvent event = new PollEvent(currentTimeMs);

if (sharedConsumerState.canSkipWaitingOnPoll(currentTimeMs)) {
// This will *not* trigger async auto-commits of consumed positions as the shared Timer for
// auto-commit interval will not change between the application thread and the network thread. This
// is true of the reconciliation state. The state will not change between the SharedConsumerState
// check above and the processing of the PollEvent.
applicationEventHandler.add(event);
} else {
// This will trigger async auto-commits of consumed positions when hitting
// the interval time or reconciling new assignments
applicationEventHandler.add(event);

// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
// retrieve the positions to commit before proceeding with fetching new records
ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis());
}
}

/**
* Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
* partitions.
Expand Down Expand Up @@ -1514,7 +1545,7 @@ private void autoCommitOnClose(final Timer timer) {
if (groupMetadata.get().isEmpty())
return;

if (autoCommitEnabled)
if (sharedConsumerState.isAutoCommitEnabled())
commitSyncAllConsumed(timer);

applicationEventHandler.add(new CommitOnCloseEvent());
Expand Down
Loading