Skip to content

Commit efbeb1a

Browse files
committed
fixed linting issues
1 parent b85cf4e commit efbeb1a

File tree

4 files changed

+131
-125
lines changed

4 files changed

+131
-125
lines changed

src/databricks/sql/auth/common.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@ def __init__(
8282
self.pool_connections = pool_connections or 10
8383
self.pool_maxsize = pool_maxsize or 20
8484
self.user_agent = user_agent
85-
self.telemetry_circuit_breaker_enabled = telemetry_circuit_breaker_enabled if telemetry_circuit_breaker_enabled is not None else False
85+
self.telemetry_circuit_breaker_enabled = (
86+
telemetry_circuit_breaker_enabled
87+
if telemetry_circuit_breaker_enabled is not None
88+
else False
89+
)
8690

8791

8892
def get_effective_azure_login_app_id(hostname) -> str:

src/databricks/sql/telemetry/circuit_breaker_manager.py

Lines changed: 60 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -33,195 +33,191 @@
3333

3434
# Logging Message Constants
3535
LOG_CIRCUIT_BREAKER_STATE_CHANGED = "Circuit breaker state changed from %s to %s for %s"
36-
LOG_CIRCUIT_BREAKER_OPENED = "Circuit breaker opened for %s - telemetry requests will be blocked"
37-
LOG_CIRCUIT_BREAKER_CLOSED = "Circuit breaker closed for %s - telemetry requests will be allowed"
38-
LOG_CIRCUIT_BREAKER_HALF_OPEN = "Circuit breaker half-open for %s - testing telemetry requests"
36+
LOG_CIRCUIT_BREAKER_OPENED = (
37+
"Circuit breaker opened for %s - telemetry requests will be blocked"
38+
)
39+
LOG_CIRCUIT_BREAKER_CLOSED = (
40+
"Circuit breaker closed for %s - telemetry requests will be allowed"
41+
)
42+
LOG_CIRCUIT_BREAKER_HALF_OPEN = (
43+
"Circuit breaker half-open for %s - testing telemetry requests"
44+
)
3945

4046

4147
class CircuitBreakerStateListener(CircuitBreakerListener):
4248
"""Listener for circuit breaker state changes."""
43-
49+
4450
def before_call(self, cb: CircuitBreaker, func, *args, **kwargs) -> None:
4551
"""Called before the circuit breaker calls a function."""
4652
pass
47-
53+
4854
def failure(self, cb: CircuitBreaker, exc: BaseException) -> None:
4955
"""Called when a function called by the circuit breaker fails."""
5056
pass
51-
57+
5258
def success(self, cb: CircuitBreaker) -> None:
5359
"""Called when a function called by the circuit breaker succeeds."""
5460
pass
55-
61+
5662
def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None:
5763
"""Called when the circuit breaker state changes."""
5864
old_state_name = old_state.name if old_state else "None"
5965
new_state_name = new_state.name if new_state else "None"
60-
66+
6167
logger.info(
62-
LOG_CIRCUIT_BREAKER_STATE_CHANGED,
63-
old_state_name, new_state_name, cb.name
68+
LOG_CIRCUIT_BREAKER_STATE_CHANGED, old_state_name, new_state_name, cb.name
6469
)
65-
70+
6671
if new_state_name == CIRCUIT_BREAKER_STATE_OPEN:
67-
logger.warning(
68-
LOG_CIRCUIT_BREAKER_OPENED,
69-
cb.name
70-
)
72+
logger.warning(LOG_CIRCUIT_BREAKER_OPENED, cb.name)
7173
elif new_state_name == CIRCUIT_BREAKER_STATE_CLOSED:
72-
logger.info(
73-
LOG_CIRCUIT_BREAKER_CLOSED,
74-
cb.name
75-
)
74+
logger.info(LOG_CIRCUIT_BREAKER_CLOSED, cb.name)
7675
elif new_state_name == CIRCUIT_BREAKER_STATE_HALF_OPEN:
77-
logger.info(
78-
LOG_CIRCUIT_BREAKER_HALF_OPEN,
79-
cb.name
80-
)
76+
logger.info(LOG_CIRCUIT_BREAKER_HALF_OPEN, cb.name)
8177

8278

8379
@dataclass(frozen=True)
8480
class CircuitBreakerConfig:
8581
"""Configuration for circuit breaker behavior.
86-
82+
8783
This class is immutable to prevent modification of circuit breaker settings.
8884
All configuration values are set to constants defined at the module level.
8985
"""
90-
86+
9187
# Failure threshold percentage (0.0 to 1.0)
9288
failure_threshold: float = DEFAULT_FAILURE_THRESHOLD
93-
89+
9490
# Minimum number of calls before circuit can open
9591
minimum_calls: int = DEFAULT_MINIMUM_CALLS
96-
92+
9793
# Time window for counting failures (in seconds)
9894
timeout: int = DEFAULT_TIMEOUT
99-
95+
10096
# Time to wait before trying to close circuit (in seconds)
10197
reset_timeout: int = DEFAULT_RESET_TIMEOUT
102-
98+
10399
# Expected exception types that should trigger circuit breaker
104100
expected_exception: tuple = DEFAULT_EXPECTED_EXCEPTION
105-
101+
106102
# Name for the circuit breaker (for logging)
107103
name: str = DEFAULT_NAME
108104

109105

110106
class CircuitBreakerManager:
111107
"""
112108
Manages circuit breaker instances for telemetry requests.
113-
109+
114110
This class provides a singleton pattern to manage circuit breaker instances
115111
per host, ensuring that telemetry failures don't impact main SQL operations.
116112
"""
117-
113+
118114
_instances: Dict[str, CircuitBreaker] = {}
119115
_lock = threading.RLock()
120116
_config: Optional[CircuitBreakerConfig] = None
121-
117+
122118
@classmethod
123119
def initialize(cls, config: CircuitBreakerConfig) -> None:
124120
"""
125121
Initialize the circuit breaker manager with configuration.
126-
122+
127123
Args:
128124
config: Circuit breaker configuration
129125
"""
130126
with cls._lock:
131127
cls._config = config
132128
logger.debug("CircuitBreakerManager initialized with config: %s", config)
133-
129+
134130
@classmethod
135131
def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
136132
"""
137133
Get or create a circuit breaker instance for the specified host.
138-
134+
139135
Args:
140136
host: The hostname for which to get the circuit breaker
141-
137+
142138
Returns:
143139
CircuitBreaker instance for the host
144140
"""
145141
if not cls._config:
146142
# Return a no-op circuit breaker if not initialized
147143
return cls._create_noop_circuit_breaker()
148-
144+
149145
with cls._lock:
150146
if host not in cls._instances:
151147
cls._instances[host] = cls._create_circuit_breaker(host)
152148
logger.debug("Created circuit breaker for host: %s", host)
153-
149+
154150
return cls._instances[host]
155-
151+
156152
@classmethod
157153
def _create_circuit_breaker(cls, host: str) -> CircuitBreaker:
158154
"""
159155
Create a new circuit breaker instance for the specified host.
160-
156+
161157
Args:
162158
host: The hostname for the circuit breaker
163-
159+
164160
Returns:
165161
New CircuitBreaker instance
166162
"""
167163
config = cls._config
168-
164+
if config is None:
165+
raise RuntimeError("CircuitBreakerManager not initialized")
166+
169167
# Create circuit breaker with configuration
170168
breaker = CircuitBreaker(
171169
fail_max=config.minimum_calls, # Number of failures before circuit opens
172170
reset_timeout=config.reset_timeout,
173-
name=f"{config.name}-{host}"
171+
name=f"{config.name}-{host}",
174172
)
175-
173+
176174
# Add state change listeners for logging
177175
breaker.add_listener(CircuitBreakerStateListener())
178-
176+
179177
return breaker
180-
178+
181179
@classmethod
182180
def _create_noop_circuit_breaker(cls) -> CircuitBreaker:
183181
"""
184182
Create a no-op circuit breaker that always allows calls.
185-
183+
186184
Returns:
187185
CircuitBreaker that never opens
188186
"""
189187
# Create a circuit breaker with very high thresholds so it never opens
190188
breaker = CircuitBreaker(
191189
fail_max=1000000, # Very high threshold
192-
reset_timeout=1, # Short reset time
193-
name="noop-circuit-breaker"
190+
reset_timeout=1, # Short reset time
191+
name="noop-circuit-breaker",
194192
)
195-
breaker.failure_threshold = 1.0 # 100% failure threshold
196193
return breaker
197-
198-
194+
199195
@classmethod
200196
def get_circuit_breaker_state(cls, host: str) -> str:
201197
"""
202198
Get the current state of the circuit breaker for a host.
203-
199+
204200
Args:
205201
host: The hostname
206-
202+
207203
Returns:
208204
Current state of the circuit breaker
209205
"""
210206
if not cls._config:
211207
return CIRCUIT_BREAKER_STATE_DISABLED
212-
208+
213209
with cls._lock:
214210
if host not in cls._instances:
215211
return CIRCUIT_BREAKER_STATE_NOT_INITIALIZED
216-
212+
217213
breaker = cls._instances[host]
218214
return breaker.current_state
219-
215+
220216
@classmethod
221217
def reset_circuit_breaker(cls, host: str) -> None:
222218
"""
223219
Reset the circuit breaker for a host to closed state.
224-
220+
225221
Args:
226222
host: The hostname
227223
"""
@@ -230,20 +226,20 @@ def reset_circuit_breaker(cls, host: str) -> None:
230226
# pybreaker doesn't have a reset method, we need to recreate the breaker
231227
del cls._instances[host]
232228
logger.info("Reset circuit breaker for host: %s", host)
233-
229+
234230
@classmethod
235231
def clear_circuit_breaker(cls, host: str) -> None:
236232
"""
237233
Remove the circuit breaker instance for a host.
238-
234+
239235
Args:
240236
host: The hostname
241237
"""
242238
with cls._lock:
243239
if host in cls._instances:
244240
del cls._instances[host]
245241
logger.debug("Cleared circuit breaker for host: %s", host)
246-
242+
247243
@classmethod
248244
def clear_all_circuit_breakers(cls) -> None:
249245
"""Clear all circuit breaker instances."""
@@ -255,10 +251,10 @@ def clear_all_circuit_breakers(cls) -> None:
255251
def is_circuit_breaker_error(exception: Exception) -> bool:
256252
"""
257253
Check if an exception is a circuit breaker error.
258-
254+
259255
Args:
260256
exception: The exception to check
261-
257+
262258
Returns:
263259
True if the exception is a circuit breaker error
264260
"""

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,12 @@
4444
from databricks.sql.telemetry.telemetry_push_client import (
4545
ITelemetryPushClient,
4646
TelemetryPushClient,
47-
CircuitBreakerTelemetryPushClient
47+
CircuitBreakerTelemetryPushClient,
48+
)
49+
from databricks.sql.telemetry.circuit_breaker_manager import (
50+
CircuitBreakerConfig,
51+
is_circuit_breaker_error,
4852
)
49-
from databricks.sql.telemetry.circuit_breaker_manager import CircuitBreakerConfig, is_circuit_breaker_error
5053

5154
if TYPE_CHECKING:
5255
from databricks.sql.client import Connection
@@ -194,28 +197,32 @@ def __init__(
194197

195198
# Create own HTTP client from client context
196199
self._http_client = UnifiedHttpClient(client_context)
197-
200+
198201
# Create telemetry push client based on circuit breaker enabled flag
199202
if client_context.telemetry_circuit_breaker_enabled:
200203
# Create circuit breaker configuration with hardcoded values
201204
# These values are optimized for telemetry batching and network resilience
202205
circuit_breaker_config = CircuitBreakerConfig(
203-
failure_threshold=0.5, # Opens if 50%+ of calls fail
204-
minimum_calls=20, # Minimum sample size before circuit can open
205-
timeout=30, # Time window for counting failures (seconds)
206-
reset_timeout=30, # Cool-down period before retrying (seconds)
207-
name=f"telemetry-circuit-breaker-{session_id_hex}"
206+
failure_threshold=0.5, # Opens if 50%+ of calls fail
207+
minimum_calls=20, # Minimum sample size before circuit can open
208+
timeout=30, # Time window for counting failures (seconds)
209+
reset_timeout=30, # Cool-down period before retrying (seconds)
210+
name=f"telemetry-circuit-breaker-{session_id_hex}",
208211
)
209-
212+
210213
# Create circuit breaker telemetry push client
211-
self._telemetry_push_client: ITelemetryPushClient = CircuitBreakerTelemetryPushClient(
212-
TelemetryPushClient(self._http_client),
213-
host_url,
214-
circuit_breaker_config
214+
self._telemetry_push_client: ITelemetryPushClient = (
215+
CircuitBreakerTelemetryPushClient(
216+
TelemetryPushClient(self._http_client),
217+
host_url,
218+
circuit_breaker_config,
219+
)
215220
)
216221
else:
217222
# Circuit breaker disabled - use direct telemetry push client
218-
self._telemetry_push_client: ITelemetryPushClient = TelemetryPushClient(self._http_client)
223+
self._telemetry_push_client: ITelemetryPushClient = TelemetryPushClient(
224+
self._http_client
225+
)
219226

220227
def _export_event(self, event):
221228
"""Add an event to the batch queue and flush if batch is full"""
@@ -290,7 +297,8 @@ def _send_with_unified_client(self, url, data, headers, timeout=900):
290297
if is_circuit_breaker_error(e):
291298
logger.warning(
292299
"Telemetry request blocked by circuit breaker for connection %s: %s",
293-
self._session_id_hex, e
300+
self._session_id_hex,
301+
e,
294302
)
295303
else:
296304
logger.error("Failed to send telemetry: %s", e)

0 commit comments

Comments
 (0)