Skip to content

Commit 6d7a81a

Browse files
authored
Merge pull request #8 from ydb-platform/Cherry-pick-settings-propagation
Cherry pick settings propagation
2 parents b470a62 + 1061b41 commit 6d7a81a

File tree

4 files changed

+123
-12
lines changed

4 files changed

+123
-12
lines changed

tests/test_cursors.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,11 @@ class TestCursor(BaseCursorTestSuit):
142142
def sync_cursor(
143143
self, session_pool_sync: ydb.QuerySessionPool
144144
) -> Generator[Cursor]:
145-
cursor = Cursor(session_pool_sync, ydb.QuerySerializableReadWrite())
145+
cursor = Cursor(
146+
session_pool_sync,
147+
ydb.QuerySerializableReadWrite(),
148+
request_settings=ydb.BaseRequestSettings(),
149+
)
146150
yield cursor
147151
cursor.close()
148152

@@ -177,7 +181,11 @@ class TestAsyncCursor(BaseCursorTestSuit):
177181
async def async_cursor(
178182
self, session_pool: ydb.aio.QuerySessionPool
179183
) -> AsyncGenerator[Cursor]:
180-
cursor = AsyncCursor(session_pool, ydb.QuerySerializableReadWrite())
184+
cursor = AsyncCursor(
185+
session_pool,
186+
ydb.QuerySerializableReadWrite(),
187+
request_settings=ydb.BaseRequestSettings(),
188+
)
181189
yield cursor
182190
await greenlet_spawn(cursor.close)
183191

ydb_dbapi/connections.py

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from .errors import InternalError
1919
from .errors import NotSupportedError
2020
from .utils import handle_ydb_errors
21+
from .utils import maybe_get_current_trace_id
2122

2223

2324
class IsolationLevel(str, Enum):
@@ -101,6 +102,9 @@ def __init__(
101102
self._session_pool = self._pool_cls(self._driver, size=5)
102103

103104
self._session: ydb.QuerySession | ydb.aio.QuerySession | None = None
105+
self.request_settings: ydb.BaseRequestSettings = (
106+
ydb.BaseRequestSettings()
107+
)
104108

105109
def set_isolation_level(self, isolation_level: IsolationLevel) -> None:
106110
if self._tx_context and self._tx_context.tx_id:
@@ -129,6 +133,20 @@ def get_isolation_level(self) -> str:
129133
msg = f"{self._tx_mode.name} is not supported"
130134
raise NotSupportedError(msg)
131135

136+
def set_ydb_request_settings(self, value: ydb.BaseRequestSettings) -> None:
137+
self.request_settings = value
138+
139+
def get_ydb_request_settings(self) -> ydb.BaseRequestSettings:
140+
return self.request_settings
141+
142+
def _get_request_settings(self) -> ydb.BaseRequestSettings:
143+
settings = self.request_settings.make_copy()
144+
145+
if self.request_settings.trace_id is None:
146+
settings = settings.with_trace_id(maybe_get_current_trace_id())
147+
148+
return settings
149+
132150
def _get_client_settings(self) -> ydb.QueryClientSettings:
133151
return (
134152
ydb.QueryClientSettings()
@@ -172,6 +190,7 @@ def cursor(self) -> Cursor:
172190
tx_mode=self._tx_mode,
173191
tx_context=self._tx_context,
174192
table_path_prefix=self.table_path_prefix,
193+
request_settings=self.request_settings,
175194
)
176195

177196
def wait_ready(self, timeout: int = 10) -> None:
@@ -197,15 +216,17 @@ def begin(self) -> None:
197216
@handle_ydb_errors
198217
def commit(self) -> None:
199218
if self._tx_context and self._tx_context.tx_id:
200-
self._tx_context.commit()
219+
settings = self._get_request_settings()
220+
self._tx_context.commit(settings=settings)
201221
self._session_pool.release(self._session)
202222
self._tx_context = None
203223
self._session = None
204224

205225
@handle_ydb_errors
206226
def rollback(self) -> None:
207227
if self._tx_context and self._tx_context.tx_id:
208-
self._tx_context.rollback()
228+
settings = self._get_request_settings()
229+
self._tx_context.rollback(settings=settings)
209230
self._session_pool.release(self._session)
210231
self._tx_context = None
211232
self._session = None
@@ -223,10 +244,15 @@ def close(self) -> None:
223244

224245
@handle_ydb_errors
225246
def describe(self, table_path: str) -> ydb.TableSchemeEntry:
247+
settings = self._get_request_settings()
248+
226249
abs_table_path = posixpath.join(
227250
self.database, self.table_path_prefix, table_path
228251
)
229-
return self._driver.table_client.describe_table(abs_table_path)
252+
return self._driver.table_client.describe_table(
253+
abs_table_path,
254+
settings=settings,
255+
)
230256

231257
@handle_ydb_errors
232258
def check_exists(self, table_path: str) -> bool:
@@ -243,9 +269,12 @@ def get_table_names(self) -> list[str]:
243269

244270
def _check_path_exists(self, table_path: str) -> bool:
245271
try:
272+
settings = self._get_request_settings()
246273

247274
def callee() -> None:
248-
self._driver.scheme_client.describe_path(table_path)
275+
self._driver.scheme_client.describe_path(
276+
table_path, settings=settings
277+
)
249278

250279
retry_operation_sync(callee)
251280
except ydb.SchemeError:
@@ -254,8 +283,13 @@ def callee() -> None:
254283
return True
255284

256285
def _get_table_names(self, abs_dir_path: str) -> list[str]:
286+
settings = self._get_request_settings()
287+
257288
def callee() -> ydb.Directory:
258-
return self._driver.scheme_client.list_directory(abs_dir_path)
289+
return self._driver.scheme_client.list_directory(
290+
abs_dir_path,
291+
settings=settings,
292+
)
259293

260294
directory = retry_operation_sync(callee)
261295
result = []
@@ -300,6 +334,7 @@ def cursor(self) -> AsyncCursor:
300334
tx_mode=self._tx_mode,
301335
tx_context=self._tx_context,
302336
table_path_prefix=self.table_path_prefix,
337+
request_settings=self.request_settings,
303338
)
304339

