Skip to content

Commit

Permalink
ARTEMIS-5330 use proper address name on retroactive messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Feb 26, 2025
1 parent 5b12277 commit 295f64b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2544,9 +2544,12 @@ public synchronized int rerouteMessages(final SimpleString queueName, final Filt
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
RoutingContext routingContext = new RoutingContextImpl(tx);
routingContext.setAddress(server.locateQueue(queueName).getAddress());
server.getPostOffice().getBinding(queueName).route(ref.getMessage(), routingContext);
postOffice.processRoute(ref.getMessage(), routingContext, false);
SimpleString address = server.locateQueue(queueName).getAddress();
routingContext.setAddress(address);
Message m = ref.getMessage();
m.setAddress(address);
server.getPostOffice().getBinding(queueName).route(m, routingContext);
postOffice.processRoute(m, routingContext, false);
return false;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,31 @@ public void messageArrived(String topic, MqttMessage message) {
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
}

@Test
@Timeout(DEFAULT_TIMEOUT_SEC)
public void testSimpleRetroSendReceive() throws Exception {
final String topic = RandomUtil.randomUUIDString();
server.getAddressSettingsRepository().addMatch(topic, new AddressSettings().setRetroactiveMessageCount(1));

MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1, false);

CountDownLatch latch = new CountDownLatch(1);
MqttClient subscriber = createPahoClient("subscriber");
subscriber.connect();
subscriber.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String t, MqttMessage message) {
logger.info("Message received from topic {}, message={}", t, message);
assertEquals(topic.toString(), t);
latch.countDown();
}
});
subscriber.subscribe(topic, AT_LEAST_ONCE);
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
}

@Test
@Timeout(DEFAULT_TIMEOUT_SEC)
public void testTopicNameEscape() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public void testRetroactiveAddress() throws Exception {
ClientMessage message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
assertEquals(addressName.toString(), message.getAddress());
assertEquals((i * COUNT) + j, (int) message.getIntProperty("xxx"));
}
consumer.close();
Expand Down

0 comments on commit 295f64b

Please sign in to comment.