Skip to content

Commit e7f79b5

Browse files
author
Daniel Bustamante Ospina
committed
Initial api version, auto-recovery support
1 parent 8557acc commit e7f79b5

File tree

69 files changed

+2479
-654
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+2479
-654
lines changed

async/async-commons-api/async-commons-api.gradle

-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,5 @@ dependencies {
22
compile project(":domain-events-api")
33
implementation 'io.projectreactor:reactor-core'
44
implementation 'io.projectreactor.addons:reactor-extra'
5-
implementation 'org.springframework.cloud:spring-cloud-stream-reactive'
6-
implementation 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
75
testImplementation 'io.projectreactor:reactor-test'
86
}

async/async-commons-api/src/main/java/us/sofka/commons/reactive/async/api/AsyncQuery.java renamed to async/async-commons-api/src/main/java/org/reactivecommons/async/api/AsyncQuery.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package us.sofka.commons.reactive.async.api;
1+
package org.reactivecommons.async.api;
22

33
import lombok.Data;
44

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package us.sofka.commons.reactive.async.api;
1+
package org.reactivecommons.async.api;
22

33
import reactor.core.publisher.Mono;
44

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.reactivecommons.async.api;
2+
3+
import org.reactivecommons.async.api.handlers.CommandHandler;
4+
5+
public interface DefaultCommandHandler<T> extends CommandHandler<T> {
6+
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.reactivecommons.async.api;
2+
3+
import org.reactivecommons.async.api.handlers.QueryHandler;
4+
5+
public interface DefaultQueryHandler<T, C> extends QueryHandler<T, C> {
6+
7+
}
+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
package us.sofka.commons.reactive.async.api;
1+
package org.reactivecommons.async.api;
22

3+
import org.reactivecommons.api.domain.Command;
34
import reactor.core.publisher.Mono;
45

56
public interface DirectAsyncGateway {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package org.reactivecommons.async.api;
2+
3+
import lombok.AccessLevel;
4+
import lombok.Getter;
5+
import lombok.NoArgsConstructor;
6+
import lombok.RequiredArgsConstructor;
7+
import org.reactivecommons.async.api.handlers.CommandHandler;
8+
import org.reactivecommons.async.api.handlers.EventHandler;
9+
import org.reactivecommons.async.api.handlers.QueryHandler;
10+
11+
import java.lang.reflect.ParameterizedType;
12+
import java.util.List;
13+
import java.util.concurrent.CopyOnWriteArrayList;
14+
15+
@Getter
16+
@NoArgsConstructor(access = AccessLevel.PACKAGE)
17+
public class HandlerRegistry {
18+
19+
public static HandlerRegistry register(){
20+
return new HandlerRegistry();
21+
}
22+
23+
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
24+
private final List<RegisteredEventListener> eventListeners = new CopyOnWriteArrayList<>();
25+
private final List<RegisteredCommandHandler> commandHandlers = new CopyOnWriteArrayList<>();
26+
27+
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass){
28+
eventListeners.add(new RegisteredEventListener<>(eventName, fn, eventClass));
29+
return this;
30+
}
31+
32+
public <T> HandlerRegistry handleCommand(String commandName, CommandHandler<T> fn, Class<T> commandClass){
33+
commandHandlers.add(new RegisteredCommandHandler<>(commandName, fn, commandClass));
34+
return this;
35+
}
36+
37+
@SuppressWarnings("unchecked")
38+
public <T, R> HandlerRegistry serveQuery(String commandId, QueryHandler<T, R> handler){
39+
return serveQuery(commandId, handler, inferGenericParameterType(handler));
40+
}
41+
42+
@SuppressWarnings("unchecked")
43+
private <T, R> Class<R> inferGenericParameterType(QueryHandler<T, R> handler){
44+
try{
45+
ParameterizedType genericSuperclass = (ParameterizedType) handler.getClass().getGenericInterfaces()[0];
46+
return (Class<R>) genericSuperclass.getActualTypeArguments()[1];
47+
}catch (Exception e){
48+
throw new RuntimeException("Fail to infer generic Query class, please use serveQuery(path, handler, class) instead");
49+
}
50+
}
51+
52+
public <T, R> HandlerRegistry serveQuery(String commandId, QueryHandler<T, R> handler, Class<R> queryClass){
53+
handlers.add(new RegisteredQueryHandler<>(commandId, handler, queryClass));
54+
return this;
55+
}
56+
57+
58+
@RequiredArgsConstructor
59+
@Getter
60+
public static class RegisteredQueryHandler<T, R> {
61+
private final String path;
62+
private final QueryHandler<T, R> handler;
63+
private final Class<R> queryClass;
64+
}
65+
66+
@RequiredArgsConstructor
67+
@Getter
68+
public static class RegisteredEventListener<T> {
69+
private final String path;
70+
private final EventHandler<T> handler;
71+
private final Class<T> inputClass;
72+
}
73+
74+
@RequiredArgsConstructor
75+
@Getter
76+
public static class RegisteredCommandHandler<T> {
77+
private final String path;
78+
private final CommandHandler<T> handler;
79+
private final Class<T> inputClass;
80+
}
81+
82+
}
83+
84+
85+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.reactivecommons.async.api;
2+
3+
import java.util.Map;
4+
5+
/**
6+
* Simple Internal Message representation
7+
* @author Daniel Bustamante Ospina
8+
*/
9+
public interface Message {
10+
11+
byte[] getBody();
12+
Properties getProperties();
13+
14+
interface Properties {
15+
String getContentType();
16+
String getContentEncoding();
17+
long getContentLength();
18+
Map<String, Object> getHeaders();
19+
}
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package org.reactivecommons.async.api;
2+
3+
public class MessageConversionException extends RuntimeException {
4+
5+
public MessageConversionException(String message, Throwable cause) {
6+
super(message, cause);
7+
}
8+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.reactivecommons.async.api;
2+
3+
import org.reactivecommons.api.domain.Command;
4+
import org.reactivecommons.api.domain.DomainEvent;
5+
6+
public interface MessageConverter {
7+
8+
<T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass);
9+
10+
<T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass);
11+
12+
<T> Command<T> readCommand(Message message, Class<T> bodyClass);
13+
14+
Message toMessage(Object object);
15+
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package org.reactivecommons.async.api.handlers;
2+
3+
import org.reactivecommons.api.domain.Command;
4+
5+
public interface CommandHandler<T> extends GenericHandler<Void, Command<T>> {
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package org.reactivecommons.async.api.handlers;
2+
3+
import org.reactivecommons.api.domain.DomainEvent;
4+
5+
public interface EventHandler<T> extends GenericHandler<Void, DomainEvent<T>> {
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.reactivecommons.async.api.handlers;
2+
3+
import reactor.core.publisher.Mono;
4+
5+
public interface GenericHandler<T, M> {
6+
Mono<T> handle(M message);
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.reactivecommons.async.api.handlers;
2+
3+
public interface QueryHandler<T, C> extends GenericHandler<T, C> {
4+
5+
}

async/async-commons-api/src/main/java/us/sofka/commons/reactive/async/api/CommandHandler.java

-9
This file was deleted.

async/async-commons-api/src/main/java/us/sofka/commons/reactive/async/api/DomainEvent.java

-20
This file was deleted.

async/async-commons-api/src/main/java/us/sofka/commons/reactive/async/api/DomainEventBus.java

-9
This file was deleted.

async/async-commons/async-commons.gradle

+5-4
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ dependencies {
33
compile project(":domain-events-api")
44

55
compile 'io.projectreactor:reactor-core'
6-
// implementation 'io.projectreactor.addons:reactor-extra'
7-
// compile 'org.springframework.cloud:spring-cloud-stream-reactive'
8-
// compile 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
9-
compile 'org.springframework.boot:spring-boot-starter-amqp'
6+
implementation 'io.projectreactor.addons:reactor-extra'
7+
compile('org.springframework.boot:spring-boot-starter')
8+
// compile 'org.springframework.boot:spring-boot-starter-amqp'
9+
// compile group: 'org.springframework.amqp', name: 'spring-amqp'
1010
compile "io.projectreactor.rabbitmq:reactor-rabbitmq:1.0.0.RC1"
11+
// compile project(":async:libs:reactor-rabbitmq")
1112
compile 'com.fasterxml.jackson.core:jackson-databind:2.9.7'
1213
testImplementation 'io.projectreactor:reactor-test'
1314
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.reactivecommons.async.impl;
2+
3+
4+
import org.reactivecommons.api.domain.Command;
5+
import org.reactivecommons.async.api.Message;
6+
import org.reactivecommons.async.api.handlers.CommandHandler;
7+
import reactor.core.publisher.Mono;
8+
9+
import java.util.function.Function;
10+
11+
public class CommandExecutor<T> {
12+
private final CommandHandler<T> eventHandler;
13+
private final Function<Message, Command<T>> converter;
14+
15+
public CommandExecutor(CommandHandler<T> eventHandler, Function<Message, Command<T>> converter) {
16+
this.eventHandler = eventHandler;
17+
this.converter = converter;
18+
}
19+
20+
public Mono<Void> execute(Message rawMessage){
21+
return eventHandler.handle(converter.apply(rawMessage));
22+
}
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.reactivecommons.async.impl;
2+
3+
4+
import org.reactivecommons.api.domain.DomainEvent;
5+
import org.reactivecommons.async.api.handlers.EventHandler;
6+
import org.reactivecommons.async.api.Message;
7+
import reactor.core.publisher.Mono;
8+
9+
import java.util.function.Function;
10+
11+
public class EventExecutor<T> {
12+
private final EventHandler<T> eventHandler;
13+
private final Function<Message, DomainEvent<T>> converter;
14+
15+
public EventExecutor(EventHandler<T> eventHandler, Function<Message, DomainEvent<T>> converter) {
16+
this.eventHandler = eventHandler;
17+
this.converter = converter;
18+
}
19+
20+
public Mono<Void> execute(Message rawMessage){
21+
return eventHandler.handle(converter.apply(rawMessage));
22+
}
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.reactivecommons.async.impl;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.reactivecommons.async.api.handlers.QueryHandler;
5+
import org.reactivecommons.async.api.HandlerRegistry;
6+
7+
import java.util.Collection;
8+
import java.util.Map;
9+
10+
@RequiredArgsConstructor
11+
public class HandlerResolver {
12+
13+
private final Map<String, QueryHandler<?, ?>> queryHandlers;
14+
private final Map<String, HandlerRegistry.RegisteredEventListener> eventListeners;
15+
private final Map<String, HandlerRegistry.RegisteredCommandHandler> commandHandlers;
16+
17+
18+
@SuppressWarnings("unchecked")
19+
public <T, R> QueryHandler<T, R> getQueryHandler(String path) {
20+
return (QueryHandler<T, R>) queryHandlers.get(path);
21+
}
22+
23+
@SuppressWarnings("unchecked")
24+
public <T> HandlerRegistry.RegisteredCommandHandler<T> getCommandHandler(String path) {
25+
return commandHandlers.get(path);
26+
}
27+
28+
@SuppressWarnings("unchecked")
29+
public <T> HandlerRegistry.RegisteredEventListener<T> getEventListener(String path) {
30+
return eventListeners.get(path);
31+
}
32+
33+
public Collection<HandlerRegistry.RegisteredEventListener> getEventListeners() {
34+
return eventListeners.values();
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package org.reactivecommons.async.impl;
2+
3+
4+
import org.reactivecommons.async.api.handlers.QueryHandler;
5+
import org.reactivecommons.async.api.Message;
6+
import reactor.core.publisher.Mono;
7+
8+
import java.util.function.Function;
9+
10+
public class QueryExecutor<C, R> {
11+
private final QueryHandler<R, C> queryHandler;
12+
private final Function<Message, C> converter;
13+
14+
public QueryExecutor(QueryHandler<R, C> queryHandler, Function<Message, C> converter) {
15+
this.queryHandler = queryHandler;
16+
this.converter = converter;
17+
}
18+
19+
public Mono<R> execute(Message rawMessage){
20+
return queryHandler.handle(converter.apply(rawMessage));
21+
}
22+
}

0 commit comments

Comments
 (0)