diff --git a/pykka/_actor.py b/pykka/_actor.py index a0c066f1..185a284d 100644 --- a/pykka/_actor.py +++ b/pykka/_actor.py @@ -2,10 +2,10 @@ import sys import threading import uuid +from collections import deque from pykka import ActorDeadError, ActorRef, ActorRegistry, messages - __all__ = ["Actor"] logger = logging.getLogger("pykka") @@ -126,6 +126,10 @@ def _start_actor_loop(self): #: continue processing messages. Use :meth:`stop` to change it. actor_stopped = None + #: A stack of message handlers. These handlers are tuples of a function and + #: its args. It always has at least one element - default :meth:`on_receive`. + actor_message_handlers = None + def __init__(self, *args, **kwargs): """ Your are free to override :meth:`__init__`, but you must call your @@ -150,6 +154,7 @@ def __init__(self, *args, **kwargs): self.actor_stopped = threading.Event() self.actor_ref = ActorRef(self) + self.actor_message_handlers = deque([(self.on_receive, ())]) def __str__(self): return f"{self.__class__.__name__} ({self.actor_urn})" @@ -298,7 +303,8 @@ def _handle_receive(self, message): parent_attr = self._get_attribute_from_path(message.attr_path[:-1]) attr_name = message.attr_path[-1] return setattr(parent_attr, attr_name, message.value) - return self.on_receive(message) + message_handler, args = self.actor_message_handlers[-1] + return message_handler(message, *args) def on_receive(self, message): """ @@ -345,3 +351,27 @@ def _introspect_attributes(self, obj): if hasattr(obj, "__dict__"): result.update(obj.__dict__) return result + + def _become(self, func, *args, discard_old=True): + """ + Adds a specified message handler to a stack to allow changing actor + behaviour on the fly. + + If `discard_old` is set to `True`, it replaces the latest handler, + so :meth:`_unbecome` will always lead to returning to the original + :meth:`on_receive` method. + If `discard_old` is set to `False`, the new handler will be placed + to the top of the stack, and it will need several :meth:`_unbecome`s + to return to the original behaviour. + """ + if discard_old: + self._unbecome() + self.actor_message_handlers.append((func, args)) + + def _unbecome(self): + """ + Removes one message handler from a stack if there are any handlers + but the original :meth:`on_receive` method. + """ + if len(self.actor_message_handlers) > 1: + self.actor_message_handlers.pop() diff --git a/pykka/_actor.pyi b/pykka/_actor.pyi index 27b6b99a..65f3516e 100644 --- a/pykka/_actor.pyi +++ b/pykka/_actor.pyi @@ -1,6 +1,7 @@ import threading +from collections import deque from types import TracebackType -from typing import Any, Dict, Sequence, Type +from typing import Any, Callable, Dict, Sequence, Type from typing_extensions import Protocol # Py38+: Available in ``typing`` @@ -23,6 +24,7 @@ class Actor: actor_inbox: ActorInbox actor_ref: ActorRef actor_stopped: threading.Event + actor_message_handlers: deque def __init__(self, *args: Any, **kwargs: Any) -> None: ... def __str__(self) -> str: ... def stop(self) -> None: ... @@ -49,3 +51,7 @@ class Actor: self, attr_path: Sequence[str] ) -> Dict[str, Any]: ... def _introspect_attributes(self, obj: Any) -> Dict[str, Any]: ... + def _unbecome(self) -> None: ... + def _become( + self, func: Callable, *args: Any, discard_old: bool + ) -> None: ... diff --git a/pykka/_future.pyi b/pykka/_future.pyi index 484b82a8..69f45493 100644 --- a/pykka/_future.pyi +++ b/pykka/_future.pyi @@ -11,7 +11,7 @@ from typing import ( from pykka._types import OptExcInfo _T = TypeVar("_T") -I = TypeVar("I") # For when T is Iterable[I] # noqa +I = TypeVar("I") # noqa # For when T is Iterable[I] _M = TypeVar("_M") # For Future.map() _R = TypeVar("_R") # For Future.reduce() @@ -26,12 +26,14 @@ class Future(Generic[_T]): def set_exception(self, exc_info: Optional[OptExcInfo] = ...) -> None: ... def set_get_hook(self, func: GetHookFunc) -> None: ... def filter( - self: Future[Iterable[I]], func: Callable[[I], bool] - ) -> Future[Iterable[I]]: ... + self: Future[Iterable[I]], func: Callable[[I], bool] # noqa + ) -> Future[Iterable[I]]: ... # noqa def join(self, *futures: Future[Any]) -> Future[Iterable[Any]]: ... def map(self, func: Callable[[_T], _M]) -> Future[_M]: ... def reduce( - self: Future[Iterable[I]], func: Callable[[_R, I], _R], *args: _R + self: Future[Iterable[I]], # noqa + func: Callable[[_R, I], _R], # noqa + *args: _R, ) -> Future[_R]: ... def __await__(self) -> Generator[None, None, _T]: ... diff --git a/pykka/_ref.pyi b/pykka/_ref.pyi index a2b50cb8..7d10c9aa 100644 --- a/pykka/_ref.pyi +++ b/pykka/_ref.pyi @@ -1,11 +1,5 @@ import threading -from typing import ( - Any, - Optional, - Type, - Union, - overload, -) +from typing import Any, Optional, Type, Union, overload from typing_extensions import Literal @@ -32,10 +26,7 @@ class ActorRef: ) -> Future[Any]: ... @overload # noqa: Allow redefinition def ask( - self, - message: Any, - block: Literal[True], - timeout: Optional[float] = ..., + self, message: Any, block: Literal[True], timeout: Optional[float] = ... ) -> Any: ... @overload # noqa: Allow redefinition def ask( diff --git a/tests/test_actor.py b/tests/test_actor.py index bb50a45d..9f51b851 100644 --- a/tests/test_actor.py +++ b/tests/test_actor.py @@ -70,6 +70,45 @@ def on_stop(self): return EarlyStoppingActor +@pytest.fixture(scope="module") +def dynamic_actor_class(runtime): + class DynamicActor(runtime.actor_class): + def on_receive(self, message): + if message.get("command") == "current behaviour": + return "default" + if message.get("command") == "go deeper": + self._become(self.new_behaviour1, "deep") + if message.get("command") == "go high": + self._unbecome() + + def new_behaviour1(self, message, word1): + if message.get("command") == "current behaviour": + return word1 + if message.get("command") == "go deeper": + self._become(self.new_behaviour2, "very", "deep") + if message.get("command") == "step deeper": + self._become( + self.new_behaviour2, "very", "deep", discard_old=False + ) + if message.get("command") == "go high": + self._unbecome() + + def new_behaviour2(self, message, word1, word2): + if message.get("command") == "current behaviour": + return f"{word1} {word2}" + if message.get("command") == "go high": + self._unbecome() + + return DynamicActor + + +@pytest.fixture +def dynamic_actor_ref(dynamic_actor_class): + ref = dynamic_actor_class.start() + yield ref + ref.stop() + + def test_messages_left_in_queue_after_actor_stops_receive_an_error( runtime, actor_ref ): @@ -252,3 +291,48 @@ def test_actor_processes_all_messages_before_stop_on_self_stops_it( events.on_stop_was_called.wait(5) assert len(ActorRegistry.get_all()) == 0 + + +def test_actor_changes_behaviour_on_become(dynamic_actor_ref): + before_become = dynamic_actor_ref.ask({"command": "current behaviour"}) + dynamic_actor_ref.tell({"command": "go deeper"}) + first_become = dynamic_actor_ref.ask({"command": "current behaviour"}) + dynamic_actor_ref.tell({"command": "go deeper"}) + second_become = dynamic_actor_ref.ask({"command": "current behaviour"}) + assert before_become == "default" + assert first_become == "deep" + assert second_become == "very deep" + + +def test_actor_changes_behaviour_on_unbecome(dynamic_actor_ref): + dynamic_actor_ref.tell({"command": "go deeper"}) + dynamic_actor_ref.tell({"command": "go deeper"}) + after_become = dynamic_actor_ref.ask({"command": "current behaviour"}) + dynamic_actor_ref.tell({"command": "go high"}) + after_unbecome = dynamic_actor_ref.ask({"command": "current behaviour"}) + assert after_become == "very deep" + assert after_unbecome == "default" + + +def test_actor_stacks_handlers(dynamic_actor_ref): + dynamic_actor_ref.tell({"command": "go deeper"}) + dynamic_actor_ref.tell({"command": "step deeper"}) + after_become = dynamic_actor_ref.ask({"command": "current behaviour"}) + dynamic_actor_ref.tell({"command": "go high"}) + after_unbecome = dynamic_actor_ref.ask({"command": "current behaviour"}) + assert after_become == "very deep" + assert after_unbecome == "deep" + + +def test_actor_cant_drop_default_handler(dynamic_actor_ref): + before_become = dynamic_actor_ref.ask({"command": "current behaviour"}) + dynamic_actor_ref.tell({"command": "go deeper"}) + after_become = dynamic_actor_ref.ask({"command": "current behaviour"}) + dynamic_actor_ref.tell({"command": "go high"}) + first_unbecome = dynamic_actor_ref.ask({"command": "current behaviour"}) + dynamic_actor_ref.tell({"command": "go high"}) + second_unbecome = dynamic_actor_ref.ask({"command": "current behaviour"}) + assert before_become == "default" + assert after_become == "deep" + assert first_unbecome == "default" + assert second_unbecome == "default" diff --git a/tests/test_messages.py b/tests/test_messages.py index f342c840..b4b91426 100644 --- a/tests/test_messages.py +++ b/tests/test_messages.py @@ -1,9 +1,4 @@ -from pykka.messages import ( - ProxyCall, - ProxyGetAttr, - ProxySetAttr, - _ActorStop, -) +from pykka.messages import ProxyCall, ProxyGetAttr, ProxySetAttr, _ActorStop def test_actor_stop():