diff --git a/.github/workflows/publish-docker.yaml b/.github/workflows/publish-docker.yaml index 01c7e0b20da6..ca6278dc33f3 100644 --- a/.github/workflows/publish-docker.yaml +++ b/.github/workflows/publish-docker.yaml @@ -20,6 +20,7 @@ on: push: branches: - master + - otelso117 release: types: - released diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryLogHandler.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryLogHandler.java index 3d6c6bc9db62..fd7f12f11764 100644 --- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryLogHandler.java +++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryLogHandler.java @@ -27,6 +27,7 @@ import io.opentelemetry.proto.logs.v1.LogRecord; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import lombok.Getter; import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair; import org.apache.skywalking.apm.network.logging.v3.LogData; import org.apache.skywalking.apm.network.logging.v3.LogDataBody; @@ -39,6 +40,11 @@ import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.receiver.otel.Handler; import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; import java.util.Map; import java.util.stream.Collectors; @@ -54,6 +60,24 @@ public class OpenTelemetryLogHandler private ILogAnalyzerService logAnalyzerService; + @Getter(lazy = true) + private final MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); + + @Getter(lazy = true) + private final CounterMetrics receivedCounter = getMetricsCreator().createCounter( + "otel_logs_received", + "The number of log request received by the OTLP receiver", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("otlp") + ); + @Getter(lazy = true) + private final HistogramMetrics processHistogram = getMetricsCreator().createHistogramMetric( + "otel_logs_latency", + "The latency to process the logs request", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("otlp") + ); + @Override public String type() { return "otlp-logs"; @@ -69,30 +93,32 @@ public void active() throws ModuleStartException { @Override public void export(ExportLogsServiceRequest request, StreamObserver responseObserver) { - request.getResourceLogsList().forEach(resourceLogs -> { - final var resource = resourceLogs.getResource(); - final var attributes = resource - .getAttributesList().stream() - .map(it -> Map.entry(it.getKey(), buildTagValue(it))) - .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); - final var service = attributes.get("service.name"); - if (Strings.isNullOrEmpty(service)) { - log.warn("No service name found in resource attributes, discarding the log"); - return; - } - final var layer = attributes.getOrDefault("service.layer", ""); - final var serviceInstance = attributes.getOrDefault("service.instance", ""); - - resourceLogs - .getScopeLogsList() - .stream() - .flatMap(it -> it.getLogRecordsList().stream()) - .forEach( - logRecord -> - doAnalysisQuietly(service, layer, serviceInstance, logRecord)); - responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - }); + getReceivedCounter().inc(); + + try (final var timer = getProcessHistogram().createTimer()) { + request.getResourceLogsList().forEach(resourceLogs -> { + final var resource = resourceLogs.getResource(); + final var attributes = resource + .getAttributesList().stream() + .map(it -> Map.entry(it.getKey(), buildTagValue(it))) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); + final var service = attributes.get("service.name"); + if (Strings.isNullOrEmpty(service)) { + log.warn("No service name found in resource attributes, discarding the log"); + return; + } + final var layer = attributes.getOrDefault("service.layer", ""); + final var serviceInstance = attributes.getOrDefault("service.instance", ""); + + resourceLogs + .getScopeLogsList() + .stream() + .flatMap(it -> it.getLogRecordsList().stream()) + .forEach(logRecord -> doAnalysisQuietly(service, layer, serviceInstance, logRecord)); + responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance()); + responseObserver.onCompleted(); + }); + } } private void doAnalysisQuietly(String service, String layer, String serviceInstance, LogRecord logRecord) { diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java index 0be3aaaf2c3b..02d6b3dc229d 100644 --- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java +++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java @@ -25,6 +25,7 @@ import io.opentelemetry.proto.metrics.v1.Sum; import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; import io.vavr.Function1; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.meter.analyzer.MetricConvert; @@ -42,6 +43,11 @@ import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric; import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary; import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; import java.io.IOException; import java.util.HashMap; @@ -73,39 +79,60 @@ public class OpenTelemetryMetricRequestProcessor implements Service { .build(); private List converters; + @Getter(lazy = true) + private final MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); + + @Getter(lazy = true) + private final CounterMetrics receivedCounter = getMetricsCreator().createCounter( + "otel_metrics_received", + "The number of metrics received by the OTLP receiver", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("otlp") + ); + @Getter(lazy = true) + private final HistogramMetrics processHistogram = getMetricsCreator().createHistogramMetric( + "otel_metrics_latency", + "The latency to process the metrics request", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("otlp") + ); + public void processMetricsRequest(final ExportMetricsServiceRequest requests) { - requests.getResourceMetricsList().forEach(request -> { - if (log.isDebugEnabled()) { - log.debug("Resource attributes: {}", request.getResource().getAttributesList()); - } + getReceivedCounter().inc(); + + try (final var unused = getProcessHistogram().createTimer()) { + requests.getResourceMetricsList().forEach(request -> { + if (log.isDebugEnabled()) { + log.debug("Resource attributes: {}", request.getResource().getAttributesList()); + } - final Map nodeLabels = - request - .getResource() - .getAttributesList() - .stream() - .collect(toMap( - it -> LABEL_MAPPINGS + final Map nodeLabels = + request + .getResource() + .getAttributesList() + .stream() + .collect(toMap( + it -> LABEL_MAPPINGS .getOrDefault(it.getKey(), it.getKey()) .replaceAll("\\.", "_"), - it -> anyValueToString(it.getValue()), + it -> anyValueToString(it.getValue()), (v1, v2) -> v1 - )); - - converters - .forEach(convert -> convert.toMeter( - request - .getScopeMetricsList().stream() - .flatMap(scopeMetrics -> scopeMetrics - .getMetricsList().stream() - .flatMap(metric -> adaptMetrics(nodeLabels, metric)) - .map(Function1.liftTry(Function.identity())) - .flatMap(tryIt -> MetricConvert.log( - tryIt, - "Convert OTEL metric to prometheus metric" - ))))); - }); + )); + converters + .forEach(convert -> convert.toMeter( + request + .getScopeMetricsList().stream() + .flatMap(scopeMetrics -> scopeMetrics + .getMetricsList().stream() + .flatMap(metric -> adaptMetrics(nodeLabels, metric)) + .map(Function1.liftTry(Function.identity())) + .flatMap(tryIt -> MetricConvert.log( + tryIt, + "Convert OTEL metric to prometheus metric" + ))))); + }); + } } public void start() throws ModuleStartException { diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryTraceHandler.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryTraceHandler.java index 18e76bee346f..08c5ed376fe5 100644 --- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryTraceHandler.java +++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryTraceHandler.java @@ -30,6 +30,7 @@ import io.opentelemetry.proto.resource.v1.Resource; import io.opentelemetry.proto.trace.v1.ScopeSpans; import io.opentelemetry.proto.trace.v1.Status; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; @@ -40,6 +41,11 @@ import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule; import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; import zipkin2.Endpoint; import zipkin2.Span; @@ -64,6 +70,24 @@ public class OpenTelemetryTraceHandler private final ModuleManager manager; private SpanForwardService forwardService; + @Getter(lazy = true) + private final MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); + + @Getter(lazy = true) + private final CounterMetrics receivedCounter = getMetricsCreator().createCounter( + "otel_spans_received", + "The number of span request received by the OTLP receiver", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("otlp") + ); + @Getter(lazy = true) + private final HistogramMetrics processHistogram = getMetricsCreator().createHistogramMetric( + "otel_spans_latency", + "The latency to process the span request", + new MetricsTag.Keys("protocol"), + new MetricsTag.Values("otlp") + ); + @Override public String type() { return "otlp-traces"; @@ -79,33 +103,37 @@ public void active() throws ModuleStartException { @Override public void export(ExportTraceServiceRequest request, StreamObserver responseObserver) { + getReceivedCounter().inc(); + final ArrayList result = new ArrayList<>(); - request.getResourceSpansList().forEach(resourceSpans -> { - final Resource resource = resourceSpans.getResource(); - final List scopeSpansList = resourceSpans.getScopeSpansList(); - if (resource.getAttributesCount() == 0 && scopeSpansList.size() == 0) { - return; - } - final Map resourceTags = convertAttributeToMap(resource.getAttributesList()); - String serviceName = extractZipkinServiceName(resourceTags); - if (StringUtil.isEmpty(serviceName)) { - log.warn("No service name found in resource attributes, discarding the trace"); - return; - } + try (final var unused = getProcessHistogram().createTimer()) { + request.getResourceSpansList().forEach(resourceSpans -> { + final Resource resource = resourceSpans.getResource(); + final List scopeSpansList = resourceSpans.getScopeSpansList(); + if (resource.getAttributesCount() == 0 && scopeSpansList.size() == 0) { + return; + } + final Map resourceTags = convertAttributeToMap(resource.getAttributesList()); + String serviceName = extractZipkinServiceName(resourceTags); + if (StringUtil.isEmpty(serviceName)) { + log.warn("No service name found in resource attributes, discarding the trace"); + return; + } - try { - for (ScopeSpans scopeSpans : scopeSpansList) { - extractScopeTag(scopeSpans.getScope(), resourceTags); - for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) { - Span zipkinSpan = convertSpan(span, serviceName, resourceTags); - result.add(zipkinSpan); + try { + for (ScopeSpans scopeSpans : scopeSpansList) { + extractScopeTag(scopeSpans.getScope(), resourceTags); + for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) { + Span zipkinSpan = convertSpan(span, serviceName, resourceTags); + result.add(zipkinSpan); + } } + } catch (Exception e) { + log.warn("convert span error, discarding the span: {}", e.getMessage()); } - } catch (Exception e) { - log.warn("convert span error, discarding the span: {}", e.getMessage()); - } - }); + }); + } getForwardService().send(result); responseObserver.onNext(ExportTraceServiceResponse.getDefaultInstance()); diff --git a/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml b/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml index 6de0e00b53d7..e83e2ca97e40 100644 --- a/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml +++ b/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml @@ -37,15 +37,17 @@ metricsRules: - name: instance_jvm_memory_bytes_used exp: jvm_memory_bytes_used.sum(['service', 'host_name']) - name: instance_jvm_gc_count - exp: "jvm_gc_collection_seconds_count.tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation') - .sum(['service', 'host_name', 'gc']).increase('PT1M') - .tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_count'} }) - .tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_count'} })" + exp: > + jvm_gc_collection_seconds_count.tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation') + .sum(['service', 'host_name', 'gc']).increase('PT1M') + .tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_count'} }) + .tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_count'} }) - name: instance_jvm_gc_time - exp: "(jvm_gc_collection_seconds_sum * 1000).tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation') - .sum(['service', 'host_name', 'gc']).increase('PT1M') - .tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_time'} }) - .tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_time'} })" + exp: > + (jvm_gc_collection_seconds_sum * 1000).tagMatch('gc', 'PS Scavenge|Copy|ParNew|G1 Young Generation|PS MarkSweep|MarkSweepCompact|ConcurrentMarkSweep|G1 Old Generation') + .sum(['service', 'host_name', 'gc']).increase('PT1M') + .tag({tags -> if (tags['gc'] == 'PS Scavenge' || tags['gc'] == 'Copy' || tags['gc'] == 'ParNew' || tags['gc'] == 'G1 Young Generation') {tags.gc = 'young_gc_time'} }) + .tag({tags -> if (tags['gc'] == 'PS MarkSweep' || tags['gc'] == 'MarkSweepCompact' || tags['gc'] == 'ConcurrentMarkSweep' || tags['gc'] == 'G1 Old Generation') {tags.gc = 'old_gc_time'} }) - name: instance_trace_count exp: trace_in_latency_count.sum(['service', 'host_name']).increase('PT1M') - name: instance_trace_latency_percentile @@ -59,8 +61,9 @@ metricsRules: - name: instance_mesh_analysis_error_count exp: mesh_analysis_error_count.sum(['service', 'host_name']).increase('PT1M') - name: instance_metrics_aggregation - exp: "metrics_aggregation.tagEqual('dimensionality', 'minute').sum(['service', 'host_name', 'level']).increase('PT1M') - .tag({tags -> if (tags['level'] == '1') {tags.level = 'L1 aggregation'} }).tag({tags -> if (tags['level'] == '2') {tags.level = 'L2 aggregation'} })" + exp: > + metrics_aggregation.tagEqual('dimensionality', 'minute').sum(['service', 'host_name', 'level']).increase('PT1M') + .tag({tags -> if (tags['level'] == '1') {tags.level = 'L1 aggregation'} }).tag({tags -> if (tags['level'] == '2') {tags.level = 'L2 aggregation'} }) - name: instance_persistence_execute_percentile exp: persistence_timer_bulk_execute_latency.sum(['le', 'service', 'host_name']).increase('PT5M').histogram().histogram_percentile([50,70,90,99]) - name: instance_persistence_prepare_percentile @@ -99,3 +102,7 @@ metricsRules: exp: k8s_als_drop_count.sum(['service', 'host_name']).increase('PT1M') - name: instance_k8s_als_latency_percentile exp: k8s_als_in_latency.sum(['le', 'service', 'host_name']).increase('PT1M').histogram().histogram_percentile([50,70,90,99]) + - name: otel_metrics_received + exp: otel_metrics_received.sum(['service', 'host_name']).increase('PT1M') + - name: otel_metrics_processed + exp: otel_metrics_processed.sum(['service', 'host_name']).increase('PT1M')