Skip to content

Commit 9fd1dca

Browse files
authored
chore(next): Create shared starter (#124)
* next(shared-starter): Create shared starter classes for any broker type to enable many brokers usage for 5 version * build(sonar): change sonar execution rule * build(test): Add unit tests, update some dependencies * build(sonar): Fix some sonar issues and add unit tests
1 parent 2fa73b5 commit 9fd1dca

File tree

144 files changed

+3781
-1829
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

144 files changed

+3781
-1829
lines changed

.github/workflows/main.yml

+4-5
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,13 @@ jobs:
4040
distribution: temurin
4141
java-version: 17
4242
- name: Execute build test jacocoTestReport and sonar analysis
43-
if: endsWith(github.REF, '/master') == true
43+
if: endsWith(github.REF, '/master') == true || github.event.pull_request.head.repo.fork == false
4444
env:
4545
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
4646
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
47-
run: ./gradlew build test jacocoTestReport sonar --refresh-dependencies --no-daemon --continue -Denv.ci=true
47+
run: ./gradlew clean build generateMergedReport sonar --refresh-dependencies --no-daemon --continue -Denv.ci=true
4848
- name: Execute build test jacocoTestReport pull request
49-
if: endsWith(github.REF, '/merge') == true
49+
if: github.event.pull_request.head.repo.fork == true
5050
env:
5151
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
52-
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
53-
run: ./gradlew build test jacocoTestReport --refresh-dependencies --no-daemon --continue -Denv.ci=true
52+
run: ./gradlew clean build generateMergedReport --refresh-dependencies --no-daemon --continue -Denv.ci=true

async/async-commons/async-commons.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ dependencies {
1010
compileOnly 'io.projectreactor:reactor-core'
1111
api 'com.fasterxml.jackson.core:jackson-databind'
1212
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
13-
implementation 'commons-io:commons-io:2.16.1'
13+
implementation 'commons-io:commons-io:2.17.0'
1414
implementation 'io.cloudevents:cloudevents-json-jackson:4.0.1'
1515

1616
testImplementation 'io.projectreactor:reactor-test'
17-
}
17+
}
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package org.reactivecommons.async.commons.config;
22

3+
import lombok.Getter;
4+
35
import java.time.Duration;
46
import java.util.UUID;
57

8+
@Getter
69
public class BrokerConfig {
710
private final String routingKey = UUID.randomUUID().toString().replaceAll("-", "");
811
private final boolean persistentQueries;
@@ -24,24 +27,4 @@ public BrokerConfig(boolean persistentQueries, boolean persistentCommands, boole
2427
this.replyTimeout = replyTimeout;
2528
}
2629

27-
public boolean isPersistentQueries() {
28-
return persistentQueries;
29-
}
30-
31-
public boolean isPersistentCommands() {
32-
return persistentCommands;
33-
}
34-
35-
public boolean isPersistentEvents() {
36-
return persistentEvents;
37-
}
38-
39-
public Duration getReplyTimeout() {
40-
return replyTimeout;
41-
}
42-
43-
public String getRoutingKey() {
44-
return routingKey;
45-
}
46-
4730
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package org.reactivecommons.async.kafka;
2+
3+
import io.cloudevents.CloudEvent;
4+
import org.reactivecommons.api.domain.Command;
5+
import org.reactivecommons.async.api.AsyncQuery;
6+
import org.reactivecommons.async.api.DirectAsyncGateway;
7+
import org.reactivecommons.async.api.From;
8+
import reactor.core.publisher.Mono;
9+
10+
public class KafkaDirectAsyncGateway implements DirectAsyncGateway {
11+
12+
public static final String NOT_IMPLEMENTED_YET = "Not implemented yet";
13+
14+
@Override
15+
public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
16+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
17+
}
18+
19+
@Override
20+
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis) {
21+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
22+
}
23+
24+
@Override
25+
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain) {
26+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
27+
}
28+
29+
@Override
30+
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis, String domain) {
31+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
32+
}
33+
34+
@Override
35+
public Mono<Void> sendCommand(CloudEvent command, String targetName) {
36+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
37+
}
38+
39+
@Override
40+
public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis) {
41+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
42+
}
43+
44+
@Override
45+
public Mono<Void> sendCommand(CloudEvent command, String targetName, String domain) {
46+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
47+
}
48+
49+
@Override
50+
public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis, String domain) {
51+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
52+
}
53+
54+
@Override
55+
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type) {
56+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
57+
}
58+
59+
@Override
60+
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type, String domain) {
61+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
62+
}
63+
64+
@Override
65+
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type) {
66+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
67+
}
68+
69+
@Override
70+
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type, String domain) {
71+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
72+
}
73+
74+
@Override
75+
public <T> Mono<Void> reply(T response, From from) {
76+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
77+
}
78+
}

