Skip to content

Commit

Permalink
Add self observability metrics for otel handler
Browse files Browse the repository at this point in the history
  • Loading branch information
kezhenxu94 committed Sep 6, 2024
1 parent 9d0cb9e commit 174167b
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 83 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ on:
push:
branches:
- master
- otelso117
release:
types:
- released
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.opentelemetry.proto.logs.v1.LogRecord;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.Getter;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
Expand All @@ -39,6 +40,11 @@
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.otel.Handler;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;

import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -54,6 +60,24 @@ public class OpenTelemetryLogHandler

private ILogAnalyzerService logAnalyzerService;

// Telemetry service for registering self-observability metrics.
// Lombok's lazy getter defers the lookup until first use —
// NOTE(review): presumably because the telemetry module is not yet started
// when this handler is constructed; confirm against module lifecycle.
@Getter(lazy = true)
private final MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);

// Counts OTLP log export RPCs received by this handler (incremented once per request).
@Getter(lazy = true)
private final CounterMetrics receivedCounter = getMetricsCreator().createCounter(
"otel_logs_received",
"The number of log request received by the OTLP receiver",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("otlp")
);
// Measures end-to-end latency of processing one log export request.
@Getter(lazy = true)
private final HistogramMetrics processHistogram = getMetricsCreator().createHistogramMetric(
"otel_logs_latency",
"The latency to process the logs request",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("otlp")
);

@Override
public String type() {
return "otlp-logs";
Expand All @@ -69,30 +93,32 @@ public void active() throws ModuleStartException {

/**
 * Handles an OTLP log export RPC: extracts the service identity from each
 * ResourceLogs entry's resource attributes and forwards every LogRecord to the
 * log analyzer, then acknowledges the RPC.
 *
 * <p>The response is sent exactly once per RPC. (Previously {@code onNext}/
 * {@code onCompleted} were invoked inside the per-resource loop, so an empty
 * request was never answered and a multi-resource request completed the stream
 * more than once, which gRPC rejects.)
 */
@Override
public void export(ExportLogsServiceRequest request, StreamObserver<ExportLogsServiceResponse> responseObserver) {
    // Self-observability: count every incoming export call, regardless of outcome.
    getReceivedCounter().inc();

    // Time the whole request; the timer is closed by try-with-resources.
    try (final var timer = getProcessHistogram().createTimer()) {
        request.getResourceLogsList().forEach(resourceLogs -> {
            final var resource = resourceLogs.getResource();
            // Flatten resource attributes into a key -> string map.
            final var attributes = resource
                .getAttributesList().stream()
                .map(it -> Map.entry(it.getKey(), buildTagValue(it)))
                .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
            final var service = attributes.get("service.name");
            if (Strings.isNullOrEmpty(service)) {
                // Without a service name the logs cannot be attributed; skip this resource.
                log.warn("No service name found in resource attributes, discarding the log");
                return;
            }
            final var layer = attributes.getOrDefault("service.layer", "");
            final var serviceInstance = attributes.getOrDefault("service.instance", "");

            resourceLogs
                .getScopeLogsList()
                .stream()
                .flatMap(it -> it.getLogRecordsList().stream())
                .forEach(logRecord -> doAnalysisQuietly(service, layer, serviceInstance, logRecord));
        });
    }

    // Acknowledge exactly once, after all resources are processed (including the
    // empty-request case, which must still receive a response).
    responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance());
    responseObserver.onCompleted();
}

private void doAnalysisQuietly(String service, String layer, String serviceInstance, LogRecord logRecord) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.vavr.Function1;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
Expand All @@ -42,6 +43,11 @@
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -73,39 +79,60 @@ public class OpenTelemetryMetricRequestProcessor implements Service {
.build();
private List<PrometheusMetricConverter> converters;

// Telemetry service for registering self-observability metrics.
// Lombok's lazy getter defers the lookup until first use —
// NOTE(review): presumably because the telemetry module is not yet started
// when this processor is constructed; confirm against module lifecycle.
@Getter(lazy = true)
private final MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);

// Counts OTLP metrics export requests received (incremented once per request).
@Getter(lazy = true)
private final CounterMetrics receivedCounter = getMetricsCreator().createCounter(
"otel_metrics_received",
"The number of metrics received by the OTLP receiver",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("otlp")
);
// Measures end-to-end latency of processing one metrics export request.
@Getter(lazy = true)
private final HistogramMetrics processHistogram = getMetricsCreator().createHistogramMetric(
"otel_metrics_latency",
"The latency to process the metrics request",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("otlp")
);

public void processMetricsRequest(final ExportMetricsServiceRequest requests) {
requests.getResourceMetricsList().forEach(request -> {
if (log.isDebugEnabled()) {
log.debug("Resource attributes: {}", request.getResource().getAttributesList());
}
getReceivedCounter().inc();

try (final var unused = getProcessHistogram().createTimer()) {
requests.getResourceMetricsList().forEach(request -> {
if (log.isDebugEnabled()) {
log.debug("Resource attributes: {}", request.getResource().getAttributesList());
}

final Map<String, String> nodeLabels =
request
.getResource()
.getAttributesList()
.stream()
.collect(toMap(
it -> LABEL_MAPPINGS
final Map<String, String> nodeLabels =
request
.getResource()
.getAttributesList()
.stream()
.collect(toMap(
it -> LABEL_MAPPINGS
.getOrDefault(it.getKey(), it.getKey())
.replaceAll("\\.", "_"),
it -> anyValueToString(it.getValue()),
it -> anyValueToString(it.getValue()),
(v1, v2) -> v1
));

converters
.forEach(convert -> convert.toMeter(
request
.getScopeMetricsList().stream()
.flatMap(scopeMetrics -> scopeMetrics
.getMetricsList().stream()
.flatMap(metric -> adaptMetrics(nodeLabels, metric))
.map(Function1.liftTry(Function.identity()))
.flatMap(tryIt -> MetricConvert.log(
tryIt,
"Convert OTEL metric to prometheus metric"
)))));
});
));

converters
.forEach(convert -> convert.toMeter(
request
.getScopeMetricsList().stream()
.flatMap(scopeMetrics -> scopeMetrics
.getMetricsList().stream()
.flatMap(metric -> adaptMetrics(nodeLabels, metric))
.map(Function1.liftTry(Function.identity()))
.flatMap(tryIt -> MetricConvert.log(
tryIt,
"Convert OTEL metric to prometheus metric"
)))));
});
}
}

public void start() throws ModuleStartException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.opentelemetry.proto.trace.v1.Status;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
Expand All @@ -40,6 +41,11 @@
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import zipkin2.Endpoint;
import zipkin2.Span;

Expand All @@ -64,6 +70,24 @@ public class OpenTelemetryTraceHandler
private final ModuleManager manager;
private SpanForwardService forwardService;

// Telemetry service for registering self-observability metrics.
// Lombok's lazy getter defers the lookup until first use —
// NOTE(review): presumably because the telemetry module is not yet started
// when this handler is constructed; confirm against module lifecycle.
@Getter(lazy = true)
private final MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);

