@@ -12,316 +12,3 @@ Sponsor by: https://medium.com/bancolombia-tech
12
12
Even though the main purpose is to provide such abstractions in a mostly generic way such abstractions would be of little use without a concrete implementation so we provide some implementations in a best effors maner that aim to be easy to change, personalize and extend.
13
13
14
14
The first approach to this work was to release a very simple abstractions and a corresponding implementation over asyncronous message driven communication between microservices build on top of project-reactor and spring boot.
15
-
16
- ## Get Started
17
- To include all (API and implementation) (Spring boot Starter):
18
- ``` groovy
19
-
20
- dependencies {
21
- compile 'org.reactivecommons:async-commons-rabbit-starter:<version-here>'
22
- }
23
-
24
- //IMPORTANT! if you use the version 0.6.x
25
- repositories {
26
- ...
27
- maven { url "https://repo.spring.io/milestone" }
28
- }
29
- configurations.all {
30
- resolutionStrategy.eachDependency {DependencyResolveDetails details ->
31
- if (details.requested.group == 'io.projectreactor.rabbitmq'){
32
- details.useVersion('1.5.0')
33
- details.because('Upgrade')
34
- }
35
- }
36
- }
37
-
38
- ```
39
-
40
- In application.properties
41
- ```
42
- spring.application.name=MyAppName
43
- ```
44
-
45
- Or yaml
46
-
47
- ```
48
- spring:
49
- application:
50
- name: myAppName
51
- ```
52
-
53
- To include only domain events API:
54
-
55
- ``` groovy
56
- dependencies {
57
- compile 'org.reactivecommons:domain-events-api:<version-here>'
58
- }
59
- ```
60
-
61
- To include only async commons API:
62
-
63
- ``` groovy
64
- dependencies {
65
- compile 'org.reactivecommons:async-commons-api:<version-here>'
66
- }
67
- ```
68
-
69
- ## Main abstractions
70
-
71
- ### Domain events API (Broadcast of events)
72
-
73
- ``` java
74
- package org.reactivecommons.api.domain ;
75
- import org.reactivestreams.Publisher ;
76
-
77
- public interface DomainEventBus {
78
- <T > Publisher<Void > emit (DomainEvent<T > event );
79
- }
80
- ```
81
-
82
- The above interface is the main interface for Broadcast of domain events, the DomainEvent class has the following structure:
83
-
84
- ``` java
85
- package org.reactivecommons.api.domain ;
86
-
87
- public class DomainEvent <T> {
88
- private final String name;
89
- private final String eventId;
90
- private final T data;
91
-
92
- public DomainEvent (String name , String eventId , T data ) {
93
- this . name = name;
94
- this . eventId = eventId;
95
- this . data = data;
96
- }
97
-
98
- // ... getters, equals, hascode, toString impl..
99
-
100
- }
101
- ```
102
-
103
- Usage example:
104
-
105
- ``` java
106
- public class ManageTasksUseCase {
107
-
108
- private TaskToDoRepository tasks;
109
- private DomainEventBus eventBus;
110
-
111
- public Mono<TaskToDo > createNew (String name , String description ) {
112
- return uuid()
113
- .flatMap(id - > TaskToDoFactory . createTask(id, name, description))
114
- .flatMap(tasks:: save)
115
- .flatMap(task - > emitCreatedEvent(task). thenReturn(task));
116
- }
117
-
118
- private Mono<Void > emitCreatedEvent (TaskToDo task ) {
119
- return Mono . from(eventBus. emit(new DomainEvent<> (" task.created" , task. getId(), task)));
120
- }
121
- // ...
122
- }
123
- ```
124
-
125
- Then enable this feature in a Configuration class and inject implementation:
126
-
127
- ``` java
128
- import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus ;
129
- import org.springframework.boot.SpringApplication ;
130
- import org.springframework.boot.autoconfigure.SpringBootApplication ;
131
-
132
- @SpringBootApplication
133
- @EnableDomainEventBus
134
- public class MainApplication {
135
- public static void main (String [] args ) {
136
- SpringApplication . run(MainApplication . class, args);
137
- }
138
-
139
- @Bean
140
- public ManageTasksUseCase manageTasksUseCase (TaskToDoRepository tasks , DomainEventBus eventBus ) {
141
- return new ManageTasksUseCase (tasks, eventBus);
142
- }
143
-
144
- }
145
-
146
-
147
- ```
148
-
149
- Don't forget to add the starter bundle to the main spring boot module (application):
150
-
151
- ``` groovy
152
- dependencies {
153
- compile 'org.reactivecommons:async-commons-rabbit-starter:<version-here>'
154
- }
155
- ```
156
-
157
-
158
- Or add the implementation dependency if for any reason you don't want to use the starter:
159
-
160
- ``` groovy
161
- dependencies {
162
- compile 'org.reactivecommons:async-commons:<version-here>'
163
- }
164
- ```
165
-
166
-
167
- ### Domain Event-Listener
168
- Reactive commons has four types of listeners implemented, available to be registered in the application via the ** HandlerRegistry** , each of them is designed to tackle
169
- common requirements for listeners in event based applications and abstracts the behavior of event flow in every situation (Varying for example in retrying strategy, dead letter events, sources and so on).
170
-
171
- The available event listeners are:
172
- - Domain Event Listener
173
- - Query Event Listener
174
- - Command Listener
175
- - Notification Listener
176
-
177
- Example Code:
178
- ``` java
179
- import org.springframework.context.annotation.Bean ;
180
- import org.springframework.context.annotation.Configuration ;
181
- import org.springframework.beans.factory.annotation.Autowired ;
182
-
183
- @Configuration
184
- public class SomeConfigurationClass {
185
-
186
- @Autowired
187
- private ManageTasksUseCase someBusinessDependency;
188
-
189
- @Bean
190
- public HandlerRegistry notificationEvents () {
191
- return HandlerRegistry . register()
192
- .listenNotificationEvent(" some.event.name" , event - > someBusinessDependency. someFunctionReturningMonoVoid(event), SomeClass . class)
193
- .listenEvent(" some.event.name2" , event - > someBusinessDependency. functionReturningMonoVoid(event), Some . class)
194
- .serveQuery(" query.name" , query - > someBusinessDependency. findSomething(query), SomeQuery . class)
195
- .handleCommand(" command.name" , cmd - > someBusinessDependency. handleCommand(cmd), CmdClass . class);
196
- }
197
- }
198
- ```
199
-
200
- The first line below "HandlerRegistry.register()" shows how to handle a notification event (Notification event: an event that should be handled by
201
- every running instance of a microservice, e.g: notify to every instance that a configuration setting has changed
202
- and has to do a hot reload from a persistent source of that data).
203
-
204
- The line ".listenEvent.." shows how to handle a standard event, and event that should be handled only once by some running instance of
205
- the microservice.
206
-
207
- The line ".serveQuery..." shows how to handle a standard request/reply or rpc messages flow.
208
-
209
- The line ".handleCommand..." shows how to handle a standard directed command, a message with a delivery guarantee.
210
-
211
- ### Request-Reply
212
- Example Code:
213
-
214
- ``` java
215
- public Mono<AggregateResponse > queryInformation() {
216
- AsyncQuery<String > userQuery = new AsyncQuery<> (" user.byId" , " 42" );
217
- AsyncQuery<String > projectQuery = new AsyncQuery<> (" project.byId" , " 343" );
218
- AsyncQuery<String > productQuery = new AsyncQuery<> (" product.byId" , " 22" );
219
-
220
- Mono<User > user = gateway. requestReply(userQuery, " Users" , User . class);
221
- Mono<Project > project = gateway. requestReply(projectQuery, " Projects" , Project . class);
222
- Mono<Product > product = gateway. requestReply(productQuery, " Products" , Product . class);
223
-
224
- return zip(user, project, product). map(function(this :: aggregate));
225
- }
226
- ```
227
-
228
- ### Direct Commands
229
-
230
- ``` java
231
- package org.reactivecommons.async.api ;
232
-
233
- import org.reactivecommons.api.domain.Command ;
234
- import reactor.core.publisher.Mono ;
235
-
236
- public interface DirectAsyncGateway {
237
- <T > Mono<Void > sendCommand (Command<T > command , String targetName );
238
- <T , R > Mono<R > requestReply (AsyncQuery<T > query , String targetName , Class<R > type );
239
- }
240
- ```
241
-
242
- #### Command Type
243
-
244
- ``` java
245
- package org.reactivecommons.api.domain ;
246
-
247
- public class Command <T> {
248
- private final String name;
249
- private final String commandId;
250
- private final T data;
251
- }
252
- ```
253
-
254
- #### Send Commands
255
-
256
-
257
- ``` java
258
- private static final String REGISTER_MEMBER = " Members.registerMember" ;
259
- private static final String TARGET = " Members" ;
260
- private DirectAsyncGateway asyncGateway;
261
-
262
- public Mono<Void > registerMember(Member member){
263
- String uuid = UUID . randomUUID(). toString();
264
- return asyncGateway. sendCommand(new Command<> (REGISTER_MEMBER , uuid, member), TARGET );
265
- }
266
- ```
267
-
268
- #### Handle Commands
269
- ``` java
270
- private static final String REGISTER_MEMBER = " Members.registerMember" ;
271
-
272
- @Bean
273
- public HandlerRegistry commandHandlers(MembersRegistryUseCase useCase) {
274
- return HandlerRegistry . register()
275
- .handleCommand(REGISTER_MEMBER , useCase:: registerMember, Member . class);
276
- }
277
- ```
278
-
279
- ### Broker Configuration (RabbitMQ)
280
-
281
-
282
- ```
283
- spring.rabbitmq.host= 8.8.8.1
284
- spring.rabbitmq.port=5729
285
- spring.rabbitmq.username=user
286
- spring.rabbitmq.password=pass
287
-
288
- ```
289
-
290
- ### Retry Strategy Config (RabbitMQ)
291
-
292
- ```
293
- app.async.withDLQRetry=true
294
- app.async.retryDelay=1000
295
- app.async.maxRetries=10
296
-
297
- ```
298
- ### Domain custom Configuration (RabbitMQ)
299
-
300
-
301
- ```
302
- app.async.domain.events.exchange=exchangeCustomName
303
- app.async.domain.events.maxLengthBytes=125000000
304
-
305
- ```
306
-
307
- ### Direct custom Configuration (RabbitMQ)
308
-
309
-
310
- ```
311
- app.async.direct.exchange=exchangeCustomName
312
- app.async.direct.maxLengthBytes=125000000
313
- ```
314
-
315
- ### Global custom Configuration (RabbitMQ)
316
-
317
-
318
- ```
319
- app.async.global.exchange=exchangeCustomName
320
- app.async.global.maxLengthBytes=125000000
321
- app.async.maxConcurrency=20
322
- ```
323
-
324
- * withDLQRetry: Whether to enable or not the new Retry DLQ Strategy
325
- * retryDelay: Delay retry value in ms
326
- * maxRetries: Number of retries in case of error in addition to the one automatic retry per queue.
327
-
0 commit comments