Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement simple asyncio wrapper API with basic tests #646

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions kazoo/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
Simple asyncio integration of the threaded async executor engine.
"""
92 changes: 92 additions & 0 deletions kazoo/aio/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import asyncio

from kazoo.aio.handler import AioSequentialThreadingHandler
from kazoo.client import KazooClient, TransactionRequest


class AioKazooClient(KazooClient):
"""
The asyncio compatibility mostly mimics the behaviour of the base async
one. All calls are wrapped in asyncio.shield() to prevent cancellation
that is not supported in the base async implementation.

The sync and base-async API are still completely functional. Mixing the
use of any of the 3 should be okay.
"""

def __init__(self, *args, **kwargs):
if not kwargs.get("handler"):
kwargs["handler"] = AioSequentialThreadingHandler()
KazooClient.__init__(self, *args, **kwargs)

# asyncio compatible api wrappers
async def start_aio(self, timeout=15):
"""
There is no protection for calling this multiple times in parallel.
The start_async() seems to lack that as well. Maybe it is allowed and
handled internally.
"""
await self.handler.loop.run_in_executor(None, self.start, timeout)

async def add_auth_aio(self, *args, **kwargs):
return await asyncio.shield(
self.add_auth_async(*args, **kwargs).future
)

async def sync_aio(self, *args, **kwargs):
return await asyncio.shield(self.sync_async(*args, **kwargs).future)

async def create_aio(self, *args, **kwargs):
return await asyncio.shield(self.create_async(*args, **kwargs).future)

async def ensure_path_aio(self, *args, **kwargs):
return await asyncio.shield(
self.ensure_path_async(*args, **kwargs).future
)

async def exists_aio(self, *args, **kwargs):
return await asyncio.shield(self.exists_async(*args, **kwargs).future)

async def get_aio(self, *args, **kwargs):
return await asyncio.shield(self.get_async(*args, **kwargs).future)

async def get_children_aio(self, *args, **kwargs):
return await asyncio.shield(
self.get_children_async(*args, **kwargs).future
)

async def get_acls_aio(self, *args, **kwargs):
return await asyncio.shield(
self.get_acls_async(*args, **kwargs).future
)

async def set_acls_aio(self, *args, **kwargs):
return await asyncio.shield(
self.set_acls_async(*args, **kwargs).future
)

async def set_aio(self, *args, **kwargs):
return await asyncio.shield(self.set_async(*args, **kwargs).future)

def transaction_aio(self):
return AioTransactionRequest(self)

async def delete_aio(self, *args, **kwargs):
return await asyncio.shield(self.delete_async(*args, **kwargs).future)

async def reconfig_aio(self, *args, **kwargs):
return await asyncio.shield(
self.reconfig_async(*args, **kwargs).future
)


class AioTransactionRequest(TransactionRequest):
async def commit_aio(self):
return await asyncio.shield(self.commit_async().future)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, exc_tb):
if not exc_type:
await self.commit_aio()
60 changes: 60 additions & 0 deletions kazoo/aio/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import asyncio
import threading

from kazoo.handlers.threading import AsyncResult, SequentialThreadingHandler


class AioAsyncResult(AsyncResult):
def __init__(self, handler):
self.future = handler.loop.create_future()
AsyncResult.__init__(self, handler)

def set(self, value=None):
"""
The completion of the future has the same guarantees as the
notification emitting of the condition.
Provided that no callbacks raise it will complete.
"""
AsyncResult.set(self, value)
self._handler.loop.call_soon_threadsafe(self.future.set_result, value)

def set_exception(self, exception):
"""
The completion of the future has the same guarantees as the
notification emitting of the condition.
Provided that no callbacks raise it will complete.
"""
AsyncResult.set_exception(self, exception)
self._handler.loop.call_soon_threadsafe(
self.future.set_exception, exception
)


class AioSequentialThreadingHandler(SequentialThreadingHandler):
def __init__(self):
"""
Creating the handler must be done on the asyncio-loop's thread.
"""
self.loop = asyncio.get_running_loop()
self._aio_thread = threading.current_thread()
SequentialThreadingHandler.__init__(self)

def async_result(self):
"""
Almost all async-result objects are created by a method that is
invoked from the user's thead. The one exception I'm aware of is
in the PatientChildrenWatch utility, that creates an async-result
in its worker thread. Just because of that it is imperative to
only create asyncio compatible results when the invoking code is
from the loop's thread. There is no PEP/API guarantee that
implementing the create_future() has to be thread-safe. The default
is mostly thread-safe. The only thing that may get synchronization
issue is a debug-feature for asyncio development. Quickly looking at
the alternate implementation of uvloop, they use the default Future
implementation, so no change there.
For now, just to be safe, we check the current thread and create an
async-result object based on the invoking thread's identity.
"""
if threading.current_thread() is self._aio_thread:
return AioAsyncResult(self)
return AsyncResult(self)
119 changes: 119 additions & 0 deletions kazoo/aio/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import asyncio
import random
import time

from kazoo.exceptions import (
ConnectionClosedError,
ConnectionLoss,
OperationTimeoutError,
SessionExpiredError,
)
from kazoo.retry import ForceRetryError, RetryFailedError


class AioKazooRetry(object):
"""
This is similar to KazooRetry, but they do not have compatible
interfaces. The threaded and asyncio constructs are too different
to easily wrap the KazooRetry implementation. Unless, all retries
always get their own thread to work in.

