Skip to content

Commit bcd6760

Browse files
committed
remove cb congig class to constants
Signed-off-by: Nikhil Suri <[email protected]>
1 parent c646335 commit bcd6760

10 files changed

+251
-152
lines changed

src/databricks/sql/common/unified_http_client.py

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,42 @@
2828
logger = logging.getLogger(__name__)
2929

3030

31+
def _extract_http_status_from_max_retry_error(e: MaxRetryError) -> Optional[int]:
32+
"""
33+
Extract HTTP status code from MaxRetryError if available.
34+
35+
urllib3 structures MaxRetryError in different ways depending on the failure scenario:
36+
- e.reason.response.status: Most common case when retries are exhausted
37+
- e.response.status: Alternate structure in some scenarios
38+
39+
Args:
40+
e: MaxRetryError exception from urllib3
41+
42+
Returns:
43+
HTTP status code as int if found, None otherwise
44+
"""
45+
# Try primary structure: e.reason.response.status
46+
if (
47+
hasattr(e, "reason")
48+
and e.reason is not None
49+
and hasattr(e.reason, "response")
50+
and e.reason.response is not None
51+
):
52+
http_code = getattr(e.reason.response, "status", None)
53+
if http_code is not None:
54+
return http_code
55+
56+
# Try alternate structure: e.response.status
57+
if (
58+
hasattr(e, "response")
59+
and e.response is not None
60+
and hasattr(e.response, "status")
61+
):
62+
return e.response.status
63+
64+
return None
65+
66+
3167
class UnifiedHttpClient:
3268
"""
3369
Unified HTTP client for all Databricks SQL connector HTTP operations.
@@ -265,23 +301,8 @@ def request_context(
265301
except MaxRetryError as e:
266302
logger.error("HTTP request failed after retries: %s", e)
267303

268-
# Try to extract HTTP status code from the MaxRetryError
269-
http_code = None
270-
if (
271-
hasattr(e, "reason")
272-
and e.reason is not None
273-
and hasattr(e.reason, "response")
274-
and e.reason.response is not None
275-
):
276-
# The reason may contain a response object with status
277-
http_code = getattr(e.reason.response, "status", None)
278-
elif (
279-
hasattr(e, "response")
280-
and e.response is not None
281-
and hasattr(e.response, "status")
282-
):
283-
# Or the error itself may have a response
284-
http_code = e.response.status
304+
# Extract HTTP status code from MaxRetryError if available
305+
http_code = _extract_http_status_from_max_retry_error(e)
285306

286307
context = {}
287308
if http_code is not None:

src/databricks/sql/telemetry/circuit_breaker_manager.py

Lines changed: 16 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
44
This module provides circuit breaker functionality to prevent telemetry failures
55
from impacting the main SQL operations. It uses pybreaker library to implement
6-
the circuit breaker pattern with configurable thresholds and timeouts.
6+
the circuit breaker pattern.
77
"""
88

99
import logging
1010
import threading
11-
from typing import Dict, Optional, Any
12-
from dataclasses import dataclass
11+
from typing import Dict
1312

1413
import pybreaker
1514
from pybreaker import CircuitBreaker, CircuitBreakerError, CircuitBreakerListener
@@ -18,10 +17,10 @@
1817

1918
logger = logging.getLogger(__name__)
2019

21-
# Circuit Breaker Configuration Constants
22-
DEFAULT_MINIMUM_CALLS = 20
23-
DEFAULT_RESET_TIMEOUT = 30
24-
DEFAULT_NAME = "telemetry-circuit-breaker"
20+
# Circuit Breaker Constants
21+
MINIMUM_CALLS = 20 # Number of failures before circuit opens
22+
RESET_TIMEOUT = 30 # Seconds to wait before trying to close circuit
23+
NAME_PREFIX = "telemetry-circuit-breaker"
2524

2625
# Circuit Breaker State Constants (used in logging)
2726
CIRCUIT_BREAKER_STATE_OPEN = "open"
@@ -73,47 +72,16 @@ def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None:
7372
logger.info(LOG_CIRCUIT_BREAKER_HALF_OPEN, cb.name)
7473

7574

76-
@dataclass(frozen=True)
77-
class CircuitBreakerConfig:
78-
"""Configuration for circuit breaker behavior.
79-
80-
This class is immutable to prevent modification of circuit breaker settings.
81-
All configuration values are set to constants defined at the module level.
82-
"""
83-
84-
# Minimum number of calls before circuit can open
85-
minimum_calls: int = DEFAULT_MINIMUM_CALLS
86-
87-
# Time to wait before trying to close circuit (in seconds)
88-
reset_timeout: int = DEFAULT_RESET_TIMEOUT
89-
90-
# Name for the circuit breaker (for logging)
91-
name: str = DEFAULT_NAME
92-
93-
9475
class CircuitBreakerManager:
9576
"""
9677
Manages circuit breaker instances for telemetry requests.
9778
98-
This class provides a singleton pattern to manage circuit breaker instances
99-
per host, ensuring that telemetry failures don't impact main SQL operations.
79+
Creates and caches circuit breaker instances per host to ensure telemetry
80+
failures don't impact main SQL operations.
10081
"""
10182

