Skip to content

Conversation

CagriYonca
Copy link
Contributor

Fixed trace correlation issues, aligned with other tracers through the tracer-test-suite.

@CagriYonca CagriYonca self-assigned this Aug 23, 2025
@CagriYonca CagriYonca added the FUP Fair Usage Policy label Aug 23, 2025
@CagriYonca CagriYonca requested a review from a team as a code owner August 23, 2025 14:24
@CagriYonca CagriYonca added the fix label Aug 23, 2025
@CagriYonca CagriYonca closed this Aug 23, 2025
@CagriYonca CagriYonca force-pushed the fix/kafka-fup-final branch from a72d810 to ea9c383 Compare August 23, 2025 14:38
@CagriYonca CagriYonca reopened this Aug 23, 2025
@CagriYonca CagriYonca force-pushed the fix/kafka-fup-final branch from 3a5c83f to 4dd204f Compare August 25, 2025 08:29
@CagriYonca
Copy link
Contributor Author

CagriYonca commented Aug 25, 2025

A couple of words behind the need for the changes.

Let's say we have two distributed environments: env A as a producer and env B as a consumer, and say we have a flask endpoint named send_message(), which receives the call and sends a message to a kafka topic, after sending the message we also make an http call to let's say ibm-producer-api. the consumer checks the topic every 10 seconds, reads the message if any, then makes another http call to ibm-consumer-api.

So the pseudo code is here, trace IDs are given as examples:

env A - producer

  • send_message() endpoint receives a call, WSGI Span is created, trace ID: 100
    • message is sent to kafka topic, kafka-produce span is created, trace ID: 100
    • http call made to ibm.com, urllib3 span is created, trace ID: 100

env B - consumer

  • consumer checks the topic every 10 seconds for any new messages, if any:
    • message is consumed from the kafka topic, kafka-consume span is created, trace ID: 100
    • since there is no active span after the message has been consumed, kafka-consume span is ended, and since the span context was not saved, we lose the trace ID, so the new urllib3 span will have the trace ID: 101

What we want is whether there is a parent span or not, to keep the incoming trace ID alive.

So after this PR, what will happen in the example above is as follows:

env B - consumer

  • consumer checks the topic every 10 seconds for any new messages, if any:
    • message is consumed from the kafka topic, kafka-consume span is created, trace ID: 100
      • kafka-consume span is still alive until we call kafka_consumer.close() or manually close the span using close_consumer_span(). now since the span context information is still there, urllib3 span will also have the trace ID: 100

Comment on lines +206 to +207
def trace_kafka_close(
wrapped: Callable[..., InstanaConfluentKafkaConsumer.close],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to add this function to the class InstanaConfluentKafkaConsumer to wrap it correctly.

from instana.util.traceutils import (
get_tracer_tuple,
tracing_is_off,
)
from instana.span.span import InstanaSpan

consumer_token: Dict[str, Any] = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this variable necessary to be a dictionary? I saw that only one value to the unique key token is assigned to it, so I think a simple variable is sufficient, don't you think?

BTW, we are not using type annotations for variables in our code.

Suggested change
consumer_token: Dict[str, Any] = {}
consumer_token = None

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the same way we used for the celery. Let me change them both

Comment on lines +172 to +173
token = context.attach(ctx)
consumer_token["token"] = token
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If changing the consumer_token variable as commented here, you can do the following:

Suggested change
token = context.attach(ctx)
consumer_token["token"] = token
consumer_token = context.attach(ctx)

Comment on lines +180 to +181
if "token" in consumer_token:
context.detach(consumer_token.pop("token", None))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If changing the consumer_token variable as commented here, you can do the following:

Suggested change
if "token" in consumer_token:
context.detach(consumer_token.pop("token", None))
if consumer_token:
context.detach(consumer_token)


def clear_context() -> None:
context.attach(trace.set_span_in_context(None))
consumer_token.clear()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If changing the consumer_token variable as commented here, you can do the following:

Suggested change
consumer_token.clear()
consumer_token = None


if TYPE_CHECKING:
from kafka.producer.future import FutureRecordMetadata

consumer_token: Dict[str, Any] = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same comment as here.

BTW, there was too much repetitive code in those two files. It would be interesting to see how to reduce that.

Comment on lines 245 to +251
except Exception as exc:
exception = exc
finally:
if res:
create_span("poll", res.topic(), res.headers())
else:
create_span(
"poll",
next(iter(instance.list_topics().topics)),
exception=exception,
)

return res
create_span(
"poll",
next(iter(instance.list_topics().topics)),
exception=exception,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as commented here. Please verify the rest of the code and the kafka_python.py file to apply the same changes, including necessary tests.

Suggested change
except Exception as exc:
exception = exc
finally:
if res:
create_span("poll", res.topic(), res.headers())
else:
create_span(
"poll",
next(iter(instance.list_topics().topics)),
exception=exception,
)
return res
create_span(
"poll",
next(iter(instance.list_topics().topics)),
exception=exception,
)
except Exception as exc_error:
create_span(
"poll",
next(iter(instance.list_topics().topics)),
exception=exc_error,
)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
fix FUP Fair Usage Policy
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants