Skip to content

Commit

Permalink
Support JMX metrics for HTTP sink (#196)
Browse files Browse the repository at this point in the history
* Support JMX metrics for HTTP sink

Adds the counts for success, request average.

* Brings HDR histogram as opposed to the Percentiles from Kafka. the Lib is easier to use

Manually sets the attributes

* Goes back on the JMX attributes name, since SetAttribute fails.

* Fix the integration tests compilation error

* Fix the integration tests to avoid using the same sink name, since it will fail on registering the JMX metrics

---------

Co-authored-by: stheppi <[email protected]>
  • Loading branch information
stheppi and stheppi authored Jan 14, 2025
1 parent 8d34482 commit 98defcd
Show file tree
Hide file tree
Showing 12 changed files with 496 additions and 47 deletions.
40 changes: 22 additions & 18 deletions java-connectors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ allprojects {
"Git-Commit-Hash": gitCommitHash,
"Git-Tag": gitTag,
"StreamReactor-Docs": "https://docs.lenses.io/connectors/"
)
)
}
}

Expand All @@ -97,17 +97,17 @@ allprojects {

manifest {
attributes("StreamReactor-Version": artifactVersion,
"Kafka-Version": kafkaVersion,
"Created-By": "Lenses",
"Created-At": new Date().format("YYYYMMDDHHmm"),
"Git-Repo": gitRepo,
"Git-Commit-Hash": gitCommitHash,
"Git-Tag": gitTag,
"StreamReactor-Docs": "https://docs.lenses.io/connectors/"
"Kafka-Version": kafkaVersion,
"Created-By": "Lenses",
"Created-At": new Date().format("YYYYMMDDHHmm"),
"Git-Repo": gitRepo,
"Git-Commit-Hash": gitCommitHash,
"Git-Tag": gitTag,
"StreamReactor-Docs": "https://docs.lenses.io/connectors/"
)
}
configurations = [
project.configurations.compileClasspath
project.configurations.compileClasspath
]
archiveFileName = "${project.name}-${artifactVersion}-all.jar"
zip64 true
Expand Down Expand Up @@ -196,7 +196,11 @@ allprojects {
}
}

task collectArtifacts(dependsOn: [test, collectDependencies, copyModuleJar])
task collectArtifacts(dependsOn: [
test,
collectDependencies,
copyModuleJar
])
}


