Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add support for Container and TTL nodes #613

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 143 additions & 91 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
CloseInstance,
Create,
Create2,
CreateContainer,
CreateTTL,
Delete,
Exists,
GetChildren,
Expand Down Expand Up @@ -917,6 +919,8 @@ def create(
sequence=False,
makepath=False,
include_data=False,
container=False,
ttl=0,
):
"""Create a node with the given value as its data. Optionally
set an ACL on the node.
Expand Down Expand Up @@ -994,6 +998,9 @@ def create(
The `makepath` option.
.. versionadded:: 2.7
The `include_data` option.
.. versionadded:: 2.9
The `container` and `ttl` options.

"""
acl = acl or self.default_acl
return self.create_async(
Expand All @@ -1004,6 +1011,8 @@ def create(
sequence=sequence,
makepath=makepath,
include_data=include_data,
container=container,
ttl=ttl,
).get()

def create_async(
Expand All @@ -1015,6 +1024,8 @@ def create_async(
sequence=False,
makepath=False,
include_data=False,
container=False,
ttl=0,
):
"""Asynchronously create a ZNode. Takes the same arguments as
:meth:`create`.
Expand All @@ -1025,50 +1036,39 @@ def create_async(
The makepath option.
.. versionadded:: 2.7
The `include_data` option.
.. versionadded:: 2.9
The `container` and `ttl` options.
"""
if acl is None and self.default_acl:
acl = self.default_acl

if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if acl and (
isinstance(acl, ACL) or not isinstance(acl, (tuple, list))
):
raise TypeError(
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
)
if value is not None and not isinstance(value, bytes):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(ephemeral, bool):
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
if not isinstance(sequence, bool):
raise TypeError("Invalid type for 'sequence' (bool expected)")
if not isinstance(makepath, bool):
raise TypeError("Invalid type for 'makepath' (bool expected)")
if not isinstance(include_data, bool):
raise TypeError("Invalid type for 'include_data' (bool expected)")

flags = 0
if ephemeral:
flags |= 1
if sequence:
flags |= 2
if acl is None:
acl = OPEN_ACL_UNSAFE

opcode = _create_opcode(
path,
value,
acl,
self.chroot,
ephemeral,
sequence,
include_data,
container,
ttl,
)
async_result = self.handler.async_result()

@capture_exceptions(async_result)
def do_create():
result = self._create_async_inner(
path,
value,
acl,
flags,
trailing=sequence,
include_data=include_data,
)
result.rawlink(create_completion)
inner_async_result = self.handler.async_result()

call_result = self._call(opcode, inner_async_result)
if call_result is False:
# We hit a short-circuit exit on the _call. Because we are
# not using the original async_result here, we bubble the
# exception upwards to the do_create function in
# KazooClient.create so that it gets set on the correct
# async_result object
raise inner_async_result.exception

inner_async_result.rawlink(create_completion)

@capture_exceptions(async_result)
def retry_completion(result):
Expand All @@ -1078,11 +1078,11 @@ def retry_completion(result):
@wrap(async_result)
def create_completion(result):
try:
if include_data:
if opcode.type == Create.type:
return self.unchroot(result.get())
else:
new_path, stat = result.get()
return self.unchroot(new_path), stat
else:
return self.unchroot(result.get())
except NoNodeError:
if not makepath:
raise
Expand All @@ -1095,33 +1095,6 @@ def create_completion(result):
do_create()
return async_result

def _create_async_inner(
self, path, value, acl, flags, trailing=False, include_data=False
):
async_result = self.handler.async_result()
if include_data:
opcode = Create2
else:
opcode = Create

call_result = self._call(
opcode(
_prefix_root(self.chroot, path, trailing=trailing),
value,
acl,
flags,
),
async_result,
)
if call_result is False:
# We hit a short-circuit exit on the _call. Because we are
# not using the original async_result here, we bubble the
# exception upwards to the do_create function in
# KazooClient.create so that it gets set on the correct
# async_result object
raise async_result.exception
return async_result

def ensure_path(self, path, acl=None):
"""Recursively create a path if it doesn't exist.

Expand Down Expand Up @@ -1673,43 +1646,40 @@ def __init__(self, client):
self.committed = False

def create(
self, path, value=b"", acl=None, ephemeral=False, sequence=False
self,
path,
value=b"",
acl=None,
ephemeral=False,
sequence=False,
include_data=False,
container=False,
ttl=0,
):
"""Add a create ZNode to the transaction. Takes the same
arguments as :meth:`KazooClient.create`, with the exception
of `makepath`.

:returns: None

.. versionadded:: 2.9
The `include_data`, `container` and `ttl` options.
"""
if acl is None and self.client.default_acl:
acl = self.client.default_acl

if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if acl and not isinstance(acl, (tuple, list)):
raise TypeError(
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
)
if not isinstance(value, bytes):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(ephemeral, bool):
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
if not isinstance(sequence, bool):
raise TypeError("Invalid type for 'sequence' (bool expected)")

flags = 0
if ephemeral:
flags |= 1
if sequence:
flags |= 2
if acl is None:
acl = OPEN_ACL_UNSAFE

self._add(
Create(_prefix_root(self.client.chroot, path), value, acl, flags),
None,
opcode = _create_opcode(
path,
value,
acl,
self.client.chroot,
ephemeral,
sequence,
include_data,
container,
ttl,
)
self._add(opcode, None)

def delete(self, path, version=-1):
"""Add a delete ZNode to the transaction. Takes the same
Expand Down Expand Up @@ -1790,3 +1760,85 @@ def _add(self, request, post_processor=None):
self._check_tx_state()
self.client.logger.log(BLATHER, "Added %r to %r", request, self)
self.operations.append(request)


def _create_opcode(
path,
value,
acl,
chroot,
ephemeral,
sequence,
include_data,
container,
ttl,
):
"""Helper function.
Creates the create OpCode for regular `client.create()` operations as
well as in a `client.transaction()` context.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if acl and (isinstance(acl, ACL) or not isinstance(acl, (tuple, list))):
raise TypeError(
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
)
if value is not None and not isinstance(value, bytes):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(ephemeral, bool):
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
if not isinstance(sequence, bool):
raise TypeError("Invalid type for 'sequence' (bool expected)")
if not isinstance(include_data, bool):
raise TypeError("Invalid type for 'include_data' (bool expected)")
if not isinstance(container, bool):
raise TypeError("Invalid type for 'container' (bool expected)")
if not isinstance(ttl, int) or ttl < 0:
raise TypeError("Invalid 'ttl' (integer >= 0 expected)")
if ttl and ephemeral:
raise TypeError("Invalid node creation: ephemeral & ttl")
if container and (ephemeral or sequence or ttl):
raise TypeError(
"Invalid node creation: container & ephemeral/sequence/ttl"
)

# Should match Zookeeper's CreateMode fromFlag
# https://github.com/apache/zookeeper/blob/master/zookeeper-server/
# src/main/java/org/apache/zookeeper/CreateMode.java#L112
flags = 0
if ephemeral:
flags |= 1
if sequence:
flags |= 2
if container:
flags = 4
if ttl:
if sequence:
flags = 6
else:
flags = 5

if acl is None:
acl = OPEN_ACL_UNSAFE

# Figure out the OpCode we are going to send
if include_data:
return Create2(
_prefix_root(chroot, path, trailing=sequence), value, acl, flags
)
elif container:
return CreateContainer(
_prefix_root(chroot, path, trailing=False), value, acl, flags
)
elif ttl:
return CreateTTL(
_prefix_root(chroot, path, trailing=sequence),
value,
acl,
flags,
ttl,
)
else:
return Create(
_prefix_root(chroot, path, trailing=sequence), value, acl, flags
)
Loading
Loading