diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..104d474 --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Log files +*.log + +.tox/ +.coverage + +# vi +.*.swp + +# pycharm +.idea/ diff --git a/README.org b/README.org new file mode 100644 index 0000000..e93e6a0 --- /dev/null +++ b/README.org @@ -0,0 +1,7 @@ +* Thespian Echo + +This is an example application using the [[https://thespianpy.com/][Thespian]] library. It implements a distributed echo application using Thespian actors. + +There's a blog post with some notes around actor systems here: https://sabaini.at/peterlog/posts/2020/Feb/16/thespian-a-python-actor-system/ + + diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/client.py b/client.py new file mode 100644 index 0000000..562ed39 --- /dev/null +++ b/client.py @@ -0,0 +1,24 @@ +import sys +from datetime import timedelta + +from thespian.actors import ActorSystem + + +if __name__ == "__main__": + # We take the convention leaders address from the command line + # Also, we tag this system with "Client" + capabilities = {"Convention Address.IPv4": (sys.argv[1], 1900), "Client": True} + actor_system = ActorSystem("multiprocTCPBase", capabilities) + # We create an actor from the echo library with class EchoRequestor + echo_app = actor_system.createActor("echo.EchoRequestor") + # Send the echo actor a message: the number of echo requests it should perform + actor_system.tell(echo_app, int(sys.argv[2])) + # Now, send the echo payload, and wait max. 10s for an answer + resp = actor_system.ask(echo_app, "hello world", timedelta(seconds=10)) + while resp: + # If we get "echo_done" as an answer we break out + if resp == "echo_done": + break + # Otherwise we'll retry to get a response + print("unexpected message {}".format(resp)) + resp = actor_system.listen(timedelta(seconds=10)) diff --git a/echo.py b/echo.py new file mode 100644 index 0000000..930c6a9 --- /dev/null +++ b/echo.py @@ -0,0 +1,110 @@ +import datetime +import logging +import logging.handlers + +from thespian.actors import ActorTypeDispatcher, requireCapability + +# Set up some logging to see what is going on +log = logging.getLogger("Echologger") +log.setLevel(logging.DEBUG) +handler = logging.handlers.SysLogHandler(address="/dev/log") +log.addHandler(handler) + + +class Ping: + """A simple object that just carries a payload""" + + def __init__(self, payload): + self.payload = payload + + +class Pong(Ping): + """Same as the ping class. + + We subclass it so we can distinguish it by type, but it's really the same thing + """ + pass + + +@requireCapability("Server") +class EchoServer(ActorTypeDispatcher): + """The echo server actor + + It will receive ping messages, log them, and reply back to the sender with + a pong message + + Specifies a system tagged with the "Server" capability as a requirement. + This will cause the linked actor systems to instantiate it on the server + actor system + """ + + def receiveMsg_Ping(self, ping_request, sender): + log.debug("Got {}, ponging back at {}".format(ping_request, sender)) + self.send(sender, Pong(ping_request.payload)) + + +@requireCapability("Client") +class EchoRequestor(ActorTypeDispatcher): + """The echo client actor + + It specifies an actor system tagged with the "Client" capability. The + client module is tagged with Client: True, so this actor will get + get started on the client actor system + """ + + echo_server = None # hold an echo server instance + + def __init__(self): + # Initialise counters and timer, and calls the superclass constructor + self.pings_to_send = 0 + self.pongs_to_receive = 0 + self.time = None + super().__init__() + + def receiveMsg_int(self, count, _client): + """Add integer as a count of pings to send + + If this actor receives an integer, it'll interpret it as + a count of pings, and add it to the pings to send counter + """ + self.pings_to_send += count + + def receiveMsg_str(self, payload, client): + """Receive a payload and start pinging + + If this actor receives a str message, it'll interpret it as a paylod + to ping with, and start pinging the number of times + """ + # First we save the client, we will need it later to notify once we're done + self.client = client + # Then, instantiate an echo server. As the EchoServer class has a requirement + # "Server" it'll get started on the actor system tagged with the "Server" capability + self.echo_server = self.createActor(EchoServer) + # Then start to send out ping messages, and save the start time + ping = Ping(payload) + log.debug( + "Sending, srv: {}; message: {}; count: {}".format( + self.echo_server, ping, self.pings_to_send + ) + ) + self.time = datetime.datetime.now() + for _ in range(1, self.pings_to_send): + # Fire out pings_to_send pings to the server + self.send(self.echo_server, ping) + # Update counters + self.pongs_to_receive += self.pings_to_send + self.pings_to_send = 0 + + def receiveMsg_Pong(self, _pong, _server): + # Receive answers back from the echo server actor + # We decrease the counter until it's zero + self.pongs_to_receive -= 1 + if self.pongs_to_receive <= 1: + log.info( + "Got all messages, timedelta: {}".format( + datetime.datetime.now() - self.time + ) + ) + # We're done, send a message to the client saying so + log.info("Sending end request to {}".format(self.client)) + self.send(self.client, "echo_done") diff --git a/server.py b/server.py new file mode 100644 index 0000000..5f9b2d6 --- /dev/null +++ b/server.py @@ -0,0 +1,22 @@ +import logging.handlers +import socket + +from thespian.actors import ActorSystem + + +def get_my_ip(): + """Return the ipaddress of the local host""" + return socket.gethostbyname(socket.gethostname()) + + +if __name__ == "__main__": + # Setting up some logging + log = logging.getLogger("Echologger") + log.setLevel(logging.DEBUG) + handler = logging.handlers.SysLogHandler(address="/dev/log") + log.addHandler(handler) + + # Setup this system as the convention leader, and give it a capability "Server" + # Note by default actor systems use port 1900, so we'll set this here too + capabilities = {"Convention Address.IPv4": (get_my_ip(), 1900), "Server": True} + ActorSystem("multiprocTCPBase", capabilities)