Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement simple asyncio wrapper API with basic tests #646

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions kazoo/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
Simple asyncio integration of the threaded async executor engine.
"""
73 changes: 73 additions & 0 deletions kazoo/aio/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio

from kazoo.aio.handler import AioSequentialThreadingHandler
from kazoo.client import KazooClient, TransactionRequest


class AioKazooClient(KazooClient):
"""
The asyncio compatibility mostly mimics the behaviour of the base async one. All calls are wrapped in
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved
asyncio.shield() to prevent cancellation that is not supported in the base async implementation.
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

The sync and base-async API are still completely functional. Mixing the use of any of the 3 should be okay.
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved
"""

def __init__(self, *args, **kwargs):
if not kwargs.get("handler"):
kwargs["handler"] = AioSequentialThreadingHandler()
KazooClient.__init__(self, *args, **kwargs)

# asyncio compatible api wrappers
async def start_aio(self):
return await asyncio.shield(self.start_async().future)

async def add_auth_aio(self, *args, **kwargs):
return await asyncio.shield(self.add_auth_async(*args, **kwargs).future)
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

async def sync_aio(self, *args, **kwargs):
return await asyncio.shield(self.sync_async(*args, **kwargs).future)

async def create_aio(self, *args, **kwargs):
return await asyncio.shield(self.create_async(*args, **kwargs).future)

async def ensure_path_aio(self, *args, **kwargs):
return await asyncio.shield(self.ensure_path_async(*args, **kwargs).future)
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

async def exists_aio(self, *args, **kwargs):
return await asyncio.shield(self.exists_async(*args, **kwargs).future)

async def get_aio(self, *args, **kwargs):
return await asyncio.shield(self.get_async(*args, **kwargs).future)

async def get_children_aio(self, *args, **kwargs):
return await asyncio.shield(self.get_children_async(*args, **kwargs).future)
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

async def get_acls_aio(self, *args, **kwargs):
return await asyncio.shield(self.get_acls_async(*args, **kwargs).future)
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

async def set_acls_aio(self, *args, **kwargs):
return await asyncio.shield(self.set_acls_async(*args, **kwargs).future)
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

async def set_aio(self, *args, **kwargs):
return await asyncio.shield(self.set_async(*args, **kwargs).future)

def transaction_aio(self):
return AioTransactionRequest(self)

async def delete_aio(self, *args, **kwargs):
return await asyncio.shield(self.delete_async(*args, **kwargs).future)

async def reconfig_aio(self, *args, **kwargs):
return await asyncio.shield(self.reconfig_async(*args, **kwargs).future)
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved


class AioTransactionRequest(TransactionRequest):
async def commit_aio(self):
return await asyncio.shield(self.commit_async().future)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, exc_tb):
if not exc_type:
await self.commit_aio()
52 changes: 52 additions & 0 deletions kazoo/aio/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio
import threading

from kazoo.handlers.threading import AsyncResult, SequentialThreadingHandler


class AioAsyncResult(AsyncResult):
def __init__(self, handler):
self.future = handler.loop.create_future()
AsyncResult.__init__(self, handler)

def set(self, value=None):
"""
The completion of the future has the same guarantees as the notification emitting of the condition.
Provided that no callbacks raise it will complete.
"""
AsyncResult.set(self, value)
self._handler.loop.call_soon_threadsafe(self.future.set_result, value)

def set_exception(self, exception):
"""
The completion of the future has the same guarantees as the notification emitting of the condition.
Provided that no callbacks raise it will complete.
"""
AsyncResult.set_exception(self, exception)
self._handler.loop.call_soon_threadsafe(self.future.set_exception, exception)


class AioSequentialThreadingHandler(SequentialThreadingHandler):
def __init__(self):
"""
Creating the handler must be done on the asyncio-loop's thread.
"""
self.loop = asyncio.get_running_loop()
self._aio_thread = threading.current_thread()
SequentialThreadingHandler.__init__(self)

def async_result(self):
"""
Almost all async-result objects are created by a method that is invoked from the user's thead. The
one exception I'm aware of is in the PatientChildrenWatch utility, that creates an async-result in
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved
its worker thread. Just because of that it is imperative to only create asyncio compatible results
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved
when the invoking code is from the loop's thread. There is no PEP/API guarantee that implementing
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved
the create_future() has to be thread-safe. The default is mostly thread-safe. The only thing that
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved
may get synchronization issue is a debug-feature for asyncio development. Quickly looking at the
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved
alternate implementation of uvloop, they use the default Future implementation, so no change there.
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved
For now, just to be safe, we check the current thread and create an async-result object based on the
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved
invoking thread's identity.
"""
if threading.current_thread() is self._aio_thread:
return AioAsyncResult(self)
return AsyncResult(self)
4 changes: 2 additions & 2 deletions kazoo/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kazoo.testing.harness import KazooTestCase, KazooTestHarness
from kazoo.testing.harness import KazooAioTestCase, KazooTestCase, KazooTestHarness
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved


__all__ = ('KazooTestHarness', 'KazooTestCase', )
__all__ = ('KazooTestHarness', 'KazooTestCase', 'KazooAioTestCase', )
29 changes: 27 additions & 2 deletions kazoo/testing/harness.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Kazoo testing harnesses"""
import asyncio
import logging
import os
import uuid
import unittest

