Skip to content

Commit 8ae44ad

Browse files
committed
basic e2e test for force telemetry verification
Signed-off-by: Nikhil Suri <[email protected]>
1 parent cca421b commit 8ae44ad

File tree

1 file changed

+164
-0
lines changed

1 file changed

+164
-0
lines changed

tests/e2e/test_telemetry_e2e.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
"""
2+
E2E test for telemetry - verifies telemetry successfully sends to backend
3+
"""
4+
import time
5+
import json
6+
import threading
7+
from unittest.mock import patch
8+
import pytest
9+
from concurrent.futures import wait
10+
11+
from databricks.sql.telemetry.telemetry_client import (
12+
TelemetryClient,
13+
TelemetryClientFactory,
14+
)
15+
from tests.e2e.test_driver import PySQLPytestTestCase
16+
17+
18+
class TestTelemetryE2E(PySQLPytestTestCase):
    """E2E tests for telemetry.

    Each test opens a connection through ``self.connection`` (not defined in
    this class, so it comes from the base test case) with telemetry forced on.
    """

    @pytest.fixture(autouse=True)
    def telemetry_setup_teardown(self):
        """
        Reset the TelemetryClientFactory after every test.

        Nothing is done before the test body runs; all cleanup happens once
        the test has finished (whether it passed or failed), so state created
        by one test is not carried into the next one.
        """
        try:
            yield
        finally:
            # Shut the executor down (blocking until it has finished) and
            # drop the reference held by the factory.
            if TelemetryClientFactory._executor:
                TelemetryClientFactory._executor.shutdown(wait=True)
                TelemetryClientFactory._executor = None
            TelemetryClientFactory._stop_flush_thread()
            TelemetryClientFactory._initialized = False

    def test_telemetry_sends_successfully_with_200_response(self):
        """
        E2E test to verify telemetry is sent to the backend and accepted.

        This test:
        1. Enables telemetry with force_enable_telemetry
        2. Sets telemetry_batch_size=1 for immediate flushing
        3. Executes a simple query
        4. Captures the future handed to the telemetry request callback
        5. Verifies every captured response has a 2xx status and reports
           no errors in its JSON body

        With batch_size=1, telemetry is sent immediately after each event.
        """
        timeout_seconds = 30
        # With batch_size=1, we expect 2 events: initial_log + latency_log.
        expected_event_count = 2

        # The condition both guards ``captured_futures`` and lets the test
        # block until enough callbacks have fired, instead of polling the
        # list without synchronisation.
        # NOTE(review): callbacks are presumably invoked off the main
        # thread -- confirm against TelemetryClient.
        futures_arrived = threading.Condition()
        captured_futures = []

        # Keep a reference to the real callback so the wrapper can delegate.
        original_callback = TelemetryClient._telemetry_request_callback

        def callback_wrapper(self_client, future, sent_count):
            """
            Record the future, wake the waiting test, then run the original
            callback so the client's own handling is unchanged.
            """
            with futures_arrived:
                captured_futures.append(future)
                futures_arrived.notify_all()
            original_callback(self_client, future, sent_count)

        with patch.object(
            TelemetryClient, "_telemetry_request_callback", callback_wrapper
        ):
            # Execute a query with telemetry enabled and batch_size=1
            with self.connection(
                extra_params={
                    "force_enable_telemetry": True,
                    "telemetry_batch_size": 1,  # Immediate flushing for test
                }
            ) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    result = cursor.fetchone()
                    assert result[0] == 1

            # Wait (up to timeout_seconds) for the expected number of
            # callbacks. A timeout here is not fatal by itself: whatever was
            # captured is still verified below.
            with futures_arrived:
                futures_arrived.wait_for(
                    lambda: len(captured_futures) >= expected_event_count,
                    timeout=timeout_seconds,
                )
                # Snapshot under the lock so late callbacks cannot mutate
                # the collection handed to wait().
                pending = list(captured_futures)

            # Give the captured futures up to another timeout_seconds to
            # finish.
            done, not_done = wait(pending, timeout=timeout_seconds)

        assert (
            not not_done
        ), f"Telemetry requests timed out: {len(not_done)} still pending"

        # Assert we got responses
        assert len(done) > 0, "No telemetry responses received"

        # Separate "the request itself failed" from "the response was bad",
        # so an assertion failure is never reported as a request exception.
        request_errors = []
        responses = []
        for future in done:
            try:
                responses.append(future.result())
            except Exception as exc:
                request_errors.append(exc)

        assert (
            not request_errors
        ), f"Telemetry requests failed with exceptions: {request_errors}"

        for response in responses:
            # Any 2xx status counts as success.
            assert (
                200 <= response.status < 300
            ), f"Expected 2xx status, got {response.status}"

            # An empty body is treated as an empty JSON object.
            response_data = (
                json.loads(response.data.decode()) if response.data else {}
            )

            # Should not have errors
            assert (
                "errors" not in response_data or not response_data["errors"]
            ), f"Telemetry response contained errors: {response_data.get('errors')}"

    def test_telemetry_does_not_break_driver_on_query_execution(self):
        """
        E2E test to verify telemetry doesn't break driver functionality.

        This is a simpler test that just ensures:
        1. Driver works fine with telemetry enabled
        2. Query executes successfully
        3. Results are returned correctly

        Nothing about the telemetry requests themselves is asserted here;
        only the query results are checked.
        """
        with self.connection(
            extra_params={
                "force_enable_telemetry": True,
                "telemetry_batch_size": 1,  # Immediate flushing for test
            }
        ) as conn:
            with conn.cursor() as cursor:
                # Execute a simple query
                cursor.execute("SELECT 1 as col1, 'test' as col2")
                result = cursor.fetchone()

                # Verify query worked correctly
                assert result[0] == 1
                assert result[1] == "test"

                # Execute another query to generate more telemetry
                cursor.execute("SELECT 42")
                result = cursor.fetchone()
                assert result[0] == 42

        # Reaching this point without an exception means telemetry did not
        # break the driver.

0 commit comments

Comments
 (0)