Delayed messages #102

Open
wants to merge 2 commits into
base: main
70 changes: 69 additions & 1 deletion src/pykka/_actor.py
@@ -1,6 +1,8 @@
import bisect
import logging
import sys
import threading
import time
import uuid

from pykka import ActorDeadError, ActorRef, ActorRegistry, messages
@@ -10,6 +12,30 @@
logger = logging.getLogger("pykka")


class TimedInbox:
def __init__(self):
self.timestamps = []
self.envelopes = []

def empty(self):
return len(self.timestamps) == 0

def add(self, envelope):
idx = bisect.bisect(self.timestamps, envelope.timestamp)
Could this use heapq for a priority queue instead of using bisect like this? If there is a reason for preferring bisect, I would add it as a comment to help future readers of the code.

Author

That's a good idea, I admit I wasn't aware of heapq's existence.
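For reference, a minimal sketch of what a heapq-based inbox could look like. The class name `HeapTimedInbox` and the counter tie-breaker are illustrative assumptions, not code from this PR. The counter keeps envelopes with equal timestamps in arrival order (matching what `bisect.bisect` does in the current version) and prevents heapq from ever comparing two envelopes directly.

```python
import heapq
import itertools
import time


class HeapTimedInbox:
    """Hypothetical heapq-based variant of the PR's TimedInbox."""

    def __init__(self):
        self._heap = []
        # Tie-breaker: equal timestamps keep FIFO order, and the
        # envelopes themselves are never compared.
        self._counter = itertools.count()

    def empty(self):
        return not self._heap

    def add(self, envelope):
        entry = (envelope.timestamp, next(self._counter), envelope)
        heapq.heappush(self._heap, entry)

    def pop(self):
        # The smallest (timestamp, counter) pair is always at index 0.
        return heapq.heappop(self._heap)[2]

    def next_event_in(self):
        if not self._heap:
            return None
        return max(self._heap[0][0] - time.monotonic(), 0)
```

Insertion and removal are O(log n) here, whereas `list.insert` and `list.pop(0)` shift the whole list; for small inboxes the difference is unlikely to matter.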

self.timestamps.insert(idx, envelope.timestamp)
self.envelopes.insert(idx, envelope)

def pop(self):
self.timestamps.pop(0)
return self.envelopes.pop(0)

def next_event_in(self):
if len(self.timestamps) == 0:
return None
else:
return max(self.timestamps[0] - time.monotonic(), 0)
Being able to sub in a different time source could be useful for tests. Probably not required for this initial PR, but worth keeping in mind.

Author

Not sure what kind of API is needed for that, given that it has to match the call in _envelope.py. Perhaps it will be easier to just mock time.monotonic in tests?

You might want to read up on the clock pattern:
https://agilewarrior.wordpress.com/2017/03/03/clock-pattern/

Author

So how is that different from mocking time.monotonic?

Mocking time.monotonic() in particular may not be feasible. A dedicated clock object can provide more flexibility in this regard, as well as a nicer interface to manage the clock in tests.

Author

I guess that's true. But there's still the problem of similarly affecting the timeout-based methods of Queue or Event.
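To make the clock-pattern suggestion concrete, here is a small sketch. The names `MonotonicClock`, `FakeClock`, and the free function `next_event_in` are hypothetical and not part of this PR or of Pykka; the idea is only that the time source is passed in instead of calling `time.monotonic()` directly.

```python
import time


class MonotonicClock:
    """Default clock; simply delegates to time.monotonic()."""

    def now(self):
        return time.monotonic()


class FakeClock:
    """Test clock whose time only moves when advance() is called."""

    def __init__(self, start=0.0):
        self._now = start

    def now(self):
        return self._now

    def advance(self, seconds):
        self._now += seconds


def next_event_in(timestamps, clock):
    """Seconds until the earliest timestamp is due, or None if empty."""
    if not timestamps:
        return None
    return max(timestamps[0] - clock.now(), 0)


# In a test, time is controlled explicitly instead of sleeping.
clock = FakeClock(start=100.0)
remaining_before = next_event_in([102.5], clock)
clock.advance(3)
remaining_after = next_event_in([102.5], clock)
```

As the author points out, this only covers the code that asks for the current time. A blocking `get(timeout=...)` on the inbox would still wait in real time, so a fake clock alone does not make the actor loop fully deterministic.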



class Actor:
"""
To create an actor:
@@ -98,6 +124,11 @@ def _create_actor_inbox():
"""Internal method for implementors of new actor types."""
raise NotImplementedError("Use a subclass of Actor")

@staticmethod
def _queue_empty_exception():
"""Internal method for implementors of new actor types."""
Does this return a type or an instance? Could we document this?
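Because the actor loop uses the returned value in an `except ...:` clause, it has to be an exception class (or a tuple of classes), not an instance. A sketch of how that could be documented follows; `ActorBase` and `ThreadingLikeActor` are illustrative stand-ins, and pairing a `queue.Queue` inbox with `queue.Empty` is an assumption about the thread-based implementation.

```python
import queue


class ActorBase:
    @staticmethod
    def _queue_empty_exception():
        """Internal method for implementors of new actor types.

        Return the exception *class* that the inbox raises when
        ``get(timeout=...)`` times out on an empty queue.
        """
        raise NotImplementedError("Use a subclass of Actor")


class ThreadingLikeActor(ActorBase):
    @staticmethod
    def _create_actor_inbox():
        return queue.Queue()

    @staticmethod
    def _queue_empty_exception():
        # queue.Queue.get() raises queue.Empty on timeout.
        return queue.Empty
```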

raise NotImplementedError("Use a subclass of Actor")

@staticmethod
def _create_future():
"""Internal method for implementors of new actor types."""
@@ -146,6 +177,9 @@ def __init__(self, *args, **kwargs):

self.actor_ref = ActorRef(self)

self.__timed_inbox = TimedInbox()
self.__queue_empty_exception = self._queue_empty_exception()

def __str__(self):
return f"{self.__class__.__name__} ({self.actor_urn})"

@@ -180,8 +214,38 @@ def _actor_loop(self):
except Exception:
self._handle_failure(*sys.exc_info())

next_event_in = None

while not self.actor_stopped.is_set():
envelope = self.actor_inbox.get()

next_event_in = self.__timed_inbox.next_event_in()

# Take all the messages out of the inbox and put them
# in our internal inbox where they're sorted by timestamps
try:
envelope = self.actor_inbox.get(timeout=next_event_in)
What happens when timeout=0?

Author

@fjarri, Mar 7, 2021

The same as with any other timeout, really. It either returns an Envelope (if there's one in the queue), or goes into the except self.__queue_empty_exception branch.
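The behaviour described above can be checked against the standard library's `queue.Queue`. This covers only the thread-based case; inboxes of other actor types are assumed, not verified, to behave the same way.

```python
import queue

inbox = queue.Queue()

# Empty queue: get(timeout=0) does not wait and raises queue.Empty.
try:
    inbox.get(timeout=0)
    outcome = "got item"
except queue.Empty:
    outcome = "empty"

# Non-empty queue: the item is returned immediately.
inbox.put("envelope")
item = inbox.get(timeout=0)
```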

self.__timed_inbox.add(envelope)
except self.__queue_empty_exception:
# Raised if we waited for a non-None timeout,
# but the queue is still empty.
# This implies that there was something in the internal inbox
# that is now ready to be processed.
pass
else:
while not self.actor_inbox.empty():
envelope = self.actor_inbox.get()
self.__timed_inbox.add(envelope)

# Update the time of the next event,
# since we received at least one new envelope.
next_event_in = self.__timed_inbox.next_event_in()

# Check if there's something to be processed right now.
if next_event_in > 0:
I thought next_event_in can be zero? Is the check here so that we skip any future messages that still have delay left and only run those that are ready? Perhaps make this clearer with a comment.

Author

Rather, if there are no messages to execute right now (that is, we received some messages, but they were all delayed), we go into the waiting mode again. If there is something to execute, next_event_in would be 0. I'm not sure how I can make it clearer in the comment; could you propose an alternative?
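One way to answer the request for clearer comments is to pull a single pass of the loop into a helper. The sketch below is a simplified model, not the PR's code: `poll_once` is a hypothetical name, `queue.Queue` and `SimpleNamespace` stand in for the real inbox and `Envelope`, and unlike the PR it re-reads the clock after a timeout as well.

```python
import bisect
import queue
import time
from types import SimpleNamespace


class TimedInbox:
    """Condensed copy of the PR's bisect-based inbox."""

    def __init__(self):
        self.timestamps = []
        self.envelopes = []

    def add(self, envelope):
        idx = bisect.bisect(self.timestamps, envelope.timestamp)
        self.timestamps.insert(idx, envelope.timestamp)
        self.envelopes.insert(idx, envelope)

    def pop(self):
        self.timestamps.pop(0)
        return self.envelopes.pop(0)

    def next_event_in(self):
        if not self.timestamps:
            return None
        return max(self.timestamps[0] - time.monotonic(), 0)


def poll_once(inbox, timed_inbox):
    """One pass of the loop: return a due envelope, or None to wait again."""
    # None blocks until a message arrives; otherwise wait only until
    # the earliest delayed envelope becomes due.
    timeout = timed_inbox.next_event_in()
    try:
        timed_inbox.add(inbox.get(timeout=timeout))
    except queue.Empty:
        # Timed out: nothing new arrived while we waited.
        pass
    else:
        # Move everything else already queued into the sorted inbox.
        while not inbox.empty():
            timed_inbox.add(inbox.get())

    # 0 means the earliest envelope is due now; a positive value means
    # every pending envelope is still delayed, so go back to waiting.
    if timed_inbox.next_event_in() > 0:
        return None
    return timed_inbox.pop()


inbox = queue.Queue()
timed = TimedInbox()
now = time.monotonic()
inbox.put(SimpleNamespace(timestamp=now + 0.05, message="later"))
inbox.put(SimpleNamespace(timestamp=now, message="now"))
first = poll_once(inbox, timed)
```

Here `first` is the envelope stamped `now`, even though it was queued second; the delayed one stays in `timed` until a later pass finds it due.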

continue

envelope = self.__timed_inbox.pop()

try:
response = self._handle_receive(envelope.message)
if envelope.reply_to is not None:
@@ -207,6 +271,10 @@ def _actor_loop(self):

while not self.actor_inbox.empty():
envelope = self.actor_inbox.get()
self.__timed_inbox.add(envelope)

while not self.__timed_inbox.empty():
envelope = self.__timed_inbox.pop()
if envelope.reply_to is not None:
if isinstance(envelope.message, messages._ActorStop):
envelope.reply_to.set(None)
14 changes: 11 additions & 3 deletions src/pykka/_envelope.py
@@ -1,3 +1,6 @@
import time


class Envelope:
"""
Envelope to add metadata to a message.
@@ -11,11 +14,16 @@ class Envelope:
"""

# Using slots speeds up envelope creation with ~20%
__slots__ = ["message", "reply_to"]
__slots__ = ["message", "reply_to", "timestamp"]
Name this monotonic timestamp? I personally prefer being a bit overly explicit in naming if there isn't a type this boils down to. I.e. if this was typed to a datetime or something it wouldn't be needed.


def __init__(self, message, reply_to=None):
def __init__(self, message, reply_to=None, delay=0):
self.message = message
self.reply_to = reply_to
self.timestamp = time.monotonic() + delay

def __repr__(self):
return f"Envelope(message={self.message!r}, reply_to={self.reply_to!r})"
return (
f"Envelope(message={self.message!r}, "
f"reply_to={self.reply_to!r}, "
f"timestamp={self.timestamp})"
)
10 changes: 8 additions & 2 deletions src/pykka/_envelope.pyi
@@ -1,9 +1,15 @@
from typing import Any, Optional
from typing import Any, Optional, Union

from pykka import Future

class Envelope:
message: Any
reply_to: Optional[Future]
def __init__(self, message: Any, reply_to: Optional[Future] = ...) -> None: ...
timestamp: float
def __init__(
self,
message: Any,
reply_to: Optional[Future] = ...,
delay: Union[float, int] = ...,
) -> None: ...
def __repr__(self) -> str: ...
125 changes: 103 additions & 22 deletions src/pykka/_proxy.py
@@ -14,7 +14,7 @@ class AttrInfo(NamedTuple):
traversable: bool


class ActorProxy:
class ActorProxyMessageBuilder:
Documentation still refers to ActorProxy, not the ActorProxyMessageBuilder. I would also try and explain how this fits in the "architecture" so new users don't have to reverse engineer this from the implementation.

Author

New contributors, rather. Users shouldn't need to care about this.

"""
An :class:`ActorProxy` wraps an :class:`ActorRef <pykka.ActorRef>`
instance. The proxy allows the referenced actor to be used through regular
@@ -170,7 +170,7 @@ def _is_exposable_attribute(self, attr_name):

def _is_self_proxy(self, attr):
"""Returns true if attribute is an equivalent actor proxy."""
return attr == self
return hasattr(attr, "_message_builder") and attr._message_builder == self

def _is_callable_attribute(self, attr):
"""Returns true for any attribute that is callable."""
@@ -187,7 +187,7 @@ def _is_traversable_attribute(self, attr):
)

def __eq__(self, other):
if not isinstance(other, ActorProxy):
if not isinstance(other, ActorProxyMessageBuilder):
return False
if self._actor != other._actor:
return False
@@ -199,16 +199,15 @@ def __hash__(self):
return hash((self._actor, self._attr_path))

def __repr__(self):
return f"<ActorProxy for {self.actor_ref}, attr_path={self._attr_path!r}>"
return (
f"<ActorProxyMessageBuilder for {self.actor_ref}, "
f"attr_path={self._attr_path!r}>"
)

def __dir__(self):
Just out of curiosity, why are we changing these two methods? I.e. if you explain the new builders and factory stuff on a high level about how things fit together I think you'll answer this at the same time.
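As a reading aid, here is a toy model of how the split appears to work, based only on this diff and not on an explanation from the author. The builder only constructs messages, and each proxy class decides how to send them (immediately, or with a delay). `MessageBuilder`, `RecordingRef`, and `Proxy` are hypothetical stand-ins, and the messages are plain tuples rather than Pykka message objects.

```python
class MessageBuilder:
    """Toy stand-in for ActorProxyMessageBuilder: builds, never sends."""

    def __init__(self, attr_path=()):
        self.attr_path = attr_path

    def getattr(self, name):
        return ("get", self.attr_path + (name,))

    def setattr(self, name):
        # Returns a factory; the caller supplies the value later.
        path = self.attr_path + (name,)
        return lambda value: ("set", path, value)


class RecordingRef:
    """Toy actor ref that records what would have been sent."""

    def __init__(self):
        self.sent = []

    def ask(self, message, delay=0):
        self.sent.append((message, delay))


class Proxy:
    """Toy stand-in for ActorProxy / DelayedProxy: decides how to send."""

    def __init__(self, ref, builder, delay=0):
        self._ref = ref
        self._builder = builder
        self._delay = delay

    def __getattr__(self, name):
        message = self._builder.getattr(name)
        return self._ref.ask(message, delay=self._delay)

    def __setattr__(self, name, value):
        # Private names are real attributes of the proxy itself.
        if name.startswith("_"):
            return super().__setattr__(name, value)
        make_message = self._builder.setattr(name)
        self._ref.ask(make_message(value), delay=self._delay)


ref = RecordingRef()
builder = MessageBuilder()
Proxy(ref, builder).volume = 5
Proxy(ref, builder, delay=2.0).volume = 0
```

In this model both proxies share one builder, which would explain why `dir`, `getattr`, and `setattr` on the builder become plain methods: the dunder versions move to the proxy classes that wrap it.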

result = ["__class__"]
result += list(self.__class__.__dict__.keys())
result += list(self.__dict__.keys())
result += [attr_path[0] for attr_path in list(self._known_attrs.keys())]
return sorted(result)
def dir(self):
return [attr_path[0] for attr_path in list(self._known_attrs.keys())]

def __getattr__(self, name):
def getattr(self, name):
"""Get a field or callable from the actor."""
attr_path = self._attr_path + (name,)

@@ -221,29 +220,110 @@ def __getattr__(self, name):

if attr_info.callable:
if attr_path not in self._callable_proxies:
self._callable_proxies[attr_path] = CallableProxy(
self._callable_proxies[attr_path] = CallableProxyFactory(
self.actor_ref, attr_path
)
return self._callable_proxies[attr_path]
elif attr_info.traversable:
if attr_path not in self._actor_proxies:
self._actor_proxies[attr_path] = ActorProxy(self.actor_ref, attr_path)
self._actor_proxies[attr_path] = ActorProxyMessageBuilder(
self.actor_ref, attr_path
)
return self._actor_proxies[attr_path]
else:
message = messages.ProxyGetAttr(attr_path=attr_path)
return self.actor_ref.ask(message, block=False)
return messages.ProxyGetAttr(attr_path=attr_path)

def __setattr__(self, name, value):
def setattr(self, name):
"""
Set a field on the actor.

Blocks until the field is set to check if any exceptions were raised.
"""
attr_path = self._attr_path + (name,)
return lambda value: messages.ProxySetAttr(attr_path=attr_path, value=value)


class ActorProxyBase:
Missing docstring about how and why this should be used. But I see this is also the case for ActorProxy - so it might not be fair to put this on you as a PR author. But I do think this could make sense for the project overall.

@classmethod
def from_actor_ref(cls, actor_ref):
return cls(ActorProxyMessageBuilder(actor_ref))

def __init__(self, message_builder):
self._message_builder = message_builder
self.actor_ref = message_builder.actor_ref

def __eq__(self, other):
if not isinstance(other, ActorProxyBase):
return False
else:
return self._message_builder == other._message_builder

def __hash__(self):
return hash(self._message_builder)

def __dir__(self):
result = dir(self.__class__) + self._message_builder.dir()
return sorted(result)

def __repr__(self):
return f"<ActorProxyBase with {self._message_builder!r}>"


class ActorProxy(ActorProxyBase):
def __getattr__(self, name):
res = self._message_builder.getattr(name)
if isinstance(res, CallableProxyFactory):
return res()
elif isinstance(res, ActorProxyMessageBuilder):
return ActorProxy(res)
else:
return self.actor_ref.ask(res, block=False)

def __setattr__(self, name, value):
if name == "actor_ref" or name.startswith("_"):
return super().__setattr__(name, value)
attr_path = self._attr_path + (name,)
message = messages.ProxySetAttr(attr_path=attr_path, value=value)
self.actor_ref.ask(message)
message_factory = self._message_builder.setattr(name)
self.actor_ref.ask(message_factory(value))


class ExtendedActorProxy:
def __init__(self, actor_ref):
self._message_builder = ActorProxyMessageBuilder(actor_ref)
self.actor_ref = actor_ref

def delayed(self, delay):
return DelayedProxy(self._message_builder, delay=delay)


class DelayedProxy(ActorProxyBase):
def __init__(self, message_builder, delay=0):
self._message_builder = message_builder
self._delay = delay
self.actor_ref = message_builder.actor_ref

def __getattr__(self, name):
res = self._message_builder.getattr(name)
if isinstance(res, CallableProxyFactory):
return res(delay=self._delay)
elif isinstance(res, ActorProxyMessageBuilder):
return DelayedProxy(res, delay=self._delay)
else:
return self.actor_ref.ask(res, block=False, delay=self._delay)

def __setattr__(self, name, value):
if name == "actor_ref" or name.startswith("_"):
return super().__setattr__(name, value)
message_factory = self._message_builder.setattr(name)
Reading this line I almost assumed it was changing an attr on the builder, but I assume it's creating a builder that has the job of setting an attr? Could we make this more clear with better naming or comments?

self.actor_ref.ask(message_factory(value), delay=self._delay)


class CallableProxyFactory:
def __init__(self, actor_ref, attr_path):
self.actor_ref = actor_ref
self.attr_path = attr_path

def __call__(self, delay=0):
return CallableProxy(self.actor_ref, self.attr_path, delay=delay)


class CallableProxy:
@@ -263,9 +343,10 @@ class CallableProxy:
proxy.do_work.defer()
"""

def __init__(self, actor_ref, attr_path):
def __init__(self, actor_ref, attr_path, delay=0):
Type, units and documentation for delay?
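Since `Envelope` computes `time.monotonic() + delay` and the stub types `delay` as `Union[float, int]`, the unit is seconds. A sketch of how that could be documented, using a hypothetical helper `due_time` rather than the PR's actual constructor:

```python
import time


def due_time(delay=0):
    """Return the monotonic time at which a message becomes due.

    :param delay: how long to hold the message back, in seconds;
        0 means it is due immediately
    :type delay: float or int
    :rtype: float
    """
    return time.monotonic() + delay
```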

self.actor_ref = actor_ref
self._attr_path = attr_path
self._delay = delay

def __call__(self, *args, **kwargs):
"""Call with :meth:`~pykka.ActorRef.ask` semantics.
@@ -280,7 +361,7 @@ def __call__(self, *args, **kwargs):
message = messages.ProxyCall(
attr_path=self._attr_path, args=args, kwargs=kwargs
)
return self.actor_ref.ask(message, block=False)
return self.actor_ref.ask(message, block=False, delay=self._delay)

def defer(self, *args, **kwargs):
"""Call with :meth:`~pykka.ActorRef.tell` semantics.
@@ -296,7 +377,7 @@ def defer(self, *args, **kwargs):
message = messages.ProxyCall(
attr_path=self._attr_path, args=args, kwargs=kwargs
)
self.actor_ref.tell(message)
self.actor_ref.tell(message, delay=self._delay)


def traversable(obj):