Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .rhcicd/clowdapp-engine.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ objects:
- export-service
database:
sharedDbAppName: notifications-backend
inMemoryDb: ${{IN_MEMORY_DB_ENABLED}}
featureFlags: true
kafkaTopics:
- topicName: platform.notifications.connector.email.high.volume
Expand Down Expand Up @@ -287,6 +288,9 @@ parameters:
value: quay.io/cloudservices/notifications-engine
- name: IMAGE_TAG
value: latest
- name: IN_MEMORY_DB_ENABLED
description: If inMemoryDb is set to true, Clowder will pass configuration of an In Memory Database to the pods in the ClowdApp. This single instance will be shared between all apps.
value: "false"
- name: KAFKA_CONSUMED_TOTAL_CHECKER_ENABLED
description: Is the Kafka records consumed total check included in the global health check?
value: "false"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ public class TestConstants {
public static final String DEFAULT_USER = "default-user";

public static final String POSTGRES_MAJOR_VERSION = "16";
public static final String VALKEY_MAJOR_VERSION = "8";
}
17 changes: 17 additions & 0 deletions engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

<properties>
<apache.commons.csv.version>1.14.1</apache.commons.csv.version>
<testcontainers-valkey.version>1.0.0</testcontainers-valkey.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -46,6 +47,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-cache</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-redis-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-flyway</artifactId>
Expand Down Expand Up @@ -115,6 +120,12 @@
<artifactId>flyway-database-postgresql</artifactId>
</dependency>

<!-- Valkey connection management -->
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>

