Skip to content

Commit edf56a1

Browse files
authored
Merge branch 'apache:develop' into comment
2 parents d48debe + 88709c5 commit edf56a1

10 files changed

Lines changed: 323 additions & 67 deletions

File tree

README.md

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
**[Apache RocketMQ](https://rocketmq.apache.org) is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.**
1313

14-
1514
It offers a variety of features:
1615

1716
* Messaging patterns including publish/subscribe, request/reply and streaming
@@ -32,8 +31,8 @@ It offers a variety of features:
3231
* Authentication and authorization
3332
* Free open source connectors, for both sources and sinks
3433
* Lightweight real-time computing
35-
----------
3634

35+
---
3736

3837
## Quick Start
3938

@@ -44,6 +43,7 @@ For local development and testing, only one instance will be created for each co
4443

4544
RocketMQ runs on all major operating systems and requires only a Java JDK version 8 or higher to be installed.
4645
To check, run `java -version`:
46+
4747
```shell
4848
$ java -version
4949
java version "1.8.0_121"
@@ -62,6 +62,7 @@ $ unzip rocketmq-all-5.5.0-bin-release.zip
6262
```
6363

6464
Prepare a terminal and change to the extracted `bin` directory:
65+
6566
```shell
6667
$ cd rocketmq-all-5.5.0-bin-release/bin
6768
```
@@ -71,6 +72,7 @@ $ cd rocketmq-all-5.5.0-bin-release/bin
7172
NameServer will be listening at `0.0.0.0:9876`, make sure that the port is not used by others on the local machine, and then do as follows.
7273

7374
For macOS and Linux users:
75+
7476
```shell
7577
### start Name Server
7678
$ nohup sh mqnamesrv &
@@ -81,13 +83,15 @@ The Name Server boot success...
8183
```
8284

8385
For Windows users, you need to set environment variables first:
84-
- From the desktop, right click the Computer icon.
85-
- Choose Properties from the context menu.
86-
- Click the Advanced system settings link.
87-
- Click Environment Variables.
88-
- Add Environment `ROCKETMQ_HOME="D:\rocketmq"`.
86+
87+
* From the desktop, right click the Computer icon.
88+
* Choose Properties from the context menu.
89+
* Click the Advanced system settings link.
90+
* Click Environment Variables.
91+
* Add Environment `ROCKETMQ_HOME="D:\rocketmq"`.
8992

9093
Then change directory to rocketmq, type and run:
94+
9195
```shell
9296
$ mqnamesrv.cmd
9397
The Name Server boot success...
@@ -96,6 +100,7 @@ The Name Server boot success...
96100
**2) Start Broker**
97101

98102
For macOS and Linux users:
103+
99104
```shell
100105
### start Broker
101106
$ nohup sh mqbroker -n localhost:9876 &
@@ -106,6 +111,7 @@ The broker[broker-a, 192.168.1.2:10911] boot success...
106111
```
107112

108113
For Windows users:
114+
109115
```shell
110116
$ mqbroker.cmd -n localhost:9876
111117
The broker[broker-a, 192.168.1.2:10911] boot success...
@@ -134,6 +140,7 @@ You can also run a RocketMQ cluster within a Kubernetes cluster using [RocketMQ
134140
Before your operations, make sure that `kubectl` and related kubeconfig file installed on your machine.
135141

136142
**1) Install CRDs**
143+
137144
```shell
138145
### install CRDs
139146
$ git clone https://github.com/apache/rocketmq-operator
@@ -152,6 +159,7 @@ rocketmq-operator-6f65c77c49-8hwmj 1/1 Running 0 93s
152159
```
153160

154161
**2) Create Cluster Instance**
162+
155163
```shell
156164
### create RocketMQ cluster resource
157165
$ cd example && kubectl create -f rocketmq_v1alpha1_rocketmq_cluster.yaml
@@ -165,16 +173,18 @@ name-service 1/1 107m
165173
```
166174

