Skip to content

Commit

Permalink
chore(telemetry): fix flaky tests (#10721)
Browse files Browse the repository at this point in the history
## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: erikayasuda <[email protected]>
  • Loading branch information
mabdinur and erikayasuda authored Oct 1, 2024
1 parent a210c90 commit 32a2f72
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 179 deletions.
18 changes: 18 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,24 @@ def get_events(self, event_type=None, filter_heartbeats=True):
requests = self.get_requests(event_type, filter_heartbeats)
return [req["body"] for req in requests]

def get_metrics(self, name=None):
    """Return the metric series found in ``generate-metrics`` telemetry events.

    Args:
        name: When given, only series whose ``"metric"`` field equals this
            value are returned. ``None`` (the default) returns every series.

    Returns:
        A list of series dicts sorted in ascending order by
        ``(series["metric"], series["tags"])``.
    """
    metrics = []
    for event in self.get_events("generate-metrics"):
        # One event can carry several series; keep only the requested ones.
        metrics.extend(
            series for series in event["payload"]["series"] if name is None or series["metric"] == name
        )
    # Sorting lets callers index into the result regardless of how the series
    # were spread across events.
    return sorted(metrics, key=lambda series: (series["metric"], series["tags"]))

def get_dependencies(self, name=None):
    """Return the dependencies found in ``app-dependencies-loaded`` telemetry events.

    Args:
        name: When given, only dependencies whose ``"name"`` field equals this
            value are returned. ``None`` (the default) returns every dependency.

    Returns:
        A list of dependency dicts sorted in ascending order by ``"name"``.
    """
    deps = []
    for event in self.get_events("app-dependencies-loaded"):
        # One event can list several dependencies; keep only the requested ones.
        deps.extend(dep for dep in event["payload"]["dependencies"] if name is None or dep["name"] == name)
    # Sorting lets callers index into the result regardless of how the
    # dependencies were spread across events.
    return sorted(deps, key=lambda dep: dep["name"])


@pytest.fixture
def test_agent_session(telemetry_writer, request):
Expand Down
7 changes: 6 additions & 1 deletion tests/telemetry/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from ddtrace.internal.telemetry.data import get_application
from ddtrace.internal.telemetry.data import get_host_info
from ddtrace.internal.telemetry.data import get_hostname
from ddtrace.internal.telemetry.data import update_imported_dependencies


def test_get_application():
Expand Down Expand Up @@ -173,7 +172,10 @@ def test_get_container_id_when_container_does_not_exists():
assert _get_container_id() == ""


@pytest.mark.subprocess
def test_update_imported_dependencies_both_empty():
from ddtrace.internal.telemetry.data import update_imported_dependencies

already_imported = {}
new_modules = []
res = update_imported_dependencies(already_imported, new_modules)
Expand All @@ -182,9 +184,12 @@ def test_update_imported_dependencies_both_empty():
assert new_modules == []


@pytest.mark.subprocess
def test_update_imported_dependencies():
import xmltodict

from ddtrace.internal.telemetry.data import update_imported_dependencies

already_imported = {}
res = update_imported_dependencies(already_imported, [xmltodict.__name__])
assert len(res) == 1
Expand Down
76 changes: 15 additions & 61 deletions tests/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,6 @@ def test_enable(test_agent_session, run_python_code_in_subprocess):
assert stderr == b""


@pytest.mark.snapshot
def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run_python_code_in_subprocess):
"""Assert that telemetry events are generated after the first trace is flushed to the agent.

The test runs a small program in a subprocess that creates and finishes one
span, then inspects the telemetry events collected by ``test_agent_session``.
"""

# Submit a trace to the agent in a subprocess
code = """
from ddtrace import tracer
span = tracer.trace("test-telemetry")
span.finish()
"""
# Run the subprocess with a copy of the current environment plus a test-only flag.
# NOTE(review): the flag name suggests it forces the app-started event to be sent
# without waiting for the periodic interval -- confirm against the telemetry writer.
env = os.environ.copy()
env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true"
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
# A clean run exits with status 0 and writes nothing to stderr.
assert status == 0, stderr
assert stderr == b""
# Ensure telemetry events were sent to the agent (the snapshot ensures one trace was generated).
# Note: event order is reversed, i.e. events[0] is actually the last event sent.
events = test_agent_session.get_events()

# NOTE(review): the exact event count and ordering asserted below depend on how the
# telemetry writer batches events -- a likely source of flakiness; confirm before relying on it.
assert len(events) == 5
assert events[0]["request_type"] == "app-closing"
assert events[1]["request_type"] == "app-dependencies-loaded"
assert events[2]["request_type"] == "app-integrations-change"
assert events[3]["request_type"] == "generate-metrics"
assert events[4]["request_type"] == "app-started"


