Skip to content

[ISSUE 9535]: Prevent first message loss in CONSUME_FROM_LAST_OFFSET #7187 #9552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
}

/**
* Handle message queue changes during rebalancing.
* This method is called when the consumer's assigned message queues change,
* typically during consumer group rebalancing operations.
*
* @param topic the topic name
* @param mqAll all available message queues for the topic
* @param mqDivided the message queues assigned to this consumer after rebalancing
*/
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
/*
Expand Down Expand Up @@ -182,8 +191,14 @@ else if (-1 == lastOffset) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
long maxOffset = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);

if (maxOffset <= 1) {
result = 0L;
} else {
result = maxOffset;
}
} catch (MQClientException e) {
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,171 @@ public void testComputePullFromWhereWithException_eq_minus1_timestamp() throws M
assertEquals(23456L, rebalanceImpl.computePullFromWhereWithException(retryMq));
}

/**
* Regression test for maxOffset edge cases: verify fix for first message ignored issue
* Test scenario: empty message queue (maxOffset=0)
*/
@Test
public void testComputePullFromWhereWithException_emptyQueue() throws MQClientException {
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// Mock empty message queue: maxOffset returns 0
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(0L);

// Verify empty queue should start from offset 0
assertEquals("Empty message queue should start consuming from offset 0", 0L, rebalanceImpl.computePullFromWhereWithException(mq));
}

/**
* Regression test for maxOffset edge cases: verify fix for first message ignored issue
* Test scenario: single message queue (maxOffset=1)
* This is the key test case for fixing the first message ignored issue
*/
@Test
public void testComputePullFromWhereWithException_singleMessageQueue() throws MQClientException {
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// Mock single message queue: maxOffset returns 1
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(1L);

// Verify that when maxOffset=1, should start from 0 to avoid skipping the first message
assertEquals("Single message queue should start from offset 0 to avoid skipping first message", 0L, rebalanceImpl.computePullFromWhereWithException(mq));
}

/**
* Regression test for maxOffset edge cases: verify fix for first message ignored issue
* Test scenario: boundary case (maxOffset=2)
*/
@Test
public void testComputePullFromWhereWithException_boundaryCase() throws MQClientException {
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// Mock boundary case: maxOffset returns 2
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(2L);

// Verify when maxOffset=2, should start from maxOffset
assertEquals("When maxOffset=2, should start consuming from maxOffset", 2L, rebalanceImpl.computePullFromWhereWithException(mq));
}

/**
* Regression test for maxOffset edge cases: verify fix for first message ignored issue
* Test scenario: multiple messages queue (maxOffset>1)
*/
@Test
public void testComputePullFromWhereWithException_multipleMessagesQueue() throws MQClientException {
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// Mock multiple messages queue: maxOffset returns 10
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(10L);

// Verify multiple messages should start from maxOffset
assertEquals("Multiple messages queue should start consuming from maxOffset", 10L, rebalanceImpl.computePullFromWhereWithException(mq));
}

/**
* Regression test for maxOffset edge cases: verify fix for first message ignored issue
* Integration test: verify behavior difference before and after fix
*/
@Test
public void testComputePullFromWhereWithException_firstMessageNotIgnored() throws MQClientException {
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// Test various maxOffset values to verify fix logic
long[] testMaxOffsets = {0L, 1L, 2L, 5L, 100L};

for (long maxOffset : testMaxOffsets) {
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(maxOffset);

long result = rebalanceImpl.computePullFromWhereWithException(mq);

if (maxOffset <= 1) {
assertEquals("When maxOffset=" + maxOffset + ", should start from 0 to avoid skipping messages",
0L, result);
} else {
assertEquals("When maxOffset=" + maxOffset + ", should start from maxOffset",
maxOffset, result);
}
}
}

/**
* Regression test for maxOffset edge cases: verify fix for first message ignored issue
* Test scenario: verify handling for CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST
*/
@Test
public void testComputePullFromWhereWithException_lastOffsetAndMinWhenBootFirst() throws MQClientException {
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST);

// Test maxOffset=0 case
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(0L);
assertEquals("CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST mode, empty queue should start from 0",
0L, rebalanceImpl.computePullFromWhereWithException(mq));

// Test maxOffset=1 case
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(1L);
assertEquals("CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST mode, single message queue should start from 0",
0L, rebalanceImpl.computePullFromWhereWithException(mq));

// Test maxOffset>1 case
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(5L);
assertEquals("CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST mode, multiple messages queue should start from maxOffset",
5L, rebalanceImpl.computePullFromWhereWithException(mq));
}

/**
* Regression test for maxOffset edge cases: verify fix for first message ignored issue
* Test scenario: verify handling for CONSUME_FROM_MIN_OFFSET
*/
@Test
public void testComputePullFromWhereWithException_minOffset() throws MQClientException {
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET);

// Test maxOffset=0 case
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(0L);
assertEquals("CONSUME_FROM_MIN_OFFSET mode, empty queue should start from 0",
0L, rebalanceImpl.computePullFromWhereWithException(mq));

// Test maxOffset=1 case
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(1L);
assertEquals("CONSUME_FROM_MIN_OFFSET mode, single message queue should start from 0",
0L, rebalanceImpl.computePullFromWhereWithException(mq));

// Test maxOffset>1 case
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(5L);
assertEquals("CONSUME_FROM_MIN_OFFSET mode, multiple messages queue should start from maxOffset",
5L, rebalanceImpl.computePullFromWhereWithException(mq));
}

/**
* Regression test for maxOffset edge cases: verify fix for first message ignored issue
* Test scenario: verify handling for CONSUME_FROM_MAX_OFFSET
*/
@Test
public void testComputePullFromWhereWithException_maxOffset() throws MQClientException {
when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET);

// Test maxOffset=0 case
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(0L);
assertEquals("CONSUME_FROM_MAX_OFFSET mode, empty queue should start from 0",
0L, rebalanceImpl.computePullFromWhereWithException(mq));

// Test maxOffset=1 case
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(1L);
assertEquals("CONSUME_FROM_MAX_OFFSET mode, single message queue should start from 0",
0L, rebalanceImpl.computePullFromWhereWithException(mq));

// Test maxOffset>1 case
when(admin.maxOffset(any(MessageQueue.class))).thenReturn(5L);
assertEquals("CONSUME_FROM_MAX_OFFSET mode, multiple messages queue should start from maxOffset",
5L, rebalanceImpl.computePullFromWhereWithException(mq));
}

}
1 change: 0 additions & 1 deletion proxy/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ java_library(
"@maven//:org_apache_rocketmq_rocketmq_proto",
"@maven//:org_checkerframework_checker_qual",
"@maven//:org_lz4_lz4_java",
"@maven//:org_slf4j_slf4j_api",
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
"@maven//:org_slf4j_jul_to_slf4j",
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.exception.StoreException;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

public class CombineConsumeQueueStore implements ConsumeQueueStoreInterface {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
Expand Down