167175
---
176+
168177
## Apache RocketMQ Community
178+
169179
* [RocketMQ Streams](https://github.com/apache/rocketmq-streams): A lightweight stream computing engine based on Apache RocketMQ.
170180
* [RocketMQ Flink](https://github.com/apache/rocketmq-flink): The Apache RocketMQ connector of Apache Flink that supports source and sink connector in data stream and Table.
171181
* [RocketMQ APIs](https://github.com/apache/rocketmq-apis): RocketMQ protobuf protocol.
172182
* [RocketMQ Clients](https://github.com/apache/rocketmq-clients): gRPC/protobuf-based RocketMQ clients.
173183
* RocketMQ Remoting-based Clients
174-
- [RocketMQ Client CPP](https://github.com/apache/rocketmq-client-cpp)
175-
- [RocketMQ Client Go](https://github.com/apache/rocketmq-client-go)
176-
- [RocketMQ Client Python](https://github.com/apache/rocketmq-client-python)
177-
- [RocketMQ Client Nodejs](https://github.com/apache/rocketmq-client-nodejs)
184+
* [RocketMQ Client CPP](https://github.com/apache/rocketmq-client-cpp)
185+
* [RocketMQ Client Go](https://github.com/apache/rocketmq-client-go)
186+
* [RocketMQ Client Python](https://github.com/apache/rocketmq-client-python)
187+
* [RocketMQ Client Nodejs](https://github.com/apache/rocketmq-client-nodejs)
178188
* [RocketMQ Spring](https://github.com/apache/rocketmq-spring): A project which helps developers quickly integrate Apache RocketMQ with Spring Boot.
179189
* [RocketMQ Exporter](https://github.com/apache/rocketmq-exporter): An Apache RocketMQ exporter for Prometheus.
180190
* [RocketMQ Operator](https://github.com/apache/rocketmq-operator): Providing a way to run an Apache RocketMQ cluster on Kubernetes.
@@ -187,31 +197,33 @@ name-service 1/1 107m
187197
* [RocketMQ Site](https://github.com/apache/rocketmq-site): The repository for Apache RocketMQ website.
188198
* [RocketMQ E2E](https://github.com/apache/rocketmq-e2e): A project for testing Apache RocketMQ, including end-to-end, performance, compatibility tests.
189199

200+
---
190201

191-
----------
192202
## Learn it & Contact us
203+
193204
* Mailing Lists: <https://rocketmq.apache.org/about/contact/>
194205
* Home: <https://rocketmq.apache.org>
195206
* Docs: <https://rocketmq.apache.org/docs/quick-start/>
196207
* Issues: <https://github.com/apache/rocketmq/issues>
197208
* Rips: <https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal>
198209
* Ask: <https://stackoverflow.com/questions/tagged/rocketmq>
199210

200-
201-
----------
202-
203-
211+
---
204212

205213
## Contributing
214+
206215
We always welcome new contributions, whether for trivial cleanups, [big new features](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).
207216

208-
----------
217+
---
218+
209219
## License
220+
210221
[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
211222

223+
---
212224

213-
----------
214225
## Export Control Notice
226+
215227
This distribution includes cryptographic software. The country in which you currently reside may have
216228
restrictions on the import, possession, use, and/or re-export to another country, of encryption software.
217229
BEFORE using any encryption software, please check your country's laws, regulations and policies concerning

broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ public void deleteLmq(String parentTopic, String lmqName) {
205205
groups.forEach(group -> {
206206
String topicAtGroup = lmqName + TOPIC_GROUP_SEPARATOR + group;
207207
brokerController.getConsumerOffsetManager().getOffsetTable().remove(topicAtGroup);
208+
brokerController.getConsumerOffsetManager().eraseResetOffset(lmqName, group, 0);
208209
brokerController.getConsumerOffsetManager().removeConsumerOffset(topicAtGroup); // no iteration
209210
brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().remove(lmqName, group);
210211
});

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,8 +466,21 @@ public Long queryThenEraseResetOffset(String topic, String group, Integer queueI
466466
ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
467467
if (null == map) {
468468
return null;
469-
} else {
470-
return map.remove(queueId);
471469
}
470+
Long offset = map.remove(queueId);
471+
if (map.isEmpty()) {
472+
resetOffsetTable.computeIfPresent(key, (k, _map) ->
473+
_map.isEmpty() ? null : _map
474+
);
475+
}
476+
return offset;
477+
}
478+
479+
public void eraseResetOffset(String topic, String group, int queueId) {
480+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
481+
resetOffsetTable.computeIfPresent(key, (k, map) -> {
482+
map.remove(queueId);
483+
return map.isEmpty() ? null : map;
484+
});
472485
}
473486
}

broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,10 @@ public Pair<StringBuilder, GetMessageResult> popLiteTopic(String parentTopic, St
311311
}
312312

313313
public boolean isFifoBlocked(String attemptId, String group, String lmqName, long invisibleTime) {
314+
if (brokerController.getBrokerConfig().isUseServerSideResetOffset() &&
315+
this.brokerController.getConsumerOffsetManager().hasOffsetReset(lmqName, group, 0)) {
316+
return false;
317+
}
314318
return consumerOrderInfoManager.checkBlock(attemptId, lmqName, group, 0, invisibleTime);
315319
}
316320

broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,25 @@ public void testOffsetPersistInMemory() {
100100
ConcurrentMap<Integer, Long> offsetTableLoaded = manager.getOffsetTable().get(group);
101101
Assert.assertEquals(table, offsetTableLoaded);
102102
}
103+
104+
@Test
105+
public void testEraseResetOffset() {
106+
String topic = "Topic";
107+
String group = "Group";
108+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
109+
consumerOffsetManager.assignResetOffset(topic, group, 0, 100L);
110+
consumerOffsetManager.assignResetOffset(topic, group, 1, 200L);
111+
112+
Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 0));
113+
Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 1));
114+
115+
consumerOffsetManager.eraseResetOffset(topic, group, 0);
116+
Assert.assertFalse(consumerOffsetManager.hasOffsetReset(topic, group, 0));
117+
Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 1));
118+
Assert.assertTrue(consumerOffsetManager.resetOffsetTable.containsKey(key));
119+
120+
consumerOffsetManager.eraseResetOffset(topic, group, 1);
121+
Assert.assertFalse(consumerOffsetManager.hasOffsetReset(topic, group, 1));
122+
Assert.assertFalse(consumerOffsetManager.resetOffsetTable.containsKey(key));
123+
}
103124
}

broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656

5757
import static org.assertj.core.api.Assertions.assertThat;
5858
import static org.junit.Assert.assertEquals;
59+
import static org.junit.Assert.assertFalse;
5960
import static org.junit.Assert.assertNotNull;
6061
import static org.junit.Assert.assertNull;
6162
import static org.junit.Assert.assertTrue;
@@ -149,6 +150,16 @@ public void testIsFifoBlocked() {
149150
verify(consumerOrderInfoManager).checkBlock("attemptId", "lmqName", "group", 0, 1000L);
150151
}
151152

153+
@Test
154+
public void testIsFifoBlocked_hasResetOffset() {
155+
brokerConfig.setUseServerSideResetOffset(true);
156+
when(consumerOffsetManager.hasOffsetReset("lmqName", "group", 0)).thenReturn(true);
157+
158+
assertFalse(popLiteMessageProcessor.isFifoBlocked("attemptId", "group", "lmqName", 1000L));
159+
verify(consumerOffsetManager).hasOffsetReset("lmqName", "group", 0);
160+
verify(consumerOrderInfoManager, never()).checkBlock(anyString(), anyString(), anyString(), anyInt(), anyLong());
161+
}
162+
152163
@Test
153164
public void testGetPopOffset_normal() throws ConsumeQueueException {
154165
String group = "group";

common/src/main/java/org/apache/rocketmq/common/ServiceThread.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.concurrent.TimeUnit;
2020
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.locks.LockSupport;
2122

2223
import org.apache.rocketmq.common.constant.LoggerName;
2324
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -28,9 +29,8 @@ public abstract class ServiceThread implements Runnable {
2829

2930
private static final long JOIN_TIME = 90 * 1000;
3031

31-
protected Thread thread;
32-
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
33-
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
32+
protected volatile Thread thread;
33+
protected final AtomicBoolean hasNotified = new AtomicBoolean(false);
3434
protected volatile boolean stopped = false;
3535
protected boolean isDaemon = false;
3636

@@ -97,32 +97,49 @@ public void makeStop() {
9797
return;
9898
}
9999
this.stopped = true;
100+
// wake up the parked worker so it observes the stop flag promptly
101+
wakeup();
100102
log.info("makestop thread[{}] ", this.getServiceName());
101103
}
102104

103105
public void wakeup() {
106+
if (hasNotified.get()) {
107+
return;
108+
}
104109
if (hasNotified.compareAndSet(false, true)) {
105-
waitPoint.countDown(); // notify
110+
LockSupport.unpark(this.thread); // notify
106111
}
107112
}
108113

109114
protected void waitForRunning(long interval) {
115+
// Publish the parking thread so wakeup() can target it (also handles restart).
116+
this.thread = Thread.currentThread();
117+
110118
if (hasNotified.compareAndSet(true, false)) {
111119
this.onWaitEnd();
112120
return;
113121
}
114122

115-
//entry to wait
116-
waitPoint.reset();
117-
118-
try {
119-
waitPoint.await(interval, TimeUnit.MILLISECONDS);
120-
} catch (InterruptedException e) {
121-
log.error("Interrupted", e);
122-
} finally {
123-
hasNotified.set(false);
124-
this.onWaitEnd();
123+
// LockSupport permits are sticky: an unpark delivered before park makes the next park
124+
// return at once, and the loop re-checks hasNotified, so no wakeup can be lost.
125+
long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(interval);
126+
while (!hasNotified.get()) {
127+
if (stopped) {
128+
break;
129+
}
130+
long remain = deadline - System.nanoTime();
131+
if (remain <= 0) {
132+
break;
133+
}
134+
LockSupport.parkNanos(this, remain);
135+
if (Thread.interrupted()) {
136+
Thread.currentThread().interrupt();
137+
break;
138+
}
125139
}
140+
141+
hasNotified.set(false);
142+
this.onWaitEnd();
126143
}
127144

128145
protected void onWaitEnd() {

0 commit comments

Comments
 (0)