from kazoo import python2atexit as atexit
from kazoo.aio.client import AioKazooClient
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from kazoo.protocol.connection import _CONNECTION_DROP, _SESSION_EXPIRED
Expand Down Expand Up @@ -144,6 +146,7 @@ def test_something_else(self):

"""
DEFAULT_CLIENT_TIMEOUT = 15
CLIENT_CLS = KazooClient

def __init__(self, *args, **kw):
super(KazooTestHarness, self).__init__(*args, **kw)
Expand All @@ -159,14 +162,14 @@ def servers(self):
return ",".join([s.address for s in self.cluster])

def _get_nonchroot_client(self):
c = KazooClient(self.servers)
c = self.CLIENT_CLS(self.servers)
self._clients.append(c)
return c

def _get_client(self, **client_options):
if 'timeout' not in client_options:
client_options['timeout'] = self.DEFAULT_CLIENT_TIMEOUT
c = KazooClient(self.hosts, **client_options)
c = self.CLIENT_CLS(self.hosts, **client_options)
self._clients.append(c)
return c

Expand Down Expand Up @@ -245,3 +248,25 @@ def setUp(self):

def tearDown(self):
self.teardown_zookeeper()


class KazooAioTestCase(KazooTestHarness):
CLIENT_CLS = AioKazooClient

def __init__(self, *args, **kw):
super(KazooAioTestCase, self).__init__(*args, **kw)
self.loop = None

async def setup_zookeeper_aio(self):
self.setup_zookeeper() # NOTE: could enhance this to call start_aio() on the client
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

async def teardown_zookeeper_aio(self):
self.teardown_zookeeper()

def setUp(self):
self.loop = asyncio.get_event_loop_policy().new_event_loop()
self.loop.run_until_complete(self.setup_zookeeper_aio())

def tearDown(self):
self.loop.run_until_complete(self.teardown_zookeeper_aio())
self.loop.close()
32 changes: 32 additions & 0 deletions kazoo/tests/test_aio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from kazoo.exceptions import NotEmptyError, NoNodeError
from kazoo.protocol.states import ZnodeStat
from kazoo.testing import KazooAioTestCase


class KazooAioTests(KazooAioTestCase):
def test_basic_aio_functionality(self):
self.loop.run_until_complete(self._test_basic_aio_functionality())

async def _test_basic_aio_functionality(self):
assert await self.client.create_aio("/tmp") == "/tmp"
assert await self.client.get_children_aio("/") == ["tmp"]
assert await self.client.ensure_path_aio("/tmp/x/y") == "/tmp/x/y"
assert await self.client.exists_aio("/tmp/x/y")
assert isinstance(await self.client.set_aio("/tmp/x/y", b"very aio"), ZnodeStat)
data, stat = await self.client.get_aio("/tmp/x/y")
assert data == b"very aio"
assert isinstance(stat, ZnodeStat)
try:
await self.client.delete_aio("/tmp/x")
except NotEmptyError:
pass
await self.client.delete_aio("/tmp/x/y")
try:
await self.client.get_aio("/tmp/x/y")
except NoNodeError:
pass
async with self.client.transaction_aio() as tx:
tx.create("/tmp/z", b"ZZZ")
tx.set_data("/tmp/x", b"XXX")
assert (await self.client.get_aio("/tmp/x"))[0] == b"XXX"
assert (await self.client.get_aio("/tmp/z"))[0] == b"ZZZ"