Skip to content

Commit

Permalink
Delayed proxy objects
Browse files Browse the repository at this point in the history
  • Loading branch information
fjarri authored and jodal committed Mar 6, 2021
1 parent 7b376d1 commit ac7574e
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 33 deletions.
6 changes: 5 additions & 1 deletion src/pykka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

from pykka._exceptions import ActorDeadError, Timeout
from pykka._future import Future, get_all
from pykka._proxy import ActorProxy, CallableProxy, traversable
from pykka._proxy import (
ActorProxy,
CallableProxy,
traversable,
)
from pykka._ref import ActorRef
from pykka._registry import ActorRegistry

Expand Down
125 changes: 103 additions & 22 deletions src/pykka/_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class AttrInfo(NamedTuple):
traversable: bool


class ActorProxy:
class ActorProxyMessageBuilder:
"""
An :class:`ActorProxy` wraps an :class:`ActorRef <pykka.ActorRef>`
instance. The proxy allows the referenced actor to be used through regular
Expand Down Expand Up @@ -170,7 +170,7 @@ def _is_exposable_attribute(self, attr_name):

def _is_self_proxy(self, attr):
"""Returns true if attribute is an equivalent actor proxy."""
return attr == self
return hasattr(attr, "_message_builder") and attr._message_builder == self

def _is_callable_attribute(self, attr):
"""Returns true for any attribute that is callable."""
Expand All @@ -187,7 +187,7 @@ def _is_traversable_attribute(self, attr):
)

def __eq__(self, other):
if not isinstance(other, ActorProxy):
if not isinstance(other, ActorProxyMessageBuilder):
return False
if self._actor != other._actor:
return False
Expand All @@ -199,16 +199,15 @@ def __hash__(self):
return hash((self._actor, self._attr_path))

def __repr__(self):
return f"<ActorProxy for {self.actor_ref}, attr_path={self._attr_path!r}>"
return (
f"<ActorProxyMessageBuilder for {self.actor_ref}, "
f"attr_path={self._attr_path!r}>"
)

def __dir__(self):
result = ["__class__"]
result += list(self.__class__.__dict__.keys())
result += list(self.__dict__.keys())
result += [attr_path[0] for attr_path in list(self._known_attrs.keys())]
return sorted(result)
def dir(self):
return [attr_path[0] for attr_path in list(self._known_attrs.keys())]

def __getattr__(self, name):
def getattr(self, name):
"""Get a field or callable from the actor."""
attr_path = self._attr_path + (name,)

Expand All @@ -221,29 +220,110 @@ def __getattr__(self, name):

if attr_info.callable:
if attr_path not in self._callable_proxies:
self._callable_proxies[attr_path] = CallableProxy(
self._callable_proxies[attr_path] = CallableProxyFactory(
self.actor_ref, attr_path
)
return self._callable_proxies[attr_path]
elif attr_info.traversable:
if attr_path not in self._actor_proxies:
self._actor_proxies[attr_path] = ActorProxy(self.actor_ref, attr_path)
self._actor_proxies[attr_path] = ActorProxyMessageBuilder(
self.actor_ref, attr_path
)
return self._actor_proxies[attr_path]
else:
message = messages.ProxyGetAttr(attr_path=attr_path)
return self.actor_ref.ask(message, block=False)
return messages.ProxyGetAttr(attr_path=attr_path)

def __setattr__(self, name, value):
def setattr(self, name):
"""
Set a field on the actor.
Blocks until the field is set to check if any exceptions was raised.
"""
attr_path = self._attr_path + (name,)
return lambda value: messages.ProxySetAttr(attr_path=attr_path, value=value)


class ActorProxyBase:
@classmethod
def from_actor_ref(cls, actor_ref):
return cls(ActorProxyMessageBuilder(actor_ref))

def __init__(self, message_builder):
self._message_builder = message_builder
self.actor_ref = message_builder.actor_ref

def __eq__(self, other):
if not isinstance(other, ActorProxyBase):
return False
else:
return self._message_builder == other._message_builder

def __hash__(self):
return hash(self._message_builder)

def __dir__(self):
result = dir(self.__class__) + self._message_builder.dir()
return sorted(result)

def __repr__(self):
return f"<ActorProxyBase with {self._message_builder!r}>"


class ActorProxy(ActorProxyBase):
def __getattr__(self, name):
res = self._message_builder.getattr(name)
if isinstance(res, CallableProxyFactory):
return res()
elif isinstance(res, ActorProxyMessageBuilder):
return ActorProxy(res)
else:
return self.actor_ref.ask(res, block=False)

def __setattr__(self, name, value):
if name == "actor_ref" or name.startswith("_"):
return super().__setattr__(name, value)
attr_path = self._attr_path + (name,)
message = messages.ProxySetAttr(attr_path=attr_path, value=value)
self.actor_ref.ask(message)
message_factory = self._message_builder.setattr(name)
self.actor_ref.ask(message_factory(value))


class ExtendedActorProxy:
def __init__(self, actor_ref):
self._message_builder = ActorProxyMessageBuilder(actor_ref)
self.actor_ref = actor_ref

def delayed(self, delay):
return DelayedProxy(self._message_builder, delay=delay)


class DelayedProxy(ActorProxyBase):
def __init__(self, message_builder, delay=0):
self._message_builder = message_builder
self._delay = delay
self.actor_ref = message_builder.actor_ref

def __getattr__(self, name):
res = self._message_builder.getattr(name)
if isinstance(res, CallableProxyFactory):
return res(delay=self._delay)
elif isinstance(res, ActorProxyMessageBuilder):
return DelayedProxy(res, delay=self._delay)
else:
self.actor_ref.ask(res, block=False, delay=self._delay)

def __setattr__(self, name, value):
if name == "actor_ref" or name.startswith("_"):
return super().__setattr__(name, value)
message_factory = self._message_builder.setattr(name)
self.actor_ref.ask(message_factory(value), delay=self._delay)


class CallableProxyFactory:
def __init__(self, actor_ref, attr_path):
self.actor_ref = actor_ref
self.attr_path = attr_path

def __call__(self, delay=0):
return CallableProxy(self.actor_ref, self.attr_path, delay=delay)


class CallableProxy:
Expand All @@ -263,9 +343,10 @@ class CallableProxy:
proxy.do_work.defer()
"""