def test_enable_fork(test_agent_session, run_python_code_in_subprocess):
"""assert app-started/app-closing events are only sent in parent process"""
code = """
Expand Down Expand Up @@ -183,9 +155,6 @@ def test_app_started_error_handled_exception(test_agent_session, run_python_code
from ddtrace import tracer
from ddtrace.filters import TraceFilter
from ddtrace.settings import _config
_config._telemetry_dependency_collection = False
class FailingFilture(TraceFilter):
def process_trace(self, trace):
Expand All @@ -197,10 +166,10 @@ def process_trace(self, trace):
}
)
# generate and encode span
# generate and encode span to trigger sampling failure
tracer.trace("hello").finish()
# force app_started call instead of waiting for periodic()
# force app_started event (instead of waiting for 10 seconds)
from ddtrace.internal.telemetry import telemetry_writer
telemetry_writer._app_started()
"""
Expand Down Expand Up @@ -273,9 +242,6 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
from ddtrace import patch, tracer
patch(raise_errors=False, sqlite3=True)
# Create a span to start the telemetry writer
tracer.trace("hi").finish()
"""

env = os.environ.copy()
Expand All @@ -294,19 +260,12 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
)

# Get metric containing the integration error
integration_error = {}
metric_events = test_agent_session.get_events("generate-metrics")
for event in metric_events:
for metric in event["payload"]["series"]:
if metric["metric"] == "integration_errors":
integration_error = metric
break

integration_error = test_agent_session.get_metrics("integration_errors")
# assert the integration metric has the correct type, count, and tags
assert integration_error
assert integration_error["type"] == "count"
assert integration_error["points"][0][1] == 1
assert integration_error["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"]
assert len(integration_error) == 1
assert integration_error[0]["type"] == "count"
assert integration_error[0]["points"][0][1] == 1
assert integration_error[0]["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"]


def test_unhandled_integration_error(test_agent_session, ddtrace_run_python_code_in_subprocess):
Expand Down Expand Up @@ -350,16 +309,13 @@ def test_unhandled_integration_error(test_agent_session, ddtrace_run_python_code
assert "ddtrace/contrib/internal/flask/patch.py:" in flask_integration["error"]
assert "not enough values to unpack (expected 2, got 0)" in flask_integration["error"]

metric_events = [event for event in events if event["request_type"] == "generate-metrics"]

assert len(metric_events) == 1
assert metric_events[0]["payload"]["namespace"] == "tracers"
assert len(metric_events[0]["payload"]["series"]) == 1
assert metric_events[0]["payload"]["series"][0]["metric"] == "integration_errors"
assert metric_events[0]["payload"]["series"][0]["type"] == "count"
assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1
assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1
assert metric_events[0]["payload"]["series"][0]["tags"] == ["integration_name:flask", "error_type:valueerror"]
error_metrics = test_agent_session.get_metrics("integration_errors")
assert len(error_metrics) == 1
error_metric = error_metrics[0]
assert error_metric["type"] == "count"
assert len(error_metric["points"]) == 1
assert error_metric["points"][0][1] == 1
assert error_metric["tags"] == ["integration_name:flask", "error_type:valueerror"]


def test_app_started_with_install_metrics(test_agent_session, run_python_code_in_subprocess):
Expand All @@ -373,7 +329,7 @@ def test_app_started_with_install_metrics(test_agent_session, run_python_code_in
}
)
# Generate a trace to trigger app-started event
_, stderr, status, _ = run_python_code_in_subprocess("import ddtrace; ddtrace.tracer.trace('s1').finish()", env=env)
_, stderr, status, _ = run_python_code_in_subprocess("import ddtrace", env=env)
assert status == 0, stderr

app_started_event = test_agent_session.get_events("app-started")
Expand All @@ -392,8 +348,6 @@ def test_instrumentation_telemetry_disabled(test_agent_session, run_python_code_

code = """
from ddtrace import tracer
# Create a span to start the telemetry writer
tracer.trace("hi").finish()
# We want to import the telemetry module even when telemetry is disabled.
import sys
Expand Down
42 changes: 13 additions & 29 deletions tests/telemetry/test_telemetry_metrics_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def parse_payload(data):

def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session):
token = "tests.telemetry.test_telemetry_metrics_e2e.test_telemetry_metrics_enabled_on_gunicorn_child_process"
initial_event_count = len(test_agent_session.get_events("generate-metrics"))
with gunicorn_server(telemetry_metrics_enabled="true", token=token) as context:
_, gunicorn_client = context

Expand All @@ -86,11 +85,12 @@ def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session)
response = gunicorn_client.get("/count_metric")
assert response.status_code == 200