10283
_instances: Dict[str, CircuitBreaker] = {}
10384
_lock = threading.RLock()
104-
_config: Optional[CircuitBreakerConfig] = None
105-
106-
@classmethod
107-
def initialize(cls, config: CircuitBreakerConfig) -> None:
108-
"""
109-
Initialize the circuit breaker manager with configuration.
110-
111-
Args:
112-
config: Circuit breaker configuration
113-
"""
114-
with cls._lock:
115-
cls._config = config
116-
logger.debug("CircuitBreakerManager initialized with config: %s", config)
11785

11886
@classmethod
11987
def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
@@ -126,56 +94,16 @@ def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
12694
Returns:
12795
CircuitBreaker instance for the host
12896
"""
129-
if not cls._config:
130-
# Return a no-op circuit breaker if not initialized
131-
return cls._create_noop_circuit_breaker()
132-
13397
with cls._lock:
13498
if host not in cls._instances:
135-
cls._instances[host] = cls._create_circuit_breaker(host)
99+
breaker = CircuitBreaker(
100+
fail_max=MINIMUM_CALLS,
101+
reset_timeout=RESET_TIMEOUT,
102+
name=f"{NAME_PREFIX}-{host}",
103+
)
104+
# Add state change listener for logging
105+
breaker.add_listener(CircuitBreakerStateListener())
106+
cls._instances[host] = breaker
136107
logger.debug("Created circuit breaker for host: %s", host)
137108

138109
return cls._instances[host]
139-
140-
@classmethod
141-
def _create_circuit_breaker(cls, host: str) -> CircuitBreaker:
142-
"""
143-
Create a new circuit breaker instance for the specified host.
144-
145-
Args:
146-
host: The hostname for the circuit breaker
147-
148-
Returns:
149-
New CircuitBreaker instance
150-
"""
151-
config = cls._config
152-
if config is None:
153-
raise RuntimeError("CircuitBreakerManager not initialized")
154-
155-
# Create circuit breaker with configuration
156-
breaker = CircuitBreaker(
157-
fail_max=config.minimum_calls, # Number of failures before circuit opens
158-
reset_timeout=config.reset_timeout,
159-
name=f"{config.name}-{host}",
160-
)
161-
162-
# Add state change listeners for logging
163-
breaker.add_listener(CircuitBreakerStateListener())
164-
165-
return breaker
166-
167-
@classmethod
168-
def _create_noop_circuit_breaker(cls) -> CircuitBreaker:
169-
"""
170-
Create a no-op circuit breaker that always allows calls.
171-
172-
Returns:
173-
CircuitBreaker that never opens
174-
"""
175-
# Create a circuit breaker with very high thresholds so it never opens
176-
breaker = CircuitBreaker(
177-
fail_max=1000000, # Very high threshold
178-
reset_timeout=1, # Short reset time
179-
name="noop-circuit-breaker",
180-
)
181-
return breaker

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ def __init__(
196196

197197
# Create telemetry push client based on circuit breaker enabled flag
198198
if client_context.telemetry_circuit_breaker_enabled:
199-
# Create circuit breaker telemetry push client with fixed configuration
199+
# Create circuit breaker telemetry push client (circuit breakers created on-demand)
200200
self._telemetry_push_client: ITelemetryPushClient = (
201201
CircuitBreakerTelemetryPushClient(
202202
TelemetryPushClient(self._http_client),

src/databricks/sql/telemetry/telemetry_push_client.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,18 @@ def _create_mock_success_response(self) -> BaseHTTPResponse:
9191
9292
This allows telemetry to fail silently without raising exceptions.
9393
"""
94-
from unittest.mock import Mock
94+
# Create a simple object that mimics BaseHTTPResponse interface
95+
class _MockTelemetryResponse:
96+
"""Simple response object for silently handling circuit breaker state."""
9597

96-
mock_response = Mock(spec=BaseHTTPResponse)
97-
mock_response.status = 200
98-
mock_response.data = b'{"numProtoSuccess": 0, "errors": []}'
99-
return mock_response
98+
status = 200
99+
# Include all required fields for TelemetryResponse dataclass
100+
data = b'{"numProtoSuccess": 0, "numSuccess": 0, "numRealtimeSuccess": 0, "errors": []}'
101+
102+
def close(self):
103+
pass
104+
105+
return _MockTelemetryResponse()
100106

