Skip to content

Commit c09ab07

Browse files
committed
feat: added span filtering for kafka-python module
Signed-off-by: Cagri Yonca <[email protected]>
1 parent ce3f3d3 commit c09ab07

16 files changed

+466
-143
lines changed

pyproject.toml

+9-5
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,15 @@ string = "instana:load"
5959

6060
[project.optional-dependencies]
6161
dev = [
62-
"pytest",
63-
"pytest-cov",
64-
"pytest-mock",
65-
"pre-commit>=3.0.0",
66-
"ruff"
62+
"pytest",
63+
"pytest-cov",
64+
"pytest-mock",
65+
"pre-commit>=3.0.0",
66+
"ruff",
67+
"pyyaml>=6.0.2",
68+
]
69+
yaml = [
70+
"pyyaml>=6.0.2",
6771
]
6872

6973
[project.urls]

src/instana/agent/host.py

+30-12
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from instana.options import StandardOptions
2323
from instana.util import to_json
2424
from instana.util.runtime import get_py_source
25-
from instana.util.span_utils import get_operation_specifier
25+
from instana.util.span_utils import get_operation_specifiers
2626
from instana.version import VERSION
2727

2828

@@ -351,13 +351,18 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
351351
Filters given span list using ignore-endpoint variable and returns the list of filtered spans.
352352
"""
353353
filtered_spans = []
354+
endpoint = ""
354355
for span in spans:
355356
if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"):
356357
service = span.n
357-
operation_specifier = get_operation_specifier(service)
358-
endpoint = span.data[service][operation_specifier]
359-
if isinstance(endpoint, str) and self.__is_service_or_endpoint_ignored(
360-
service, endpoint
358+
operation_specifier_key, service_specifier_key = (
359+
get_operation_specifiers(service)
360+
)
361+
if service == "kafka":
362+
endpoint = span.data[service][service_specifier_key]
363+
method = span.data[service][operation_specifier_key]
364+
if isinstance(method, str) and self.__is_endpoint_ignored(
365+
service, method, endpoint
361366
):
362367
continue
363368
else:
@@ -366,15 +371,28 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
366371
filtered_spans.append(span)
367372
return filtered_spans
368373

369-
def __is_service_or_endpoint_ignored(
370-
self, service: str, endpoint: str = ""
374+
def __is_endpoint_ignored(
375+
self,
376+
service: str,
377+
method: str = "",
378+
endpoint: str = "",
371379
) -> bool:
372380
"""Check if the given service and endpoint combination should be ignored."""
373-
374-
return (
375-
service.lower() in self.options.ignore_endpoints
376-
or f"{service.lower()}.{endpoint.lower()}" in self.options.ignore_endpoints
377-
)
381+
service = service.lower()
382+
method = method.lower()
383+
endpoint = endpoint.lower()
384+
filter_rules = [
385+
f"{service}.{method}", # service.method
386+
f"{service}.*", # service.*
387+
]
388+
389+
if service == "kafka" and endpoint:
390+
filter_rules += [
391+
f"{service}.{method}.{endpoint}", # service.method.endpoint
392+
f"{service}.*.{endpoint}", # service.*.endpoint
393+
f"{service}.{method}.*", # service.method.*
394+
]
395+
return any(rule in self.options.ignore_endpoints for rule in filter_rules)
378396

379397
def handle_agent_tasks(self, task: Dict[str, Any]) -> None:
380398
"""

src/instana/options.py

+17-9
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from typing import Any, Dict
2020

2121
from instana.log import logger
22-
from instana.util.config import parse_ignored_endpoints
22+
from instana.util.config import (
23+
parse_ignored_endpoints,
24+
parse_ignored_endpoints_from_yaml,
25+
)
2326
from instana.util.runtime import determine_service_name
2427
from instana.configurator import config
2528

@@ -44,18 +47,23 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
4447
str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
4548
)
4649

