diff --git a/.rhcicd/clowdapp-engine.yaml b/.rhcicd/clowdapp-engine.yaml
index 4dbe34b822..974cec6ac4 100644
--- a/.rhcicd/clowdapp-engine.yaml
+++ b/.rhcicd/clowdapp-engine.yaml
@@ -20,6 +20,7 @@ objects:
- export-service
database:
sharedDbAppName: notifications-backend
+ inMemoryDb: ${{IN_MEMORY_DB_ENABLED}}
featureFlags: true
kafkaTopics:
- topicName: platform.notifications.connector.email.high.volume
@@ -287,6 +288,9 @@ parameters:
value: quay.io/cloudservices/notifications-engine
- name: IMAGE_TAG
value: latest
+- name: IN_MEMORY_DB_ENABLED
+ description: If inMemoryDb is set to true, Clowder will pass configuration of an In Memory Database to the pods in the ClowdApp. This single instance will be shared between all apps.
+ value: "false"
- name: KAFKA_CONSUMED_TOTAL_CHECKER_ENABLED
description: Is the Kafka records consumed total check included in the global health check?
value: "false"
diff --git a/common/src/test/java/com/redhat/cloud/notifications/TestConstants.java b/common/src/test/java/com/redhat/cloud/notifications/TestConstants.java
index 99f7aff519..163606d6c4 100644
--- a/common/src/test/java/com/redhat/cloud/notifications/TestConstants.java
+++ b/common/src/test/java/com/redhat/cloud/notifications/TestConstants.java
@@ -16,4 +16,5 @@ public class TestConstants {
public static final String DEFAULT_USER = "default-user";
public static final String POSTGRES_MAJOR_VERSION = "16";
+ public static final String VALKEY_MAJOR_VERSION = "8";
}
diff --git a/engine/pom.xml b/engine/pom.xml
index 4915800a28..98861f839a 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -15,6 +15,7 @@
1.14.1
+ 1.0.0
@@ -46,6 +47,10 @@
io.quarkus
quarkus-cache
+
+ io.quarkus
+ quarkus-redis-client
+
io.quarkus
quarkus-flyway
@@ -184,6 +189,12 @@
${testcontainers.version}
test
+
+ io.github.ss-bhatt
+ testcontainers-valkey
+ ${testcontainers-valkey.version}
+ test
+
io.quarkus
quarkus-mailer
diff --git a/engine/src/main/java/com/redhat/cloud/notifications/config/EngineConfig.java b/engine/src/main/java/com/redhat/cloud/notifications/config/EngineConfig.java
index aaa770d4f4..5657c31ee8 100644
--- a/engine/src/main/java/com/redhat/cloud/notifications/config/EngineConfig.java
+++ b/engine/src/main/java/com/redhat/cloud/notifications/config/EngineConfig.java
@@ -35,6 +35,7 @@ public class EngineConfig {
private static final String UNLEASH = "notifications.unleash.enabled";
private static final String PROCESSOR_CONNECTORS_MAX_SERVER_ERRORS = "processor.connectors.max-server-errors";
private static final String PROCESSOR_CONNECTORS_MIN_DELAY_SINCE_FIRST_SERVER_ERROR = "processor.connectors.min-delay-since-first-server-error";
+ private static final String IN_MEMORY_DB_ENABLED = "in-memory-db.enabled";
/**
* Standard "Red Hat Hybrid Cloud Console" sender that the vast majority of the
@@ -57,6 +58,7 @@ public class EngineConfig {
private String drawerToggle;
private String exportServiceHccClusterToggle;
private String kafkaConsumedTotalCheckerToggle;
+ private String valkeyEventDeduplicatorToggle;
private String toggleBlacklistedEndpoints;
private String toggleBlacklistedEventTypes;
private String toggleKafkaOutgoingHighVolumeTopic;
@@ -154,6 +156,9 @@ public class EngineConfig {
@ConfigProperty(name = NOTIFICATIONS_INGRESSREPLAY_END_TIME, defaultValue = "2025-12-15T09:30:00Z")
String replayEndTime;
+ @ConfigProperty(name = IN_MEMORY_DB_ENABLED, defaultValue = "false")
+ boolean inMemoryDbEnabled;
+
@Inject
ToggleRegistry toggleRegistry;
@@ -167,6 +172,7 @@ void postConstruct() {
drawerToggle = toggleRegistry.register("drawer", true);
exportServiceHccClusterToggle = toggleRegistry.register("export-service-hcc-cluster", true);
kafkaConsumedTotalCheckerToggle = toggleRegistry.register("kafka-consumed-total-checker", true);
+ valkeyEventDeduplicatorToggle = toggleRegistry.register("valkey-event-deduplicator", true);
toggleKafkaOutgoingHighVolumeTopic = toggleRegistry.register("kafka-outgoing-high-volume-topic", true);
toggleBlacklistedEndpoints = toggleRegistry.register("blacklisted-endpoints", true);
toggleBlacklistedEventTypes = toggleRegistry.register("blacklisted-event-types", true);
@@ -203,6 +209,8 @@ void logConfigAtStartup(@Observes Startup event) {
config.put(asyncEventProcessingToggle, isAsyncEventProcessing());
config.put(toggleIncludeSeverityToFilterRecipients, isIncludeSeverityToFilterRecipientsEnabled(""));
config.put(toggleSkipProcessingMessagesOnReplayService, isSkipMessageProcessing());
+ config.put(valkeyEventDeduplicatorToggle, isValkeyEventDeduplicatorEnabled());
+ config.put(IN_MEMORY_DB_ENABLED, isInMemoryDbEnabled());
Log.info("=== Startup configuration ===");
config.forEach((key, value) -> {
@@ -382,4 +390,16 @@ public boolean isNormalizedQueriesEnabled(String orgId) {
return false;
}
}
+
+ public boolean isValkeyEventDeduplicatorEnabled() {
+ if (unleashEnabled) {
+ return this.unleash.isEnabled(this.valkeyEventDeduplicatorToggle, false);
+ } else {
+ return false;
+ }
+ }
+
+ public boolean isInMemoryDbEnabled() {
+ return inMemoryDbEnabled;
+ }
}
diff --git a/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java b/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java
index 9777fa3b4b..3bf87ec1a0 100644
--- a/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java
+++ b/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java
@@ -101,6 +101,9 @@ public class EventConsumer {
@Inject
SeverityTransformer severityTransformer;
+ @Inject
+ ValkeyService valkeyService;
+
ConsoleCloudEventParser cloudEventParser = new ConsoleCloudEventParser();
private Counter rejectedCounter;
@@ -262,7 +265,21 @@ public void process(Message message) {
* Before we persist the event into the DB and process it, we need to check whether the event is
* a duplicate using the custom event deduplication logic tenants might have implemented.
*/
- if (!eventDeduplicator.isNew(event)) {
+ boolean isNewEvent;
+
+ try {
+ isNewEvent = eventDeduplicator.isNew(event);
+ } catch (Exception e) {
+ // If Valkey deduplication is enabled, remove any keys that may have been created
+ if (engineConfig.isInMemoryDbEnabled() && engineConfig.isValkeyEventDeduplicatorEnabled()) {
+ Optional dedupKey = eventDeduplicator.getEventDeduplicationConfig(event).getDeduplicationKey(event);
+ dedupKey.ifPresent(key -> valkeyService.removeEventFromDeduplication(key));
+ }
+
+ throw e;
+ }
+
+ if (!isNewEvent) {
// The event is already known and should therefore be ignored.
Log.debug("Duplicated event ignored");
registry.counter(DUPLICATE_EVENT_COUNTER_NAME,
diff --git a/engine/src/main/java/com/redhat/cloud/notifications/events/ValkeyService.java b/engine/src/main/java/com/redhat/cloud/notifications/events/ValkeyService.java
new file mode 100644
index 0000000000..e8881e206d
--- /dev/null
+++ b/engine/src/main/java/com/redhat/cloud/notifications/events/ValkeyService.java
@@ -0,0 +1,110 @@
+package com.redhat.cloud.notifications.events;
+
+import com.redhat.cloud.notifications.config.EngineConfig;
+import io.quarkus.logging.Log;
+import io.vertx.mutiny.core.Vertx;
+import io.vertx.mutiny.redis.client.Redis;
+import io.vertx.mutiny.redis.client.RedisAPI;
+import io.vertx.mutiny.redis.client.Response;
+import io.vertx.redis.client.RedisOptions;
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+/** Stores and retrieves data from remote cache (i.e. Valkey). */
+@ApplicationScoped
+public class ValkeyService {
+
+ private static final String EVENT_DEDUPLICATION_KEY = "engine:event-deduplication";
+
+ @ConfigProperty(name = "valkey-service.ttl", defaultValue = "PT24H")
+ Duration ttl;
+
+ @ConfigProperty(name = "quarkus.redis.hosts", defaultValue = "")
+ Optional valkeyHost;
+
+ @ConfigProperty(name = "quarkus.redis.password", defaultValue = "")
+ Optional valkeyPassword;
+
+ @Inject
+ EngineConfig config;
+
+ @Inject
+ Vertx vertx;
+
+ /** The underlying client connecting to Valkey. */
+ private Redis valkeyClient;
+
+ /** Implementation of the Redis/Valkey API, using {@link #valkeyClient} */
+ private RedisAPI valkey;
+
+ @PostConstruct
+ void initialize() {
+ if (config.isInMemoryDbEnabled()) {
+ if (valkeyHost.isEmpty() || valkeyHost.get().isEmpty()) {
+ throw new IllegalStateException("In-memory DB enabled, but Valkey connection string was not provided");
+ } else {
+ RedisOptions valkeyOptions = new RedisOptions().setConnectionString(valkeyHost.get().replace("valkey://", "redis://"));
+ valkeyPassword.ifPresent(valkeyOptions::setPassword);
+
+ this.valkeyClient = Redis.createClient(vertx, valkeyOptions);
+ this.valkey = RedisAPI.api(this.valkeyClient);
+ }
+ }
+ }
+
+ public static String formatDeduplicationKey(UUID eventTypeId, String deduplicationKey) {
+ return String.format("%s:%s:%s", EVENT_DEDUPLICATION_KEY, eventTypeId, deduplicationKey);
+ }
+
+ /**
+ * Verifies that the event has not been previously processed. The format of saved keys is
+ * {@code engine:event-deduplication::}.
+ *
+ * @param eventId only used for debugging
+ * @see com.redhat.cloud.notifications.events.deduplication.EventDeduplicator EventDeduplicator
+ */
+ public boolean isNewEvent(UUID eventTypeId, String deduplicationKey, LocalDateTime deleteAfter, UUID eventId) {
+ String key = formatDeduplicationKey(eventTypeId, deduplicationKey);
+ String deleteAfterIso = deleteAfter.format(DateTimeFormatter.ISO_DATE_TIME);
+ boolean isNew;
+
+ Response valkeyResp = valkey.setAndAwait(List.of(
+ key,
+ deleteAfterIso,
+ "NX",
+ "EXAT",
+ String.valueOf(deleteAfter.toEpochSecond(ZoneOffset.UTC))
+ ));
+
+ try {
+ if (valkeyResp == null) {
+ isNew = false;
+ } else {
+ isNew = valkeyResp.toString().equals("OK");
+ }
+ } catch (Exception ignored) {
+ // Invalid response could not be mapped to string. Assume event is new
+ // dedup key may include private information, so other fields are used
+ Log.warnf("unable to check for duplicate event in Valkey [event_type_id=%s, event_id=%s, delete_after=%s]",
+ eventTypeId, eventId, deleteAfterIso);
+ isNew = true;
+ }
+
+ return isNew;
+ }
+
+ /** This method should only be called to remove a key that may have been inserted by {@link #isNewEvent(UUID, String, LocalDateTime, UUID)} */
+ public boolean removeEventFromDeduplication(String key) {
+ return valkey.delAndAwait(List.of(key)).toBoolean();
+ }
+}
diff --git a/engine/src/main/java/com/redhat/cloud/notifications/events/deduplication/EventDeduplicator.java b/engine/src/main/java/com/redhat/cloud/notifications/events/deduplication/EventDeduplicator.java
index 6dd6c514eb..a0de8405c0 100644
--- a/engine/src/main/java/com/redhat/cloud/notifications/events/deduplication/EventDeduplicator.java
+++ b/engine/src/main/java/com/redhat/cloud/notifications/events/deduplication/EventDeduplicator.java
@@ -1,12 +1,17 @@
package com.redhat.cloud.notifications.events.deduplication;
+import com.redhat.cloud.notifications.config.EngineConfig;
+import com.redhat.cloud.notifications.events.ValkeyService;
import com.redhat.cloud.notifications.models.Event;
+import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.transaction.Transactional;
+import java.time.LocalDateTime;
import java.util.Optional;
+import java.util.UUID;
@ApplicationScoped
public class EventDeduplicator {
@@ -21,7 +26,13 @@ public class EventDeduplicator {
@Inject
SubscriptionsDeduplicationConfig subscriptionsDeduplicationConfig;
- private EventDeduplicationConfig getEventDeduplicationConfig(Event event) {
+ @Inject
+ EngineConfig engineConfig;
+
+ @Inject
+ ValkeyService valkeyService;
+
+ public EventDeduplicationConfig getEventDeduplicationConfig(Event event) {
return switch (event.getEventType().getApplication().getBundle().getName()) {
case SUBSCRIPTION_SERVICES_BUNDLE ->
switch (event.getEventType().getApplication().getName()) {
@@ -43,14 +54,35 @@ public boolean isNew(Event event) {
return true;
}
+ UUID eventTypeId = event.getEventType().getId();
+ LocalDateTime deleteAfter = eventDeduplicationConfig.getDeleteAfter(event);
+
+ if (engineConfig.isInMemoryDbEnabled() && engineConfig.isValkeyEventDeduplicatorEnabled()) {
+ boolean isNewEvent = postgresEventDeduplication(eventTypeId, deduplicationKey, deleteAfter);
+ boolean valkeyIsNewEvent = valkeyService.isNewEvent(eventTypeId, deduplicationKey.get(),
+ deleteAfter, event.getId());
+ if (valkeyIsNewEvent != isNewEvent) {
+ Log.warnf(
+ "Valkey event deduplication (isNewEvent=%s) does not align with Postgres result (isNewEvent=%s) [event_type_id=%s, event_id=%s]",
+ valkeyIsNewEvent, isNewEvent, eventTypeId, event.getId());
+ }
+
+ return isNewEvent;
+
+ } else {
+ return postgresEventDeduplication(eventTypeId, deduplicationKey, deleteAfter);
+ }
+ }
+
+ private boolean postgresEventDeduplication(UUID eventTypeId, Optional deduplicationKey, LocalDateTime deleteAfter) {
String sql = "INSERT INTO event_deduplication(event_type_id, deduplication_key, delete_after) " +
"VALUES (:eventTypeId, :deduplicationKey, :deleteAfter) " +
"ON CONFLICT (event_type_id, deduplication_key) DO NOTHING";
int rowCount = entityManager.createNativeQuery(sql)
- .setParameter("eventTypeId", event.getEventType().getId())
+ .setParameter("eventTypeId", eventTypeId)
.setParameter("deduplicationKey", deduplicationKey.get())
- .setParameter("deleteAfter", eventDeduplicationConfig.getDeleteAfter(event))
+ .setParameter("deleteAfter", deleteAfter)
.executeUpdate();
return rowCount > 0;
}
diff --git a/engine/src/main/resources/application.properties b/engine/src/main/resources/application.properties
index 607115c430..dbaaf7bcb2 100644
--- a/engine/src/main/resources/application.properties
+++ b/engine/src/main/resources/application.properties
@@ -10,6 +10,9 @@ quarkus.http.port=8087
%test.quarkus.http.test-port=9087
%test.quarkus.datasource.devservices.enabled=true
+# Configure Valkey devservices for test cases
+%test.quarkus.redis.devservices.enabled=true
+
# Input queue
mp.messaging.incoming.ingress.connector=smallrye-kafka
mp.messaging.incoming.ingress.topic=platform.notifications.ingress
diff --git a/engine/src/test/java/com/redhat/cloud/notifications/TestLifecycleManager.java b/engine/src/test/java/com/redhat/cloud/notifications/TestLifecycleManager.java
index e9af826bd6..f873def2db 100644
--- a/engine/src/test/java/com/redhat/cloud/notifications/TestLifecycleManager.java
+++ b/engine/src/test/java/com/redhat/cloud/notifications/TestLifecycleManager.java
@@ -1,10 +1,12 @@
package com.redhat.cloud.notifications;
+import io.github.ss_bhatt.testcontainers.valkey.ValkeyContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import org.eclipse.microprofile.config.ConfigProvider;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.utility.DockerImageName;
import java.sql.Connection;
import java.sql.SQLException;
@@ -15,30 +17,42 @@
import static com.redhat.cloud.notifications.MockServerLifecycleManager.getMockServerUrl;
import static com.redhat.cloud.notifications.TestConstants.POSTGRES_MAJOR_VERSION;
+import static com.redhat.cloud.notifications.TestConstants.VALKEY_MAJOR_VERSION;
public class TestLifecycleManager implements QuarkusTestResourceLifecycleManager {
- Boolean quarkusDevServiceEnabled = true;
+ Boolean quarkusDatasourceDevServiceEnabled = true;
+ Boolean quarkusValkeyDevServiceEnabled = true;
PostgreSQLContainer> postgreSQLContainer;
+ ValkeyContainer valkeyContainer;
@Override
public Map start() {
System.out.println("++++ TestLifecycleManager start +++");
- Optional quarkusDevServiceEnabledFlag = ConfigProvider.getConfig().getOptionalValue("quarkus.datasource.devservices.enabled", Boolean.class);
- if (quarkusDevServiceEnabledFlag.isPresent()) {
- quarkusDevServiceEnabled = quarkusDevServiceEnabledFlag.get();
- }
- System.out.println(" -- quarkusDatasourceDevServiceEnabled is " + quarkusDevServiceEnabled);
+ Optional quarkusDatasourceDevServiceEnabledFlag = ConfigProvider.getConfig().getOptionalValue("quarkus.datasource.devservices.enabled", Boolean.class);
+ quarkusDatasourceDevServiceEnabledFlag.ifPresent(flag -> quarkusDatasourceDevServiceEnabled = flag);
+ System.out.println(" -- quarkusDatasourceDevServiceEnabled is " + quarkusDatasourceDevServiceEnabled);
+
+ Optional quarkusValkeyDevServiceEnabledFlag = ConfigProvider.getConfig().getOptionalValue("quarkus.redis.devservices.enabled", Boolean.class);
+ quarkusValkeyDevServiceEnabledFlag.ifPresent(flag -> quarkusValkeyDevServiceEnabled = flag);
+ System.out.println(" -- quarkusValkeyDevServiceEnabled is " + quarkusValkeyDevServiceEnabled);
Map properties = new HashMap<>();
- if (quarkusDevServiceEnabled) {
+ if (quarkusDatasourceDevServiceEnabled) {
try {
setupPostgres(properties);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ if (quarkusValkeyDevServiceEnabled) {
+ try {
+ setupValkey(properties);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
setupMockEngine(properties);
System.out.println(" -- Running with properties: " + properties);
@@ -47,9 +61,12 @@ public Map start() {
@Override
public void stop() {
- if (quarkusDevServiceEnabled) {
+ if (quarkusDatasourceDevServiceEnabled) {
postgreSQLContainer.stop();
}
+ if (quarkusValkeyDevServiceEnabled) {
+ valkeyContainer.stop();
+ }
MockServerLifecycleManager.stop();
InMemoryConnector.clear();
}
@@ -76,6 +93,16 @@ void setupPostgres(Map props) throws SQLException {
connection.close();
}
+ void setupValkey(Map props) {
+ valkeyContainer = new ValkeyContainer(DockerImageName.parse("valkey/valkey:" + VALKEY_MAJOR_VERSION)
+ .asCompatibleSubstituteFor("docker.io/valkey/valkey"));
+ valkeyContainer.start();
+ // Provide the connection credentials as a redis URI for compatibility
+ String valkeyHost = valkeyContainer.getConnectionString().replace("valkey://", "redis://");
+ props.put("quarkus.redis.hosts", valkeyHost);
+ props.put("in-memory-db.enabled", "true");
+ }
+
void setupMockEngine(Map props) {
MockServerLifecycleManager.start();
props.put("quarkus.rest-client.export-service.url", getMockServerUrl());
diff --git a/engine/src/test/java/com/redhat/cloud/notifications/events/EventConsumerTest.java b/engine/src/test/java/com/redhat/cloud/notifications/events/EventConsumerTest.java
index be3e3711cd..471c26229c 100644
--- a/engine/src/test/java/com/redhat/cloud/notifications/events/EventConsumerTest.java
+++ b/engine/src/test/java/com/redhat/cloud/notifications/events/EventConsumerTest.java
@@ -27,6 +27,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import java.util.Set;
@@ -309,8 +311,11 @@ void testProcessingErrorWithoutMessageId() {
verify(eventDeduplicator, times(1)).isNew(any(Event.class));
}
- @Test
- void testDuplicatePayload() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testDuplicatePayload(final boolean valkeyDedupEnabled) {
+ when(config.isValkeyEventDeduplicatorEnabled()).thenReturn(valkeyDedupEnabled);
+
EventType eventType = mockGetEventTypeAndCreateEvent();
Action action = buildValidAction(false);
String payload = serializeAction(action);
diff --git a/engine/src/test/java/com/redhat/cloud/notifications/events/ValkeyServiceTest.java b/engine/src/test/java/com/redhat/cloud/notifications/events/ValkeyServiceTest.java
new file mode 100644
index 0000000000..7bb5de9c52
--- /dev/null
+++ b/engine/src/test/java/com/redhat/cloud/notifications/events/ValkeyServiceTest.java
@@ -0,0 +1,81 @@
+package com.redhat.cloud.notifications.events;
+
+import io.quarkus.test.junit.QuarkusTest;
+import jakarta.inject.Inject;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@QuarkusTest
+class ValkeyServiceTest {
+ @Inject
+ ValkeyService valkeyService;
+
+ @Test
+ void testAddNewEntries() {
+ UUID eventTypeId1 = UUID.randomUUID();
+ String dedupKey1 = "dedup-key-new-" + UUID.randomUUID();
+ UUID eventId = UUID.randomUUID();
+ LocalDateTime deleteAfter = LocalDateTime.now().plusDays(7);
+
+ // Insert first event
+ assertTrue(valkeyService.isNewEvent(eventTypeId1, dedupKey1, deleteAfter, eventId));
+
+ // Insert entry with different event type ID
+ UUID eventTypeId2 = UUID.randomUUID();
+ assertTrue(valkeyService.isNewEvent(eventTypeId2, dedupKey1, deleteAfter, eventId));
+
+ // Insert entry with different deduplication key
+ String dedupKey2 = "dedup-key-new-" + UUID.randomUUID();
+ assertTrue(valkeyService.isNewEvent(eventTypeId1, dedupKey2, deleteAfter, eventId));
+
+ // Deduplication required both event type and dedup key to match
+ assertTrue(valkeyService.isNewEvent(eventTypeId2, dedupKey2, deleteAfter, eventId));
+
+ // Events which should have already expired will also return successfully
+ LocalDateTime expiredDeleteAfter = LocalDateTime.of(2023, 5, 11, 15, 40, 21);
+ String dedupKey3 = "dedup-key-new-" + UUID.randomUUID();
+ assertTrue(valkeyService.isNewEvent(eventTypeId1, dedupKey3, expiredDeleteAfter, eventId));
+ }
+
+ @Test
+ void testAddDuplicateEntries() {
+ // Insert initial event
+ UUID eventTypeId = UUID.randomUUID();
+ String dedupKey = "dedup-key-duplicates-" + UUID.randomUUID();
+ UUID eventId = UUID.randomUUID();
+ LocalDateTime deleteAfter = LocalDateTime.now().plusDays(7);
+ assertTrue(valkeyService.isNewEvent(eventTypeId, dedupKey, deleteAfter, eventId));
+
+ // Attempt to reinsert the same event
+ assertFalse(valkeyService.isNewEvent(eventTypeId, dedupKey, deleteAfter, eventId));
+
+ // Changing the expiry date does not affect duplicate detection
+ LocalDateTime deleteAfter2 = deleteAfter.plusDays(10);
+ assertFalse(valkeyService.isNewEvent(eventTypeId, dedupKey, deleteAfter2, eventId));
+
+ // Event ID is not used to identify duplicates, and has no impact outside of debugging
+ UUID eventId2 = UUID.randomUUID();
+ assertFalse(valkeyService.isNewEvent(eventTypeId, dedupKey, deleteAfter, eventId2));
+ }
+
+ @Test
+ void testEntryExpiry() throws InterruptedException {
+ UUID eventTypeId = UUID.randomUUID();
+ String deduplicationKey = "dedup-expiry-key-" + UUID.randomUUID();
+ UUID eventId = UUID.randomUUID();
+
+ // Key will expire in 7 seconds
+ LocalDateTime deleteAfter = LocalDateTime.now().plusSeconds(7);
+ valkeyService.isNewEvent(eventTypeId, deduplicationKey, deleteAfter, eventId);
+
+ // Wait 10 seconds and attempt to insert key that should have expired
+ Thread.sleep(Duration.ofSeconds(10));
+ assertTrue(valkeyService.isNewEvent(eventTypeId, deduplicationKey, deleteAfter, eventId));
+ }
+}
diff --git a/engine/src/test/java/com/redhat/cloud/notifications/events/deduplication/EventDeduplicatorTest.java b/engine/src/test/java/com/redhat/cloud/notifications/events/deduplication/EventDeduplicatorTest.java
index 4593eadce9..c0ac8de10c 100644
--- a/engine/src/test/java/com/redhat/cloud/notifications/events/deduplication/EventDeduplicatorTest.java
+++ b/engine/src/test/java/com/redhat/cloud/notifications/events/deduplication/EventDeduplicatorTest.java
@@ -1,33 +1,47 @@
package com.redhat.cloud.notifications.events.deduplication;
+import com.redhat.cloud.notifications.config.EngineConfig;
import com.redhat.cloud.notifications.events.EventWrapperAction;
import com.redhat.cloud.notifications.models.Application;
import com.redhat.cloud.notifications.models.Bundle;
import com.redhat.cloud.notifications.models.Event;
import com.redhat.cloud.notifications.models.EventType;
import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.mockito.InjectSpy;
import io.vertx.core.json.JsonObject;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.transaction.Transactional;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
@QuarkusTest
class EventDeduplicatorTest {
+ public static final String TEST_BUNDLE_NAME = "test-bundle";
+ public static final String SUBSCRIPTION_SERVICES_BUNDLE_NAME = "subscription-services";
+
+ static final ZoneId UTC_ZONE = ZoneId.of("UTC");
+
@Inject
EventDeduplicator eventDeduplicator;
@Inject
EntityManager entityManager;
+ @InjectSpy
+ EngineConfig config;
+
@BeforeEach
@Transactional
void beforeEach() {
@@ -36,16 +50,30 @@ void beforeEach() {
.executeUpdate();
}
- @Test
- void testIsNewWithDefaultDeduplication() {
+ @AfterEach
+ @Transactional
+ void afterEach() {
+ entityManager
+ .createNativeQuery("DELETE FROM bundles WHERE name = :testName OR name = :subName")
+ .setParameter("testName", TEST_BUNDLE_NAME)
+ .setParameter("subName", SUBSCRIPTION_SERVICES_BUNDLE_NAME)
+ .executeUpdate();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testIsNewWithDefaultDeduplication(final boolean valkeyDedupEnabled) {
+ when(config.isValkeyEventDeduplicatorEnabled()).thenReturn(valkeyDedupEnabled);
+ when(config.isInMemoryDbEnabled()).thenReturn(valkeyDedupEnabled);
- EventType eventType = createEventType("test-bundle", "test-app");
+ EventType eventType = createEventType(TEST_BUNDLE_NAME, "test-app");
+ LocalDateTime dateTime = LocalDateTime.now(UTC_ZONE);
UUID eventId1 = UUID.randomUUID();
Event event1 = new Event();
event1.setId(eventId1);
event1.setEventType(eventType);
- event1.setEventWrapper(new EventWrapperAction(ActionBuilder.build(LocalDateTime.of(2025, 11, 14, 10, 52))));
+ event1.setEventWrapper(new EventWrapperAction(ActionBuilder.build(dateTime)));
assertTrue(eventDeduplicator.isNew(event1), "New event should return true");
@@ -53,28 +81,33 @@ void testIsNewWithDefaultDeduplication() {
Event event2 = new Event();
event2.setId(eventId2);
event2.setEventType(eventType);
- event2.setEventWrapper(new EventWrapperAction(ActionBuilder.build(LocalDateTime.of(2025, 11, 14, 10, 52))));
+ event2.setEventWrapper(new EventWrapperAction(ActionBuilder.build(dateTime)));
assertTrue(eventDeduplicator.isNew(event2), "New event should return true");
Event event3 = new Event();
event3.setId(eventId2);
event3.setEventType(eventType);
- event3.setEventWrapper(new EventWrapperAction(ActionBuilder.build(LocalDateTime.of(2025, 11, 14, 10, 52))));
+ event3.setEventWrapper(new EventWrapperAction(ActionBuilder.build(dateTime)));
assertFalse(eventDeduplicator.isNew(event3), "Duplicate event should return false");
}
- @Test
- void testIsNewWithSubscriptionsDeduplication() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testIsNewWithSubscriptionsDeduplication(final boolean valkeyDedupEnabled) {
+ when(config.isValkeyEventDeduplicatorEnabled()).thenReturn(valkeyDedupEnabled);
+ when(config.isInMemoryDbEnabled()).thenReturn(valkeyDedupEnabled);
- EventType eventType = createEventType("subscription-services", "subscriptions");
+ EventType eventType = createEventType(SUBSCRIPTION_SERVICES_BUNDLE_NAME, "subscriptions");
+ LocalDateTime baseDateTime =
+ LocalDateTime.of(LocalDateTime.now(UTC_ZONE).plusYears(1).getYear(), 11, 14, 10, 52);
Event event1 = createSubscriptionsEvent(
UUID.randomUUID(),
"org123",
eventType,
- LocalDateTime.of(2025, 11, 14, 10, 52),
+ baseDateTime,
"prod456",
"metric789",
"billing001");
@@ -85,7 +118,7 @@ void testIsNewWithSubscriptionsDeduplication() {
UUID.randomUUID(),
"org123",
eventType,
- LocalDateTime.of(2025, 11, 15, 14, 30), // Different day, same month.
+ baseDateTime.withDayOfMonth(15).withHour(14).withMinute(30), // Different day, same month.
"prod456",
"metric789",
"billing001");
@@ -96,7 +129,7 @@ void testIsNewWithSubscriptionsDeduplication() {
UUID.randomUUID(),
"org999",
eventType,
- LocalDateTime.of(2025, 11, 16, 9, 15), // Different day, still same month.
+ baseDateTime.withDayOfMonth(16).withHour(9).withMinute(15), // Different day, still same month.
"prod456",
"metric789",
"billing001");
@@ -107,7 +140,7 @@ void testIsNewWithSubscriptionsDeduplication() {
UUID.randomUUID(),
"org123",
eventType,
- LocalDateTime.of(2025, 12, 1, 10, 0), // Different month.
+ baseDateTime.withMonth(12).withDayOfMonth(1).withHour(10).withMinute(0), // Different month.
"prod456",
"metric789",
"billing001");
@@ -118,7 +151,7 @@ void testIsNewWithSubscriptionsDeduplication() {
UUID.randomUUID(),
"org123",
eventType,
- LocalDateTime.of(2025, 11, 17, 11, 0),
+ baseDateTime.withDayOfMonth(17).withHour(11).withMinute(0),
"prod999",
"metric789",
"billing001");
@@ -129,7 +162,7 @@ void testIsNewWithSubscriptionsDeduplication() {
UUID.randomUUID(),
"org123",
eventType,
- LocalDateTime.of(2025, 11, 18, 11, 0),
+ baseDateTime.withDayOfMonth(18).withHour(11).withMinute(0),
"prod999",
"metric999",
"billing001");
@@ -140,7 +173,7 @@ void testIsNewWithSubscriptionsDeduplication() {
UUID.randomUUID(),
"org123",
eventType,
- LocalDateTime.of(2025, 11, 19, 11, 0),
+ baseDateTime.withDayOfMonth(19).withHour(11).withMinute(0),
"prod999",
"metric999",
"billing999");