28
28
import reactor .test .StepVerifier ;
29
29
import reactor .util .concurrent .Queues ;
30
30
31
+ import java .time .Duration ;
31
32
import java .util .List ;
32
33
import java .util .Map ;
33
34
import java .util .UUID ;
34
35
import java .util .concurrent .Semaphore ;
35
36
import java .util .concurrent .ThreadLocalRandom ;
37
+ import java .util .concurrent .TimeoutException ;
36
38
import java .util .stream .Collectors ;
37
39
import java .util .stream .IntStream ;
38
40
@@ -58,6 +60,25 @@ public void init(ReactiveMessageSender sender) {
58
60
asyncGateway = new RabbitDirectAsyncGateway (config , router , sender , "exchange" , converter );
59
61
}
60
62
63
+ @ Test
64
+ public void shouldReleaseRouterResourcesOnTimeout (){
65
+ BrokerConfig config = new BrokerConfig (false , false , false , Duration .ofSeconds (1 ));
66
+ asyncGateway = new RabbitDirectAsyncGateway (config , router , senderMock , "ex" , converter );
67
+ when (router .register (anyString ())).thenReturn (Mono .never ());
68
+ when (senderMock .sendNoConfirm (any (), anyString (), anyString (), anyMap (), anyBoolean ()))
69
+ .thenReturn (Mono .empty ());
70
+
71
+ AsyncQuery <String > query = new AsyncQuery <>("some.query" , "data" );
72
+ asyncGateway .requestReply (query , "some.target" , String .class )
73
+ .as (StepVerifier ::create )
74
+ .expectError (TimeoutException .class )
75
+ .verify ();
76
+
77
+ ArgumentCaptor <String > captor = ArgumentCaptor .forClass (String .class );
78
+ verify (router ).register (captor .capture ());
79
+ verify (router ).deregister (captor .getValue ());
80
+ }
81
+
61
82
@ Test
62
83
public void shouldSendInOptimalTime () throws InterruptedException {
63
84
init (getReactiveMessageSender ());
0 commit comments