
Commit 61f8029

[PECOBLR-1735] Fix #729 and #731: Telemetry lifecycle management (#734)
* Fix #729 and #731: Telemetry lifecycle management

* Address review comments: revert timeout and telemetry_enabled changes

  Per reviewer feedback on PR #734:

  1. Revert timeout from 30s back to 900s (line 299)
     - Reviewer noted that with wait=False, the timeout is not critical
     - The async nature and wait=False handle the exit speed
  2. Revert telemetry_enabled parameter back to True (line 734)
     - Reviewer noted this is redundant given the early return
     - If enable_telemetry=False, we return early (line 729)
     - Line 734 only executes when enable_telemetry=True
     - Therefore using the parameter here is unnecessary

  These changes address the reviewer's technical concerns while keeping the core fixes intact:
  - wait=False for non-blocking shutdown (critical for Issue #729)
  - Early return when enable_telemetry=False (critical for Issue #729)
  - All Issue #731 fixes (null-safety, __del__, documentation)

* Fix Black formatting violations

  Apply Black formatting to files modified in previous commits:
  - src/databricks/sql/common/unified_http_client.py
  - src/databricks/sql/telemetry/telemetry_client.py

  Changes are purely cosmetic (quote style consistency).

* Fix CI test failure: prevent parallel execution of telemetry tests

  Add @pytest.mark.xdist_group to telemetry test classes to ensure they run sequentially on the same worker when using pytest-xdist (-n auto).

  Root cause: tests marked @pytest.mark.serial were still being parallelized in CI because pytest-xdist doesn't respect custom markers by default. With host-level telemetry batching (PR #718), tests running in parallel would share the same TelemetryClient and interfere with each other's event counting, causing test_concurrent_queries_sends_telemetry to see 88 events instead of the expected 60.

  The xdist_group marker ensures all tests in the "serial_telemetry" group run on the same worker sequentially, preventing state interference.

* Fix telemetry test fixtures: clean up state before AND after tests

  Modified the telemetry_setup_teardown fixtures to clean up TelemetryClientFactory state both BEFORE and AFTER each test, not just after. This prevents leftover state from previous tests (pending events, active executors) from interfering with the current test.

  Root cause: in CI with sequential execution on the same worker, if a previous test left pending telemetry events in the executor, those events could be captured by the next test's mock, inflating event counts (88 instead of 60). Now all shared state is reset before each test starts, ensuring complete isolation between tests.

* Fix CI test failure: clear _flush_event between tests

  The _flush_event threading.Event was never cleared after stopping the flush thread, remaining in the "set" state. This caused timing issues in subsequent tests where the Event was already signaled, triggering unexpected flush behavior and extra captured telemetry events (88 instead of 60).

  Now the _flush_event flag is explicitly cleared in both setup (before the test) and teardown (after the test) to ensure clean state isolation between tests. This explains why CI consistently got 88 events: the flush event from previous tests triggered additional flushes during test execution.

* Add debug workflow and output to diagnose CI test failure

  1. Created a new workflow, test-telemetry-only.yml, that runs only the failing telemetry test with -n auto, mimicking real CI but much faster.
  2. Added debug output to the test showing:
     - Client-side captured events
     - Number of futures/batches
     - Number of server responses
     - Server-reported successful events

  This will help identify why CI gets 88 events vs. 60 locally.

* Fix workflow: add krb5 system dependency

  The workflow was failing during poetry install due to missing krb5 system libraries needed for the kerberos dependencies.

* Fix xdist_group: add --dist=loadgroup to pytest commands

  The @pytest.mark.xdist_group markers were being ignored because pytest-xdist uses --dist=load by default, which doesn't respect groups. With --dist=loadgroup, tests in the same xdist_group run sequentially on the same worker, preventing telemetry state interference between tests.

  This is the ROOT CAUSE of the 88 vs. 60 events issue: tests were running in parallel across workers instead of sequentially on one worker as intended.

* Add aggressive flush before test to prevent event interference

  CI shows 72 events instead of 60. Debug output reveals:
  - Client captured: 60 events (correct)
  - Server received: 72 events across 2 batches

  The 12 extra events accumulate in the timing window between fixture cleanup and mock setup. Other tests (like circuit breaker tests not in our xdist_group) may be sending telemetry concurrently.

  Solution: add an explicit flush+shutdown RIGHT BEFORE setting up the mock to ensure a completely clean slate with zero buffered events.

* Split workflow: isolate telemetry tests in a separate job

  To prevent interference from other e2e tests, split into two jobs:

  Job 1 (run-non-telemetry-tests):
  - Runs all e2e tests EXCEPT telemetry tests
  - Uses -n auto for parallel execution

  Job 2 (run-telemetry-tests):
  - Runs ONLY telemetry tests
  - Depends on Job 1 completing (needs: run-non-telemetry-tests)
  - Fresh Python process = complete isolation
  - No ambient telemetry from other tests

  This eliminates the 68 vs. 60 event discrepancy by ensuring telemetry tests run in a clean environment with zero interference.

* Fix workflows: add krb5 deps and clean up debug code

  Changes across multiple workflows:
  1. integration.yml:
     - Add krb5 system dependency to the telemetry job
     - Fixes: krb5-config command not found error during poetry install
  2. code-coverage.yml:
     - Add krb5 system dependency
     - Split telemetry tests into a separate step for isolation
     - Maintains coverage accumulation with --cov-append
  3. publish-test.yml:
     - Add krb5 system dependency for consistent builds
  4. test_concurrent_telemetry.py:
     - Remove debug print statements
  5. Delete test-telemetry-only.yml:
     - Remove the temporary debug workflow

  All workflows now have proper telemetry test isolation and the required system dependencies for the kerberos packages.

* Fix publish-test.yml: update Python 3.9 -> 3.10

  Poetry 2.3.2 installation fails with Python 3.9 ("Installing Poetry (2.3.2): An error occurred."). Other workflows use Python 3.10 and work fine; updating to match ensures consistency and avoids Poetry installation issues.

* Fix integration workflow: remove --dist=loadgroup from non-telemetry tests

  - Remove --dist=loadgroup from the non-telemetry job (only needed for telemetry)
  - Remove test_telemetry_e2e.py from the telemetry job (it was skipped before)
  - This should fix the test_uc_volume_life_cycle failure caused by the changed test distribution

* Fix code-coverage workflow: remove test_telemetry_e2e.py from coverage tests

  - Only run test_concurrent_telemetry.py in the isolated telemetry step
  - test_telemetry_e2e.py was excluded in the original workflow; keep it excluded

* Fix publish-test workflow: remove cache conditional

  - Always run poetry install (not just on cache miss)
  - Ensures a fresh install with system dependencies (krb5)
  - Matches the pattern used in integration.yml

* Fix publish-test.yml: remove duplicate krb5 install, restore cache conditional

  - Remove the duplicate system dependencies step
  - Restore the cache conditional to match the main branch
  - Keep Python 3.10 (our change from 3.9)

* Fix code-coverage: remove serial tests step

  - All serial tests are telemetry tests (test_concurrent_telemetry.py and test_telemetry_e2e.py)
  - They are already run in the isolated telemetry step
  - Running -m serial with --ignore on both files results in 0 tests (exit code 5)

---------

Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
Signed-off-by: Claude Sonnet 4.5 <noreply@anthropic.com>
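The core of the Issue #729 fix is switching the telemetry executor's shutdown to non-blocking mode. As a minimal standalone sketch (not the driver's actual code), `shutdown(wait=False)` returns immediately while already-submitted work finishes in the background:

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Simulates an in-flight telemetry upload.
def slow_upload():
    time.sleep(2)
    return "sent"

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_upload)

start = time.monotonic()
# wait=False: shutdown returns immediately instead of blocking ~2s,
# which is what keeps teardown from hanging on pending telemetry.
executor.shutdown(wait=False)
elapsed = time.monotonic() - start

print(f"shutdown returned in {elapsed:.3f}s")
print(future.result())  # the submitted task still completes
```

Note that `wait=False` only makes the `shutdown()` call itself non-blocking; the worker threads still finish their current tasks, which is why the reviewer considered the longer request timeout acceptable.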
1 parent cafed60 commit 61f8029

File tree

8 files changed: +161 −17 lines changed


.github/workflows/code-coverage.yml

Lines changed: 11 additions & 4 deletions
```diff
@@ -31,6 +31,13 @@ jobs:
       with:
         python-version: "3.10"
       #----------------------------------------------
+      # ----- install system dependencies -----
+      #----------------------------------------------
+      - name: Install system dependencies
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y libkrb5-dev
+      #----------------------------------------------
       # ----- install & configure poetry -----
       #----------------------------------------------
       - name: Install Poetry
@@ -80,13 +87,13 @@ jobs:
             -v

       #----------------------------------------------
-      # run serial tests with coverage
+      # run telemetry tests with coverage (isolated)
       #----------------------------------------------
-      - name: Run serial tests with coverage
+      - name: Run telemetry tests with coverage (isolated)
         continue-on-error: false
         run: |
-          poetry run pytest tests/e2e \
-            -m "serial" \
+          # Run test_concurrent_telemetry.py separately for isolation
+          poetry run pytest tests/e2e/test_concurrent_telemetry.py \
            --cov=src \
            --cov-append \
            --cov-report=xml \
```

.github/workflows/integration.yml

Lines changed: 48 additions & 4 deletions
```diff
@@ -7,7 +7,7 @@ on:
   pull_request:

 jobs:
-  run-e2e-tests:
+  run-non-telemetry-tests:
     runs-on: ubuntu-latest
     environment: azure-prod
     env:
@@ -59,9 +59,53 @@ jobs:
       #----------------------------------------------
       # run test suite
       #----------------------------------------------
-      - name: Run e2e tests (excluding daily-only tests)
+      - name: Run non-telemetry e2e tests
        run: |
-          # Exclude telemetry E2E tests from PR runs (run daily instead)
+          # Exclude all telemetry tests - they run in separate job for isolation
          poetry run python -m pytest tests/e2e \
            --ignore=tests/e2e/test_telemetry_e2e.py \
-            -n auto
+            --ignore=tests/e2e/test_concurrent_telemetry.py \
+            -n auto
+
+  run-telemetry-tests:
+    runs-on: ubuntu-latest
+    needs: run-non-telemetry-tests  # Run after non-telemetry tests complete
+    environment: azure-prod
+    env:
+      DATABRICKS_SERVER_HOSTNAME: ${{ secrets.DATABRICKS_HOST }}
+      DATABRICKS_HTTP_PATH: ${{ secrets.TEST_PECO_WAREHOUSE_HTTP_PATH }}
+      DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
+      DATABRICKS_CATALOG: peco
+      DATABRICKS_USER: ${{ secrets.TEST_PECO_SP_ID }}
+    steps:
+      - name: Check out repository
+        uses: actions/checkout@v4
+      - name: Set up python
+        id: setup-python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.10"
+      - name: Install system dependencies
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y libkrb5-dev
+      - name: Install Poetry
+        uses: snok/install-poetry@v1
+        with:
+          virtualenvs-create: true
+          virtualenvs-in-project: true
+          installer-parallel: true
+      - name: Load cached venv
+        id: cached-poetry-dependencies
+        uses: actions/cache@v4
+        with:
+          path: .venv
+          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
+      - name: Install dependencies
+        run: poetry install --no-interaction --all-extras
+      - name: Run telemetry tests in isolation
+        run: |
+          # Run test_concurrent_telemetry.py in isolation with complete process separation
+          # Use --dist=loadgroup to respect @pytest.mark.xdist_group markers
+          poetry run python -m pytest tests/e2e/test_concurrent_telemetry.py \
+            -n auto --dist=loadgroup -v
```
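The scheduling relied on here can be sketched with a toy test module. Note that pytest-xdist only honors `xdist_group` when invoked with `--dist=loadgroup`; under the default `--dist=load`, the marker is ignored and the tests may land on different workers:

```python
import pytest

# With `pytest -n auto --dist=loadgroup`, every test carrying the same
# xdist_group name is scheduled onto a single worker and runs sequentially,
# so tests sharing process-global telemetry state cannot interleave.
@pytest.mark.xdist_group(name="serial_telemetry")
class TestGroupedTelemetry:
    def test_first(self):
        assert True

    def test_second(self):
        assert True
```

Applying the marker at class level attaches it to every test method, which is why a single decorator on `TestE2ETelemetry` and `TestTelemetryE2E` is enough to serialize both suites.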

.github/workflows/publish-test.yml

Lines changed: 1 addition & 1 deletion
```diff
@@ -14,7 +14,7 @@ jobs:
         id: setup-python
         uses: actions/setup-python@v5
         with:
-          python-version: 3.9
+          python-version: "3.10"
       #----------------------------------------------
       # ----- install & configure poetry -----
       #----------------------------------------------
```

src/databricks/sql/client.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -306,6 +306,8 @@
             )
             self.session.open()
         except Exception as e:
+            # Respect user's telemetry preference even during connection failure
+            enable_telemetry = kwargs.get("enable_telemetry", True)
             TelemetryClientFactory.connection_failure_log(
                 error_name="Exception",
                 error_message=str(e),
@@ -316,6 +318,7 @@
                 user_agent=self.session.useragent_header
                 if hasattr(self, "session")
                 else None,
+                enable_telemetry=enable_telemetry,
             )
             raise e
```

src/databricks/sql/common/unified_http_client.py

Lines changed: 10 additions & 2 deletions
```diff
@@ -217,15 +217,15 @@ def _should_use_proxy(self, target_host: str) -> bool:
             logger.debug("Error checking proxy bypass for host %s: %s", target_host, e)
             return True

-    def _get_pool_manager_for_url(self, url: str) -> urllib3.PoolManager:
+    def _get_pool_manager_for_url(self, url: str) -> Optional[urllib3.PoolManager]:
         """
         Get the appropriate pool manager for the given URL.

         Args:
             url: The target URL

         Returns:
-            PoolManager instance (either direct or proxy)
+            PoolManager instance (either direct or proxy), or None if client is closed
         """
         parsed_url = urllib.parse.urlparse(url)
         target_host = parsed_url.hostname
@@ -291,6 +291,14 @@ def request_context(
         # Select appropriate pool manager based on target URL
         pool_manager = self._get_pool_manager_for_url(url)

+        # DEFENSIVE: Check if pool_manager is None (client closing/closed)
+        # This prevents AttributeError race condition when telemetry cleanup happens
+        if pool_manager is None:
+            logger.debug(
+                "HTTP client closing or closed, cannot make request to %s", url
+            )
+            raise RequestError("HTTP client is closing or has been closed")
+
         response = None

         try:
```
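The defensive check above follows a general pattern: a method whose backing resource may be torn down concurrently returns `Optional[...]`, and the caller converts `None` into a clear, catchable error instead of an `AttributeError` deep inside an async callback. A minimal sketch with hypothetical names (not the driver's classes):

```python
from typing import Optional

class ClientClosedError(RuntimeError):
    """Raised when a request is attempted on a closing/closed client."""

class MiniHttpClient:
    def __init__(self):
        # Stand-in for a urllib3 PoolManager.
        self._pool: Optional[object] = object()

    def close(self):
        # Another thread may observe this mid-request.
        self._pool = None

    def _get_pool(self) -> Optional[object]:
        return self._pool  # may be None once close() has run

    def request(self, url: str) -> str:
        pool = self._get_pool()
        # Convert the race into an explicit error the caller can handle.
        if pool is None:
            raise ClientClosedError(f"client closed, cannot request {url}")
        return f"ok: {url}"

client = MiniHttpClient()
print(client.request("https://example.com"))
client.close()
# client.request(...) would now raise ClientClosedError, not AttributeError
```

Raising a driver-level error type (`RequestError` in the actual change) means telemetry cleanup races surface as ordinary request failures that the existing error handling already swallows.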

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 38 additions & 2 deletions
```diff
@@ -42,6 +42,7 @@
 from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
 from databricks.sql.common.unified_http_client import UnifiedHttpClient
 from databricks.sql.common.http import HttpMethod
+from databricks.sql.exc import RequestError
 from databricks.sql.telemetry.telemetry_push_client import (
     ITelemetryPushClient,
     TelemetryPushClient,
@@ -417,10 +418,38 @@ def export_latency_log(
         )

     def close(self):
-        """Flush remaining events before closing"""
+        """Flush remaining events before closing
+
+        IMPORTANT: This method does NOT close self._http_client.
+
+        Rationale:
+        - _flush() submits async work to the executor that uses _http_client
+        - If we closed _http_client here, async callbacks would fail with AttributeError
+        - Instead, we let _http_client live as long as needed:
+          * Pending futures hold references to self (via bound methods)
+          * This keeps self alive, which keeps self._http_client alive
+          * When all futures complete, Python GC will clean up naturally
+        - The __del__ method ensures eventual cleanup during garbage collection
+
+        This design prevents race conditions while keeping telemetry truly async.
+        """
         logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
         self._flush()

+    def __del__(self):
+        """Cleanup when TelemetryClient is garbage collected
+
+        This ensures _http_client is eventually closed when the TelemetryClient
+        object is destroyed. By this point, all async work should be complete
+        (since the futures held references keeping us alive), so it's safe to
+        close the http client.
+        """
+        try:
+            if hasattr(self, "_http_client") and self._http_client:
+                self._http_client.close()
+        except Exception:
+            pass
+

 class _TelemetryClientHolder:
     """
@@ -674,7 +703,8 @@ def close(host_url):
         )
         try:
             TelemetryClientFactory._stop_flush_thread()
-            TelemetryClientFactory._executor.shutdown(wait=True)
+            # Use wait=False to allow process to exit immediately
+            TelemetryClientFactory._executor.shutdown(wait=False)
         except Exception as e:
             logger.debug("Failed to shutdown thread pool executor: %s", e)
         TelemetryClientFactory._executor = None
@@ -689,9 +719,15 @@ def connection_failure_log(
         port: int,
         client_context,
         user_agent: Optional[str] = None,
+        enable_telemetry: bool = True,
    ):
        """Send error telemetry when connection creation fails, using provided client context"""

+        # Respect user's telemetry preference - don't force-enable
+        if not enable_telemetry:
+            logger.debug("Telemetry disabled, skipping connection failure log")
+            return
+
        UNAUTH_DUMMY_SESSION_ID = "unauth_session_id"

        TelemetryClientFactory.initialize_telemetry_client(
```
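The lifetime argument in the `close()` docstring — a pending future holds a bound method, the bound method holds `self`, so the object (and its HTTP client) survives until the async work drains, after which `__del__` closes the resource — can be seen in a small sketch with hypothetical names:

```python
import gc
from concurrent.futures import ThreadPoolExecutor

closed = []

class Resource:
    def close(self):
        closed.append(True)

class Client:
    def __init__(self):
        self._http = Resource()

    def _send(self, event):
        # The real client would use self._http here.
        return f"sent {event}"

    def __del__(self):
        # Eventual cleanup: by GC time no futures reference us anymore.
        try:
            self._http.close()
        except Exception:
            pass

executor = ThreadPoolExecutor(max_workers=1)
client = Client()
# The future's bound method `client._send` keeps `client` alive...
future = executor.submit(client._send, "e1")
del client
# ...so the work completes even after our local name is gone.
assert future.result() == "sent e1"
executor.shutdown(wait=True)
gc.collect()  # belt-and-braces for non-refcounting interpreters
print(closed)
```

Once the executor drops its work item, nothing references the client, so `__del__` runs and the resource is closed exactly once.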

tests/e2e/test_concurrent_telemetry.py

Lines changed: 25 additions & 1 deletion
```diff
@@ -27,6 +27,7 @@ def run_in_threads(target, num_threads, pass_index=False):


 @pytest.mark.serial
+@pytest.mark.xdist_group(name="serial_telemetry")
 class TestE2ETelemetry(PySQLPytestTestCase):
     @pytest.fixture(autouse=True)
     def telemetry_setup_teardown(self):
@@ -35,13 +36,27 @@ def telemetry_setup_teardown(self):
         before each test and shuts it down afterward. Using a fixture makes
         this robust and automatic.
         """
+        # Clean up BEFORE test starts to ensure no leftover state from previous tests
+        # Use wait=True to ensure all pending telemetry from previous tests completes
+        # This prevents those events from being captured by this test's mock
+        if TelemetryClientFactory._executor:
+            TelemetryClientFactory._executor.shutdown(wait=True)  # WAIT for pending telemetry
+            TelemetryClientFactory._executor = None
+        TelemetryClientFactory._stop_flush_thread()
+        TelemetryClientFactory._flush_event.clear()  # Clear the event flag
+        TelemetryClientFactory._clients.clear()
+        TelemetryClientFactory._initialized = False
+
         try:
             yield
         finally:
+            # Clean up AFTER test ends
+            # Use wait=True to ensure this test's telemetry completes before next test starts
             if TelemetryClientFactory._executor:
-                TelemetryClientFactory._executor.shutdown(wait=True)
+                TelemetryClientFactory._executor.shutdown(wait=True)  # WAIT for this test's telemetry
                 TelemetryClientFactory._executor = None
             TelemetryClientFactory._stop_flush_thread()
+            TelemetryClientFactory._flush_event.clear()  # Clear the event flag
             TelemetryClientFactory._clients.clear()
             TelemetryClientFactory._initialized = False

@@ -50,6 +65,14 @@ def test_concurrent_queries_sends_telemetry(self):
         An E2E test where concurrent threads execute real queries against
         the staging endpoint, while we capture and verify the generated telemetry.
         """
+        # Extra flush right before test starts to clear any events that accumulated
+        # between fixture cleanup and now (e.g., from other tests on same worker)
+        if TelemetryClientFactory._executor:
+            TelemetryClientFactory._executor.shutdown(wait=True)
+            TelemetryClientFactory._executor = None
+        TelemetryClientFactory._clients.clear()
+        TelemetryClientFactory._initialized = False
+
         num_threads = 30
         capture_lock = threading.Lock()
         captured_telemetry = []
@@ -139,6 +162,7 @@ def execute_query_worker(thread_id):
             assert "errors" not in response or not response["errors"]
             if "numProtoSuccess" in response:
                 total_successful_events += response["numProtoSuccess"]
+
         assert total_successful_events == num_threads * 2

         assert (
```
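The `_flush_event` bug fixed here is plain `threading.Event` semantics: an Event stays set until someone calls `clear()`, so a signal left over from a previous test is observed instantly by the next flush loop. A standalone sketch:

```python
import threading

flush_event = threading.Event()

# "Test A" signals a flush, then shuts down WITHOUT clearing the flag.
flush_event.set()

# "Test B"'s flush loop uses wait(timeout) as a periodic timer; because the
# event is still set, wait() returns True immediately instead of sleeping,
# triggering an extra, unexpected flush cycle.
fired_immediately = flush_event.wait(timeout=0)
print(fired_immediately)  # True - stale state leaked across "tests"

# The fix: clear the event in both setup and teardown.
flush_event.clear()
print(flush_event.wait(timeout=0.05))  # False - the timer now sleeps as intended
```

This is why clearing only in teardown was insufficient: a crashed or interrupted test skips teardown, so the setup-side `clear()` is the safety net.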

tests/e2e/test_telemetry_e2e.py

Lines changed: 25 additions & 3 deletions
```diff
@@ -44,23 +44,45 @@ def connection(self, extra_params=()):


 @pytest.mark.serial
+@pytest.mark.xdist_group(name="serial_telemetry")
 class TestTelemetryE2E(TelemetryTestBase):
     """E2E tests for telemetry scenarios - must run serially due to shared host-level telemetry client"""

     @pytest.fixture(autouse=True)
     def telemetry_setup_teardown(self):
         """Clean up telemetry client state before and after each test"""
+        # Clean up BEFORE test starts
+        # Use wait=True to ensure all pending telemetry from previous tests completes
+        if TelemetryClientFactory._executor:
+            TelemetryClientFactory._executor.shutdown(wait=True)  # WAIT for pending telemetry
+            TelemetryClientFactory._executor = None
+        TelemetryClientFactory._stop_flush_thread()
+        TelemetryClientFactory._flush_event.clear()  # Clear the event flag
+        TelemetryClientFactory._clients.clear()
+        TelemetryClientFactory._initialized = False
+
+        # Clear feature flags cache before test starts
+        from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
+        with FeatureFlagsContextFactory._lock:
+            FeatureFlagsContextFactory._context_map.clear()
+            if FeatureFlagsContextFactory._executor:
+                FeatureFlagsContextFactory._executor.shutdown(wait=False)
+                FeatureFlagsContextFactory._executor = None
+
         try:
             yield
         finally:
+            # Clean up AFTER test ends
+            # Use wait=True to ensure this test's telemetry completes
             if TelemetryClientFactory._executor:
-                TelemetryClientFactory._executor.shutdown(wait=True)
+                TelemetryClientFactory._executor.shutdown(wait=True)  # WAIT for this test's telemetry
                 TelemetryClientFactory._executor = None
             TelemetryClientFactory._stop_flush_thread()
+            TelemetryClientFactory._flush_event.clear()  # Clear the event flag
             TelemetryClientFactory._clients.clear()
             TelemetryClientFactory._initialized = False

-            # Clear feature flags cache to prevent state leakage between tests
-            from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
+            # Clear feature flags cache after test ends
             with FeatureFlagsContextFactory._lock:
                 FeatureFlagsContextFactory._context_map.clear()
                 if FeatureFlagsContextFactory._executor:
```