Skip to content

Commit

Permalink
fix: use one group in trans topic (#20)
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye authored Dec 7, 2023
1 parent 70410bd commit edac3c0
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
18 changes: 18 additions & 0 deletions java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,24 @@ protected static String getTopic(String messageType, String methodName) {
return null;
}

protected static String getTransTopic(String messageType, String methodName) {
String topic = String.format("topic_%s_%s_%s", messageType, methodName, RandomUtils.getStringWithCharacter(6));
log.info("[Topic] topic:{}, messageType:{}, methodName:{}", topic, messageType, methodName);
try {
CreateTopicRequest request = CreateTopicRequest.newBuilder()
.setTopic(topic)
.setCount(1)
.setAcceptTypes(convertAcceptTypes(messageType))
.build();
Long topicId = client.createTopic(endPoint, request).join();
log.info("create topic: {} , topicId:{}", topic, topicId);
return topic;
} catch (Exception e) {
log.error("create topic error", e);
}
return null;
}

private static AcceptTypes convertAcceptTypes(String typeStr) {
switch (typeStr) {
case "NORMAL":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void tearDown() {
public void testTrans_SendCommit_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
Expand All @@ -105,7 +105,7 @@ public void testTrans_SendCommit_PushConsume() {
@DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer")
public void testTrans_SendRollback_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
Expand All @@ -127,7 +127,7 @@ public void testTrans_SendRollback_PushConsume() {
public void testTrans_SendCheckerCommit_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
Expand All @@ -148,7 +148,7 @@ public void testTrans_SendCheckerCommit_PushConsume() {
@DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer")
public void testTrans_CheckerRollback() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
Expand All @@ -170,7 +170,7 @@ public void testTrans_CheckerRollback() {
public void testTrans_SendCheckerPartionCommit() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String topic = getTransTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
Expand All @@ -195,7 +195,7 @@ public TransactionResolution check(MessageView messageView) {
Message message = MessageFactory.buildMessage(topic, tag, String.valueOf(i));
producer.sendTrans(message, null);
}
await().atMost(90, SECONDS).until(new Callable<Boolean>() {
await().atMost(120, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() {
return rollbackMsgNum.get() == commitMsgNum.get() && commitMsgNum.get() == SEND_NUM / 2;
Expand Down

0 comments on commit edac3c0

Please sign in to comment.