47-
if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
48-
self.ignore_endpoints = parse_ignored_endpoints(
49-
os.environ["INSTANA_IGNORE_ENDPOINTS"]
50+
if "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ:
51+
self.ignore_endpoints = parse_ignored_endpoints_from_yaml(
52+
os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"]
5053
)
5154
else:
52-
if (
53-
isinstance(config.get("tracing"), dict)
54-
and "ignore_endpoints" in config["tracing"]
55-
):
55+
if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
5656
self.ignore_endpoints = parse_ignored_endpoints(
57-
config["tracing"]["ignore_endpoints"],
57+
os.environ["INSTANA_IGNORE_ENDPOINTS"]
5858
)
59+
else:
60+
if (
61+
isinstance(config.get("tracing"), dict)
62+
and "ignore_endpoints" in config["tracing"]
63+
):
64+
self.ignore_endpoints = parse_ignored_endpoints(
65+
config["tracing"]["ignore_endpoints"],
66+
)
5967

6068
if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1":
6169
self.allow_exit_as_root = True

src/instana/util/config.py

+76-19
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
import itertools
12
from typing import Any, Dict, List, Union
3+
4+
import yaml
5+
26
from instana.log import logger
37

48

@@ -7,19 +11,19 @@ def parse_service_pair(pair: str) -> List[str]:
711
Parses a pair string to prepare a list of ignored endpoints.
812
913
@param pair: String format:
10-
- "service1:endpoint1,endpoint2" or "service1:endpoint1" or "service1"
11-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
14+
- "service1:method1,method2" or "service1:method1" or "service1"
15+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
1216
"""
1317
pair_list = []
1418
if ":" in pair:
15-
service, endpoints = pair.split(":", 1)
19+
service, methods = pair.split(":", 1)
1620
service = service.strip()
17-
endpoint_list = [ep.strip() for ep in endpoints.split(",") if ep.strip()]
21+
method_list = [ep.strip() for ep in methods.split(",") if ep.strip()]
1822

19-
for endpoint in endpoint_list:
20-
pair_list.append(f"{service}.{endpoint}")
23+
for method in method_list:
24+
pair_list.append(f"{service}.{method}")
2125
else:
22-
pair_list.append(pair)
26+
pair_list.append(f"{pair}.*")
2327
return pair_list
2428

2529

@@ -28,8 +32,8 @@ def parse_ignored_endpoints_string(params: str) -> List[str]:
2832
Parses a string to prepare a list of ignored endpoints.
2933
3034
@param params: String format:
31-
- "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
32-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
35+
- "service1:method1,method2;service2:method3" or "service1;service2"
36+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
3337
"""
3438
ignore_endpoints = []
3539
if params:
@@ -46,18 +50,45 @@ def parse_ignored_endpoints_dict(params: Dict[str, Any]) -> List[str]:
4650
Parses a dictionary to prepare a list of ignored endpoints.
4751
4852
@param params: Dict format:
49-
- {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
50-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
53+
- {"service1": ["method1", "method2"], "service2": ["method3"]}
54+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
5155
"""
5256
ignore_endpoints = []
5357

54-
for service, endpoints in params.items():
55-
if not endpoints: # filtering all service
56-
ignore_endpoints.append(service.lower())
58+
for service, methods in params.items():
59+
if not methods: # filtering all service
60+
ignore_endpoints.append(f"{service.lower()}.*")
5761
else: # filtering specific endpoints
58-
for endpoint in endpoints:
59-
ignore_endpoints.append(f"{service.lower()}.{endpoint.lower()}")
62+
ignore_endpoints = parse_endpoints_of_service(
63+
ignore_endpoints, service, methods
64+
)
65+
66+
return ignore_endpoints
67+
68+
69+
def parse_endpoints_of_service(
70+
ignore_endpoints: List[str],
71+
service: str,
72+
methods: Union[str, List[str]],
73+
) -> List[str]:
74+
"""
75+
Parses endpoints of each service.
6076
77+
@param ignore_endpoints: A list of rules for endpoints to be filtered.
78+
@param service: The name of the service to be filtered.
79+
@param methods: A list of specific endpoints of the service to be filtered.
80+
"""
81+
if service == "kafka" and isinstance(methods, list):
82+
for rule in methods:
83+
for method, endpoint in itertools.product(
84+
rule["methods"], rule["endpoints"]
85+
):
86+
ignore_endpoints.append(
87+
f"{service.lower()}.{method.lower()}.{endpoint.lower()}"
88+
)
89+
else:
90+
for method in methods:
91+
ignore_endpoints.append(f"{service.lower()}.{method.lower()}")
6192
return ignore_endpoints
6293

6394

@@ -66,9 +97,9 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]:
6697
Parses input to prepare a list for ignored endpoints.
6798
6899
@param params: Can be either:
69-
- String: "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
70-
- Dict: {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
71-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
100+
- String: "service1:method1,method2;service2:method3" or "service1;service2"
101+
- Dict: {"service1": ["method1", "method2"], "service2": ["method3"]}
102+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
72103
"""
73104
try:
74105
if isinstance(params, str):
@@ -80,3 +111,29 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]:
80111
except Exception as e:
81112
logger.debug("Error parsing ignored endpoints: %s", str(e))
82113
return []
114+
115+
116+
def parse_ignored_endpoints_from_yaml(configuration: str) -> List[str]:
117+
"""
118+
Parses configuration yaml file and prepares a list of ignored endpoints.
119+
120+
@param configuration: Path of the file as a string
121+
@param is_yaml: True if the given configuration is yaml string. False if it's the path of the file.
122+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*", "kafka.method.topic", "kafka.*.topic", "kafka.method.*"]
123+
"""
124+
ignored_endpoints = []
125+
with open(configuration, "r") as configuration_file:
126+
yaml_configuration = yaml.safe_load(configuration_file)
127+
configuration_key = (
128+
"tracing" if "tracing" in yaml_configuration else "com.instana.tracing"
129+
)
130+
if (
131+
configuration_key in yaml_configuration
132+
and "ignore-endpoints" in yaml_configuration[configuration_key]
133+
):
134+
ignored_endpoints = parse_ignored_endpoints(
135+
yaml_configuration[configuration_key]["ignore-endpoints"]
136+
)
137+
if configuration_key == "com.instana.tracing":
138+
logger.debug('Please use "tracing" instead of "com.instana.tracing"')
139+
return ignored_endpoints

src/instana/util/span_utils.py

+10-6
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
# (c) Copyright IBM Corp. 2025
22

3-
from typing import Optional
3+
from typing import Tuple
44

55

6-
def get_operation_specifier(span_name: str) -> Optional[str]:
6+
def get_operation_specifiers(span_name: str) -> Tuple[str, str]:
77
"""Get the specific operation specifier for the given span."""
8-
operation_specifier = ""
8+
operation_specifier_key = ""
9+
service_specifier_key = ""
910
if span_name == "redis":
10-
operation_specifier = "command"
11+
operation_specifier_key = "command"
1112
elif span_name == "dynamodb":
12-
operation_specifier = "op"
13-
return operation_specifier
13+
operation_specifier_key = "op"
14+
elif span_name == "kafka":
15+
operation_specifier_key = "access"
16+
service_specifier_key = "service"
17+
return operation_specifier_key, service_specifier_key

tests/agent/test_host.py

+10-20
Original file line numberDiff line numberDiff line change
@@ -692,31 +692,21 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None:
692692
assert "should_send_snapshot_data: True" in caplog.messages
693693

694694
def test_is_service_or_endpoint_ignored(self) -> None:
695-
self.agent.options.ignore_endpoints.append("service1")
696-
self.agent.options.ignore_endpoints.append("service2.endpoint1")
695+
self.agent.options.ignore_endpoints.append("service1.*")
696+
self.agent.options.ignore_endpoints.append("service2.method1")
697697

698698
# ignore all endpoints of service1
699-
assert self.agent._HostAgent__is_service_or_endpoint_ignored("service1")
700-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
701-
"service1", "endpoint1"
702-
)
703-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
704-
"service1", "endpoint2"
705-
)
699+
assert self.agent._HostAgent__is_endpoint_ignored("service1")
700+
assert self.agent._HostAgent__is_endpoint_ignored("service1", "method1")
701+
assert self.agent._HostAgent__is_endpoint_ignored("service1", "method2")
706702

