diff --git a/ddtrace/internal/ci_visibility/telemetry/test_management.py b/ddtrace/internal/ci_visibility/telemetry/test_management.py index a9bd1d84d54..2d7717f6279 100644 --- a/ddtrace/internal/ci_visibility/telemetry/test_management.py +++ b/ddtrace/internal/ci_visibility/telemetry/test_management.py @@ -9,11 +9,11 @@ class TEST_MANAGEMENT_TELEMETRY(str, Enum): - REQUEST = "test_management.request" - REQUEST_MS = "test_management.request_ms" - REQUEST_ERRORS = "test_management.request_errors" - RESPONSE_BYTES = "test_management.response_bytes" - RESPONSE_TESTS = "test_management.response_tests" + REQUEST = "test_management_tests.request" + REQUEST_MS = "test_management_tests.request_ms" + REQUEST_ERRORS = "test_management_tests.request_errors" + RESPONSE_BYTES = "test_management_tests.response_bytes" + RESPONSE_TESTS = "test_management_tests.response_tests" def record_test_management_tests_count(test_management_count: int): diff --git a/ddtrace/testing/internal/api_client.py b/ddtrace/testing/internal/api_client.py index 586623c0905..eb6a4826d87 100644 --- a/ddtrace/testing/internal/api_client.py +++ b/ddtrace/testing/internal/api_client.py @@ -12,6 +12,7 @@ from ddtrace.testing.internal.git import GitTag from ddtrace.testing.internal.http import BackendConnectorSetup from ddtrace.testing.internal.http import FileAttachment +from ddtrace.testing.internal.telemetry import TelemetryAPI from ddtrace.testing.internal.test_data import ITRSkippingLevel from ddtrace.testing.internal.test_data import ModuleRef from ddtrace.testing.internal.test_data import SuiteRef @@ -30,6 +31,7 @@ def __init__( itr_skipping_level: ITRSkippingLevel, configurations: t.Dict[str, str], connector_setup: BackendConnectorSetup, + telemetry_api: TelemetryAPI, ) -> None: self.service = service self.env = env @@ -37,11 +39,19 @@ def __init__( self.itr_skipping_level = itr_skipping_level self.configurations = configurations self.connector = connector_setup.get_connector_for_subdomain("api") + self.telemetry_api = telemetry_api def close(self) -> None: self.connector.close() def get_settings(self) -> Settings: + telemetry = self.telemetry_api.with_request_metric_names( + count="git_requests.settings", + duration="git_requests.settings_ms", + response_bytes=None, + error="git_requests.settings_errors", + ) + request_data = { "data": { "id": str(uuid.uuid4()), @@ -59,8 +69,11 @@ def get_settings(self) -> Settings: } try: - response, response_data = self.connector.post_json("/api/v2/libraries/tests/services/setting", request_data) - attributes = response_data["data"]["attributes"] + result = self.connector.post_json( + "/api/v2/libraries/tests/services/setting", request_data, telemetry=telemetry + ) + result.on_error_raise_exception() + attributes = result.parsed_response["data"]["attributes"] return Settings.from_attributes(attributes) except Exception: @@ -68,6 +81,13 @@ def get_settings(self) -> Settings: return Settings() def get_known_tests(self) -> t.Set[TestRef]: + telemetry = self.telemetry_api.with_request_metric_names( + count="known_tests.request", + duration="known_tests.request_ms", + response_bytes="known_tests.response_bytes", + error="known_tests.request_errors", + ) + request_data = { "data": { "id": str(uuid.uuid4()), @@ -82,8 +102,9 @@ def get_known_tests(self) -> t.Set[TestRef]: } try: - response, response_data = self.connector.post_json("/api/v2/ci/libraries/tests", request_data) - tests_data = response_data["data"]["attributes"]["tests"] + result = self.connector.post_json("/api/v2/ci/libraries/tests", request_data, telemetry=telemetry) + result.on_error_raise_exception() + tests_data = result.parsed_response["data"]["attributes"]["tests"] known_test_ids = set() for module, suites in tests_data.items(): @@ -100,6 +121,13 @@ def get_known_tests(self) -> t.Set[TestRef]: return set() def get_test_management_properties(self) -> t.Dict[TestRef, TestProperties]: + telemetry = self.telemetry_api.with_request_metric_names( + count="test_management_tests.request", + duration="test_management_tests.request_ms", + response_bytes="test_management_tests.response_bytes", + error="test_management_tests.request_errors", + ) + request_data = { "data": { "id": str(uuid.uuid4()), @@ -113,11 +141,12 @@ def get_test_management_properties(self) -> t.Dict[TestRef, TestProperties]: } try: - response, response_data = self.connector.post_json( - "/api/v2/test/libraries/test-management/tests", request_data + result = self.connector.post_json( + "/api/v2/test/libraries/test-management/tests", request_data, telemetry=telemetry ) + result.on_error_raise_exception() test_properties: t.Dict[TestRef, TestProperties] = {} - modules = response_data["data"]["attributes"]["modules"] + modules = result.parsed_response["data"]["attributes"]["modules"] for module_name, module_data in modules.items(): module_ref = ModuleRef(module_name) @@ -149,8 +178,9 @@ def get_known_commits(self, latest_commits: t.List[str]) -> t.List[str]: } try: - response, response_data = self.connector.post_json("/api/v2/git/repository/search_commits", request_data) - return [item["id"] for item in response_data["data"] if item["type"] == "commit"] + result = self.connector.post_json("/api/v2/git/repository/search_commits", request_data) + result.on_error_raise_exception() + return [item["id"] for item in result.parsed_response["data"] if item["type"] == "commit"] except Exception: log.exception("Failed to parse search_commits data") @@ -173,14 +203,21 @@ def send_git_pack_file(self, packfile: Path) -> None: name="packfile", filename=packfile.name, content_type="application/octet-stream", data=content ), ] - response, response_data = self.connector.post_files( - "/api/v2/git/repository/packfile", files=files, send_gzip=False - ) + try: + result = self.connector.post_files("/api/v2/git/repository/packfile", files=files, send_gzip=False) + result.on_error_raise_exception() - if response.status != 204: - log.warning("Failed to upload git pack data: %s %s", response.status, response_data) + except Exception: + log.warning("Failed to upload git pack data") def get_skippable_tests(self) -> t.Tuple[t.Set[t.Union[SuiteRef, TestRef]], t.Optional[str]]: + telemetry = self.telemetry_api.with_request_metric_names( + count="itr_skippable_tests.request", + duration="itr_skippable_tests.request_ms", + response_bytes="itr_skippable_tests.response_bytes", + error="itr_skippable_tests.request_errors", + ) + request_data = { "data": { "id": str(uuid.uuid4()), @@ -196,10 +233,12 @@ def get_skippable_tests(self) -> t.Tuple[t.Set[t.Union[SuiteRef, TestRef]], t.Op } } try: - response, response_data = self.connector.post_json("/api/v2/ci/tests/skippable", request_data) + result = self.connector.post_json("/api/v2/ci/tests/skippable", request_data, telemetry=telemetry) + result.on_error_raise_exception() + skippable_items: t.Set[t.Union[SuiteRef, TestRef]] = set() - for item in response_data["data"]: + for item in result.parsed_response["data"]: if item["type"] in ("test", "suite"): module_ref = ModuleRef(item["attributes"].get("configurations", {}).get("test.bundle", EMPTY_NAME)) suite_ref = SuiteRef(module_ref, item["attributes"].get("suite", EMPTY_NAME)) @@ -209,7 +248,7 @@ def get_skippable_tests(self) -> t.Tuple[t.Set[t.Union[SuiteRef, TestRef]], t.Op test_ref = TestRef(suite_ref, item["attributes"].get("name", EMPTY_NAME)) skippable_items.add(test_ref) - correlation_id = response_data["meta"]["correlation_id"] + correlation_id = result.parsed_response["meta"]["correlation_id"] return skippable_items, correlation_id diff --git a/ddtrace/testing/internal/http.py b/ddtrace/testing/internal/http.py index a9fc2190b8c..144a00effd1 100644 --- a/ddtrace/testing/internal/http.py +++ b/ddtrace/testing/internal/http.py @@ -8,6 +8,7 @@ import json import logging import os +import random import socket import threading import time @@ -21,13 +22,41 @@ from ddtrace.testing.internal.constants import DEFAULT_AGENT_SOCKET_FILE from ddtrace.testing.internal.constants import DEFAULT_SITE from ddtrace.testing.internal.errors import SetupError +from ddtrace.testing.internal.telemetry import ErrorType +from ddtrace.testing.internal.telemetry import TelemetryAPIRequestMetrics from ddtrace.testing.internal.utils import asbool DEFAULT_TIMEOUT_SECONDS = 15.0 +MAX_ATTEMPTS = 5 log = logging.getLogger(__name__) +T = t.TypeVar("T") + + +class BackendError(Exception): + pass + + +@dataclass +class BackendResult: + error_type: t.Optional[ErrorType] = None + error_description: t.Optional[str] = None + response: t.Optional[http.client.HTTPResponse] = None + response_length: t.Optional[int] = None + response_body: t.Optional[bytes] = None + parsed_response: t.Any = None + is_gzip_response: bool = False + elapsed_seconds: float = 0.0 + + def on_error_raise_exception(self) -> None: + if self.error_type: + raise BackendError(self.error_description) + + +RETRIABLE_ERRORS = {ErrorType.TIMEOUT, ErrorType.NETWORK, ErrorType.CODE_5XX, ErrorType.BAD_JSON} + class BackendConnectorSetup: """ @@ -93,17 +122,15 @@ def _detect_evp_proxy_setup(cls) -> BackendConnectorSetup: # Get info from agent to check if the agent is there, and which EVP proxy version it supports. try: connector = BackendConnector(agent_url) - response, response_data = connector.get_json("/info") - endpoints = response_data.get("endpoints", []) + result = connector.get_json("/info", max_attempts=2) connector.close() except Exception as e: raise SetupError(f"Error connecting to Datadog agent at {agent_url}: {e}") - if response.status != 200: - raise SetupError( - f"Error connecting to Datadog agent at {agent_url}: status {response.status}, " - f"response {response_data!r}" - ) + if result.error_type: + raise SetupError(f"Error connecting to Datadog agent at {agent_url}: {result.error_description}") + + endpoints = result.parsed_response.get("endpoints", []) if "/evp_proxy/v4/" in endpoints: return BackendConnectorEVPProxySetup(url=agent_url, base_path="/evp_proxy/v4", use_gzip=True) @@ -193,53 +220,154 @@ def _make_connection(self, parsed_url: ParseResult, timeout_seconds: float) -> h raise SetupError(f"Unknown scheme {parsed_url.scheme} in {parsed_url.geturl()}") - # TODO: handle retries - def request( + def _do_single_request( self, method: str, path: str, data: t.Optional[bytes] = None, headers: t.Optional[t.Dict[str, str]] = None, send_gzip: bool = False, - ) -> t.Tuple[http.client.HTTPResponse, bytes]: + is_json_response: bool = False, + ) -> BackendResult: full_headers = self.default_headers | (headers or {}) if send_gzip and self.use_gzip and data is not None: data = gzip.compress(data, compresslevel=6) full_headers["Content-Encoding"] = "gzip" - start_time = time.time() + result = BackendResult() + start_time = time.perf_counter() - self.conn.request(method, self.base_path + path, body=data, headers=full_headers) - - response = self.conn.getresponse() - if response.headers.get("Content-Encoding") == "gzip": - response_data = gzip.open(response).read() - else: - response_data = response.read() + try: + self.conn.request(method, self.base_path + path, body=data, headers=full_headers) + result.response = self.conn.getresponse() + result.response_length = int(result.response.headers.get("Content-Length") or "0") + result.is_gzip_response = result.response.headers.get("Content-Encoding") == "gzip" + if result.is_gzip_response: + result.response_body = response_body = gzip.open(result.response).read() + else: + result.response_body = response_body = result.response.read() + + if not (200 <= result.response.status <= 299): + result.error_description = f"{result.response.status} {result.response.reason}" + if result.response.status >= 500: + result.error_type = ErrorType.CODE_5XX + elif result.response.status >= 400: + result.error_type = ErrorType.CODE_4XX + else: + result.error_type = ErrorType.NETWORK + except (TimeoutError, socket.timeout) as e: + result.error_type = ErrorType.TIMEOUT + result.error_description = str(e) + except (ConnectionRefusedError, http.client.HTTPException) as e: + result.error_type = ErrorType.NETWORK + result.error_description = str(e) + except Exception as e: + result.error_type = ErrorType.UNKNOWN + result.error_description = str(e) + log.exception("Error requesting %s %s", method, path) + finally: + result.elapsed_seconds = time.perf_counter() - start_time + + if not result.error_type and is_json_response: + try: + result.parsed_response = json.loads(response_body) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + result.error_type = ErrorType.BAD_JSON + result.error_description = str(e) + except Exception as e: + log.exception("Error parsing respose for %s %s", method, path) + result.error_type = ErrorType.UNKNOWN + result.error_description = str(e) + + if result.error_type: + self.conn.close() # Clean up bad state, ensure subsequent requests start with a fresh connection. + + return result - elapsed_time = time.time() - start_time + def request( + self, + method: str, + path: str, + data: t.Optional[bytes] = None, + headers: t.Optional[t.Dict[str, str]] = None, + send_gzip: bool = False, + is_json_response: bool = False, + telemetry: t.Optional[TelemetryAPIRequestMetrics] = None, + max_attempts: int = MAX_ATTEMPTS, + ) -> BackendResult: + attempts_so_far = 0 + + while True: + attempts_so_far += 1 + result = self._do_single_request( + method=method, + path=path, + data=data, + headers=headers, + send_gzip=send_gzip, + is_json_response=is_json_response, + ) - log.debug("Request to %s %s took %.3f seconds", method, path, elapsed_time) - # log.debug("Request headers %s, data %s", full_headers, data) - # log.debug("Response status %s, data %s", response.status, response_data) + if telemetry: + telemetry.record_request( + seconds=result.elapsed_seconds, + response_bytes=result.response_length, + compressed_response=result.is_gzip_response, + error=result.error_type, + ) + + if result.error_type and result.error_type in RETRIABLE_ERRORS and attempts_so_far < max_attempts: + delay_seconds = random.uniform(0, (1.618 ** (attempts_so_far - 1))) # nosec: B311 + log.debug( + "Retrying %s %s in %.3f seconds (%d attempts so far)", method, path, delay_seconds, attempts_so_far + ) + time.sleep(delay_seconds) + else: + break - return response, response_data + return result - def get_json(self, path: str, headers: t.Optional[t.Dict[str, str]] = None, send_gzip: bool = False) -> t.Any: + def get_json( + self, + path: str, + headers: t.Optional[t.Dict[str, str]] = None, + send_gzip: bool = False, + telemetry: t.Optional[TelemetryAPIRequestMetrics] = None, + max_attempts: int = MAX_ATTEMPTS, + ) -> BackendResult: headers = {"Content-Type": "application/json"} | (headers or {}) - response, response_data = self.request("GET", path=path, headers=headers, send_gzip=send_gzip) - return response, json.loads(response_data) + return self.request( + "GET", + path=path, + headers=headers, + send_gzip=send_gzip, + is_json_response=True, + telemetry=telemetry, + max_attempts=max_attempts, + ) def post_json( - self, path: str, data: t.Any, headers: t.Optional[t.Dict[str, str]] = None, send_gzip: bool = False - ) -> t.Any: + self, + path: str, + data: t.Any, + headers: t.Optional[t.Dict[str, str]] = None, + send_gzip: bool = False, + telemetry: t.Optional[TelemetryAPIRequestMetrics] = None, + max_attempts: int = MAX_ATTEMPTS, + ) -> BackendResult: headers = {"Content-Type": "application/json"} | (headers or {}) encoded_data = json.dumps(data).encode("utf-8") - response, response_data = self.request( - "POST", path=path, data=encoded_data, headers=headers, send_gzip=send_gzip + return self.request( + "POST", + path=path, + data=encoded_data, + headers=headers, + send_gzip=send_gzip, + is_json_response=True, + telemetry=telemetry, + max_attempts=max_attempts, ) - return response, json.loads(response_data) def post_files( self, @@ -247,7 +375,9 @@ def post_files( files: t.List[FileAttachment], headers: t.Optional[t.Dict[str, str]] = None, send_gzip: bool = False, - ) -> t.Tuple[http.client.HTTPResponse, bytes]: + telemetry: t.Optional[TelemetryAPIRequestMetrics] = None, + max_attempts: int = MAX_ATTEMPTS, + ) -> BackendResult: boundary = uuid.uuid4().hex boundary_bytes = boundary.encode("utf-8") headers = {"Content-Type": f"multipart/form-data; boundary={boundary}"} | (headers or {}) @@ -266,7 +396,15 @@ def post_files( body.write(b"--%s--\r\n" % boundary_bytes) - return self.request("POST", path=path, data=body.getvalue(), headers=headers, send_gzip=send_gzip) + return self.request( + "POST", + path=path, + data=body.getvalue(), + headers=headers, + send_gzip=send_gzip, + telemetry=telemetry, + max_attempts=max_attempts, + ) @dataclass diff --git a/ddtrace/testing/internal/session_manager.py b/ddtrace/testing/internal/session_manager.py index eccde0cd7a1..6e7419a3b95 100644 --- a/ddtrace/testing/internal/session_manager.py +++ b/ddtrace/testing/internal/session_manager.py @@ -19,6 +19,7 @@ from ddtrace.testing.internal.retry_handlers import AutoTestRetriesHandler from ddtrace.testing.internal.retry_handlers import EarlyFlakeDetectionHandler from ddtrace.testing.internal.retry_handlers import RetryHandler +from ddtrace.testing.internal.telemetry import TelemetryAPI from ddtrace.testing.internal.test_data import ITRSkippingLevel from ddtrace.testing.internal.test_data import SuiteRef from ddtrace.testing.internal.test_data import Test @@ -64,6 +65,8 @@ def __init__(self, session: TestSession) -> None: self.connector_setup = BackendConnectorSetup.detect_setup() + self.telemetry_api = TelemetryAPI(connector_setup=self.connector_setup) + self.api_client = APIClient( service=self.service, env=self.env, @@ -71,6 +74,7 @@ def __init__(self, session: TestSession) -> None: itr_skipping_level=self.itr_skipping_level, configurations=self.platform_tags, connector_setup=self.connector_setup, + telemetry_api=self.telemetry_api, ) self.settings = self.api_client.get_settings() self.known_tests = self.api_client.get_known_tests() if self.settings.known_tests_enabled else set() @@ -155,9 +159,19 @@ def start(self) -> None: atexit.register(self.finish) def finish(self) -> None: + # Avoid being called again by atexit if we've already been called by the pytest plugin. atexit.unregister(self.finish) - self.writer.finish() - self.coverage_writer.finish() + + # Start writer shutdown in background, so both can do it at the same time. + self.writer.signal_finish() + self.coverage_writer.signal_finish() + + # Telemetry API is based on ddtrace, we don't have fine-grained control over the background process. + self.telemetry_api.finish() + + # Wait for the writer threads to finish. + self.writer.wait_finish() + self.coverage_writer.wait_finish() def discover_test( self, diff --git a/ddtrace/testing/internal/telemetry.py b/ddtrace/testing/internal/telemetry.py new file mode 100644 index 00000000000..bdb1a03b14c --- /dev/null +++ b/ddtrace/testing/internal/telemetry.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import dataclasses +from enum import Enum +import logging +import typing as t + +from ddtrace.internal.telemetry import telemetry_writer +from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE + + +if t.TYPE_CHECKING: + from ddtrace.testing.internal.http import BackendConnectorSetup + + +log = logging.getLogger(__name__) + + +class ErrorType(str, Enum): + TIMEOUT = "timeout" + NETWORK = "network" + CODE_4XX = "status_code_4xx_response" + CODE_5XX = "status_code_5xx_response" + BAD_JSON = "bad_json" + UNKNOWN = "unknown" + + +class TelemetryAPI: + def __init__(self, connector_setup: BackendConnectorSetup) -> None: + # DEV: In a beautiful world, this would set up a backend connector to the telemetry endpoint. + # Currently we rely on ddtrace's telemetry infrastructure, so we don't have to do anything here. + self.writer = telemetry_writer + + def with_request_metric_names( + self, count: str, duration: str, response_bytes: t.Optional[str], error: str + ) -> TelemetryAPIRequestMetrics: + return TelemetryAPIRequestMetrics( + telemetry_api=self, count=count, duration=duration, response_bytes=response_bytes, error=error + ) + + def finish(self) -> None: + self.writer.periodic(force_flush=True) + + +@dataclasses.dataclass +class TelemetryAPIRequestMetrics: + telemetry_api: TelemetryAPI + count: str + duration: str + response_bytes: t.Optional[str] + error: str + + def record_request( + self, seconds: float, response_bytes: t.Optional[int], compressed_response: bool, error: t.Optional[ErrorType] + ) -> None: + log.debug( + "Recording Test Optimization telemetry for %s: %s %s %s %s", + self.count, + seconds, + response_bytes, + compressed_response, + error, + ) + self.telemetry_api.writer.add_count_metric(TELEMETRY_NAMESPACE.CIVISIBILITY, self.count, 1) + self.telemetry_api.writer.add_distribution_metric(TELEMETRY_NAMESPACE.CIVISIBILITY, self.duration, seconds) + if response_bytes is not None and self.response_bytes is not None: + # We don't always want to record response bytes (for settings requests), so assume that no metric name + # means we don't want to record it. + self.telemetry_api.writer.add_distribution_metric( + TELEMETRY_NAMESPACE.CIVISIBILITY, self.response_bytes, response_bytes + ) + + if error is not None: + self.record_error(error) + + def record_error(self, error: ErrorType) -> None: + log.debug("Recording Test Optimization request error telemetry: %s", error) + self.telemetry_api.writer.add_count_metric( + TELEMETRY_NAMESPACE.CIVISIBILITY, self.error, 1, (("error_type", error),) + ) diff --git a/ddtrace/testing/internal/writer.py b/ddtrace/testing/internal/writer.py index 577b5b3a6c8..612b06d8037 100644 --- a/ddtrace/testing/internal/writer.py +++ b/ddtrace/testing/internal/writer.py @@ -50,26 +50,28 @@ def start(self) -> None: self.task = threading.Thread(target=self._periodic_task, daemon=True) self.task.start() - def finish(self) -> None: - log.debug("Waiting for writer thread to finish") + def signal_finish(self) -> None: + log.debug("Signalling for %s writer thread to finish", self.__class__.__name__) self.should_finish.set() + + def wait_finish(self) -> None: self.task.join() - log.debug("Writer thread finished") + log.debug("%s writer thread finished", self.__class__.__name__) def _periodic_task(self) -> None: while True: self.should_finish.wait(timeout=self.flush_interval_seconds) - log.debug("Flushing events in background task") + log.debug("Flushing %s events in background task", self.__class__.__name__) self.flush() if self.should_finish.is_set(): break - log.debug("Exiting background task") + log.debug("Exiting %s background task", self.__class__.__name__) def flush(self) -> None: if events := self.pop_events(): - log.debug("Sending %d events", len(events)) + log.debug("Sending %d events for %s", len(events), self.__class__.__name__) self._send_events(events) @abstractmethod @@ -125,7 +127,7 @@ def _send_events(self, events: t.List[Event]) -> None: "events": events, } pack = msgpack_packb(payload) - response, response_data = self.connector.request( + self.connector.request( "POST", "/api/v2/citestcycle", data=pack, headers={"Content-Type": "application/msgpack"}, send_gzip=True ) @@ -167,7 +169,7 @@ def _send_events(self, events: t.List[Event]) -> None: ), ] - response, response_data = self.connector.post_files("/api/v2/citestcov", files=files, send_gzip=True) + self.connector.post_files("/api/v2/citestcov", files=files, send_gzip=True) def serialize_test_run(test_run: TestRun) -> Event: diff --git a/tests/testing/internal/test_http.py b/tests/testing/internal/test_http.py index 1677a94f801..a0d75c49750 100644 --- a/tests/testing/internal/test_http.py +++ b/tests/testing/internal/test_http.py @@ -3,6 +3,7 @@ import http.client import os from unittest.mock import Mock +from unittest.mock import call from unittest.mock import patch import pytest @@ -15,6 +16,7 @@ from ddtrace.testing.internal.http import BackendConnectorSetup from ddtrace.testing.internal.http import FileAttachment from ddtrace.testing.internal.http import UnixDomainSocketHTTPConnection +from ddtrace.testing.internal.telemetry import ErrorType from tests.testing.mocks import mock_backend_connector @@ -54,6 +56,308 @@ def test_init_unix_domain_socket(self, mock_unix_connection: Mock) -> None: assert connector.default_headers == {} assert connector.base_path == "/evp_proxy/over9000" + @patch("http.client.HTTPSConnection") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_ok(self, mock_time: Mock, mock_https_connection: Mock) -> None: + mock_response = Mock() + mock_response.headers = {"Content-Length": 14} + mock_response.read.return_value = b'{"answer": 42}' + mock_response.status = 200 + + mock_conn = Mock() + mock_conn.getresponse.return_value = mock_response + mock_https_connection.return_value = mock_conn + + mock_telemetry = Mock() + + connector = BackendConnector(url="https://api.example.com") + + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=mock_telemetry) + + assert mock_conn.request.call_args_list == [ + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}) + ] + assert result.error_type is None + assert result.error_description is None + assert result.parsed_response == {"answer": 42} + assert result.is_gzip_response is False + assert result.response_length == 14 + assert isinstance(result.elapsed_seconds, float) + + assert mock_telemetry.record_request.call_args_list == [ + call(seconds=0.0, response_bytes=14, compressed_response=False, error=None) + ] + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_retry_then_ok(self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock) -> None: + mock_response_error = Mock() + mock_response_error.headers = {} + mock_response_error.read.return_value = b"Internal Server Error :(" + mock_response_error.status = 500 + + mock_response_ok = Mock() + mock_response_ok.headers = {"Content-Length": 14} + mock_response_ok.read.return_value = b'{"answer": 42}' + mock_response_ok.status = 200 + + mock_conn = Mock() + mock_conn.getresponse.side_effect = [mock_response_error, mock_response_ok] + mock_https_connection.return_value = mock_conn + + mock_telemetry = Mock() + + connector = BackendConnector(url="https://api.example.com") + + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=mock_telemetry) + + assert mock_conn.request.call_args_list == [ + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + ] + assert len(mock_sleep.call_args_list) == 1 + + assert result.error_type is None + assert result.error_description is None + assert result.parsed_response == {"answer": 42} + assert result.is_gzip_response is False + assert result.response_length == 14 + assert isinstance(result.elapsed_seconds, float) + + assert mock_telemetry.record_request.call_args_list == [ + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.CODE_5XX), + call(seconds=0.0, response_bytes=14, compressed_response=False, error=None), + ] + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_retry_limit(self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock) -> None: + mock_response_error = Mock() + mock_response_error.headers = {} + mock_response_error.read.return_value = b"Internal Server Error :(" + mock_response_error.status = 500 + mock_response_error.reason = "Internal Server Error" + + mock_conn = Mock() + mock_conn.getresponse.return_value = mock_response_error + mock_https_connection.return_value = mock_conn + + mock_telemetry = Mock() + + connector = BackendConnector(url="https://api.example.com") + + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=mock_telemetry) + + assert mock_conn.request.call_args_list == [ + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + ] + + assert len(mock_sleep.call_args_list) == 4 + + assert result.error_type is ErrorType.CODE_5XX + assert result.error_description == "500 Internal Server Error" + assert result.parsed_response is None + assert result.is_gzip_response is False + assert result.response_length == 0 + assert isinstance(result.elapsed_seconds, float) + + assert mock_telemetry.record_request.call_args_list == [ + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.CODE_5XX), + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.CODE_5XX), + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.CODE_5XX), + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.CODE_5XX), + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.CODE_5XX), + ] + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_bad_json(self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock) -> None: + mock_response = Mock() + mock_response.headers = {"Content-Length": 14} + mock_response.read.return_value = b'{"answer": ???' + mock_response.status = 200 + + mock_conn = Mock() + mock_conn.getresponse.return_value = mock_response + mock_https_connection.return_value = mock_conn + + mock_telemetry = Mock() + + connector = BackendConnector(url="https://api.example.com") + + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=mock_telemetry) + + assert mock_conn.request.call_args_list == [ + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + ] + assert result.error_type is ErrorType.BAD_JSON + assert result.error_description == "Expecting value: line 1 column 12 (char 11)" + assert result.parsed_response is None + assert result.is_gzip_response is False + assert result.response_length == 14 + assert isinstance(result.elapsed_seconds, float) + + assert mock_telemetry.record_request.call_args_list == [ + call(seconds=0.0, response_bytes=14, compressed_response=False, error=ErrorType.BAD_JSON), + call(seconds=0.0, response_bytes=14, compressed_response=False, error=ErrorType.BAD_JSON), + call(seconds=0.0, response_bytes=14, compressed_response=False, error=ErrorType.BAD_JSON), + call(seconds=0.0, response_bytes=14, compressed_response=False, error=ErrorType.BAD_JSON), + call(seconds=0.0, response_bytes=14, compressed_response=False, error=ErrorType.BAD_JSON), + ] + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_unretriable_error(self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock) -> None: + mock_response_error = Mock() + mock_response_error.headers = {} + mock_response_error.read.return_value = b"No bueno" + mock_response_error.status = 400 + mock_response_error.reason = "Bad Request" + + mock_conn = Mock() + mock_conn.getresponse.return_value = mock_response_error + mock_https_connection.return_value = mock_conn + + mock_telemetry = Mock() + + connector = BackendConnector(url="https://api.example.com") + + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=mock_telemetry) + + assert mock_conn.request.call_args_list == [ + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + ] + assert len(mock_sleep.call_args_list) == 0 + + assert result.error_type is ErrorType.CODE_4XX + assert result.error_description == "400 Bad Request" + assert result.parsed_response is None + assert result.is_gzip_response is False + assert result.response_length == 0 + assert isinstance(result.elapsed_seconds, float) + + assert mock_telemetry.record_request.call_args_list == [ + call(seconds=0.0, response_bytes=0, compressed_response=False, error=ErrorType.CODE_4XX), + ] + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_connection_refused(self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock) -> None: + mock_conn = Mock() + mock_conn.getresponse.side_effect = ConnectionRefusedError("No connection for you") + mock_https_connection.return_value = mock_conn + + mock_telemetry = Mock() + + connector = BackendConnector(url="https://api.example.com") + + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=mock_telemetry) + + assert mock_conn.request.call_args_list == [ + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + ] + assert len(mock_sleep.call_args_list) == 4 + + assert result.error_type == ErrorType.NETWORK + assert result.error_description == "No connection for you" + assert result.parsed_response is None + assert result.is_gzip_response is False + assert result.response_length is None + assert isinstance(result.elapsed_seconds, float) + + assert mock_telemetry.record_request.call_args_list == [ + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.NETWORK), + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.NETWORK), + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.NETWORK), + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.NETWORK), + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.NETWORK), + ] + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_timeout(self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock) -> None: + mock_conn = Mock() + mock_conn.getresponse.side_effect = TimeoutError("ars longa, vita brevis") + mock_https_connection.return_value = mock_conn + + mock_telemetry = Mock() + + connector = BackendConnector(url="https://api.example.com") + + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=mock_telemetry) + + assert mock_conn.request.call_args_list == [ + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + ] + assert len(mock_sleep.call_args_list) == 4 + + assert result.error_type == ErrorType.TIMEOUT + assert result.error_description == "ars longa, vita brevis" + assert result.parsed_response is None + assert result.is_gzip_response is False + assert result.response_length is None + assert isinstance(result.elapsed_seconds, float) + + assert mock_telemetry.record_request.call_args_list == [ + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.TIMEOUT), + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.TIMEOUT), + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.TIMEOUT), + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.TIMEOUT), + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.TIMEOUT), + ] + + @patch("http.client.HTTPSConnection") + @patch("time.sleep") + @patch("time.perf_counter", return_value=0.0) + def test_post_json_unknown_error(self, mock_time: Mock, mock_sleep: Mock, mock_https_connection: Mock) -> None: + mock_conn = Mock() + mock_conn.getresponse.side_effect = ValueError("some internal error") + mock_https_connection.return_value = mock_conn + + mock_telemetry = Mock() + + connector = BackendConnector(url="https://api.example.com") + + result = connector.post_json("/v1/some-endpoint", data={"question": 1}, telemetry=mock_telemetry) + + assert mock_conn.request.call_args_list == [ + call("POST", "/v1/some-endpoint", body=b'{"question": 1}', headers={"Content-Type": "application/json"}), + ] + assert len(mock_sleep.call_args_list) == 0 + + assert result.error_type == ErrorType.UNKNOWN + assert result.error_description == "some internal error" + assert result.parsed_response is None + assert result.is_gzip_response is False + assert result.response_length is None + assert isinstance(result.elapsed_seconds, float) + + assert mock_telemetry.record_request.call_args_list == [ + call(seconds=0.0, response_bytes=None, compressed_response=False, error=ErrorType.UNKNOWN), + ] + @patch("http.client.HTTPSConnection") @patch("uuid.uuid4") def test_post_files_multiple_files(self, mock_uuid: Mock, mock_https_connection: Mock) -> None: diff --git a/tests/testing/internal/test_telemetry.py b/tests/testing/internal/test_telemetry.py new file mode 100644 index 00000000000..846cdbc216b --- /dev/null +++ b/tests/testing/internal/test_telemetry.py @@ -0,0 +1,100 @@ +from unittest.mock import Mock +from unittest.mock import call + +from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE +from ddtrace.testing.internal.telemetry import ErrorType +from ddtrace.testing.internal.telemetry import TelemetryAPI + + +class TestTelemetry: + def test_record_request(self) -> None: + telemetry_api = TelemetryAPI(connector_setup=Mock()) + + mock_writer = Mock() + telemetry_api.writer = mock_writer + + request_telemetry = telemetry_api.with_request_metric_names( + count="known_tests.request", + duration="known_tests.request_ms", + response_bytes="known_tests.response_bytes", + error="known_tests.request_errors", + ) + + request_telemetry.record_request( + seconds=1.41, + response_bytes=42, + compressed_response=False, + error=ErrorType.CODE_4XX, + ) + + assert mock_writer.add_count_metric.call_args_list == [ + call(TELEMETRY_NAMESPACE.CIVISIBILITY, "known_tests.request", 1), + call( + TELEMETRY_NAMESPACE.CIVISIBILITY, "known_tests.request_errors", 1, (("error_type", ErrorType.CODE_4XX),) + ), + ] + + assert mock_writer.add_distribution_metric.call_args_list == [ + call(TELEMETRY_NAMESPACE.CIVISIBILITY, "known_tests.request_ms", 1.41), + call(TELEMETRY_NAMESPACE.CIVISIBILITY, "known_tests.response_bytes", 42), + ] + + def test_record_request_without_response_bytes(self) -> None: + telemetry_api = TelemetryAPI(connector_setup=Mock()) + + mock_writer = Mock() + telemetry_api.writer = mock_writer + + request_telemetry = telemetry_api.with_request_metric_names( + count="known_tests.request", + duration="known_tests.request_ms", + response_bytes=None, + error="known_tests.request_errors", + ) + + request_telemetry.record_request( + seconds=1.41, + response_bytes=42, + compressed_response=False, + error=ErrorType.CODE_4XX, + ) + + assert mock_writer.add_count_metric.call_args_list == [ + call(TELEMETRY_NAMESPACE.CIVISIBILITY, "known_tests.request", 1), + call( + TELEMETRY_NAMESPACE.CIVISIBILITY, "known_tests.request_errors", 1, (("error_type", ErrorType.CODE_4XX),) + ), + ] + + assert mock_writer.add_distribution_metric.call_args_list == [ + call(TELEMETRY_NAMESPACE.CIVISIBILITY, "known_tests.request_ms", 1.41), + ] + + def test_record_request_without_error(self) -> None: + telemetry_api = TelemetryAPI(connector_setup=Mock()) + + mock_writer = Mock() + telemetry_api.writer = mock_writer + + request_telemetry = telemetry_api.with_request_metric_names( + count="known_tests.request", + duration="known_tests.request_ms", + response_bytes="known_tests.response_bytes", + error="known_tests.request_errors", + ) + + request_telemetry.record_request( + seconds=1.41, + response_bytes=42, + compressed_response=False, + error=None, + ) + + assert mock_writer.add_count_metric.call_args_list == [ + call(TELEMETRY_NAMESPACE.CIVISIBILITY, "known_tests.request", 1), + ] + + assert mock_writer.add_distribution_metric.call_args_list == [ + call(TELEMETRY_NAMESPACE.CIVISIBILITY, "known_tests.request_ms", 1.41), + call(TELEMETRY_NAMESPACE.CIVISIBILITY, "known_tests.response_bytes", 42), + ] diff --git a/tests/testing/mocks.py b/tests/testing/mocks.py index 7ce3f5e17c2..4c06709f4da 100644 --- a/tests/testing/mocks.py +++ b/tests/testing/mocks.py @@ -23,6 +23,8 @@ from ddtrace.testing.internal.api_client import TestManagementSettings from ddtrace.testing.internal.api_client import TestProperties from ddtrace.testing.internal.http import BackendConnectorSetup +from ddtrace.testing.internal.http import BackendResult +from ddtrace.testing.internal.http import ErrorType from ddtrace.testing.internal.session_manager import SessionManager from ddtrace.testing.internal.test_data import ModuleRef from ddtrace.testing.internal.test_data import SuiteRef @@ -442,6 +444,11 @@ def with_request_response(self, method: str, path: str, response_data: t.Any) -> self._request_responses[f"{method}:{path}"] = response_data return self + def _make_404_response(self) -> BackendResult: + return BackendResult( + response=Mock(status=404), error_type=ErrorType.CODE_4XX, error_description="Not found", parsed_response={} + ) + def build(self) -> Mock: """Build the BackendConnector mock.""" mock_connector = Mock() @@ -449,22 +456,22 @@ def build(self) -> Mock: # Mock methods to prevent real HTTP calls def mock_post_json(endpoint: str, data: t.Any) -> t.Tuple[Mock, t.Any]: if endpoint in self._post_json_responses: - return Mock(status=200), self._post_json_responses[endpoint] - return Mock(status=404), {} + return BackendResult(response=Mock(status=200), parsed_response=self._post_json_responses[endpoint]) + return self._make_404_response() - def mock_get_json(endpoint: str) -> t.Tuple[Mock, t.Any]: + def mock_get_json(endpoint: str, max_attempts: int = 0) -> t.Tuple[Mock, t.Any]: if endpoint in self._get_json_responses: - return Mock(status=200), self._get_json_responses[endpoint] - return Mock(status=404), {} + return BackendResult(response=Mock(status=200), parsed_response=self._get_json_responses[endpoint]) + return self._make_404_response() def mock_request(method: str, path: str, **kwargs: t.Any) -> t.Tuple[Mock, t.Any]: key = f"{method}:{path}" if key in self._request_responses: - return Mock(status=200), self._request_responses[key] - return Mock(status=404), {} + BackendResult(response=Mock(status=200), parsed_response=self._request_responses[key]) + return self._make_404_response() def mock_post_files(path: str, files: t.Any, **kwargs: t.Any) -> t.Tuple[Mock, t.Dict[str, t.Any]]: - return Mock(status=200), {} + return BackendResult(response=Mock(status=200)) mock_connector.post_json.side_effect = mock_post_json mock_connector.get_json.side_effect = mock_get_json