Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka style Become and Unbecome #97

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions pykka/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import sys
import threading
import uuid
from collections import deque

from pykka import ActorDeadError, ActorRef, ActorRegistry, messages


__all__ = ["Actor"]

logger = logging.getLogger("pykka")
Expand Down Expand Up @@ -126,6 +126,10 @@ def _start_actor_loop(self):
#: continue processing messages. Use :meth:`stop` to change it.
actor_stopped = None

#: A stack of message handlers. These handlers are tuples of a function and
#: its args. It always has at least one element - default :meth:`on_receive`.
actor_message_handlers = None

def __init__(self, *args, **kwargs):
"""
Your are free to override :meth:`__init__`, but you must call your
Expand All @@ -150,6 +154,7 @@ def __init__(self, *args, **kwargs):
self.actor_stopped = threading.Event()

self.actor_ref = ActorRef(self)
self.actor_message_handlers = deque([(self.on_receive, ())])

def __str__(self):
return f"{self.__class__.__name__} ({self.actor_urn})"
Expand Down Expand Up @@ -298,7 +303,8 @@ def _handle_receive(self, message):
parent_attr = self._get_attribute_from_path(message.attr_path[:-1])
attr_name = message.attr_path[-1]
return setattr(parent_attr, attr_name, message.value)
return self.on_receive(message)
message_handler, args = self.actor_message_handlers[-1]
return message_handler(message, *args)

def on_receive(self, message):
"""
Expand Down Expand Up @@ -345,3 +351,27 @@ def _introspect_attributes(self, obj):
if hasattr(obj, "__dict__"):
result.update(obj.__dict__)
return result

def _become(self, func, *args, discard_old=True):
"""
Adds a specified message handler to a stack to allow changing actor
behaviour on the fly.

If `discard_old` is set to `True`, it replaces the latest handler,
so :meth:`_unbecome` will always lead to returning to the original
:meth:`on_receive` method.
If `discard_old` is set to `False`, the new handler will be placed
to the top of the stack, and it will need several :meth:`_unbecome`s
to return to the original behaviour.
"""
if discard_old:
self._unbecome()
self.actor_message_handlers.append((func, args))

def _unbecome(self):
"""
Removes one message handler from a stack if there are any handlers
but the original :meth:`on_receive` method.
"""
if len(self.actor_message_handlers) > 1:
self.actor_message_handlers.pop()
8 changes: 7 additions & 1 deletion pykka/_actor.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import threading
from collections import deque
from types import TracebackType
from typing import Any, Dict, Sequence, Type
from typing import Any, Callable, Dict, Sequence, Type

from typing_extensions import Protocol # Py38+: Available in ``typing``

Expand All @@ -23,6 +24,7 @@ class Actor:
actor_inbox: ActorInbox
actor_ref: ActorRef
actor_stopped: threading.Event
actor_message_handlers: deque
def __init__(self, *args: Any, **kwargs: Any) -> None: ...
def __str__(self) -> str: ...
def stop(self) -> None: ...
Expand All @@ -49,3 +51,7 @@ class Actor:
self, attr_path: Sequence[str]
) -> Dict[str, Any]: ...
def _introspect_attributes(self, obj: Any) -> Dict[str, Any]: ...
def _unbecome(self) -> None: ...
def _become(
self, func: Callable, *args: Any, discard_old: bool
) -> None: ...
10 changes: 6 additions & 4 deletions pykka/_future.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ from typing import (
from pykka._types import OptExcInfo

_T = TypeVar("_T")
I = TypeVar("I") # For when T is Iterable[I] # noqa
I = TypeVar("I") # noqa # For when T is Iterable[I]

_M = TypeVar("_M") # For Future.map()
_R = TypeVar("_R") # For Future.reduce()
Expand All @@ -26,12 +26,14 @@ class Future(Generic[_T]):
def set_exception(self, exc_info: Optional[OptExcInfo] = ...) -> None: ...
def set_get_hook(self, func: GetHookFunc) -> None: ...
def filter(
self: Future[Iterable[I]], func: Callable[[I], bool]
) -> Future[Iterable[I]]: ...
self: Future[Iterable[I]], func: Callable[[I], bool] # noqa
) -> Future[Iterable[I]]: ... # noqa
def join(self, *futures: Future[Any]) -> Future[Iterable[Any]]: ...
def map(self, func: Callable[[_T], _M]) -> Future[_M]: ...
def reduce(
self: Future[Iterable[I]], func: Callable[[_R, I], _R], *args: _R
self: Future[Iterable[I]], # noqa
func: Callable[[_R, I], _R], # noqa
*args: _R,
) -> Future[_R]: ...
def __await__(self) -> Generator[None, None, _T]: ...

