Skip to content

Commit

Permalink
feat(core): Add support for Container and TTL nodes
Browse files Browse the repository at this point in the history
Also add support through transations.

Closes python-zk#334, python-zk#496
  • Loading branch information
ceache authored and Charles-Henri de Boysson committed Jun 16, 2020
1 parent e4f808f commit 4634c04
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 36 deletions.
173 changes: 138 additions & 35 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
CloseInstance,
Create,
Create2,
CreateContainer,
CreateTTL,
Delete,
Exists,
GetChildren,
Expand Down Expand Up @@ -873,7 +875,8 @@ def sync(self, path):
return self.sync_async(path).get()

def create(self, path, value=b"", acl=None, ephemeral=False,
sequence=False, makepath=False, include_data=False):
sequence=False, makepath=False, include_data=False,
container=False, ttl=0):
"""Create a node with the given value as its data. Optionally
set an ACL on the node.
Expand Down Expand Up @@ -950,15 +953,19 @@ def create(self, path, value=b"", acl=None, ephemeral=False,
The `makepath` option.
.. versionadded:: 2.7
The `include_data` option.
.. versionadded:: 2.8
The container and ttl options.
"""
acl = acl or self.default_acl
return self.create_async(
path, value, acl=acl, ephemeral=ephemeral,
sequence=sequence, makepath=makepath, include_data=include_data
).get()
sequence=sequence, makepath=makepath, include_data=include_data,
container=container, ttl=ttl).get()

