diff --git a/bittensor/core/axon.py b/bittensor/core/axon.py index 378315f77d..f1f28d44e3 100644 --- a/bittensor/core/axon.py +++ b/bittensor/core/axon.py @@ -22,6 +22,7 @@ import copy import inspect import json +import socket import threading import time import traceback @@ -71,6 +72,12 @@ V_7_2_0 = 7002000 +""" +The quantum of time to sleep in waiting loops, in seconds. +""" +TIME_SLEEP_INTERVAL: float = 1e-3 + + class FastAPIThreadedServer(uvicorn.Server): """ The ``FastAPIThreadedServer`` class is a specialized server implementation for the Axon server in the Bittensor network. @@ -119,25 +126,79 @@ class FastAPIThreadedServer(uvicorn.Server): should_exit: bool = False is_running: bool = False + """ + Provide a channel to signal exceptions from the thread to our caller. + """ + _exception: Optional[Exception] = None + _lock: threading.Lock = threading.Lock() + _thread: Optional[threading.Thread] = None + _started: bool = False + + def set_exception(self, exception: Exception) -> None: + """ + Set self._exception in a thread safe manner, so the worker thread can communicate exceptions to the main thread. + """ + with self._lock: + self._exception = exception + + def get_exception(self) -> Optional[Exception]: + with self._lock: + return self._exception + + def set_thread(self, thread: threading.Thread): + """ + Set self._thread in a thread safe manner, so the main thread can get the worker thread object. + """ + with self._lock: + self._thread = thread + + def get_thread(self) -> Optional[threading.Thread]: + with self._lock: + return self._thread + + def set_started(self, started: bool) -> None: + """ + Set self._started in a thread safe manner, so the main thread can get the worker thread status. + """ + with self._lock: + self._started = started + + def get_started(self) -> bool: + with self._lock: + return self._started + def install_signal_handlers(self): """ Overrides the default signal handlers provided by ``uvicorn.Server``. This method is essential to ensure that the signal handling in the threaded server does not interfere with the main application's flow, especially in a complex asynchronous environment like the Axon server. """ + async def startup(self, sockets: Optional[list[socket.socket]] = None) -> None: + """ + Adds a thread-safe call to set a 'started' flag on the object. + """ + await super().startup(sockets) + self.set_started(True) + @contextlib.contextmanager def run_in_thread(self): """ Manages the execution of the server in a separate thread, allowing the FastAPI application to run asynchronously without blocking the main thread of the Axon server. This method is a key component in enabling concurrent request handling in the Axon server. Yields: - None: This method yields control back to the caller while the server is running in the background thread. + thread: a running thread + + Raises: + Exception: in case the server did not start (as signalled by self.get_started()) """ thread = threading.Thread(target=self.run, daemon=True) thread.start() try: - while not self.started: - time.sleep(1e-3) - yield + time_start = time.time() + while not self.get_started() and time.time() - time_start < 1: + time.sleep(TIME_SLEEP_INTERVAL) + if not self.get_started(): + raise Exception("failed to start server") + yield thread finally: self.should_exit = True thread.join() @@ -146,9 +207,15 @@ def _wrapper_run(self): """ A wrapper method for the :func:`run_in_thread` context manager. This method is used internally by the ``start`` method to initiate the server's execution in a separate thread. """ - with self.run_in_thread(): - while not self.should_exit: - time.sleep(1e-3) + try: + with self.run_in_thread() as thread: + self.set_thread(thread) + while not self.should_exit: + if not thread.is_alive(): + raise Exception("worker thread died") + time.sleep(TIME_SLEEP_INTERVAL) + except Exception as e: + self.set_exception(e) def start(self): """ @@ -409,6 +476,26 @@ def info(self) -> "AxonInfo": placeholder2=0, ) + @property + def exception(self) -> Optional[Exception]: + """ + Axon objects expose exceptions that occurred internally through the .exception property. + """ + # for future use: setting self._exception to signal an exception + exception = getattr(self, "_exception", None) + if exception: + return exception + return self.fast_server.get_exception() + + def is_running(self) -> bool: + """ + Axon objects can be queried using .is_running() to test whether worker threads are running. + """ + thread = self.fast_server.get_thread() + if thread is None: + return False + return thread.is_alive() + def attach( self, forward_fn: Callable,