// Counts OTLP trace export RPCs received by this handler (incremented once per request).
@Getter(lazy = true)
private final CounterMetrics receivedCounter = getMetricsCreator().createCounter(
"otel_spans_received",
"The number of span request received by the OTLP receiver",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("otlp")
);
// Measures end-to-end latency of processing one trace export request.
@Getter(lazy = true)
private final HistogramMetrics processHistogram = getMetricsCreator().createHistogramMetric(
"otel_spans_latency",
"The latency to process the span request",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("otlp")
);

@Override
public String type() {
return "otlp-traces";
Expand All @@ -79,33 +103,37 @@ public void active() throws ModuleStartException {

@Override
public void export(ExportTraceServiceRequest request, StreamObserver<ExportTraceServiceResponse> responseObserver) {
getReceivedCounter().inc();

final ArrayList<Span> result = new ArrayList<>();
request.getResourceSpansList().forEach(resourceSpans -> {
final Resource resource = resourceSpans.getResource();
final List<ScopeSpans> scopeSpansList = resourceSpans.getScopeSpansList();
if (resource.getAttributesCount() == 0 && scopeSpansList.size() == 0) {
return;
}

final Map<String, String> resourceTags = convertAttributeToMap(resource.getAttributesList());
String serviceName = extractZipkinServiceName(resourceTags);
if (StringUtil.isEmpty(serviceName)) {
log.warn("No service name found in resource attributes, discarding the trace");
return;
}
try (final var unused = getProcessHistogram().createTimer()) {
request.getResourceSpansList().forEach(resourceSpans -> {
final Resource resource = resourceSpans.getResource();
final List<ScopeSpans> scopeSpansList = resourceSpans.getScopeSpansList();
if (resource.getAttributesCount() == 0 && scopeSpansList.size() == 0) {
return;
}
final Map<String, String> resourceTags = convertAttributeToMap(resource.getAttributesList());
String serviceName = extractZipkinServiceName(resourceTags);
if (StringUtil.isEmpty(serviceName)) {
log.warn("No service name found in resource attributes, discarding the trace");
return;
}

try {
for (ScopeSpans scopeSpans : scopeSpansList) {
extractScopeTag(scopeSpans.getScope(), resourceTags);
for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) {
Span zipkinSpan = convertSpan(span, serviceName, resourceTags);
result.add(zipkinSpan);
try {
for (ScopeSpans scopeSpans : scopeSpansList) {
extractScopeTag(scopeSpans.getScope(), resourceTags);
for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) {
Span zipkinSpan = convertSpan(span, serviceName, resourceTags);
result.add(zipkinSpan);
}
}
} catch (Exception e) {
log.warn("convert span error, discarding the span: {}", e.getMessage());
}
} catch (Exception e) {
log.warn("convert span error, discarding the span: {}", e.getMessage());
}
});
});
}

getForwardService().send(result);
responseObserver.onNext(ExportTraceServiceResponse.getDefaultInstance());
Expand Down
27 changes: 17 additions & 10 deletions oap-server/server-starter/src/main/resources/otel-rules/oap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ metricsRules:
- name: instance_jvm_memory_bytes_used
exp: jvm_memory_bytes_used.sum(['service', 'host_name'])
- name: instance_jvm_gc_count
exp: "jvm_gc_collection_seconds_count.tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation')
.sum(['service', 'host_name', 'gc']).increase('PT1M')
.tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_count'} })
.tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_count'} })"
exp: >
jvm_gc_collection_seconds_count.tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation')
.sum(['service', 'host_name', 'gc']).increase('PT1M')
.tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_count'} })
.tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_count'} })
- name: instance_jvm_gc_time
exp: "(jvm_gc_collection_seconds_sum * 1000).tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation')
.sum(['service', 'host_name', 'gc']).increase('PT1M')
.tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_time'} })
.tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_time'} })"
exp: >
(jvm_gc_collection_seconds_sum * 1000).tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation')
.sum(['service', 'host_name', 'gc']).increase('PT1M')
.tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_time'} })
.tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_time'} })
- name: instance_trace_count
exp: trace_in_latency_count.sum(['service', 'host_name']).increase('PT1M')
- name: instance_trace_latency_percentile
Expand All @@ -59,8 +61,9 @@ metricsRules:
- name: instance_mesh_analysis_error_count
exp: mesh_analysis_error_count.sum(['service', 'host_name']).increase('PT1M')
- name: instance_metrics_aggregation
exp: "metrics_aggregation.tagEqual('dimensionality', 'minute').sum(['service', 'host_name', 'level']).increase('PT1M')
.tag({tags -> if (tags['level'] == '1') {tags.level = 'L1 aggregation'} }).tag({tags -> if (tags['level'] == '2') {tags.level = 'L2 aggregation'} })"
exp: >
metrics_aggregation.tagEqual('dimensionality', 'minute').sum(['service', 'host_name', 'level']).increase('PT1M')
.tag({tags -> if (tags['level'] == '1') {tags.level = 'L1 aggregation'} }).tag({tags -> if (tags['level'] == '2') {tags.level = 'L2 aggregation'} })
- name: instance_persistence_execute_percentile
exp: persistence_timer_bulk_execute_latency.sum(['le', 'service', 'host_name']).increase('PT5M').histogram().histogram_percentile([50,70,90,99])
- name: instance_persistence_prepare_percentile
Expand Down Expand Up @@ -99,3 +102,7 @@ metricsRules:
exp: k8s_als_drop_count.sum(['service', 'host_name']).increase('PT1M')
- name: instance_k8s_als_latency_percentile
exp: k8s_als_in_latency.sum(['le', 'service', 'host_name']).increase('PT1M').histogram().histogram_percentile([50,70,90,99])
# Self-observability of the OTLP receiver: per-minute rate of received metric requests.
- name: otel_metrics_received
exp: otel_metrics_received.sum(['service', 'host_name']).increase('PT1M')
# NOTE(review): no 'otel_metrics_processed' metric appears to be registered by the
# receiver in this change (the Java side creates 'otel_metrics_received' and the
# 'otel_metrics_latency' histogram only) — confirm the source metric name exists.
- name: otel_metrics_processed
exp: otel_metrics_processed.sum(['service', 'host_name']).increase('PT1M')

0 comments on commit 174167b

Please sign in to comment.