async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java

+11
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,26 @@
99

1010
@AllArgsConstructor
1111
public class KafkaDomainEventBus implements DomainEventBus {
12+
public static final String NOT_IMPLEMENTED_YET = "Not implemented yet";
1213
private final ReactiveMessageSender sender;
1314

1415
@Override
1516
public <T> Publisher<Void> emit(DomainEvent<T> event) {
1617
return sender.send(event);
1718
}
1819

20+
@Override
21+
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
22+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
23+
}
24+
1925
@Override
2026
public Publisher<Void> emit(CloudEvent event) {
2127
return sender.send(event);
2228
}
29+
30+
@Override
31+
public Publisher<Void> emit(String domain, CloudEvent event) {
32+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
33+
}
2334
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.reactivecommons.async.kafka;
2+
3+
import io.cloudevents.CloudEvent;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.extension.ExtendWith;
6+
import org.mockito.Mock;
7+
import org.mockito.junit.jupiter.MockitoExtension;
8+
import org.reactivecommons.api.domain.Command;
9+
import org.reactivecommons.async.api.AsyncQuery;
10+
import org.reactivecommons.async.api.DirectAsyncGateway;
11+
import org.reactivecommons.async.api.From;
12+
13+
import static org.junit.jupiter.api.Assertions.assertThrows;
14+
15+
@ExtendWith(MockitoExtension.class)
16+
class KafkaDirectAsyncGatewayTest {
17+
private final DirectAsyncGateway directAsyncGateway = new KafkaDirectAsyncGateway();
18+
private final String targetName = "targetName";
19+
private final String domain = "domain";
20+
private final long delay = 1000L;
21+
@Mock
22+
private CloudEvent cloudEvent;
23+
@Mock
24+
private Command<String> command;
25+
@Mock
26+
private AsyncQuery<String> query;
27+
@Mock
28+
private From from;
29+
30+
@Test
31+
void allMethodsAreNotImplemented() {
32+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(cloudEvent, targetName));
33+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(cloudEvent, targetName, domain));
34+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(cloudEvent, targetName, delay));
35+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(cloudEvent, targetName, delay, domain));
36+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(command, targetName));
37+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(command, targetName, domain));
38+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(command, targetName, delay));
39+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(command, targetName, delay, domain));
40+
41+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.requestReply(cloudEvent, targetName, CloudEvent.class));
42+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.requestReply(cloudEvent, targetName, CloudEvent.class, domain));
43+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.requestReply(query, targetName, CloudEvent.class));
44+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.requestReply(query, targetName, CloudEvent.class, domain));
45+
46+
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.reply(targetName, from));
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package org.reactivecommons.async.kafka;
2+
3+
import io.cloudevents.CloudEvent;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.extension.ExtendWith;
6+
import org.mockito.InjectMocks;
7+
import org.mockito.Mock;
8+
import org.mockito.junit.jupiter.MockitoExtension;
9+
import org.reactivecommons.api.domain.DomainEvent;
10+
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
11+
import reactor.core.publisher.Mono;
12+
import reactor.test.StepVerifier;
13+
14+
import static org.junit.jupiter.api.Assertions.assertThrows;
15+
import static org.mockito.Mockito.when;
16+
17+
@ExtendWith(MockitoExtension.class)
18+
class KafkaDomainEventBusTest {
19+
@Mock
20+
private DomainEvent<String> domainEvent;
21+
@Mock
22+
private CloudEvent cloudEvent;
23+
@Mock
24+
private ReactiveMessageSender sender;
25+
@InjectMocks
26+
private KafkaDomainEventBus kafkaDomainEventBus;
27+
private final String domain = "domain";
28+
29+
@Test
30+
void shouldEmitDomainEvent() {
31+
// Arrange
32+
when(sender.send(domainEvent)).thenReturn(Mono.empty());
33+
// Act
34+
Mono<Void> flow = Mono.from(kafkaDomainEventBus.emit(domainEvent));
35+
// Assert
36+
StepVerifier.create(flow)
37+
.verifyComplete();
38+
}
39+
40+
@Test
41+
void shouldEmitCloudEvent() {
42+
// Arrange
43+
when(sender.send(cloudEvent)).thenReturn(Mono.empty());
44+
// Act
45+
Mono<Void> flow = Mono.from(kafkaDomainEventBus.emit(cloudEvent));
46+
// Assert
47+
StepVerifier.create(flow)
48+
.verifyComplete();
49+
}
50+
51+
@Test
52+
void operationsShouldNotBeAbleForDomains() {
53+
assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, domainEvent));
54+
assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, cloudEvent));
55+
}
56+
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package org.reactivecommons.async.rabbit;
22