305340
async def wait_ready(self, timeout: int = 10) -> None:
@@ -325,15 +360,17 @@ async def begin(self) -> None:
325360
@handle_ydb_errors
326361
async def commit(self) -> None:
327362
if self._session and self._tx_context and self._tx_context.tx_id:
328-
await self._tx_context.commit()
363+
settings = self._get_request_settings()
364+
await self._tx_context.commit(settings=settings)
329365
await self._session_pool.release(self._session)
330366
self._session = None
331367
self._tx_context = None
332368

333369
@handle_ydb_errors
334370
async def rollback(self) -> None:
335371
if self._session and self._tx_context and self._tx_context.tx_id:
336-
await self._tx_context.rollback()
372+
settings = self._get_request_settings()
373+
await self._tx_context.rollback(settings=settings)
337374
await self._session_pool.release(self._session)
338375
self._session = None
339376
self._tx_context = None
@@ -351,10 +388,15 @@ async def close(self) -> None:
351388

352389
@handle_ydb_errors
353390
async def describe(self, table_path: str) -> ydb.TableSchemeEntry:
391+
settings = self._get_request_settings()
392+
354393
abs_table_path = posixpath.join(
355394
self.database, self.table_path_prefix, table_path
356395
)
357-
return await self._driver.table_client.describe_table(abs_table_path)
396+
return await self._driver.table_client.describe_table(
397+
abs_table_path,
398+
settings=settings,
399+
)
358400

359401
@handle_ydb_errors
360402
async def check_exists(self, table_path: str) -> bool:
@@ -371,9 +413,13 @@ async def get_table_names(self) -> list[str]:
371413

372414
async def _check_path_exists(self, table_path: str) -> bool:
373415
try:
416+
settings = self._get_request_settings()
374417

375418
async def callee() -> None:
376-
await self._driver.scheme_client.describe_path(table_path)
419+
await self._driver.scheme_client.describe_path(
420+
table_path,
421+
settings=settings,
422+
)
377423

378424
await retry_operation_async(callee)
379425
except ydb.SchemeError:
@@ -382,9 +428,12 @@ async def callee() -> None:
382428
return True
383429

384430
async def _get_table_names(self, abs_dir_path: str) -> list[str]:
431+
settings = self._get_request_settings()
432+
385433
async def callee() -> ydb.Directory:
386434
return await self._driver.scheme_client.list_directory(
387-
abs_dir_path
435+
abs_dir_path,
436+
settings=settings,
388437
)
389438

390439
directory = await retry_operation_async(callee)

ydb_dbapi/cursors.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from .errors import ProgrammingError
1717
from .utils import CursorStatus
1818
from .utils import handle_ydb_errors
19+
from .utils import maybe_get_current_trace_id
1920

