-
-
Notifications
You must be signed in to change notification settings - Fork 106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Delayed messages #102
base: main
Are you sure you want to change the base?
Delayed messages #102
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
import bisect | ||
import logging | ||
import sys | ||
import threading | ||
import time | ||
import uuid | ||
|
||
from pykka import ActorDeadError, ActorRef, ActorRegistry, messages | ||
|
@@ -10,6 +12,30 @@ | |
logger = logging.getLogger("pykka") | ||
|
||
|
||
class TimedInbox: | ||
def __init__(self): | ||
self.timestamps = [] | ||
self.envelopes = [] | ||
|
||
def empty(self): | ||
return len(self.timestamps) == 0 | ||
|
||
def add(self, envelope): | ||
idx = bisect.bisect(self.timestamps, envelope.timestamp) | ||
self.timestamps.insert(idx, envelope.timestamp) | ||
self.envelopes.insert(idx, envelope) | ||
|
||
def pop(self): | ||
self.timestamps.pop(0) | ||
return self.envelopes.pop(0) | ||
|
||
def next_event_in(self): | ||
if len(self.timestamps) == 0: | ||
return None | ||
else: | ||
return max(self.timestamps[0] - time.monotonic(), 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Being able to sub in a different time source could be useful for tests. Probably not required for this initial PR, but worth keeping in mind. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure what kind of API is needed for that, given that it has to match the call in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You might want to read up on the clock pattern: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So how is that different from mocking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mocking time.monotonic() in particular may not be feasible. A dedicated clock object can provide more flexibility in this regard, as well as a nicer interface to manage the clock in tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess that's true. But there's still a problem of similarly affecting the timeouted methods of |
||
|
||
|
||
class Actor: | ||
""" | ||
To create an actor: | ||
|
@@ -98,6 +124,11 @@ def _create_actor_inbox(): | |
"""Internal method for implementors of new actor types.""" | ||
raise NotImplementedError("Use a subclass of Actor") | ||
|
||
@staticmethod | ||
def _queue_empty_exception(): | ||
"""Internal method for implementors of new actor types.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this return a type or an instance, could we document this? |
||
raise NotImplementedError("Use a subclass of Actor") | ||
|
||
@staticmethod | ||
def _create_future(): | ||
"""Internal method for implementors of new actor types.""" | ||
|
@@ -146,6 +177,9 @@ def __init__(self, *args, **kwargs): | |
|
||
self.actor_ref = ActorRef(self) | ||
|
||
self.__timed_inbox = TimedInbox() | ||
self.__queue_empty_exception = self._queue_empty_exception() | ||
|
||
def __str__(self): | ||
return f"{self.__class__.__name__} ({self.actor_urn})" | ||
|
||
|
@@ -180,8 +214,38 @@ def _actor_loop(self): | |
except Exception: | ||
self._handle_failure(*sys.exc_info()) | ||
|
||
next_event_in = None | ||
|
||
while not self.actor_stopped.is_set(): | ||
envelope = self.actor_inbox.get() | ||
|
||
next_event_in = self.__timed_inbox.next_event_in() | ||
|
||
# Take all the messages out of the inbox and put them | ||
# in our internal inbox where they're sorted by timestamps | ||
try: | ||
envelope = self.actor_inbox.get(timeout=next_event_in) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens when timeout=0 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same as with any other timeout, really. It either returns an |
||
self.__timed_inbox.add(envelope) | ||
except self.__queue_empty_exception: | ||
# Raised if we waited for a non-None timeout, | ||
# but the queue is still empty. | ||
# This implies that there was something in the internal inbox | ||
# that is now ready to be processed. | ||
pass | ||
else: | ||
while not self.actor_inbox.empty(): | ||
envelope = self.actor_inbox.get() | ||
self.__timed_inbox.add(envelope) | ||
|
||
# Update the time of the next event, | ||
# since we received at least one new envelope. | ||
next_event_in = self.__timed_inbox.next_event_in() | ||
|
||
# Check if there's something to be processed right now. | ||
if next_event_in > 0: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought next event in can be zero? Is the check here so that we skip any future messages that have delays left and only run those that are ready. Perhaps makes this more clear with a comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather, if there are no messages to execute right now (that is, we received some messages, but they were all delayed), we go into the waiting mode again. If there is something to execute, |
||
continue | ||
|
||
envelope = self.__timed_inbox.pop() | ||
|
||
try: | ||
response = self._handle_receive(envelope.message) | ||
if envelope.reply_to is not None: | ||
|
@@ -207,6 +271,10 @@ def _actor_loop(self): | |
|
||
while not self.actor_inbox.empty(): | ||
envelope = self.actor_inbox.get() | ||
self.__timed_inbox.add(envelope) | ||
|
||
while not self.__timed_inbox.empty(): | ||
envelope = self.__timed_inbox.pop() | ||
if envelope.reply_to is not None: | ||
if isinstance(envelope.message, messages._ActorStop): | ||
envelope.reply_to.set(None) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,6 @@ | ||
import time | ||
|
||
|
||
class Envelope: | ||
""" | ||
Envelope to add metadata to a message. | ||
|
@@ -11,11 +14,16 @@ class Envelope: | |
""" | ||
|
||
# Using slots speeds up envelope creation with ~20% | ||
__slots__ = ["message", "reply_to"] | ||
__slots__ = ["message", "reply_to", "timestamp"] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Name this monotonic timestamp? I personally prefer being a bit overly explicit in naming if there isn't a type this boils down to. I.e. if this was typed to a |
||
|
||
def __init__(self, message, reply_to=None): | ||
def __init__(self, message, reply_to=None, delay=0): | ||
self.message = message | ||
self.reply_to = reply_to | ||
self.timestamp = time.monotonic() + delay | ||
|
||
def __repr__(self): | ||
return f"Envelope(message={self.message!r}, reply_to={self.reply_to!r})" | ||
return ( | ||
f"Envelope(message={self.message!r}, " | ||
f"reply_to={self.reply_to!r}, " | ||
f"timestamp={self.timestamp})" | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,15 @@ | ||
from typing import Any, Optional | ||
from typing import Any, Optional, Union | ||
|
||
from pykka import Future | ||
|
||
class Envelope: | ||
message: Any | ||
reply_to: Optional[Future] | ||
def __init__(self, message: Any, reply_to: Optional[Future] = ...) -> None: ... | ||
timestamp: float | ||
def __init__( | ||
self, | ||
message: Any, | ||
reply_to: Optional[Future] = ..., | ||
delay: Union[float, int] = ..., | ||
) -> None: ... | ||
def __repr__(self) -> str: ... |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ class AttrInfo(NamedTuple): | |
traversable: bool | ||
|
||
|
||
class ActorProxy: | ||
class ActorProxyMessageBuilder: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Documentation still refers to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New contributors, rather. Users shouldn't need to care about this. |
||
""" | ||
An :class:`ActorProxy` wraps an :class:`ActorRef <pykka.ActorRef>` | ||
instance. The proxy allows the referenced actor to be used through regular | ||
|
@@ -170,7 +170,7 @@ def _is_exposable_attribute(self, attr_name): | |
|
||
def _is_self_proxy(self, attr): | ||
"""Returns true if attribute is an equivalent actor proxy.""" | ||
return attr == self | ||
return hasattr(attr, "_message_builder") and attr._message_builder == self | ||
|
||
def _is_callable_attribute(self, attr): | ||
"""Returns true for any attribute that is callable.""" | ||
|
@@ -187,7 +187,7 @@ def _is_traversable_attribute(self, attr): | |
) | ||
|
||
def __eq__(self, other): | ||
if not isinstance(other, ActorProxy): | ||
if not isinstance(other, ActorProxyMessageBuilder): | ||
return False | ||
if self._actor != other._actor: | ||
return False | ||
|
@@ -199,16 +199,15 @@ def __hash__(self): | |
return hash((self._actor, self._attr_path)) | ||
|
||
def __repr__(self): | ||
return f"<ActorProxy for {self.actor_ref}, attr_path={self._attr_path!r}>" | ||
return ( | ||
f"<ActorProxyMessageBuilder for {self.actor_ref}, " | ||
f"attr_path={self._attr_path!r}>" | ||
) | ||
|
||
def __dir__(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just out curiosity, why are we changing these two methods? I.e. if you explain the new builders and factory stuff on a high level about how things fit together I think you'll answer this at the same time. |
||
result = ["__class__"] | ||
result += list(self.__class__.__dict__.keys()) | ||
result += list(self.__dict__.keys()) | ||
result += [attr_path[0] for attr_path in list(self._known_attrs.keys())] | ||
return sorted(result) | ||
def dir(self): | ||
return [attr_path[0] for attr_path in list(self._known_attrs.keys())] | ||
|
||
def __getattr__(self, name): | ||
def getattr(self, name): | ||
"""Get a field or callable from the actor.""" | ||
attr_path = self._attr_path + (name,) | ||
|
||
|
@@ -221,29 +220,110 @@ def __getattr__(self, name): | |
|
||
if attr_info.callable: | ||
if attr_path not in self._callable_proxies: | ||
self._callable_proxies[attr_path] = CallableProxy( | ||
self._callable_proxies[attr_path] = CallableProxyFactory( | ||
self.actor_ref, attr_path | ||
) | ||
return self._callable_proxies[attr_path] | ||
elif attr_info.traversable: | ||
if attr_path not in self._actor_proxies: | ||
self._actor_proxies[attr_path] = ActorProxy(self.actor_ref, attr_path) | ||
self._actor_proxies[attr_path] = ActorProxyMessageBuilder( | ||
self.actor_ref, attr_path | ||
) | ||
return self._actor_proxies[attr_path] | ||
else: | ||
message = messages.ProxyGetAttr(attr_path=attr_path) | ||
return self.actor_ref.ask(message, block=False) | ||
return messages.ProxyGetAttr(attr_path=attr_path) | ||
|
||
def __setattr__(self, name, value): | ||
def setattr(self, name): | ||
""" | ||
Set a field on the actor. | ||
|
||
Blocks until the field is set to check if any exceptions was raised. | ||
""" | ||
attr_path = self._attr_path + (name,) | ||
return lambda value: messages.ProxySetAttr(attr_path=attr_path, value=value) | ||
|
||
|
||
class ActorProxyBase: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing docstring about how and why this should be used. But I see this is also the case for |
||
@classmethod | ||
def from_actor_ref(cls, actor_ref): | ||
return cls(ActorProxyMessageBuilder(actor_ref)) | ||
|
||
def __init__(self, message_builder): | ||
self._message_builder = message_builder | ||
self.actor_ref = message_builder.actor_ref | ||
|
||
def __eq__(self, other): | ||
if not isinstance(other, ActorProxyBase): | ||
return False | ||
else: | ||
return self._message_builder == other._message_builder | ||
|
||
def __hash__(self): | ||
return hash(self._message_builder) | ||
|
||
def __dir__(self): | ||
result = dir(self.__class__) + self._message_builder.dir() | ||
return sorted(result) | ||
|
||
def __repr__(self): | ||
return f"<ActorProxyBase with {self._message_builder!r}>" | ||
|
||
|
||
class ActorProxy(ActorProxyBase): | ||
def __getattr__(self, name): | ||
res = self._message_builder.getattr(name) | ||
if isinstance(res, CallableProxyFactory): | ||
return res() | ||
elif isinstance(res, ActorProxyMessageBuilder): | ||
return ActorProxy(res) | ||
else: | ||
return self.actor_ref.ask(res, block=False) | ||
|
||
def __setattr__(self, name, value): | ||
if name == "actor_ref" or name.startswith("_"): | ||
return super().__setattr__(name, value) | ||
attr_path = self._attr_path + (name,) | ||
message = messages.ProxySetAttr(attr_path=attr_path, value=value) | ||
self.actor_ref.ask(message) | ||
message_factory = self._message_builder.setattr(name) | ||
self.actor_ref.ask(message_factory(value)) | ||
|
||
|
||
class ExtendedActorProxy: | ||
def __init__(self, actor_ref): | ||
self._message_builder = ActorProxyMessageBuilder(actor_ref) | ||
self.actor_ref = actor_ref | ||
|
||
def delayed(self, delay): | ||
return DelayedProxy(self._message_builder, delay=delay) | ||
|
||
|
||
class DelayedProxy(ActorProxyBase): | ||
def __init__(self, message_builder, delay=0): | ||
self._message_builder = message_builder | ||
self._delay = delay | ||
self.actor_ref = message_builder.actor_ref | ||
|
||
def __getattr__(self, name): | ||
res = self._message_builder.getattr(name) | ||
if isinstance(res, CallableProxyFactory): | ||
return res(delay=self._delay) | ||
elif isinstance(res, ActorProxyMessageBuilder): | ||
return DelayedProxy(res, delay=self._delay) | ||
else: | ||
self.actor_ref.ask(res, block=False, delay=self._delay) | ||
|
||
def __setattr__(self, name, value): | ||
if name == "actor_ref" or name.startswith("_"): | ||
return super().__setattr__(name, value) | ||
message_factory = self._message_builder.setattr(name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reading this line I almost assumed it was changing an attr on the builder, but I assume it's creating a builder that has the job of setting an attr? Could we make this more clear with better naming or comments? |
||
self.actor_ref.ask(message_factory(value), delay=self._delay) | ||
|
||
|
||
class CallableProxyFactory: | ||
def __init__(self, actor_ref, attr_path): | ||
self.actor_ref = actor_ref | ||
self.attr_path = attr_path | ||
|
||
def __call__(self, delay=0): | ||
return CallableProxy(self.actor_ref, self.attr_path, delay=delay) | ||
|
||
|
||
class CallableProxy: | ||
|
@@ -263,9 +343,10 @@ class CallableProxy: | |
proxy.do_work.defer() | ||
""" | ||
|
||
def __init__(self, actor_ref, attr_path): | ||
def __init__(self, actor_ref, attr_path, delay=0): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Type, units and documentation for delay? |
||
self.actor_ref = actor_ref | ||
self._attr_path = attr_path | ||
self._delay = delay | ||
|
||
def __call__(self, *args, **kwargs): | ||
"""Call with :meth:`~pykka.ActorRef.ask` semantics. | ||
|
@@ -280,7 +361,7 @@ def __call__(self, *args, **kwargs): | |
message = messages.ProxyCall( | ||
attr_path=self._attr_path, args=args, kwargs=kwargs | ||
) | ||
return self.actor_ref.ask(message, block=False) | ||
return self.actor_ref.ask(message, block=False, delay=self._delay) | ||
|
||
def defer(self, *args, **kwargs): | ||
"""Call with :meth:`~pykka.ActorRef.tell` semantics. | ||
|
@@ -296,7 +377,7 @@ def defer(self, *args, **kwargs): | |
message = messages.ProxyCall( | ||
attr_path=self._attr_path, args=args, kwargs=kwargs | ||
) | ||
self.actor_ref.tell(message) | ||
self.actor_ref.tell(message, delay=self._delay) | ||
|
||
|
||
def traversable(obj): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this use
heapq
for a priority queue instead of using bisect like this? If there is a reason for preferring it as I would add it as a comment to help future readers og the code.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good idea, I admit I wasn't aware of
heapq
's existence.