Skip to content

Commit

Permalink
feat(recipe): add ExistingDataWatch class
Browse files Browse the repository at this point in the history
This adds a subclass of DataWatch which only operates on existing
ZNodes.  If a user uses a DataWatch on a path and the ZNode at that
path is deleted, the DataWatch will still issue an "exists" call
and set a watch right before the final callback.  That means that
regardless of the return value of the callback and whether or not
Kazoo will invoke the callback again, the ZooKeeper server still
has a watch entry for that path.

In short, using a DataWatch on a path and then deleting that path
can leak watch entries on the ZooKeeper server.

Because the DataWatch recipe is designed to watch non-existing paths,
this behavior may be desired and relied on by some users, so it's
not considered a bug.  But other users may want to use DataWatches
for nodes where this behavior would be a problem.

The ExistingDataWatch class behaves similarly to its parent class,
DataWatch, but it does not set a watch on paths which do not exist
(whether that's because they never existed or were recently deleted).
This means that a user of an ExistingDataWatch can be assured that
after the callback with the deleted event, the watch is removed from
the server.
  • Loading branch information
jeblair authored and ceache committed Jun 2, 2022
1 parent 9bb8499 commit 48f5412
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 1 deletion.
6 changes: 6 additions & 0 deletions docs/api/recipe/watchers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ Public API

.. automethod:: __call__

.. autoclass:: ExistingDataWatch
:members:

.. automethod:: __init__

.. automethod:: __call__

.. autoclass:: ChildrenWatch
:members:
Expand Down
3 changes: 2 additions & 1 deletion kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
from kazoo.recipe.partitioner import SetPartitioner
from kazoo.recipe.party import Party, ShallowParty
from kazoo.recipe.queue import Queue, LockingQueue
from kazoo.recipe.watchers import ChildrenWatch, DataWatch
from kazoo.recipe.watchers import ChildrenWatch, DataWatch, ExistingDataWatch


string_types = six.string_types
Expand Down Expand Up @@ -352,6 +352,7 @@ def _retry(*args, **kwargs):
self.DoubleBarrier = partial(DoubleBarrier, self)
self.ChildrenWatch = partial(ChildrenWatch, self)
self.DataWatch = partial(DataWatch, self)
self.ExistingDataWatch = partial(ExistingDataWatch, self)
self.Election = partial(Election, self)
self.NonBlockingLease = partial(NonBlockingLease, self)
self.MultiNonBlockingLease = partial(MultiNonBlockingLease, self)
Expand Down
61 changes: 61 additions & 0 deletions kazoo/recipe/watchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,67 @@ def _session_watcher(self, state):
self._client.handler.spawn(self._get_data)


class ExistingDataWatch(DataWatch):
"""Watches a node for data updates and calls the specified
function each time it changes
Similar to :class:`~kazoo.recipes.watchers.DataWatch`, but it does
not operate on nodes which do not exist.
The function will also be called the very first time its
registered to get the data.
Returning `False` from the registered function will disable future
data change calls. If the client connection is closed (using the
close command), the DataWatch will no longer get updates.
If the function supplied takes three arguments, then the third one
will be a :class:`~kazoo.protocol.states.WatchedEvent`. It will
only be set if the change to the data occurs as a result of the
server notifying the watch that there has been a change. Events
like reconnection or the first call will not include an event.
If the node does not exist on creation then the function will be
called with ``None`` for all values and no futher callbacks will
occur. If the node is deleted after the watch is created, the
function will be called with the event argument indicating a
delete event and no further callbacks will occur.
"""

@_ignore_closed
def _get_data(self, event=None):
# Ensure this runs one at a time, possible because the session
# watcher may trigger a run
with self._run_lock:
if self._stopped:
return

initial_version = self._version

try:
data, stat = self._retry(self._client.get,
self._path, self._watcher)
except NoNodeError:
data = stat = None

# No node data, clear out version
if stat is None:
self._version = None
else:
self._version = stat.mzxid

# Call our function if its the first time ever, or if the
# version has changed
if initial_version != self._version or not self._ever_called:
self._log_func_exception(data, stat, event)

# If the node doesn't exist, we won't be watching any more
if stat is None:
self._stopped = True
self._func = None
self._client.remove_listener(self._session_watcher)


class ChildrenWatch(object):
"""Watches a node for children updates and calls the specified
function each time it changes
Expand Down
76 changes: 76 additions & 0 deletions kazoo/tests/test_watchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,82 @@ def changed(val, stat):
assert b is False


class KazooExistingDataWatcherTests(KazooTestCase):
def setUp(self):
super(KazooExistingDataWatcherTests, self).setUp()
self.path = "/" + uuid.uuid4().hex
self.client.ensure_path(self.path)

def test_data_watcher_non_existent_path(self):
update = threading.Event()
data = [True]

# Make it a non-existent path
self.path += 'f'

@self.client.ExistingDataWatch(self.path)
def changed(d, stat):
data.pop()
data.append(d)
update.set()

update.wait(10)
assert data == [None]
update.clear()

# We should not get an update
self.client.create(self.path, b'fred')
update.wait(0.2)
assert data == [None]
update.clear()

def test_data_watcher_existing_path(self):
update = threading.Event()
data = [True]

# Make it an existing path
self.path += 'f'
self.client.create(self.path, b'fred')

@self.client.ExistingDataWatch(self.path)
def changed(d, stat):
data.pop()
data.append(d)
update.set()

update.wait(10)
assert data[0] == b'fred'
update.clear()

def test_data_watcher_delete(self):
update = threading.Event()
data = [True]

# Make it an existing path
self.path += 'f'
self.client.create(self.path, b'fred')

@self.client.ExistingDataWatch(self.path)
def changed(d, stat):
data.pop()
data.append(d)
update.set()

update.wait(10)
assert data[0] == b'fred'
update.clear()

self.client.delete(self.path)
update.wait(10)
assert data == [None]
update.clear()

self.client.create(self.path, b'ginger')
update.wait(0.2)
assert data == [None]
update.clear()


class KazooChildrenWatcherTests(KazooTestCase):
def setUp(self):
super(KazooChildrenWatcherTests, self).setUp()
Expand Down

0 comments on commit 48f5412

Please sign in to comment.