Skip to content

Commit

Permalink
Delayed ask()/tell()
Browse files Browse the repository at this point in the history
  • Loading branch information
fjarri committed Jan 9, 2021
1 parent d1b33b3 commit 2fd5eb0
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 12 deletions.
70 changes: 69 additions & 1 deletion pykka/_actor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import bisect
import logging
import sys
import threading
import time
import uuid

from pykka import ActorDeadError, ActorRef, ActorRegistry, messages
Expand All @@ -11,6 +13,30 @@
logger = logging.getLogger("pykka")


class TimedInbox:
def __init__(self):
self.timestamps = []
self.envelopes = []

def empty(self):
return len(self.timestamps) == 0

def add(self, envelope):
idx = bisect.bisect(self.timestamps, envelope.timestamp)
self.timestamps.insert(idx, envelope.timestamp)
self.envelopes.insert(idx, envelope)

def pop(self):
self.timestamps.pop(0)
return self.envelopes.pop(0)

def next_event_in(self):
if len(self.timestamps) == 0:
return None
else:
return max(self.timestamps[0] - time.monotonic(), 0)


class Actor:
"""
To create an actor:
Expand Down Expand Up @@ -101,6 +127,11 @@ def _create_actor_inbox():
"""Internal method for implementors of new actor types."""
raise NotImplementedError("Use a subclass of Actor")

@staticmethod
def _queue_empty_exception():
"""Internal method for implementors of new actor types."""
raise NotImplementedError("Use a subclass of Actor")

@staticmethod
def _create_future():
"""Internal method for implementors of new actor types."""
Expand Down Expand Up @@ -151,6 +182,9 @@ def __init__(self, *args, **kwargs):

self.actor_ref = ActorRef(self)

self.__timed_inbox = TimedInbox()
self.__queue_empty_exception = self._queue_empty_exception()

def __str__(self):
return f"{self.__class__.__name__} ({self.actor_urn})"

Expand Down Expand Up @@ -185,8 +219,38 @@ def _actor_loop(self):
except Exception:
self._handle_failure(*sys.exc_info())

next_event_in = None

while not self.actor_stopped.is_set():
envelope = self.actor_inbox.get()

next_event_in = self.__timed_inbox.next_event_in()

# Take all the messages out of the inbox and put them
# in our internal inbox where they're sorted by timestamps
try:
envelope = self.actor_inbox.get(timeout=next_event_in)
self.__timed_inbox.add(envelope)
except self.__queue_empty_exception:
# Raised if we waited for a non-None timeout,
# but the queue is still empty.
# This implies that there was something in the internal inbox
# that is now ready to be processed.
pass
else:
while not self.actor_inbox.empty():
envelope = self.actor_inbox.get()
self.__timed_inbox.add(envelope)

# Update the time of the next event,
# since we received at least one new envelope.
next_event_in = self.__timed_inbox.next_event_in()

# Check if there's something to be processed right now.
if next_event_in > 0:
continue

envelope = self.__timed_inbox.pop()

try:
response = self._handle_receive(envelope.message)
if envelope.reply_to is not None:
Expand Down Expand Up @@ -214,6 +278,10 @@ def _actor_loop(self):

while not self.actor_inbox.empty():
envelope = self.actor_inbox.get()
self.__timed_inbox.add(envelope)

while not self.__timed_inbox.empty():
envelope = self.__timed_inbox.pop()
if envelope.reply_to is not None:
if isinstance(envelope.message, messages._ActorStop):
envelope.reply_to.set(None)
Expand Down
14 changes: 11 additions & 3 deletions pykka/_envelope.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import time


class Envelope:
"""
Envelope to add metadata to a message.
Expand All @@ -11,11 +14,16 @@ class Envelope:
"""

# Using slots speeds up envelope creation with ~20%
__slots__ = ["message", "reply_to"]
__slots__ = ["message", "reply_to", "timestamp"]

def __init__(self, message, reply_to=None):
def __init__(self, message, reply_to=None, delay=0):
self.message = message
self.reply_to = reply_to
self.timestamp = time.monotonic() + delay

def __repr__(self):
return f"Envelope(message={self.message!r}, reply_to={self.reply_to!r})"
return (
f"Envelope(message={self.message!r}, "
f"reply_to={self.reply_to!r}, "
f"timestamp={self.timestamp})"
)
8 changes: 6 additions & 2 deletions pykka/_envelope.pyi
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from typing import Any, Optional
from typing import Any, Optional, Union

from pykka import Future

class Envelope:
message: Any
reply_to: Optional[Future]
timestamp: float
def __init__(
self, message: Any, reply_to: Optional[Future] = ...
self,
message: Any,
reply_to: Optional[Future] = ...,
delay: Union[float, int] = ...,
) -> None: ...
def __repr__(self) -> str: ...
10 changes: 6 additions & 4 deletions pykka/_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def is_alive(self):
"""
return not self.actor_stopped.is_set()