def create_async(self, path, value=b"", acl=None, ephemeral=False,
sequence=False, makepath=False, include_data=False):
sequence=False, makepath=False, include_data=False,
container=False, ttl=0):
"""Asynchronously create a ZNode. Takes the same arguments as
:meth:`create`.
Expand All @@ -967,7 +974,9 @@ def create_async(self, path, value=b"", acl=None, ephemeral=False,
.. versionadded:: 1.1
The makepath option.
.. versionadded:: 2.7
The `include_data` option.
The include_data option.
.. versionadded:: 2.8
The container and ttl options.
"""
if acl is None and self.default_acl:
acl = self.default_acl
Expand All @@ -988,24 +997,86 @@ def create_async(self, path, value=b"", acl=None, ephemeral=False,
raise TypeError("Invalid type for 'makepath' (bool expected)")
if not isinstance(include_data, bool):
raise TypeError("Invalid type for 'include_data' (bool expected)")

if not isinstance(container, bool):
raise TypeError("Invalid type for 'container' (bool expected)")
if not isinstance(ttl, int) or ttl < 0:
raise TypeError("Invalid 'ttl' (integer >= 0 expected)")
if ttl and ephemeral:
raise TypeError("Invalid node creation: ephemeral & ttl")
if container and (ephemeral or sequence or ttl):
raise TypeError("Invalid node creation: container & ephemeral/sequence/ttl")

# Should match Zookeeper's CreateMode fromFlag
# https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/CreateMode.java#L112
flags = 0
if ephemeral:
flags |= 1
if sequence:
flags |= 2
if container:
flags = 4
if ttl:
if sequence:
flags = 6
else:
flags = 5

if acl is None:
acl = OPEN_ACL_UNSAFE

# Figure out the OpCode we are going to send
if include_data:
stat_in_result = True
opcode = lambda path, value, acl: Create2(
_prefix_root(self.chroot, path, trailing=sequence),
value,
acl,
flags
)
elif container:
stat_in_result = True
opcode = lambda path, value, acl: CreateContainer(
_prefix_root(self.chroot, path, trailing=False),
value,
acl,
flags
)
elif ttl:
stat_in_result = True
opcode = lambda path, value, acl: CreateTTL(
_prefix_root(self.chroot, path, trailing=sequence),
value,
acl,
flags,
ttl
)
else:
stat_in_result = False
opcode = lambda path, value, acl: Create(
_prefix_root(self.chroot, path, trailing=sequence),
value,
acl,
flags
)
async_result = self.handler.async_result()

@capture_exceptions(async_result)
def do_create():
result = self._create_async_inner(
path, value, acl, flags,
trailing=sequence, include_data=include_data
inner_async_result = self.handler.async_result()

call_result = self._call(
opcode(path, value, acl),
inner_async_result
)
result.rawlink(create_completion)
if call_result is False:
# We hit a short-circuit exit on the _call. Because we are
# not using the original async_result here, we bubble the
# exception upwards to the do_create function in
# KazooClient.create so that it gets set on the correct
# async_result object
raise inner_async_result.exception

inner_async_result.rawlink(create_completion)

@capture_exceptions(async_result)
def retry_completion(result):
Expand All @@ -1015,7 +1086,7 @@ def retry_completion(result):
@wrap(async_result)
def create_completion(result):
try:
if include_data:
if stat_in_result:
new_path, stat = result.get()
return self.unchroot(new_path), stat
else:
Expand All @@ -1032,26 +1103,6 @@ def create_completion(result):
do_create()
return async_result

def _create_async_inner(self, path, value, acl, flags,
trailing=False, include_data=False):
async_result = self.handler.async_result()
if include_data:
opcode = Create2
else:
opcode = Create

call_result = self._call(
opcode(_prefix_root(self.chroot, path, trailing=trailing),
value, acl, flags), async_result)
if call_result is False:
# We hit a short-circuit exit on the _call. Because we are
# not using the original async_result here, we bubble the
# exception upwards to the do_create function in
# KazooClient.create so that it gets set on the correct
# async_result object
raise async_result.exception
return async_result

def ensure_path(self, path, acl=None):
"""Recursively create a path if it doesn't exist.
Expand Down Expand Up @@ -1590,13 +1641,15 @@ def __init__(self, client):
self.committed = False

def create(self, path, value=b"", acl=None, ephemeral=False,
sequence=False):
sequence=False, include_data=False, container=False, ttl=0):
"""Add a create ZNode to the transaction. Takes the same
arguments as :meth:`KazooClient.create`, with the exception
of `makepath`.
:returns: None
.. versionadded:: 2.8
The include_data, container and ttl options.
"""
if acl is None and self.client.default_acl:
acl = self.client.default_acl
Expand All @@ -1612,17 +1665,67 @@ def create(self, path, value=b"", acl=None, ephemeral=False,
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
if not isinstance(sequence, bool):
raise TypeError("Invalid type for 'sequence' (bool expected)")

if not isinstance(include_data, bool):
raise TypeError("Invalid type for 'include_data' (bool expected)")
if not isinstance(container, bool):
raise TypeError("Invalid type for 'container' (bool expected)")
if not isinstance(ttl, int) or ttl < 0:
raise TypeError("Invalid 'ttl' (integer >= 0 expected)")
if ttl and ephemeral:
raise TypeError("Invalid node creation: ephemeral & ttl")
if container and (ephemeral or sequence or ttl):
raise TypeError("Invalid node creation: container & ephemeral/sequence/ttl")

# Should match Zookeeper's CreateMode fromFlag
# https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/CreateMode.java#L112
flags = 0
if ephemeral:
flags |= 1
if sequence:
flags |= 2
if container:
flags = 4
if ttl:
if sequence:
flags = 6
else:
flags = 5

if acl is None:
acl = OPEN_ACL_UNSAFE

self._add(Create(_prefix_root(self.client.chroot, path), value, acl,
flags), None)
# Figure out the OpCode we are going to send
if include_data:
opcode = lambda path, value, acl: Create2(
_prefix_root(self.client.chroot, path, trailing=sequence),
value,
acl,
flags
)
elif container:
opcode = lambda path, value, acl: CreateContainer(
_prefix_root(self.client.chroot, path, trailing=False),
value,
acl,
flags
)
elif ttl:
opcode = lambda path, value, acl: CreateTTL(
_prefix_root(self.client.chroot, path, trailing=sequence),
value,
acl,
flags,
ttl
)
else:
opcode = lambda path, value, acl: Create(
_prefix_root(self.client.chroot, path, trailing=sequence),
value,
acl,
flags
)

self._add(opcode(path, value, acl), None)

def delete(self, path, version=-1):
"""Add a delete ZNode to the transaction. Takes the same
Expand Down
55 changes: 54 additions & 1 deletion kazoo/protocol/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,17 @@ def deserialize(cls, bytes, offset):
while not header.done:
if header.type == Create.type:
response, offset = read_string(bytes, offset)
elif header.type == Create2.type:
path, offset = read_string(bytes, offset)
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
offset += stat_struct.size
response = (path, stat)
elif header.type == Delete.type:
response = True
elif header.type == SetData.type:
response = ZnodeStat._make(
stat_struct.unpack_from(bytes, offset))
stat_struct.unpack_from(bytes, offset)
)
offset += stat_struct.size
elif header.type == CheckVersion.type:
response = True
Expand All @@ -346,6 +352,10 @@ def unchroot(client, response):
for result in response:
if isinstance(result, six.string_types):
resp.append(client.unchroot(result))
elif isinstance(result, ZnodeStat): # Need to test before tuple
resp.append(result)
elif isinstance(result, tuple):
resp.append((client.unchroot(result[0]), result[1]))
else:
resp.append(result)
return resp
Expand Down Expand Up @@ -391,6 +401,49 @@ def deserialize(cls, bytes, offset):
return data, stat


class CreateContainer(namedtuple('CreateContainer', 'path data acl flags')):
type = 19

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(write_buffer(self.data))
b.extend(int_struct.pack(len(self.acl)))
for acl in self.acl:
b.extend(int_struct.pack(acl.perms) +
write_string(acl.id.scheme) + write_string(acl.id.id))
b.extend(int_struct.pack(self.flags))
return b

@classmethod
def deserialize(cls, bytes, offset):
path, offset = read_string(bytes, offset)
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
return path, stat


class CreateTTL(namedtuple('CreateTTL', 'path data acl flags ttl')):
type = 21

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(write_buffer(self.data))
b.extend(int_struct.pack(len(self.acl)))
for acl in self.acl:
b.extend(int_struct.pack(acl.perms) +
write_string(acl.id.scheme) + write_string(acl.id.id))
b.extend(int_struct.pack(self.flags))
b.extend(long_struct.pack(self.ttl))
return b

@classmethod
def deserialize(cls, bytes, offset):
path, offset = read_string(bytes, offset)
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
return path, stat


class Auth(namedtuple('Auth', 'auth_type scheme auth')):
type = 100

Expand Down

0 comments on commit 4634c04

Please sign in to comment.