2021
ParametersType = dict[
2122
str,
@@ -148,12 +149,14 @@ def __init__(
148149
self,
149150
session_pool: ydb.QuerySessionPool,
150151
tx_mode: ydb.BaseQueryTxMode,
152+
request_settings: ydb.BaseRequestSettings,
151153
tx_context: ydb.QueryTxContext | None = None,
152154
table_path_prefix: str = "",
153155
) -> None:
154156
super().__init__()
155157
self._session_pool = session_pool
156158
self._tx_mode = tx_mode
159+
self._request_settings = request_settings
157160
self._tx_context = tx_context
158161
self._table_path_prefix = table_path_prefix
159162

@@ -169,16 +172,27 @@ def fetchmany(self, size: int | None = None) -> list:
169172
def fetchall(self) -> list:
170173
return self._fetchall_from_buffer()
171174

175+
def _get_request_settings(self) -> ydb.BaseRequestSettings:
176+
settings = self._request_settings.make_copy()
177+
178+
if self._request_settings.trace_id is None:
179+
settings = settings.with_trace_id(maybe_get_current_trace_id())
180+
181+
return settings
182+
172183
@handle_ydb_errors
173184
def _execute_generic_query(
174185
self, query: str, parameters: ParametersType | None = None
175186
) -> Iterator[ydb.convert.ResultSet]:
187+
settings = self._get_request_settings()
188+
176189
def callee(
177190
session: ydb.QuerySession,
178191
) -> Iterator[ydb.convert.ResultSet]:
179192
return session.execute(
180193
query=query,
181194
parameters=parameters,
195+
settings=settings,
182196
)
183197

184198
return self._session_pool.retry_operation_sync(callee)
@@ -189,13 +203,16 @@ def _execute_session_query(
189203
query: str,
190204
parameters: ParametersType | None = None,
191205
) -> Iterator[ydb.convert.ResultSet]:
206+
settings = self._get_request_settings()
207+
192208
def callee(
193209
session: ydb.QuerySession,
194210
) -> Iterator[ydb.convert.ResultSet]:
195211
return session.transaction(self._tx_mode).execute(
196212
query=query,
197213
parameters=parameters,
198214
commit_tx=True,
215+
settings=settings,
199216
)
200217

201218
return self._session_pool.retry_operation_sync(callee)
@@ -207,10 +224,12 @@ def _execute_transactional_query(
207224
query: str,
208225
parameters: ParametersType | None = None,
209226
) -> Iterator[ydb.convert.ResultSet]:
227+
settings = self._get_request_settings()
210228
return tx_context.execute(
211229
query=query,
212230
parameters=parameters,
213231
commit_tx=False,
232+
settings=settings,
214233
)
215234

216235
def execute_scheme(
@@ -304,12 +323,14 @@ def __init__(
304323
self,
305324
session_pool: ydb.aio.QuerySessionPool,
306325
tx_mode: ydb.BaseQueryTxMode,
326+
request_settings: ydb.BaseRequestSettings,
307327
tx_context: ydb.aio.QueryTxContext | None = None,
308328
table_path_prefix: str = "",
309329
) -> None:
310330
super().__init__()
311331
self._session_pool = session_pool
312332
self._tx_mode = tx_mode
333+
self._request_settings = request_settings
313334
self._tx_context = tx_context
314335
self._table_path_prefix = table_path_prefix
315336

@@ -325,16 +346,27 @@ async def fetchmany(self, size: int | None = None) -> list:
325346
async def fetchall(self) -> list:
326347
return self._fetchall_from_buffer()
327348

349+
def _get_request_settings(self) -> ydb.BaseRequestSettings:
350+
settings = self._request_settings.make_copy()
351+
352+
if self._request_settings.trace_id is None:
353+
settings = settings.with_trace_id(maybe_get_current_trace_id())
354+
355+
return settings
356+
328357
@handle_ydb_errors
329358
async def _execute_generic_query(
330359
self, query: str, parameters: ParametersType | None = None
331360
) -> AsyncIterator[ydb.convert.ResultSet]:
361+
settings = self._get_request_settings()
362+
332363
async def callee(
333364
session: ydb.aio.QuerySession,
334365
) -> AsyncIterator[ydb.convert.ResultSet]:
335366
return await session.execute(
336367
query=query,
337368
parameters=parameters,
369+
settings=settings,
338370
)
339371

340372
return await self._session_pool.retry_operation_async(callee)
@@ -345,13 +377,16 @@ async def _execute_session_query(
345377
query: str,
346378
parameters: ParametersType | None = None,
347379
) -> AsyncIterator[ydb.convert.ResultSet]:
380+
settings = self._get_request_settings()
381+
348382
async def callee(
349383
session: ydb.aio.QuerySession,
350384
) -> AsyncIterator[ydb.convert.ResultSet]:
351385
return await session.transaction(self._tx_mode).execute(
352386
query=query,
353387
parameters=parameters,
354388
commit_tx=True,
389+
settings=settings,
355390
)
356391

357392
return await self._session_pool.retry_operation_async(callee)
@@ -363,10 +398,12 @@ async def _execute_transactional_query(
363398
query: str,
364399
parameters: ParametersType | None = None,
365400
) -> AsyncIterator[ydb.convert.ResultSet]:
401+
settings = self._get_request_settings()
366402
return await tx_context.execute(
367403
query=query,
368404
parameters=parameters,
369405
commit_tx=False,
406+
settings=settings,
370407
)
371408

372409
async def execute_scheme(

ydb_dbapi/utils.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
from __future__ import annotations
2+
13
import functools
4+
import importlib.util
25
from enum import Enum
36
from inspect import iscoroutinefunction
47
from typing import Any
@@ -100,3 +103,17 @@ class CursorStatus(str, Enum):
100103
running = "running"
101104
finished = "finished"
102105
closed = "closed"
106+
107+
108+
def maybe_get_current_trace_id() -> str | None:
109+
# Check if OpenTelemetry is available
110+
if importlib.util.find_spec("opentelemetry"):
111+
from opentelemetry import trace # type: ignore
112+
113+
current_span = trace.get_current_span()
114+
115+
if current_span.get_span_context().is_valid:
116+
return format(current_span.get_span_context().trace_id, "032x")
117+
118+
# Return None if OpenTelemetry is not available or trace ID is invalid
119+
return None

0 commit comments

Comments
 (0)