Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add messaging.system for celery #3265

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `opentelemetry-instrumentation-celery` Add messaging.system for celery
([#3265](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3265))

- `opentelemetry-instrumentation-botocore` Add support for GenAI user events and lazy initialize tracer
([#3258](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3258))
- `opentelemetry-instrumentation-botocore` Add support for GenAI system events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def add(x, y):
_TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal"
_TASK_NAME_KEY = "celery.task_name"

_QUEUE_NAME = "celery"


class CeleryGetter(Getter):
def get(self, carrier, key):
Expand Down Expand Up @@ -204,6 +206,7 @@ def _trace_postrun(self, *args, **kwargs):
# request context tags
if span.is_recording():
span.set_attribute(_TASK_TAG_KEY, _TASK_RUN)
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, _QUEUE_NAME)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure if “celery” is the right value here.
According to https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
the expected values are Kafka, ActiveMQ, RabbitMQ, etc.
What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semconv states: "If one of them applies, then the respective value MUST be used; otherwise, a custom value MAY be used."
We can add celery to the semconv but I don't see it as blocker since the intent to describe the system is clear.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a pr for adding celery to semantic conventions open-telemetry/semantic-conventions#1954

Copy link
Contributor

@xrmx xrmx Mar 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this a bit in the semconv pr above, would it be possible to use the name of broker we are using?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that requires extracting broker info from the context. The context that we instrument doesn't have that info.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code references:
-> these are the context attributes that are available in celery https://github.com/celery/celery/blob/main/celery/app/task.py#L60

-> these we instrument https://github.com/open-telemetry/opentelemetry-python-contrib/blob/main/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py#L35

IMO, there's no easy way to figure out the backend info (when I was looking into it some time back), and hence having celery is backend makes most sense to me. Ideally I agree having more fine grained info about makes most sense (from sem conv perspective). But do we have a way to get backend info?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems celery has a bunch of celery-specific attributes and only a minor intersection with messaging - in particular it only sets these two mesaging attributes

span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id)
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, _QUEUE_NAME)

Messaging semconv and not designed for background task processing and have very limited usefullness for in-memory queues. I think it's misleading to try to use messaging semconv for celery and the way they are used today (or proposed in this PR) is very far from semconv.

I suggest the following mental model:

  • celery instrumentation may create spans that describe celery only and might provide a bit of context into the underlying stack (messaging/db/in-memory). These spans don't follow messaging semconv. There could be celery-specific semconv documented in this repo.
  • underlying messaging/db library is instrumented too and provide the real messaging/db observability
  • together they are two different correlated layers and users may install both instrumentation or just one of them depending on the level of details they want

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed a few more attributes here

SpanAttributes.MESSAGING_DESTINATION, routing_key
)
value = str(value)
elif key == "id":
attribute_name = SpanAttributes.MESSAGING_MESSAGE_ID
elif key == "correlation_id":
attribute_name = SpanAttributes.MESSAGING_CONVERSATION_ID
elif key == "routing_key":
attribute_name = SpanAttributes.MESSAGING_DESTINATION
# according to https://docs.celeryproject.org/en/stable/userguide/routing.html#exchange-types
elif key == "declare":
attribute_name = SpanAttributes.MESSAGING_DESTINATION_KIND
for declare in value:
if declare.exchange.type == "direct":
value = "queue"
break
if declare.exchange.type == "topic":

which also add conversation id and destination name (routing key).

It does not change a general impression: celery is not a messaging system and does not (potentially cannot) provide messaging-level instrumentation

utils.set_attributes_from_context(span, kwargs)
utils.set_attributes_from_context(span, task.request)
span.set_attribute(_TASK_NAME_KEY, task.name)
Expand Down Expand Up @@ -241,6 +244,7 @@ def _trace_before_publish(self, *args, **kwargs):
if span.is_recording():
span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC)
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id)
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, _QUEUE_NAME)
span.set_attribute(_TASK_NAME_KEY, task_name)
utils.set_attributes_from_context(span, kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def test_task(self):
"celery.state": "SUCCESS",
SpanAttributes.MESSAGING_DESTINATION: "celery",
"celery.task_name": "tests.celery_test_tasks.task_add",
SpanAttributes.MESSAGING_SYSTEM: "celery",
},
)

Expand All @@ -85,6 +86,7 @@ def test_task(self):
"celery.task_name": "tests.celery_test_tasks.task_add",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
SpanAttributes.MESSAGING_SYSTEM: "celery",
},
)

Expand Down Expand Up @@ -119,6 +121,7 @@ def test_task_raises(self):
"celery.state": "FAILURE",
SpanAttributes.MESSAGING_DESTINATION: "celery",
"celery.task_name": "tests.celery_test_tasks.task_raises",
SpanAttributes.MESSAGING_SYSTEM: "celery",
},
)

Expand Down Expand Up @@ -150,6 +153,7 @@ def test_task_raises(self):
"celery.task_name": "tests.celery_test_tasks.task_raises",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
SpanAttributes.MESSAGING_SYSTEM: "celery",
},
)

Expand Down