Expand Down
13 changes: 2 additions & 11 deletions pykka/_ref.pyi
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import threading
from typing import (
Any,
Optional,
Type,
Union,
overload,
)
from typing import Any, Optional, Type, Union, overload

from typing_extensions import Literal

Expand All @@ -32,10 +26,7 @@ class ActorRef:
) -> Future[Any]: ...
@overload # noqa: Allow redefinition
def ask(
self,
message: Any,
block: Literal[True],
timeout: Optional[float] = ...,
self, message: Any, block: Literal[True], timeout: Optional[float] = ...
) -> Any: ...
@overload # noqa: Allow redefinition
def ask(
Expand Down
84 changes: 84 additions & 0 deletions tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,45 @@ def on_stop(self):
return EarlyStoppingActor


@pytest.fixture(scope="module")
def dynamic_actor_class(runtime):
class DynamicActor(runtime.actor_class):
def on_receive(self, message):
if message.get("command") == "current behaviour":
return "default"
if message.get("command") == "go deeper":
self._become(self.new_behaviour1, "deep")
if message.get("command") == "go high":
self._unbecome()

def new_behaviour1(self, message, word1):
if message.get("command") == "current behaviour":
return word1
if message.get("command") == "go deeper":
self._become(self.new_behaviour2, "very", "deep")
if message.get("command") == "step deeper":
self._become(
self.new_behaviour2, "very", "deep", discard_old=False
)
if message.get("command") == "go high":
self._unbecome()

def new_behaviour2(self, message, word1, word2):
if message.get("command") == "current behaviour":
return f"{word1} {word2}"
if message.get("command") == "go high":
self._unbecome()

return DynamicActor


@pytest.fixture
def dynamic_actor_ref(dynamic_actor_class):
ref = dynamic_actor_class.start()
yield ref
ref.stop()


def test_messages_left_in_queue_after_actor_stops_receive_an_error(
runtime, actor_ref
):
Expand Down Expand Up @@ -252,3 +291,48 @@ def test_actor_processes_all_messages_before_stop_on_self_stops_it(

events.on_stop_was_called.wait(5)
assert len(ActorRegistry.get_all()) == 0


def test_actor_changes_behaviour_on_become(dynamic_actor_ref):
before_become = dynamic_actor_ref.ask({"command": "current behaviour"})
dynamic_actor_ref.tell({"command": "go deeper"})
first_become = dynamic_actor_ref.ask({"command": "current behaviour"})
dynamic_actor_ref.tell({"command": "go deeper"})
second_become = dynamic_actor_ref.ask({"command": "current behaviour"})
assert before_become == "default"
assert first_become == "deep"
assert second_become == "very deep"


def test_actor_changes_behaviour_on_unbecome(dynamic_actor_ref):
dynamic_actor_ref.tell({"command": "go deeper"})
dynamic_actor_ref.tell({"command": "go deeper"})
after_become = dynamic_actor_ref.ask({"command": "current behaviour"})
dynamic_actor_ref.tell({"command": "go high"})
after_unbecome = dynamic_actor_ref.ask({"command": "current behaviour"})
assert after_become == "very deep"
assert after_unbecome == "default"


def test_actor_stacks_handlers(dynamic_actor_ref):
dynamic_actor_ref.tell({"command": "go deeper"})
dynamic_actor_ref.tell({"command": "step deeper"})
after_become = dynamic_actor_ref.ask({"command": "current behaviour"})
dynamic_actor_ref.tell({"command": "go high"})
after_unbecome = dynamic_actor_ref.ask({"command": "current behaviour"})
assert after_become == "very deep"
assert after_unbecome == "deep"


def test_actor_cant_drop_default_handler(dynamic_actor_ref):
before_become = dynamic_actor_ref.ask({"command": "current behaviour"})
dynamic_actor_ref.tell({"command": "go deeper"})
after_become = dynamic_actor_ref.ask({"command": "current behaviour"})
dynamic_actor_ref.tell({"command": "go high"})
first_unbecome = dynamic_actor_ref.ask({"command": "current behaviour"})
dynamic_actor_ref.tell({"command": "go high"})
second_unbecome = dynamic_actor_ref.ask({"command": "current behaviour"})
assert before_become == "default"
assert after_become == "deep"
assert first_unbecome == "default"
assert second_unbecome == "default"
7 changes: 1 addition & 6 deletions tests/test_messages.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
from pykka.messages import (
ProxyCall,
ProxyGetAttr,
ProxySetAttr,
_ActorStop,
)
from pykka.messages import ProxyCall, ProxyGetAttr, ProxySetAttr, _ActorStop


def test_actor_stop():
Expand Down