Skip to content

Commit

Permalink
Add support for span events
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Costa <[email protected]>
  • Loading branch information
marcotc committed Oct 31, 2024
1 parent a4891c7 commit e2e8837
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 10 deletions.
1 change: 1 addition & 0 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ async def handle_info(self, request: Request) -> web.Response:
"client_drop_p0s": True,
# Just a random selection of some peer_tags to aggregate on for testing, not exhaustive
"peer_tags": ["db.name", "mongodb.db", "messaging.system"],
"span_events": True, # Advertise support for the top-level Span field for Span Events
},
headers={"Datadog-Agent-State": "03e868b3ecdd62a91423cc4c3917d0d151fb9fa486736911ab7f5a0750c63824"},
)
Expand Down
65 changes: 64 additions & 1 deletion ddapm_test_agent/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ class SpanLink(TypedDict):
flags: NotRequired[Optional[int]]


class SpanEvent(TypedDict):
time_unix_nano: int
name: str
attributes: NotRequired[Dict[str, Any]]


class Span(TypedDict):
name: str
span_id: SpanId
Expand All @@ -71,6 +77,7 @@ class Span(TypedDict):
meta: NotRequired[Dict[str, str]]
metrics: NotRequired[Dict[str, MetricType]]
span_links: NotRequired[List[SpanLink]]
span_events: NotRequired[List[SpanEvent]]
meta_struct: NotRequired[Dict[str, Dict[str, Any]]]


Expand All @@ -88,9 +95,12 @@ class Span(TypedDict):
"meta",
"metrics",
"span_links",
"span_events",
"meta_struct",
]
TopLevelSpanValue = Union[None, SpanId, TraceId, int, str, Dict[str, str], Dict[str, MetricType], List[SpanLink]]
TopLevelSpanValue = Union[
None, SpanId, TraceId, int, str, Dict[str, str], Dict[str, MetricType], List[SpanLink], List[SpanEvent]
]
Trace = List[Span]
v04TracePayload = List[List[Span]]
TraceMap = OrderedDict[int, Trace]
Expand Down Expand Up @@ -176,6 +186,32 @@ def verify_span(d: Any) -> Span:
assert isinstance(link["flags"], int), "Expected flags to be of type: 'int', got: " + str(
type(link["flags"])
)
if "span_events" in d:
assert isinstance(d["span_events"], list)
for event in d["span_events"]:
assert isinstance(event, dict), f"Expected all span_events to be of type: 'dict', got: {type(event)}"
required_attrs = ["time_unix_nano", "name"]
for attr in required_attrs:
assert attr in event, f"'{attr}' required in span event"
assert isinstance(
event["time_unix_nano"], int
), "Expected 'time_unix_nano' to be of type: 'int', got: " + str(type(event["time_unix_nano"]))
assert isinstance(event["name"], str), "Expected 'name' to be of type: 'str', got: " + str(
type(event["name"])
)
if "attributes" in event:
assert isinstance(event["attributes"], dict)
for k, v in event["attributes"].items():
assert isinstance(k, str), f"Expected key 'attributes.{k}' to be of type: 'str', got: {type(k)}"
assert isinstance(
v, (str, int, float, bool, list)
), f"Expected value of key 'attributes.{k}' to be of type: 'str/int/float/bool/list', got: {type(v)}"
if isinstance(v, list) and v: # Check if list is homogeneous
first_type = type(v[0])
i = None
assert all(
isinstance(i, first_type) for i in v
), f"Expected all elements in list to be of the same type: '{first_type}', got: {type(i)}"
return cast(Span, d)
except AssertionError as e:
raise TypeError(*e.args) from e
Expand Down Expand Up @@ -304,17 +340,32 @@ def copy_span_links(s: SpanLink) -> SpanLink:
return copy


def copy_span_events(s: SpanEvent) -> SpanEvent:
attributes = s["attributes"].copy() if "attributes" in s else None
copy = s.copy()
if attributes is not None:
# Copy arrays inside attributes
for k, v in attributes.items():
if isinstance(v, list):
attributes[k] = v.copy()
copy["attributes"] = attributes
return copy


