Skip to content

Commit 41f225a

Browse files
committed
fix error in offsetManager when in broadcast
1 parent e4b731c commit 41f225a

File tree

5 files changed

+44
-43
lines changed

5 files changed

+44
-43
lines changed

broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ protected void removeConsumerOffset(String topicAtGroup) {
5959
}
6060

6161
String[] topicGroup = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
62-
if (topicGroup.length != 2) {
62+
if (!validateOffsetTableKey(topicAtGroup)) {
6363
LOG.error("Invalid topic group: {}", topicAtGroup);
6464
return;
6565
}
@@ -452,7 +452,7 @@ public long queryPullOffset(String group, String topic, int queueId) {
452452
public void assignResetOffset(String topic, String group, int queueId, long offset) {
453453
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) {
454454
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}",
455-
topic, group, queueId, offset);
455+
topic, group, queueId, offset);
456456
return;
457457
}
458458
if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {

broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.rocketmq.common.ServiceThread;
2828
import org.apache.rocketmq.store.exception.ConsumeQueueException;
2929

30+
import static org.apache.rocketmq.common.MixAll.BROADCAST_KEY;
31+
3032
/**
3133
* manage the offset of broadcast.
3234
* now, use this to support switch remoting client between proxy and broker
@@ -169,7 +171,7 @@ protected void scanOffsetData() {
169171

170172
queueMinOffset.forEach((queueId, offset) ->
171173
this.brokerController.getConsumerOffsetManager().commitOffset("BroadcastOffset",
172-
broadcastGroupId(broadcastOffsetData.group), broadcastOffsetData.topic, queueId, offset));
174+
broadcastGroupId(broadcastOffsetData.group), broadcastOffsetData.topic, queueId, offset));
173175
}
174176
}
175177

@@ -182,7 +184,7 @@ private String buildKey(String topic, String group) {
182184
* @return the groupId used to commit offset
183185
*/
184186
private static String broadcastGroupId(String group) {
185-
return group + TOPIC_GROUP_SEPARATOR + "broadcast";
187+
return group + TOPIC_GROUP_SEPARATOR + BROADCAST_KEY;
186188
}
187189

188190
@Override

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker.offset;
1818

19+
import com.google.common.base.Strings;
1920
import com.google.common.collect.Maps;
2021
import java.util.HashMap;
2122
import java.util.HashSet;
@@ -26,9 +27,6 @@
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.ConcurrentMap;
2829
import java.util.concurrent.atomic.AtomicLong;
29-
30-
import com.google.common.base.Strings;
31-
3230
import java.util.function.Function;
3331
import org.apache.rocketmq.broker.BrokerController;
3432
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -41,20 +39,19 @@
4139
import org.apache.rocketmq.remoting.protocol.DataVersion;
4240
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
4341

42+
import static org.apache.rocketmq.common.MixAll.BROADCAST_KEY;
43+
4444
public class ConsumerOffsetManager extends ConfigManager {
4545
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
4646
public static final String TOPIC_GROUP_SEPARATOR = "@";
4747

4848
protected DataVersion dataVersion = new DataVersion();
4949

50-
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
51-
new ConcurrentHashMap<>(512);
50+
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>(512);
5251

53-
protected final ConcurrentMap<String, ConcurrentMap<Integer, Long>> resetOffsetTable =
54-
new ConcurrentHashMap<>(512);
52+
protected final ConcurrentMap<String, ConcurrentMap<Integer, Long>> resetOffsetTable = new ConcurrentHashMap<>(512);
5553

56-
private final ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> pullOffsetTable =
57-
new ConcurrentHashMap<>(512);
54+
private final ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> pullOffsetTable = new ConcurrentHashMap<>(512);
5855

5956
protected transient BrokerController brokerController;
6057

@@ -78,7 +75,7 @@ public void cleanOffset(String group) {
7875
String topicAtGroup = next.getKey();
7976
if (topicAtGroup.contains(group)) {
8077
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
81-
if (arrays.length == 2 && group.equals(arrays[1])) {
78+
if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) {
8279
it.remove();
8380
removeConsumerOffset(topicAtGroup);
8481
LOG.warn("Clean group's offset, {}, {}", topicAtGroup, next.getValue());
@@ -94,7 +91,7 @@ public void cleanOffsetByTopic(String topic) {
9491
String topicAtGroup = next.getKey();
9592
if (topicAtGroup.contains(topic)) {
9693
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
97-
if (arrays.length == 2 && topic.equals(arrays[0])) {
94+
if (validateOffsetTableKey(topicAtGroup) && topic.equals(arrays[0])) {
9895
it.remove();
9996
removeConsumerOffset(topicAtGroup);
10097
LOG.warn("Clean topic's offset, {}, {}", topicAtGroup, next.getValue());
@@ -109,12 +106,11 @@ public void scanUnsubscribedTopic() {
109106
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
110107
String topicAtGroup = next.getKey();
111108
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
112-
if (arrays.length == 2) {
109+
if (validateOffsetTableKey(topicAtGroup)) {
113110
String topic = arrays[0];
114111
String group = arrays[1];
115112

116-
if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
117-
&& this.offsetBehindMuchThanData(topic, next.getValue())) {
113+
if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic) && this.offsetBehindMuchThanData(topic, next.getValue())) {
118114
it.remove();
119115
removeConsumerOffset(topicAtGroup);
120116
LOG.warn("remove topic offset, {}", topicAtGroup);
@@ -139,13 +135,12 @@ private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap<Integ
139135

140136
public Set<String> whichTopicByConsumer(final String group) {
141137
Set<String> topics = new HashSet<>();
142-
143138
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
144139
while (it.hasNext()) {
145140
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
146141
String topicAtGroup = next.getKey();
147142
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
148-
if (arrays.length == 2) {
143+
if (validateOffsetTableKey(topicAtGroup)) {
149144
if (group.equals(arrays[1])) {
150145
topics.add(arrays[0]);
151146
}
@@ -163,7 +158,7 @@ public Set<String> whichGroupByTopic(final String topic) {
163158
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
164159
String topicAtGroup = next.getKey();
165160
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
166-
if (arrays.length == 2) {
161+
if (validateOffsetTableKey(topicAtGroup)) {
167162
if (topic.equals(arrays[0])) {
168163
groups.add(arrays[1]);
169164
}
@@ -178,7 +173,7 @@ public Map<String, Set<String>> getGroupTopicMap() {
178173

179174
for (String key : this.offsetTable.keySet()) {
180175
String[] arr = key.split(TOPIC_GROUP_SEPARATOR);
181-
if (arr.length == 2) {
176+
if (validateOffsetTableKey(key)) {
182177
String topic = arr[0];
183178
String group = arr[1];
184179

@@ -224,16 +219,16 @@ public void commitPullOffset(final String clientHost, final String group, final
224219
final long offset) {
225220
// topic@group
226221
String key = topic + TOPIC_GROUP_SEPARATOR + group;
227-
ConcurrentMap<Integer, Long> map = this.pullOffsetTable.computeIfAbsent(
228-
key, k -> new ConcurrentHashMap<>(32));
222+
ConcurrentMap<Integer, Long> map = this.pullOffsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>(32));
229223
map.put(queueId, offset);
230224
}
231225

232226
/**
233227
* If the target queue has temporary reset offset, return the reset-offset.
234228
* Otherwise, return the current consume offset in the offset store.
235-
* @param group Consumer group
236-
* @param topic Topic
229+
*
230+
* @param group Consumer group
231+
* @param topic Topic
237232
* @param queueId Queue ID
238233
* @return current consume offset or reset offset if there were one.
239234
*/
@@ -261,8 +256,9 @@ public long queryOffset(final String group, final String topic, final int queueI
261256

262257
/**
263258
* Query pull offset in pullOffsetTable
264-
* @param group Consumer group
265-
* @param topic Topic
259+
*
260+
* @param group Consumer group
261+
* @param topic Topic
266262
* @param queueId Queue ID
267263
* @return latest pull offset of consumer group
268264
*/
@@ -330,7 +326,7 @@ public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final Str
330326
Iterator<String> it = topicGroups.iterator();
331327
while (it.hasNext()) {
332328
String topicAtGroup = it.next();
333-
if (group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) {
329+
if (validateOffsetTableKey(topicAtGroup) && group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) {
334330
it.remove();
335331
removeConsumerOffset(topicAtGroup);
336332
}
@@ -341,7 +337,7 @@ public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final Str
341337
for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
342338
String topicGroup = offSetEntry.getKey();
343339
String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
344-
if (topic.equals(topicGroupArr[0])) {
340+
if (validateOffsetTableKey(topicGroup) && topic.equals(topicGroupArr[0])) {
345341
for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
346342
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());
347343
if (entry.getValue() >= minOffset) {
@@ -407,7 +403,7 @@ public void removeOffset(final String group) {
407403
String topicAtGroup = entry.getKey();
408404
if (topicAtGroup.contains(group)) {
409405
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
410-
if (arrays.length == 2 && group.equals(arrays[1])) {
406+
if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) {
411407
it.remove();
412408
removeConsumerOffset(topicAtGroup);
413409
removed = true;
@@ -421,14 +417,12 @@ public void removeOffset(final String group) {
421417
boolean clearReset = deleteFunction.apply(this.resetOffsetTable.entrySet().iterator());
422418
boolean clearPull = deleteFunction.apply(this.pullOffsetTable.entrySet().iterator());
423419

424-
LOG.info("Consumer offset manager clean group offset, groupName={}, " +
425-
"offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull);
420+
LOG.info("Consumer offset manager clean group offset, groupName={}, " + "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull);
426421
}
427422

428423
public void assignResetOffset(String topic, String group, int queueId, long offset) {
429424
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) {
430-
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}",
431-
topic, group, queueId, offset);
425+
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", topic, group, queueId, offset);
432426
return;
433427
}
434428

@@ -461,4 +455,9 @@ public Long queryThenEraseResetOffset(String topic, String group, Integer queueI
461455
return map.remove(queueId);
462456
}
463457
}
464-
}
458+
459+
public boolean validateOffsetTableKey(String key) {
460+
String[] arr = key.split(TOPIC_GROUP_SEPARATOR);
461+
return arr.length == 2 || (arr.length == 3 && BROADCAST_KEY.equals(arr[2]));
462+
}
463+
}

broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616
*/
1717
package org.apache.rocketmq.broker.offset;
1818

19+
import com.google.common.base.Strings;
1920
import java.util.HashMap;
2021
import java.util.Iterator;
2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.concurrent.ConcurrentMap;
24-
25-
import com.google.common.base.Strings;
2625
import org.apache.rocketmq.broker.BrokerController;
2726
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
2827
import org.apache.rocketmq.common.MixAll;
@@ -126,7 +125,7 @@ public void removeOffset(String group) {
126125
String topicAtGroup = next.getKey();
127126
if (topicAtGroup.contains(group)) {
128127
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
129-
if (arrays.length == 2 && group.equals(arrays[1])) {
128+
if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) {
130129
it.remove();
131130
removeConsumerOffset(topicAtGroup);
132131
LOG.warn("clean lmq group offset {}", topicAtGroup);
@@ -139,7 +138,7 @@ public void removeOffset(String group) {
139138
public void assignResetOffset(String topic, String group, int queueId, long offset) {
140139
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) {
141140
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}",
142-
topic, group, queueId, offset);
141+
topic, group, queueId, offset);
143142
return;
144143
}
145144
if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {

common/src/main/java/org/apache/rocketmq/common/MixAll.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.common;
1818

19+
import com.google.common.collect.ImmutableSet;
1920
import java.io.ByteArrayInputStream;
2021
import java.io.File;
2122
import java.io.FileInputStream;
@@ -43,8 +44,6 @@
4344
import java.util.TreeMap;
4445
import java.util.concurrent.atomic.AtomicLong;
4546
import java.util.function.Predicate;
46-
47-
import com.google.common.collect.ImmutableSet;
4847
import org.apache.commons.lang3.StringUtils;
4948
import org.apache.rocketmq.common.annotation.ImportantField;
5049
import org.apache.rocketmq.common.constant.LoggerName;
@@ -141,6 +140,7 @@ public class MixAll {
141140
CID_ONSAPI_PULL_GROUP,
142141
CID_SYS_RMQ_TRANS
143142
);
143+
public static final String BROADCAST_KEY = "broadcast";
144144

145145
public static boolean isWindows() {
146146
return OS.contains("win");
@@ -182,7 +182,8 @@ public static boolean isSysConsumerGroup(final String consumerGroup) {
182182
return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
183183
}
184184

185-
public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup, final boolean isEnableCreateSysGroup) {
185+
public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup,
186+
final boolean isEnableCreateSysGroup) {
186187
return isEnableCreateSysGroup && isSysConsumerGroup(consumerGroup);
187188
}
188189

0 commit comments

Comments
 (0)