Flexibility and adaptability: New subscribers or publishers can be added to the system
+ * without significant changes to the existing components, making the system highly adaptable to
+ * evolving requirements.
+ *
+ * In this example we will create three topics WEATHER, TEMPERATURE and CUSTOMER_SUPPORT.
+ * Then we will register those topics in the {@link Publisher}. After that we will create two
+ * {@link WeatherSubscriber}s, one {@link DelayedWeatherSubscriber} and two {@link
+ * CustomerSupportSubscriber}.The subscribers will subscribe to the relevant topics. One {@link
+ * WeatherSubscriber} will subscribe to two topics (WEATHER, TEMPERATURE). {@link
+ * DelayedWeatherSubscriber} has a delay in message processing. Now we can publish the three
+ * {@link Topic}s with different content in the {@link Message}s. And we can observe the output
+ * in the log where, one {@link WeatherSubscriber} will output the message with weather and the
+ * other {@link WeatherSubscriber} will output weather and temperature. {@link
+ * CustomerSupportSubscriber}s will output the message with customer support email. {@link
+ * DelayedWeatherSubscriber} has a delay in processing and will output the message at last. Each
+ * subscriber is only listening to the subscribed topics.
+ */
+public class App {
+
+ /**
+ * Program entry point.
+ *
+ * @param args command line args
+ */
+ public static void main(String[] args) throws InterruptedException {
+
+ final String topicWeather = "WEATHER";
+ final String topicTemperature = "TEMPERATURE";
+ final String topicCustomerSupport = "CUSTOMER_SUPPORT";
+
+ // 1. create the publisher.
+ Publisher publisher = new PublisherImpl();
+
+ // 2. define the topics and register on publisher
+ Topic weatherTopic = new Topic(topicWeather);
+ publisher.registerTopic(weatherTopic);
+
+ Topic temperatureTopic = new Topic(topicTemperature);
+ publisher.registerTopic(temperatureTopic);
+
+ Topic supportTopic = new Topic(topicCustomerSupport);
+ publisher.registerTopic(supportTopic);
+
+ // 3. Create the subscribers and subscribe to the relevant topics
+ // weatherSub1 will subscribe to two topics WEATHER and TEMPERATURE.
+ Subscriber weatherSub1 = new WeatherSubscriber();
+ weatherTopic.addSubscriber(weatherSub1);
+ temperatureTopic.addSubscriber(weatherSub1);
+
+ // weatherSub2 will subscribe to WEATHER topic
+ Subscriber weatherSub2 = new WeatherSubscriber();
+ weatherTopic.addSubscriber(weatherSub2);
+
+ // delayedWeatherSub will subscribe to WEATHER topic
+ // NOTE :: DelayedWeatherSubscriber has a 0.2 sec delay of processing message.
+ Subscriber delayedWeatherSub = new DelayedWeatherSubscriber();
+ weatherTopic.addSubscriber(delayedWeatherSub);
+
+ // subscribe the customer support subscribers to the CUSTOMER_SUPPORT topic.
+ Subscriber supportSub1 = new CustomerSupportSubscriber();
+ supportTopic.addSubscriber(supportSub1);
+ Subscriber supportSub2 = new CustomerSupportSubscriber();
+ supportTopic.addSubscriber(supportSub2);
+
+ // 4. publish message from each topic
+ publisher.publish(weatherTopic, new Message("earthquake"));
+ publisher.publish(temperatureTopic, new Message("23C"));
+ publisher.publish(supportTopic, new Message("support@test.de"));
+
+ // 5. unregister subscriber from TEMPERATURE topic
+ temperatureTopic.removeSubscriber(weatherSub1);
+
+ // 6. publish message under TEMPERATURE topic
+ publisher.publish(temperatureTopic, new Message("0C"));
+
+ /*
+ * Finally, we wait for the subscribers to consume messages to check the output.
+ * The output can change on each run, depending on how long the execution on each
+ * subscriber would take
+ * Expected behavior:
+ * - weatherSub1 will consume earthquake and 23C
+ * - weatherSub2 will consume earthquake
+ * - delayedWeatherSub will take longer and consume earthquake
+ * - supportSub1, supportSub2 will consume support@test.de
+ * - the message 0C will not be consumed because weatherSub1 unsubscribed from TEMPERATURE topic
+ */
+ TimeUnit.SECONDS.sleep(2);
+ }
+}
diff --git a/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/model/Message.java b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/model/Message.java
new file mode 100644
index 000000000000..f6ed426b3a7e
--- /dev/null
+++ b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/model/Message.java
@@ -0,0 +1,4 @@
+package com.iluwatar.publish.subscribe.model;
+
+/** This class represents a Message that holds the published content. */
+public record Message(Object content) {}
diff --git a/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/model/Topic.java b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/model/Topic.java
new file mode 100644
index 000000000000..c0220fd67e2e
--- /dev/null
+++ b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/model/Topic.java
@@ -0,0 +1,48 @@
+package com.iluwatar.publish.subscribe.model;
+
+import com.iluwatar.publish.subscribe.subscriber.Subscriber;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArraySet;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+
+/** This class represents a Topic that topic name and subscribers. */
+@Getter
+@Setter
+@RequiredArgsConstructor
+public class Topic {
+
+ private final String topicName;
+ private final Set subscribers = new CopyOnWriteArraySet<>();
+
+ /**
+ * Add a subscriber to the list of subscribers.
+ *
+ * @param subscriber subscriber to add
+ */
+ public void addSubscriber(Subscriber subscriber) {
+ subscribers.add(subscriber);
+ }
+
+ /**
+ * Remove a subscriber to the list of subscribers.
+ *
+ * @param subscriber subscriber to remove
+ */
+ public void removeSubscriber(Subscriber subscriber) {
+ subscribers.remove(subscriber);
+ }
+
+ /**
+ * Publish a message to subscribers.
+ *
+ * @param message message with content to publish
+ */
+ public void publish(Message message) {
+ for (Subscriber subscriber : subscribers) {
+ CompletableFuture.runAsync(() -> subscriber.onMessage(message));
+ }
+ }
+}
diff --git a/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/publisher/Publisher.java b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/publisher/Publisher.java
new file mode 100644
index 000000000000..f63c53dbd59a
--- /dev/null
+++ b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/publisher/Publisher.java
@@ -0,0 +1,23 @@
+package com.iluwatar.publish.subscribe.publisher;
+
+import com.iluwatar.publish.subscribe.model.Message;
+import com.iluwatar.publish.subscribe.model.Topic;
+
+/** This class represents a Publisher. */
+public interface Publisher {
+
+ /**
+ * Register a topic in the publisher.
+ *
+ * @param topic the topic to be registered
+ */
+ void registerTopic(Topic topic);
+
+ /**
+ * Register a topic in the publisher.
+ *
+ * @param topic the topic to publish the message under
+ * @param message message with content to be published
+ */
+ void publish(Topic topic, Message message);
+}
diff --git a/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/publisher/PublisherImpl.java b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/publisher/PublisherImpl.java
new file mode 100644
index 000000000000..7b87c3a8895f
--- /dev/null
+++ b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/publisher/PublisherImpl.java
@@ -0,0 +1,29 @@
+package com.iluwatar.publish.subscribe.publisher;
+
+import com.iluwatar.publish.subscribe.model.Message;
+import com.iluwatar.publish.subscribe.model.Topic;
+import java.util.HashSet;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class is an implementation of the Publisher. */
+public class PublisherImpl implements Publisher {
+
+ private static final Logger logger = LoggerFactory.getLogger(PublisherImpl.class);
+ private final Set topics = new HashSet<>();
+
+ @Override
+ public void registerTopic(Topic topic) {
+ topics.add(topic);
+ }
+
+ @Override
+ public void publish(Topic topic, Message message) {
+ if (!topics.contains(topic)) {
+ logger.error("This topic is not registered: {}", topic.getTopicName());
+ return;
+ }
+ topic.publish(message);
+ }
+}
diff --git a/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/CustomerSupportSubscriber.java b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/CustomerSupportSubscriber.java
new file mode 100644
index 000000000000..37fcb3f42109
--- /dev/null
+++ b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/CustomerSupportSubscriber.java
@@ -0,0 +1,26 @@
+package com.iluwatar.publish.subscribe.subscriber;
+
+import com.iluwatar.publish.subscribe.model.Message;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class subscribes to CUSTOMER_SUPPORT topic. */
+@Slf4j
+public class CustomerSupportSubscriber implements Subscriber {
+
+ private static final Logger logger = LoggerFactory.getLogger(CustomerSupportSubscriber.class);
+
+ @Override
+ public void onMessage(Message message) {
+ if (message.content() instanceof String content) {
+ logger.info(
+ "Customer Support Subscriber: {} sent the email to: {}", this.hashCode(), content);
+ } else {
+ logger.error(
+ "Unknown content type: {} expected: {}",
+ message.content().getClass().getSimpleName(),
+ String.class.getSimpleName());
+ }
+ }
+}
diff --git a/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/DelayedWeatherSubscriber.java b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/DelayedWeatherSubscriber.java
new file mode 100644
index 000000000000..f397d71542f4
--- /dev/null
+++ b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/DelayedWeatherSubscriber.java
@@ -0,0 +1,37 @@
+package com.iluwatar.publish.subscribe.subscriber;
+
+import com.iluwatar.publish.subscribe.model.Message;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class subscribes to WEATHER topic. */
+@Slf4j
+public class DelayedWeatherSubscriber implements Subscriber {
+
+ private static final Logger logger = LoggerFactory.getLogger(DelayedWeatherSubscriber.class);
+
+ @Override
+ public void onMessage(Message message) {
+ if (message.content() instanceof String content) {
+ processData();
+ logger.info("Delayed Weather Subscriber: {} issued message: {}", this.hashCode(), content);
+ } else {
+ logger.error(
+ "Unknown content type: {} expected: {}",
+ message.content().getClass().getSimpleName(),
+ String.class.getSimpleName());
+ }
+ }
+
+ /** create an artificial delay to mimic the persistence and timeouts in real world. */
+ private void processData() {
+ try {
+ TimeUnit.MILLISECONDS.sleep(2000);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted!", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/Subscriber.java b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/Subscriber.java
new file mode 100644
index 000000000000..0d3cb3455e54
--- /dev/null
+++ b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/Subscriber.java
@@ -0,0 +1,14 @@
+package com.iluwatar.publish.subscribe.subscriber;
+
+import com.iluwatar.publish.subscribe.model.Message;
+
+/** This class represents a Subscriber. */
+public interface Subscriber {
+
+ /**
+ * On message method will trigger when the subscribed event is published.
+ *
+ * @param message the message contains the content of the published event
+ */
+ void onMessage(Message message);
+}
diff --git a/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/WeatherSubscriber.java b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/WeatherSubscriber.java
new file mode 100644
index 000000000000..75ac32badeaf
--- /dev/null
+++ b/publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/subscriber/WeatherSubscriber.java
@@ -0,0 +1,25 @@
+package com.iluwatar.publish.subscribe.subscriber;
+
+import com.iluwatar.publish.subscribe.model.Message;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class subscribes to WEATHER or TEMPERATURE topic. */
+@Slf4j
+public class WeatherSubscriber implements Subscriber {
+
+ private static final Logger logger = LoggerFactory.getLogger(WeatherSubscriber.class);
+
+ @Override
+ public void onMessage(Message message) {
+ if (message.content() instanceof String content) {
+ logger.info("Weather Subscriber: {} issued message: {}", this.hashCode(), content);
+ } else {
+ logger.error(
+ "Unknown content type: {} expected: {}",
+ message.content().getClass().getSimpleName(),
+ String.class.getSimpleName());
+ }
+ }
+}
diff --git a/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/AppTest.java b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/AppTest.java
new file mode 100644
index 000000000000..25f19be1e41b
--- /dev/null
+++ b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/AppTest.java
@@ -0,0 +1,13 @@
+package com.iluwatar.publish.subscribe;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+import org.junit.jupiter.api.Test;
+
+public class AppTest {
+
+ @Test
+ void shouldExecuteApplicationWithoutException() {
+ assertDoesNotThrow(() -> App.main(new String[] {}));
+ }
+}
diff --git a/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/LoggerExtension.java b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/LoggerExtension.java
new file mode 100644
index 000000000000..c99aaab5e2eb
--- /dev/null
+++ b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/LoggerExtension.java
@@ -0,0 +1,40 @@
+package com.iluwatar.publish.subscribe;
+
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.LoggerFactory;
+
+public class LoggerExtension implements BeforeEachCallback, AfterEachCallback {
+
+ private final ListAppender listAppender = new ListAppender<>();
+ private final Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+
+ @Override
+ public void afterEach(ExtensionContext extensionContext) throws Exception {
+ listAppender.stop();
+ listAppender.list.clear();
+ logger.detachAppender(listAppender);
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext extensionContext) throws Exception {
+ logger.addAppender(listAppender);
+ listAppender.start();
+ }
+
+ public List getMessages() {
+ return listAppender.list.stream().map(e -> e.getMessage()).collect(Collectors.toList());
+ }
+
+ public List getFormattedMessages() {
+ return listAppender.list.stream()
+ .map(e -> e.getFormattedMessage())
+ .collect(Collectors.toList());
+ }
+}
diff --git a/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/model/MessageTest.java b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/model/MessageTest.java
new file mode 100644
index 000000000000..28becc469f19
--- /dev/null
+++ b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/model/MessageTest.java
@@ -0,0 +1,17 @@
+package com.iluwatar.publish.subscribe.model;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import org.junit.jupiter.api.Test;
+
+public class MessageTest {
+
+ @Test
+ public void testMessage() {
+ final String content = "some content";
+ Message message = new Message(content);
+ assertInstanceOf(String.class, message.content());
+ assertEquals(content, String.valueOf(message.content()));
+ }
+}
diff --git a/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/model/TopicTest.java b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/model/TopicTest.java
new file mode 100644
index 000000000000..d7c22c2b7aec
--- /dev/null
+++ b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/model/TopicTest.java
@@ -0,0 +1,40 @@
+package com.iluwatar.publish.subscribe.model;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import com.iluwatar.publish.subscribe.subscriber.Subscriber;
+import com.iluwatar.publish.subscribe.subscriber.WeatherSubscriber;
+import java.lang.reflect.Field;
+import java.util.Set;
+import org.junit.jupiter.api.Test;
+
+public class TopicTest {
+
+ private static final String TOPIC_WEATHER = "WEATHER";
+
+ @Test
+ void testTopic() {
+ Topic topic = new Topic(TOPIC_WEATHER);
+ assertEquals(TOPIC_WEATHER, topic.getTopicName());
+ }
+
+ @Test
+ void testSubscribing() throws NoSuchFieldException, IllegalAccessException {
+
+ Topic topic = new Topic(TOPIC_WEATHER);
+ Subscriber sub = new WeatherSubscriber();
+ topic.addSubscriber(sub);
+
+ Field field = topic.getClass().getDeclaredField("subscribers");
+ field.setAccessible(true);
+ Object value = field.get(topic);
+ assertInstanceOf(Set.class, value);
+
+ Set> subscribers = (Set>) field.get(topic);
+ assertEquals(1, subscribers.size());
+
+ topic.removeSubscriber(sub);
+ assertEquals(0, subscribers.size());
+ }
+}
diff --git a/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/publisher/PublisherTest.java b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/publisher/PublisherTest.java
new file mode 100644
index 000000000000..b4b9cab82888
--- /dev/null
+++ b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/publisher/PublisherTest.java
@@ -0,0 +1,60 @@
+package com.iluwatar.publish.subscribe.publisher;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import com.iluwatar.publish.subscribe.LoggerExtension;
+import com.iluwatar.publish.subscribe.model.Message;
+import com.iluwatar.publish.subscribe.model.Topic;
+import java.lang.reflect.Field;
+import java.util.Set;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class PublisherTest {
+
+ @RegisterExtension public LoggerExtension loggerExtension = new LoggerExtension();
+
+ private static final String TOPIC_WEATHER = "WEATHER";
+ private static final String TOPIC_CUSTOMER_SUPPORT = "CUSTOMER_SUPPORT";
+
+ @Test
+ void testRegisterTopic() throws NoSuchFieldException, IllegalAccessException {
+ Topic topic = new Topic(TOPIC_CUSTOMER_SUPPORT);
+ Publisher publisher = new PublisherImpl();
+ publisher.registerTopic(topic);
+
+ Field field = publisher.getClass().getDeclaredField("topics");
+ field.setAccessible(true);
+ Object value = field.get(publisher);
+ assertInstanceOf(Set.class, value);
+
+ Set> topics = (Set>) field.get(publisher);
+ assertEquals(1, topics.size());
+ }
+
+ @Test
+ void testPublish() {
+ Topic topic = new Topic(TOPIC_WEATHER);
+ Publisher publisher = new PublisherImpl();
+ publisher.registerTopic(topic);
+
+ Message message = new Message("weather");
+ assertDoesNotThrow(() -> publisher.publish(topic, message));
+ }
+
+ @Test
+ void testPublishUnregisteredTopic() {
+ Topic topic = new Topic(TOPIC_WEATHER);
+ Publisher publisher = new PublisherImpl();
+ publisher.registerTopic(topic);
+
+ Topic topicUnregistered = new Topic(TOPIC_CUSTOMER_SUPPORT);
+ Message message = new Message("support");
+ publisher.publish(topicUnregistered, message);
+ assertEquals(
+ "This topic is not registered: CUSTOMER_SUPPORT",
+ loggerExtension.getFormattedMessages().getFirst());
+ }
+}
diff --git a/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/subscriber/SubscriberTest.java b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/subscriber/SubscriberTest.java
new file mode 100644
index 000000000000..feb98510189a
--- /dev/null
+++ b/publish-subscribe/src/test/java/com/iluwatar/publish/subscribe/subscriber/SubscriberTest.java
@@ -0,0 +1,183 @@
+package com.iluwatar.publish.subscribe.subscriber;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.iluwatar.publish.subscribe.LoggerExtension;
+import com.iluwatar.publish.subscribe.model.Message;
+import com.iluwatar.publish.subscribe.model.Topic;
+import com.iluwatar.publish.subscribe.publisher.Publisher;
+import com.iluwatar.publish.subscribe.publisher.PublisherImpl;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriberTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(SubscriberTest.class);
+ @RegisterExtension public LoggerExtension loggerExtension = new LoggerExtension();
+
+ private static final String TOPIC_WEATHER = "WEATHER";
+ private static final String TOPIC_TEMPERATURE = "TEMPERATURE";
+ private static final String TOPIC_CUSTOMER_SUPPORT = "CUSTOMER_SUPPORT";
+
+ @Test
+ void testSubscribeToMultipleTopics() {
+
+ Topic topicWeather = new Topic(TOPIC_WEATHER);
+ Topic topicTemperature = new Topic(TOPIC_TEMPERATURE);
+ Subscriber weatherSubscriber = new WeatherSubscriber();
+
+ topicWeather.addSubscriber(weatherSubscriber);
+ topicTemperature.addSubscriber(weatherSubscriber);
+
+ Publisher publisher = new PublisherImpl();
+ publisher.registerTopic(topicWeather);
+ publisher.registerTopic(topicTemperature);
+
+ publisher.publish(topicWeather, new Message("earthquake"));
+ publisher.publish(topicTemperature, new Message("-2C"));
+
+ waitForOutput();
+ assertEquals(2, loggerExtension.getFormattedMessages().size());
+ }
+
+ @Test
+ void testOnlyReceiveSubscribedTopic() {
+
+ Topic weatherTopic = new Topic(TOPIC_WEATHER);
+ Subscriber weatherSubscriber = new WeatherSubscriber();
+ weatherTopic.addSubscriber(weatherSubscriber);
+
+ Topic customerSupportTopic = new Topic(TOPIC_CUSTOMER_SUPPORT);
+ Publisher publisher = new PublisherImpl();
+ publisher.registerTopic(weatherTopic);
+ publisher.registerTopic(customerSupportTopic);
+
+ publisher.publish(customerSupportTopic, new Message("support@test.de"));
+
+ waitForOutput();
+ assertEquals(0, loggerExtension.getFormattedMessages().size());
+ }
+
+ @Test
+ void testMultipleSubscribersOnSameTopic() {
+
+ Topic weatherTopic = new Topic(TOPIC_WEATHER);
+ Subscriber weatherSubscriber1 = new WeatherSubscriber();
+ weatherTopic.addSubscriber(weatherSubscriber1);
+
+ Subscriber weatherSubscriber2 = new WeatherSubscriber();
+ weatherTopic.addSubscriber(weatherSubscriber2);
+
+ Publisher publisher = new PublisherImpl();
+ publisher.registerTopic(weatherTopic);
+
+ publisher.publish(weatherTopic, new Message("tornado"));
+
+ waitForOutput();
+ assertEquals(2, loggerExtension.getFormattedMessages().size());
+ assertEquals(
+ "Weather Subscriber: " + weatherSubscriber1.hashCode() + " issued message: tornado",
+ getMessage(weatherSubscriber1.hashCode()));
+ assertEquals(
+ "Weather Subscriber: " + weatherSubscriber2.hashCode() + " issued message: tornado",
+ getMessage(weatherSubscriber2.hashCode()));
+ }
+
+ @Test
+ void testMultipleSubscribersOnDifferentTopics() {
+
+ Topic weatherTopic = new Topic(TOPIC_WEATHER);
+ Subscriber weatherSubscriber = new WeatherSubscriber();
+ weatherTopic.addSubscriber(weatherSubscriber);
+
+ Topic customerSupportTopic = new Topic(TOPIC_CUSTOMER_SUPPORT);
+ Subscriber customerSupportSubscriber = new CustomerSupportSubscriber();
+ customerSupportTopic.addSubscriber(customerSupportSubscriber);
+
+ Publisher publisher = new PublisherImpl();
+ publisher.registerTopic(weatherTopic);
+ publisher.registerTopic(customerSupportTopic);
+
+ publisher.publish(weatherTopic, new Message("flood"));
+ publisher.publish(customerSupportTopic, new Message("support@test.at"));
+
+ waitForOutput();
+ assertEquals(2, loggerExtension.getFormattedMessages().size());
+ assertEquals(
+ "Weather Subscriber: " + weatherSubscriber.hashCode() + " issued message: flood",
+ getMessage(weatherSubscriber.hashCode()));
+ assertEquals(
+ "Customer Support Subscriber: "
+ + customerSupportSubscriber.hashCode()
+ + " sent the email to: support@test.at",
+ getMessage(customerSupportSubscriber.hashCode()));
+ }
+
+ @Test
+ void testInvalidContentOnTopics() {
+
+ Topic weatherTopic = new Topic(TOPIC_WEATHER);
+ Subscriber weatherSubscriber = new WeatherSubscriber();
+ weatherTopic.addSubscriber(weatherSubscriber);
+
+ Topic customerSupportTopic = new Topic(TOPIC_CUSTOMER_SUPPORT);
+ Subscriber customerSupportSubscriber = new CustomerSupportSubscriber();
+ customerSupportTopic.addSubscriber(customerSupportSubscriber);
+
+ Publisher publisher = new PublisherImpl();
+ publisher.registerTopic(weatherTopic);
+ publisher.registerTopic(customerSupportTopic);
+
+ publisher.publish(weatherTopic, new Message(123));
+ publisher.publish(customerSupportTopic, new Message(34.56));
+
+ waitForOutput();
+ assertTrue(loggerExtension.getFormattedMessages().getFirst().contains("Unknown content type"));
+ assertTrue(loggerExtension.getFormattedMessages().get(1).contains("Unknown content type"));
+ }
+
+ @Test
+ void testUnsubscribe() {
+
+ Topic weatherTopic = new Topic(TOPIC_WEATHER);
+ Subscriber weatherSubscriber = new WeatherSubscriber();
+ weatherTopic.addSubscriber(weatherSubscriber);
+
+ Publisher publisher = new PublisherImpl();
+ publisher.registerTopic(weatherTopic);
+
+ publisher.publish(weatherTopic, new Message("earthquake"));
+
+ weatherTopic.removeSubscriber(weatherSubscriber);
+ publisher.publish(weatherTopic, new Message("tornado"));
+
+ waitForOutput();
+ assertEquals(1, loggerExtension.getFormattedMessages().size());
+ assertTrue(loggerExtension.getFormattedMessages().getFirst().contains("earthquake"));
+ assertFalse(loggerExtension.getFormattedMessages().getFirst().contains("tornado"));
+ }
+
+ private String getMessage(int subscriberHash) {
+ Optional message =
+ loggerExtension.getFormattedMessages().stream()
+ .filter(str -> str.contains(String.valueOf(subscriberHash)))
+ .findFirst();
+ assertTrue(message.isPresent());
+ return message.get();
+ }
+
+ private void waitForOutput() {
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted!", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+}