<!-- events-schemas -->
<dependency>
<groupId>com.redhat.cloud.event</groupId>
Expand Down Expand Up @@ -184,6 +195,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.ss-bhatt</groupId>
<artifactId>testcontainers-valkey</artifactId>
<version>${testcontainers-valkey.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mailer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class EngineConfig {
private static final String UNLEASH = "notifications.unleash.enabled";
private static final String PROCESSOR_CONNECTORS_MAX_SERVER_ERRORS = "processor.connectors.max-server-errors";
private static final String PROCESSOR_CONNECTORS_MIN_DELAY_SINCE_FIRST_SERVER_ERROR = "processor.connectors.min-delay-since-first-server-error";
private static final String IN_MEMORY_DB_ENABLED = "in-memory-db.enabled";

/**
* Standard "Red Hat Hybrid Cloud Console" sender that the vast majority of the
Expand All @@ -57,6 +58,7 @@ public class EngineConfig {
private String drawerToggle;
private String exportServiceHccClusterToggle;
private String kafkaConsumedTotalCheckerToggle;
private String valkeyEventDeduplicatorToggle;
private String toggleBlacklistedEndpoints;
private String toggleBlacklistedEventTypes;
private String toggleKafkaOutgoingHighVolumeTopic;
Expand Down Expand Up @@ -153,6 +155,9 @@ public class EngineConfig {
@ConfigProperty(name = NOTIFICATIONS_INGRESSREPLAY_END_TIME, defaultValue = "2025-12-15T09:30:00Z")
String replayEndTime;

@ConfigProperty(name = IN_MEMORY_DB_ENABLED, defaultValue = "false")
boolean inMemoryDbEnabled;

@Inject
ToggleRegistry toggleRegistry;

Expand All @@ -166,6 +171,7 @@ void postConstruct() {
drawerToggle = toggleRegistry.register("drawer", true);
exportServiceHccClusterToggle = toggleRegistry.register("export-service-hcc-cluster", true);
kafkaConsumedTotalCheckerToggle = toggleRegistry.register("kafka-consumed-total-checker", true);
valkeyEventDeduplicatorToggle = toggleRegistry.register("valkey-event-deduplicator", true);
toggleKafkaOutgoingHighVolumeTopic = toggleRegistry.register("kafka-outgoing-high-volume-topic", true);
toggleBlacklistedEndpoints = toggleRegistry.register("blacklisted-endpoints", true);
toggleBlacklistedEventTypes = toggleRegistry.register("blacklisted-event-types", true);
Expand Down Expand Up @@ -201,6 +207,8 @@ void logConfigAtStartup(@Observes Startup event) {
config.put(asyncEventProcessingToggle, isAsyncEventProcessing());
config.put(toggleIncludeSeverityToFilterRecipients, isIncludeSeverityToFilterRecipientsEnabled(""));
config.put(toggleSkipProcessingMessagesOnReplayService, isSkipMessageProcessing());
config.put(valkeyEventDeduplicatorToggle, isValkeyEventDeduplicatorEnabled());
config.put(IN_MEMORY_DB_ENABLED, isInMemoryDbEnabled());

Log.info("=== Startup configuration ===");
config.forEach((key, value) -> {
Expand Down Expand Up @@ -371,4 +379,16 @@ public boolean isSubscriptionsDeduplicationWillBeNotifiedEnabled(String orgId) {
return false;
}
}

/**
 * Tells whether the Valkey-based event deduplicator is switched on.
 *
 * @return {@code false} when Unleash is disabled; otherwise the result of querying Unleash for
 *         the Valkey event deduplicator toggle, passing {@code false} as the second argument
 */
public boolean isValkeyEventDeduplicatorEnabled() {
    // Without Unleash there is nothing to query, so the feature stays off.
    if (!unleashEnabled) {
        return false;
    }
    return this.unleash.isEnabled(this.valkeyEventDeduplicatorToggle, false);
}

/**
 * Exposes the value injected from the {@code in-memory-db.enabled} configuration property.
 *
 * @return {@code true} if the in-memory DB has been enabled through configuration
 */
public boolean isInMemoryDbEnabled() {
    return this.inMemoryDbEnabled;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.redhat.cloud.notifications.events;

import com.redhat.cloud.notifications.config.EngineConfig;
import io.quarkus.logging.Log;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
import io.vertx.redis.client.RedisOptions;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

/**
 * Stores and retrieves data from remote cache (i.e. Valkey).
 *
 * <p>The Valkey client is only created when the in-memory DB is enabled in {@link EngineConfig}
 * and a connection string is configured. Otherwise the service stays unconfigured and
 * {@link #isNewEvent} fails fast with an {@link IllegalStateException}.
 */
@ApplicationScoped
public class ValkeyService {

    private static final String EVENT_DEDUPLICATION_KEY = "engine:event-deduplication";

    @ConfigProperty(name = "valkey-service.ttl", defaultValue = "PT24H")
    Duration ttl;

    @ConfigProperty(name = "quarkus.redis.hosts", defaultValue = "")
    Optional<String> valkeyHost;

    @ConfigProperty(name = "quarkus.redis.password", defaultValue = "")
    Optional<String> valkeyPassword;

    @Inject
    EngineConfig config;

    @Inject
    Vertx vertx;

    /** The underlying client connecting to Valkey. Stays {@code null} when Valkey is not configured. */
    private Redis valkeyClient;

    /** Implementation of the Redis/Valkey API, using {@link #valkeyClient}. Stays {@code null} when Valkey is not configured. */
    private RedisAPI valkey;

    /**
     * Creates the Valkey client when the in-memory DB is enabled and a connection string is available.
     * A missing connection string is only logged as a warning so that the application can still start.
     */
    @PostConstruct
    void initialize() {
        if (!config.isInMemoryDbEnabled()) {
            return;
        }

        // A blank value is treated the same way as a missing one.
        Optional<String> connectionString = valkeyHost.filter(host -> !host.isBlank());
        if (connectionString.isEmpty()) {
            Log.warn("In-memory DB enabled, but Valkey connection string was not provided");
            return;
        }

        RedisOptions valkeyOptions = new RedisOptions().setConnectionString(connectionString.get());
        // Never send an empty password to the server.
        valkeyPassword.filter(password -> !password.isEmpty()).ifPresent(valkeyOptions::setPassword);

        this.valkeyClient = Redis.createClient(vertx, valkeyOptions);
        this.valkey = RedisAPI.api(this.valkeyClient);
    }

    /**
     * Verifies that the event has not been previously processed. The format of saved keys is
     * {@code engine:event-deduplication:<event_type>:<deduplication_key>}.
     *
     * <p>The key and its expiry are written with a single {@code SET ... NX EXAT} command, so a key can
     * never be stored without an expiry (which would mark the event as a duplicate forever).
     *
     * @param eventTypeId      identifier of the event type, part of the saved key
     * @param deduplicationKey deduplication key of the event, part of the saved key
     * @param deleteAfter      moment after which the key expires, converted to epoch seconds using UTC
     * @param eventId          only used for debugging
     * @return {@code true} if the key was created by this call, {@code false} if it already existed
     * @throws IllegalStateException if the Valkey client was not configured at startup
     * @see com.redhat.cloud.notifications.events.deduplication.EventDeduplicator EventDeduplicator
     */
    public boolean isNewEvent(UUID eventTypeId, String deduplicationKey, LocalDateTime deleteAfter, UUID eventId) {
        if (valkey == null) {
            // initialize() only logs a warning when the connection string is missing; fail with a clear message here
            throw new IllegalStateException("Valkey client is not configured, unable to check event deduplication");
        }

        String key = String.format("%s:%s:%s", EVENT_DEDUPLICATION_KEY, eventTypeId, deduplicationKey);
        String deleteAfterIso = deleteAfter.format(DateTimeFormatter.ISO_DATE_TIME);
        String expireAtEpochSecond = String.valueOf(deleteAfter.toEpochSecond(ZoneOffset.UTC));

        // SET with NX replies OK when the key was created and nil (mapped to null) when it already existed.
        boolean isNew = valkey.setAndAwait(List.of(key, deleteAfterIso, "NX", "EXAT", expireAtEpochSecond)) != null;
        if (!isNew) {
            // dedup key may include private information, so other fields are used
            Log.debugf("duplicate found by Valkey event deduplication [event_type_id=%s, event_id=%s, delete_after=%s]",
                    eventTypeId, eventId, deleteAfterIso);
        }

        return isNew;
    }
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.redhat.cloud.notifications.events.deduplication;

import com.redhat.cloud.notifications.config.EngineConfig;
import com.redhat.cloud.notifications.events.ValkeyService;
import com.redhat.cloud.notifications.models.Event;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.transaction.Transactional;

import java.time.LocalDateTime;
import java.util.Optional;
import java.util.UUID;

@ApplicationScoped
public class EventDeduplicator {
Expand All @@ -21,6 +25,12 @@ public class EventDeduplicator {
@Inject
SubscriptionsDeduplicationConfig subscriptionsDeduplicationConfig;

@Inject
EngineConfig engineConfig;

@Inject
ValkeyService valkeyService;

private EventDeduplicationConfig getEventDeduplicationConfig(Event event) {
return switch (event.getEventType().getApplication().getBundle().getName()) {
case SUBSCRIPTION_SERVICES_BUNDLE ->
Expand All @@ -43,15 +53,22 @@ public boolean isNew(Event event) {
return true;
}

String sql = "INSERT INTO event_deduplication(event_type_id, deduplication_key, delete_after) " +
"VALUES (:eventTypeId, :deduplicationKey, :deleteAfter) " +
"ON CONFLICT (event_type_id, deduplication_key) DO NOTHING";
UUID eventTypeId = event.getEventType().getId();
LocalDateTime deleteAfter = eventDeduplicationConfig.getDeleteAfter(event);

if (engineConfig.isInMemoryDbEnabled() && engineConfig.isValkeyEventDeduplicatorEnabled()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to "feed" Valkey for 15 days before using it for the dedup check.
Could you also compare both results (Valkey and RDS), then log a warning if they differ?
(RDS should have the last word for a few weeks; then we could drop it if the results are always identical for 2 weeks.)

return valkeyService.isNewEvent(eventTypeId, deduplicationKey.get(), deleteAfter, event.getId());
Comment on lines +59 to +60
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Make the Valkey dedup path rollback-safe.

isNew() still runs inside a transaction, but this branch writes to Valkey outside that transaction. If the enclosing transaction rolls back later, the Valkey key remains while the database work is undone, and a retry will be treated as a duplicate. That is a behavior change from the SQL path below, where the insert is rolled back with the transaction. Please make this reservation rollback-safe (for example by cleaning it up on rollback or only finalizing it after commit).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@engine/src/main/java/com/redhat/cloud/notifications/events/deduplication/EventDeduplicator.java`
around lines 59 - 60, The Valkey branch in EventDeduplicator calls
valkeyService.isNewEvent(...) while still inside a DB transaction, causing
Valkey writes to become permanent even if the DB transaction later rolls back;
make this reservation rollback-safe by deferring finalization or cleaning up on
rollback: modify EventDeduplicator (the branch guarded by
engineConfig.isInMemoryDbEnabled() &&
engineConfig.isValkeyEventDeduplicatorEnabled()) to call a non-final "reserve"
method on valkeyService (e.g., valkeyService.reserveEventKey(...)) and then
register a transaction synchronization (via Spring's
TransactionSynchronizationManager.registerSynchronization or
TransactionSynchronization) that on afterCommit finalizes the reservation (e.g.,
valkeyService.confirmReservation(...)) and on afterCompletion/rollback removes
the reservation (e.g., valkeyService.cancelReservation(...)); alternatively,
perform the final write only in an onCommit callback so Valkey state mirrors DB
commit.

Copy link
Copy Markdown
Member Author

@jessicarod7 jessicarod7 Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not applicable, Redis does not support transaction rollbacks. Unless there's a way to implement this manually and we actually do need it.

Edit: To the reviewer, please add your thoughts on the bot's suggestion, I'm leaning no.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jessicarod7, you're correct that Redis/Valkey has no native transaction rollback support. The concern I raised isn't about Redis rolling back — it's about the DB transaction rolling back after the Valkey key is written, which would leave a stale key and cause a retry of the same event to be silently dropped as a "duplicate."

Manual compensation is technically achievable in Quarkus without Redis transactions — you can register a jakarta.transaction.Synchronization against the current DB transaction, whose beforeCompletion is invoked prior to the two-phase commit and whose afterCompletion is called after the transaction completes. In afterCompletion, you'd check whether the status indicates a rollback and, if so, issue a Redis DEL on the deduplication key. The afterCompletion callback runs in an undefined context, but resources can still be closed and non-transactional work (like a Redis command) can be performed.

That said, this compensation approach has its own failure mode: if the DEL itself fails (Valkey unreachable after the DB rollback), you're still in the inconsistent state. This is a fundamental dual-write problem without XA support.

The practical question is: how often do DB transaction rollbacks occur after isNew() runs but before the outer transaction commits, and are retries expected in that path? If DB rollbacks in that window are rare and/or retries are unlikely to happen within the Valkey key's TTL, the current implementation may be a reasonable trade-off. If event loss on retry after a DB rollback is unacceptable for this use case, the JTA synchronization + compensating DEL pattern would address it — though imperfectly.

Happy to defer to your judgment on whether this risk level is acceptable given your system's retry behavior.


📝 Learnings were identified and not saved (knowledge base disabled). Enable

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All that code is wrapped in a large try/catch. I think we could delete the Valkey key in the catch block.
It will give us the opportunity to replay the message later, just in case it fails because of an internal issue.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

} else {
String sql = "INSERT INTO event_deduplication(event_type_id, deduplication_key, delete_after) " +
"VALUES (:eventTypeId, :deduplicationKey, :deleteAfter) " +
"ON CONFLICT (event_type_id, deduplication_key) DO NOTHING";

int rowCount = entityManager.createNativeQuery(sql)
.setParameter("eventTypeId", event.getEventType().getId())
.setParameter("deduplicationKey", deduplicationKey.get())
.setParameter("deleteAfter", eventDeduplicationConfig.getDeleteAfter(event))
.executeUpdate();
return rowCount > 0;
int rowCount = entityManager.createNativeQuery(sql)
.setParameter("eventTypeId", eventTypeId)
.setParameter("deduplicationKey", deduplicationKey.get())
.setParameter("deleteAfter", deleteAfter)
.executeUpdate();
return rowCount > 0;
}
}
}
3 changes: 3 additions & 0 deletions engine/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ quarkus.http.port=8087
%test.quarkus.http.test-port=9087
%test.quarkus.datasource.devservices.enabled=true

# Configure Valkey devservices for test cases
%test.quarkus.redis.devservices.enabled=true

# Input queue
mp.messaging.incoming.ingress.connector=smallrye-kafka
mp.messaging.incoming.ingress.topic=platform.notifications.ingress
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.redhat.cloud.notifications;

import io.github.ss_bhatt.testcontainers.valkey.ValkeyContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import org.eclipse.microprofile.config.ConfigProvider;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -15,30 +17,42 @@

import static com.redhat.cloud.notifications.MockServerLifecycleManager.getMockServerUrl;
import static com.redhat.cloud.notifications.TestConstants.POSTGRES_MAJOR_VERSION;
import static com.redhat.cloud.notifications.TestConstants.VALKEY_MAJOR_VERSION;

public class TestLifecycleManager implements QuarkusTestResourceLifecycleManager {

Boolean quarkusDevServiceEnabled = true;
Boolean quarkusDatasourceDevServiceEnabled = true;
Boolean quarkusValkeyDevServiceEnabled = true;

PostgreSQLContainer<?> postgreSQLContainer;
ValkeyContainer valkeyContainer;

@Override
public Map<String, String> start() {
System.out.println("++++ TestLifecycleManager start +++");
Optional<Boolean> quarkusDevServiceEnabledFlag = ConfigProvider.getConfig().getOptionalValue("quarkus.datasource.devservices.enabled", Boolean.class);
if (quarkusDevServiceEnabledFlag.isPresent()) {
quarkusDevServiceEnabled = quarkusDevServiceEnabledFlag.get();
}
System.out.println(" -- quarkusDatasourceDevServiceEnabled is " + quarkusDevServiceEnabled);
Optional<Boolean> quarkusDatasourceDevServiceEnabledFlag = ConfigProvider.getConfig().getOptionalValue("quarkus.datasource.devservices.enabled", Boolean.class);
quarkusDatasourceDevServiceEnabledFlag.ifPresent(flag -> quarkusDatasourceDevServiceEnabled = flag);
System.out.println(" -- quarkusDatasourceDevServiceEnabled is " + quarkusDatasourceDevServiceEnabled);

Optional<Boolean> quarkusValkeyDevServiceEnabledFlag = ConfigProvider.getConfig().getOptionalValue("quarkus.redis.devservices.enabled", Boolean.class);
quarkusValkeyDevServiceEnabledFlag.ifPresent(flag -> quarkusValkeyDevServiceEnabled = flag);
System.out.println(" -- quarkusValkeyDevServiceEnabled is " + quarkusValkeyDevServiceEnabled);

Map<String, String> properties = new HashMap<>();
if (quarkusDevServiceEnabled) {
if (quarkusDatasourceDevServiceEnabled) {
try {
setupPostgres(properties);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
if (quarkusValkeyDevServiceEnabled) {
try {
setupValkey(properties);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
setupMockEngine(properties);

System.out.println(" -- Running with properties: " + properties);
Expand All @@ -47,7 +61,7 @@ public Map<String, String> start() {

@Override
public void stop() {
if (quarkusDevServiceEnabled) {
if (quarkusDatasourceDevServiceEnabled) {
postgreSQLContainer.stop();
}
MockServerLifecycleManager.stop();
Expand Down Expand Up @@ -76,6 +90,16 @@ void setupPostgres(Map<String, String> props) throws SQLException {
connection.close();
}

/**
 * Starts a Valkey test container and publishes its connection settings.
 *
 * @param props test configuration properties; receives the connection URI and the in-memory DB flag
 */
void setupValkey(Map<String, String> props) {
    DockerImageName valkeyImage = DockerImageName
            .parse("valkey/valkey:" + VALKEY_MAJOR_VERSION)
            .asCompatibleSubstituteFor("docker.io/valkey/valkey");
    valkeyContainer = new ValkeyContainer(valkeyImage);
    valkeyContainer.start();

    // Rewrite any valkey:// scheme to redis:// so the connection string is provided as a redis URI for compatibility
    String redisUri = valkeyContainer.getConnectionString().replace("valkey://", "redis://");
    props.put("in-memory-db.enabled", "true");
    props.put("quarkus.redis.hosts", redisUri);
}

void setupMockEngine(Map<String, String> props) {
MockServerLifecycleManager.start();
props.put("quarkus.rest-client.export-service.url", getMockServerUrl());
Expand Down
Loading
Loading