Skip to content

Commit e3038d8

Browse files
authored
RTC Latency improvements (#149)
1 parent 75f355c commit e3038d8

File tree

8 files changed

+358
-249
lines changed

8 files changed

+358
-249
lines changed

getstream/base.py

Lines changed: 85 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import time
3+
import uuid
34
from typing import Any, Dict, Optional, Type, get_origin
45

56
from getstream.models import APIError
@@ -16,7 +17,11 @@
1617
span_request,
1718
current_operation,
1819
metric_attributes,
20+
with_span,
21+
get_current_call_cid,
22+
get_current_channel_cid,
1923
)
24+
import ijson
2025

2126

2227
def build_path(path: str, path_params: dict) -> str:
@@ -28,6 +33,7 @@ def build_path(path: str, path_params: dict) -> str:
2833

2934

3035
class ResponseParserMixin:
36+
@with_span("parse_response")
3137
def _parse_response(
3238
self, response: httpx.Response, data_type: Type[T]
3339
) -> StreamResponse[T]:
@@ -89,23 +95,28 @@ def _normalize_endpoint_from_path(self, path: str) -> str:
8995
return ".".join(norm_parts) if norm_parts else "root"
9096

9197
def _prepare_request(self, method: str, path: str, query_params, kwargs):
98+
headers = kwargs.get("headers", {})
9299
path_params = kwargs.get("path_params") if kwargs else None
93100
url_path = (
94101
build_path(path, path_params) if path_params else build_path(path, None)
95102
)
96103
url_full = f"{self.base_url}{url_path}"
97104
endpoint = self._endpoint_name(path)
105+
client_request_id = str(uuid.uuid4())
106+
headers["x-client-request-id"] = client_request_id
107+
kwargs["headers"] = headers
98108
span_attrs = common_attributes(
99109
api_key=self.api_key,
100110
endpoint=endpoint,
101111
method=method,
102112
url=url_full,
113+
client_request_id=client_request_id,
103114
)
104115
# Enrich with contextual IDs when available (set by decorators)
105-
call_cid = getattr(self, "_call_cid", None)
116+
call_cid = get_current_call_cid()
106117
if call_cid:
107118
span_attrs["stream.call_cid"] = call_cid
108-
channel_cid = getattr(self, "_channel_cid", None)
119+
channel_cid = get_current_channel_cid()
109120
if channel_cid:
110121
span_attrs["stream.channel_cid"] = channel_cid
111122
return url_path, url_full, endpoint, span_attrs
@@ -145,7 +156,14 @@ def _endpoint_name(self, path: str) -> str:
145156
return op or current_operation(self._normalize_endpoint_from_path(path))
146157