101107
def request(
102108
self,

tests/unit/test_circuit_breaker_http_client.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def test_request_enabled_other_error(self):
9191
response = self.client.request(HttpMethod.POST, "https://test.com", {})
9292
assert response is not None
9393
assert response.status == 200
94+
assert b"numProtoSuccess" in response.data
9495

9596
def test_is_circuit_breaker_enabled(self):
9697
"""Test checking if circuit breaker is enabled."""
@@ -146,14 +147,12 @@ def test_circuit_breaker_opens_after_failures(self):
146147
"""Test that circuit breaker opens after repeated failures (429/503 errors)."""
147148
from databricks.sql.telemetry.circuit_breaker_manager import (
148149
CircuitBreakerManager,
149-
CircuitBreakerConfig,
150-
DEFAULT_MINIMUM_CALLS as MINIMUM_CALLS,
150+
MINIMUM_CALLS,
151151
)
152152
from databricks.sql.exc import TelemetryRateLimitError
153153

154154
# Clear any existing state
155155
CircuitBreakerManager._instances.clear()
156-
CircuitBreakerManager.initialize(CircuitBreakerConfig())
157156

158157
client = CircuitBreakerTelemetryPushClient(self.mock_delegate, self.host)
159158

@@ -177,15 +176,13 @@ def test_circuit_breaker_recovers_after_success(self):
177176
"""Test that circuit breaker recovers after successful calls."""
178177
from databricks.sql.telemetry.circuit_breaker_manager import (
179178
CircuitBreakerManager,
180-
CircuitBreakerConfig,
181-
DEFAULT_MINIMUM_CALLS as MINIMUM_CALLS,
182-
DEFAULT_RESET_TIMEOUT as RESET_TIMEOUT,
179+
MINIMUM_CALLS,
180+
RESET_TIMEOUT,
183181
)
184182
import time
185183

186184
# Clear any existing state
187185
CircuitBreakerManager._instances.clear()
188-
CircuitBreakerManager.initialize(CircuitBreakerConfig())
189186

190187
client = CircuitBreakerTelemetryPushClient(self.mock_delegate, self.host)
191188

@@ -198,11 +195,13 @@ def test_circuit_breaker_recovers_after_success(self):
198195
for i in range(MINIMUM_CALLS + 5):
199196
response = client.request(HttpMethod.POST, "https://test.com", {})
200197
assert response.status == 200 # Returns mock success
198+
assert b"numProtoSuccess" in response.data
201199

202200
# Circuit should be open now - still returns mock response
203201
response = client.request(HttpMethod.POST, "https://test.com", {})
204202
assert response is not None
205203
assert response.status == 200 # Mock success response
204+
assert b"numProtoSuccess" in response.data
206205

207206
# Wait for reset timeout
208207
time.sleep(RESET_TIMEOUT + 1.0)

tests/unit/test_circuit_breaker_manager.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99

1010
from databricks.sql.telemetry.circuit_breaker_manager import (
1111
CircuitBreakerManager,
12-
CircuitBreakerConfig,
13-
DEFAULT_MINIMUM_CALLS as MINIMUM_CALLS,
14-
DEFAULT_RESET_TIMEOUT as RESET_TIMEOUT,
15-
DEFAULT_NAME as CIRCUIT_BREAKER_NAME,
12+
MINIMUM_CALLS,
13+
RESET_TIMEOUT,
14+
NAME_PREFIX as CIRCUIT_BREAKER_NAME,
1615
)
1716
from pybreaker import CircuitBreakerError
1817

@@ -24,13 +23,10 @@ def setup_method(self):
2423
"""Set up test fixtures."""
2524
# Clear any existing instances
2625
CircuitBreakerManager._instances.clear()
27-
# Initialize with default config
28-
CircuitBreakerManager.initialize(CircuitBreakerConfig())
2926

3027
def teardown_method(self):
3128
"""Clean up after tests."""
3229
CircuitBreakerManager._instances.clear()
33-
CircuitBreakerManager._config = None
3430

3531
def test_get_circuit_breaker_creates_instance(self):
3632
"""Test getting circuit breaker creates instance with correct config."""
@@ -60,6 +56,16 @@ def test_get_circuit_breaker_creates_breaker(self):
6056
assert breaker is not None
6157
assert breaker.current_state in ["closed", "open", "half-open"]
6258

59+
def test_circuit_breaker_reused_for_same_host(self):
60+
"""Test that circuit breakers are reused for the same host."""
61+
# Get circuit breaker for a host
62+
breaker1 = CircuitBreakerManager.get_circuit_breaker("host1.example.com")
63+
assert breaker1 is not None
64+
65+
# Get circuit breaker again for the same host - should be SAME instance
66+
breaker2 = CircuitBreakerManager.get_circuit_breaker("host1.example.com")
67+
assert breaker2 is breaker1 # Same instance, state preserved across calls
68+
6369
def test_thread_safety(self):
6470
"""Test thread safety of circuit breaker manager."""
6571
results = []
@@ -92,13 +98,10 @@ class TestCircuitBreakerIntegration:
9298
def setup_method(self):
9399
"""Set up test fixtures."""
94100
CircuitBreakerManager._instances.clear()
95-
# Initialize with default config
96-
CircuitBreakerManager.initialize(CircuitBreakerConfig())
97101

98102
def teardown_method(self):
99103
"""Clean up after tests."""
100104
CircuitBreakerManager._instances.clear()
101-
CircuitBreakerManager._config = None
102105

103106
def test_circuit_breaker_state_transitions(self):
104107
"""Test circuit breaker state transitions."""

0 commit comments

Comments
 (0)