def tell(self, message):
def tell(self, message, delay=0):
"""
Send message to actor without waiting for any response.
Expand All @@ -71,9 +71,9 @@ def tell(self, message):
"""
if not self.is_alive():
raise ActorDeadError(f"{self} not found")
self.actor_inbox.put(Envelope(message))
self.actor_inbox.put(Envelope(message, delay=delay))

def ask(self, message, block=True, timeout=None):
def ask(self, message, block=True, timeout=None, delay=0):
"""
Send message to actor and wait for the reply.
Expand Down Expand Up @@ -108,7 +108,9 @@ def ask(self, message, block=True, timeout=None):
except ActorDeadError:
future.set_exception()
else:
self.actor_inbox.put(Envelope(message, reply_to=future))
self.actor_inbox.put(
Envelope(message, reply_to=future, delay=delay)
)

if block:
return future.get(timeout=timeout)
Expand Down
4 changes: 4 additions & 0 deletions pykka/_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ class ThreadingActor(Actor):
def _create_actor_inbox():
return queue.Queue()

@staticmethod
def _queue_empty_exception():
return queue.Empty

@staticmethod
def _create_future():
return ThreadingFuture()
Expand Down
4 changes: 4 additions & 0 deletions pykka/eventlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ class EventletActor(Actor):
def _create_actor_inbox():
return eventlet.queue.Queue()

@staticmethod
def _queue_empty_exception():
return eventlet.queue.Empty

@staticmethod
def _create_future():
return EventletFuture()
Expand Down
4 changes: 4 additions & 0 deletions pykka/gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ class GeventActor(Actor):
def _create_actor_inbox():
return gevent.queue.Queue()

@staticmethod
def _queue_empty_exception():
return gevent.queue.Empty

@staticmethod
def _create_future():
return GeventFuture()
Expand Down
43 changes: 43 additions & 0 deletions tests/test_actor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import uuid

import pytest
Expand Down Expand Up @@ -252,3 +253,45 @@ def test_actor_processes_all_messages_before_stop_on_self_stops_it(

events.on_stop_was_called.wait(5)
assert len(ActorRegistry.get_all()) == 0


def test_delayed_message_is_actually_delayed(runtime, actor_ref):
event = runtime.event_class()
lst = []

actor_ref.tell(
{
"command": "callback",
"callback": lambda: lst.append(2) or event.set(),
},
delay=1,
)
actor_ref.tell(
{"command": "callback", "callback": lambda: lst.append(1)}, delay=0.5
)
event_set = event.wait()
actor_ref.stop()

assert event_set
assert lst == [1, 2]


def test_delayed_message_with_shorter_delay_cuts_in_line(runtime, actor_ref):
event = runtime.event_class()

departure_time = time.time()
arrival_time = []
delay = 1

actor_ref.tell(
{
"command": "callback",
"callback": lambda: arrival_time.append(time.time()) or event.set(),
},
delay=delay,
)
event_set = event.wait()
actor_ref.stop()

assert event_set
assert abs(arrival_time[0] - departure_time - delay) < 0.1
15 changes: 13 additions & 2 deletions tests/test_envelope.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
import re
import time

from pykka._envelope import Envelope


def test_envelope_repr():
envelope = Envelope("message", reply_to=123)
current_time = time.time()
delay = 10
envelope = Envelope("message", reply_to=123, delay=delay)
match = re.match(
r"Envelope\(message='message', reply_to=123, timestamp=([\d\.]+)\)",
repr(envelope),
)

assert repr(envelope) == "Envelope(message='message', reply_to=123)"
assert match is not None
# there will be some difference, execution takes time
assert abs(float(match.group(1)) - current_time - delay) < 0.1

0 comments on commit 2fd5eb0

Please sign in to comment.