707703
# case-insensitive
708-
assert self.agent._HostAgent__is_service_or_endpoint_ignored("SERVICE1")
709-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
710-
"service1", "ENDPOINT1"
711-
)
704+
assert self.agent._HostAgent__is_endpoint_ignored("SERVICE1")
705+
assert self.agent._HostAgent__is_endpoint_ignored("service1", "METHOD1")
712706

713707
# ignore only endpoint1 of service2
714-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
715-
"service2", "endpoint1"
716-
)
717-
assert not self.agent._HostAgent__is_service_or_endpoint_ignored(
718-
"service2", "endpoint2"
719-
)
708+
assert self.agent._HostAgent__is_endpoint_ignored("service2", "method1")
709+
assert not self.agent._HostAgent__is_endpoint_ignored("service2", "method2")
720710

721711
# don't ignore other services
722-
assert not self.agent._HostAgent__is_service_or_endpoint_ignored("service3")
712+
assert not self.agent._HostAgent__is_endpoint_ignored("service3")

tests/clients/boto3/test_boto3_dynamodb.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def test_ignore_dynamodb(self) -> None:
9494
assert dynamodb_span not in filtered_spans
9595

9696
def test_ignore_create_table(self) -> None:
97-
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb.createtable"
97+
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb:createtable"
9898
agent.options = StandardOptions()
9999

100100
with tracer.start_as_current_span("test"):

0 commit comments

Comments
 (0)