147158
def _request_sync(
148-
self, method: str, path: str, *, query_params=None, args=(), kwargs=None
159+
self,
160+
method: str,
161+
path: str,
162+
*,
163+
query_params=None,
164+
args=(),
165+
kwargs=None,
166+
data_type: Optional[Type[T]] = None,
149167
):
150168
kwargs = kwargs or {}
151169
url_path, url_full, endpoint, attrs = self._prepare_request(
@@ -161,22 +179,26 @@ def _request_sync(
161179
response = getattr(self.client, method.lower())(
162180
url_path, params=query_params, *args, **call_kwargs
163181
)
182+
duration = parse_duration_from_body(response.content)
183+
if duration:
184+
span.set_attribute("http.server.duration", duration)
164185
try:
165186
span and span.set_attribute(
166187
"http.response.status_code", response.status_code
167188
)
168189
except Exception:
169190
pass
170-
duration_ms = (time.perf_counter() - start) * 1000.0
171-
# Metrics should be low-cardinality: exclude url/call_cid/channel_cid
172-
metric_attrs = metric_attributes(
173-
api_key=self.api_key,
174-
endpoint=endpoint,
175-
method=method,
176-
status_code=getattr(response, "status_code", None),
177-
)
178-
record_metrics(duration_ms, attributes=metric_attrs)
179-
return response
191+
192+
duration_ms = (time.perf_counter() - start) * 1000.0
193+
# Metrics should be low-cardinality: exclude url/call_cid/channel_cid
194+
metric_attrs = metric_attributes(
195+
api_key=self.api_key,
196+
endpoint=endpoint,
197+
method=method,
198+
status_code=getattr(response, "status_code", None),
199+
)
200+
record_metrics(duration_ms, attributes=metric_attrs)
201+
return self._parse_response(response, data_type or Dict[str, Any])
180202

181203
def patch(
182204
self,
@@ -187,14 +209,14 @@ def patch(
187209
*args,
188210
**kwargs,
189211
) -> StreamResponse[T]:
190-
response = self._request_sync(
212+
return self._request_sync(
191213
"PATCH",
192214
path,
193215
query_params=query_params,
194216
args=args,
195217
kwargs=kwargs | {"path_params": path_params},
218+
data_type=data_type,
196219
)
197-
return self._parse_response(response, data_type or Dict[str, Any])
198220

199221
def get(
200222
self,
@@ -205,14 +227,14 @@ def get(
205227
*args,
206228
**kwargs,
207229
) -> StreamResponse[T]:
208-
response = self._request_sync(
230+
return self._request_sync(
209231
"GET",
210232
path,
211233
query_params=query_params,
212234
args=args,
213235
kwargs=kwargs | {"path_params": path_params},
236+
data_type=data_type,
214237
)
215-
return self._parse_response(response, data_type or Dict[str, Any])
216238

217239
def post(
218240
self,
@@ -223,14 +245,14 @@ def post(
223245
*args,
224246
**kwargs,
225247
) -> StreamResponse[T]:
226-
response = self._request_sync(
248+
return self._request_sync(
227249
"POST",
228250
path,
229251
query_params=query_params,
230252
args=args,
231253
kwargs=kwargs | {"path_params": path_params},
254+
data_type=data_type,
232255
)
233-
return self._parse_response(response, data_type or Dict[str, Any])
234256

235257
def put(
236258
self,
@@ -241,14 +263,14 @@ def put(
241263
*args,
242264
**kwargs,
243265
) -> StreamResponse[T]:
244-
response = self._request_sync(
266+
return self._request_sync(
245267
"PUT",
246268
path,
247269
query_params=query_params,
248270
args=args,
249271
kwargs=kwargs | {"path_params": path_params},
272+
data_type=data_type,
250273
)
251-
return self._parse_response(response, data_type or Dict[str, Any])
252274

253275
def delete(
254276
self,
@@ -259,14 +281,14 @@ def delete(
259281
*args,
260282
**kwargs,
261283
) -> StreamResponse[T]:
262-
response = self._request_sync(
284+
return self._request_sync(
263285
"DELETE",
264286
path,
265287
query_params=query_params,
266288
args=args,
267289
kwargs=kwargs | {"path_params": path_params},
290+
data_type=data_type,
268291
)
269-
return self._parse_response(response, data_type or Dict[str, Any])
270292

271293
def close(self):
272294
"""
@@ -313,9 +335,17 @@ def _endpoint_name(self, path: str) -> str:
313335
return op or current_operation(self._normalize_endpoint_from_path(path))
314336

315337
async def _request_async(
316-
self, method: str, path: str, *, query_params=None, args=(), kwargs=None
338+
self,
339+
method: str,
340+
path: str,
341+
*,
342+
query_params=None,
343+
args=(),
344+
kwargs=None,
345+
data_type: Optional[Type[T]] = None,
317346
):
318347
kwargs = kwargs or {}
348+
query_params = query_params or {}
319349
url_path, url_full, endpoint, attrs = self._prepare_request(
320350
method, path, query_params, kwargs
321351
)
@@ -328,22 +358,26 @@ async def _request_async(
328358
response = await getattr(self.client, method.lower())(
329359
url_path, params=query_params, *args, **call_kwargs
330360
)
361+
duration = parse_duration_from_body(response.content)
362+
if duration:
363+
span.set_attribute("http.server.duration", duration)
331364
try:
332365
span and span.set_attribute(
333366
"http.response.status_code", response.status_code
334367
)
335368
except Exception:
336369
pass
337-
duration_ms = (time.perf_counter() - start) * 1000.0
338-
# Metrics should be low-cardinality: exclude url/call_cid/channel_cid
339-
metric_attrs = metric_attributes(
340-
api_key=self.api_key,
341-
endpoint=endpoint,
342-
method=method,
343-
status_code=getattr(response, "status_code", None),
344-
)
345-
record_metrics(duration_ms, attributes=metric_attrs)
346-
return response
370+
371+
duration_ms = (time.perf_counter() - start) * 1000.0
372+
# Metrics should be low-cardinality: exclude url/call_cid/channel_cid
373+
metric_attrs = metric_attributes(
374+
api_key=self.api_key,
375+
endpoint=endpoint,
376+
method=method,
377+
status_code=getattr(response, "status_code", None),
378+
)
379+
record_metrics(duration_ms, attributes=metric_attrs)
380+
return self._parse_response(response, data_type or Dict[str, Any])
347381

348382
async def patch(
349383
self,
@@ -354,14 +388,14 @@ async def patch(
354388
*args,
355389
**kwargs,
356390
) -> StreamResponse[T]:
357-
response = await self._request_async(
391+
return await self._request_async(
358392
"PATCH",
359393
path,
360394
query_params=query_params,
361395
args=args,
362396
kwargs=kwargs | {"path_params": path_params},
397+
data_type=data_type,
363398
)
364-
return self._parse_response(response, data_type or Dict[str, Any])
365399

366400
async def get(
367401
self,
@@ -372,14 +406,14 @@ async def get(
372406
*args,
373407
**kwargs,
374408
) -> StreamResponse[T]:
375-
response = await self._request_async(
409+
return await self._request_async(
376410
"GET",
377411
path,
378412
query_params=query_params,
379413
args=args,
380414
kwargs=kwargs | {"path_params": path_params},
415+
data_type=data_type,
381416
)
382-
return self._parse_response(response, data_type or Dict[str, Any])
383417

384418
async def post(
385419
self,
@@ -390,14 +424,14 @@ async def post(
390424
*args,
391425
**kwargs,
392426
) -> StreamResponse[T]:
393-
response = await self._request_async(
427+
return await self._request_async(
394428
"POST",
395429
path,
396430
query_params=query_params,
397431
args=args,
398432
kwargs=kwargs | {"path_params": path_params},
433+
data_type=data_type,
399434
)
400-
return self._parse_response(response, data_type or Dict[str, Any])
401435

402436
async def put(
403437
self,
@@ -408,14 +442,14 @@ async def put(
408442
*args,
409443
**kwargs,
410444
) -> StreamResponse[T]:
411-
response = await self._request_async(
445+
return await self._request_async(
412446
"PUT",
413447
path,
414448
query_params=query_params,
415449
args=args,
416450
kwargs=kwargs | {"path_params": path_params},
451+
data_type=data_type,
417452
)
418-
return self._parse_response(response, data_type or Dict[str, Any])
419453

420454
async def delete(
421455
self,
@@ -426,14 +460,14 @@ async def delete(
426460
*args,
427461
**kwargs,
428462
) -> StreamResponse[T]:
429-
response = await self._request_async(
463+
return await self._request_async(
430464
"DELETE",
431465
path,
432466
query_params=query_params,
433467
args=args,
434468
kwargs=kwargs | {"path_params": path_params},
469+
data_type=data_type,
435470
)
436-
return self._parse_response(response, data_type or Dict[str, Any])
437471

438472

439473
class StreamAPIException(Exception):
@@ -478,3 +512,10 @@ def __str__(self) -> str:
478512
return f'Stream error code {self.api_error.code}: {self.api_error.message}"'
479513
else:
480514
return f"Stream error HTTP code: {self.status_code}"
515+
516+
517+
def parse_duration_from_body(body: bytes) -> Optional[str]:
518+
for prefix, event, value in ijson.parse(body):
519+
if prefix == "duration" and event == "string":
520+
return value
521+
return None

0 commit comments

Comments
 (0)