Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3610](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3610))
- infra(ci): Fix git pull failures in core contrib test
([#3357](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3357))
- `opentelemetry-instrumentation-celery`: Bump celery semantic convention schema version from 1.11.0 to 1.37.0
([#3712](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3712))

### Added

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,22 @@ def add(x, y):

from opentelemetry import context as context_api
from opentelemetry import trace
from opentelemetry.instrumentation._semconv import (
_get_schema_url,
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
_report_new,
_report_old,
_StabilityMode,
)
from opentelemetry.instrumentation.celery import utils
from opentelemetry.instrumentation.celery.package import _instruments
from opentelemetry.instrumentation.celery.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.metrics import get_meter
from opentelemetry.propagate import extract, inject
from opentelemetry.propagators.textmap import Getter
from opentelemetry.semconv._incubating.attributes import messaging_attributes
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode

Expand Down Expand Up @@ -116,27 +125,32 @@ def keys(self, carrier):
class CeleryInstrumentor(BaseInstrumentor):
metrics = None
task_id_to_start_time = {}
_sem_conv_opt_in_mode = _StabilityMode.DEFAULT

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")

self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.MESSAGING
)

# pylint: disable=attribute-defined-outside-init
self._tracer = trace.get_tracer(
__name__,
__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=_get_schema_url(self._sem_conv_opt_in_mode),
)

meter_provider = kwargs.get("meter_provider")
meter = get_meter(
__name__,
__version__,
meter_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=_get_schema_url(self._sem_conv_opt_in_mode),
)

self.create_celery_metrics(meter)
Expand Down Expand Up @@ -204,8 +218,12 @@ def _trace_postrun(self, *args, **kwargs):
# request context tags
if span.is_recording():
span.set_attribute(_TASK_TAG_KEY, _TASK_RUN)
utils.set_attributes_from_context(span, kwargs)
utils.set_attributes_from_context(span, task.request)
utils.set_attributes_from_context(
span, kwargs, self._sem_conv_opt_in_mode
)
utils.set_attributes_from_context(
span, task.request, self._sem_conv_opt_in_mode
)
span.set_attribute(_TASK_NAME_KEY, task.name)

activation.__exit__(None, None, None)
Expand Down Expand Up @@ -240,9 +258,18 @@ def _trace_before_publish(self, *args, **kwargs):
# apply some attributes here because most of the data is not available
if span.is_recording():
span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC)
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id)
if _report_new(self._sem_conv_opt_in_mode):
span.set_attribute(
messaging_attributes.MESSAGING_MESSAGE_ID, task_id
) # Not necessary since it has the same name as the old attribute but just in case it changes in the future
if _report_old(self._sem_conv_opt_in_mode):
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID, task_id
)
span.set_attribute(_TASK_NAME_KEY, task_name)
utils.set_attributes_from_context(span, kwargs)
utils.set_attributes_from_context(
span, kwargs, self._sem_conv_opt_in_mode
)

activation = trace.use_span(span, end_on_exit=True)
activation.__enter__() # pylint: disable=E1101
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
from celery import registry # pylint: disable=no-name-in-module
from celery.app.task import Task

from opentelemetry.instrumentation._semconv import (
_report_new,
_report_old,
_StabilityMode,
)
from opentelemetry.semconv._incubating.attributes import messaging_attributes
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span

Expand Down Expand Up @@ -56,7 +62,11 @@


# pylint:disable=too-many-branches
def set_attributes_from_context(span, context):
def set_attributes_from_context(
span,
context,
sem_conv_opt_in_mode: _StabilityMode = _StabilityMode.DEFAULT,
):
"""Helper to extract meta values from a Celery Context"""
if not span.is_recording():
return
Expand All @@ -80,7 +90,7 @@ def set_attributes_from_context(span, context):
continue

attribute_name = None

new_attribute_name = None
# Celery 4.0 uses `origin` instead of `hostname`; this change preserves
# the same name for the tag despite Celery version
if key == "origin":
Expand All @@ -91,24 +101,42 @@ def set_attributes_from_context(span, context):
routing_key = value.get("routing_key")

if routing_key is not None:
span.set_attribute(
SpanAttributes.MESSAGING_DESTINATION, routing_key
)
if _report_new(sem_conv_opt_in_mode):
span.set_attribute(
messaging_attributes.MESSAGING_DESTINATION_NAME,
routing_key,
)
if _report_old(sem_conv_opt_in_mode):
span.set_attribute(
SpanAttributes.MESSAGING_DESTINATION, routing_key
)

value = str(value)

elif key == "id":
attribute_name = SpanAttributes.MESSAGING_MESSAGE_ID
if _report_new(sem_conv_opt_in_mode):
new_attribute_name = messaging_attributes.MESSAGING_MESSAGE_ID
if _report_old(sem_conv_opt_in_mode):
attribute_name = SpanAttributes.MESSAGING_MESSAGE_ID

elif key == "correlation_id":
attribute_name = SpanAttributes.MESSAGING_CONVERSATION_ID
if _report_new(sem_conv_opt_in_mode):
new_attribute_name = (
messaging_attributes.MESSAGING_MESSAGE_CONVERSATION_ID
)
if _report_old(sem_conv_opt_in_mode):
attribute_name = SpanAttributes.MESSAGING_CONVERSATION_ID

elif key == "routing_key":
attribute_name = SpanAttributes.MESSAGING_DESTINATION
if _report_new(sem_conv_opt_in_mode):
new_attribute_name = (
messaging_attributes.MESSAGING_DESTINATION_NAME
)
if _report_old(sem_conv_opt_in_mode):
attribute_name = SpanAttributes.MESSAGING_DESTINATION

