Skip to content

Commit

Permalink
simplify PR
Browse files Browse the repository at this point in the history
  • Loading branch information
wconti27 committed Oct 10, 2023
1 parent ca9dbd1 commit def9d9d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 71 deletions.
103 changes: 34 additions & 69 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import base64
from collections import OrderedDict
from collections import defaultdict
import csv
import json
import logging
import os
Expand Down Expand Up @@ -404,7 +403,6 @@ async def _integration_requests_by_session(
integration_name=integration_name,
integration_version=integration_version,
dependency_name=data.get("dependency_name", integration_name),
version_sent=f"{integration_name}@{integration_version}" in ",".join(os.listdir("artifacts")),
)
req["tracer_version"] = data.get("tracer_version", None)
req["tracer_language"] = data.get("tracer_language", None)
Expand All @@ -414,18 +412,17 @@ async def _integration_requests_by_session(
# check if integration data was provided in the trace request instead
elif (
"_dd_trace_env_variables" in req
and "DD_PLUGIN" in req["_dd_trace_env_variables"]
and "DD_PLUGIN_VERSION" in req["_dd_trace_env_variables"]
and "DD_INTEGRATION" in req["_dd_trace_env_variables"]
and "DD_INTEGRATION_VERSION" in req["_dd_trace_env_variables"]
):
integration_name = req["_dd_trace_env_variables"]["DD_PLUGIN"]
integration_version = req["_dd_trace_env_variables"]["DD_PLUGIN_VERSION"]
integration_name = req["_dd_trace_env_variables"]["DD_INTEGRATION"]
integration_version = req["_dd_trace_env_variables"]["DD_INTEGRATION_VERSION"]

if "integration" not in req:
req["integration"] = Integration(
integration_name=integration_name,
integration_version=integration_version,
dependency_name=req["_dd_trace_env_variables"].get("DD_DEPENDENCY_NAME", integration_name),
version_sent=f"{integration_name}@{integration_version}" in ",".join(os.listdir("artifacts")),
)

if req.headers.get("dd-client-library-version", None):
Expand Down Expand Up @@ -517,78 +514,49 @@ async def handle_v2_apmtelemetry(self, request: Request) -> web.Response:
async def handle_put_tested_integrations(self, request: Request) -> web.Response:
# we need to store the request manually since this is not a real DD agent endpoint
await self._store_request(request)
await self.update_seen_integration_versions()
return web.HTTPOk()

async def update_seen_integration_versions(self):
reqs = await self._integration_requests_by_session(token=None)
async def handle_get_tested_integrations(self, request: Request) -> web.Response:
"""Return all tested integrations according to integration data received by agent."""
text_headers = ["language_name", "tracer_version", "integration_name", "integration_version", "dependency_name"]
aggregated_text = ""
seen_integrations = set()
req_headers = {}

# get all requests associated with an integration
reqs = await self._integration_requests_by_session(token=None, include_sent_integrations=True)
for req in reqs:
integration = req["integration"]

# integrations should contain all following data to be emitted
# only include the integration in response if all data is included and integration hasn't already been added
if (
integration.integration_name
and integration.integration_version
and integration.dependency_name
and not integration.version_sent
and req["tracer_language"]
and req["tracer_version"]
and f"{integration.integration_name}@{integration.integration_version}" not in seen_integrations
):
await self.save_tested_integrations(
integration, tracer_language=req["tracer_language"], tracer_version=req["tracer_version"]
aggregated_text += (
",".join(
[
req["tracer_language"],
".".join(req["tracer_version"].split("-")[0].split(".")[0:3]), # ensure semver
integration.integration_name,
integration.integration_version,
integration.dependency_name,
]
)
+ "\n"
)
# update the actual req to store that the integration / version have been emitted so we can skip the req later
integration.version_sent = True

async def save_tested_integrations(self, integration: Integration, tracer_language: str, tracer_version: str):
headers = ["language_name", "tracer_version", "integration_name", "integration_version", "dependency_name"]
log.debug(f"Saving Integration: {integration.integration_name} to current directory")
filename = (
f"./artifacts/{integration.integration_name}@{integration.integration_version}_supported_versions.csv"
)

# create test artifact directory if it doesnt exist
directory = "./artifacts"
if not os.path.exists(directory):
os.makedirs(directory)

# Check if the file exists
if os.path.exists(filename):
mode = "a+" # Append mode
else:
mode = "w+" # Write mode

with open(filename, mode, newline="") as file:
writer = csv.writer(file)
if mode == "w+":
writer.writerow(headers)
writer.writerow(
[
tracer_language,
".".join(tracer_version.split(".")[0:3]), # ensure semver
integration.integration_name,
integration.integration_version,
integration.dependency_name,
]
)

async def handle_get_tested_integrations(self, request: Request) -> web.Response:
file_headers = ["language_name", "tracer_version", "integration_name", "integration_version", "dependency_name"]
aggregated_text = ""
headers = {}
directory = "./artifacts"
files = os.listdir(directory)
if len(files) > 0:
aggregated_text += ",".join(file_headers) + "\n"
for file in files:
filepath = os.path.join(directory, file)
with open(filepath, "r") as f:
lines = f.readlines()
if lines[0] == ",".join(file_headers) + "\n":
lines = lines[1:] # Skip the headers if they already exist in the file
aggregated_text += "".join(lines)
headers["file-name"]= file.split("@")[0] # use integration name before @ as filename
return web.Response(body=aggregated_text, content_type="text/plain", headers=headers)
# update seen integrations to skip this specific integration and version next loop from another request
seen_integrations.add(f"{integration.integration_name}@{integration.integration_version}")
# given that we will mainly see one integration per call, set a header for the calling lib to know the
# integration name
req_headers["file-name"] = integration.integration_name
if len(aggregated_text) > 0:
aggregated_text = ",".join(text_headers) + "\n" + aggregated_text
return web.Response(body=aggregated_text, content_type="text/plain", headers=req_headers)

async def handle_info(self, request: Request) -> web.Response:
return web.json_response(
Expand Down Expand Up @@ -901,9 +869,6 @@ async def request_forwarder_middleware(self, request: Request, handler: _Handler
log.debug("Found the following Datadog Trace Env Variables: " + str(env_vars))
request["_dd_trace_env_variables"] = env_vars

if "DD_PLUGIN" in env_vars and "DD_PLUGIN_VERSION" in env_vars:
await self.update_seen_integration_versions()

if "X-Datadog-Agent-Proxy-Disabled" in headers:
request["_proxy_to_agent"] = (
headers.pop("X-Datadog-Agent-Proxy-Disabled").lower() != "true" and request.app["agent_url"] != ""
Expand Down
3 changes: 1 addition & 2 deletions ddapm_test_agent/integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
class Integration:
def __init__(self, integration_name: str, integration_version: str, dependency_name: str, version_sent: bool):
def __init__(self, integration_name: str, integration_version: str, dependency_name: str):
self.integration_name = integration_name
self.integration_version = integration_version
self.dependency_name = dependency_name
self.version_sent = version_sent

0 comments on commit def9d9d

Please sign in to comment.