Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 43 additions & 14 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pyarrow = [
{ version = ">=18.0.0", python = ">=3.13", optional=true }
]
pyjwt = "^2.0.0"
pybreaker = "^1.0.0"
requests-kerberos = {version = "^0.15.0", optional = true}


Expand Down
6 changes: 6 additions & 0 deletions src/databricks/sql/auth/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(
pool_connections: Optional[int] = None,
pool_maxsize: Optional[int] = None,
user_agent: Optional[str] = None,
telemetry_circuit_breaker_enabled: Optional[bool] = None,
):
self.hostname = hostname
self.access_token = access_token
Expand Down Expand Up @@ -81,6 +82,11 @@ def __init__(
self.pool_connections = pool_connections or 10
self.pool_maxsize = pool_maxsize or 20
self.user_agent = user_agent
self.telemetry_circuit_breaker_enabled = (
telemetry_circuit_breaker_enabled
if telemetry_circuit_breaker_enabled is not None
else False
)


def get_effective_azure_login_app_id(hostname) -> str:
Expand Down
138 changes: 138 additions & 0 deletions src/databricks/sql/telemetry/circuit_breaker_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
Circuit breaker implementation for telemetry requests.

This module provides circuit breaker functionality to prevent telemetry failures
from impacting the main SQL operations. It uses the pybreaker library to implement
the circuit breaker pattern with fixed thresholds and timeouts.
"""

import logging
import threading
from typing import Dict, Optional, Any
from dataclasses import dataclass

import pybreaker
from pybreaker import CircuitBreaker, CircuitBreakerError, CircuitBreakerListener

# Logger named after this module.
logger = logging.getLogger(__name__)

# --- Fixed breaker settings -------------------------------------------------
# Handed to the CircuitBreaker constructor as ``fail_max``.
MINIMUM_CALLS = 20
# Handed to the CircuitBreaker constructor as ``reset_timeout``.
RESET_TIMEOUT = 30
# Prefix of every breaker name; the host is appended after a dash.
CIRCUIT_BREAKER_NAME = "telemetry-circuit-breaker"

# --- State names ------------------------------------------------------------
# The state listener compares these against the ``name`` of the new state.
CIRCUIT_BREAKER_STATE_OPEN = "open"
CIRCUIT_BREAKER_STATE_CLOSED = "closed"
CIRCUIT_BREAKER_STATE_HALF_OPEN = "half-open"
CIRCUIT_BREAKER_STATE_DISABLED = "disabled"

# --- Log message templates (lazy %-style arguments) ---------------------------
# Arguments: old state name, new state name, breaker name.
LOG_CIRCUIT_BREAKER_STATE_CHANGED = (
    "Circuit breaker state changed from %s to %s for %s"
)
# The three templates below each take the breaker name as their only argument.
LOG_CIRCUIT_BREAKER_OPENED = (
    "Circuit breaker opened for %s - telemetry requests will be blocked"
)
LOG_CIRCUIT_BREAKER_CLOSED = (
    "Circuit breaker closed for %s - telemetry requests will be allowed"
)
LOG_CIRCUIT_BREAKER_HALF_OPEN = (
    "Circuit breaker half-open for %s - testing telemetry requests"
)


class CircuitBreakerStateListener(CircuitBreakerListener):
    """Circuit breaker listener that logs state transitions.

    Only ``state_change`` does any work; the other hooks are explicit no-ops.
    """

    def before_call(self, cb: CircuitBreaker, func, *args, **kwargs) -> None:
        """Hook for "about to call ``func`` through the breaker"; does nothing."""
        return None

    def failure(self, cb: CircuitBreaker, exc: BaseException) -> None:
        """Hook for "a call made through the breaker failed"; does nothing."""
        return None

    def success(self, cb: CircuitBreaker) -> None:
        """Hook for "a call made through the breaker succeeded"; does nothing."""
        return None

    def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None:
        """Log a breaker state transition.

        Every transition is logged at INFO level. A second, state-specific
        message follows when the new state is open (WARNING), closed (INFO)
        or half-open (INFO).

        Args:
            cb: The circuit breaker whose state changed.
            old_state: The state being left; a falsy value is logged as "None".
            new_state: The state being entered; a falsy value is logged as "None".
        """
        previous = "None" if not old_state else old_state.name
        current = "None" if not new_state else new_state.name

        logger.info(LOG_CIRCUIT_BREAKER_STATE_CHANGED, previous, current, cb.name)

        # Map each known state name to the (log method, template) to emit next.
        follow_ups = {
            CIRCUIT_BREAKER_STATE_OPEN: (logger.warning, LOG_CIRCUIT_BREAKER_OPENED),
            CIRCUIT_BREAKER_STATE_CLOSED: (logger.info, LOG_CIRCUIT_BREAKER_CLOSED),
            CIRCUIT_BREAKER_STATE_HALF_OPEN: (
                logger.info,
                LOG_CIRCUIT_BREAKER_HALF_OPEN,
            ),
        }
        follow_up = follow_ups.get(current)
        if follow_up is None:
            # Unrecognised state name: the generic message above is all we log.
            return

        emit, template = follow_up
        emit(template, cb.name)


class CircuitBreakerManager:
    """
    Class-level registry of telemetry circuit breakers, one per host.

    Breakers are created on first request for a host and reused afterwards.
    Everything lives on the class itself (no instances are needed), and the
    registry is only touched while holding a re-entrant lock.

    The breaker configuration is fixed by module constants and cannot be
    overridden by callers.
    """

    # host -> breaker created for that host
    _instances: Dict[str, CircuitBreaker] = {}
    # Guards lookups and insertions in ``_instances``.
    _lock = threading.RLock()

    @classmethod
    def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
        """
        Return the circuit breaker for ``host``, creating it on first use.

        Args:
            host: The hostname whose circuit breaker is wanted

        Returns:
            The CircuitBreaker instance registered for the host
        """
        with cls._lock:
            try:
                return cls._instances[host]
            except KeyError:
                # First request for this host: build, register, then report.
                breaker = cls._create_circuit_breaker(host)
                cls._instances[host] = breaker
                logger.debug("Created circuit breaker for host: %s", host)
                return breaker

    @classmethod
    def _create_circuit_breaker(cls, host: str) -> CircuitBreaker:
        """
        Build a fresh circuit breaker for ``host`` with the fixed settings.

        The breaker is named after the host and gets a state listener attached
        so that its transitions are logged.

        Args:
            host: The hostname the circuit breaker is for

        Returns:
            A new CircuitBreaker instance
        """
        breaker_name = f"{CIRCUIT_BREAKER_NAME}-{host}"
        new_breaker = CircuitBreaker(
            fail_max=MINIMUM_CALLS,
            reset_timeout=RESET_TIMEOUT,
            name=breaker_name,
        )
        new_breaker.add_listener(CircuitBreakerStateListener())
        return new_breaker


def is_circuit_breaker_error(exception: Exception) -> bool:
    """
    Tell whether ``exception`` is a ``CircuitBreakerError``.

    Subclasses of ``CircuitBreakerError`` count as well, since the check is
    an ``isinstance`` test.

    Args:
        exception: The exception to inspect

    Returns:
        True when the exception is a circuit breaker error, False otherwise
    """
    raised_by_breaker = isinstance(exception, CircuitBreakerError)
    return raised_by_breaker
36 changes: 33 additions & 3 deletions src/databricks/sql/telemetry/telemetry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@
from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
from databricks.sql.common.unified_http_client import UnifiedHttpClient
from databricks.sql.common.http import HttpMethod
from databricks.sql.telemetry.telemetry_push_client import (
ITelemetryPushClient,
TelemetryPushClient,
CircuitBreakerTelemetryPushClient,
)
from databricks.sql.telemetry.circuit_breaker_manager import (
is_circuit_breaker_error,
)

if TYPE_CHECKING:
from databricks.sql.client import Connection
Expand Down Expand Up @@ -189,6 +197,21 @@ def __init__(
# Create own HTTP client from client context
self._http_client = UnifiedHttpClient(client_context)

# Create telemetry push client based on circuit breaker enabled flag
if client_context.telemetry_circuit_breaker_enabled:
# Create circuit breaker telemetry push client with fixed configuration
self._telemetry_push_client: ITelemetryPushClient = (
CircuitBreakerTelemetryPushClient(
TelemetryPushClient(self._http_client),
host_url,
)
)
else:
# Circuit breaker disabled - use direct telemetry push client
self._telemetry_push_client: ITelemetryPushClient = TelemetryPushClient(
self._http_client
)

def _export_event(self, event):
"""Add an event to the batch queue and flush if batch is full"""
logger.debug("Exporting event for connection %s", self._session_id_hex)
Expand Down Expand Up @@ -252,14 +275,21 @@ def _send_telemetry(self, events):
logger.debug("Failed to submit telemetry request: %s", e)

def _send_with_unified_client(self, url, data, headers, timeout=900):
"""Helper method to send telemetry using the unified HTTP client."""
"""Helper method to send telemetry using the telemetry push client."""
try:
response = self._http_client.request(
response = self._telemetry_push_client.request(
HttpMethod.POST, url, body=data, headers=headers, timeout=timeout
)
return response
except Exception as e:
logger.error("Failed to send telemetry with unified client: %s", e)
if is_circuit_breaker_error(e):
logger.warning(
"Telemetry request blocked by circuit breaker for connection %s: %s",
self._session_id_hex,
e,
)
else:
logger.error("Failed to send telemetry: %s", e)
raise

def _telemetry_request_callback(self, future, sent_count: int):
Expand Down
Loading
Loading