diff --git a/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java b/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java index 8c00395d3..87c1b1150 100644 --- a/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java +++ b/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java @@ -163,6 +163,7 @@ import cwms.cda.api.watersupply.WaterUserDeleteController; import cwms.cda.api.watersupply.WaterUserUpdateController; import cwms.cda.data.dao.JooqDao; +import cwms.cda.data.dao.rss.QueueManager; import cwms.cda.formatters.Formats; import cwms.cda.security.Authenticator; import cwms.cda.security.CdaAccessManager; @@ -395,6 +396,7 @@ public void init() { ctx.status(200); // Respond with a 200 OK status }) .javalinServlet(); + QueueManager.ensureRssSubscribers(cwms); logger.atInfo().log("Javalin initialized."); } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/rss/QueueManager.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/rss/QueueManager.java new file mode 100644 index 000000000..c480c07ee --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/rss/QueueManager.java @@ -0,0 +1,72 @@ +/* + * MIT License + * + * Copyright (c) 2025 Hydrologic Engineering Center + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package cwms.cda.data.dao.rss; + +import com.google.common.flogger.FluentLogger; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import javax.sql.DataSource; + +public final class QueueManager { + private static final FluentLogger LOGGER = FluentLogger.forEnclosingClass(); + private static final String SUBSCRIBER_NAME = "CDA_QUEUE_SUBSCRIBER"; + private static final String SQL = "BEGIN " + + " FOR rec IN ( " + + " SELECT OWNER, NAME " + + " FROM ALL_QUEUES " + + " WHERE OWNER = 'CWMS_20' " + + " AND QUEUE_TYPE = 'NORMAL_QUEUE' " + + " AND REGEXP_LIKE(NAME, 'STATUS|TS_UPDATES|REALTIME_OPS') " + + " ) LOOP " + + " BEGIN " + + " DBMS_AQADM.ADD_SUBSCRIBER( " + + " queue_name => rec.OWNER || '.' || rec.NAME, " + + " subscriber => sys.aq$_agent(?, NULL, NULL) " + + " ); " + + " EXCEPTION " + + " WHEN OTHERS THEN " + + " IF SQLCODE != -24034 THEN RAISE; END IF; " + // Ignore "Already a subscriber" + " END; " + + " END LOOP; " + + " COMMIT;" + + "END;"; + + private QueueManager() { + throw new AssertionError("Utility class"); + } + + public static void ensureRssSubscribers(DataSource dataSource) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement stmt = connection.prepareStatement(SQL)) { + stmt.setString(1, SUBSCRIBER_NAME); + stmt.execute(); + } catch (SQLException ex) { + LOGGER.atWarning().withCause(ex).log("Unable to ensure CDA persists AQ subscriptions. " + + "RSS endpoints may miss events."); + } + } +} diff --git a/cwms-data-api/src/test/java/cwms/cda/api/rss/RssHandlerIT.java b/cwms-data-api/src/test/java/cwms/cda/api/rss/RssHandlerIT.java index bdf3d2b56..f699a2620 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/rss/RssHandlerIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/rss/RssHandlerIT.java @@ -29,7 +29,6 @@ import static cwms.cda.api.Controllers.PAGE_SIZE; import static cwms.cda.security.ApiKeyIdentityProvider.AUTH_HEADER; import static io.restassured.RestAssured.given; -import static java.lang.String.format; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -48,7 +47,6 @@ import io.restassured.response.Response; import java.math.BigInteger; import java.net.URI; -import java.sql.Timestamp; import java.util.List; import javax.servlet.http.HttpServletResponse; import org.jooq.Configuration; @@ -69,22 +67,6 @@ final class RssHandlerIT extends DataApiTestIT { void setup() throws Exception { CwmsDataApiSetupCallback.getDatabaseLink().connection(c -> { Configuration configuration = DSL.using(c).configuration(); - //Need to have at least one subscriber for the messages to not automatically disappear from the table - configuration.dsl().execute( - "BEGIN " + - " BEGIN " + - " DBMS_AQADM.ADD_SUBSCRIBER(" + - " queue_name => ?, " + - " subscriber => sys.aq$_agent(?, NULL, NULL)" + - " ); " + - " EXCEPTION " + - " WHEN OTHERS THEN " + - " IF SQLCODE != -24034 THEN RAISE; END IF; " + // Ignore "Already a subscriber" - " END; " + - "END;", - "CWMS_20.SPK_STATUS", - "RSS_FEED_READER" - ); CWMS_ENV_PACKAGE.call_SET_SESSION_OFFICE_ID(configuration, OFFICE_ID); String text = "\n" + " %s\n" +