From 0d236db63370425a892db0a5a27c260b2a439105 Mon Sep 17 00:00:00 2001 From: opendansor Date: Thu, 29 Aug 2024 11:24:29 -0700 Subject: [PATCH 1/9] Child Hotkeys netuid Refactor --- bittensor/commands/stake.py | 59 ++++++++++++------- .../subcommands/stake/test_childkeys.py | 8 +-- 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/bittensor/commands/stake.py b/bittensor/commands/stake.py index 132529a131..24868d0484 100644 --- a/bittensor/commands/stake.py +++ b/bittensor/commands/stake.py @@ -44,19 +44,25 @@ def get_netuid( - cli: "bittensor.cli", subtensor: "bittensor.subtensor" + cli: "bittensor.cli", subtensor: "bittensor.subtensor", prompt: bool = True ) -> Tuple[bool, int]: """Retrieve and validate the netuid from the user or configuration.""" console = Console() - if not cli.config.is_set("netuid"): - try: - cli.config.netuid = int(Prompt.ask("Enter netuid")) - except ValueError: - console.print( - "[red]Invalid input. Please enter a valid integer for netuid.[/red]" - ) - return False, -1 + if not cli.config.is_set("netuid") and prompt: + cli.config.netuid = Prompt.ask("Enter netuid") + try: + cli.config.netuid = int(cli.config.netuid) + except ValueError: + console.print( + "[red]Invalid input. Please enter a valid integer for netuid.[/red]" + ) + return False, -1 netuid = cli.config.netuid + if netuid < 0 or netuid > 2**32 - 1: + console.print( + "[red]Invalid input. Please enter a valid integer for netuid in subnet range.[/red]" + ) + return False, -1 if not subtensor.subnet_exists(netuid=netuid): console.print( "[red]Network with netuid {} does not exist. Please try again.[/red]".format( @@ -1136,10 +1142,27 @@ def _run(cli: "bittensor.cli", subtensor: "bittensor.subtensor"): wallet = bittensor.wallet(config=cli.config) # check all - if not cli.config.is_set("all"): - exists, netuid = get_netuid(cli, subtensor) - if not exists: - return + if cli.config.is_set("all"): + cli.config.netuid = None + cli.config.all = True + elif cli.config.is_set("netuid"): + if cli.config.netuid == "all": + cli.config.all = True + else: + cli.config.netuid = int(cli.config.netuid) + exists, netuid = get_netuid(cli, subtensor) + if not exists: + return + else: + netuid_input = Prompt.ask("Enter netuid or 'all'", default="all") + if netuid_input == "all": + cli.config.netuid = None + cli.config.all = True + else: + cli.config.netuid = int(netuid_input) + exists, netuid = get_netuid(cli, subtensor, False) + if not exists: + return # get parent hotkey hotkey = get_hotkey(wallet, cli.config) @@ -1148,11 +1171,7 @@ def _run(cli: "bittensor.cli", subtensor: "bittensor.subtensor"): return try: - netuids = ( - subtensor.get_all_subnet_netuids() - if cli.config.is_set("all") - else [netuid] - ) + netuids = subtensor.get_all_subnet_netuids() if cli.config.all else [netuid] hotkey_stake = GetChildrenCommand.get_parent_stake_info( console, subtensor, hotkey ) @@ -1236,7 +1255,7 @@ def add_args(parser: argparse.ArgumentParser): parser = parser.add_parser( "get_children", help="""Get child hotkeys on subnet.""" ) - parser.add_argument("--netuid", dest="netuid", type=int, required=False) + parser.add_argument("--netuid", dest="netuid", type=str, required=False) parser.add_argument("--hotkey", dest="hotkey", type=str, required=False) parser.add_argument( "--all", @@ -1294,7 +1313,7 @@ def render_table( # Add columns to the table with specific styles table.add_column("Index", style="bold yellow", no_wrap=True, justify="center") - table.add_column("ChildHotkey", style="bold green") + table.add_column("Child Hotkey", style="bold green") table.add_column("Proportion", style="bold cyan", no_wrap=True, justify="right") table.add_column( "Childkey Take", style="bold blue", no_wrap=True, justify="right" diff --git a/tests/e2e_tests/subcommands/stake/test_childkeys.py b/tests/e2e_tests/subcommands/stake/test_childkeys.py index 080d01263d..a8f6518fcc 100644 --- a/tests/e2e_tests/subcommands/stake/test_childkeys.py +++ b/tests/e2e_tests/subcommands/stake/test_childkeys.py @@ -54,7 +54,7 @@ async def test_set_revoke_children_multiple(local_chain, capsys): assert local_chain.query("SubtensorModule", "NetworksAdded", [1]).serialize() for exec_command in [alice_exec_command, bob_exec_command, eve_exec_command]: - exec_command(RegisterCommand, ["s", "register", "--netuid", "1"]) + exec_command(RegisterCommand, ["s", "register", "--netuid", "4"]) alice_exec_command(StakeCommand, ["stake", "add", "--amount", "100000"]) @@ -75,8 +75,8 @@ async def wait(): await wait() children_with_proportions = [ - [0.4, bob_keypair.ss58_address], - [0.2, eve_keypair.ss58_address], + [0.2, bob_keypair.ss58_address], + [0.1, eve_keypair.ss58_address], ] # Test 1: Set multiple children @@ -86,7 +86,7 @@ async def wait(): "stake", "set_children", "--netuid", - "1", + "2", "--children", f"{children_with_proportions[0][1]},{children_with_proportions[1][1]}", "--hotkey", From 4379c596fa4a21c27c21cc9a08410be9e4c904d9 Mon Sep 17 00:00:00 2001 From: opendansor Date: Wed, 28 Aug 2024 17:09:10 -0700 Subject: [PATCH 2/9] CHK Test --- tests/e2e_tests/subcommands/stake/test_childkeys.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/e2e_tests/subcommands/stake/test_childkeys.py b/tests/e2e_tests/subcommands/stake/test_childkeys.py index a8f6518fcc..080d01263d 100644 --- a/tests/e2e_tests/subcommands/stake/test_childkeys.py +++ b/tests/e2e_tests/subcommands/stake/test_childkeys.py @@ -54,7 +54,7 @@ async def test_set_revoke_children_multiple(local_chain, capsys): assert local_chain.query("SubtensorModule", "NetworksAdded", [1]).serialize() for exec_command in [alice_exec_command, bob_exec_command, eve_exec_command]: - exec_command(RegisterCommand, ["s", "register", "--netuid", "4"]) + exec_command(RegisterCommand, ["s", "register", "--netuid", "1"]) alice_exec_command(StakeCommand, ["stake", "add", "--amount", "100000"]) @@ -75,8 +75,8 @@ async def wait(): await wait() children_with_proportions = [ - [0.2, bob_keypair.ss58_address], - [0.1, eve_keypair.ss58_address], + [0.4, bob_keypair.ss58_address], + [0.2, eve_keypair.ss58_address], ] # Test 1: Set multiple children @@ -86,7 +86,7 @@ async def wait(): "stake", "set_children", "--netuid", - "2", + "1", "--children", f"{children_with_proportions[0][1]},{children_with_proportions[1][1]}", "--hotkey", From 4b046c3351ce2695445a789bdc7a5d1836ac26a9 Mon Sep 17 00:00:00 2001 From: opendansor Date: Thu, 29 Aug 2024 11:29:07 -0700 Subject: [PATCH 3/9] u16 float limit --- bittensor/commands/stake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bittensor/commands/stake.py b/bittensor/commands/stake.py index 24868d0484..eff415d1a1 100644 --- a/bittensor/commands/stake.py +++ b/bittensor/commands/stake.py @@ -58,7 +58,7 @@ def get_netuid( ) return False, -1 netuid = cli.config.netuid - if netuid < 0 or netuid > 2**32 - 1: + if netuid < 0 or netuid > 65535: console.print( "[red]Invalid input. Please enter a valid integer for netuid in subnet range.[/red]" ) From 7542feaf2089584a9e9e268d677ac22d27604904 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C2=B5?= Date: Mon, 12 Aug 2024 16:03:22 +0000 Subject: [PATCH 4/9] bittensor/axon.py: thread and exception handling Various issues were encountered trying to run and understand e2e tests: - if uvicorn fails to start, an uncaught exception is emitted to stderr - axon keeps spinning waiting for self.started, indefinitely - exceptions are not propagated from threads - there is no way to (simply) test from the outside whether an axon started and/or runs - axon creates a thread that only creates another thread, which seems redundant This patch addresses some of these issues, in FastAPIThreadedServer: - add thread safe set/get_exception() to set/get exceptions - run_in_thread() yields the created thread, so that the code using it can check whether the thread is alive - uvicorn.Server.startup() is wrapped to set a thread-safe flag using self.set_started(True) to indicate startup succeeded - run_in_thread() times out after one second to prevent infinite loop in case self.get_started() never becomes True - run_in_thread() raises an exception if it fails to start the thread - _wrapper_run() tests whether the thread is still alive and in class axon, the following are added: - @property axon.exception(), returning any exception - axon.is_running(), returning True when the axon is operational The seemingly redundant thread is left in until feedback is received on the reasons for including it. --- bittensor/axon.py | 83 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 77 insertions(+), 6 deletions(-) diff --git a/bittensor/axon.py b/bittensor/axon.py index 8cefadfe61..348f257886 100644 --- a/bittensor/axon.py +++ b/bittensor/axon.py @@ -26,6 +26,7 @@ import inspect import json import os +import socket import threading import time import traceback @@ -100,26 +101,72 @@ class FastAPIThreadedServer(uvicorn.Server): should_exit: bool = False is_running: bool = False + """ + Provide a channel to signal exceptions from the thread to our caller. + """ + _exception: Exception = None + _lock: threading.Lock = threading.Lock() + _thread: threading.Thread = None + _started: bool = False + + def set_exception(self, ex): + with self._lock: + self._exception = ex + + def get_exception(self): + with self._lock: + return self._exception + + def set_thread(self, thread): + with self._lock: + self._thread = thread + + def get_thread(self): + with self._lock: + return self._thread + + def set_started(self, started): + with self._lock: + self._started = started + + def get_started(self): + with self._lock: + return self._started + def install_signal_handlers(self): """ Overrides the default signal handlers provided by ``uvicorn.Server``. This method is essential to ensure that the signal handling in the threaded server does not interfere with the main application's flow, especially in a complex asynchronous environment like the Axon server. """ pass + async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None: + """ + Adds a thread-safe call to set a 'started' flag on the object. + """ + ret = await super().startup(sockets) + self.set_started(True) + return ret + @contextlib.contextmanager def run_in_thread(self): """ Manages the execution of the server in a separate thread, allowing the FastAPI application to run asynchronously without blocking the main thread of the Axon server. This method is a key component in enabling concurrent request handling in the Axon server. Yields: - None: This method yields control back to the caller while the server is running in the background thread. + thread: a running thread + + Raises: + Exception: in case the server did not start (as signalled by self.get_started()) """ thread = threading.Thread(target=self.run, daemon=True) thread.start() try: - while not self.started: + t0 = time.time() + while not self.get_started() and time.time()-t0<1: time.sleep(1e-3) - yield + if not self.get_started(): + raise Exception("failed to start server") + yield thread finally: self.should_exit = True thread.join() @@ -128,9 +175,15 @@ def _wrapper_run(self): """ A wrapper method for the :func:`run_in_thread` context manager. This method is used internally by the ``start`` method to initiate the server's execution in a separate thread. """ - with self.run_in_thread(): - while not self.should_exit: - time.sleep(1e-3) + try: + with self.run_in_thread() as thread: + self.set_thread(thread) + while not self.should_exit: + if not thread.is_alive(): + raise Exception("worker thread died") + time.sleep(1e-3) + except Exception as e: + self.set_exception(e) def start(self): """ @@ -405,6 +458,24 @@ def info(self) -> "bittensor.AxonInfo": placeholder2=0, ) + # Our instantiator should be able to test axon.exception to see if any + # exception occurred. + @property + def exception(self): + # for future use: setting self._exception to signal an exception + e = getattr(self,'_exception',None) + if e: + return e + return self.fast_server.get_exception() + + # Our instantiator should be able to test axon.is_running() to see if all + # required threads etc are running. + def is_running(self): + t = self.fast_server.get_thread() + if t is None: + return False + return t.is_alive() + def attach( self, forward_fn: Callable, From 4a02761d52e0670aa140f3fe1e365f788d89f597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C2=B5?= Date: Wed, 14 Aug 2024 09:20:39 +0000 Subject: [PATCH 5/9] processed first round of feedback, plus extra's - ruff formatting - docstrings - variable names - type annotations - don't return ret = startup() which is annotated to return None - moved time sleep intervals to singular global, for clarity --- bittensor/axon.py | 66 +++++++++++++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/bittensor/axon.py b/bittensor/axon.py index 348f257886..99f72eca44 100644 --- a/bittensor/axon.py +++ b/bittensor/axon.py @@ -63,6 +63,12 @@ from bittensor.utils import networking +""" +The quantum of time to sleep in waiting loops, in seconds. +""" +TIME_SLEEP_INTERVAL: float = 1e-3 + + class FastAPIThreadedServer(uvicorn.Server): """ The ``FastAPIThreadedServer`` class is a specialized server implementation for the Axon server in the Bittensor network. @@ -109,27 +115,36 @@ class FastAPIThreadedServer(uvicorn.Server): _thread: threading.Thread = None _started: bool = False - def set_exception(self, ex): + def set_exception(self, exception: Exception) -> None: + """ + Set self._exception in a thread safe manner, so the worker thread can communicate exceptions to the main thread. + """ with self._lock: - self._exception = ex + self._exception = exception - def get_exception(self): + def get_exception(self) -> Optional[Exception]: with self._lock: return self._exception - def set_thread(self, thread): + def set_thread(self, thread: threading.Thread): + """ + Set self._thread in a thread safe manner, so the main thread can get the worker thread object. + """ with self._lock: self._thread = thread - def get_thread(self): + def get_thread(self) -> Optional[threading.Thread]: with self._lock: return self._thread - def set_started(self, started): + def set_started(self, started: bool) -> None: + """ + Set self._started in a thread safe manner, so the main thread can get the worker thread status. + """ with self._lock: self._started = started - def get_started(self): + def get_started(self) -> bool: with self._lock: return self._started @@ -143,9 +158,8 @@ async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None: """ Adds a thread-safe call to set a 'started' flag on the object. """ - ret = await super().startup(sockets) + await super().startup(sockets) self.set_started(True) - return ret @contextlib.contextmanager def run_in_thread(self): @@ -161,9 +175,9 @@ def run_in_thread(self): thread = threading.Thread(target=self.run, daemon=True) thread.start() try: - t0 = time.time() - while not self.get_started() and time.time()-t0<1: - time.sleep(1e-3) + time_start = time.time() + while not self.get_started() and time.time() - time_start < 1: + time.sleep(TIME_SLEEP_INTERVAL) if not self.get_started(): raise Exception("failed to start server") yield thread @@ -181,7 +195,7 @@ def _wrapper_run(self): while not self.should_exit: if not thread.is_alive(): raise Exception("worker thread died") - time.sleep(1e-3) + time.sleep(TIME_SLEEP_INTERVAL) except Exception as e: self.set_exception(e) @@ -458,23 +472,25 @@ def info(self) -> "bittensor.AxonInfo": placeholder2=0, ) - # Our instantiator should be able to test axon.exception to see if any - # exception occurred. @property - def exception(self): + def exception(self) -> Optional[Exception]: + """ + Axon objects expose exceptions that occurred internally through the .exception property. + """ # for future use: setting self._exception to signal an exception - e = getattr(self,'_exception',None) - if e: - return e + exception = getattr(self, "_exception", None) + if exception: + return exception return self.fast_server.get_exception() - # Our instantiator should be able to test axon.is_running() to see if all - # required threads etc are running. - def is_running(self): - t = self.fast_server.get_thread() - if t is None: + def is_running(self) -> bool: + """ + Axon objects can be queried using .is_running() to test whether worker threads are running. + """ + thread = self.fast_server.get_thread() + if thread is None: return False - return t.is_alive() + return thread.is_alive() def attach( self, From 776e9b4fb84b95878849bff302c0488d5b311820 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C2=B5?= Date: Wed, 14 Aug 2024 18:43:39 +0000 Subject: [PATCH 6/9] additional fixes --- bittensor/axon.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bittensor/axon.py b/bittensor/axon.py index 99f72eca44..0a62d6c84b 100644 --- a/bittensor/axon.py +++ b/bittensor/axon.py @@ -110,9 +110,9 @@ class FastAPIThreadedServer(uvicorn.Server): """ Provide a channel to signal exceptions from the thread to our caller. """ - _exception: Exception = None + _exception: Optional[Exception] = None _lock: threading.Lock = threading.Lock() - _thread: threading.Thread = None + _thread: Optional[threading.Thread] = None _started: bool = False def set_exception(self, exception: Exception) -> None: From 1c73994f5ff9cbc8e381309f7be07186f6a03ba9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C2=B5?= Date: Wed, 14 Aug 2024 09:42:33 +0000 Subject: [PATCH 7/9] rename bittensor/types.py to bittensor/bt_types.py to prevent mixups after pip install Although it seems common to have types.py (e.g. scalecodec, torch, substrateinterface) this may lead to issues after applying pip install -e to a package, as is suggested for bittensor (see git grep 'pip install -e.* bittensor') Issues were observed where circular imports would arise as Python's native typing.py would include bittensor's types.py. --- bittensor/{types.py => bt_types.py} | 0 bittensor/subtensor.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename bittensor/{types.py => bt_types.py} (100%) diff --git a/bittensor/types.py b/bittensor/bt_types.py similarity index 100% rename from bittensor/types.py rename to bittensor/bt_types.py diff --git a/bittensor/subtensor.py b/bittensor/subtensor.py index ac22a3a14d..cc8f8d10c8 100644 --- a/bittensor/subtensor.py +++ b/bittensor/subtensor.py @@ -109,7 +109,7 @@ unstake_extrinsic, unstake_multiple_extrinsic, ) -from .types import AxonServeCallParams, PrometheusServeCallParams +from .bt_types import AxonServeCallParams, PrometheusServeCallParams from .utils import ( U16_NORMALIZED_FLOAT, ss58_to_vec_u8, From cc84a723d4f5859d434883968f666798a6b5d616 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 22 Nov 2024 12:49:44 +0200 Subject: [PATCH 8/9] Merge conflict --- bittensor/core/axon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bittensor/core/axon.py b/bittensor/core/axon.py index 5a2254dd38..a1bdaaebe1 100644 --- a/bittensor/core/axon.py +++ b/bittensor/core/axon.py @@ -173,7 +173,7 @@ def install_signal_handlers(self): Overrides the default signal handlers provided by ``uvicorn.Server``. This method is essential to ensure that the signal handling in the threaded server does not interfere with the main application's flow, especially in a complex asynchronous environment like the Axon server. """ - async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None: + async def startup(self, sockets: Optional[list[socket.socket]] = None) -> None: """ Adds a thread-safe call to set a 'started' flag on the object. """ From af4db0bb756f667f3e2013ca900e9677f68474d8 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 22 Nov 2024 12:58:50 +0200 Subject: [PATCH 9/9] Merge conflict --- bittensor/core/axon.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bittensor/core/axon.py b/bittensor/core/axon.py index a1bdaaebe1..f1f28d44e3 100644 --- a/bittensor/core/axon.py +++ b/bittensor/core/axon.py @@ -22,7 +22,6 @@ import copy import inspect import json -import os import socket import threading import time