diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index fe2f19b2f9a..b15f50c0cf6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -51,6 +51,15 @@ public RebalancePushImpl(String consumerGroup, MessageModel messageModel, this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; } + /** + * Handle message queue changes during rebalancing. + * This method is called when the consumer's assigned message queues change, + * typically during consumer group rebalancing operations. + * + * @param topic the topic name + * @param mqAll all available message queues for the topic + * @param mqDivided the message queues assigned to this consumer after rebalancing + */ @Override public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) { /* @@ -182,8 +191,14 @@ else if (-1 == lastOffset) { result = 0L; } else { try { - result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); - } catch (MQClientException e) { + long maxOffset = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); + + if (maxOffset <= 1) { + result = 0L; + } else { + result = maxOffset; + } + } catch (MQClientException e) { log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); throw e; } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java index f55b5869e56..0768c66ce98 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java @@ -214,4 +214,171 @@ public void testComputePullFromWhereWithException_eq_minus1_timestamp() throws M assertEquals(23456L, rebalanceImpl.computePullFromWhereWithException(retryMq)); } + /** + * Regression test for maxOffset edge cases: verify fix for first message ignored issue + * Test scenario: empty message queue (maxOffset=0) + */ + @Test + public void testComputePullFromWhereWithException_emptyQueue() throws MQClientException { + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + // Mock empty message queue: maxOffset returns 0 + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(0L); + + // Verify empty queue should start from offset 0 + assertEquals("Empty message queue should start consuming from offset 0", 0L, rebalanceImpl.computePullFromWhereWithException(mq)); + } + + /** + * Regression test for maxOffset edge cases: verify fix for first message ignored issue + * Test scenario: single message queue (maxOffset=1) + * This is the key test case for fixing the first message ignored issue + */ + @Test + public void testComputePullFromWhereWithException_singleMessageQueue() throws MQClientException { + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + // Mock single message queue: maxOffset returns 1 + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(1L); + + // Verify that when maxOffset=1, should start from 0 to avoid skipping the first message + assertEquals("Single message queue should start from offset 0 to avoid skipping first message", 0L, rebalanceImpl.computePullFromWhereWithException(mq)); + } + + /** + * Regression test for maxOffset edge cases: verify fix for first message ignored issue + * Test scenario: boundary case (maxOffset=2) + */ + @Test + public void testComputePullFromWhereWithException_boundaryCase() throws MQClientException { + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + // Mock boundary case: maxOffset returns 2 + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(2L); + + // Verify when maxOffset=2, should start from maxOffset + assertEquals("When maxOffset=2, should start consuming from maxOffset", 2L, rebalanceImpl.computePullFromWhereWithException(mq)); + } + + /** + * Regression test for maxOffset edge cases: verify fix for first message ignored issue + * Test scenario: multiple messages queue (maxOffset>1) + */ + @Test + public void testComputePullFromWhereWithException_multipleMessagesQueue() throws MQClientException { + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + // Mock multiple messages queue: maxOffset returns 10 + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(10L); + + // Verify multiple messages should start from maxOffset + assertEquals("Multiple messages queue should start consuming from maxOffset", 10L, rebalanceImpl.computePullFromWhereWithException(mq)); + } + + /** + * Regression test for maxOffset edge cases: verify fix for first message ignored issue + * Integration test: verify behavior difference before and after fix + */ + @Test + public void testComputePullFromWhereWithException_firstMessageNotIgnored() throws MQClientException { + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + // Test various maxOffset values to verify fix logic + long[] testMaxOffsets = {0L, 1L, 2L, 5L, 100L}; + + for (long maxOffset : testMaxOffsets) { + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(maxOffset); + + long result = rebalanceImpl.computePullFromWhereWithException(mq); + + if (maxOffset <= 1) { + assertEquals("When maxOffset=" + maxOffset + ", should start from 0 to avoid skipping messages", + 0L, result); + } else { + assertEquals("When maxOffset=" + maxOffset + ", should start from maxOffset", + maxOffset, result); + } + } + } + + /** + * Regression test for maxOffset edge cases: verify fix for first message ignored issue + * Test scenario: verify handling for CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST + */ + @Test + public void testComputePullFromWhereWithException_lastOffsetAndMinWhenBootFirst() throws MQClientException { + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST); + + // Test maxOffset=0 case + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(0L); + assertEquals("CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST mode, empty queue should start from 0", + 0L, rebalanceImpl.computePullFromWhereWithException(mq)); + + // Test maxOffset=1 case + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(1L); + assertEquals("CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST mode, single message queue should start from 0", + 0L, rebalanceImpl.computePullFromWhereWithException(mq)); + + // Test maxOffset>1 case + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(5L); + assertEquals("CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST mode, multiple messages queue should start from maxOffset", + 5L, rebalanceImpl.computePullFromWhereWithException(mq)); + } + + /** + * Regression test for maxOffset edge cases: verify fix for first message ignored issue + * Test scenario: verify handling for CONSUME_FROM_MIN_OFFSET + */ + @Test + public void testComputePullFromWhereWithException_minOffset() throws MQClientException { + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET); + + // Test maxOffset=0 case + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(0L); + assertEquals("CONSUME_FROM_MIN_OFFSET mode, empty queue should start from 0", + 0L, rebalanceImpl.computePullFromWhereWithException(mq)); + + // Test maxOffset=1 case + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(1L); + assertEquals("CONSUME_FROM_MIN_OFFSET mode, single message queue should start from 0", + 0L, rebalanceImpl.computePullFromWhereWithException(mq)); + + // Test maxOffset>1 case + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(5L); + assertEquals("CONSUME_FROM_MIN_OFFSET mode, multiple messages queue should start from maxOffset", + 5L, rebalanceImpl.computePullFromWhereWithException(mq)); + } + + /** + * Regression test for maxOffset edge cases: verify fix for first message ignored issue + * Test scenario: verify handling for CONSUME_FROM_MAX_OFFSET + */ + @Test + public void testComputePullFromWhereWithException_maxOffset() throws MQClientException { + when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET); + + // Test maxOffset=0 case + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(0L); + assertEquals("CONSUME_FROM_MAX_OFFSET mode, empty queue should start from 0", + 0L, rebalanceImpl.computePullFromWhereWithException(mq)); + + // Test maxOffset=1 case + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(1L); + assertEquals("CONSUME_FROM_MAX_OFFSET mode, single message queue should start from 0", + 0L, rebalanceImpl.computePullFromWhereWithException(mq)); + + // Test maxOffset>1 case + when(admin.maxOffset(any(MessageQueue.class))).thenReturn(5L); + assertEquals("CONSUME_FROM_MAX_OFFSET mode, multiple messages queue should start from maxOffset", + 5L, rebalanceImpl.computePullFromWhereWithException(mq)); + } + } diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel index 8b7915ba741..ccdbd05c60f 100644 --- a/proxy/BUILD.bazel +++ b/proxy/BUILD.bazel @@ -61,7 +61,6 @@ java_library( "@maven//:org_apache_rocketmq_rocketmq_proto", "@maven//:org_checkerframework_checker_qual", "@maven//:org_lz4_lz4_java", - "@maven//:org_slf4j_slf4j_api", "@maven//:io_github_aliyunmq_rocketmq_slf4j_api", "@maven//:org_slf4j_jul_to_slf4j", "@maven//:io_github_aliyunmq_rocketmq_logback_classic", diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java index c8c3202a785..88b37f9490a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java @@ -40,6 +40,8 @@ import org.apache.rocketmq.store.exception.ConsumeQueueException; import org.apache.rocketmq.store.exception.StoreException; import org.rocksdb.RocksDBException; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class CombineConsumeQueueStore implements ConsumeQueueStoreInterface { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);