|
18 | 18 |
|
19 | 19 | from databricks.sql.common.unified_http_client import UnifiedHttpClient |
20 | 20 | from databricks.sql.common.http import HttpMethod |
21 | | -from databricks.sql.exc import TelemetryRateLimitError, RequestError |
| 21 | +from databricks.sql.exc import ( |
| 22 | + TelemetryRateLimitError, |
| 23 | + TelemetryNonRateLimitError, |
| 24 | + RequestError, |
| 25 | +) |
22 | 26 | from databricks.sql.telemetry.circuit_breaker_manager import CircuitBreakerManager |
23 | 27 |
|
24 | 28 | logger = logging.getLogger(__name__) |
@@ -85,124 +89,113 @@ def __init__(self, delegate: ITelemetryPushClient, host: str): |
85 | 89 | host, |
86 | 90 | ) |
87 | 91 |
|
88 | | - def _create_mock_success_response(self) -> BaseHTTPResponse: |
89 | | - """ |
90 | | - Create a mock success response for when circuit breaker is open. |
91 | | -
|
92 | | - This allows telemetry to fail silently without raising exceptions. |
93 | | - """ |
94 | | - # Create a simple object that mimics BaseHTTPResponse interface |
95 | | - class _MockTelemetryResponse: |
96 | | - """Simple response object for silently handling circuit breaker state.""" |
97 | | - |
98 | | - status = 200 |
99 | | - # Include all required fields for TelemetryResponse dataclass |
100 | | - data = b'{"numProtoSuccess": 0, "numSuccess": 0, "numRealtimeSuccess": 0, "errors": []}' |
101 | | - |
102 | | - def close(self): |
103 | | - pass |
104 | | - |
105 | | - return _MockTelemetryResponse() |
106 | | - |
107 | | - def request( |
| 92 | + def _make_request_and_check_status( |
108 | 93 | self, |
109 | 94 | method: HttpMethod, |
110 | 95 | url: str, |
111 | | - headers: Optional[Dict[str, str]] = None, |
| 96 | + headers: Optional[Dict[str, str]], |
112 | 97 | **kwargs, |
113 | 98 | ) -> BaseHTTPResponse: |
114 | 99 | """ |
115 | | - Make an HTTP request with circuit breaker protection. |
| 100 | + Make the request and check response status. |
| 101 | +
|
| 102 | + Raises TelemetryRateLimitError for 429/503 (circuit breaker counts these). |
| 103 | + Wraps other errors in TelemetryNonRateLimitError (circuit breaker excludes these). |
116 | 104 |
|
117 | | - Circuit breaker only opens for 429/503 responses (rate limiting). |
118 | | - If circuit breaker is open, silently drops the telemetry request. |
119 | | - Other errors fail silently without triggering circuit breaker. |
| 105 | + Args: |
| 106 | + method: HTTP method |
| 107 | + url: Request URL |
| 108 | + headers: Request headers |
| 109 | + **kwargs: Additional request parameters |
| 110 | +
|
| 111 | + Returns: |
| 112 | + HTTP response |
| 113 | +
|
| 114 | + Raises: |
| 115 | + TelemetryRateLimitError: For 429/503 status codes (circuit breaker counts) |
| 116 | + TelemetryNonRateLimitError: For other errors (circuit breaker excludes) |
120 | 117 | """ |
| 118 | + try: |
| 119 | + response = self._delegate.request(method, url, headers, **kwargs) |
121 | 120 |
|
122 | | - def _make_request_and_check_status(): |
123 | | - """ |
124 | | - Function that makes the request and checks response status. |
| 121 | + # Check for rate limiting or service unavailable |
| 122 | + if response.status in [429, 503]: |
| 123 | + logger.warning( |
| 124 | + "Telemetry endpoint returned %d for host %s, triggering circuit breaker", |
| 125 | + response.status, |
| 126 | + self._host, |
| 127 | + ) |
| 128 | + raise TelemetryRateLimitError( |
| 129 | + f"Telemetry endpoint rate limited or unavailable: {response.status}" |
| 130 | + ) |
125 | 131 |
|
126 | | - Raises TelemetryRateLimitError ONLY for 429/503 so circuit breaker counts them as failures. |
127 | | - For all other errors, returns mock success response so circuit breaker does NOT count them. |
| 132 | + return response |
128 | 133 |
|
129 | | - This ensures circuit breaker only opens for rate limiting, not for network errors, |
130 | | - timeouts, or server errors. |
131 | | - """ |
132 | | - try: |
133 | | - response = self._delegate.request(method, url, headers, **kwargs) |
| 134 | + except Exception as e: |
| 135 | + # Don't catch TelemetryRateLimitError - let it propagate to circuit breaker |
| 136 | + if isinstance(e, TelemetryRateLimitError): |
| 137 | + raise |
| 138 | + |
| 139 | + # Check if it's a RequestError with rate limiting status code (exhausted retries) |
| 140 | + if isinstance(e, RequestError): |
| 141 | + http_code = ( |
| 142 | + e.context.get("http-code") |
| 143 | + if hasattr(e, "context") and e.context |
| 144 | + else None |
| 145 | + ) |
134 | 146 |
|
135 | | - # Check for rate limiting or service unavailable in successful response |
136 | | - # (case where urllib3 returns response without exhausting retries) |
137 | | - if response.status in [429, 503]: |
| 147 | + if http_code in [429, 503]: |
138 | 148 | logger.warning( |
139 | | - "Telemetry endpoint returned %d for host %s, triggering circuit breaker", |
140 | | - response.status, |
| 149 | + "Telemetry retries exhausted with status %d for host %s, triggering circuit breaker", |
| 150 | + http_code, |
141 | 151 | self._host, |
142 | 152 | ) |
143 | 153 | raise TelemetryRateLimitError( |
144 | | - f"Telemetry endpoint rate limited or unavailable: {response.status}" |
| 154 | + f"Telemetry rate limited after retries: {http_code}" |
145 | 155 | ) |
146 | 156 |
|
147 | | - return response |
148 | | - |
149 | | - except Exception as e: |
150 | | - # Don't catch TelemetryRateLimitError - let it propagate to circuit breaker |
151 | | - if isinstance(e, TelemetryRateLimitError): |
152 | | - raise |
153 | | - |
154 | | - # Check if it's a RequestError with rate limiting status code (exhausted retries) |
155 | | - if isinstance(e, RequestError): |
156 | | - http_code = ( |
157 | | - e.context.get("http-code") |
158 | | - if hasattr(e, "context") and e.context |
159 | | - else None |
160 | | - ) |
| 157 | + # NOT rate limiting (500 errors, network errors, timeouts, etc.) |
| 158 | + # Wrap in TelemetryNonRateLimitError so circuit breaker excludes it |
| 159 | + logger.debug( |
| 160 | + "Non-rate-limit telemetry error for host %s: %s, wrapping to exclude from circuit breaker", |
| 161 | + self._host, |
| 162 | + e, |
| 163 | + ) |
| 164 | + raise TelemetryNonRateLimitError(e) from e |
161 | 165 |
|
162 | | - if http_code in [429, 503]: |
163 | | - logger.warning( |
164 | | - "Telemetry retries exhausted with status %d for host %s, triggering circuit breaker", |
165 | | - http_code, |
166 | | - self._host, |
167 | | - ) |
168 | | - raise TelemetryRateLimitError( |
169 | | - f"Telemetry rate limited after retries: {http_code}" |
170 | | - ) |
171 | | - |
172 | | - # NOT rate limiting (500 errors, network errors, timeouts, etc.) |
173 | | - # Return mock success response so circuit breaker does NOT see this as a failure |
174 | | - logger.debug( |
175 | | - "Non-rate-limit telemetry error for host %s: %s, failing silently", |
176 | | - self._host, |
177 | | - e, |
178 | | - ) |
179 | | - return self._create_mock_success_response() |
| 166 | + def request( |
| 167 | + self, |
| 168 | + method: HttpMethod, |
| 169 | + url: str, |
| 170 | + headers: Optional[Dict[str, str]] = None, |
| 171 | + **kwargs, |
| 172 | + ) -> BaseHTTPResponse: |
| 173 | + """ |
| 174 | + Make an HTTP request with circuit breaker protection. |
180 | 175 |
|
| 176 | + Circuit breaker only opens for TelemetryRateLimitError (429/503 responses). |
| 177 | + Other errors are wrapped in TelemetryNonRateLimitError and excluded from circuit breaker. |
| 178 | + All exceptions propagate to caller (TelemetryClient callback handles them). |
| 179 | + """ |
181 | 180 | try: |
182 | 181 | # Use circuit breaker to protect the request |
183 | | - # The inner function will raise TelemetryRateLimitError for 429/503 |
184 | | - # which the circuit breaker will count as a failure |
185 | | - return self._circuit_breaker.call(_make_request_and_check_status) |
186 | | - |
187 | | - except Exception as e: |
188 | | - # All telemetry errors are consumed and return mock success |
189 | | - # Log appropriate message based on exception type |
190 | | - if isinstance(e, CircuitBreakerError): |
191 | | - logger.debug( |
192 | | - "Circuit breaker is open for host %s, dropping telemetry request", |
193 | | - self._host, |
194 | | - ) |
195 | | - elif isinstance(e, TelemetryRateLimitError): |
196 | | - logger.debug( |
197 | | - "Telemetry rate limited for host %s (already counted by circuit breaker): %s", |
198 | | - self._host, |
199 | | - e, |
200 | | - ) |
201 | | - else: |
202 | | - logger.debug( |
203 | | - "Unexpected telemetry error for host %s: %s, failing silently", |
204 | | - self._host, |
205 | | - e, |
206 | | - ) |
207 | | - |
208 | | - return self._create_mock_success_response() |
| 182 | + # TelemetryRateLimitError will trigger circuit breaker |
| 183 | + # TelemetryNonRateLimitError is excluded from circuit breaker |
| 184 | + return self._circuit_breaker.call( |
| 185 | + self._make_request_and_check_status, |
| 186 | + method, |
| 187 | + url, |
| 188 | + headers, |
| 189 | + **kwargs, |
| 190 | + ) |
| 191 | + |
| 192 | + except TelemetryNonRateLimitError as e: |
| 193 | + # Unwrap and re-raise original exception |
| 194 | + # Circuit breaker didn't count this, but caller should handle it |
| 195 | + logger.debug( |
| 196 | + "Non-rate-limit telemetry error for host %s, re-raising original: %s", |
| 197 | + self._host, |
| 198 | + e.original_exception, |
| 199 | + ) |
| 200 | + raise e.original_exception from e |
| 201 | + # All other exceptions (TelemetryRateLimitError, CircuitBreakerError) propagate as-is |
0 commit comments