Skip to content

Commit

Permalink
✨ 升级rocketmq-spring-boot-starter 2.2.3
Browse files Browse the repository at this point in the history
  • Loading branch information
Ahaochan committed Sep 11, 2023
1 parent 4b4cd5a commit 55b4bb7
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 28 deletions.
6 changes: 5 additions & 1 deletion ahao-spring-boot-rocketmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--======================================================= Spring Boot =======================================================-->
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package moe.ahao.spring.boot.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;

import java.nio.charset.StandardCharsets;

/**
* 抽象的消费者MessageListener组件
* 实现RocktMQ原生的RocketMQListener
*/
@Slf4j
public abstract class AbstractRocketMQListener implements RocketMQListener<MessageExt> {

@Override
public void onMessage(MessageExt message) {
try {
log.info("接收到MQ消息开始, message:{}", message);

this.onMessage(new String(message.getBody(), StandardCharsets.UTF_8));
} catch (Exception e) {
log.error("接收到MQ消息, 消费MQ消息异常, message:{}", message, e);
}
}

public abstract void onMessage(String message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package moe.ahao.spring.boot.rocketmq;

public class Constant {
public static final String DEFAULT_TOPIC = "ahao-topic";
public static final String DEFAULT_NAMESPACE = "ahao-namespace";

public static final String CONSUMER_GROUP_TAG1 = "ahao-consumer-group-tag1";
public static final String CONSUMER_GROUP_TAG2 = "ahao-consumer-group-tag2";

public static final String TAG1 = "TAG1";
public static final String TAG2 = "TAG2";


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package moe.ahao.spring.boot.rocketmq;

import lombok.extern.slf4j.Slf4j;
import moe.ahao.exception.BizException;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
namespace = "${rocketmq.consumer.namespace}",
topic = Constant.DEFAULT_TOPIC,
consumerGroup = Constant.CONSUMER_GROUP_TAG1,
selectorExpression = Constant.TAG1,
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING,
consumeThreadNumber = 1
)
public class Event1MQListener extends AbstractRocketMQListener {
@Override
public void onMessage(String message) {
log.info("事件监听器Event1MQListener, 接收到message:{}", message);
try {

} catch (BizException e) {
log.error("业务失败", e);
// throw e;
} catch (Exception e) {
log.error("异常失败", e);
throw e; // nack
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package moe.ahao.spring.boot.rocketmq;

import lombok.extern.slf4j.Slf4j;
import moe.ahao.exception.BizException;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
namespace = "${rocketmq.consumer.namespace}",
topic = Constant.DEFAULT_TOPIC,
consumerGroup = Constant.CONSUMER_GROUP_TAG2,
selectorExpression = Constant.TAG2,
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING,
consumeThreadNumber = 1
)
public class Event2MQListener extends AbstractRocketMQListener {
@Override
public void onMessage(String message) {
log.info("事件监听器Event2MQListener, 接收到message:{}", message);
try {

} catch (BizException e) {
log.error("业务失败", e);
// throw e;
} catch (Exception e) {
log.error("异常失败", e);
throw e; // nack
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
spring:
application:
name: ahao-rocketmq


rocketmq.name-server: 192.168.19.131:9876
rocketmq.producer.namespace: ahao-namespace
rocketmq.producer.group: ahao-producer-group
rocketmq.consumer.namespace: ahao-namespace
5 changes: 5 additions & 0 deletions ahao-spring-boot-rocketmq/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
spring:
application:
name: ahao-rocketmq
profiles:
active: rocketmq
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.List;


@RocketMQMessageListener(consumerGroup = "consumer-group", topic = RocketMQTemplateTest.TOPIC)
@RocketMQMessageListener(consumerGroup = "MyRocketMQListenerConsumerGroup", topic = RocketMQTemplateTest.TOPIC)
public class MyRocketMQListener implements RocketMQListener<String> {
public List<String> receiveList = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.ArrayList;
import java.util.List;

@RocketMQTransactionListener(txProducerGroup = RocketMQTemplateTest.GROUP_TX)
@RocketMQTransactionListener
public class MyRocketMQTransactionListener implements RocketMQLocalTransactionListener {
public List<String> receiveList = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@
import java.util.concurrent.TimeUnit;

class RocketMQNativeTest {
public static final String NS = "192.168.19.128:9876";
public static final String TOPIC = "AhaoTopic1";
public static final String PRODUCT_GROUP = "ahao-product-group";
public static final String NS = "192.168.19.131:9876";
public static final String NAMESPACE = Constant.DEFAULT_NAMESPACE;
public static final String TOPIC = Constant.DEFAULT_TOPIC;
public static final String PRODUCER_GROUP = "ahao-producer-group";
public static final String CONSUMER_GROUP = "ahao-consumer-group";

@Test
void simpleProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP);
producer.setNamesrvAddr(NS);
producer.start();

for (int i = 0; i < 10; i++) {
Message msg = new Message(TOPIC, "simpleProducer", ("simpleProducer" + i).getBytes(StandardCharsets.UTF_8));
Message msg = new Message(TOPIC, Constant.TAG1, ("simpleProducer" + i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(msg);
System.out.println("第" + i + "条消息发送结果:" + sendResult);
}
Expand All @@ -39,7 +41,7 @@ void simpleProducer() throws Exception {

@Test
void asyncProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP);
producer.setNamesrvAddr(NS);
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
Expand Down Expand Up @@ -71,7 +73,7 @@ public void onException(Throwable e) {

@Test
void onewayProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP);
producer.setNamesrvAddr(NS);
producer.start();

Expand All @@ -84,7 +86,7 @@ void onewayProducer() throws Exception {

@Test
void orderlyProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP);
producer.setNamesrvAddr(NS);
producer.start();

Expand All @@ -106,7 +108,7 @@ public MessageQueue select(List<MessageQueue> messageQueues, Message msg, Object

@Test
void delayProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP);
producer.setNamesrvAddr(NS);
producer.start();

Expand All @@ -124,7 +126,7 @@ void delayProducer() throws Exception {

@Test
void batchProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP);
producer.setNamesrvAddr(NS);
producer.start();

Expand All @@ -142,7 +144,7 @@ void batchProducer() throws Exception {
void transactionProducer() throws Exception {
Map<String, Boolean> flagMap = new HashMap<>();

TransactionMQProducer producer = new TransactionMQProducer(PRODUCT_GROUP);
TransactionMQProducer producer = new TransactionMQProducer(NAMESPACE, PRODUCER_GROUP);
producer.setNamesrvAddr(NS);
producer.setExecutorService(Executors.newFixedThreadPool(10));
producer.setTransactionListener(new TransactionListener() {
Expand Down Expand Up @@ -189,7 +191,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {

@Test
void pullConsumer() throws Exception {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ahao-consumer-group");
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(NAMESPACE, CONSUMER_GROUP);
consumer.setNamesrvAddr(NS);
consumer.start();

Expand All @@ -208,7 +210,7 @@ void pullConsumer() throws Exception {

@Test
public void pushConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ahao-consumer-group");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(NAMESPACE, CONSUMER_GROUP);
consumer.setNamesrvAddr(NS);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(TOPIC, "*");
Expand All @@ -235,7 +237,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList,

@Test
public void orderlyConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ahao-consumer-group");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(NAMESPACE, CONSUMER_GROUP);
consumer.setNamesrvAddr(NS);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(TOPIC, "*");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
MyRocketMQListener.class, MyRocketMQTransactionListener.class})
@ActiveProfiles("rocketmq")
public class RocketMQTemplateTest {
public static final String GROUP_TX = "ahao-transaction-group";
public static final String TOPIC = "ahao-topic";
public static final String TOPIC = Constant.DEFAULT_TOPIC;

@Autowired
private RocketMQTemplate rocketMQTemplate;
Expand All @@ -49,7 +48,7 @@ public void transactionMessage() throws Exception {
Message message = MessageBuilder.withPayload("payload-"+i)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.build();
this.rocketMQTemplate.sendMessageInTransaction(GROUP_TX, TOPIC, message, null);
this.rocketMQTemplate.sendMessageInTransaction(TOPIC, message, null);
}
Thread.sleep(5000);

Expand Down

This file was deleted.

0 comments on commit 55b4bb7

Please sign in to comment.