Skip to content

Commit bf7cadb

Browse files
enplotz authored and bernd-wiswedel committed
AP-23883: Expose current application health metrics via JMX
AP-23883 (Expose health metrics via standard Java monitoring API)
1 parent 876441a commit bf7cadb

File tree

11 files changed

+885
-13
lines changed

11 files changed

+885
-13
lines changed

org.knime.core.tests/src/org/knime/core/monitor/ApplicationHealthTest.java

+233-6
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,43 @@
4848
*/
4949
package org.knime.core.monitor;
5050

51+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.IntrospectionException;
import javax.management.MBeanException;
import javax.management.MBeanInfo;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;

import org.apache.commons.io.output.NullWriter;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.jupiter.api.Test;
import org.knime.core.monitor.ApplicationHealth.LoadAverages;
import org.knime.core.monitor.beans.CounterMXBean;
import org.knime.core.monitor.beans.CountersMXBean;
import org.knime.core.monitor.beans.GlobalPoolMXBean;
import org.knime.core.monitor.beans.InstanceCountersMXBean;
import org.knime.core.monitor.beans.NodeStatesMXBean;
import org.knime.core.node.NodeLogger;
import org.knime.core.node.NodeLogger.LEVEL;
5588

5689
/**
5790
* Tests for {@link ApplicationHealth}.
@@ -62,20 +95,214 @@ final class ApplicationHealthTest {
6295

6396
@SuppressWarnings("static-method")
6497
@Test
65-
final void testInstanceCounters() {
66-
assertFalse(ApplicationHealth.getInstanceCounters().isEmpty(), "instance counter list should not be empty");
98+
final void testInstanceCounters() throws IntrospectionException, InstanceNotFoundException,
99+
MalformedObjectNameException, ReflectionException, AttributeNotFoundException, MBeanException {
100+
try (final var app = new ApplicationHealth()) {
101+
assertFalse(ApplicationHealth.getInstanceCounters().isEmpty(), "instance counter list should not be empty");
102+
103+
// and now via JMX
104+
final var server = ManagementFactory.getPlatformMBeanServer();
105+
final var name = new ObjectName("org.knime.core:type=Memory,name=ObjectInstances");
106+
final var info = server.getMBeanInfo(name);
107+
for (final var attr : info.getAttributes()) {
108+
final var attrName = attr.getName();
109+
final var attrValue = server.getAttribute(name, attrName);
110+
NodeLogger.getLogger(ApplicationHealthTest.class).info(attrName + ": " + attrValue);
111+
}
112+
final var data = (TabularData)server.getAttribute(name, "InstanceCounters");
113+
assertFalse(data.isEmpty(), "Instance counters should not be empty");
114+
115+
// test that we have exactly the known instance counters
116+
final var knownInstanceCounters = ApplicationHealth.getInstanceCounters() //
117+
.stream().map(i -> i.getName()).collect(Collectors.toSet());
118+
data.values().forEach(row -> {
119+
// Our Map<String, Long> is mapped to TabularData by JMX
120+
final var cd = (CompositeData)row;
121+
assertTrue(cd.values().size() == 2, "TabularData row contains two columns");
122+
final var counterName = (String)cd.get("key");
123+
final var counterValue = (Long)cd.get("value");
124+
assertTrue(knownInstanceCounters.remove(counterName), "Unknown counter: " + counterName);
125+
assertTrue(counterValue >= 0, "Counter value should be non-negative");
126+
});
127+
assertTrue(knownInstanceCounters.isEmpty(),
128+
"Some known instance counters are missing: " + String.join(", ", knownInstanceCounters));
129+
}
130+
}
131+
132+
@SuppressWarnings("static-method")
133+
@Test
134+
final void testNodeStates() throws MalformedObjectNameException, InstanceNotFoundException,
135+
AttributeNotFoundException, ReflectionException, MBeanException {
136+
try (final var app = new ApplicationHealth()) {
137+
assertEquals(0, ApplicationHealth.getNodeStateExecutedCount(), "No executed nodes");
138+
assertEquals(0, ApplicationHealth.getNodeStateExecutingCount(), "No executing nodes");
139+
assertEquals(0, ApplicationHealth.getNodeStateOtherCount(), "No nodes in \"other\" state");
140+
141+
// and now via JMX
142+
final var server = ManagementFactory.getPlatformMBeanServer();
143+
final var name = new ObjectName("org.knime.core:type=Execution,name=NodeStates");
144+
final CompositeData attr = (CompositeData)server.getAttribute(name, "NodeStates");
145+
assertEquals(0, attr.get("executed"), "No executed nodes");
146+
assertEquals(0, attr.get("executing"), "No executing nodes");
147+
assertEquals(0, attr.get("other"), "No nodes in \"other\" state");
148+
}
67149
}
68150

69151
@SuppressWarnings("static-method")
70152
@Test
71-
final void testThreadPoolLoadAverages() {
72-
assertTrue(ApplicationHealth.getGlobalThreadPoolLoadAverages().avg1Min() >= 0.0, "Reports load average >= 0.0");
153+
final void testThreadPoolLoadAverages() throws MalformedObjectNameException, InstanceNotFoundException,
154+
AttributeNotFoundException, ReflectionException, MBeanException {
155+
try (final var app = new ApplicationHealth()) {
156+
assertTrue(ApplicationHealth.getGlobalThreadPoolLoadAverages().avg1Min() >= 0.0,
157+
"Reports load average >= 0.0");
158+
159+
// and now via JMX
160+
final var server = ManagementFactory.getPlatformMBeanServer();
161+
final var name = new ObjectName("org.knime.core:type=Execution,name=GlobalPool");
162+
final var load = fromCompositeData((CompositeData)server.getAttribute(name, "AverageLoad"));
163+
assertTrue(load.avg1Min() >= 0.0, "Reports load average >= 0.0");
164+
}
73165
}
74166

75167
@SuppressWarnings("static-method")
76168
@Test
77-
final void testQueuedAverages() {
78-
assertTrue(ApplicationHealth.getGlobalThreadPoolQueuedAverages().avg1Min() >= 0.0,
169+
final void testQueuedAverages() throws MalformedObjectNameException, InstanceNotFoundException,
170+
AttributeNotFoundException, ReflectionException, MBeanException {
171+
try (final var app = new ApplicationHealth()) {
172+
assertTrue(ApplicationHealth.getGlobalThreadPoolQueuedAverages().avg1Min() >= 0.0,
79173
"Reports queue length average >= 0.0");
174+
175+
// and now via JMX
176+
final var server = ManagementFactory.getPlatformMBeanServer();
177+
final var name = new ObjectName("org.knime.core:type=Execution,name=GlobalPool");
178+
final var queue = fromCompositeData((CompositeData)server.getAttribute(name, "AverageQueueLength"));
179+
assertTrue(queue.avg1Min() >= 0.0, "Reports queue length average >= 0.0");
180+
}
181+
}
182+
183+
private static LoadAverages fromCompositeData(final CompositeData cd) {
184+
return new LoadAverages((double)cd.get("avg1Min"), (double)cd.get("avg5Min"), (double)cd.get("avg15Min"));
185+
}
186+
187+
/**
188+
* Tests that registration of ApplicationHealth MXBeans worked.
189+
*
190+
* @throws IntrospectionException some MXBean exception
191+
* @throws InstanceNotFoundException some MXBean exception
192+
* @throws MalformedObjectNameException some MXBean exception
193+
* @throws ReflectionException some MXBean exception
194+
*/
195+
@Test
196+
@SuppressWarnings("static-method")
197+
final void testMXBeanRegistrations()
198+
throws IntrospectionException, InstanceNotFoundException, MalformedObjectNameException, ReflectionException {
199+
try (final var app = new ApplicationHealth()) {
200+
201+
final var beans = new ArrayList<>();
202+
beans.add(assertMXBeanRegistered("org.knime.core:type=Execution,name=NodeStates", NodeStatesMXBean.class));
203+
beans.add(assertMXBeanRegistered("org.knime.core:type=Execution,name=GlobalPool", GlobalPoolMXBean.class));
204+
beans.add(assertMXBeanRegistered("org.knime.core:type=Memory,name=ObjectInstances",
205+
InstanceCountersMXBean.class));
206+
if (ProcessStateUtil.supportsPSS()) {
207+
beans.add(assertMXBeanRegistered("org.knime.core:type=Memory,name=ExternalProcessesPss",
208+
CountersMXBean.class));
209+
}
210+
if (ProcessStateUtil.supportsRSS()) {
211+
beans.add(assertMXBeanRegistered("org.knime.core:type=Memory,name=KNIMErss", CounterMXBean.class));
212+
}
213+
214+
// check that "CODING" message is logged when opening app health twice
215+
final AtomicBoolean enabled = new AtomicBoolean(false);
216+
final AtomicInteger numBeans = new AtomicInteger(0);
217+
final List<String> msgs = new ArrayList<>();
218+
final var logStack = new LogInterceptor(ApplicationHealth.class.getName(), log -> {
219+
if (skip(enabled, log.msg)) {
220+
return;
221+
}
222+
if (log.level == Level.ERROR && log.msg.startsWith("CODING PROBLEM")
223+
&& log.msg.contains("Failed to register")) {
224+
numBeans.incrementAndGet();
225+
msgs.add(StringUtils.truncate(log.msg, 100) + "[...]");
226+
}
227+
});
228+
try {
229+
NodeLogger.addWriter(NullWriter.INSTANCE, logStack, LEVEL.ERROR, LEVEL.OFF);
230+
NodeLogger.getLogger(ApplicationHealth.class.getName()).error(START_MSG_MARKER);
231+
try (final var app2 = new ApplicationHealth()) {
232+
// should work, but print CODING error
233+
assertTrue(true);
234+
}
235+
NodeLogger.getLogger(ApplicationHealth.class.getName()).error(END_MSG_MARKER);
236+
} finally {
237+
NodeLogger.removeWriter(NullWriter.INSTANCE);
238+
}
239+
240+
// GlobalPool, ObjectInstances, NodeStates (and PSS & RSS on linux)
241+
assertEquals(beans.size(), numBeans.get(), "Expected %d CODING error messages, got %d:%n%s"
242+
.formatted(beans.size(), numBeans.get(), String.join(",\n", msgs)));
243+
}
244+
}
245+
246+
// Logged by testMXBeanRegistrations right before the second ApplicationHealth is opened; skip(...) starts
// letting messages through once a message containing this marker is seen.
private static final String START_MSG_MARKER = "EXPECTED LOG START";
247+
248+
// Logged by testMXBeanRegistrations after the second ApplicationHealth was closed; skip(...) goes back to
// skipping messages once a message containing this marker is seen.
private static final String END_MSG_MARKER = "EXPECTED LOG END";
249+
250+
private static boolean skip(final AtomicBoolean enabled, final String msg) {
251+
if (msg.contains(START_MSG_MARKER)) {
252+
enabled.set(true);
253+
} else if (msg.contains(END_MSG_MARKER)) {
254+
enabled.set(false);
255+
}
256+
return !enabled.get();
257+
}
258+
259+
private static MBeanInfo assertMXBeanRegistered(final String beanName, final Class<?> beanType)
260+
throws ReflectionException, IntrospectionException, InstanceNotFoundException, MalformedObjectNameException {
261+
final var name = new ObjectName(beanName);
262+
final var nodeStatesMXBean = ManagementFactory.getPlatformMBeanServer().getMBeanInfo(name);
263+
final var nodeStatesDesc = nodeStatesMXBean.getDescriptor();
264+
assertEquals(beanType.getCanonicalName(), nodeStatesDesc.getFieldValue("interfaceClassName"),
265+
"NodeStates should be %s".formatted(beanType.getSimpleName()));
266+
assertEquals("true", nodeStatesDesc.getFieldValue("mxbean"), "Should be an MXBean");
267+
return nodeStatesMXBean;
268+
}
269+
270+
private record LogMsg(Level level, String msg) {
271+
}
272+
273+
private static final class LogInterceptor extends Layout {
274+
275+
private final String m_loggerName;
276+
277+
private final Consumer<LogMsg> m_logConsumer;
278+
279+
LogInterceptor(final String loggerName, final Consumer<LogMsg> logConsumer) {
280+
m_loggerName = loggerName;
281+
m_logConsumer = logConsumer;
282+
}
283+
284+
@Override
285+
public String format(final LoggingEvent event) {
286+
final var level = event.getLevel();
287+
final var message = event.getMessage();
288+
289+
if (m_loggerName.equals(event.getLoggerName()) && message != null) {
290+
final var msg = event.getMessage().toString();
291+
m_logConsumer.accept(new LogMsg(level, msg));
292+
}
293+
294+
return String.format("%s: %s", level, message);
295+
}
296+
297+
@Override
298+
public void activateOptions() {
299+
// no-op
300+
}
301+
302+
@Override
303+
public boolean ignoresThrowable() {
304+
return false;
305+
}
306+
80307
}
81308
}

