Skip to content

Commit dda69a3

Browse files
committed
feat: added span filtering for confluent-kafka module
Signed-off-by: Cagri Yonca <[email protected]>
1 parent 8bc97bb commit dda69a3

File tree

1 file changed

+127
-0
lines changed

1 file changed

+127
-0
lines changed

tests/clients/kafka/test_confluent_kafka.py

+127
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# (c) Copyright IBM Corp. 2025
22

3+
import os
34
from typing import Generator
45

56
import pytest
@@ -11,7 +12,9 @@
1112
from confluent_kafka.admin import AdminClient, NewTopic
1213
from opentelemetry.trace import SpanKind
1314

15+
from instana.options import StandardOptions
1416
from instana.singletons import agent, tracer
17+
from instana.util.config import parse_ignored_endpoints_from_yaml
1518
from tests.helpers import testenv
1619

1720

@@ -186,3 +189,127 @@ def test_trace_confluent_kafka_error(self) -> None:
186189
kafka_span.data["kafka"]["error"]
187190
== "num_messages must be between 0 and 1000000 (1M)"
188191
)
192+
193+
def test_ignore_confluent_kafka(self) -> None:
194+
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka"
195+
agent.options = StandardOptions()
196+
197+
with tracer.start_as_current_span("test"):
198+
self.producer.produce(testenv["kafka_topic"], b"raw_bytes")
199+
self.producer.flush(timeout=10)
200+
201+
spans = self.recorder.queued_spans()
202+
assert len(spans) == 2
203+
204+
filtered_spans = agent.filter_spans(spans)
205+
assert len(filtered_spans) == 1
206+
207+
def test_ignore_confluent_kafka_producer(self) -> None:
208+
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:produce"
209+
agent.options = StandardOptions()
210+
211+
with tracer.start_as_current_span("test-span"):
212+
# Produce some events
213+
self.producer.produce(testenv["kafka_topic"], b"raw_bytes1")
214+
self.producer.produce(testenv["kafka_topic"], b"raw_bytes2")
215+
self.producer.flush()
216+
217+
# Consume the events
218+
consumer_config = self.kafka_config.copy()
219+
consumer_config["group.id"] = "my-group"
220+
consumer_config["auto.offset.reset"] = "earliest"
221+
222+
consumer = Consumer(consumer_config)
223+
consumer.subscribe([testenv["kafka_topic"]])
224+
consumer.consume(num_messages=2, timeout=60)
225+
226+
consumer.close()
227+
228+
spans = self.recorder.queued_spans()
229+
assert len(spans) == 4
230+
231+
filtered_spans = agent.filter_spans(spans)
232+
assert len(filtered_spans) == 2
233+
234+
def test_ignore_confluent_kafka_consumer(self) -> None:
235+
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume"
236+
agent.options = StandardOptions()
237+
238+
with tracer.start_as_current_span("test-span"):
239+
# Produce some events
240+
self.producer.produce(testenv["kafka_topic"], b"raw_bytes1")
241+
self.producer.produce(testenv["kafka_topic"], b"raw_bytes2")
242+
self.producer.flush()
243+
244+
# Consume the events
245+
consumer_config = self.kafka_config.copy()
246+
consumer_config["group.id"] = "my-group"
247+
consumer_config["auto.offset.reset"] = "earliest"
248+
249+
consumer = Consumer(consumer_config)
250+
consumer.subscribe([testenv["kafka_topic"]])
251+
consumer.consume(num_messages=2, timeout=60)
252+
253+
consumer.close()
254+
255+
spans = self.recorder.queued_spans()
256+
assert len(spans)
257+
258+
filtered_spans = agent.filter_spans(spans)
259+
assert len(filtered_spans) == 3
260+
261+
def test_ignore_confluent_specific_topic(self) -> None:
262+
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume"
263+
os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"] = (
264+
"tests/util/test_configuration-1.yaml"
265+
)
266+
267+
agent.options = StandardOptions()
268+
269+
with tracer.start_as_current_span("test-span"):
270+
# Produce some events
271+
self.producer.produce(testenv["kafka_topic"], b"raw_bytes1")
272+
self.producer.flush()
273+
274+
# Consume the events
275+
consumer_config = self.kafka_config.copy()
276+
consumer_config["group.id"] = "my-group"
277+
consumer_config["auto.offset.reset"] = "earliest"
278+
279+
consumer = Consumer(consumer_config)
280+
consumer.subscribe([testenv["kafka_topic"]])
281+
consumer.consume(num_messages=1, timeout=60)
282+
283+
consumer.close()
284+
285+
spans = self.recorder.queued_spans()
286+
assert len(spans) == 3
287+
288+
filtered_spans = agent.filter_spans(spans)
289+
assert len(filtered_spans) == 1
290+
291+
def test_ignore_confluent_specific_topic_with_config_file(self) -> None:
292+
agent.options.ignore_endpoints = parse_ignored_endpoints_from_yaml(
293+
"tests/util/test_configuration-1.yaml"
294+
)
295+
296+
with tracer.start_as_current_span("test-span"):
297+
# Produce some events
298+
self.producer.produce(testenv["kafka_topic"], b"raw_bytes1")
299+
self.producer.flush()
300+
301+
# Consume the events
302+
consumer_config = self.kafka_config.copy()
303+
consumer_config["group.id"] = "my-group"
304+
consumer_config["auto.offset.reset"] = "earliest"
305+
306+
consumer = Consumer(consumer_config)
307+
consumer.subscribe([testenv["kafka_topic"]])
308+
consumer.consume(num_messages=1, timeout=60)
309+
consumer.close()
310+
311+
spans = self.recorder.queued_spans()
312+
assert len(spans) == 3
313+
314+
filtered_spans = agent.filter_spans(spans)
315+
assert len(filtered_spans) == 1

0 commit comments

Comments
 (0)