def __init__(self, actor_ref, attr_path):
def __init__(self, actor_ref, attr_path, delay=0):
self.actor_ref = actor_ref
self._attr_path = attr_path
self._delay = delay

def __call__(self, *args, **kwargs):
"""Call with :meth:`~pykka.ActorRef.ask` semantics.
Expand All @@ -280,7 +361,7 @@ def __call__(self, *args, **kwargs):
message = messages.ProxyCall(
attr_path=self._attr_path, args=args, kwargs=kwargs
)
return self.actor_ref.ask(message, block=False)
return self.actor_ref.ask(message, block=False, delay=self._delay)

def defer(self, *args, **kwargs):
"""Call with :meth:`~pykka.ActorRef.tell` semantics.
Expand All @@ -296,7 +377,7 @@ def defer(self, *args, **kwargs):
message = messages.ProxyCall(
attr_path=self._attr_path, args=args, kwargs=kwargs
)
self.actor_ref.tell(message)
self.actor_ref.tell(message, delay=self._delay)


def traversable(obj):
Expand Down
13 changes: 7 additions & 6 deletions src/pykka/_ref.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pykka import ActorDeadError, ActorProxy
from pykka._envelope import Envelope
from pykka._proxy import ActorDeadError, ActorProxy, ExtendedActorProxy
from pykka.messages import _ActorStop

__all__ = ["ActorRef"]
Expand Down Expand Up @@ -107,9 +107,7 @@ def ask(self, message, block=True, timeout=None, delay=0):
except ActorDeadError:
future.set_exception()
else:
self.actor_inbox.put(
Envelope(message, reply_to=future, delay=delay)
)
self.actor_inbox.put(Envelope(message, reply_to=future, delay=delay))

if block:
return future.get(timeout=timeout)
Expand Down Expand Up @@ -153,7 +151,7 @@ def _stop_result_converter(timeout):
else:
return converted_future

def proxy(self):
def proxy(self, extended=False):
"""
Wraps the :class:`ActorRef` in an :class:`ActorProxy
<pykka.ActorProxy>`.
Expand All @@ -169,4 +167,7 @@ def proxy(self):
:raise: :exc:`pykka.ActorDeadError` if actor is not available
:return: :class:`pykka.ActorProxy`
"""
return ActorProxy(self)
if extended:
return ExtendedActorProxy(self)
else:
return ActorProxy.from_actor_ref(self)
4 changes: 2 additions & 2 deletions tests/proxy/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def a_method(self):

@pytest.fixture
def proxy(actor_class):
proxy = ActorProxy(actor_class.start())
proxy = ActorProxy.from_actor_ref(actor_class.start())
yield proxy
proxy.stop()

Expand Down Expand Up @@ -110,7 +110,7 @@ def test_proxy_constructor_raises_exception_if_actor_is_dead(actor_class):
actor_ref.stop()

with pytest.raises(ActorDeadError) as exc_info:
ActorProxy(actor_ref)
ActorProxy.from_actor_ref(actor_ref)

assert str(exc_info.value) == f"{actor_ref} not found"

Expand Down
4 changes: 2 additions & 2 deletions tests/proxy/test_static_method_calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_call_to_unknown_method_raises_attribute_error(proxy):

result = str(exc_info.value)

assert result.startswith("<ActorProxy for ActorA")
assert result.startswith("<ActorProxyMessageBuilder for ActorA")
assert result.endswith("has no attribute 'unknown_method'")


Expand All @@ -99,7 +99,7 @@ def test_deferred_call_to_unknown_method_raises_attribute_error(proxy):

result = str(exc_info.value)

assert result.startswith("<ActorProxy for ActorA")
assert result.startswith("<ActorProxyMessageBuilder for ActorA")
assert result.endswith("has no attribute 'unknown_method'")


Expand Down

0 comments on commit ac7574e

Please sign in to comment.