Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,14 @@ private boolean maybeAddAcknowledgements(ShareSessionHandler handler,
}
}

public void fetch(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements) {
public void fetch(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
if (!fetchMoreRecords) {
log.debug("Fetch more data");
fetchMoreRecords = true;
}

// Process both acknowledgement maps and sends them in the next ShareFetch.
// Store the acknowledgements and send them in the next ShareFetch.
processAcknowledgementsMap(acknowledgementsMap);
processAcknowledgementsMap(controlRecordAcknowledgements);
}

private void processAcknowledgementsMap(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
// and is used to prevent multithreaded access
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
private final AtomicInteger refCount = new AtomicInteger(0);
private boolean shouldSendShareFetchEvent = false;

ShareConsumerImpl(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
Expand Down Expand Up @@ -581,6 +582,8 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
throw new IllegalStateException("Consumer is not subscribed to any topics.");
}

shouldSendShareFetchEvent = true;

do {
// Make sure the network thread can tell the application is actively polling
applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
Expand Down Expand Up @@ -654,28 +657,29 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
if (currentFetch.isEmpty()) {
final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
if (fetch.isEmpty()) {
// Check for any acknowledgements which could have come from control records (GAP) and include them.
applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap, fetch.takeAcknowledgedRecords()));
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements = fetch.takeAcknowledgedRecords();

// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
if (!controlRecordAcknowledgements.isEmpty()) {
// Asynchronously commit any waiting acknowledgements from control records.
sendShareAcknowledgeAsyncEvent(controlRecordAcknowledgements);
}
// We only send one ShareFetchEvent per poll call.
if (shouldSendShareFetchEvent) {
// Check for any acknowledgements which could have come from control records (GAP) and include them.
applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
shouldSendShareFetchEvent = false;
// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
}
} else if (!acknowledgementsMap.isEmpty()) {
// Asynchronously commit any waiting acknowledgements
Timer timer = time.timer(defaultApiTimeoutMs);
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));

// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
}
return fetch;
} else {
if (!acknowledgementsMap.isEmpty()) {
// Asynchronously commit any waiting acknowledgements
Timer timer = time.timer(defaultApiTimeoutMs);
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));

// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
}
if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
// We cannot leave unacknowledged records in EXPLICIT acknowledgement mode, so we throw an exception to the application.
Expand All @@ -685,6 +689,14 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
}
}

private void sendShareAcknowledgeAsyncEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
Timer timer = time.timer(defaultApiTimeoutMs);
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));

// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ private void process(@SuppressWarnings("unused") final StopFindCoordinatorOnClos
* Process event that tells the share consume request manager to fetch more records.
*/
private void process(final ShareFetchEvent event) {
requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.fetch(event.acknowledgementsMap(), event.controlRecordAcknowledgements()));
requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.fetch(event.acknowledgementsMap()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,15 @@ public class ShareFetchEvent extends ApplicationEvent {

private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;

private final Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements;

public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements) {
public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
super(Type.SHARE_FETCH);
this.acknowledgementsMap = acknowledgementsMap;
this.controlRecordAcknowledgements = controlRecordAcknowledgements;
}

public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}

public Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements() {
return controlRecordAcknowledgements;
}

@Override
protected String toStringBase() {
return super.toStringBase() + ", acknowledgementsMap=" + acknowledgementsMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

Expand Down Expand Up @@ -141,9 +140,6 @@ public void testVerifyHeartbeats() throws InterruptedException {
}
}

// This test is proving sufficiently flaky that it has been disabled pending investigation
@Disabled
// @Flaky("KAFKA-18488")
@Test
public void testVerifyFetchAndCommitSyncImplicit() {
ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false,
Expand Down Expand Up @@ -218,9 +214,6 @@ public void testVerifyFetchAndCommitSyncImplicit() {
}
}

// This test is proving sufficiently flaky that it has been disabled pending investigation
@Disabled
//@Flaky("KAFKA-18794")
@Test
public void testVerifyFetchAndCloseImplicit() {
ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false,
Expand Down
Loading