Skip to content

Commit 55184e2

Browse files
committed
move wait logic to separate function
1 parent c781fe9 commit 55184e2

File tree

4 files changed

+38
-21
lines changed

4 files changed

+38
-21
lines changed

ydb/_utilities.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,21 @@ def inc_and_get(self) -> int:
182182
with self._lock:
183183
self._value += 1
184184
return self._value
185+
186+
187+
def get_first_message_with_timeout(status_stream: SyncResponseIterator, timeout: int):
188+
waiter = future()
189+
190+
def get_first_response(waiter):
191+
first_response = next(status_stream)
192+
waiter.set_result(first_response)
193+
194+
thread = threading.Thread(
195+
target=get_first_response,
196+
args=(waiter,),
197+
name="first response attach stream thread",
198+
daemon=True,
199+
)
200+
thread.start()
201+
202+
return waiter.result(timeout=timeout)

ydb/aio/_utilities.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import asyncio
2+
3+
14
class AsyncResponseIterator(object):
25
def __init__(self, it, wrapper):
36
self.it = it.__aiter__()
@@ -21,3 +24,10 @@ async def next(self):
2124

2225
async def __anext__(self):
2326
return await self._next()
27+
28+
29+
async def get_first_message_with_timeout(stream: AsyncResponseIterator, timeout: int):
30+
async def get_first_response():
31+
return await stream.next()
32+
33+
return await asyncio.wait_for(get_first_response(), timeout)

ydb/aio/query/session.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ async def _attach(self) -> None:
4444
lambda response: common_utils.ServerStatus.from_proto(response),
4545
)
4646

47-
async def get_first_response():
48-
first_response = await self._status_stream.next()
47+
try:
48+
first_response = await _utilities.get_first_message_with_timeout(
49+
self._status_stream,
50+
DEFAULT_ATTACH_FIRST_RESP_TIMEOUT,
51+
)
4952
if first_response.status != issues.StatusCode.SUCCESS:
5053
raise RuntimeError("Failed to attach session")
51-
52-
try:
53-
await asyncio.wait_for(get_first_response(), DEFAULT_ATTACH_FIRST_RESP_TIMEOUT)
5454
except Exception as e:
5555
self._state.reset()
5656
self._status_stream.cancel()

ydb/query/session.py

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -230,24 +230,13 @@ def _attach(self, first_resp_timeout: int = DEFAULT_ATTACH_FIRST_RESP_TIMEOUT) -
230230
lambda response: common_utils.ServerStatus.from_proto(response),
231231
)
232232

233-
waiter = _utilities.future()
234-
235-
def get_first_response(waiter):
236-
first_response = next(status_stream)
233+
try:
234+
first_response = _utilities.get_first_message_with_timeout(
235+
status_stream,
236+
first_resp_timeout,
237+
)
237238
if first_response.status != issues.StatusCode.SUCCESS:
238239
raise RuntimeError("Failed to attach session")
239-
waiter.set_result(True)
240-
241-
thread = threading.Thread(
242-
target=get_first_response,
243-
args=(waiter,),
244-
name="first response attach stream thread",
245-
daemon=True,
246-
)
247-
thread.start()
248-
249-
try:
250-
waiter.result(timeout=first_resp_timeout)
251240
except Exception as e:
252241
self._state.reset()
253242
status_stream.cancel()

0 commit comments

Comments
 (0)