diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java index f97d2e8c..71d2e8e5 100644 --- a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java @@ -4,12 +4,14 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; +import org.reactivecommons.api.domain.RawMessage; import org.reactivecommons.async.api.handlers.CloudCommandHandler; import org.reactivecommons.async.api.handlers.CloudEventHandler; import org.reactivecommons.async.api.handlers.DomainCommandHandler; import org.reactivecommons.async.api.handlers.DomainEventHandler; import org.reactivecommons.async.api.handlers.QueryHandler; import org.reactivecommons.async.api.handlers.QueryHandlerDelegate; +import org.reactivecommons.async.api.handlers.RawEventHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; @@ -50,6 +52,12 @@ public HandlerRegistry listenDomainCloudEvent(String domain, String eventName, C return this; } + public HandlerRegistry listenDomainRawEvent(String domain, String eventName, RawEventHandler handler) { + domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>()) + .add(new RegisteredEventListener<>(eventName, handler, RawMessage.class)); + return this; + } + public HandlerRegistry listenEvent(String eventName, DomainEventHandler handler, Class eventClass) { domainEventListeners.computeIfAbsent(DEFAULT_DOMAIN, ignored -> new CopyOnWriteArrayList<>()) .add(new RegisteredEventListener<>(eventName, handler, eventClass)); diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/RawEventHandler.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/RawEventHandler.java new file mode 100644 index 00000000..9ed19005 --- /dev/null +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/RawEventHandler.java @@ -0,0 +1,6 @@ +package org.reactivecommons.async.api.handlers; + +import org.reactivecommons.api.domain.RawMessage; + +public interface RawEventHandler extends EventHandler { +} diff --git a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java index af3db020..b0e4ef8d 100644 --- a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java +++ b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java @@ -5,12 +5,14 @@ import org.junit.jupiter.api.Test; import org.reactivecommons.api.domain.Command; import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.api.domain.RawMessage; import org.reactivecommons.async.api.handlers.CloudCommandHandler; import org.reactivecommons.async.api.handlers.CloudEventHandler; import org.reactivecommons.async.api.handlers.DomainCommandHandler; import org.reactivecommons.async.api.handlers.DomainEventHandler; import org.reactivecommons.async.api.handlers.QueryHandler; import org.reactivecommons.async.api.handlers.QueryHandlerDelegate; +import org.reactivecommons.async.api.handlers.RawEventHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; @@ -54,6 +56,20 @@ void shouldListenDomainCloudEvent() { .containsExactly(name, CloudEvent.class, eventHandler)).hasSize(1); } + @Test + void shouldListenDomainRawEvent() { + SomeRawEventHandler eventHandler = new SomeRawEventHandler(); + + registry.listenDomainRawEvent(domain, name, eventHandler); + + assertThat(registry.getDomainEventListeners().get(domain)) + .anySatisfy(registered -> assertThat(registered) + .extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, + RegisteredEventListener::getHandler + ) + .containsExactly(name, RawMessage.class, eventHandler)).hasSize(1); + } + @Test void shouldListenEvent() { SomeDomainEventHandler eventHandler = new SomeDomainEventHandler<>(); @@ -269,6 +285,13 @@ public Mono handle(CloudEvent message) { } } + private static class SomeRawEventHandler implements RawEventHandler { + @Override + public Mono handle(RawMessage message) { + return null; + } + } + private static class SomeDomainCommandHandler implements DomainCommandHandler { @Override public Mono handle(Command message) { diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/communications/Message.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/communications/Message.java index 42b12b5c..ad60f221 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/communications/Message.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/communications/Message.java @@ -1,5 +1,7 @@ package org.reactivecommons.async.commons.communications; +import org.reactivecommons.api.domain.RawMessage; + import java.util.Map; /** @@ -7,7 +9,7 @@ * * @author Daniel Bustamante Ospina */ -public interface Message { +public interface Message extends RawMessage { byte[] getBody(); diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java index b31f4a78..54de85e0 100644 --- a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java @@ -4,6 +4,7 @@ import lombok.RequiredArgsConstructor; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.api.domain.DomainEventBus; +import org.reactivecommons.api.domain.RawMessage; import org.reactivecommons.async.kafka.communications.ReactiveMessageSender; import org.reactivestreams.Publisher; @@ -31,4 +32,14 @@ public Publisher emit(CloudEvent event) { public Publisher emit(String domain, CloudEvent event) { throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); } + + @Override + public Publisher emit(RawMessage event) { + return sender.send(event); + } + + @Override + public Publisher emit(String domain, RawMessage event) { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } } diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaMessage.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaMessage.java index bafda174..a0bf4eeb 100644 --- a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaMessage.java +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaMessage.java @@ -15,6 +15,7 @@ public class KafkaMessage implements Message { private final byte[] body; private final Properties properties; + private final String type; @Data public static class KafkaMessageProperties implements Properties { @@ -30,7 +31,11 @@ public String getContentType() { } public static KafkaMessage fromDelivery(ReceiverRecord receiverRecord) { - return new KafkaMessage(receiverRecord.value(), createMessageProps(receiverRecord)); + return fromDelivery(receiverRecord, null); + } + + public static KafkaMessage fromDelivery(ReceiverRecord receiverRecord, String type) { + return new KafkaMessage(receiverRecord.value(), createMessageProps(receiverRecord), type); } private static Properties createMessageProps(ReceiverRecord receiverRecord) { diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverter.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverter.java index d1f79703..035829e5 100644 --- a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverter.java +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverter.java @@ -22,6 +22,9 @@ public KafkaJacksonMessageConverter(ObjectMapper objectMapper) { @Override public Message toMessage(Object object) { + if (object instanceof KafkaMessage) { + return (KafkaMessage) object; + } byte[] bytes; try { String jsonString = this.objectMapper.writeValueAsString(object); @@ -30,7 +33,7 @@ public Message toMessage(Object object) { throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); } KafkaMessageProperties props = buildProperties(object); - return new KafkaMessage(bytes, props); + return new KafkaMessage(bytes, props, null); } private KafkaMessageProperties buildProperties(Object message) { diff --git a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/GenericMessageListener.java b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/GenericMessageListener.java index fc20596a..7c14e631 100644 --- a/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/GenericMessageListener.java +++ b/async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/GenericMessageListener.java @@ -112,7 +112,7 @@ protected Mono> handle(ReceiverRecord> handler = getExecutor(executorPath); - final Message message = KafkaMessage.fromDelivery(msj); + final Message message = KafkaMessage.fromDelivery(msj, executorPath); Mono flow = Mono.defer(() -> handler.apply(message)) .transform(enrichPostProcess(message)); diff --git a/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/KafkaDomainEventBusTest.java b/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/KafkaDomainEventBusTest.java index 0dc04cdf..f33f582a 100644 --- a/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/KafkaDomainEventBusTest.java +++ b/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/KafkaDomainEventBusTest.java @@ -7,6 +7,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.api.domain.RawMessage; import org.reactivecommons.async.kafka.communications.ReactiveMessageSender; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -21,6 +22,8 @@ class KafkaDomainEventBusTest { @Mock private CloudEvent cloudEvent; @Mock + private RawMessage rawMessage; + @Mock private ReactiveMessageSender sender; @InjectMocks private KafkaDomainEventBus kafkaDomainEventBus; @@ -48,9 +51,21 @@ void shouldEmitCloudEvent() { .verifyComplete(); } + @Test + void shouldEmitRawMessage() { + // Arrange + when(sender.send(rawMessage)).thenReturn(Mono.empty()); + // Act + Mono flow = Mono.from(kafkaDomainEventBus.emit(rawMessage)); + // Assert + StepVerifier.create(flow) + .verifyComplete(); + } + @Test void operationsShouldNotBeAbleForDomains() { assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, domainEvent)); assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, cloudEvent)); + assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, rawMessage)); } } diff --git a/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverterTest.java b/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverterTest.java index a07539fa..3219ebeb 100644 --- a/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverterTest.java +++ b/async/async-kafka/src/test/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverterTest.java @@ -42,6 +42,7 @@ void shouldSerializeDomainEvent() { String expectedJson = "{\"name\":\"test\",\"eventId\":\"" + id + "\",\"data\":{\"name\":\"name\",\"age\":1}}"; // Act Message message = converter.toMessage(testEvent); + assertEquals(message, converter.toMessage(message)); // Assert assertEquals("test", message.getProperties().getTopic()); assertEquals(id, message.getProperties().getKey()); diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java index d930f22c..b2bb5eda 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java @@ -3,6 +3,7 @@ import io.cloudevents.CloudEvent; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.api.domain.DomainEventBus; +import org.reactivecommons.api.domain.RawMessage; import org.reactivecommons.async.commons.config.BrokerConfig; import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; import org.reactivestreams.Publisher; @@ -12,6 +13,8 @@ public class RabbitDomainEventBus implements DomainEventBus { + private static final String EVENT_SEND_FAILURE = "Event send failure: "; + private static final String NOT_IMPLEMENTED_YET = "Not implemented yet"; private final ReactiveMessageSender sender; private final String exchange; private final boolean persistentEvents; @@ -29,24 +32,35 @@ public RabbitDomainEventBus(ReactiveMessageSender sender, String exchange, Broke @Override public Mono emit(DomainEvent event) { return sender.sendWithConfirm(event, exchange, event.getName(), Collections.emptyMap(), persistentEvents) - .onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err)); + .onErrorMap(err -> new RuntimeException(EVENT_SEND_FAILURE + event.getName(), err)); } @Override public Publisher emit(String domain, DomainEvent event) { - throw new UnsupportedOperationException("Not implemented yet"); + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); } @Override public Publisher emit(CloudEvent cloudEvent) { return sender.sendWithConfirm(cloudEvent, exchange, cloudEvent.getType(), Collections.emptyMap(), persistentEvents) - .onErrorMap(err -> new RuntimeException("Event send failure: " + cloudEvent.getType(), err)); + .onErrorMap(err -> new RuntimeException(EVENT_SEND_FAILURE + cloudEvent.getType(), err)); } @Override public Publisher emit(String domain, CloudEvent event) { - throw new UnsupportedOperationException("Not implemented yet"); + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); } + @Override + public Publisher emit(RawMessage rawEvent) { + return sender.sendWithConfirm(rawEvent, exchange, rawEvent.getType(), + Collections.emptyMap(), persistentEvents) + .onErrorMap(err -> new RuntimeException(EVENT_SEND_FAILURE + rawEvent.getType(), err)); + } + + @Override + public Publisher emit(String domain, RawMessage event) { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitMessage.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitMessage.java index f14446d7..cc35549f 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitMessage.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitMessage.java @@ -11,6 +11,7 @@ public class RabbitMessage implements Message { private final byte[] body; private final Properties properties; + private final String type; @Data public static class RabbitMessageProperties implements Properties { @@ -22,7 +23,11 @@ public static class RabbitMessageProperties implements Properties { } public static RabbitMessage fromDelivery(Delivery delivery) { - return new RabbitMessage(delivery.getBody(), createMessageProps(delivery)); + return fromDelivery(delivery, null); + } + + public static RabbitMessage fromDelivery(Delivery delivery, String executorPath) { + return new RabbitMessage(delivery.getBody(), createMessageProps(delivery), executorPath); } private static Message.Properties createMessageProps(Delivery msj) { diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/RabbitJacksonMessageConverter.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/RabbitJacksonMessageConverter.java index db19e9fb..1cff647e 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/RabbitJacksonMessageConverter.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/RabbitJacksonMessageConverter.java @@ -18,6 +18,9 @@ public RabbitJacksonMessageConverter(ObjectMapper objectMapper) { @Override public Message toMessage(Object object) { + if (object instanceof RabbitMessage) { + return (RabbitMessage) object; + } byte[] bytes; try { String jsonString = this.objectMapper.writeValueAsString(object); @@ -29,10 +32,10 @@ public Message toMessage(Object object) { if (object instanceof CloudEvent) { props.setContentType(APPLICATION_CLOUD_EVENT_JSON); } else { - props.setContentType(CONTENT_TYPE); + props.setContentType(APPLICATION_JSON); } props.setContentEncoding(StandardCharsets.UTF_8.name()); props.setContentLength(bytes.length); - return new RabbitMessage(bytes, props); + return new RabbitMessage(bytes, props, null); } } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java index 11cfd5ba..bd10d077 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java @@ -4,6 +4,7 @@ import lombok.extern.java.Log; import org.reactivecommons.async.api.handlers.CloudEventHandler; import org.reactivecommons.async.api.handlers.DomainEventHandler; +import org.reactivecommons.async.api.handlers.RawEventHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; import org.reactivecommons.async.commons.DiscardNotifier; import org.reactivecommons.async.commons.EventExecutor; @@ -133,6 +134,9 @@ private Function resolveConverter(RegisteredEventListene if (registeredEventListener.getHandler() instanceof CloudEventHandler) { return messageConverter::readCloudEvent; } + if (registeredEventListener.getHandler() instanceof RawEventHandler) { + return message -> message; + } throw new RuntimeException("Unknown handler type"); } } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java index 82e32ac1..b5158a95 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java @@ -121,7 +121,7 @@ protected Mono handle(AcknowledgableDelivery msj, Instan try { final String executorPath = getExecutorPath(msj); final Function> handler = getExecutor(executorPath); - final Message message = RabbitMessage.fromDelivery(msj); + final Message message = RabbitMessage.fromDelivery(msj, executorPath); Mono flow = defer(() -> handler.apply(message)) .transform(enrichPostProcess(message)); diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/helpers/TestStubs.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/helpers/TestStubs.java index 280a84e9..35dc28d2 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/helpers/TestStubs.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/helpers/TestStubs.java @@ -15,6 +15,6 @@ public static Message mockMessage() { properties.getHeaders().put(CORRELATION_ID, "correlation"); properties.getHeaders().put(SERVED_QUERY_ID, "my-query"); return new RabbitMessage("{\"id\":\"id\",\"name\":\"name\",\"date\":\"2020-10-22T17:03:26.062Z\"}".getBytes(), - properties); + properties, null); } } diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDomainEventBusTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDomainEventBusTest.java index 38ed6e4c..ed03150d 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDomainEventBusTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDomainEventBusTest.java @@ -7,6 +7,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.api.domain.RawMessage; import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -24,6 +25,8 @@ class RabbitDomainEventBusTest { @Mock private CloudEvent cloudEvent; @Mock + private RawMessage rawMessage; + @Mock private ReactiveMessageSender sender; private RabbitDomainEventBus rabbitDomainEventBus; private final String domain = "domain"; @@ -59,9 +62,23 @@ void shouldEmitCloudEvent() { .verifyComplete(); } + @Test + void shouldEmitRawMessage() { + // Arrange + when(rawMessage.getType()).thenReturn("event"); + when(sender.sendWithConfirm(any(RawMessage.class), anyString(), anyString(), any(), anyBoolean())) + .thenReturn(Mono.empty()); + // Act + Mono flow = Mono.from(rabbitDomainEventBus.emit(rawMessage)); + // Assert + StepVerifier.create(flow) + .verifyComplete(); + } + @Test void operationsShouldNotBeAbleForDomains() { assertThrows(UnsupportedOperationException.class, () -> rabbitDomainEventBus.emit(domain, domainEvent)); assertThrows(UnsupportedOperationException.class, () -> rabbitDomainEventBus.emit(domain, cloudEvent)); + assertThrows(UnsupportedOperationException.class, () -> rabbitDomainEventBus.emit(domain, rawMessage)); } } diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverterTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverterTest.java index 15b697b4..1e39a7ad 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverterTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverterTest.java @@ -19,6 +19,7 @@ import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; class JacksonMessageConverterTest { @@ -34,6 +35,7 @@ static void setUp() { @Test void toMessage() { final Message message = converter.toMessage(new SampleClass("42", "Daniel", new Date())); + assertEquals(message, converter.toMessage(message)); assertThat(new String(message.getBody())).contains("42").contains("Daniel"); } diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListenerTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListenerTest.java index 763785f7..ff2b79b2 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListenerTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListenerTest.java @@ -1,5 +1,7 @@ package org.reactivecommons.async.rabbit.listeners; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; @@ -8,6 +10,7 @@ import org.reactivecommons.async.commons.HandlerResolver; import reactor.core.publisher.Mono; +import java.net.URI; import java.util.Optional; import java.util.UUID; @@ -25,6 +28,13 @@ public class ApplicationEventListenerTest extends ListenerReporterTestSuperClass "app.event.test2", UUID.randomUUID().toString(), new DummyMessage() ); + private final CloudEvent cloudEvent = CloudEventBuilder.v1() + .withType("app.event.test") + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("/test")) + .withData("application/json", "{}".getBytes()) + .build(); + @Test void shouldSendErrorToCustomErrorReporter() throws InterruptedException { final HandlerRegistry registry = HandlerRegistry.register() @@ -34,6 +44,20 @@ void shouldSendErrorToCustomErrorReporter() throws InterruptedException { assertSendErrorToCustomReporter(registry, createSource(DomainEvent::getName, event1)); } + @Test + void shouldResolveCorrectCloudEventHandler() throws InterruptedException { + final HandlerRegistry registry = HandlerRegistry.register() + .listenCloudEvent("app.event.test", m -> error(new RuntimeException("testEx"))); + assertSendErrorToCustomReporter(registry, createSource(CloudEvent::getType, cloudEvent)); + } + + @Test + void shouldResolveCorrectRawHandler() throws InterruptedException { + final HandlerRegistry registry = HandlerRegistry.register() + .listenDomainRawEvent("domain","app.event.test", m -> error(new RuntimeException("testEx"))); + assertSendErrorToCustomReporter(registry, createSource(CloudEvent::getType, cloudEvent)); + } + @Test void shouldContinueAfterReportError() throws InterruptedException { final HandlerRegistry handlerRegistry = HandlerRegistry.register() @@ -44,7 +68,8 @@ void shouldContinueAfterReportError() throws InterruptedException { m -> Mono.fromRunnable(successSemaphore::release), DummyMessage.class ); - assertContinueAfterSendErrorToCustomReporter(handlerRegistry, createSource(DomainEvent::getName, event1, event2)); + assertContinueAfterSendErrorToCustomReporter(handlerRegistry, createSource(DomainEvent::getName, event1, + event2)); } @Override diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java index 2cfeb669..2e101c38 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java @@ -65,7 +65,7 @@ public abstract class ListenerReporterTestSuperClass { protected final CustomReporter errorReporter = mock(CustomReporter.class); protected final Semaphore semaphore = new Semaphore(0); protected final Semaphore successSemaphore = new Semaphore(0); - private final ObjectMapper mapper = new ObjectMapper(); + private final ObjectMapper mapper = new DefaultObjectMapperSupplier().get(); private final Receiver receiver = mock(Receiver.class); protected final ReactiveMessageListener reactiveMessageListener = new ReactiveMessageListener( receiver, topologyCreator @@ -147,9 +147,15 @@ protected Flux createSource(Function rout protected abstract GenericMessageListener createMessageListener(final HandlerResolver handlerResolver); private HandlerResolver createHandlerResolver(final HandlerRegistry registry) { - final Map> eventHandlers = Stream.concat( - registry.getDynamicEventHandlers().stream(), - registry.getDomainEventListeners().get(DEFAULT_DOMAIN).stream()) + Stream> listenerStream = Stream.concat( + registry.getDynamicEventHandlers().stream(), + registry.getDomainEventListeners().get(DEFAULT_DOMAIN).stream()); + if (registry.getDomainEventListeners().containsKey("domain")) { + listenerStream = Stream.concat( + listenerStream, + registry.getDomainEventListeners().get("domain").stream()); + } + final Map> eventHandlers = listenerStream .collect(toMap(RegisteredEventListener::getPath, identity())); final Map> eventsToBind = registry.getDomainEventListeners() .get(DEFAULT_DOMAIN).stream().collect(toMap(RegisteredEventListener::getPath, identity())); diff --git a/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java b/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java index 0c62f8a8..da5b7315 100644 --- a/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java +++ b/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java @@ -11,4 +11,7 @@ public interface DomainEventBus { Publisher emit(CloudEvent event); Publisher emit(String domain, CloudEvent event); + + Publisher emit(RawMessage event); + Publisher emit(String domain, RawMessage event); } diff --git a/domain/domain-events/src/main/java/org/reactivecommons/api/domain/RawMessage.java b/domain/domain-events/src/main/java/org/reactivecommons/api/domain/RawMessage.java new file mode 100644 index 00000000..1dd9fa01 --- /dev/null +++ b/domain/domain-events/src/main/java/org/reactivecommons/api/domain/RawMessage.java @@ -0,0 +1,5 @@ +package org.reactivecommons.api.domain; + +public interface RawMessage { + String getType(); +} diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/senders/GenericDomainEventBus.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/senders/GenericDomainEventBus.java index 0feda04a..fd7fbe25 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/senders/GenericDomainEventBus.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/senders/GenericDomainEventBus.java @@ -4,6 +4,7 @@ import lombok.RequiredArgsConstructor; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.api.domain.DomainEventBus; +import org.reactivecommons.api.domain.RawMessage; import org.reactivecommons.async.starter.exceptions.InvalidConfigurationException; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; @@ -14,6 +15,7 @@ @RequiredArgsConstructor public class GenericDomainEventBus implements DomainEventBus { + private static final String DOMAIN_NOT_FOUND = "Domain not found: "; private final ConcurrentMap domainEventBuses; @@ -26,7 +28,7 @@ public Publisher emit(DomainEvent event) { public Publisher emit(String domain, DomainEvent event) { DomainEventBus domainEventBus = domainEventBuses.get(domain); if (domainEventBus == null) { - return Mono.error(() -> new InvalidConfigurationException("Domain not found: " + domain)); + return Mono.error(() -> new InvalidConfigurationException(DOMAIN_NOT_FOUND + domain)); } return domainEventBus.emit(event); } @@ -40,7 +42,21 @@ public Publisher emit(CloudEvent event) { public Publisher emit(String domain, CloudEvent event) { DomainEventBus domainEventBus = domainEventBuses.get(domain); if (domainEventBus == null) { - return Mono.error(() -> new InvalidConfigurationException("Domain not found: " + domain)); + return Mono.error(() -> new InvalidConfigurationException(DOMAIN_NOT_FOUND + domain)); + } + return domainEventBus.emit(event); + } + + @Override + public Publisher emit(RawMessage event) { + return emit(DEFAULT_DOMAIN, event); + } + + @Override + public Publisher emit(String domain, RawMessage event) { + DomainEventBus domainEventBus = domainEventBuses.get(domain); + if (domainEventBus == null) { + return Mono.error(() -> new InvalidConfigurationException(DOMAIN_NOT_FOUND + domain)); } return domainEventBus.emit(event); } diff --git a/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/senders/GenericDomainEventBusTest.java b/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/senders/GenericDomainEventBusTest.java index c2f23fe6..43d62071 100644 --- a/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/senders/GenericDomainEventBusTest.java +++ b/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/senders/GenericDomainEventBusTest.java @@ -8,6 +8,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.api.domain.DomainEventBus; +import org.reactivecommons.async.commons.communications.Message; import org.reactivecommons.async.starter.exceptions.InvalidConfigurationException; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -28,6 +29,8 @@ class GenericDomainEventBusTest { @Mock private CloudEvent cloudEvent; @Mock + private Message rawMessage; + @Mock private DomainEvent domainEvent; private GenericDomainEventBus genericDomainEventBus; @@ -108,4 +111,27 @@ void shouldFailWhenNoDomainFoundWithCloudEvent() { .expectError(InvalidConfigurationException.class) .verify(); } + + @Test + void shouldEmitRawEventWithSpecificDomain() { + // Arrange + when(domainEventBus2.emit(rawMessage)).thenReturn(Mono.empty()); + // Act + Mono flow = Mono.from(genericDomainEventBus.emit(DOMAIN_2, rawMessage)); + // Assert + StepVerifier.create(flow) + .verifyComplete(); + verify(domainEventBus2).emit(rawMessage); + } + + @Test + void shouldFailWhenNoDomainFoundEmittingRawEvent() { + // Arrange + // Act + Mono flow = Mono.from(genericDomainEventBus.emit("another", rawMessage)); + // Assert + StepVerifier.create(flow) + .expectError(InvalidConfigurationException.class) + .verify(); + } }