Skip to content

Commit

Permalink
Add self observability metrics for otel handler (#12598)
Browse files Browse the repository at this point in the history
  • Loading branch information
kezhenxu94 authored Sep 6, 2024
1 parent 9d0cb9e commit 7848399
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 65 deletions.
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* Fix the compatibility with Grafana 11 when using label_values query variables.
* Nacos as config server and cluster coordinator supports configuration contextPath.
* Update the endpoint name format to `<Method>:<Path>` in eBPF Access Log Receiver.
* Add self-observability metrics for OpenTelemetry receiver.

#### UI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.opentelemetry.proto.logs.v1.LogRecord;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.Getter;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
Expand All @@ -39,6 +40,10 @@
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.otel.Handler;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;

import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -54,6 +59,17 @@ public class OpenTelemetryLogHandler

private ILogAnalyzerService logAnalyzerService;

/**
 * {@link MetricsCreator} service looked up from the telemetry module through the module manager.
 * Declared with Lombok's lazy getter, so the lookup expression is evaluated the first time
 * {@code getMetricsCreator()} is called and the result is cached afterwards.
 * NOTE(review): presumably lazy because the telemetry module's services may not be resolvable
 * when this handler is constructed -- confirm against the module boot order.
 */
@Getter(lazy = true)
private final MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);

/**
 * Self-observability histogram named {@code otel_logs_latency}, created through
 * {@code getMetricsCreator()} with no tags ({@link MetricsTag#EMPTY_KEY} / {@link MetricsTag#EMPTY_VALUE}).
 * Lazily created on the first {@code getProcessHistogram()} call (Lombok lazy getter).
 * The export path opens one timer from this histogram around the analysis of each log record.
 */
@Getter(lazy = true)
private final HistogramMetrics processHistogram = getMetricsCreator().createHistogramMetric(
"otel_logs_latency",
"The latency to process the logs request",
MetricsTag.EMPTY_KEY,
MetricsTag.EMPTY_VALUE
);