# according to https://docs.celeryproject.org/en/stable/userguide/routing.html#exchange-types
elif key == "declare":
attribute_name = SpanAttributes.MESSAGING_DESTINATION_KIND
for declare in value:
if declare.exchange.type == "direct":
value = "queue"
Expand All @@ -118,10 +146,13 @@ def set_attributes_from_context(span, context):
break

# set attribute name if not set specially for a key
if attribute_name is None:
if attribute_name is None and not new_attribute_name:
attribute_name = f"celery.{key}"

span.set_attribute(attribute_name, value)
if attribute_name:
span.set_attribute(attribute_name, value)
if new_attribute_name:
span.set_attribute(new_attribute_name, value)


def attach_context(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@

import threading
import time
from unittest import mock

from wrapt import wrap_function_wrapper

from opentelemetry import baggage, context
from opentelemetry.instrumentation._semconv import (
OTEL_SEMCONV_STABILITY_OPT_IN,
_OpenTelemetrySemanticConventionStability,
)
from opentelemetry.instrumentation.celery import CeleryInstrumentor, utils
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv._incubating.attributes import messaging_attributes
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind, StatusCode
Expand All @@ -30,6 +36,7 @@
class TestCeleryInstrumentation(TestBase):
def setUp(self):
super().setUp()
_OpenTelemetrySemanticConventionStability._initialized = False
self._worker = app.Worker(app=app, pool="solo", concurrency=1)
self._thread = threading.Thread(target=self._worker.start)
self._thread.daemon = True
Expand Down Expand Up @@ -83,7 +90,6 @@ def test_task(self):
{
"celery.action": "apply_async",
"celery.task_name": "tests.celery_test_tasks.task_add",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
},
)
Expand Down Expand Up @@ -148,7 +154,6 @@ def test_task_raises(self):
{
"celery.action": "apply_async",
"celery.task_name": "tests.celery_test_tasks.task_raises",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
},
)
Expand Down Expand Up @@ -222,6 +227,65 @@ def _retrieve_context_wrapper_none_token(

unwrap(utils, "retrieve_context")

def test_task_new_sem_conv(self):
CeleryInstrumentor().uninstrument()
with mock.patch.dict(
"os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: "messaging"}
):
CeleryInstrumentor().instrument()

result = task_add.delay(1, 2)

timeout = time.time() + 60 * 1 # 1 minutes from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)

spans = self.sorted_spans(
self.memory_exporter.get_finished_spans()
)
self.assertEqual(len(spans), 2)

consumer, producer = spans

self.assertEqual(
consumer.name, "run/tests.celery_test_tasks.task_add"
)
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
self.assertSpanHasAttributes(
consumer,
{
"celery.action": "run",
"celery.state": "SUCCESS",
messaging_attributes.MESSAGING_DESTINATION_NAME: "celery",
"celery.task_name": "tests.celery_test_tasks.task_add",
},
)

self.assertEqual(consumer.status.status_code, StatusCode.UNSET)

self.assertEqual(0, len(consumer.events))

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_add"
)
self.assertEqual(producer.kind, SpanKind.PRODUCER)
self.assertSpanHasAttributes(
producer,
{
"celery.action": "apply_async",
"celery.task_name": "tests.celery_test_tasks.task_add",
messaging_attributes.MESSAGING_DESTINATION_NAME: "celery",
},
)

self.assertNotEqual(consumer.parent, producer.context)
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(
consumer.context.trace_id, producer.context.trace_id
)


class TestCelerySignatureTask(TestBase):
def setUp(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def test_set_attributes_from_context(self):
"44b7f305",
)
self.assertEqual(
span.attributes.get(SpanAttributes.MESSAGING_DESTINATION), "celery"
span.attributes.get(SpanAttributes.MESSAGING_DESTINATION),
"celery",
)

self.assertEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class _OpenTelemetryStabilitySignalType(Enum):
HTTP = "http"
DATABASE = "database"
GEN_AI = "gen_ai"
MESSAGING = "messaging"


class _StabilityMode(Enum):
Expand All @@ -175,14 +176,20 @@ class _StabilityMode(Enum):
DATABASE = "database"
DATABASE_DUP = "database/dup"
GEN_AI_LATEST_EXPERIMENTAL = "gen_ai_latest_experimental"
MESSAGING = "messaging"
MESSAGING_DUP = "messaging/dup"


def _report_new(mode: _StabilityMode):
return mode != _StabilityMode.DEFAULT


def _report_old(mode: _StabilityMode):
return mode not in (_StabilityMode.HTTP, _StabilityMode.DATABASE)
return mode not in (
_StabilityMode.HTTP,
_StabilityMode.DATABASE,
_StabilityMode.MESSAGING,
)


class _OpenTelemetrySemanticConventionStability:
Expand All @@ -206,6 +213,7 @@ def _initialize(cls):
_OpenTelemetryStabilitySignalType.HTTP: _StabilityMode.DEFAULT,
_OpenTelemetryStabilitySignalType.DATABASE: _StabilityMode.DEFAULT,
_OpenTelemetryStabilitySignalType.GEN_AI: _StabilityMode.DEFAULT,
_OpenTelemetryStabilitySignalType.MESSAGING: _StabilityMode.DEFAULT,
}
cls._initialized = True
return
Expand Down Expand Up @@ -233,6 +241,15 @@ def _initialize(cls):
_StabilityMode.DATABASE,
_StabilityMode.DATABASE_DUP,
)

cls._OTEL_SEMCONV_STABILITY_SIGNAL_MAPPING[
_OpenTelemetryStabilitySignalType.MESSAGING
] = cls._filter_mode(
opt_in_list,
_StabilityMode.MESSAGING,
_StabilityMode.MESSAGING_DUP,
)

cls._initialized = True

@staticmethod
Expand Down