Expand All @@ -221,13 +225,13 @@ task testModuleList() {

task releaseModuleList() {
def nonReleaseModules = [
"java-reactor",
"test-utils",
"kafka-connect-cloud-common",
"kafka-connect-common",
"kafka-connect-gcp-common",
"kafka-connect-query-language",
"kafka-connect-sink-reporting"
"java-reactor",
"test-utils",
"kafka-connect-cloud-common",
"kafka-connect-common",
"kafka-connect-gcp-common",
"kafka-connect-query-language",
"kafka-connect-sink-reporting"
]
def modulesFile = new File("gradle-modules.txt")
modulesFile.delete()
Expand All @@ -243,4 +247,4 @@ task releaseModuleList() {
task prepareModuleList() {
dependsOn testModuleList
dependsOn releaseModuleList
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.scalatest.funsuite.AsyncFunSuite
import org.scalatest.time.Minute
import org.scalatest.time.Span

import java.util.UUID
import scala.jdk.CollectionConverters.MapHasAsJava
import scala.jdk.CollectionConverters.SeqHasAsJava

Expand All @@ -44,11 +45,13 @@ class HttpSinkTaskIT extends AsyncFunSuite with AsyncIOSpec with Eventually {
_ <- Resource.make(IO.delay(task.start(Map("connect.http.config" -> config).asJava)))(_ => IO.delay(task.stop()))
} yield task

def sinkTaskUsingProps(config: Map[String, String]): Resource[IO, HttpSinkTask] =
def sinkTaskUsingProps(config: Map[String, String]): Resource[IO, HttpSinkTask] = {
val configWithUniqueName: Map[String, String] = config + ("name" -> ("mySinkName" + UUID.randomUUID().toString))
for {
task <- Resource.eval(IO.delay(new HttpSinkTask()))
_ <- Resource.make(IO.delay(task.start(config.asJava)))(_ => IO.delay(task.stop()))
_ <- Resource.make(IO.delay(task.start(configWithUniqueName.asJava)))(_ => IO.delay(task.stop()))
} yield task
}

private val Host = "localhost"
private val users = SampleData.Employees
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import cats.effect.Resource
import cats.effect.testing.scalatest.AsyncIOSpec
import cats.implicits.catsSyntaxOptionId
import cats.implicits.none
import io.lenses.streamreactor.connect.http.sink.metrics.HttpSinkMetrics
import org.mockito.ArgumentMatchers.any
import org.scalatest.EitherValues
//import cats.implicits.catsSyntaxOptionId
Expand Down Expand Up @@ -71,12 +72,14 @@ class HttpRequestSenderIT
.willReturn(aResponse.withHeader("Content-Type", "text/plain")
.withBody(HttpResponseBody)))

val metrics = new HttpSinkMetrics
JdkHttpClient.simple[IO].use {
client =>
val requestSender = new NoAuthenticationHttpRequestSender(
sinkName,
Method.PUT,
client,
metrics,
)
val processedTemplate = ProcessedTemplate(
s"${wireMockServer.baseUrl()}$expectedUrl",
Expand Down Expand Up @@ -109,6 +112,7 @@ class HttpRequestSenderIT
.withBody(HttpResponseBody)),
)

val metrics = new HttpSinkMetrics
JdkHttpClient.simple[IO].use {
client =>
val requestSender = new BasicAuthenticationHttpRequestSender(
Expand All @@ -117,6 +121,7 @@ class HttpRequestSenderIT
client,
"myUser",
"myPassword",
metrics,
)
val processedTemplate = ProcessedTemplate(
s"${wireMockServer.baseUrl()}$expectedUrl",
Expand Down Expand Up @@ -144,10 +149,12 @@ class HttpRequestSenderIT
Resource.eval(IO.raiseError(expectedException)),
)

val metrics = new HttpSinkMetrics
val requestSender = new NoAuthenticationHttpRequestSender(
sinkName,
Method.PUT,
mockClient,
metrics,
)
val processedTemplate = ProcessedTemplate(
s"${wireMockServer.baseUrl()}$expectedUrl",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy
import io.lenses.streamreactor.connect.http.sink.commit.HttpBatchPolicy
import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
import io.lenses.streamreactor.connect.http.sink.metrics.HttpSinkMetrics
import io.lenses.streamreactor.connect.http.sink.reporter.model.HttpFailureConnectorSpecificRecordData
import io.lenses.streamreactor.connect.http.sink.reporter.model.HttpSuccessConnectorSpecificRecordData
import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
Expand Down Expand Up @@ -109,6 +110,7 @@ object HttpWriterManager extends StrictLogging {
config.method.toHttp4sMethod,
retriableClient,
config.authentication,
new HttpSinkMetrics,
)
batchPolicy = config.batch.toBatchPolicy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import io.lenses.streamreactor.connect.http.sink.client.oauth2.AccessToken
import io.lenses.streamreactor.connect.http.sink.client.oauth2.AccessTokenProvider
import io.lenses.streamreactor.connect.http.sink.client.oauth2.OAuth2AccessTokenProvider
import io.lenses.streamreactor.connect.http.sink.client.oauth2.cache.CachedAccessTokenProvider
import io.lenses.streamreactor.connect.http.sink.metrics.HttpSinkMetricsMBean
import io.lenses.streamreactor.connect.http.sink.metrics.MetricsRegistrar
import io.lenses.streamreactor.connect.http.sink.tpl.ProcessedTemplate
import org.http4s.EntityDecoder
import org.http4s._
Expand All @@ -36,7 +38,8 @@ class NoAuthenticationHttpRequestSender(
sinkName: String,
method: Method,
client: Client[IO],
) extends HttpRequestSender(sinkName, method, client) {
metrics: HttpSinkMetricsMBean,
) extends HttpRequestSender(sinkName, method, client, metrics) {

override protected def updateRequest(request: Request[IO]): IO[Request[IO]] = IO.pure(request)
}
Expand All @@ -47,7 +50,8 @@ class BasicAuthenticationHttpRequestSender(
client: Client[IO],
username: String,
password: String,
) extends HttpRequestSender(sinkName, method, client) {
metrics: HttpSinkMetricsMBean,
) extends HttpRequestSender(sinkName, method, client, metrics) {

override protected def updateRequest(request: Request[IO]): IO[Request[IO]] =
IO.pure(request.putHeaders(Authorization(BasicCredentials(username, password))))
Expand All @@ -58,7 +62,8 @@ class OAuth2AuthenticationHttpRequestSender(
method: Method,
client: Client[IO],
tokenProvider: AccessTokenProvider[IO],
) extends HttpRequestSender(sinkName, method, client) {
metrics: HttpSinkMetricsMBean,
) extends HttpRequestSender(sinkName, method, client, metrics) {

override protected def updateRequest(request: Request[IO]): IO[Request[IO]] =
for {
Expand All @@ -73,25 +78,34 @@ object HttpRequestSender {
method: Method,
client: Client[IO],
authentication: Authentication,
): IO[HttpRequestSender] = authentication match {
case NoAuthentication => IO(new NoAuthenticationHttpRequestSender(sinkName, method, client))
case BasicAuthentication(username, password) =>
IO(new BasicAuthenticationHttpRequestSender(sinkName, method, client, username, password))
case OAuth2Authentication(uri, clientId, clientSecret, tokenProperty, clientScope, clientHeaders) =>
val rawHeaders = clientHeaders.map { case (k, v) => Header.Raw(CIString(k), v) }
val tokenProvider =
new OAuth2AccessTokenProvider(client, uri, clientId, clientSecret, clientScope, rawHeaders, tokenProperty)
for {
ref <- Ref.of[IO, Option[AccessToken]](none)
cachedTokenProvider = new CachedAccessTokenProvider(tokenProvider, ref)
} yield new OAuth2AuthenticationHttpRequestSender(sinkName, method, client, cachedTokenProvider)
metrics: HttpSinkMetricsMBean,
): IO[HttpRequestSender] = {

val senderIo = authentication match {
case NoAuthentication => IO(new NoAuthenticationHttpRequestSender(sinkName, method, client, metrics))
case BasicAuthentication(username, password) =>
IO(new BasicAuthenticationHttpRequestSender(sinkName, method, client, username, password, metrics))
case OAuth2Authentication(uri, clientId, clientSecret, tokenProperty, clientScope, clientHeaders) =>
val rawHeaders = clientHeaders.map { case (k, v) => Header.Raw(CIString(k), v) }
val tokenProvider =
new OAuth2AccessTokenProvider(client, uri, clientId, clientSecret, clientScope, rawHeaders, tokenProperty)
for {
ref <- Ref.of[IO, Option[AccessToken]](none)
cachedTokenProvider = new CachedAccessTokenProvider(tokenProvider, ref)
} yield new OAuth2AuthenticationHttpRequestSender(sinkName, method, client, cachedTokenProvider, metrics)
}
for {
_ <- IO(MetricsRegistrar.registerMetricsMBean(metrics, sinkName))
sender <- senderIo
} yield sender
}
}

abstract class HttpRequestSender(
sinkName: String,
method: Method,
client: Client[IO],
metrics: HttpSinkMetricsMBean,
) extends LazyLogging {

private case class HeaderInfo(contentType: Option[`Content-Type`], headers: Headers)
Expand Down Expand Up @@ -160,7 +174,10 @@ abstract class HttpRequestSender(
// Add authentication if present
authenticatedRequest <- updateRequest(requestWithContentType)
_ <- IO.delay(logger.debug(s"[$sinkName] Auth: $authenticatedRequest"))
startTime <- IO(System.nanoTime())
response <- executeRequestAndHandleErrors(authenticatedRequest)
durationMillis <- IO((System.nanoTime() - startTime) / 1000000)
_ <- IO(metrics.recordRequestTime(durationMillis))
_ <- IO.delay(logger.trace(s"[$sinkName] Response: $response"))
} yield response

Expand Down Expand Up @@ -188,8 +205,16 @@ abstract class HttpRequestSender(
response.as[Option[String]].map {
body =>
if (response.status.isSuccess) {
metrics.increment2xxCount()
HttpResponseSuccess(response.status.code, body).asRight[HttpResponseFailure]
} else {
if (response.status.code >= 400 && response.status.code < 500) {
metrics.increment4xxCount()
} else if (response.status.code >= 500 && response.status.code < 600) {
metrics.increment5xxCount()
} else {
metrics.incrementOtherErrorsCount()
}
HttpResponseFailure(
message = "Request failed with error response",
cause = Option.empty,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2017-2025 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.metrics

import org.HdrHistogram.Histogram

import java.util.concurrent.atomic.LongAdder

trait HttpSinkMetricsMBean {
def get2xxCount: Long
def get4xxCount: Long
def get5xxCount: Long
def getOtherErrorsCount: Long
def recordRequestTime(time: Long): Unit
def increment2xxCount(): Unit
def increment4xxCount(): Unit
def increment5xxCount(): Unit
def incrementOtherErrorsCount(): Unit
def getP50RequestTimeMs: Long
def getP95RequestTimeMs: Long
def getP99RequestTimeMs: Long
}

class HttpSinkMetrics extends HttpSinkMetricsMBean {
private val successCount = new LongAdder()
private val error4xxCount = new LongAdder()
private val error5xxCount = new LongAdder()
private val otherErrorsCount = new LongAdder()

// Sets the maximum value to record in the histogram
private val MaxValueMillis = 3 * 60 * 60 * 1000L // 3 hours
private val histogram = new Histogram(MaxValueMillis, 3)

def increment2xxCount(): Unit = successCount.increment()
def increment4xxCount(): Unit = error4xxCount.increment()
def increment5xxCount(): Unit = error5xxCount.increment()

def recordRequestTime(millis: Long): Unit =
histogram.recordValue(math.min(millis, MaxValueMillis))

override def get2xxCount: Long = successCount.sum()
override def get4xxCount: Long = error4xxCount.sum()
override def get5xxCount: Long = error5xxCount.sum()
override def getOtherErrorsCount: Long = otherErrorsCount.sum()
override def incrementOtherErrorsCount(): Unit = otherErrorsCount.increment()

override def getP50RequestTimeMs: Long =
histogram.getValueAtPercentile(50.0)
override def getP95RequestTimeMs: Long =
histogram.getValueAtPercentile(95.0)
override def getP99RequestTimeMs: Long =
histogram.getValueAtPercentile(99.0)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2017-2025 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.metrics

import java.lang.management.ManagementFactory
import javax.management.ObjectName

object MetricsRegistrar {

val NameTemplate = "io.lenses.streamreactor.connect.http.sink:type=metrics,name=%s"

/**
* Register the metrics MBean exposing the count on 200, 400, 500 and other response codes as well as the http request time percentiles
* @param metrics
* @param sinkName
*/
def registerMetricsMBean(metrics: HttpSinkMetricsMBean, sinkName: String): Unit = {
val mbs = ManagementFactory.getPlatformMBeanServer
val objectName = new ObjectName(s"io.lenses.streamreactor.connect.http.sink:type=metrics,name=$sinkName")
mbs.registerMBean(metrics, objectName)
()
}
}
Loading

0 comments on commit 98defcd

Please sign in to comment.