metrics = test_agent_session.get_events("generate-metrics")
assert len(metrics) > initial_event_count
assert len(metrics) == 1
assert metrics[0]["payload"]["series"][0]["metric"] == "test_metric"
assert metrics[0]["payload"]["series"][0]["points"][0][1] == 5
# Ensure /count_metric was called 5 times (these counts could be sent in different payloads)
metrics = test_agent_session.get_metrics("test_metric")
count = 0
for metric in metrics:
count += metric["points"][0][1]
assert count == 5


def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_run_python_code_in_subprocess):
Expand All @@ -104,14 +104,13 @@ def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_
env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true"
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr
metrics_events = test_agent_session.get_events("generate-metrics")
metrics_sc = get_metrics_from_events("spans_created", metrics_events)
metrics_sc = test_agent_session.get_metrics("spans_created")
assert len(metrics_sc) == 1
assert metrics_sc[0]["metric"] == "spans_created"
assert metrics_sc[0]["tags"] == ["integration_name:datadog"]
assert metrics_sc[0]["points"][0][1] == 10

metrics_sf = get_metrics_from_events("spans_finished", metrics_events)
metrics_sf = test_agent_session.get_metrics("spans_finished")
assert len(metrics_sf) == 1
assert metrics_sf[0]["metric"] == "spans_finished"
assert metrics_sf[0]["tags"] == ["integration_name:datadog"]
Expand All @@ -133,15 +132,13 @@ def test_span_creation_and_finished_metrics_otel(test_agent_session, ddtrace_run
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr

metrics_events = test_agent_session.get_events("generate-metrics")

metrics_sc = get_metrics_from_events("spans_created", metrics_events)
metrics_sc = test_agent_session.get_metrics("spans_created")
assert len(metrics_sc) == 1
assert metrics_sc[0]["metric"] == "spans_created"
assert metrics_sc[0]["tags"] == ["integration_name:otel"]
assert metrics_sc[0]["points"][0][1] == 9

metrics_sf = get_metrics_from_events("spans_finished", metrics_events)
metrics_sf = test_agent_session.get_metrics("spans_finished")
assert len(metrics_sf) == 1
assert metrics_sf[0]["metric"] == "spans_finished"
assert metrics_sf[0]["tags"] == ["integration_name:otel"]
Expand All @@ -163,15 +160,13 @@ def test_span_creation_and_finished_metrics_opentracing(test_agent_session, ddtr
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr

metrics_events = test_agent_session.get_events("generate-metrics")

metrics_sc = get_metrics_from_events("spans_created", metrics_events)
metrics_sc = test_agent_session.get_metrics("spans_created")
assert len(metrics_sc) == 1
assert metrics_sc[0]["metric"] == "spans_created"
assert metrics_sc[0]["tags"] == ["integration_name:opentracing"]
assert metrics_sc[0]["points"][0][1] == 2

metrics_sf = get_metrics_from_events("spans_finished", metrics_events)
metrics_sf = test_agent_session.get_metrics("spans_finished")
assert len(metrics_sf) == 1
assert metrics_sf[0]["metric"] == "spans_finished"
assert metrics_sf[0]["tags"] == ["integration_name:opentracing"]
Expand Down Expand Up @@ -202,8 +197,7 @@ def test_span_creation_no_finish(test_agent_session, ddtrace_run_python_code_in_
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr

metrics_events = test_agent_session.get_events("generate-metrics")
metrics = get_metrics_from_events("spans_created", metrics_events)
metrics = test_agent_session.get_metrics("spans_created")
assert len(metrics) == 3

assert metrics[0]["metric"] == "spans_created"
Expand All @@ -215,13 +209,3 @@ def test_span_creation_no_finish(test_agent_session, ddtrace_run_python_code_in_
assert metrics[2]["metric"] == "spans_created"
assert metrics[2]["tags"] == ["integration_name:otel"]
assert metrics[2]["points"][0][1] == 4


def get_metrics_from_events(name, events):
    """Collect the metric series called ``name`` from telemetry events.

    Args:
        name: Value the ``"metric"`` field of a series must equal to be kept.
        events: Iterable of event dicts, each holding its series under
            ``event["payload"]["series"]``.

    Returns:
        A list of the matching series dicts sorted in ascending order by
        ``(series["metric"], series["tags"])``.
    """
    metrics = []
    for event in events:
        # One event can carry several series; keep only the requested ones.
        metrics.extend(series for series in event["payload"]["series"] if series["metric"] == name)
    # Every kept series shares the same metric name, so the order is effectively
    # decided by the tags.
    return sorted(metrics, key=lambda series: (series["metric"], series["tags"]))
Loading

0 comments on commit 32a2f72

Please sign in to comment.