Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

axon threads and exception handling [merge conflict fixed] #2459

Open
wants to merge 14 commits into
base: staging
Choose a base branch
from
Open
101 changes: 94 additions & 7 deletions bittensor/core/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import copy
import inspect
import json
import socket
import threading
import time
import traceback
Expand Down Expand Up @@ -71,6 +72,12 @@
V_7_2_0 = 7002000


"""
The quantum of time to sleep in waiting loops, in seconds.
"""
TIME_SLEEP_INTERVAL: float = 1e-3


class FastAPIThreadedServer(uvicorn.Server):
"""
The ``FastAPIThreadedServer`` class is a specialized server implementation for the Axon server in the Bittensor network.
Expand Down Expand Up @@ -119,25 +126,79 @@ class FastAPIThreadedServer(uvicorn.Server):
should_exit: bool = False
is_running: bool = False

"""
Provide a channel to signal exceptions from the thread to our caller.
"""
_exception: Optional[Exception] = None
_lock: threading.Lock = threading.Lock()
_thread: Optional[threading.Thread] = None
_started: bool = False

def set_exception(self, exception: Exception) -> None:
"""
Set self._exception in a thread safe manner, so the worker thread can communicate exceptions to the main thread.
"""
with self._lock:
self._exception = exception

def get_exception(self) -> Optional[Exception]:
with self._lock:
return self._exception

def set_thread(self, thread: threading.Thread):
"""
Set self._thread in a thread safe manner, so the main thread can get the worker thread object.
"""
with self._lock:
self._thread = thread

def get_thread(self) -> Optional[threading.Thread]:
with self._lock:
return self._thread

def set_started(self, started: bool) -> None:
"""
Set self._started in a thread safe manner, so the main thread can get the worker thread status.
"""
with self._lock:
self._started = started

def get_started(self) -> bool:
with self._lock:
return self._started

def install_signal_handlers(self):
"""
Overrides the default signal handlers provided by ``uvicorn.Server``. This method is essential to ensure that the signal handling in the threaded server does not interfere with the main application's flow, especially in a complex asynchronous environment like the Axon server.
"""

async def startup(self, sockets: Optional[list[socket.socket]] = None) -> None:
"""
Adds a thread-safe call to set a 'started' flag on the object.
"""
await super().startup(sockets)
self.set_started(True)

@contextlib.contextmanager
def run_in_thread(self):
"""
Manages the execution of the server in a separate thread, allowing the FastAPI application to run asynchronously without blocking the main thread of the Axon server. This method is a key component in enabling concurrent request handling in the Axon server.

Yields:
None: This method yields control back to the caller while the server is running in the background thread.
thread: a running thread

Raises:
Exception: in case the server did not start (as signalled by self.get_started())
"""
thread = threading.Thread(target=self.run, daemon=True)
thread.start()
try:
while not self.started:
time.sleep(1e-3)
yield
time_start = time.time()
while not self.get_started() and time.time() - time_start < 1:
time.sleep(TIME_SLEEP_INTERVAL)
if not self.get_started():
raise Exception("failed to start server")
yield thread
finally:
self.should_exit = True
thread.join()
Expand All @@ -146,9 +207,15 @@ def _wrapper_run(self):
"""
A wrapper method for the :func:`run_in_thread` context manager. This method is used internally by the ``start`` method to initiate the server's execution in a separate thread.
"""
with self.run_in_thread():
while not self.should_exit:
time.sleep(1e-3)
try:
with self.run_in_thread() as thread:
self.set_thread(thread)
while not self.should_exit:
if not thread.is_alive():
raise Exception("worker thread died")
time.sleep(TIME_SLEEP_INTERVAL)
except Exception as e:
self.set_exception(e)

def start(self):
"""
Expand Down Expand Up @@ -409,6 +476,26 @@ def info(self) -> "AxonInfo":
placeholder2=0,
)

@property
def exception(self) -> Optional[Exception]:
"""
Axon objects expose exceptions that occurred internally through the .exception property.
"""
# for future use: setting self._exception to signal an exception
exception = getattr(self, "_exception", None)
if exception:
return exception
return self.fast_server.get_exception()

def is_running(self) -> bool:
"""
Axon objects can be queried using .is_running() to test whether worker threads are running.
"""
thread = self.fast_server.get_thread()
if thread is None:
return False
return thread.is_alive()

def attach(
self,
forward_fn: Callable,
Expand Down
Loading