@Override
public String type() {
return "otlp-logs";
Expand Down Expand Up @@ -87,9 +103,11 @@ public void export(ExportLogsServiceRequest request, StreamObserver<ExportLogsSe
.getScopeLogsList()
.stream()
.flatMap(it -> it.getLogRecordsList().stream())
.forEach(
logRecord ->
doAnalysisQuietly(service, layer, serviceInstance, logRecord));
.forEach(logRecord -> {
try (final var timer = getProcessHistogram().createTimer()) {
doAnalysisQuietly(service, layer, serviceInstance, logRecord);
}
});
responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.vavr.Function1;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
Expand All @@ -42,6 +43,10 @@
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -73,39 +78,51 @@ public class OpenTelemetryMetricRequestProcessor implements Service {
.build();
private List<PrometheusMetricConverter> converters;

public void processMetricsRequest(final ExportMetricsServiceRequest requests) {
requests.getResourceMetricsList().forEach(request -> {
if (log.isDebugEnabled()) {
log.debug("Resource attributes: {}", request.getResource().getAttributesList());
}
/**
 * {@link MetricsCreator} service looked up from the telemetry module through the module manager.
 * Declared with Lombok's lazy getter, so the lookup expression is evaluated the first time
 * {@code getMetricsCreator()} is called and the result is cached afterwards.
 * NOTE(review): presumably lazy because the telemetry module's services may not be resolvable
 * when this processor is constructed -- confirm against the module boot order.
 */
@Getter(lazy = true)
private final MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);

final Map<String, String> nodeLabels =
request
.getResource()
.getAttributesList()
.stream()
.collect(toMap(
it -> LABEL_MAPPINGS
.getOrDefault(it.getKey(), it.getKey())
.replaceAll("\\.", "_"),
it -> anyValueToString(it.getValue()),
(v1, v2) -> v1
));
/**
 * Self-observability histogram named {@code otel_metrics_latency}, created through
 * {@code getMetricsCreator()} with no tags ({@link MetricsTag#EMPTY_KEY} / {@link MetricsTag#EMPTY_VALUE}).
 * Lazily created on the first {@code getProcessHistogram()} call (Lombok lazy getter).
 * {@code processMetricsRequest} opens a single timer from this histogram around the whole request.
 */
@Getter(lazy = true)
private final HistogramMetrics processHistogram = getMetricsCreator().createHistogramMetric(
"otel_metrics_latency",
"The latency to process the metrics request",
MetricsTag.EMPTY_KEY,
MetricsTag.EMPTY_VALUE
);

converters
.forEach(convert -> convert.toMeter(
public void processMetricsRequest(final ExportMetricsServiceRequest requests) {
try (final var unused = getProcessHistogram().createTimer()) {
requests.getResourceMetricsList().forEach(request -> {
if (log.isDebugEnabled()) {
log.debug("Resource attributes: {}", request.getResource().getAttributesList());
}

final Map<String, String> nodeLabels =
request
.getScopeMetricsList().stream()
.flatMap(scopeMetrics -> scopeMetrics
.getMetricsList().stream()
.flatMap(metric -> adaptMetrics(nodeLabels, metric))
.map(Function1.liftTry(Function.identity()))
.flatMap(tryIt -> MetricConvert.log(
tryIt,
"Convert OTEL metric to prometheus metric"
)))));
});
.getResource()
.getAttributesList()
.stream()
.collect(toMap(
it -> LABEL_MAPPINGS
.getOrDefault(it.getKey(), it.getKey())
.replaceAll("\\.", "_"),
it -> anyValueToString(it.getValue()),
(v1, v2) -> v1
));

converters
.forEach(convert -> convert.toMeter(
request
.getScopeMetricsList().stream()
.flatMap(scopeMetrics -> scopeMetrics
.getMetricsList().stream()
.flatMap(metric -> adaptMetrics(nodeLabels, metric))
.map(Function1.liftTry(Function.identity()))
.flatMap(tryIt -> MetricConvert.log(
tryIt,
"Convert OTEL metric to prometheus metric"
)))));
});
}
}

public void start() throws ModuleStartException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.opentelemetry.proto.trace.v1.Status;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
Expand All @@ -40,6 +41,10 @@
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import zipkin2.Endpoint;
import zipkin2.Span;

Expand All @@ -64,6 +69,17 @@ public class OpenTelemetryTraceHandler
private final ModuleManager manager;
private SpanForwardService forwardService;

/**
 * {@link MetricsCreator} service looked up from the telemetry module through the module manager.
 * Declared with Lombok's lazy getter, so the lookup expression is evaluated the first time
 * {@code getMetricsCreator()} is called and the result is cached afterwards.
 * NOTE(review): presumably lazy because the telemetry module's services may not be resolvable
 * when this handler is constructed -- confirm against the module boot order.
 */
@Getter(lazy = true)
private final MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);

/**
 * Self-observability histogram named {@code otel_spans_latency}, created through
 * {@code getMetricsCreator()} with no tags ({@link MetricsTag#EMPTY_KEY} / {@link MetricsTag#EMPTY_VALUE}).
 * Lazily created on the first {@code getProcessHistogram()} call (Lombok lazy getter).
 * {@code export} opens a single timer from this histogram per trace export request.
 */
@Getter(lazy = true)
private final HistogramMetrics processHistogram = getMetricsCreator().createHistogramMetric(
"otel_spans_latency",
"The latency to process the span request",
MetricsTag.EMPTY_KEY,
MetricsTag.EMPTY_VALUE
);

@Override
public String type() {
return "otlp-traces";
Expand All @@ -80,34 +96,36 @@ public void active() throws ModuleStartException {
@Override
public void export(ExportTraceServiceRequest request, StreamObserver<ExportTraceServiceResponse> responseObserver) {
final ArrayList<Span> result = new ArrayList<>();
request.getResourceSpansList().forEach(resourceSpans -> {
final Resource resource = resourceSpans.getResource();
final List<ScopeSpans> scopeSpansList = resourceSpans.getScopeSpansList();
if (resource.getAttributesCount() == 0 && scopeSpansList.size() == 0) {
return;
}

final Map<String, String> resourceTags = convertAttributeToMap(resource.getAttributesList());
String serviceName = extractZipkinServiceName(resourceTags);
if (StringUtil.isEmpty(serviceName)) {
log.warn("No service name found in resource attributes, discarding the trace");
return;
}
try (final var unused = getProcessHistogram().createTimer()) {
request.getResourceSpansList().forEach(resourceSpans -> {
final Resource resource = resourceSpans.getResource();
final List<ScopeSpans> scopeSpansList = resourceSpans.getScopeSpansList();
if (resource.getAttributesCount() == 0 && scopeSpansList.size() == 0) {
return;
}
final Map<String, String> resourceTags = convertAttributeToMap(resource.getAttributesList());
String serviceName = extractZipkinServiceName(resourceTags);
if (StringUtil.isEmpty(serviceName)) {
log.warn("No service name found in resource attributes, discarding the trace");
return;
}

try {
for (ScopeSpans scopeSpans : scopeSpansList) {
extractScopeTag(scopeSpans.getScope(), resourceTags);
for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) {
Span zipkinSpan = convertSpan(span, serviceName, resourceTags);
result.add(zipkinSpan);
try {
for (ScopeSpans scopeSpans : scopeSpansList) {
extractScopeTag(scopeSpans.getScope(), resourceTags);
for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) {
Span zipkinSpan = convertSpan(span, serviceName, resourceTags);
result.add(zipkinSpan);
}
}
} catch (Exception e) {
log.warn("convert span error, discarding the span: {}", e.getMessage());
}
} catch (Exception e) {
log.warn("convert span error, discarding the span: {}", e.getMessage());
}
});
});
getForwardService().send(result);
}

getForwardService().send(result);
responseObserver.onNext(ExportTraceServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
}
Expand Down
35 changes: 25 additions & 10 deletions oap-server/server-starter/src/main/resources/otel-rules/oap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ metricsRules:
- name: instance_jvm_memory_bytes_used
exp: jvm_memory_bytes_used.sum(['service', 'host_name'])
- name: instance_jvm_gc_count
exp: "jvm_gc_collection_seconds_count.tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation')
.sum(['service', 'host_name', 'gc']).increase('PT1M')
.tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_count'} })
.tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_count'} })"
exp: >
jvm_gc_collection_seconds_count.tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation')
.sum(['service', 'host_name', 'gc']).increase('PT1M')
.tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_count'} })
.tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_count'} })
- name: instance_jvm_gc_time
exp: "(jvm_gc_collection_seconds_sum * 1000).tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation')
.sum(['service', 'host_name', 'gc']).increase('PT1M')
.tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_time'} })
.tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_time'} })"
exp: >
(jvm_gc_collection_seconds_sum * 1000).tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation')
.sum(['service', 'host_name', 'gc']).increase('PT1M')
.tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_time'} })
.tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_time'} })
- name: instance_trace_count
exp: trace_in_latency_count.sum(['service', 'host_name']).increase('PT1M')
- name: instance_trace_latency_percentile
Expand All @@ -59,8 +61,9 @@ metricsRules:
- name: instance_mesh_analysis_error_count
exp: mesh_analysis_error_count.sum(['service', 'host_name']).increase('PT1M')
- name: instance_metrics_aggregation
exp: "metrics_aggregation.tagEqual('dimensionality', 'minute').sum(['service', 'host_name', 'level']).increase('PT1M')
.tag({tags -> if (tags['level'] == '1') {tags.level = 'L1 aggregation'} }).tag({tags -> if (tags['level'] == '2') {tags.level = 'L2 aggregation'} })"
exp: >
metrics_aggregation.tagEqual('dimensionality', 'minute').sum(['service', 'host_name', 'level']).increase('PT1M')
.tag({tags -> if (tags['level'] == '1') {tags.level = 'L1 aggregation'} }).tag({tags -> if (tags['level'] == '2') {tags.level = 'L2 aggregation'} })
- name: instance_persistence_execute_percentile
exp: persistence_timer_bulk_execute_latency.sum(['le', 'service', 'host_name']).increase('PT5M').histogram().histogram_percentile([50,70,90,99])
- name: instance_persistence_prepare_percentile
Expand Down Expand Up @@ -99,3 +102,15 @@ metricsRules:
exp: k8s_als_drop_count.sum(['service', 'host_name']).increase('PT1M')
- name: instance_k8s_als_latency_percentile
exp: k8s_als_in_latency.sum(['le', 'service', 'host_name']).increase('PT1M').histogram().histogram_percentile([50,70,90,99])
# Self-observability rules for the OpenTelemetry receiver.
# The *_received rules read the `_count` series of each otel_*_latency histogram,
# grouped by service and host_name, with increase('PT1M') applied.
- name: otel_metrics_received
exp: otel_metrics_latency_count.sum(['service', 'host_name']).increase('PT1M')
- name: otel_logs_received
exp: otel_logs_latency_count.sum(['service', 'host_name']).increase('PT1M')
- name: otel_spans_received
exp: otel_spans_latency_count.sum(['service', 'host_name']).increase('PT1M')
# The *_latency_percentile rules keep the `le` bucket label in the grouping so the
# series can be turned into a histogram and reduced to the 50/70/90/99 percentiles.
- name: otel_metrics_latency_percentile
exp: otel_metrics_latency.sum(['le', 'service', 'host_name']).increase('PT1M').histogram().histogram_percentile([50,70,90,99])
- name: otel_logs_latency_percentile
exp: otel_logs_latency.sum(['le', 'service', 'host_name']).increase('PT1M').histogram().histogram_percentile([50,70,90,99])
- name: otel_spans_latency_percentile
exp: otel_spans_latency.sum(['le', 'service', 'host_name']).increase('PT1M').histogram().histogram_percentile([50,70,90,99])
Loading

0 comments on commit 7848399

Please sign in to comment.