Skip to content

Commit

Permalink
Fill in Server
Browse files Browse the repository at this point in the history
  • Loading branch information
Moosems committed Sep 5, 2024
1 parent 077f126 commit b871c90
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 17 deletions.
2 changes: 1 addition & 1 deletion collegamento/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

from .files_variant import FileClient, FileServer # noqa: F401, E402
from .simple_client_server import ( # noqa: F401, E402
USER_FUNCTION,
COMMANDS_MAPPING,
USER_FUNCTION,
CollegamentoError,
Request,
RequestQueueType,
Expand Down
2 changes: 1 addition & 1 deletion collegamento/simple_client_server/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .client import SimpleClient, SimpleServer # noqa: F401, E402
from .misc import ( # noqa: F401, E402
USER_FUNCTION,
COMMANDS_MAPPING,
USER_FUNCTION,
CollegamentoError,
Request,
RequestQueueType,
Expand Down
238 changes: 230 additions & 8 deletions collegamento/simple_client_server/new_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,38 @@

from logging import Logger, getLogger
from multiprocessing import Process, Queue, freeze_support
from random import randint
from time import sleep

from beartype.typing import Any, Callable

from misc import (
from utils import ( # TODO: Relative import
USER_FUNCTION,
CollegamentoError,
Request,
RequestQueueType,
Response,
ResponseQueueType,
) # TODO: Relative import
)


def command_sort_func(
request: Request, priority_commands: list[str]
) -> tuple[bool, int]:
command_index: int = 0
in_priority_commands: bool = request["command"] in priority_commands

if in_priority_commands:
command_index = priority_commands.index(request["command"])

return (
not in_priority_commands,
command_index,
) # It it sorts False before True


class Server:
"""A basic and multipurpose server that can be easily subclassed for your specific needs."""

def __init__(
self,
commands: dict[str, tuple[USER_FUNCTION, bool]],
Expand All @@ -42,10 +58,158 @@ def __init__(
self.response_queue: ResponseQueueType = response_queue
self.requests_queue: RequestQueueType = requests_queue
self.all_ids: list[int] = []
self.newest_ids: dict[str, int] = {}
self.newest_requests: dict[str, Request | None] = {}
self.newest_ids: dict[str, list[int]] = {}
self.newest_requests: dict[str, list[Request]] = {}
self.priority_commands: list[str] = priority_commands

self.commands: dict[str, tuple[USER_FUNCTION, bool]] = commands
for command, func_tuple in self.commands.items():
self.newest_ids[command] = []
self.newest_requests[command] = []

self.logger.info("Server setup complete")

while True:
self.run_tasks()
sleep(0.0025)

def simple_id_response(self, id: int, cancelled: bool = True) -> None:
self.logger.debug(f"Creating simple response for id {id}")
response: Response = {
"id": id,
"type": "response",
"cancelled": cancelled,
}
self.logger.debug(f"Sending simple response for id {id}")
self.response_queue.put(response)
self.logger.info(f"Simple response for id {id} sent")

def parse_line(self, message: Request) -> None:
self.logger.debug("Parsing Message from user")
id: int = message["id"]

if message["type"] != "request":
self.logger.warning(
f"Unknown type {type}. Sending simple response"
)
self.simple_id_response(id)
self.logger.debug(f"Simple response for id {id} sent")
return

self.logger.info(f"Mesage with id {id} is of type request")
self.all_ids.append(id)
command: str = message["command"] # type: ignore

if not self.commands[command][1]:
self.newest_ids[command] = []
self.newest_requests[command] = []

self.newest_ids[command].append(id)
self.newest_requests[command].append(message)
self.logger.debug("Request stored for parsing")

def cancel_old_ids(self) -> None:
self.logger.info("Cancelling all old id's")

accepted_ids = [
request["id"]
for request_list in list(self.newest_requests.values())
for request in request_list
]

for request in self.all_ids:
if request in accepted_ids:
self.logger.debug(
f"Id {request} is an either the newest request or the command allows multiple requests"
)
continue

self.logger.debug(
f"Id {request} is an unwanted request, sending simple respone"
)
self.simple_id_response(request)

self.all_ids = []
self.logger.debug("All ids list cleaned up")

def handle_request(self, request: Request) -> None:
command: str = request["command"]
id: int = request["id"]
result: Any # noqa: F842

command = request["command"]
response: Response = {
"id": id,
"type": "response",
"cancelled": False,
"command": command,
}

if command == "add-command":
request_name: str = request["name"] # type: ignore

request_tuple: tuple[USER_FUNCTION, bool] = request["function"] # type: ignore
self.commands[request_name] = request_tuple
response["result"] = None
response["cancelled"] = True
self.logger.debug("Response created")
self.response_queue.put(response)
self.newest_ids[command].remove(id)
self.logger.info(f"Response sent for request of command {command}")
return

if command not in self.commands:
self.logger.warning(f"Command {command} not recognized")
response["result"] = None
response["cancelled"] = True
else:
self.logger.debug(f"Running user function for command {command}")
response["result"] = self.commands[command][0](self, request)

self.logger.debug("Response created")
self.response_queue.put(response)
self.newest_ids[command].remove(id)
self.logger.info(f"Response sent for request of command {command}")

def run_tasks(self) -> None:
if self.requests_queue.empty():
return

self.logger.debug("New request in queue")
while not self.requests_queue.empty():
self.logger.debug("Parsing request")
self.parse_line(self.requests_queue.get())

if not self.all_ids:
self.logger.debug("All requests were notifications")

self.logger.debug("Cancelling all old id's")
self.cancel_old_ids()

requests_list: list[Request] = [
request
for request_list in self.newest_requests.values()
if request_list
for request in request_list
]

requests_list = sorted(
requests_list,
key=lambda request: command_sort_func(
request, self.priority_commands
),
)

for request in requests_list:
if request is None:
continue
command: str = request["command"]
self.logger.info(f"Handling request of command {command}")
self.handle_request(request)
self.newest_requests[command].remove(request)
self.logger.debug("Request completed")


class Client:
"""An easy to use implementation for IPC in Python.
Expand Down Expand Up @@ -94,14 +258,15 @@ def __init__(

self.commands[command] = func

self.logger.info("Creating Server")
self.request_queue: Queue
self.response_queue: Queue
self.main_process: Process
self.logger.info("Creating Server")
self.create_server()
self.logger.info("Initialization is complete")

def create_server(self):
"""Creates a Server and terminates the old one if it exists - internal API"""
freeze_support()

if hasattr(self, "main_process"):
Expand All @@ -124,12 +289,69 @@ def create_server(self):
self.main_process.start()

def log_exception(self, exception_str: str) -> None:
"""Logs an exception and raises a CollegamentoError"""
"""Logs an exception and raises a CollegamentoError - internal API"""
self.logger.exception(exception_str)
raise CollegamentoError(exception_str)

def create_message_id(self) -> int:
"""Creates a Message id - internal API"""
self.logger.info("Creating message for server")

# In cases where there are many many requests being sent it may be faster to choose a
# random id than to iterate through the list of id's and find an unclaimed one
id = randint(1, self.id_max) # 0 is reserved for the empty case
while id in self.all_ids:
id = randint(1, self.id_max)
self.all_ids.append(id)

self.logger.debug("ID for message created")

if not self.main_process.is_alive():
# No point in an id if the server's dead
self.logger.critical(
"Server was killed at some point, creating server"
)
self.create_server()

return id

def request(
self,
request_details: dict,
) -> int | None:
"""Sends the main_server a request of type command with given kwargs - external API"""
self.logger.debug("Beginning request")

# NOTE: this variable could've been a standalone line but I thought it would just be better
# to use the walrus operator. No point in a language feature if its never used. Plus,
# it also looks quite nice :D
if (command := request_details["command"]) not in self.commands:
self.logger.exception(
f"Command {command} not in builtin commands. Those are {self.commands}!"
)
raise CollegamentoError(
f"Command {command} not in builtin commands. Those are {self.commands}!"
)

self.logger.info("Creating request for server")

id: int = self.create_message_id()

# self.current_ids[command] = id
# final_request: Request = {
# "id": id,
# "type": "request",
# "command": command,
# }
# final_request.update(request_details) # type: ignore
# self.logger.debug(f"Request created: {final_request}")

# self.request_queue.put(final_request)
# self.logger.info("Message sent")


def kill_IPC(self):
"""Kills the internal Process and frees up some storage and CPU that may have been used otherwise"""
"""Kills the internal Process and frees up some storage and CPU that may have been used otherwise - external API"""
self.main_process.terminate()

def __del__(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
from enum import Flag, auto
from multiprocessing.queues import Queue as GenericQueueClass
from typing import TYPE_CHECKING, Any, NotRequired, TypedDict
from enum import Flag, auto

from beartype.typing import Callable


class CollegamentoFlags(Flag):
MULTI_REQUESTS = auto()
POOL = MULTI_REQUESTS | auto()


class Message(TypedDict):
"""Base class for messages in and out of the server"""

Expand All @@ -31,7 +26,7 @@ class Response(Message):
result: NotRequired[Any]


USER_FUNCTION = Callable[["SimpleServer", Request], Any] # type: ignore
USER_FUNCTION = Callable[["Server", Request], Any] # type: ignore
COMMANDS_MAPPING = dict[
str, USER_FUNCTION | tuple[USER_FUNCTION, bool]
] # if bool is true the command allows multiple requests
Expand Down

0 comments on commit b871c90

Please sign in to comment.