There is no equivalent analogue to the interrupt API.
If interrupting the retry is necessary, it must be wrapped in
an asyncio.Task, which can be cancelled. Be aware though that
this will quit waiting on the Zookeeper API call immediately
unlike the threaded API. There is no way to interrupt/cancel an
internal request thread so it will continue and stop eventually
on its own. This means caller can't know if the call is still
in progress and may succeed or the retry was cancelled while it
was waiting for delay.

Usage example. These are equivalent except that the latter lines
will retry the requests on specific exceptions:
await zk.create_aio("/x")
await zk.create_aio("/x/y")

aio_retry = AioKazooRetry()
await aio_retry(zk.create_aio, "/x")
await aio_retry(zk.create_aio, "/x/y")

Re-using an instance is fine as long as it is done serially.
"""

EXCEPTIONS = (
ConnectionLoss,
OperationTimeoutError,
ForceRetryError,
)

EXCEPTIONS_WITH_EXPIRED = EXCEPTIONS + (SessionExpiredError,)

def __init__(
self,
max_tries=1,
delay=0.1,
backoff=2,
max_jitter=0.4,
max_delay=60.0,
ignore_expire=True,
deadline=None,
):
self.max_tries = max_tries
self.delay = delay
self.backoff = backoff
self.max_jitter = max(min(max_jitter, 1.0), 0.0)
self.max_delay = float(max_delay)
self._attempts = 0
self._cur_delay = delay
self.deadline = deadline
self.retry_exceptions = (
self.EXCEPTIONS_WITH_EXPIRED if ignore_expire else self.EXCEPTIONS
)

def reset(self):
self._attempts = 0
self._cur_delay = self.delay

def copy(self):
obj = AioKazooRetry(
max_tries=self.max_tries,
delay=self.delay,
backoff=self.backoff,
max_jitter=self.max_jitter,
max_delay=self.max_delay,
deadline=self.deadline,
)
obj.retry_exceptions = self.retry_exceptions
return obj

async def __call__(self, func, *args, **kwargs):
self.reset()

stop_time = (
None
if self.deadline is None
else time.perf_counter() + self.deadline
)
while True:
try:
return await func(*args, **kwargs)
except ConnectionClosedError:
raise
except self.retry_exceptions:
# Note: max_tries == -1 means infinite tries.
if self._attempts == self.max_tries:
raise RetryFailedError("Too many retry attempts")
self._attempts += 1
jitter = random.uniform(
1.0 - self.max_jitter, 1.0 + self.max_jitter
)
sleeptime = self._cur_delay * jitter
if (
stop_time is not None
and time.perf_counter() + sleeptime >= stop_time
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved
):
raise RetryFailedError("Exceeded retry deadline")
await asyncio.sleep(sleeptime)
self._cur_delay = min(
sleeptime * self.backoff, self.max_delay
)
12 changes: 10 additions & 2 deletions kazoo/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
from kazoo.testing.harness import KazooTestCase, KazooTestHarness
from kazoo.testing.harness import (
KazooAioTestCase,
KazooTestCase,
KazooTestHarness,
)


__all__ = ('KazooTestHarness', 'KazooTestCase', )
__all__ = (
"KazooTestHarness",
"KazooTestCase",
"KazooAioTestCase",
)
30 changes: 28 additions & 2 deletions kazoo/testing/harness.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Kazoo testing harnesses"""
import asyncio
import logging
import os
import uuid
import unittest

from kazoo import python2atexit as atexit
from kazoo.aio.client import AioKazooClient
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from kazoo.protocol.connection import _CONNECTION_DROP, _SESSION_EXPIRED
Expand Down Expand Up @@ -144,6 +146,7 @@ def test_something_else(self):

"""
DEFAULT_CLIENT_TIMEOUT = 15
CLIENT_CLS = KazooClient

def __init__(self, *args, **kw):
super(KazooTestHarness, self).__init__(*args, **kw)
Expand All @@ -159,14 +162,14 @@ def servers(self):
return ",".join([s.address for s in self.cluster])

def _get_nonchroot_client(self):
c = KazooClient(self.servers)
c = self.CLIENT_CLS(self.servers)
self._clients.append(c)
return c

def _get_client(self, **client_options):
if 'timeout' not in client_options:
client_options['timeout'] = self.DEFAULT_CLIENT_TIMEOUT
c = KazooClient(self.hosts, **client_options)
c = self.CLIENT_CLS(self.hosts, **client_options)
self._clients.append(c)
return c

Expand Down Expand Up @@ -245,3 +248,26 @@ def setUp(self):

def tearDown(self):
self.teardown_zookeeper()


class KazooAioTestCase(KazooTestHarness):
CLIENT_CLS = AioKazooClient

def __init__(self, *args, **kw):
super(KazooAioTestCase, self).__init__(*args, **kw)
self.loop = None

async def setup_zookeeper_aio(self):
# NOTE: could enhance this to call start_aio() on the client
self.setup_zookeeper()

async def teardown_zookeeper_aio(self):
self.teardown_zookeeper()

def setUp(self):
self.loop = asyncio.get_event_loop_policy().new_event_loop()
self.loop.run_until_complete(self.setup_zookeeper_aio())

def tearDown(self):
self.loop.run_until_complete(self.teardown_zookeeper_aio())
self.loop.close()
Loading