def copy_span(s: Span) -> Span:
meta = s["meta"].copy() if "meta" in s else None
metrics = s["metrics"].copy() if "metrics" in s else None
links = s["span_links"].copy() if "span_links" in s else None
events = s["span_events"].copy() if "span_events" in s else None
copy = s.copy()
if meta is not None:
copy["meta"] = meta
if metrics is not None:
copy["metrics"] = metrics
if links is not None:
copy["span_links"] = [copy_span_links(link) for link in links]
if events is not None:
copy["span_events"] = [copy_span_events(event) for event in events]
return copy


Expand Down Expand Up @@ -379,6 +430,18 @@ def add_span_link(
return s


def add_span_event(
s: Span, time_unix_nano: int = 1730405656000000000, name: str = "event", attributes: Optional[Dict[str, Any]] = None
) -> Span:
if "span_events" not in s:
s["span_events"] = []
new_event = SpanEvent(time_unix_nano=time_unix_nano, name=name)
if attributes is not None:
new_event["attributes"] = attributes
s["span_events"].append(new_event)
return s


def _trace_decoder_flexible(json_string: bytes) -> Dict[str, Any]:
"""Parse Trace JSON and accounts for meta that may contain numbers such as ports. Converts these meta correctly to strings.
Also ensures that any valid integers/floats are correctly parsed, to prevent ids from being decoded as strings incorrectly.
Expand Down
92 changes: 83 additions & 9 deletions ddapm_test_agent/trace_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
log = logging.getLogger(__name__)


DEFAULT_SNAPSHOT_IGNORES = "span_id,trace_id,parent_id,duration,start,metrics.system.pid,metrics.system.process_id,metrics.process_id,metrics._dd.tracer_kr,meta.runtime-id,span_links.trace_id_high,meta.pathway.hash,meta._dd.p.tid"
DEFAULT_SNAPSHOT_IGNORES = "span_id,trace_id,parent_id,duration,start,metrics.system.pid,metrics.system.process_id,metrics.process_id,metrics._dd.tracer_kr,meta.runtime-id,span_links.trace_id_high,span_events.time_unix_nano,meta.pathway.hash,meta._dd.p.tid"


def _key_match(d1: Dict[str, Any], d2: Dict[str, Any], key: str) -> bool:
Expand Down Expand Up @@ -198,13 +198,13 @@ def _diff_spans(
results = []
s1_no_tags = cast(
Dict[str, TopLevelSpanValue],
{k: v for k, v in s1.items() if k not in ("meta", "metrics", "span_links")},
{k: v for k, v in s1.items() if k not in ("meta", "metrics", "span_links", "span_events")},
)
s2_no_tags = cast(
Dict[str, TopLevelSpanValue],
{k: v for k, v in s2.items() if k not in ("meta", "metrics", "span_links")},
{k: v for k, v in s2.items() if k not in ("meta", "metrics", "span_links", "span_events")},
)
for d1, d2, ignored in [
for d1, d2, ign in [
(s1_no_tags, s2_no_tags, ignored),
(s1["meta"], s2["meta"], set(i[5:] for i in ignored if i.startswith("meta."))),
(
Expand All @@ -216,7 +216,7 @@ def _diff_spans(
d1 = cast(Dict[str, Any], d1)
d2 = cast(Dict[str, Any], d2)
diffs = []
for k in (set(d1.keys()) | set(d2.keys())) - ignored:
for k in (set(d1.keys()) | set(d2.keys())) - ign:
if not _key_match(d1, d2, k):
diffs.append(k)
results.append(diffs)
Expand All @@ -235,7 +235,7 @@ def _diff_spans(
{k: v for k, v in l2.items() if k != "attributes"},
)
link_diff = []
for d1, d2, ignored in [
for d1, d2, ign in [
(l1_no_tags, l2_no_tags, set(i[11:] for i in ignored if i.startswith("span_links."))),
(
l1.get("attributes") or {},
Expand All @@ -246,14 +246,47 @@ def _diff_spans(
d1 = cast(Dict[str, Any], d1)
d2 = cast(Dict[str, Any], d2)
diffs = []
for k in (set(d1.keys()) | set(d2.keys())) - ignored:
for k in (set(d1.keys()) | set(d2.keys())) - ign:
if not _key_match(d1, d2, k):
diffs.append(k)
link_diff.append(diffs)

link_diffs.append(link_diff)
results.append(link_diffs) # type: ignore

event_diffs = []
if len(s1.get("span_events") or []) != len(s2.get("span_events") or []):
results[0].append("span_events")
else:
for e1, e2 in zip(s1.get("span_events") or [], s2.get("span_events") or []):
l1_no_tags = cast(
Dict[str, TopLevelSpanValue],
{k: v for k, v in e1.items() if k != "attributes"},
)
l2_no_tags = cast(
Dict[str, TopLevelSpanValue],
{k: v for k, v in e2.items() if k != "attributes"},
)
event_diff = []
for d1, d2, ign in [
(l1_no_tags, l2_no_tags, set(i[12:] for i in ignored if i.startswith("span_events."))),
(
e1.get("attributes") or {},
e2.get("attributes") or {},
set(i[23:] for i in ignored if i.startswith("span_events.attributes.")),
),
]:
d1 = cast(Dict[str, Any], d1)
d2 = cast(Dict[str, Any], d2)
diffs = []
for k in (set(d1.keys()) | set(d2.keys())) - ign:
if not _key_match(d1, d2, k):
diffs.append(k)
event_diff.append(diffs)

event_diffs.append(event_diff)
results.append(event_diffs) # type: ignore

return cast(Tuple[List[str], List[str], List[str], List[Tuple[List[str], List[str]]]], tuple(results))


Expand All @@ -279,7 +312,9 @@ def _compare_traces(expected: Trace, received: Trace, ignored: Set[str]) -> None
) as frame:
frame.add_item(f"Expected span:\n{pprint.pformat(s_exp)}")
frame.add_item(f"Received span:\n{pprint.pformat(s_rec)}")
top_level_diffs, meta_diffs, metrics_diffs, span_link_diffs = _diff_spans(s_exp, s_rec, ignored)
top_level_diffs, meta_diffs, metrics_diffs, span_link_diffs, span_event_diffs = _diff_spans(
s_exp, s_rec, ignored
)

for diffs, diff_type, d_exp, d_rec in [
(top_level_diffs, "span", s_exp, s_rec),
Expand All @@ -295,7 +330,7 @@ def _compare_traces(expected: Trace, received: Trace, ignored: Set[str]) -> None
raise AssertionError(
f"Span{' ' + diff_type if diff_type != 'span' else ''} value '{diff_key}' in expected span but is not in the received span."
)
elif diff_key == "span_links":
elif diff_key in ["span_links", "span_events"]:
raise AssertionError(
f"{diff_type} mismatch on '{diff_key}': got {len(d_rec[diff_key])} values for {diff_key} which does not match expected {len(d_exp[diff_key])}." # type: ignore
)
Expand Down Expand Up @@ -328,6 +363,30 @@ def _compare_traces(expected: Trace, received: Trace, ignored: Set[str]) -> None
f"Span link {diff_type} mismatch on '{diff_key}': got '{d_rec[diff_key]}' which does not match expected '{d_exp[diff_key]}'."
)

for i, (event_level_diffs, attribute_diffs) in enumerate(span_event_diffs):
for diffs, diff_type, d_exp, d_rec in [
(event_level_diffs, f"{i}", s_exp["span_events"][i], s_rec["span_events"][i]),
(
attribute_diffs,
f"{i} attributes",
s_exp["span_events"][i].get("attributes") or {},
s_rec["span_events"][i].get("attributes") or {},
),
]:
for diff_key in diffs:
if diff_key not in d_exp:
raise AssertionError(
f"Span event {diff_type} value '{diff_key}' in received span event but is not in the expected span event."
)
elif diff_key not in d_rec:
raise AssertionError(
f"Span event {diff_type} value '{diff_key}' in expected span event but is not in the received span event."
)
else:
raise AssertionError(
f"Span event {diff_type} mismatch on '{diff_key}': got '{d_rec[diff_key]}' which does not match expected '{d_exp[diff_key]}'."
)


class SnapshotFailure(Exception):
pass
Expand Down Expand Up @@ -367,6 +426,7 @@ def _ordered_span(s: Span) -> OrderedDictType[str, TopLevelSpanValue]:
"meta",
"metrics",
"span_links",
"span_events",
]
for k in order:
if k in s:
Expand All @@ -385,6 +445,11 @@ def _ordered_span(s: Span) -> OrderedDictType[str, TopLevelSpanValue]:
if "attributes" in link:
link["attributes"] = OrderedDict(sorted(link["attributes"].items(), key=operator.itemgetter(0)))

if "span_events" in d:
for event in d["span_events"]:
if "attributes" in event:
event["attributes"] = OrderedDict(sorted(event["attributes"].items(), key=operator.itemgetter(0)))

for k in ["meta", "metrics"]:
if k in d and len(d[k]) == 0:
del d[k]
Expand Down Expand Up @@ -414,6 +479,15 @@ def _snapshot_trace_str(trace: Trace, removed: Optional[List[str]] = None) -> st
if "span_links" in span:
for link in span["span_links"]:
link.pop(key[11:], None) # type: ignore
elif key.startswith("span_events.attributes."):
if "span_events" in span:
for event in span["span_events"]:
if "attributes" in event:
event["attributes"].pop(key[23:], None)
elif key.startswith("span_events."):
if "span_events" in span:
for event in span["span_events"]:
event.pop(key[12:], None)
else:
span.pop(key, None) # type: ignore

Expand Down
3 changes: 3 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ async def test_info(agent):
"endpoints": [
"/v0.4/traces",
"/v0.5/traces",
"/v0.7/traces",
"/v0.6/stats",
"/telemetry/proxy/",
"/v0.7/config",
Expand All @@ -94,6 +95,8 @@ async def test_info(agent):
"feature_flags": [],
"config": {},
"client_drop_p0s": True,
"peer_tags": ["db.name", "mongodb.db", "messaging.system"],
"span_events": True,
}


Expand Down
41 changes: 41 additions & 0 deletions tests/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ddapm_test_agent import trace_snapshot
from ddapm_test_agent import tracestats_snapshot
from ddapm_test_agent.trace import add_span_event
from ddapm_test_agent.trace import add_span_link
from ddapm_test_agent.trace import copy_span
from ddapm_test_agent.trace import set_attr
Expand Down Expand Up @@ -667,6 +668,34 @@ async def test_removed_attributes_metrics(agent, tmp_path, snapshot_removed_attr
"",
{"start"},
),
# Mismatching span events count
(
[TWO_SPAN_TRACE_NO_START],
[[TWO_SPAN_TRACE_NO_START[0], add_span_event(copy_span(TWO_SPAN_TRACE_NO_START[1]))]],
"Span value 'span_events' in received span but is not in the expected span.",
{"start"},
),
# Mismatching span event name
(
[[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), name="expected_name")]],
[[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), name="got_name")]],
"Span event 0 mismatch on 'name': got 'got_name' which does not match expected 'expected_name'.",
{"start"},
),
# Mismatching span event attributes
(
[[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), attributes={"a": "1", "b": "2"})]],
[[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), attributes={"a": "1", "b": "0"})]],
"Span event 0 attributes mismatch on 'b': got '0' which does not match expected '2'.",
{"start"},
),
# Matching span event
(
[[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), attributes={"a": "1", "b": 2, "c": [3]})]],
[[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), attributes={"a": "1", "b": 2, "c": [3]})]],
"",
{"start"},
),
# Default ignored fields
(
[
Expand All @@ -682,6 +711,12 @@ async def test_removed_attributes_metrics(agent, tmp_path, snapshot_removed_attr
"error": 0,
"meta": {},
"metrics": {},
"span_events": [
{
"time_unix_nano": 123,
"name": "event_name",
},
],
}
]
],
Expand All @@ -698,6 +733,12 @@ async def test_removed_attributes_metrics(agent, tmp_path, snapshot_removed_attr
"error": 0,
"meta": {},
"metrics": {},
"span_events": [
{
"time_unix_nano": 456,
"name": "event_name",
},
],
}
]
],
Expand Down
Loading

0 comments on commit e2e8837

Please sign in to comment.