33
import io.cloudevents.CloudEvent;
4-
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
4+
import org.reactivecommons.api.domain.DomainEvent;
5+
import org.reactivecommons.api.domain.DomainEventBus;
56
import org.reactivecommons.async.commons.config.BrokerConfig;
7+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
68
import org.reactivestreams.Publisher;
79
import reactor.core.publisher.Mono;
8-
import org.reactivecommons.api.domain.DomainEvent;
9-
import org.reactivecommons.api.domain.DomainEventBus;
1010

1111
import java.util.Collections;
1212

@@ -29,7 +29,12 @@ public RabbitDomainEventBus(ReactiveMessageSender sender, String exchange, Broke
2929
@Override
3030
public <T> Mono<Void> emit(DomainEvent<T> event) {
3131
return sender.sendWithConfirm(event, exchange, event.getName(), Collections.emptyMap(), persistentEvents)
32-
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
32+
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
33+
}
34+
35+
@Override
36+
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
37+
throw new UnsupportedOperationException("Not implemented yet");
3338
}
3439

3540
@Override
@@ -39,4 +44,9 @@ public Publisher<Void> emit(CloudEvent cloudEvent) {
3944
.onErrorMap(err -> new RuntimeException("Event send failure: " + cloudEvent.getType(), err));
4045
}
4146

47+
@Override
48+
public Publisher<Void> emit(String domain, CloudEvent event) {
49+
throw new UnsupportedOperationException("Not implemented yet");
50+
}
51+
4252
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationNotificationListener.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
88
import org.reactivecommons.async.commons.DiscardNotifier;
99
import org.reactivecommons.async.commons.EventExecutor;
10+
import org.reactivecommons.async.commons.HandlerResolver;
1011
import org.reactivecommons.async.commons.communications.Message;
1112
import org.reactivecommons.async.commons.converters.MessageConverter;
1213
import org.reactivecommons.async.commons.ext.CustomReporter;
13-
import org.reactivecommons.async.commons.HandlerResolver;
1414
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1515
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1616
import reactor.core.publisher.Flux;
@@ -51,9 +51,6 @@ public ApplicationNotificationListener(ReactiveMessageListener receiver,
5151
}
5252

5353
protected Mono<Void> setUpBindings(TopologyCreator creator) {
54-
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
55-
.type("topic")
56-
.durable(true));
5754

5855
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(
5956
queue(queueName)
@@ -65,6 +62,10 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
6562
.flatMap(listener -> creator.bind(binding(exchangeName, listener.getPath(), queueName)));
6663

6764
if (createTopology) {
65+
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
66+
.type("topic")
67+
.durable(true));
68+
6869
return declareExchange
6970
.then(declareQueue)
7071
.thenMany(bindings)

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
139139
}
140140

141141
private void onTerminate() {
142-
messageFlux.doOnTerminate(this::onTerminate)
142+
messageFlux
143+
.doOnTerminate(this::onTerminate)
143144
.subscribe(new LoggerSubscriber<>(getClass().getName()));
144145
}
145146

0 commit comments

Comments
 (0)