Skip to content

Commit 4b9bde3

Browse files
committed
add dynamic handling of camel routes
groups routes for office as well as individual routes add basic security mechanism that requires client to specify username and apikey for the password create a specific cda apikey for authenticating within the vm
1 parent 496d69b commit 4b9bde3

File tree

9 files changed

+873
-115
lines changed

9 files changed

+873
-115
lines changed

cwms-data-api/src/main/java/cwms/cda/ApiServlet.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import cwms.cda.api.errors.JsonFieldsException;
9999
import cwms.cda.api.errors.NotFoundException;
100100
import cwms.cda.api.errors.RequiredQueryParameterException;
101+
import cwms.cda.api.messaging.CdaTopicHandler;
101102
import cwms.cda.data.dao.JooqDao;
102103
import cwms.cda.formatters.Formats;
103104
import cwms.cda.formatters.FormattingException;
@@ -187,7 +188,8 @@
187188
"/projects/*",
188189
"/properties/*",
189190
"/lookup-types/*",
190-
"/embankments/*"
191+
"/embankments/*",
192+
"/cda-topics"
191193
})
192194
public class ApiServlet extends HttpServlet {
193195

@@ -219,12 +221,13 @@ public class ApiServlet extends HttpServlet {
219221

220222
@Resource(name = "jdbc/CWMS3")
221223
DataSource cwms;
222-
224+
private CdaTopicHandler cdaTopicHandler;
223225

224226

225227
@Override
226228
public void destroy() {
227229
javalin.destroy();
230+
cdaTopicHandler.shutdown();
228231
}
229232

230233
@Override
@@ -514,6 +517,12 @@ protected void configureRoutes() {
514517
new PropertyController(metrics), requiredRoles,1, TimeUnit.DAYS);
515518
cdaCrudCache(format("/lookup-types/{%s}", Controllers.NAME),
516519
new LookupTypeController(metrics), requiredRoles,1, TimeUnit.DAYS);
520+
if(true || Boolean.getBoolean("cwms.data.api.messaging.enabled")) {
521+
//TODO: setup separate data source for persistent connections to Oracle AQ
522+
cdaTopicHandler = new CdaTopicHandler(cwms, metrics);
523+
get("/cda-topics", cdaTopicHandler);
524+
addCacheControl("/cda-topics", 1, TimeUnit.DAYS);
525+
}
517526
}
518527

519528
/**
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2024 Hydrologic Engineering Center
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package cwms.cda.api.messaging;
26+
27+
import static cwms.cda.ApiServlet.CWMS_USERS_ROLE;
28+
29+
import com.google.common.flogger.FluentLogger;
30+
import cwms.cda.data.dao.AuthDao;
31+
import cwms.cda.security.CwmsAuthException;
32+
import cwms.cda.security.DataApiPrincipal;
33+
import java.util.Set;
34+
import javax.sql.DataSource;
35+
import org.apache.activemq.artemis.core.security.CheckType;
36+
import org.apache.activemq.artemis.core.security.Role;
37+
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
38+
import org.jooq.SQLDialect;
39+
import org.jooq.impl.DSL;
40+
41+
final class ArtemisSecurityManager implements ActiveMQSecurityManager {
42+
private static final FluentLogger LOGGER = FluentLogger.forEnclosingClass();
43+
private final DataSource dataSource;
44+
private final String cdaUser;
45+
46+
ArtemisSecurityManager(DataSource dataSource) {
47+
this.dataSource = dataSource;
48+
cdaUser = DSL.using(dataSource, SQLDialect.ORACLE18C)
49+
.connectionResult(c -> c.getMetaData().getUserName());
50+
}
51+
52+
@Override
53+
public boolean validateUser(String user, String password) {
54+
return validate(user, password);
55+
}
56+
57+
@Override
58+
public boolean validateUserAndRole(String user, String password, Set<Role> roles, CheckType checkType) {
59+
//CDA User is allowed to send and manage messages for the invm acceptor.
60+
//Other users are not allowed to send messages.
61+
if (!cdaUser.equalsIgnoreCase(user) && (checkType == CheckType.SEND || checkType == CheckType.MANAGE)) {
62+
LOGGER.atWarning().log("User: " + user
63+
+ " attempting to access Artemis Server with check type: " + checkType
64+
+ " Only message consumption is supported.");
65+
return false;
66+
}
67+
return validate(user, password);
68+
}
69+
70+
private boolean validate(String user, String password) {
71+
AuthDao instance = AuthDao.getInstance(DSL.using(dataSource, SQLDialect.ORACLE18C));
72+
boolean retval = false;
73+
try {
74+
DataApiPrincipal principal = instance.getByApiKey(password);
75+
retval = principal.getName().equalsIgnoreCase(user)
76+
&& principal.getRoles().contains(new cwms.cda.security.Role(CWMS_USERS_ROLE));
77+
} catch (CwmsAuthException ex) {
78+
LOGGER.atWarning().withCause(ex).log("Unauthenticated user: " + user
79+
+ " attempting to access Artemis Server");
80+
}
81+
return retval;
82+
}
83+
}
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2024 Hydrologic Engineering Center
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package cwms.cda.api.messaging;
26+
27+
import static java.lang.String.format;
28+
import static java.util.stream.Collectors.toList;
29+
import static java.util.stream.Collectors.toMap;
30+
import static java.util.stream.Collectors.toSet;
31+
import static org.jooq.impl.DSL.field;
32+
import static org.jooq.impl.DSL.name;
33+
import static org.jooq.impl.DSL.table;
34+
35+
import com.google.common.flogger.FluentLogger;
36+
import cwms.cda.ApiServlet;
37+
import cwms.cda.data.dao.AuthDao;
38+
import cwms.cda.data.dto.auth.ApiKey;
39+
import cwms.cda.security.DataApiPrincipal;
40+
import java.net.InetAddress;
41+
import java.net.UnknownHostException;
42+
import java.time.Instant;
43+
import java.time.ZonedDateTime;
44+
import java.util.Collection;
45+
import java.util.HashSet;
46+
import java.util.List;
47+
import java.util.Map;
48+
import java.util.Set;
49+
import java.util.UUID;
50+
import java.util.regex.Matcher;
51+
import java.util.regex.Pattern;
52+
import javax.jms.ConnectionFactory;
53+
import javax.jms.TopicConnectionFactory;
54+
import javax.sql.DataSource;
55+
import oracle.jms.AQjmsFactory;
56+
import org.apache.activemq.artemis.api.core.ActiveMQException;
57+
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
58+
import org.apache.activemq.artemis.core.server.ServerConsumer;
59+
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
60+
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
61+
import org.apache.camel.CamelContext;
62+
import org.apache.camel.component.jms.JmsComponent;
63+
import org.apache.camel.impl.DefaultCamelContext;
64+
import org.apache.camel.model.RouteDefinition;
65+
import org.jooq.DSLContext;
66+
import org.jooq.Field;
67+
import org.jooq.Record1;
68+
import org.jooq.SQLDialect;
69+
import org.jooq.impl.DSL;
70+
71+
final class CamelRouter implements ActiveMQServerPlugin {
72+
private static final FluentLogger LOGGER = FluentLogger.forEnclosingClass();
73+
private static final String ORACLE_QUEUE_SOURCE = "oracleAQ";
74+
private static final String ARTEMIS_QUEUE_SOURCE = "artemis";
75+
private final CamelContext camelContext;
76+
private final Map<OracleQueue, RouteDefinition> routeDefinitions;
77+
private final String oracleAqClientId;
78+
79+
CamelRouter(DataSource cwms) throws Exception {
80+
oracleAqClientId = getClientId();
81+
camelContext = initCamel(cwms);
82+
routeDefinitions = buildRouteDefinitions(cwms);
83+
camelContext.addRouteDefinitions(routeDefinitions.values());
84+
}
85+
86+
private CamelContext initCamel(DataSource cwms) {
87+
try {
88+
//wrapped DelegatingDataSource is used because internally AQJMS casts the returned connection
89+
//as an OracleConnection, but the JNDI pool is returning us a proxy, so unwrap it
90+
DefaultCamelContext camel = new DefaultCamelContext();
91+
DataSourceWrapper dataSource = new DataSourceWrapper(cwms);
92+
TopicConnectionFactory connectionFactory = AQjmsFactory.getTopicConnectionFactory(dataSource, true);
93+
camel.addComponent(ORACLE_QUEUE_SOURCE, JmsComponent.jmsComponent(connectionFactory));
94+
95+
DSLContext context = DSL.using(cwms, SQLDialect.ORACLE18C);
96+
String cdaUser = context
97+
.connectionResult(c -> c.getMetaData().getUserName());
98+
String apiKey = createApiKey(context, cdaUser);
99+
ConnectionFactory artemisConnectionFactory = new ActiveMQJMSConnectionFactory("vm://0", cdaUser, apiKey);
100+
camel.addComponent(ARTEMIS_QUEUE_SOURCE, JmsComponent.jmsComponent(artemisConnectionFactory));
101+
camel.start();
102+
return camel;
103+
} catch (Exception e) {
104+
throw new IllegalStateException("Unable to setup Queues", e);
105+
}
106+
}
107+
108+
private Map<OracleQueue, RouteDefinition> buildRouteDefinitions(DataSource cwms) {
109+
DSLContext create = DSL.using(cwms, SQLDialect.ORACLE18C);
110+
Field<String> field = field(name("OWNER")).concat(".").concat(field(name("NAME"))).as("queue");
111+
return create.select(field)
112+
.from(table(name("DBA_QUEUES")))
113+
.where(field(name("OWNER")).eq("CWMS_20"))
114+
.and(field(name("QUEUE_TYPE")).eq("NORMAL_QUEUE"))
115+
.fetch()
116+
.stream()
117+
.map(Record1::component1)
118+
.distinct()
119+
.map(OracleQueue::new)
120+
.collect(toMap(q -> q, this::queueToRoute));
121+
}
122+
123+
private RouteDefinition queueToRoute(OracleQueue queue) {
124+
RouteDefinition routeDefinition = new RouteDefinition();
125+
String durableSub = (ApiServlet.APPLICATION_TITLE + "_" + queue.getOracleQueueName())
126+
.replace(" ", "_")
127+
.replace(".", "_");
128+
String fromOracleRoute = format("%s:topic:%s?durableSubscriptionName=%s&clientId=%s", ORACLE_QUEUE_SOURCE,
129+
queue.getOracleQueueName(), durableSub, oracleAqClientId);
130+
String[] topics = queue.getTopicIds()
131+
.stream()
132+
.map(CamelRouter::createArtemisLabel)
133+
.toArray(String[]::new);
134+
routeDefinition.id(queue.getOracleQueueName());
135+
routeDefinition.from(fromOracleRoute)
136+
.log("Received message from ActiveMQ.Queue : ${body}")
137+
.process(new MapMessageToJsonProcessor(camelContext))
138+
.to(topics)
139+
.autoStartup(false);
140+
return routeDefinition;
141+
}
142+
143+
private static String getClientId() {
144+
try {
145+
String host = InetAddress.getLocalHost().getCanonicalHostName().replace("/", "_");
146+
return "CDA_" + host.replace(".", "_").replace(":", "_");
147+
} catch (UnknownHostException e) {
148+
throw new IllegalStateException("Cannot obtain local host name for durable subscription queue setup", e);
149+
}
150+
}
151+
152+
@Override
153+
public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
154+
String routeId = consumer.getQueueAddress().toString();
155+
String label = createArtemisLabel(routeId);
156+
List<RouteDefinition> routeDefinition = routeDefinitions.values()
157+
.stream()
158+
.filter(r -> r.getOutputs().stream().anyMatch(o -> o.getLabel().equals(label)))
159+
.collect(toList());
160+
if (routeDefinition.isEmpty()) {
161+
throw new ActiveMQException(ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST,
162+
"Route for id: " + routeId + " does not exit");
163+
}
164+
try {
165+
for (RouteDefinition route : routeDefinition) {
166+
//Camel handles synchronization internally
167+
//Calling startRoute on an already started route is innocuous
168+
camelContext.startRoute(route.getId());
169+
}
170+
} catch (Exception e) {
171+
throw new ActiveMQException("Could not start route: " + routeId, e,
172+
ActiveMQExceptionType.GENERIC_EXCEPTION);
173+
}
174+
}
175+
176+
Collection<String> getTopics(String office) {
177+
return routeDefinitions.keySet().stream()
178+
.filter(q -> office == null || q.office.equalsIgnoreCase(office))
179+
.map(OracleQueue::getTopicIds)
180+
.flatMap(Collection::stream)
181+
.collect(toSet());
182+
}
183+
184+
private static String createArtemisLabel(String routeId) {
185+
return format("%s:topic:%s", ARTEMIS_QUEUE_SOURCE, routeId);
186+
}
187+
188+
void stop() throws Exception {
189+
camelContext.stop();
190+
}
191+
192+
private String createApiKey(DSLContext context, String user) {
193+
AuthDao instance = AuthDao.getInstance(context);
194+
UUID uuid = UUID.randomUUID();
195+
DataApiPrincipal principal = new DataApiPrincipal(user, new HashSet<>());
196+
ZonedDateTime now = ZonedDateTime.now();
197+
//TODO: Expiration should be handled more gracefully.
198+
// This assumes no new queues are accessed after three months of uptime
199+
//TODO: cda_camel_invm needs to be unique per instance of CDA. Not sure how to handle that at the moment.
200+
// for now using current epoch millis. This unfortunately leaves old keys between restarts.
201+
String keyName = "cda_camel_invm_" + Instant.now().toEpochMilli();
202+
ApiKey apiKey = new ApiKey(user, keyName, uuid.toString(), now, now.plusMonths(3));
203+
return instance.createApiKey(principal, apiKey).getApiKey();
204+
}
205+
206+
private static final class OracleQueue {
207+
private static final Pattern ORACLE_QUEUE_PATTERN =
208+
Pattern.compile("CWMS_20\\.(?<office>[A-Z]+)_(?<queueGroup>.*)");
209+
private final String oracleQueueName;
210+
private final String office;
211+
private final String queueGroup;
212+
213+
private OracleQueue(String oracleQueueName) {
214+
this.oracleQueueName = oracleQueueName;
215+
Matcher matcher = ORACLE_QUEUE_PATTERN.matcher(oracleQueueName);
216+
if (matcher.matches()) {
217+
this.office = matcher.group("office");
218+
this.queueGroup = matcher.group("queueGroup");
219+
} else {
220+
LOGGER.atInfo().log("Oracle queue:" + oracleQueueName + " did not match standard pattern: " +
221+
ORACLE_QUEUE_PATTERN.pattern() + " Artemis topic will use the Oracle queue name as-is.");
222+
this.office = null;
223+
this.queueGroup = null;
224+
}
225+
}
226+
227+
private String getOracleQueueName() {
228+
return this.oracleQueueName;
229+
}
230+
231+
private Set<String> getTopicIds() {
232+
Set<String> retval = new HashSet<>();
233+
if (this.office != null && queueGroup != null) {
234+
retval.add("CDA." + this.office + ".ALL");
235+
retval.add("CDA." + this.office + "." + this.queueGroup);
236+
} else {
237+
retval.add(this.oracleQueueName);
238+
}
239+
return retval;
240+
}
241+
}
242+
}

0 commit comments

Comments
 (0)