org.knime.core/src/eclipse/org/knime/core/internal/CorePlugin.java

+7
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.knime.core.customization.APCustomizationProviderService;
6363
import org.knime.core.customization.APCustomizationProviderServiceImpl;
6464
import org.knime.core.eclipseUtil.EclipseProxyServiceInitializer;
65+
import org.knime.core.monitor.ApplicationHealth;
6566
import org.knime.core.node.port.report.IReportService;
6667
import org.knime.core.util.IEarlyStartup;
6768
import org.knime.core.util.pathresolve.ResolverUtil;
@@ -126,6 +127,8 @@ public final boolean isWrapColumnHeaderInTableViews() {
126127
private ServiceTracker<APCustomizationProviderService, APCustomizationProviderService>
127128
m_customizationServiceTracker;
128129

130+
private ApplicationHealth m_applicationHealth;
131+
129132
@Override
130133
public void start(final BundleContext context)
131134
throws Exception {
@@ -173,6 +176,8 @@ public void start(final BundleContext context)
173176
/* Listening on the proxy service initialization, we can install multiple proxy-supporting services.
174177
* Needs to happen early to avoid interference with org.apache.cxf.transport.http.ReferencingAuthenticator. */
175178
EclipseProxyServiceInitializer.startListening(context);
179+
180+
m_applicationHealth = new ApplicationHealth();
176181
}
177182

178183
private static void readMimeTypes() throws IOException {
@@ -195,6 +200,8 @@ public void stop(final BundleContext context) throws Exception {
195200
m_customizationServiceTracker = null;
196201
m_customizationServiceRegistration.unregister();
197202
m_customizationServiceRegistration = null;
203+
m_applicationHealth.close();
204+
m_applicationHealth = null;
198205
instance = null;
199206
}
200207

0 commit comments

Comments
 (0)