diff --git a/ahao-spring-boot-rocketmq/pom.xml b/ahao-spring-boot-rocketmq/pom.xml index 3b38ea1d4..cc316cae0 100644 --- a/ahao-spring-boot-rocketmq/pom.xml +++ b/ahao-spring-boot-rocketmq/pom.xml @@ -26,7 +26,11 @@ org.apache.rocketmq rocketmq-spring-boot-starter - 2.0.2 + 2.2.3 + + + org.projectlombok + lombok diff --git a/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/AbstractRocketMQListener.java b/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/AbstractRocketMQListener.java new file mode 100644 index 000000000..b4d62146c --- /dev/null +++ b/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/AbstractRocketMQListener.java @@ -0,0 +1,28 @@ +package moe.ahao.spring.boot.rocketmq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.core.RocketMQListener; + +import java.nio.charset.StandardCharsets; + +/** + * 抽象的消费者MessageListener组件 + * 实现RocktMQ原生的RocketMQListener + */ +@Slf4j +public abstract class AbstractRocketMQListener implements RocketMQListener { + + @Override + public void onMessage(MessageExt message) { + try { + log.info("接收到MQ消息开始, message:{}", message); + + this.onMessage(new String(message.getBody(), StandardCharsets.UTF_8)); + } catch (Exception e) { + log.error("接收到MQ消息, 消费MQ消息异常, message:{}", message, e); + } + } + + public abstract void onMessage(String message); +} diff --git a/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/Constant.java b/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/Constant.java new file mode 100644 index 000000000..89bbc528c --- /dev/null +++ b/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/Constant.java @@ -0,0 +1,14 @@ +package moe.ahao.spring.boot.rocketmq; + +public class Constant { + public static final String DEFAULT_TOPIC = "ahao-topic"; + public static final String DEFAULT_NAMESPACE = "ahao-namespace"; + + public static final String CONSUMER_GROUP_TAG1 = "ahao-consumer-group-tag1"; + public static final String CONSUMER_GROUP_TAG2 = "ahao-consumer-group-tag2"; + + public static final String TAG1 = "TAG1"; + public static final String TAG2 = "TAG2"; + + +} diff --git a/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/Event1MQListener.java b/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/Event1MQListener.java new file mode 100644 index 000000000..b9a05fae4 --- /dev/null +++ b/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/Event1MQListener.java @@ -0,0 +1,35 @@ +package moe.ahao.spring.boot.rocketmq; + +import lombok.extern.slf4j.Slf4j; +import moe.ahao.exception.BizException; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RocketMQMessageListener( + namespace = "${rocketmq.consumer.namespace}", + topic = Constant.DEFAULT_TOPIC, + consumerGroup = Constant.CONSUMER_GROUP_TAG1, + selectorExpression = Constant.TAG1, + consumeMode = ConsumeMode.CONCURRENTLY, + messageModel = MessageModel.CLUSTERING, + consumeThreadNumber = 1 +) +public class Event1MQListener extends AbstractRocketMQListener { + @Override + public void onMessage(String message) { + log.info("事件监听器Event1MQListener, 接收到message:{}", message); + try { + + } catch (BizException e) { + log.error("业务失败", e); + // throw e; + } catch (Exception e) { + log.error("异常失败", e); + throw e; // nack + } + } +} diff --git a/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/Event2MQListener.java b/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/Event2MQListener.java new file mode 100644 index 000000000..013e4658b --- /dev/null +++ b/ahao-spring-boot-rocketmq/src/main/java/moe/ahao/spring/boot/rocketmq/Event2MQListener.java @@ -0,0 +1,35 @@ +package moe.ahao.spring.boot.rocketmq; + +import lombok.extern.slf4j.Slf4j; +import moe.ahao.exception.BizException; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RocketMQMessageListener( + namespace = "${rocketmq.consumer.namespace}", + topic = Constant.DEFAULT_TOPIC, + consumerGroup = Constant.CONSUMER_GROUP_TAG2, + selectorExpression = Constant.TAG2, + consumeMode = ConsumeMode.CONCURRENTLY, + messageModel = MessageModel.CLUSTERING, + consumeThreadNumber = 1 +) +public class Event2MQListener extends AbstractRocketMQListener { + @Override + public void onMessage(String message) { + log.info("事件监听器Event2MQListener, 接收到message:{}", message); + try { + + } catch (BizException e) { + log.error("业务失败", e); + // throw e; + } catch (Exception e) { + log.error("异常失败", e); + throw e; // nack + } + } +} diff --git a/ahao-spring-boot-rocketmq/src/main/resources/application-rocketmq.yml b/ahao-spring-boot-rocketmq/src/main/resources/application-rocketmq.yml new file mode 100644 index 000000000..c1f85d809 --- /dev/null +++ b/ahao-spring-boot-rocketmq/src/main/resources/application-rocketmq.yml @@ -0,0 +1,9 @@ +spring: + application: + name: ahao-rocketmq + + +rocketmq.name-server: 192.168.19.131:9876 +rocketmq.producer.namespace: ahao-namespace +rocketmq.producer.group: ahao-producer-group +rocketmq.consumer.namespace: ahao-namespace diff --git a/ahao-spring-boot-rocketmq/src/main/resources/application.yml b/ahao-spring-boot-rocketmq/src/main/resources/application.yml new file mode 100644 index 000000000..ca2443b66 --- /dev/null +++ b/ahao-spring-boot-rocketmq/src/main/resources/application.yml @@ -0,0 +1,5 @@ +spring: + application: + name: ahao-rocketmq + profiles: + active: rocketmq diff --git a/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/MyRocketMQListener.java b/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/MyRocketMQListener.java index 78064f30d..ecbd4b20e 100644 --- a/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/MyRocketMQListener.java +++ b/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/MyRocketMQListener.java @@ -7,7 +7,7 @@ import java.util.List; -@RocketMQMessageListener(consumerGroup = "consumer-group", topic = RocketMQTemplateTest.TOPIC) +@RocketMQMessageListener(consumerGroup = "MyRocketMQListenerConsumerGroup", topic = RocketMQTemplateTest.TOPIC) public class MyRocketMQListener implements RocketMQListener { public List receiveList = new ArrayList<>(); diff --git a/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/MyRocketMQTransactionListener.java b/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/MyRocketMQTransactionListener.java index de48dd6c7..6432fde15 100644 --- a/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/MyRocketMQTransactionListener.java +++ b/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/MyRocketMQTransactionListener.java @@ -10,7 +10,7 @@ import java.util.ArrayList; import java.util.List; -@RocketMQTransactionListener(txProducerGroup = RocketMQTemplateTest.GROUP_TX) +@RocketMQTransactionListener public class MyRocketMQTransactionListener implements RocketMQLocalTransactionListener { public List receiveList = new ArrayList<>(); diff --git a/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/RocketMQNativeTest.java b/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/RocketMQNativeTest.java index f3424449c..b6150f560 100644 --- a/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/RocketMQNativeTest.java +++ b/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/RocketMQNativeTest.java @@ -19,18 +19,20 @@ import java.util.concurrent.TimeUnit; class RocketMQNativeTest { - public static final String NS = "192.168.19.128:9876"; - public static final String TOPIC = "AhaoTopic1"; - public static final String PRODUCT_GROUP = "ahao-product-group"; + public static final String NS = "192.168.19.131:9876"; + public static final String NAMESPACE = Constant.DEFAULT_NAMESPACE; + public static final String TOPIC = Constant.DEFAULT_TOPIC; + public static final String PRODUCER_GROUP = "ahao-producer-group"; + public static final String CONSUMER_GROUP = "ahao-consumer-group"; @Test void simpleProducer() throws Exception { - DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP); + DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP); producer.setNamesrvAddr(NS); producer.start(); for (int i = 0; i < 10; i++) { - Message msg = new Message(TOPIC, "simpleProducer", ("simpleProducer" + i).getBytes(StandardCharsets.UTF_8)); + Message msg = new Message(TOPIC, Constant.TAG1, ("simpleProducer" + i).getBytes(StandardCharsets.UTF_8)); SendResult sendResult = producer.send(msg); System.out.println("第" + i + "条消息发送结果:" + sendResult); } @@ -39,7 +41,7 @@ void simpleProducer() throws Exception { @Test void asyncProducer() throws Exception { - DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP); + DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP); producer.setNamesrvAddr(NS); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); @@ -71,7 +73,7 @@ public void onException(Throwable e) { @Test void onewayProducer() throws Exception { - DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP); + DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP); producer.setNamesrvAddr(NS); producer.start(); @@ -84,7 +86,7 @@ void onewayProducer() throws Exception { @Test void orderlyProducer() throws Exception { - DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP); + DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP); producer.setNamesrvAddr(NS); producer.start(); @@ -106,7 +108,7 @@ public MessageQueue select(List messageQueues, Message msg, Object @Test void delayProducer() throws Exception { - DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP); + DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP); producer.setNamesrvAddr(NS); producer.start(); @@ -124,7 +126,7 @@ void delayProducer() throws Exception { @Test void batchProducer() throws Exception { - DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP); + DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP); producer.setNamesrvAddr(NS); producer.start(); @@ -142,7 +144,7 @@ void batchProducer() throws Exception { void transactionProducer() throws Exception { Map flagMap = new HashMap<>(); - TransactionMQProducer producer = new TransactionMQProducer(PRODUCT_GROUP); + TransactionMQProducer producer = new TransactionMQProducer(NAMESPACE, PRODUCER_GROUP); producer.setNamesrvAddr(NS); producer.setExecutorService(Executors.newFixedThreadPool(10)); producer.setTransactionListener(new TransactionListener() { @@ -189,7 +191,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { @Test void pullConsumer() throws Exception { - DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ahao-consumer-group"); + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(NAMESPACE, CONSUMER_GROUP); consumer.setNamesrvAddr(NS); consumer.start(); @@ -208,7 +210,7 @@ void pullConsumer() throws Exception { @Test public void pushConsumer() throws Exception { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ahao-consumer-group"); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(NAMESPACE, CONSUMER_GROUP); consumer.setNamesrvAddr(NS); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(TOPIC, "*"); @@ -235,7 +237,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List messageExtList, @Test public void orderlyConsumer() throws Exception { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ahao-consumer-group"); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(NAMESPACE, CONSUMER_GROUP); consumer.setNamesrvAddr(NS); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(TOPIC, "*"); diff --git a/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/RocketMQTemplateTest.java b/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/RocketMQTemplateTest.java index aaca479c1..f404b275d 100644 --- a/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/RocketMQTemplateTest.java +++ b/ahao-spring-boot-rocketmq/src/test/java/moe/ahao/spring/boot/rocketmq/RocketMQTemplateTest.java @@ -21,8 +21,7 @@ MyRocketMQListener.class, MyRocketMQTransactionListener.class}) @ActiveProfiles("rocketmq") public class RocketMQTemplateTest { - public static final String GROUP_TX = "ahao-transaction-group"; - public static final String TOPIC = "ahao-topic"; + public static final String TOPIC = Constant.DEFAULT_TOPIC; @Autowired private RocketMQTemplate rocketMQTemplate; @@ -49,7 +48,7 @@ public void transactionMessage() throws Exception { Message message = MessageBuilder.withPayload("payload-"+i) .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()) .build(); - this.rocketMQTemplate.sendMessageInTransaction(GROUP_TX, TOPIC, message, null); + this.rocketMQTemplate.sendMessageInTransaction(TOPIC, message, null); } Thread.sleep(5000); diff --git a/ahao-spring-boot-rocketmq/src/test/resources/application-rocketmq.yml b/ahao-spring-boot-rocketmq/src/test/resources/application-rocketmq.yml deleted file mode 100644 index 71078f731..000000000 --- a/ahao-spring-boot-rocketmq/src/test/resources/application-rocketmq.yml +++ /dev/null @@ -1,8 +0,0 @@ -spring: - application: - name: ahao-rocketmq - -rocketmq: - name-server: 192.168.154.128:9876 - producer: - group: ahao-producer-group