Skip to content

Commit

Permalink
Keep track of HTTPServer instances (#40)
Browse files Browse the repository at this point in the history
Signed-off-by: Mickael Maison <[email protected]>
  • Loading branch information
mimaison committed Aug 21, 2024
1 parent dfcde19 commit dab0fda
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 120 deletions.
91 changes: 91 additions & 0 deletions src/main/java/io/strimzi/kafka/metrics/HttpServers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;

import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Class to keep track of all the HTTP servers started by all the Kafka components in a JVM.
*/
public class HttpServers {

private final static Logger LOG = LoggerFactory.getLogger(HttpServers.class);
private static final Map<Listener, ServerCounter> SERVERS = new HashMap<>();

/**
* Get or create a new HTTP server if there isn't an existing instance for the specified listener.
* @param listener The host and port
* @param registry The Prometheus registry to expose
* @return A ServerCounter instance
* @throws IOException if the HTTP server does not exist and cannot be started
*/
public synchronized static ServerCounter getOrCreate(Listener listener, PrometheusRegistry registry) throws IOException {
ServerCounter serverCounter = SERVERS.get(listener);
if (serverCounter == null) {
serverCounter = new ServerCounter(listener, registry);
SERVERS.put(listener, serverCounter);
}
serverCounter.count.incrementAndGet();
return serverCounter;
}

/**
* Release an HTTP server instance. If no other components hold this instance, it is closed.
* @param serverCounter The HTTP server instance to release
*/
public synchronized static void release(ServerCounter serverCounter) {
if (serverCounter.close()) {
SERVERS.remove(serverCounter.listener);
}
}

/**
* Class used to keep track of the HTTP server started on a listener.
*/
public static class ServerCounter {

private final AtomicInteger count;
private final HTTPServer server;
private final Listener listener;

private ServerCounter(Listener listener, PrometheusRegistry registry) throws IOException {
this.count = new AtomicInteger();
this.server = HTTPServer.builder()
.hostname(listener.host)
.port(listener.port)
.registry(registry)
.buildAndStart();
LOG.debug("Started HTTP server on http://{}:{}", listener.host, server.getPort());
this.listener = listener;
}

/**
* The port this HTTP server instance is listening on. If the listener port is 0, this returns the actual port
* that is used.
* @return The port number
*/
public int port() {
return server.getPort();
}

private synchronized boolean close() {
int remaining = count.decrementAndGet();
if (remaining == 0) {
server.close();
LOG.debug("Stopped HTTP server on http://{}:{}", listener.host, server.getPort());
return true;
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.strimzi.kafka.metrics;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.prometheus.metrics.instrumentation.jvm.JvmMetrics;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
Expand Down Expand Up @@ -36,7 +35,7 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter {
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private Optional<HTTPServer> httpServer;
private Optional<HttpServers.ServerCounter> httpServer;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the contextChange method
private String prefix;

Expand All @@ -57,7 +56,7 @@ public void configure(Map<String, ?> map) {
config = new PrometheusMetricsReporterConfig(map, registry);
collector = new KafkaMetricsCollector();
// Add JVM metrics
JvmMetrics.builder().register(registry);
JvmMetrics.builder().register();
httpServer = config.startHttpServer();
LOG.debug("KafkaPrometheusMetricsReporter configured with {}", config);
}
Expand Down Expand Up @@ -88,6 +87,7 @@ public void metricRemoval(KafkaMetric metric) {
@Override
public void close() {
registry.unregister(collector);
httpServer.ifPresent(HttpServers::release);
}

@Override
Expand All @@ -111,6 +111,6 @@ public void contextChange(MetricsContext metricsContext) {

// for testing
Optional<Integer> getPort() {
return Optional.ofNullable(httpServer.isPresent() ? httpServer.get().getPort() : null);
return Optional.ofNullable(httpServer.isPresent() ? httpServer.get().port() : null);
}
}
74 changes: 74 additions & 0 deletions src/main/java/io/strimzi/kafka/metrics/Listener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.strimzi.kafka.metrics.PrometheusMetricsReporterConfig.LISTENER_CONFIG;

/**
* Class parsing and handling the listener specified via {@link PrometheusMetricsReporterConfig#LISTENER_CONFIG} for
* the HTTP server used to expose the metrics.
*/
public class Listener {

private static final Pattern PATTERN = Pattern.compile("http://\\[?([0-9a-zA-Z\\-%._:]*)]?:([0-9]+)");

final String host;
final int port;

/* test */ Listener(String host, int port) {
this.host = host;
this.port = port;
}

static Listener parseListener(String listener) {
Matcher matcher = PATTERN.matcher(listener);
if (matcher.matches()) {
String host = matcher.group(1);
int port = Integer.parseInt(matcher.group(2));
return new Listener(host, port);
} else {
throw new ConfigException(LISTENER_CONFIG, listener, "Listener must be of format http://[host]:[port]");
}
}

@Override
public String toString() {
return "http://" + host + ":" + port;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Listener listener = (Listener) o;
return port == listener.port && Objects.equals(host, listener.host);
}

@Override
public int hashCode() {
return Objects.hash(host, port);
}

/**
* Validator to check the user provided listener configuration
*/
static class ListenerValidator implements ConfigDef.Validator {

@Override
public void ensureValid(String name, Object value) {
Matcher matcher = PATTERN.matcher(String.valueOf(value));
if (!matcher.matches()) {
throw new ConfigException(name, value, "Listener must be of format http://[host]:[port]");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/
package io.strimzi.kafka.metrics;

import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -13,12 +12,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.BindException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -65,7 +61,7 @@ public class PrometheusMetricsReporterConfig extends AbstractConfig {
private static final String ALLOWLIST_CONFIG_DOC = "A comma separated list of regex patterns to specify the metrics to collect.";

private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(LISTENER_CONFIG, ConfigDef.Type.STRING, LISTENER_CONFIG_DEFAULT, new ListenerValidator(), ConfigDef.Importance.HIGH, LISTENER_CONFIG_DOC)
.define(LISTENER_CONFIG, ConfigDef.Type.STRING, LISTENER_CONFIG_DEFAULT, new Listener.ListenerValidator(), ConfigDef.Importance.HIGH, LISTENER_CONFIG_DOC)
.define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, ALLOWLIST_CONFIG_DOC)
.define(LISTENER_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, LISTENER_ENABLE_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, LISTENER_ENABLE_CONFIG_DOC);

Expand Down Expand Up @@ -140,80 +136,20 @@ public String toString() {
/**
* Start the HTTP server for exposing metrics.
*
* @return An optional HTTPServer instance if started successfully, otherwise empty.
* @return An optional ServerCounter instance if {@link #LISTENER_ENABLE_CONFIG} is enabled, otherwise empty.
*/
public synchronized Optional<HTTPServer> startHttpServer() {
public synchronized Optional<HttpServers.ServerCounter> startHttpServer() {
if (!listenerEnabled) {
LOG.info("HTTP server listener not enabled");
return Optional.empty();
}
try {
HTTPServer httpServer = HTTPServer.builder()
.hostname(listener.host)
.port(listener.port)
.registry(registry)
.buildAndStart();
LOG.info("HTTP server started on listener http://{}:{}", listener.host, httpServer.getPort());
return Optional.of(httpServer);
} catch (BindException be) {
LOG.info("HTTP server already started");
return Optional.empty();
HttpServers.ServerCounter server = HttpServers.getOrCreate(listener, registry);
LOG.info("HTTP server listening on http://{}:{}", listener.host, server.port());
return Optional.of(server);
} catch (IOException ioe) {
LOG.error("Failed starting HTTP server", ioe);
throw new RuntimeException(ioe);
}
}

static class Listener {

private static final Pattern PATTERN = Pattern.compile("http://\\[?([0-9a-zA-Z\\-%._:]*)]?:([0-9]+)");

final String host;
final int port;

Listener(String host, int port) {
this.host = host;
this.port = port;
}

static Listener parseListener(String listener) {
Matcher matcher = PATTERN.matcher(listener);
if (matcher.matches()) {
String host = matcher.group(1);
int port = Integer.parseInt(matcher.group(2));
return new Listener(host, port);
} else {
throw new ConfigException(LISTENER_CONFIG, listener, "Listener must be of format http://[host]:[port]");
}
}

@Override
public String toString() {
return "http://" + host + ":" + port;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Listener listener = (Listener) o;
return port == listener.port && Objects.equals(host, listener.host);
}

@Override
public int hashCode() {
return Objects.hash(host, port);
}
}

static class ListenerValidator implements ConfigDef.Validator {

@Override
public void ensureValid(String name, Object value) {
Matcher matcher = Listener.PATTERN.matcher(String.valueOf(value));
if (!matcher.matches()) {
throw new ConfigException(name, value, "Listener must be of format http://[host]:[port]");
}
}
}
}
62 changes: 62 additions & 0 deletions src/test/java/io/strimzi/kafka/metrics/HttpServersTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;

import io.prometheus.metrics.model.registry.PrometheusRegistry;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class HttpServersTest {

private final PrometheusRegistry registry = new PrometheusRegistry();

@Test
public void testLifecycle() throws IOException {
Listener listener1 = Listener.parseListener("http://localhost:0");
HttpServers.ServerCounter server1 = HttpServers.getOrCreate(listener1, registry);
assertTrue(listenerStarted(listener1.host, server1.port()));

Listener listener2 = Listener.parseListener("http://localhost:0");
HttpServers.ServerCounter server2 = HttpServers.getOrCreate(listener2, registry);
assertTrue(listenerStarted(listener2.host, server2.port()));
assertSame(server1, server2);

Listener listener3 = Listener.parseListener("http://127.0.0.1:0");
HttpServers.ServerCounter server3 = HttpServers.getOrCreate(listener3, registry);
assertTrue(listenerStarted(listener3.host, server3.port()));

HttpServers.release(server1);
assertTrue(listenerStarted(listener1.host, server1.port()));
assertTrue(listenerStarted(listener2.host, server2.port()));
assertTrue(listenerStarted(listener3.host, server3.port()));

HttpServers.release(server2);
assertFalse(listenerStarted(listener1.host, server1.port()));
assertFalse(listenerStarted(listener2.host, server2.port()));
assertTrue(listenerStarted(listener3.host, server3.port()));

HttpServers.release(server3);
assertFalse(listenerStarted(listener3.host, server3.port()));
}

private boolean listenerStarted(String host, int port) {
try {
URL url = new URL("http://" + host + ":" + port + "/metrics");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("HEAD");
con.connect();
return con.getResponseCode() == HttpURLConnection.HTTP_OK;
} catch (IOException ioe) {
return false;
}
}
}
Loading

0 comments on commit dab0fda

Please sign in to comment.