Skip to content

Commit deca841

Browse files
dbuosDaniel Bustamante Ospina
authored and
Daniel Bustamante Ospina
committed
Remove UnicastProcessor from reply router when timeout triggers
1 parent 8dbf4c7 commit deca841

File tree

5 files changed

+79
-6
lines changed

5 files changed

+79
-6
lines changed

async/async-commons/src/main/java/org/reactivecommons/async/impl/RabbitDirectAsyncGateway.java

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.HashMap;
1818
import java.util.Map;
1919
import java.util.UUID;
20+
import java.util.concurrent.TimeoutException;
2021

2122
import static java.lang.Boolean.TRUE;
2223
import static org.reactivecommons.async.impl.Headers.*;
@@ -62,6 +63,7 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class
6263

6364
final Mono<R> replyHolder = router.register(correlationID)
6465
.timeout(replyTimeout)
66+
.doOnError(TimeoutException.class, e -> router.deregister(correlationID))
6567
.flatMap(s -> fromCallable(() -> converter.readValue(s, type)));
6668

6769
Map<String, Object> headers = new HashMap<>();

async/async-commons/src/main/java/org/reactivecommons/async/impl/reply/ReactiveReplyRouter.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,8 @@ public void routeReply(String correlationID, Message data) {
2424
}
2525
}
2626

27-
public <E> void routeError(String correlationID, String data) {
28-
final UnicastProcessor<Message> processor = processors.remove(correlationID);
29-
if (processor != null) {
30-
processor.onError(new RuntimeException(data));
31-
}
27+
public void deregister(String correlationID){
28+
processors.remove(correlationID);
3229
}
3330

3431
public void routeEmpty(String correlationID) {

async/async-commons/src/test/java/org/reactivecommons/async/impl/RabbitDirectAsyncGatewayTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@
2828
import reactor.test.StepVerifier;
2929
import reactor.util.concurrent.Queues;
3030

31+
import java.time.Duration;
3132
import java.util.List;
3233
import java.util.Map;
3334
import java.util.UUID;
3435
import java.util.concurrent.Semaphore;
3536
import java.util.concurrent.ThreadLocalRandom;
37+
import java.util.concurrent.TimeoutException;
3638
import java.util.stream.Collectors;
3739
import java.util.stream.IntStream;
3840

@@ -58,6 +60,25 @@ public void init(ReactiveMessageSender sender) {
5860
asyncGateway = new RabbitDirectAsyncGateway(config, router, sender, "exchange", converter);
5961
}
6062

63+
@Test
64+
public void shouldReleaseRouterResourcesOnTimeout(){
65+
BrokerConfig config = new BrokerConfig(false, false, false, Duration.ofSeconds(1));
66+
asyncGateway = new RabbitDirectAsyncGateway(config, router, senderMock, "ex", converter);
67+
when(router.register(anyString())).thenReturn(Mono.never());
68+
when(senderMock.sendNoConfirm(any(), anyString(), anyString(), anyMap(), anyBoolean()))
69+
.thenReturn(Mono.empty());
70+
71+
AsyncQuery<String> query = new AsyncQuery<>("some.query", "data");
72+
asyncGateway.requestReply(query, "some.target", String.class)
73+
.as(StepVerifier::create)
74+
.expectError(TimeoutException.class)
75+
.verify();
76+
77+
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
78+
verify(router).register(captor.capture());
79+
verify(router).deregister(captor.getValue());
80+
}
81+
6182
@Test
6283
public void shouldSendInOptimalTime() throws InterruptedException {
6384
init(getReactiveMessageSender());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package org.reactivecommons.async.impl.reply;
2+
3+
import org.junit.Test;
4+
import org.mockito.Mockito;
5+
import org.reactivecommons.async.impl.communications.Message;
6+
import reactor.core.publisher.Mono;
7+
import reactor.test.StepVerifier;
8+
9+
import java.time.Duration;
10+
import java.util.UUID;
11+
12+
public class ReactiveReplyRouterTest {
13+
14+
private ReactiveReplyRouter replyRouter = new ReactiveReplyRouter();
15+
16+
@Test
17+
public void shouldRouteReply(){
18+
final String uuid = UUID.randomUUID().toString();
19+
final Mono<Message> registered = replyRouter.register(uuid);
20+
21+
Message message = Mockito.mock(Message.class);
22+
replyRouter.routeReply(uuid, message);
23+
24+
StepVerifier.create(registered)
25+
.expectNext(message)
26+
.verifyComplete();
27+
28+
}
29+
30+
@Test
31+
public void shouldRouteEmptyResponse(){
32+
final String uuid = UUID.randomUUID().toString();
33+
final Mono<Message> registered = replyRouter.register(uuid);
34+
35+
replyRouter.routeEmpty(uuid);
36+
37+
StepVerifier.create(registered)
38+
.verifyComplete();
39+
}
40+
41+
@Test
42+
public void shouldDeRegisterProcessor(){
43+
final String uuid = UUID.randomUUID().toString();
44+
final Mono<Message> registered = replyRouter.register(uuid);
45+
46+
replyRouter.deregister(uuid);
47+
replyRouter.routeEmpty(uuid);
48+
49+
StepVerifier.create(registered.timeout(Duration.ofSeconds(1)))
50+
.expectTimeout(Duration.ofSeconds(3)).verify();
51+
}
52+
53+
}

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=1.0.0-beta2
1+
version=1.0.0-beta3
22
springBootVersion=2.2.9.RELEASE
33
gradleVersionsVersion=0.28.0
44
reactorRabbitVersion=1.5.0

0 commit comments

Comments
 (0)