Skip to content

Commit

Permalink
feat(core): Add support for Container and TTL nodes
Browse files Browse the repository at this point in the history
Also add support through transactions.

Closes python-zk#334, python-zk#496
  • Loading branch information
ceache committed Feb 12, 2023
1 parent b7b2b15 commit 476d616
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 98 deletions.
232 changes: 136 additions & 96 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
CloseInstance,
Create,
Create2,
CreateContainer,
CreateTTL,
Delete,
Exists,
GetChildren,
Expand Down Expand Up @@ -917,6 +919,8 @@ def create(
sequence=False,
makepath=False,
include_data=False,
container=False,
ttl=0,
):
"""Create a node with the given value as its data. Optionally
set an ACL on the node.
Expand Down Expand Up @@ -994,6 +998,9 @@ def create(
The `makepath` option.
.. versionadded:: 2.7
The `include_data` option.
.. versionadded:: 2.9
The `container` and `ttl` options.
"""
acl = acl or self.default_acl
return self.create_async(
Expand All @@ -1004,6 +1011,8 @@ def create(
sequence=sequence,
makepath=makepath,
include_data=include_data,
container=container,
ttl=ttl,
).get()

def create_async(
Expand All @@ -1015,6 +1024,8 @@ def create_async(
sequence=False,
makepath=False,
include_data=False,
container=False,
ttl=0,
):
"""Asynchronously create a ZNode. Takes the same arguments as
:meth:`create`.
Expand All @@ -1025,50 +1036,39 @@ def create_async(
The makepath option.
.. versionadded:: 2.7
The `include_data` option.
.. versionadded:: 2.9
The `container` and `ttl` options.
"""
if acl is None and self.default_acl:
acl = self.default_acl

if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if acl and (
isinstance(acl, ACL) or not isinstance(acl, (tuple, list))
):
raise TypeError(
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
)
if value is not None and not isinstance(value, bytes):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(ephemeral, bool):
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
if not isinstance(sequence, bool):
raise TypeError("Invalid type for 'sequence' (bool expected)")
if not isinstance(makepath, bool):
raise TypeError("Invalid type for 'makepath' (bool expected)")
if not isinstance(include_data, bool):
raise TypeError("Invalid type for 'include_data' (bool expected)")

flags = 0
if ephemeral:
flags |= 1
if sequence:
flags |= 2
if acl is None:
acl = OPEN_ACL_UNSAFE

opcode = _create_opcode(
path,
value,
acl,
self.chroot,
ephemeral,
sequence,
include_data,
container,
ttl,
)
async_result = self.handler.async_result()

@capture_exceptions(async_result)
def do_create():
result = self._create_async_inner(
path,
value,
acl,
flags,
trailing=sequence,
include_data=include_data,
)
result.rawlink(create_completion)
inner_async_result = self.handler.async_result()

call_result = self._call(opcode, inner_async_result)
if call_result is False:
# We hit a short-circuit exit on the _call. Because we are
# not using the original async_result here, we bubble the
# exception upwards to the do_create function in
# KazooClient.create so that it gets set on the correct
# async_result object
raise inner_async_result.exception

inner_async_result.rawlink(create_completion)

@capture_exceptions(async_result)
def retry_completion(result):
Expand All @@ -1078,11 +1078,11 @@ def retry_completion(result):
@wrap(async_result)
def create_completion(result):
try:
if include_data:
if opcode.type == Create.type:
return self.unchroot(result.get())
else:
new_path, stat = result.get()
return self.unchroot(new_path), stat
else:
return self.unchroot(result.get())
except NoNodeError:
if not makepath:
raise
Expand All @@ -1095,33 +1095,6 @@ def create_completion(result):
do_create()
return async_result

def _create_async_inner(
self, path, value, acl, flags, trailing=False, include_data=False
):
async_result = self.handler.async_result()
if include_data:
opcode = Create2
else:
opcode = Create

call_result = self._call(
opcode(
_prefix_root(self.chroot, path, trailing=trailing),
value,
acl,
flags,
),
async_result,
)
if call_result is False:
# We hit a short-circuit exit on the _call. Because we are
# not using the original async_result here, we bubble the
# exception upwards to the do_create function in
# KazooClient.create so that it gets set on the correct
# async_result object
raise async_result.exception
return async_result

def ensure_path(self, path, acl=None):
"""Recursively create a path if it doesn't exist.
Expand Down Expand Up @@ -1680,48 +1653,33 @@ def create(
ephemeral=False,
sequence=False,
include_data=False,
container=False,
ttl=0,
):
"""Add a create ZNode to the transaction. Takes the same
arguments as :meth:`KazooClient.create`, with the exception
of `makepath`.
:returns: None
.. versionadded:: 2.9
The `include_data`, `container` and `ttl` options.
"""
if acl is None and self.client.default_acl:
acl = self.client.default_acl

if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if acl and not isinstance(acl, (tuple, list)):
raise TypeError(
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
)
if not isinstance(value, bytes):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(ephemeral, bool):
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
if not isinstance(sequence, bool):
raise TypeError("Invalid type for 'sequence' (bool expected)")
if not isinstance(include_data, bool):
raise TypeError("Invalid type for 'include_data' (bool expected)")

flags = 0
if ephemeral:
flags |= 1
if sequence:
flags |= 2
if acl is None:
acl = OPEN_ACL_UNSAFE
if include_data:
opcode = Create2
else:
opcode = Create

self._add(
opcode(_prefix_root(self.client.chroot, path), value, acl, flags),
None,
opcode = _create_opcode(
path,
value,
acl,
self.client.chroot,
ephemeral,
sequence,
include_data,
container,
ttl,
)
self._add(opcode, None)

def delete(self, path, version=-1):
"""Add a delete ZNode to the transaction. Takes the same
Expand Down Expand Up @@ -1802,3 +1760,85 @@ def _add(self, request, post_processor=None):
self._check_tx_state()
self.client.logger.log(BLATHER, "Added %r to %r", request, self)
self.operations.append(request)


def _create_opcode(
path,
value,
acl,
chroot,
ephemeral,
sequence,
include_data,
container,
ttl,
):
"""Helper function.
Creates the create OpCode for regular `client.create()` operations as
well as in a `client.transaction()` context.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if acl and (isinstance(acl, ACL) or not isinstance(acl, (tuple, list))):
raise TypeError(
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
)
if value is not None and not isinstance(value, bytes_types):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(ephemeral, bool):
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
if not isinstance(sequence, bool):
raise TypeError("Invalid type for 'sequence' (bool expected)")
if not isinstance(include_data, bool):
raise TypeError("Invalid type for 'include_data' (bool expected)")
if not isinstance(container, bool):
raise TypeError("Invalid type for 'container' (bool expected)")
if not isinstance(ttl, int) or ttl < 0:
raise TypeError("Invalid 'ttl' (integer >= 0 expected)")
if ttl and ephemeral:
raise TypeError("Invalid node creation: ephemeral & ttl")
if container and (ephemeral or sequence or ttl):
raise TypeError(
"Invalid node creation: container & ephemeral/sequence/ttl"
)

# Should match Zookeeper's CreateMode fromFlag
# https://github.com/apache/zookeeper/blob/master/zookeeper-server/
# src/main/java/org/apache/zookeeper/CreateMode.java#L112
flags = 0
if ephemeral:
flags |= 1
if sequence:
flags |= 2
if container:
flags = 4
if ttl:
if sequence:
flags = 6
else:
flags = 5

if acl is None:
acl = OPEN_ACL_UNSAFE

# Figure out the OpCode we are going to send
if include_data:
return Create2(
_prefix_root(chroot, path, trailing=sequence), value, acl, flags
)
elif container:
return CreateContainer(
_prefix_root(chroot, path, trailing=False), value, acl, flags
)
elif ttl:
return CreateTTL(
_prefix_root(chroot, path, trailing=sequence),
value,
acl,
flags,
ttl,
)
else:
return Create(
_prefix_root(chroot, path, trailing=sequence), value, acl, flags
)
49 changes: 49 additions & 0 deletions kazoo/protocol/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,55 @@ def deserialize(cls, bytes, offset):
return data, stat


class CreateContainer(namedtuple("CreateContainer", "path data acl flags")):
type = 19

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(write_buffer(self.data))
b.extend(int_struct.pack(len(self.acl)))
for acl in self.acl:
b.extend(
int_struct.pack(acl.perms)
+ write_string(acl.id.scheme)
+ write_string(acl.id.id)
)
b.extend(int_struct.pack(self.flags))
return b

@classmethod
def deserialize(cls, bytes, offset):
path, offset = read_string(bytes, offset)
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
return path, stat


class CreateTTL(namedtuple("CreateTTL", "path data acl flags ttl")):
type = 21

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(write_buffer(self.data))
b.extend(int_struct.pack(len(self.acl)))
for acl in self.acl:
b.extend(
int_struct.pack(acl.perms)
+ write_string(acl.id.scheme)
+ write_string(acl.id.id)
)
b.extend(int_struct.pack(self.flags))
b.extend(long_struct.pack(self.ttl))
return b

@classmethod
def deserialize(cls, bytes, offset):
path, offset = read_string(bytes, offset)
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
return path, stat


class Auth(namedtuple("Auth", "auth_type scheme auth")):
type = 100

Expand Down
7 changes: 5 additions & 2 deletions kazoo/testing/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,13 @@ def get_global_cluster():
"localSessionsEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO,
"localSessionsUpgradingEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO,
]
# If defined, this sets the superuser password to "test"
additional_java_system_properties = [
# Enable extended types (container & ttl znodes)
"-Dzookeeper.extendedTypesEnabled=true",
"-Dznode.container.checkIntervalMs=100",
# If defined, this sets the superuser password to "test"
"-Dzookeeper.DigestAuthenticationProvider.superDigest="
"super:D/InIHSb7yEEbrWz8b9l71RjZJU="
"super:D/InIHSb7yEEbrWz8b9l71RjZJU=",
]
else:
additional_configuration_entries = []
Expand Down
Loading

0 comments on commit 476d616

Please sign in to comment.