Skip to content

Commit

Permalink
feat: support trans message e2e test (#19)
Browse files Browse the repository at this point in the history
* feat: support trans message e2e test

Signed-off-by: wangxye <[email protected]>

* feat: support trans message in v4 remoting

Signed-off-by: wangxye <[email protected]>

---------

Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye authored Dec 6, 2023
1 parent 809375b commit 5498379
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.rocketmq.utils.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.stringtemplate.v4.ST;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -73,4 +74,25 @@ public static RMQTransactionProducer getTransProducer(String ns, ExecutorService
return new RMQTransactionProducer(producer);
}

public static RMQTransactionProducer getTransProducer(String ns, String groupName, ExecutorService executorService, TransactionListener transactionListener, RPCHook rpcHook) {
TransactionMQProducer producer;
if (aclEnable) {
producer = new TransactionMQProducer(groupName, rpcHook);
} else {
producer = new TransactionMQProducer(groupName);
}
producer.setInstanceName(UUID.randomUUID().toString());
producer.setNamesrvAddr(ns);
try {
if (executorService != null) {
producer.setExecutorService(executorService);
}
producer.setTransactionListener(transactionListener);
producer.start();
} catch (MQClientException e) {
logger.info("Start TransactionMQProducer failed, {}", e.getMessage());
}
return new RMQTransactionProducer(producer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ protected static String getGroupId(String methodName) {
return getGroupId(methodName, SubscriptionMode.SUB_MODE_PULL);
}

protected static String getTransGroupId(String groupName, SubscriptionMode mode) {
CreateGroupRequest request = CreateGroupRequest.newBuilder()
.setName(groupName)
.setMaxDeliveryAttempt(16)
.setGroupType(GroupType.GROUP_TYPE_STANDARD)
.setSubMode(mode)
.build();
Long reply = createConsumerGroup(request).join();
logger.info("[ConsumerGroupId] groupId:{} , mode: {} , reply:{}", groupName, mode, reply);
return groupName;
}

protected static String getGroupId(String methodName, SubscriptionMode mode) {
String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6));
// prepare consumer group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt message) {
// }
// }
// 本地事务已成功则提交消息
System.out.println("checkLocalTransaction: commit");
return checker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ public void testDelayMsgSize4M() {
}
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M+1, expect send failed")
public void testTransMsgSize4MAdd1() {
Expand Down Expand Up @@ -212,7 +211,6 @@ public Thread newThread(Runnable r) {
producer.shutdown();
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M, expect send success")
public void testTransMsgSize4M() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.rocketmq.server.transaction;

import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
Expand All @@ -30,7 +32,6 @@
import org.apache.rocketmq.frame.BaseOperate;
import org.apache.rocketmq.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.listener.rmq.concurrent.TransactionListenerImpl;
import org.apache.rocketmq.utils.MQAdmin;
import org.apache.rocketmq.utils.NameUtils;
import org.apache.rocketmq.utils.TestUtils;
import org.apache.rocketmq.utils.VerifyUtils;
Expand All @@ -56,7 +57,6 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

@Disabled
@Tag(TESTSET.TRANSACTION)
@Tag(TESTSET.SMOKE)
public class TransactionMessageTest extends BaseOperate {
Expand All @@ -68,16 +68,15 @@ public class TransactionMessageTest extends BaseOperate {

@BeforeEach
public void setUp() {
topic = NameUtils.getTopicName();
tag = NameUtils.getTagName();
groupId = NameUtils.getGroupName();
MQAdmin.createTopic(namesrvAddr, cluster, topic, 8);
logger.info("topic:{}, tag:{}, groupId:{}", topic, tag, groupId);
}

@Test
@DisplayName("Send 10 transaction messages synchronously, expecting all to be consumed")
public void testConsumeNormalMessage() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);
RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
consumer.subscribeAndStart(topic, tag, new RMQNormalListener());

Expand All @@ -91,7 +90,7 @@ public Thread newThread(Runnable r) {
}
});

RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, executorService,
RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, topic, executorService,
new TransactionListenerImpl(LocalTransactionState.COMMIT_MESSAGE, LocalTransactionState.COMMIT_MESSAGE),
rpcHook);
producer.sendTrans(topic, tag, SEND_NUM);
Expand All @@ -105,7 +104,7 @@ public Thread newThread(Runnable r) {
@DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer")
public void testTrans_SendRollback_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
Expand All @@ -130,7 +129,8 @@ public Thread newThread(Runnable r) {
// message
TestUtils.waitForSeconds(60);
Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed");
Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize());
Assertions.assertEquals(SEND_NUM, pushConsumer.getListener().getDequeueMessages().getDataSize());
Assertions.assertEquals(0, pushConsumer.getListener().getEnqueueMessages().getDataSize());
producer.shutdown();
pushConsumer.shutdown();
}
Expand All @@ -140,7 +140,7 @@ public Thread newThread(Runnable r) {
public void testTrans_SendCheckerCommit_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
Expand All @@ -156,7 +156,7 @@ public Thread newThread(Runnable r) {
}
});

RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, executorService,
RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, topic, executorService,
new TransactionListenerImpl(LocalTransactionState.COMMIT_MESSAGE, LocalTransactionState.UNKNOW),
rpcHook);
producer.sendTrans(topic, tag, SEND_NUM);
Expand All @@ -173,7 +173,7 @@ public Thread newThread(Runnable r) {
@DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer")
public void testTrans_CheckerRollback() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
Expand Down Expand Up @@ -206,8 +206,8 @@ public Thread newThread(Runnable r) {
public void testTrans_SendCheckerPartionCommit() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(methodName);
String groupId = getGroupId(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
pushConsumer.subscribeAndStart(topic, MessageSelector.byTag(tag), new RMQNormalListener());
Expand All @@ -225,7 +225,7 @@ public Thread newThread(Runnable r) {
}
});

RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, executorService,
RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, topic, executorService,
new TransactionListener() {

@Override
Expand Down Expand Up @@ -253,6 +253,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
await().atMost(90, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() {
System.out.printf("rollbackMsg: %d, commitMsg: %d \n", rollbackMsgNum.get(), commitMsgNum.get());
return rollbackMsgNum.get() == commitMsgNum.get() && commitMsgNum.get() == SEND_NUM / 2;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ public class NormalMessageSizeTest extends BaseOperate {
public static void setUpAll() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
normalTopic = getTopic(TopicMessageType.NORMAL.getValue(), methodName);
// transTopic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
transTopic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
delayTopic = getTopic(TopicMessageType.DELAY.getValue(), methodName);
fifoTopic = getTopic(TopicMessageType.FIFO.getValue(), methodName);
try {
producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> TransactionResolution.COMMIT)
.setClientConfiguration(ClientConfigurationFactory.build(account))
.setTopics(normalTopic, delayTopic, fifoTopic)
.setTopics(normalTopic, delayTopic, transTopic, fifoTopic)
.build();
} catch (ClientException e) {
Assertions.fail("create producer failed");
Expand Down Expand Up @@ -133,7 +133,6 @@ public void testDelayMsgSize4M() {
}
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M+1, expect send failed")
public void testTransMsgSize4MAdd1() {
Expand All @@ -147,7 +146,6 @@ public void testTransMsgSize4MAdd1() {
});
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M, expect send success")
public void testTransMsgSize4M() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public void tearDown() {
}
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and synchronously commit the transaction (Checker performs rollback), expecting those 10 messages to be consumed via PushConsumer")
public void testTrans_SendCommit_PushConsume() {
Expand All @@ -91,8 +90,6 @@ public void testTrans_SendCommit_PushConsume() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.ROLLBACK));
Assertions.assertNotNull(producer);
Expand All @@ -104,7 +101,6 @@ public void testTrans_SendCommit_PushConsume() {
VerifyUtils.verifyNormalMessage(producer.getEnqueueMessages(), pushConsumer.getListener().getDequeueMessages());
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer")
public void testTrans_SendRollback_PushConsume() {
Expand All @@ -113,8 +109,6 @@ public void testTrans_SendRollback_PushConsume() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT));
Assertions.assertNotNull(producer);
Expand All @@ -128,7 +122,6 @@ public void testTrans_SendRollback_PushConsume() {
Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize());
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and COMMIT the transaction by Checker (perform COMMIT), expecting the 10 messages to be consumed by PushConsumer")
public void testTrans_SendCheckerCommit_PushConsume() {
Expand All @@ -138,8 +131,6 @@ public void testTrans_SendCheckerCommit_PushConsume() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT));
Assertions.assertNotNull(producer);
Expand All @@ -153,7 +144,6 @@ public void testTrans_SendCheckerCommit_PushConsume() {
VerifyUtils.verifyNormalMessage(producer.getEnqueueMessages(), pushConsumer.getListener().getDequeueMessages());
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer")
public void testTrans_CheckerRollback() {
Expand All @@ -162,8 +152,6 @@ public void testTrans_CheckerRollback() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.ROLLBACK));
Assertions.assertNotNull(producer);
Expand All @@ -177,7 +165,6 @@ public void testTrans_CheckerRollback() {
Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize());
}

@Disabled
@Test
@DisplayName("Send 10 transactional messages and commit them by checking back (Checker commits for partial messages), and the expected committed messages can be consumed by PushConsumer")
public void testTrans_SendCheckerPartionCommit() {
Expand All @@ -187,8 +174,6 @@ public void testTrans_SendCheckerPartionCommit() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

AtomicInteger commitMsgNum = new AtomicInteger(0);
AtomicInteger rollbackMsgNum = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public void testDelay_simple_receive_ack() {
VerifyUtils.waitDelayReceiveThenAck(producer, consumer, 1, 10000);
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages synchronously and expect SimpleConsumer to receive() and ack() messages properly")
public void testTrans_simple_receive_ackAsync() {
Expand All @@ -93,7 +92,6 @@ public void testTrans_simple_receive_ackAsync() {
String groupId = getGroupId(methodName);

SimpleConsumer consumer = ConsumerFactory.getSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(consumer);
RMQNormalProducer producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT));
Assertions.assertNotNull(producer, "Get Producer failed");
for (int i = 0; i < SEND_NUM; i++) {
Expand Down

0 comments on commit 5498379

Please sign in to comment.