From 7b376d193608cd63421966dfce657f4cabb540a6 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Sun, 1 Nov 2020 23:05:08 -0800 Subject: [PATCH 1/2] Delayed ask()/tell() --- src/pykka/_actor.py | 70 ++++++++++++++++++++++++++++++++++++++++- src/pykka/_envelope.py | 14 +++++++-- src/pykka/_envelope.pyi | 10 ++++-- src/pykka/_ref.py | 10 +++--- src/pykka/_threading.py | 4 +++ tests/test_actor.py | 43 +++++++++++++++++++++++++ tests/test_envelope.py | 15 +++++++-- 7 files changed, 154 insertions(+), 12 deletions(-) diff --git a/src/pykka/_actor.py b/src/pykka/_actor.py index 6d1bf8fe..2b1e4437 100644 --- a/src/pykka/_actor.py +++ b/src/pykka/_actor.py @@ -1,6 +1,8 @@ +import bisect import logging import sys import threading +import time import uuid from pykka import ActorDeadError, ActorRef, ActorRegistry, messages @@ -10,6 +12,30 @@ logger = logging.getLogger("pykka") +class TimedInbox: + def __init__(self): + self.timestamps = [] + self.envelopes = [] + + def empty(self): + return len(self.timestamps) == 0 + + def add(self, envelope): + idx = bisect.bisect(self.timestamps, envelope.timestamp) + self.timestamps.insert(idx, envelope.timestamp) + self.envelopes.insert(idx, envelope) + + def pop(self): + self.timestamps.pop(0) + return self.envelopes.pop(0) + + def next_event_in(self): + if len(self.timestamps) == 0: + return None + else: + return max(self.timestamps[0] - time.monotonic(), 0) + + class Actor: """ To create an actor: @@ -98,6 +124,11 @@ def _create_actor_inbox(): """Internal method for implementors of new actor types.""" raise NotImplementedError("Use a subclass of Actor") + @staticmethod + def _queue_empty_exception(): + """Internal method for implementors of new actor types.""" + raise NotImplementedError("Use a subclass of Actor") + @staticmethod def _create_future(): """Internal method for implementors of new actor types.""" @@ -146,6 +177,9 @@ def __init__(self, *args, **kwargs): self.actor_ref = ActorRef(self) + self.__timed_inbox = TimedInbox() + self.__queue_empty_exception = self._queue_empty_exception() + def __str__(self): return f"{self.__class__.__name__} ({self.actor_urn})" @@ -180,8 +214,38 @@ def _actor_loop(self): except Exception: self._handle_failure(*sys.exc_info()) + next_event_in = None + while not self.actor_stopped.is_set(): - envelope = self.actor_inbox.get() + + next_event_in = self.__timed_inbox.next_event_in() + + # Take all the messages out of the inbox and put them + # in our internal inbox where they're sorted by timestamps + try: + envelope = self.actor_inbox.get(timeout=next_event_in) + self.__timed_inbox.add(envelope) + except self.__queue_empty_exception: + # Raised if we waited for a non-None timeout, + # but the queue is still empty. + # This implies that there was something in the internal inbox + # that is now ready to be processed. + pass + else: + while not self.actor_inbox.empty(): + envelope = self.actor_inbox.get() + self.__timed_inbox.add(envelope) + + # Update the time of the next event, + # since we received at least one new envelope. + next_event_in = self.__timed_inbox.next_event_in() + + # Check if there's something to be processed right now. + if next_event_in > 0: + continue + + envelope = self.__timed_inbox.pop() + try: response = self._handle_receive(envelope.message) if envelope.reply_to is not None: @@ -207,6 +271,10 @@ def _actor_loop(self): while not self.actor_inbox.empty(): envelope = self.actor_inbox.get() + self.__timed_inbox.add(envelope) + + while not self.__timed_inbox.empty(): + envelope = self.__timed_inbox.pop() if envelope.reply_to is not None: if isinstance(envelope.message, messages._ActorStop): envelope.reply_to.set(None) diff --git a/src/pykka/_envelope.py b/src/pykka/_envelope.py index e4ad0973..90f87f6c 100644 --- a/src/pykka/_envelope.py +++ b/src/pykka/_envelope.py @@ -1,3 +1,6 @@ +import time + + class Envelope: """ Envelope to add metadata to a message. @@ -11,11 +14,16 @@ class Envelope: """ # Using slots speeds up envelope creation with ~20% - __slots__ = ["message", "reply_to"] + __slots__ = ["message", "reply_to", "timestamp"] - def __init__(self, message, reply_to=None): + def __init__(self, message, reply_to=None, delay=0): self.message = message self.reply_to = reply_to + self.timestamp = time.monotonic() + delay def __repr__(self): - return f"Envelope(message={self.message!r}, reply_to={self.reply_to!r})" + return ( + f"Envelope(message={self.message!r}, " + f"reply_to={self.reply_to!r}, " + f"timestamp={self.timestamp})" + ) diff --git a/src/pykka/_envelope.pyi b/src/pykka/_envelope.pyi index 296007bf..8520b09f 100644 --- a/src/pykka/_envelope.pyi +++ b/src/pykka/_envelope.pyi @@ -1,9 +1,15 @@ -from typing import Any, Optional +from typing import Any, Optional, Union from pykka import Future class Envelope: message: Any reply_to: Optional[Future] - def __init__(self, message: Any, reply_to: Optional[Future] = ...) -> None: ... + timestamp: float + def __init__( + self, + message: Any, + reply_to: Optional[Future] = ..., + delay: Union[float, int] = ..., + ) -> None: ... def __repr__(self) -> str: ... diff --git a/src/pykka/_ref.py b/src/pykka/_ref.py index 28944d31..6cb0ab1f 100644 --- a/src/pykka/_ref.py +++ b/src/pykka/_ref.py @@ -55,7 +55,7 @@ def is_alive(self): """ return not self.actor_stopped.is_set() - def tell(self, message): + def tell(self, message, delay=0): """ Send message to actor without waiting for any response. @@ -70,9 +70,9 @@ def tell(self, message): """ if not self.is_alive(): raise ActorDeadError(f"{self} not found") - self.actor_inbox.put(Envelope(message)) + self.actor_inbox.put(Envelope(message, delay=delay)) - def ask(self, message, block=True, timeout=None): + def ask(self, message, block=True, timeout=None, delay=0): """ Send message to actor and wait for the reply. @@ -107,7 +107,9 @@ def ask(self, message, block=True, timeout=None): except ActorDeadError: future.set_exception() else: - self.actor_inbox.put(Envelope(message, reply_to=future)) + self.actor_inbox.put( + Envelope(message, reply_to=future, delay=delay) + ) if block: return future.get(timeout=timeout) diff --git a/src/pykka/_threading.py b/src/pykka/_threading.py index 4de6035b..7e534324 100644 --- a/src/pykka/_threading.py +++ b/src/pykka/_threading.py @@ -94,6 +94,10 @@ class ThreadingActor(Actor): def _create_actor_inbox(): return queue.Queue() + @staticmethod + def _queue_empty_exception(): + return queue.Empty + @staticmethod def _create_future(): return ThreadingFuture() diff --git a/tests/test_actor.py b/tests/test_actor.py index 7e5d982a..257f9f94 100644 --- a/tests/test_actor.py +++ b/tests/test_actor.py @@ -1,3 +1,4 @@ +import time import uuid import pytest @@ -237,3 +238,45 @@ def test_actor_processes_all_messages_before_stop_on_self_stops_it(actor_ref, ev events.on_stop_was_called.wait(5) assert len(ActorRegistry.get_all()) == 0 + + +def test_delayed_message_is_actually_delayed(runtime, actor_ref): + event = runtime.event_class() + lst = [] + + actor_ref.tell( + { + "command": "callback", + "callback": lambda: lst.append(2) or event.set(), + }, + delay=1, + ) + actor_ref.tell( + {"command": "callback", "callback": lambda: lst.append(1)}, delay=0.5 + ) + event_set = event.wait() + actor_ref.stop() + + assert event_set + assert lst == [1, 2] + + +def test_delayed_message_with_shorter_delay_cuts_in_line(runtime, actor_ref): + event = runtime.event_class() + + departure_time = time.time() + arrival_time = [] + delay = 1 + + actor_ref.tell( + { + "command": "callback", + "callback": lambda: arrival_time.append(time.time()) or event.set(), + }, + delay=delay, + ) + event_set = event.wait() + actor_ref.stop() + + assert event_set + assert abs(arrival_time[0] - departure_time - delay) < 0.1 diff --git a/tests/test_envelope.py b/tests/test_envelope.py index d9f3c85e..bd865071 100644 --- a/tests/test_envelope.py +++ b/tests/test_envelope.py @@ -1,7 +1,18 @@ +import re +import time + from pykka._envelope import Envelope def test_envelope_repr(): - envelope = Envelope("message", reply_to=123) + current_time = time.monotonic() + delay = 10 + envelope = Envelope("message", reply_to=123, delay=delay) + match = re.match( + r"Envelope\(message='message', reply_to=123, timestamp=([\d\.]+)\)", + repr(envelope), + ) - assert repr(envelope) == "Envelope(message='message', reply_to=123)" + assert match is not None + # there will be some difference, execution takes time + assert abs(float(match.group(1)) - current_time - delay) < 0.1 From 50dac57f3cf305422dbed2bc560a9680b86a0261 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Wed, 4 Nov 2020 22:22:35 -0800 Subject: [PATCH 2/2] Delayed proxy objects --- src/pykka/_proxy.py | 125 +++++++++++++++++++----- src/pykka/_ref.py | 13 +-- tests/proxy/test_proxy.py | 4 +- tests/proxy/test_static_method_calls.py | 4 +- 4 files changed, 114 insertions(+), 32 deletions(-) diff --git a/src/pykka/_proxy.py b/src/pykka/_proxy.py index f6185a87..7817f9e0 100644 --- a/src/pykka/_proxy.py +++ b/src/pykka/_proxy.py @@ -14,7 +14,7 @@ class AttrInfo(NamedTuple): traversable: bool -class ActorProxy: +class ActorProxyMessageBuilder: """ An :class:`ActorProxy` wraps an :class:`ActorRef ` instance. The proxy allows the referenced actor to be used through regular @@ -170,7 +170,7 @@ def _is_exposable_attribute(self, attr_name): def _is_self_proxy(self, attr): """Returns true if attribute is an equivalent actor proxy.""" - return attr == self + return hasattr(attr, "_message_builder") and attr._message_builder == self def _is_callable_attribute(self, attr): """Returns true for any attribute that is callable.""" @@ -187,7 +187,7 @@ def _is_traversable_attribute(self, attr): ) def __eq__(self, other): - if not isinstance(other, ActorProxy): + if not isinstance(other, ActorProxyMessageBuilder): return False if self._actor != other._actor: return False @@ -199,16 +199,15 @@ def __hash__(self): return hash((self._actor, self._attr_path)) def __repr__(self): - return f"" + return ( + f"" + ) - def __dir__(self): - result = ["__class__"] - result += list(self.__class__.__dict__.keys()) - result += list(self.__dict__.keys()) - result += [attr_path[0] for attr_path in list(self._known_attrs.keys())] - return sorted(result) + def dir(self): + return [attr_path[0] for attr_path in list(self._known_attrs.keys())] - def __getattr__(self, name): + def getattr(self, name): """Get a field or callable from the actor.""" attr_path = self._attr_path + (name,) @@ -221,29 +220,110 @@ def __getattr__(self, name): if attr_info.callable: if attr_path not in self._callable_proxies: - self._callable_proxies[attr_path] = CallableProxy( + self._callable_proxies[attr_path] = CallableProxyFactory( self.actor_ref, attr_path ) return self._callable_proxies[attr_path] elif attr_info.traversable: if attr_path not in self._actor_proxies: - self._actor_proxies[attr_path] = ActorProxy(self.actor_ref, attr_path) + self._actor_proxies[attr_path] = ActorProxyMessageBuilder( + self.actor_ref, attr_path + ) return self._actor_proxies[attr_path] else: - message = messages.ProxyGetAttr(attr_path=attr_path) - return self.actor_ref.ask(message, block=False) + return messages.ProxyGetAttr(attr_path=attr_path) - def __setattr__(self, name, value): + def setattr(self, name): """ Set a field on the actor. Blocks until the field is set to check if any exceptions was raised. """ + attr_path = self._attr_path + (name,) + return lambda value: messages.ProxySetAttr(attr_path=attr_path, value=value) + + +class ActorProxyBase: + @classmethod + def from_actor_ref(cls, actor_ref): + return cls(ActorProxyMessageBuilder(actor_ref)) + + def __init__(self, message_builder): + self._message_builder = message_builder + self.actor_ref = message_builder.actor_ref + + def __eq__(self, other): + if not isinstance(other, ActorProxyBase): + return False + else: + return self._message_builder == other._message_builder + + def __hash__(self): + return hash(self._message_builder) + + def __dir__(self): + result = dir(self.__class__) + self._message_builder.dir() + return sorted(result) + + def __repr__(self): + return f"" + + +class ActorProxy(ActorProxyBase): + def __getattr__(self, name): + res = self._message_builder.getattr(name) + if isinstance(res, CallableProxyFactory): + return res() + elif isinstance(res, ActorProxyMessageBuilder): + return ActorProxy(res) + else: + return self.actor_ref.ask(res, block=False) + + def __setattr__(self, name, value): if name == "actor_ref" or name.startswith("_"): return super().__setattr__(name, value) - attr_path = self._attr_path + (name,) - message = messages.ProxySetAttr(attr_path=attr_path, value=value) - self.actor_ref.ask(message) + message_factory = self._message_builder.setattr(name) + self.actor_ref.ask(message_factory(value)) + + +class ExtendedActorProxy: + def __init__(self, actor_ref): + self._message_builder = ActorProxyMessageBuilder(actor_ref) + self.actor_ref = actor_ref + + def delayed(self, delay): + return DelayedProxy(self._message_builder, delay=delay) + + +class DelayedProxy(ActorProxyBase): + def __init__(self, message_builder, delay=0): + self._message_builder = message_builder + self._delay = delay + self.actor_ref = message_builder.actor_ref + + def __getattr__(self, name): + res = self._message_builder.getattr(name) + if isinstance(res, CallableProxyFactory): + return res(delay=self._delay) + elif isinstance(res, ActorProxyMessageBuilder): + return DelayedProxy(res, delay=self._delay) + else: + self.actor_ref.ask(res, block=False, delay=self._delay) + + def __setattr__(self, name, value): + if name == "actor_ref" or name.startswith("_"): + return super().__setattr__(name, value) + message_factory = self._message_builder.setattr(name) + self.actor_ref.ask(message_factory(value), delay=self._delay) + + +class CallableProxyFactory: + def __init__(self, actor_ref, attr_path): + self.actor_ref = actor_ref + self.attr_path = attr_path + + def __call__(self, delay=0): + return CallableProxy(self.actor_ref, self.attr_path, delay=delay) class CallableProxy: @@ -263,9 +343,10 @@ class CallableProxy: proxy.do_work.defer() """ - def __init__(self, actor_ref, attr_path): + def __init__(self, actor_ref, attr_path, delay=0): self.actor_ref = actor_ref self._attr_path = attr_path + self._delay = delay def __call__(self, *args, **kwargs): """Call with :meth:`~pykka.ActorRef.ask` semantics. @@ -280,7 +361,7 @@ def __call__(self, *args, **kwargs): message = messages.ProxyCall( attr_path=self._attr_path, args=args, kwargs=kwargs ) - return self.actor_ref.ask(message, block=False) + return self.actor_ref.ask(message, block=False, delay=self._delay) def defer(self, *args, **kwargs): """Call with :meth:`~pykka.ActorRef.tell` semantics. @@ -296,7 +377,7 @@ def defer(self, *args, **kwargs): message = messages.ProxyCall( attr_path=self._attr_path, args=args, kwargs=kwargs ) - self.actor_ref.tell(message) + self.actor_ref.tell(message, delay=self._delay) def traversable(obj): diff --git a/src/pykka/_ref.py b/src/pykka/_ref.py index 6cb0ab1f..280d4595 100644 --- a/src/pykka/_ref.py +++ b/src/pykka/_ref.py @@ -1,5 +1,5 @@ -from pykka import ActorDeadError, ActorProxy from pykka._envelope import Envelope +from pykka._proxy import ActorDeadError, ActorProxy, ExtendedActorProxy from pykka.messages import _ActorStop __all__ = ["ActorRef"] @@ -107,9 +107,7 @@ def ask(self, message, block=True, timeout=None, delay=0): except ActorDeadError: future.set_exception() else: - self.actor_inbox.put( - Envelope(message, reply_to=future, delay=delay) - ) + self.actor_inbox.put(Envelope(message, reply_to=future, delay=delay)) if block: return future.get(timeout=timeout) @@ -153,7 +151,7 @@ def _stop_result_converter(timeout): else: return converted_future - def proxy(self): + def proxy(self, extended=False): """ Wraps the :class:`ActorRef` in an :class:`ActorProxy `. @@ -169,4 +167,7 @@ def proxy(self): :raise: :exc:`pykka.ActorDeadError` if actor is not available :return: :class:`pykka.ActorProxy` """ - return ActorProxy(self) + if extended: + return ExtendedActorProxy(self) + else: + return ActorProxy.from_actor_ref(self) diff --git a/tests/proxy/test_proxy.py b/tests/proxy/test_proxy.py index 64d0ac95..17396715 100644 --- a/tests/proxy/test_proxy.py +++ b/tests/proxy/test_proxy.py @@ -26,7 +26,7 @@ def a_method(self): @pytest.fixture def proxy(actor_class): - proxy = ActorProxy(actor_class.start()) + proxy = ActorProxy.from_actor_ref(actor_class.start()) yield proxy proxy.stop() @@ -110,7 +110,7 @@ def test_proxy_constructor_raises_exception_if_actor_is_dead(actor_class): actor_ref.stop() with pytest.raises(ActorDeadError) as exc_info: - ActorProxy(actor_ref) + ActorProxy.from_actor_ref(actor_ref) assert str(exc_info.value) == f"{actor_ref} not found" diff --git a/tests/proxy/test_static_method_calls.py b/tests/proxy/test_static_method_calls.py index cefe46f6..1f1fbaa8 100644 --- a/tests/proxy/test_static_method_calls.py +++ b/tests/proxy/test_static_method_calls.py @@ -89,7 +89,7 @@ def test_call_to_unknown_method_raises_attribute_error(proxy): result = str(exc_info.value) - assert result.startswith("