diff --git a/constants/__init__.py b/constants/__init__.py index 1a39634b..d7b7f446 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -24,7 +24,10 @@ ModelConstraints, NormValidationConstraints, ) -from taoverse.model.competition.epsilon import FixedEpsilon +from taoverse.model.competition.epsilon import ( + FixedEpsilon, + LinearDecay +) from competitions.data import CompetitionId from typing import Dict, List, Tuple @@ -34,7 +37,7 @@ # --------------------------------- # Release -__version__ = "4.2.0" +__version__ = "4.3.0" # Validator schema version __validator_version__ = "3.1.0" @@ -150,7 +153,62 @@ epsilon_func=FixedEpsilon(0.005), max_bytes=29 * 1024 * 1024 * 1024, ), +} +# Defined model constraints by competition id with decaying epsilon +MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY: Dict[CompetitionId, ModelConstraints] = { + CompetitionId.M772_MODEL: ModelConstraints( + max_model_parameter_size=772_000_000, + min_model_parameter_size=572_000_000, + sequence_length=1024, + allowed_architectures=ALLOWED_MODEL_TYPES_1, + tokenizer="distilgpt2", + eval_block_delay=0, + epsilon_func=LinearDecay(0.005, 0.001, 50400), + max_bytes=5 * 1024 * 1024 * 1024, + ), + CompetitionId.B7_MODEL: ModelConstraints( + max_model_parameter_size=6_900_000_000, + min_model_parameter_size=6_700_000_000, + sequence_length=4096, + allowed_architectures=ALLOWED_MODEL_TYPES_2, + tokenizer="Xenova/gpt-4", + kwargs={ + "torch_dtype": torch.bfloat16, + "attn_implementation": "flash_attention_2", + }, + eval_block_delay=0, + epsilon_func=LinearDecay(0.005, 0.001, 50400), + max_bytes=15 * 1024 * 1024 * 1024, + ), + CompetitionId.B3_MODEL: ModelConstraints( + max_model_parameter_size=3_400_000_000, + min_model_parameter_size=3_200_000_000, + sequence_length=4096, + allowed_architectures=ALLOWED_MODEL_TYPES_2, + tokenizer="Xenova/gpt-4", + kwargs={ + "torch_dtype": torch.bfloat16, + "attn_implementation": "flash_attention_2", + }, + eval_block_delay=0, + epsilon_func=LinearDecay(0.005, 0.001, 50400), + max_bytes=15 * 1024 * 1024 * 1024, + ), + CompetitionId.B14_MODEL: ModelConstraints( + max_model_parameter_size=13_900_000_000, + min_model_parameter_size=13_700_000_000, + sequence_length=4096, + allowed_architectures=ALLOWED_MODEL_TYPES_2, + tokenizer="Xenova/gpt-4", + kwargs={ + "torch_dtype": torch.bfloat16, + "attn_implementation": "flash_attention_2", + }, + eval_block_delay=0, + epsilon_func=LinearDecay(0.005, 0.001, 100800), + max_bytes=29 * 1024 * 1024 * 1024, + ), } @@ -206,28 +264,26 @@ [ Competition( CompetitionId.M772_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.M772_MODEL], + MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[CompetitionId.M772_MODEL], 0.14, ), Competition( CompetitionId.B3_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B3_MODEL], + MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[CompetitionId.B3_MODEL], 0.29, ), Competition( CompetitionId.B7_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[CompetitionId.B7_MODEL], 0.15, ), Competition( CompetitionId.B14_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[CompetitionId.B14_MODEL], 0.42, - ) - + ), ], ), - ] for block_and_competitions in COMPETITION_SCHEDULE_BY_BLOCK: @@ -251,8 +307,6 @@ # validator scoring exponential temperature # 0.01 gives ~96% to best model with only ~3 receiving any weights. 
temperature = 0.01 -# validator score boosting for earlier models. -timestamp_epsilon = 0.005 # block to activate sample unpacking sample_unpack_block = BLOCK_3B_7BSTAR_UNPACK @@ -275,5 +329,7 @@ updated_models_limit = sample_min * len(MODEL_CONSTRAINTS_BY_COMPETITION_ID) + 10 # time required between updates to the chain. chain_update_cadence = dt.timedelta(minutes=20) -# time required between retrying evaluation of a stale model. (First retry will be immediate). -model_retry_cadence = dt.timedelta(hours=4) +# Number of blocks required between retrying evaluation of a model. +model_retry_cadence = 300 # Roughly 1 hour +# How frequently to check the models given weights by other large validators. +scan_top_model_cadence = dt.timedelta(minutes=30) diff --git a/docs/validator.md b/docs/validator.md index 2df3457a..755569a5 100644 --- a/docs/validator.md +++ b/docs/validator.md @@ -38,12 +38,12 @@ You can view the entire validation system by reading the code in `neurons/valida set_weights( weight ) ``` -The behaviour of `iswin( loss_a, loss_b, block_a, block_b, epsilon)` function intentionally skews the win function to reward models which have been hosted earlier such that newer models are only better than others iff their loss is `epsilon` percent lower accoring to the following function. Currently `epsilon` is set to 1% and is a hyper parameter of the mechanism +The behaviour of the `iswin( loss_a, loss_b, block_a, block_b, epsilon_func, curr_block)` function intentionally skews the win function to reward models which have been hosted earlier, such that newer models are only better than others iff their loss is `epsilon` percent lower according to the following function. `epsilon` is calculated by a per-competition function based on the distance from the earlier model's block to the current block. ```python -def iswin( loss_a, loss_b, block_a, block_b, epsilon): - loss_a = (1 - epsilon) * loss_a if block_a < block_b else loss_a - loss_b = (1 - epsilon) * loss_b if block_b < block_a else loss_b +def iswin(loss_a, loss_b, block_a, block_b, epsilon_func, curr_block): + loss_a = (1 - epsilon_func.compute_epsilon(curr_block, block_a)) * loss_a if block_a < block_b else loss_a + loss_b = (1 - epsilon_func.compute_epsilon(curr_block, block_b)) * loss_b if block_b < block_a else loss_b return loss_a < loss_b ``` @@ -53,7 +53,7 @@ It is important to note that this affects the game theoretics of the incentive l Validators will need enough disk space to store the models of miners being evaluated. Each model has a max size by block defined in [constants/__init__.py](https://github.com/macrocosm-os/pretraining/blob/main/constants/__init__.py#L57) and the validator has cleanup logic to remove old models. It is recommended to have at least 2 TB of disk space and 80GB of system memory. -Validators will need enough processing power to evaluate their model. As of Apr 1st, 2024 it is required to have a GPU that supports [flash attention 2](https://github.com/Dao-AILab/flash-attention) with atleast 48 GB of VRAM and at least 38 TFLOPs for half precision (bfloat 16) operations. +Validators will need enough processing power to evaluate their model. As of Sept 2nd, 2024, an upgrade to the Nvidia A100 GPU with 80GB of VRAM is required. This GPU's high throughput and FLOPs enable running 14B models without impacting the speed of the validation cycle. Although only 40GB of VRAM is necessary, we have observed that A100 GPUs with 80GB are more readily available and are offered at a comparable price to the 40GB variants.
The additional VRAM provided by this GPU will allow more flexibility for optimization in future releases, enabling larger validation batch sizes to enhance the stability of the validation process by reducing scoring variance. # Getting Started diff --git a/model/retry.py b/model/retry.py new file mode 100644 index 00000000..b9c48869 --- /dev/null +++ b/model/retry.py @@ -0,0 +1,53 @@ +import math +from typing import List + +from taoverse.model.competition.data import EpsilonFunc +from taoverse.model.data import EvalResult + + +def should_retry_model( + epsilon_func: EpsilonFunc, curr_block: int, eval_history: List[EvalResult] +) -> bool: + """Determines if a model should be retried based on its evaluation history and the current state. + + A model is retryable if any of the following apply: + - It has never been evaluated. + - When it was last evaluated it had a better loss than the top model but couldn't overcome the epsilon disadvantage. + However, now epsilon has lowered to the point that it may be able to overcome the epsilon disadvantage. + - The model has only been evaluated once and it hit an error. In this case, we allow a single retry. + + Args: + epsilon_func (EpsilonFunc): The function to compute the current epsilon. + curr_block (int): The current block. + eval_history (List[EvalResult]): The (potentially empty) evaluation history of the model. + """ + # If the model has never been evaluated, we should retry it. + if not eval_history: + return True + + # Find the most recent successful eval. + last_successful_eval = None + for eval_result in reversed(eval_history): + if eval_result.score != math.inf: + last_successful_eval = eval_result + break + + if last_successful_eval: + # If this model had worse loss than the top model during the last eval, no need to retry. + # NOTE: "score" = avg_loss so lower is better. + if last_successful_eval.score > last_successful_eval.winning_model_score: + return False + + # Otherwise, this model is potentially better than the top model but at the time it was evaluated + # it couldn't overcome the epsilon disadvantage. Check if epsilon has changed to the point where + # we should retry this model now. + curr_epsilon = epsilon_func.compute_epsilon( + current_block=curr_block, + model_block=last_successful_eval.winning_model_block, + ) + # Compute the adjusted loss of the top model based on the current epsilon. + top_model_score = last_successful_eval.winning_model_score * (1 - curr_epsilon) + return last_successful_eval.score < top_model_score + + # This model has been evaluated but has errored every time. Allow a single retry in this case. + return len(eval_history) < 2 diff --git a/neurons/validator.py b/neurons/validator.py index 94508220..051ca85c 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -16,13 +16,18 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE.
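The retry helper in `model/retry.py` above is easiest to follow with concrete numbers. Below is a minimal, hypothetical sketch (not part of the change itself) showing how `should_retry_model` responds as epsilon shrinks. It assumes `FixedEpsilon(x).compute_epsilon(...)` simply returns `x`, as the unit tests at the end of this diff rely on, and that `LinearDecay(0.005, 0.001, 50_400)` from `constants/__init__.py` decays epsilon from 0.005 toward 0.001 over 50,400 blocks; the loss values are invented for illustration.

```python
from taoverse.model.competition.epsilon import FixedEpsilon, LinearDecay
from taoverse.model.data import EvalResult

from model.retry import should_retry_model

# A model that was 0.3% better than the top model at its last eval, but lost
# because it could not overcome the earlier top model's epsilon advantage.
history = [
    EvalResult(
        block=1_000,
        score=0.997,              # this model's average loss (lower is better)
        winning_model_score=1.0,  # the top model's average loss
        winning_model_block=500,
    )
]

# With a fixed 0.5% epsilon the top model's adjusted loss is 0.995, so no retry.
print(should_retry_model(FixedEpsilon(0.005), curr_block=2_000, eval_history=history))  # False

# With epsilon at 0.1% the adjusted loss is 0.999, so a retry is now worthwhile.
print(should_retry_model(FixedEpsilon(0.001), curr_block=2_000, eval_history=history))  # True

# Under the decaying schedule the same history should eventually flip to True as
# epsilon shrinks with block distance (the exact crossover depends on LinearDecay's
# implementation inside taoverse).
print(should_retry_model(LinearDecay(0.005, 0.001, 50_400), curr_block=40_000, eval_history=history))
```

This is the same check the validator's update loop now performs when deciding whether to force a re-download of a previously evaluated model.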
+# Due to the implementation of disable_progress_bars(), this has to be the first import+call in the application relating to huggingface +from huggingface_hub.utils import disable_progress_bars + +disable_progress_bars() + import asyncio import copy +import dataclasses import datetime as dt import functools import json import math -import multiprocessing import os import pickle import threading @@ -34,6 +39,7 @@ import bittensor as bt import torch import wandb + from huggingface_hub.utils import RepositoryNotFoundError from rich.console import Console from rich.table import Table @@ -44,6 +50,8 @@ from taoverse.model.competition import utils as competition_utils from taoverse.model.competition.competition_tracker import CompetitionTracker from taoverse.model.competition.data import Competition +from taoverse.model.competition.epsilon import EpsilonFunc, FixedEpsilon +from taoverse.model.data import EvalResult from taoverse.model.model_tracker import ModelTracker from taoverse.model.model_updater import MinerMisconfiguredError, ModelUpdater from taoverse.model.storage.chain.chain_model_metadata_store import ( @@ -59,11 +67,29 @@ import constants import pretrain as pt from competitions.data import CompetitionId +from model.retry import should_retry_model from neurons import config os.environ["TOKENIZERS_PARALLELISM"] = "true" +@dataclasses.dataclass +class PerUIDEvalState: + """State tracked per UID in the eval loop""" + + # The block the model was submitted. + block: int = math.inf + + # The hotkey for the UID at the time of eval. + hotkey: str = "Unknown" + + # The hugging face repo name. + repo_name: str = "Unknown" + + # The losses per batch. + losses: typing.List[float] = dataclasses.field(default=None) + + class Validator: MODEL_TRACKER_FILENAME = "model_tracker.pickle" COMPETITION_TRACKER_FILENAME = "competition_tracker.pickle" @@ -107,6 +133,12 @@ def __init__(self): self.config.netuid ) + # Register a listener for metagraph updates. + self.subnet_metagraph_syncer.register_listener( + self._on_subnet_metagraph_updated, + netuids=[self.config.netuid], + ) + torch.backends.cudnn.benchmark = True # Dont check registration status if offline. @@ -122,7 +154,6 @@ def __init__(self): # === Running args === self.weights = torch.zeros_like(torch.tensor(self.metagraph.S)) - self.epoch_step = 0 self.global_step = 0 self.last_epoch = self.metagraph.block.item() self.last_wandb_step = 0 @@ -330,129 +361,30 @@ def update_models(self): uid_last_checked_sequential = dict() # Track how recently we checked the list of top models. last_checked_top_models_time = None - # Track how recently we retried a model with incentive we've already dropped. - uid_last_retried_evaluation = dict() # The below loop iterates across all miner uids and checks to see # if they should be updated. while not self.stop_event.is_set(): try: - # At most once per `chain_update_cadence`, check which models are being assigned weight by + # At most once per `scan_top_model_cadence`, check which models are being assigned weight by # the top validators and ensure they'll be evaluated soon. if ( not last_checked_top_models_time or dt.datetime.now() - last_checked_top_models_time - > constants.chain_update_cadence + > constants.scan_top_model_cadence ): last_checked_top_models_time = dt.datetime.now() - # Take a deep copy of the metagraph for use in the top uid retry check. - # The regular loop below will use self.metagraph which may be updated as we go. 
- with self.metagraph_lock: - metagraph = copy.deepcopy(self.metagraph) - - # Find any miner UIDs which top valis are assigning weight and aren't currently scheduled for an eval. - # This is competition agnostic, as anything with weight is 'winning' a competition for some vali. - top_miner_uids = metagraph_utils.get_top_miners( - metagraph, - constants.WEIGHT_SYNC_VALI_MIN_STAKE, - constants.WEIGHT_SYNC_MINER_MIN_PERCENT, - ) - - with self.pending_uids_to_eval_lock: - all_uids_to_eval = set() - all_pending_uids_to_eval = set() - # Loop through the uids across all competitions. - for uids in self.uids_to_eval.values(): - all_uids_to_eval.update(uids) - for uids in self.pending_uids_to_eval.values(): - all_pending_uids_to_eval.update(uids) - - # Reduce down to top models that are not in any competition yet. - uids_to_add = ( - top_miner_uids - all_uids_to_eval - all_pending_uids_to_eval - ) - - for uid in uids_to_add: - # Limit how often we'll retry these top models. - time_diff = ( - dt.datetime.now() - uid_last_retried_evaluation[uid] - if uid in uid_last_retried_evaluation - else constants.model_retry_cadence # Default to being stale enough to check again. - ) - if time_diff >= constants.model_retry_cadence: - try: - uid_last_retried_evaluation[uid] = dt.datetime.now() - - # Redownload this model and schedule it for eval. - # Still respect the eval block delay so that previously top uids can't bypass it. - hotkey = metagraph.hotkeys[uid] - should_retry = asyncio.run( - self.model_updater.sync_model( - hotkey=hotkey, - curr_block=metagraph.block.item(), - schedule_by_block=constants.COMPETITION_SCHEDULE_BY_BLOCK, - force=True, - ) - ) - - if should_retry: - # Since this is a top model (as determined by other valis), - # we don't worry if self.pending_uids is already "full". At most - # there can be 10 top models that we'd add here and that would be - # a wildy exceptional case. It would require every vali to have a - # different top model. - # Validators should only have ~1 winner per competition and we only check bigger valis - # so there should not be many simultaneous top models not already being evaluated. - top_model_metadata = self.model_tracker.get_model_metadata_for_miner_hotkey( - hotkey - ) - if top_model_metadata is not None: - bt.logging.trace( - f"Shortcutting to top model or retrying evaluation for previously discarded top model with incentive for UID={uid}" - ) - with self.pending_uids_to_eval_lock: - self.pending_uids_to_eval[ - top_model_metadata.id.competition_id - ].add(uid) - else: - bt.logging.warning( - f"Failed to find metadata for uid {uid} with hotkey {hotkey}" - ) - - except Exception: - bt.logging.debug( - f"Failure in update loop for UID={uid} during top model check. {traceback.format_exc()}" - ) + self._queue_top_models_for_eval() # Top model check complete. Now continue with the sequential iterator to check for the next miner # to update. - # Only allow up to limit for updated models. Typically this is carryover from sample_min + new models. - # Note that this is shared across all competitions. So if we happen to get more pending for one - # competition we still need to wait until that competition goes down to sample_min. - pending_uid_count, current_uid_count = ( - self.get_pending_and_current_uid_counts() - ) - - # Only allow at most sample max models. Typically this will be carryover from sample_min + new models. - while ( - pending_uid_count + current_uid_count - >= self.config.updated_models_limit - ): - # Wait 5 minutes for the eval loop to process them. 
- bt.logging.info( - f"Update loop: Already {pending_uid_count + current_uid_count} synced models pending eval. Checking again in 5 minutes." - ) - time.sleep(300) - # Check to see if the pending uids have been cleared yet. - pending_uid_count, current_uid_count = ( - self.get_pending_and_current_uid_counts() - ) + self._wait_for_open_eval_slot() # We have space to add more models for eval. Process the next UID. next_uid = next(self.miner_iterator) - # Confirm that we haven't already checked it in the chain update cadence. + # Confirm that we haven't already checked it within the chain update cadence. time_diff = ( dt.datetime.now() - uid_last_checked_sequential[next_uid] if next_uid in uid_last_checked_sequential @@ -469,21 +401,72 @@ def update_models(self): time.sleep(time_to_sleep) uid_last_checked_sequential[next_uid] = dt.datetime.now() + curr_block = self._get_current_block() # Get their hotkey from the metagraph. with self.metagraph_lock: hotkey = self.metagraph.hotkeys[next_uid] - curr_block = self.metagraph.block.item() + + # Check if we should retry this model and force a sync if necessary. + force_sync = False + model_metadata = self.model_tracker.get_model_metadata_for_miner_hotkey( + hotkey + ) + + if model_metadata: + # Check if the model is already queued for eval. + is_queued_for_eval = False + with self.pending_uids_to_eval_lock: + is_queued_for_eval = ( + next_uid + in self.pending_uids_to_eval[ + model_metadata.id.competition_id + ] + or next_uid + in self.uids_to_eval[model_metadata.id.competition_id] + ) + + competition = competition_utils.get_competition_for_block( + model_metadata.id.competition_id, + curr_block, + constants.COMPETITION_SCHEDULE_BY_BLOCK, + ) + if competition is not None and not is_queued_for_eval: + eval_history = ( + self.model_tracker.get_eval_results_for_miner_hotkey(hotkey) + ) + force_sync = should_retry_model( + competition.constraints.epsilon_func, + curr_block, + eval_history, + ) + if force_sync: + bt.logging.debug( + f"Force downloading model for UID {next_uid} because it should be retried. Eval_history={eval_history}" + ) # Compare metadata and tracker, syncing new model from remote store to local if necessary. - updated = asyncio.run( - self.model_updater.sync_model( - hotkey=hotkey, - curr_block=curr_block, - schedule_by_block=constants.COMPETITION_SCHEDULE_BY_BLOCK, - force=False, + try: + updated = asyncio.run( + self.model_updater.sync_model( + hotkey=hotkey, + curr_block=curr_block, + schedule_by_block=constants.COMPETITION_SCHEDULE_BY_BLOCK, + force=force_sync, + ) ) - ) + except MinerMisconfiguredError as e: + self.model_tracker.on_model_evaluated( + hotkey, + EvalResult( + block=curr_block, + score=math.inf, + # We don't care about the winning model for this check since we just need to log the model eval failure. + winning_model_block=0, + winning_model_score=0, + ), + ) + raise e if updated: metadata = self.model_tracker.get_model_metadata_for_miner_hotkey( @@ -513,6 +496,110 @@ def update_models(self): bt.logging.info("Exiting update models loop.") + def _wait_for_open_eval_slot(self) -> None: + """Waits until there is at least one slot open to download and evaluate a model.""" + pending_uid_count, current_uid_count = self.get_pending_and_current_uid_counts() + + while pending_uid_count + current_uid_count >= self.config.updated_models_limit: + # Wait 5 minutes for the eval loop to process them. + bt.logging.info( + f"Update loop: There are already {pending_uid_count + current_uid_count} synced models pending eval. 
Checking again in 5 minutes." + ) + time.sleep(300) + # Check to see if the pending uids have been cleared yet. + pending_uid_count, current_uid_count = ( + self.get_pending_and_current_uid_counts() + ) + + def _queue_top_models_for_eval(self) -> None: + # Take a deep copy of the metagraph for use in the top uid retry check. + # The regular loop below will use self.metagraph which may be updated as we go. + with self.metagraph_lock: + metagraph = copy.deepcopy(self.metagraph) + + # Find any miner UIDs which top valis are assigning weight and aren't currently scheduled for an eval. + # This is competition agnostic, as anything with weight is 'winning' a competition for some vali. + top_miner_uids = metagraph_utils.get_top_miners( + metagraph, + constants.WEIGHT_SYNC_VALI_MIN_STAKE, + constants.WEIGHT_SYNC_MINER_MIN_PERCENT, + ) + + with self.pending_uids_to_eval_lock: + all_uids_to_eval = set() + all_pending_uids_to_eval = set() + # Loop through the uids across all competitions. + for uids in self.uids_to_eval.values(): + all_uids_to_eval.update(uids) + for uids in self.pending_uids_to_eval.values(): + all_pending_uids_to_eval.update(uids) + + # Reduce down to top models that are not in any competition yet. + uids_to_add = top_miner_uids - all_uids_to_eval - all_pending_uids_to_eval + + for uid in uids_to_add: + # Check when we last evaluated this model. + hotkey = metagraph.hotkeys[uid] + eval_history = self.model_tracker.get_eval_results_for_miner_hotkey(hotkey) + last_eval_block = eval_history[-1].block if eval_history else 0 + curr_block = self._get_current_block() + if curr_block - last_eval_block >= constants.model_retry_cadence: + try: + # It's been long enough - redownload this model and schedule it for eval. + # This still respects the eval block delay so that previously top uids can't bypass it. + try: + should_retry = asyncio.run( + self.model_updater.sync_model( + hotkey=hotkey, + curr_block=curr_block, + schedule_by_block=constants.COMPETITION_SCHEDULE_BY_BLOCK, + force=True, + ) + ) + except MinerMisconfiguredError as e: + self.model_tracker.on_model_evaluated( + hotkey, + EvalResult( + block=curr_block, + score=math.inf, + # We don't care about the winning model for this check since we just need to log the model eval failure. + winning_model_block=0, + winning_model_score=0, + ), + ) + raise e + + if not should_retry: + continue + + # Since this is a top model (as determined by other valis), + # we don't worry if self.pending_uids is already "full". At most + # there can be 10 * comps top models that we'd add here and that would be + # a wildy exceptional case. It would require every vali to have a + # different top model. + # Validators should only have ~1 winner per competition and we only check bigger valis + # so there should not be many simultaneous top models not already being evaluated. + top_model_metadata = ( + self.model_tracker.get_model_metadata_for_miner_hotkey(hotkey) + ) + if top_model_metadata is not None: + bt.logging.trace( + f"Shortcutting to top model or retrying evaluation for previously discarded top model with incentive for UID={uid}" + ) + with self.pending_uids_to_eval_lock: + self.pending_uids_to_eval[ + top_model_metadata.id.competition_id + ].add(uid) + else: + bt.logging.warning( + f"Failed to find metadata for uid {uid} with hotkey {hotkey}" + ) + + except Exception: + bt.logging.debug( + f"Failure in update loop for UID={uid} during top model check. 
{traceback.format_exc()}" + ) + def clean_models(self): """Cleans up models that are no longer referenced.""" @@ -567,13 +654,12 @@ def clean_models(self): bt.logging.info("Exiting clean models loop.") - async def try_set_weights(self, ttl: int): + async def try_set_weights(self, block: int, ttl: int): """Sets the weights on the chain with ttl, without raising exceptions if it times out.""" async def _try_set_weights(): with self.metagraph_lock: uids = self.metagraph.uids - cur_block = self.metagraph.block.item() try: self.weights.nan_to_num(0.0) self.subtensor.set_weights( @@ -585,7 +671,7 @@ async def _try_set_weights(): version_key=constants.weights_version_key, ) # We only update the last epoch when we successfully set weights. - self.last_epoch = cur_block + self.last_epoch = block except: bt.logging.warning("Failed to set weights. Trying again later.") @@ -605,27 +691,32 @@ async def _try_set_weights(): except asyncio.TimeoutError: bt.logging.error(f"Failed to set weights after {ttl} seconds") - async def try_sync_metagraph(self, ttl: int): - """Syncs the metagraph with ttl in a background process, without raising exceptions if it times out.""" - - def sync_metagraph(endpoint): - metagraph = bt.subtensor(endpoint).metagraph(self.config.netuid, lite=False) - metagraph.save() - - process = multiprocessing.Process( - target=sync_metagraph, args=(self.subtensor.chain_endpoint,) - ) - process.start() - process.join(timeout=ttl) - if process.is_alive(): - process.terminate() - process.join() - bt.logging.error(f"Failed to sync metagraph after {ttl} seconds") + def _get_current_block(self) -> int: + """Returns the current block.""" + try: + return self.subtensor.block + except: + bt.logging.debug( + "Failed to get the latest block from the chain. Using the block from the cached metagraph." + ) + # Network call failed. Fallback to using the block from the metagraph, + # even though it'll be a little stale. + with self.metagraph_lock: + return self.metagraph.block.item() + + def _on_subnet_metagraph_updated( + self, metagraph: bt.metagraph, netuid: int + ) -> None: + """Processes an update to the metagraph for the subnet.""" + if netuid != self.config.netuid: + bt.logging.error( + f"Unexpected subnet uid in subnet metagraph syncer: {netuid}" + ) return - bt.logging.info("Synced metagraph") with self.metagraph_lock: - self.metagraph.load() + bt.logging.info("Synced metagraph") + self.metagraph = copy.deepcopy(metagraph) self.miner_iterator.set_miner_uids(self.metagraph.uids.tolist()) self.model_tracker.on_hotkeys_updated(set(self.metagraph.hotkeys)) @@ -654,12 +745,7 @@ async def run_step(self): 7. Logs all relevant data for the step, including model IDs, pages, batches, wins, win rates, and losses. """ - # Take the current block. - # Note from Finetuning repo: - # block on the metagraph only updates on sync operations. - # Therefore validators may not start evaluating on a new competition schedule immediately. - with self.metagraph_lock: - cur_block = self.metagraph.block.item() + cur_block = self._get_current_block() # Get the competition schedule for the current block. # This is a list of competitions @@ -697,12 +783,7 @@ async def run_step(self): time.sleep(300) return - # TODO: Consider condensing the following + competition id into a uid to metadata map. - # Keep track of which block this uid last updated their model. - # Default to an infinite block if we can't retrieve the metadata for the miner. 
- uid_to_block = defaultdict(lambda: math.inf) - # Keep track of the hugging face repo for this uid. - uid_to_hf = defaultdict(lambda: "unknown") + uid_to_state = defaultdict(PerUIDEvalState) bt.logging.trace(f"Current block: {cur_block}") @@ -754,9 +835,6 @@ async def run_step(self): bt.logging.debug(f"Competition {competition.id} | Computing losses on {uids}") bt.logging.debug(f"Pages used are {pages}") - # Compute model losses on batches. - losses_per_uid = {muid: None for muid in uids} - load_model_perf = PerfMonitor("Eval: Load model") compute_loss_perf = PerfMonitor("Eval: Compute loss") @@ -769,6 +847,7 @@ async def run_step(self): # Check that the model is in the tracker. with self.metagraph_lock: hotkey = self.metagraph.hotkeys[uid_i] + uid_to_state[uid_i].hotkey = hotkey model_i_metadata = self.model_tracker.get_model_metadata_for_miner_hotkey( hotkey @@ -784,9 +863,11 @@ async def run_step(self): ) # Update the block this uid last updated their model. - uid_to_block[uid_i] = model_i_metadata.block + uid_to_state[uid_i].block = model_i_metadata.block # Update the hf repo for this model. - uid_to_hf[uid_i] = model_utils.get_hf_repo_name(model_i_metadata) + uid_to_state[uid_i].repo_name = model_utils.get_hf_repo_name( + model_i_metadata + ) # Get the model locally and evaluate its loss. model_i = None @@ -820,17 +901,27 @@ async def run_step(self): f"Unable to load the model for {uid_i} or it belongs to another competition. Setting loss to inifinity for this competition." ) - losses_per_uid[uid_i] = losses + uid_to_state[uid_i].losses = losses average_model_loss = sum(losses) / len(losses) bt.logging.trace( f"Computed model losses for uid:{uid_i} with average loss: {average_model_loss}" ) # Compute wins and win rates per uid. + losses_per_uid = {uid: state.losses for uid, state in uid_to_state.items()} + uid_to_block = {uid: state.block for uid, state in uid_to_state.items()} wins, win_rate = pt.validation.compute_wins( - uids, losses_per_uid, batches, uid_to_block, constants.timestamp_epsilon + uids, + losses_per_uid, + batches, + uid_to_block, + competition.constraints.epsilon_func, + cur_block, ) + top_uid = max(win_rate, key=win_rate.get) + self._record_eval_results(top_uid, cur_block, uid_to_state) + # Compute softmaxed weights based on win rate. 
model_weights = torch.tensor( [win_rate[uid] for uid in uids], dtype=torch.float32 @@ -843,13 +934,15 @@ async def run_step(self): and cur_block >= constants.timestamp_epsilon_experiment_start_block and cur_block < constants.timestamp_epsilon_experiment_end_block ): + epsilon_experiment_func = FixedEpsilon(0.001) wins_epsilon_experiment, win_rate_epsilon_experiment = ( pt.validation.compute_wins( uids, losses_per_uid, batches, uid_to_block, - constants.timestamp_epsilon_experiment, + epsilon_experiment_func, + cur_block, ) ) @@ -888,15 +981,15 @@ async def run_step(self): ) self.log_step( CompetitionId.B7_MODEL_LOWER_EPSILON, + epsilon_experiment_func, + cur_block, uids, - uid_to_block, - uid_to_hf, + uid_to_state, uids_to_competition_ids_epsilon_experiment, pages, model_weights_epsilon_experiment, wins_epsilon_experiment, win_rate_epsilon_experiment, - losses_per_uid, load_model_perf, compute_loss_perf, ) @@ -937,12 +1030,14 @@ async def run_step(self): for uid, wr in win_rate.items() } - with self.pending_uids_to_eval_lock: - self.uids_to_eval[competition.id] = set( - sorted( - model_prioritization, key=model_prioritization.get, reverse=True - )[: self.config.sample_min] - ) + models_to_keep = set( + sorted(model_prioritization, key=model_prioritization.get, reverse=True)[ + : self.config.sample_min + ] + ) + self._update_uids_to_eval( + competition.id, models_to_keep, active_competition_ids + ) # Save state self.save_state() @@ -954,15 +1049,15 @@ async def run_step(self): # Log to screen and wandb. self.log_step( competition.id, + competition.constraints.epsilon_func, + cur_block, uids, - uid_to_block, - uid_to_hf, + uid_to_state, self._get_uids_to_competition_ids(), pages, model_weights, wins, win_rate, - losses_per_uid, load_model_perf, compute_loss_perf, ) @@ -970,18 +1065,83 @@ async def run_step(self): # Increment the number of completed run steps by 1 self.run_step_count += 1 + def _update_uids_to_eval( + self, + competition_id: CompetitionId, + uids: typing.Set[int], + active_competitions: typing.Set[int], + ): + """Updates the uids to evaluate and clears out any sunset competitions. + + Args: + competition_id (CompetitionId): The competition id to update. + uids (typing.Set[int]): The set of uids to evaluate in this competition on the next eval loop. + """ + with self.pending_uids_to_eval_lock: + self.uids_to_eval[competition_id] = uids + + # Clean up sunset competitions. + # This works as expected even though the keys are CompetitionIds and active_competitions are ints. + comps_to_delete = ( + set(self.uids_to_eval.keys()) | set(self.pending_uids_to_eval.keys()) + ) - active_competitions + for comp in comps_to_delete: + bt.logging.debug( + f"Cleaning up uids to eval from sunset competition {comp}." + ) + if comp in self.uids_to_eval: + del self.uids_to_eval[comp] + if comp in self.pending_uids_to_eval: + del self.pending_uids_to_eval[comp] + + def _record_eval_results( + self, + top_uid: int, + curr_block: int, + uid_to_state: typing.Dict[int, PerUIDEvalState], + ) -> None: + """Records the results of the evaluation step to the model tracker. + + Args: + top_uid (int): The uid of the model with the higest win rate. + curr_block (int): The current block. + uid_to_state (typing.Dict[int, PerUIDEvalState]): A dictionary mapping uids to their eval state. 
+ """ + top_model_loss = self._compute_avg_loss(uid_to_state[top_uid].losses) + for _, state in uid_to_state.items(): + self.model_tracker.on_model_evaluated( + state.hotkey, + EvalResult( + block=curr_block, + score=self._compute_avg_loss(state.losses), + winning_model_block=uid_to_state[top_uid].block, + winning_model_score=top_model_loss, + ), + ) + + def _compute_avg_loss(self, losses: typing.List[float]) -> float: + """Safely computes the average loss from a list of losses. + + Args: + losses (typing.List[float]): A list of losses. + + Returns: + float: The average loss. + """ + return sum(losses) / len(losses) if losses else math.inf + def log_step( self, competition_id: CompetitionId, + competition_epsilon_func: EpsilonFunc, + current_block: int, uids: typing.List[int], - uid_to_block: typing.Dict[int, int], - uid_to_hf: typing.Dict[int, str], + uid_to_state: typing.Dict[int, PerUIDEvalState], uid_to_competition_id: typing.Dict[int, typing.Optional[int]], pages: typing.List[str], model_weights: typing.List[float], wins: typing.Dict[int, int], win_rate: typing.Dict[int, float], - losses_per_uid: typing.Dict[int, typing.List[float]], load_model_perf: PerfMonitor, compute_loss_perf: PerfMonitor, ): @@ -1000,13 +1160,17 @@ def log_step( model_weights / constants.temperature, dim=0 ) + # All uids in the competition step log are from the same competition. for idx, uid in enumerate(uids): step_log["uid_data"][str(uid)] = { "uid": uid, - "block": uid_to_block[uid], - "hf": uid_to_hf[uid], - "competition_id": uid_to_competition_id[uid], - "average_loss": sum(losses_per_uid[uid]) / len(losses_per_uid[uid]), + "block": uid_to_state[uid].block, + "hf": uid_to_state[uid].repo_name, + "competition_id": int(competition_id), + "average_loss": self._compute_avg_loss(uid_to_state[uid].losses), + "epsilon_adv": competition_epsilon_func.compute_epsilon( + current_block, uid_to_state[uid].block + ), "win_rate": win_rate[uid], "win_total": wins[uid], "weight": self.weights[uid].item(), @@ -1015,7 +1179,8 @@ def log_step( table = Table(title="Step", expand=True) table.add_column("uid", justify="right", style="cyan", no_wrap=True) table.add_column("hf", style="magenta", overflow="fold") - table.add_column("average_loss", style="magenta", overflow="fold") + table.add_column("avg_loss", style="magenta", overflow="fold") + table.add_column("epsilon_adv", style="magenta", overflow="fold") table.add_column("win_rate", style="magenta", overflow="fold") table.add_column("win_total", style="magenta", overflow="fold") table.add_column("total_weight", style="magenta", overflow="fold") @@ -1028,6 +1193,7 @@ def log_step( str(uid), str(step_log["uid_data"][str(uid)]["hf"]), str(round(step_log["uid_data"][str(uid)]["average_loss"], 4)), + str(round(step_log["uid_data"][str(uid)]["epsilon_adv"], 4)), str(round(step_log["uid_data"][str(uid)]["win_rate"], 4)), str(step_log["uid_data"][str(uid)]["win_total"]), str(round(self.weights[uid].item(), 4)), @@ -1044,11 +1210,13 @@ def log_step( table = Table(title="Weights > 0.001") table.add_column("uid", justify="right", style="cyan", no_wrap=True) table.add_column("weight", style="magenta") + table.add_column("comp", style="magenta") for index, weight in list(zip(ui.tolist(), ws.tolist())): if weight > 0.001: table.add_row( str(index), str(round(weight, 4)), + str(uid_to_competition_id[index]), ) console = Console() console.print(table) @@ -1073,15 +1241,16 @@ def log_step( uid_data = step_log["uid_data"] # Create a new dictionary with the required format - with 
self.metagraph_lock: - block = self.metagraph.block.item() graphed_data = { "time": time.time(), "step_competition_id": competition_id, - "block": block, + "block": current_block, "uid_data": { str(uid): uid_data[str(uid)]["average_loss"] for uid in uids }, + "uid_epsilon_adv": { + str(uid): uid_data[str(uid)]["epsilon_adv"] for uid in uids + }, "win_rate_data": { str(uid): uid_data[str(uid)]["win_rate"] for uid in uids }, @@ -1093,11 +1262,7 @@ def log_step( str(uid): sub_competition_weights[i].item() for i, uid in enumerate(uids) }, - "competition_id": { - str(uid): uid_to_competition_id[uid] - for uid in uids - if uid_to_competition_id[uid] is not None - }, + "competition_id": {str(uid): int(competition_id)}, "load_model_perf": { "min": load_model_perf.min(), "median": load_model_perf.median(), @@ -1142,23 +1307,22 @@ async def run(self): """Runs the validator loop, which continuously evaluates models and sets weights.""" while True: try: + # First run a step. + await self.try_run_step(ttl=60 * 60) + self.global_step += 1 - while ( - self.metagraph.block.item() - self.last_epoch - ) < self.config.blocks_per_epoch: - await self.try_run_step(ttl=60 * 20) - await self.try_sync_metagraph(ttl=60) - self.save_state() - bt.logging.debug( - f"{self.metagraph.block.item() - self.last_epoch } / {self.config.blocks_per_epoch} blocks until next epoch." - ) - self.global_step += 1 + block = self._get_current_block() + # Then check if we should set weights and do so if needed. if not self.config.dont_set_weights and not self.config.offline: - await self.try_set_weights(ttl=60) - self.last_epoch = self.metagraph.block.item() - self.epoch_step += 1 + blocks_until_epoch = block - self.last_epoch + if blocks_until_epoch >= self.config.blocks_per_epoch: + await self.try_set_weights(block=block, ttl=60) + else: + bt.logging.debug( + f"{blocks_until_epoch} / {self.config.blocks_per_epoch} blocks until next epoch." + ) except KeyboardInterrupt: bt.logging.info( "KeyboardInterrupt caught, gracefully closing the wandb run..." diff --git a/pretrain/validation.py b/pretrain/validation.py index c89552e2..618f0b43 100644 --- a/pretrain/validation.py +++ b/pretrain/validation.py @@ -19,15 +19,23 @@ # Tools for performing validation over models. import math -import torch -import typing -import constants import traceback -import numpy as np +import typing + import bittensor as bt +import numpy as np +import torch +from taoverse.model.competition.epsilon import EpsilonFunc -def iswin(loss_i, loss_j, block_i, block_j, epsilon) -> bool: +def iswin( + loss_i: float, + loss_j: float, + block_i: int, + block_j: int, + epsilon_func: EpsilonFunc, + current_block: int, +) -> bool: """ Determines the winner between two models based on the epsilon adjusted loss. @@ -36,14 +44,23 @@ def iswin(loss_i, loss_j, block_i, block_j, epsilon) -> bool: loss_j (float): Loss of uid j on batch. block_i (int): Block of uid i. block_j (int): Block of uid j. - epsilon (float): How much advantage to give to the earlier block. + epsilon_func (EpsilonFunc): Function that determines how much advantage to give to the earlier block. + current_block: The current block. Returns: bool: True if loss i is better, False otherwise. """ - # Adjust loss based on timestamp and pretrain epsilon - loss_i = (1 - epsilon) * loss_i if block_i < block_j else loss_i - loss_j = (1 - epsilon) * loss_j if block_j < block_i else loss_j + # Adjust loss based on timestamp and epsilon. 
+ loss_i = ( + (1 - epsilon_func.compute_epsilon(current_block, block_i)) * loss_i + if block_i < block_j + else loss_i + ) + loss_j = ( + (1 - epsilon_func.compute_epsilon(current_block, block_j)) * loss_j + if block_j < block_i + else loss_j + ) return loss_i < loss_j @@ -52,7 +69,8 @@ def compute_wins( losses_per_uid: typing.Dict[int, typing.List[float]], batches: typing.List[torch.FloatTensor], uid_to_block: typing.Dict[int, int], - epsilon: float + epsilon_func: EpsilonFunc, + current_block: int, ) -> typing.Tuple[typing.Dict[int, int], typing.Dict[int, float]]: """ Computes the wins and win rate for each model based on loss comparison. @@ -62,7 +80,8 @@ def compute_wins( losses_per_uid (dict): A dictionary of losses for each uid by batch. batches (List): A list of data batches. uid_to_block (dict): A dictionary of blocks for each uid. - epsilon (float): How much advantage to give to the earlier block. + epsilon_func (EpsilonFunc): Function that determines how much advantage to give to the earlier block. + current_block: The current block. Returns: tuple: A tuple containing two dictionaries, one for wins and one for win rates. @@ -79,7 +98,13 @@ def compute_wins( for batch_idx, _ in enumerate(batches): loss_i = losses_per_uid[uid_i][batch_idx] loss_j = losses_per_uid[uid_j][batch_idx] - wins[uid_i] += 1 if iswin(loss_i, loss_j, block_i, block_j, epsilon) else 0 + wins[uid_i] += ( + 1 + if iswin( + loss_i, loss_j, block_i, block_j, epsilon_func, current_block + ) + else 0 + ) total_matches += 1 # Calculate win rate for uid i win_rate[uid_i] = wins[uid_i] / total_matches if total_matches > 0 else 0 @@ -145,11 +170,11 @@ def check_for_reasonable_output( def compute_losses( - model, - batches: typing.List[np.ndarray], - device: str, - pad_token_id: int, - sample_packing_used: bool, + model, + batches: typing.List[np.ndarray], + device: str, + pad_token_id: int, + sample_packing_used: bool, ) -> typing.List[float]: """ Computes the losses for a given model on provided batches. @@ -198,8 +223,8 @@ def compute_losses( # For this reason, we want to ignore all but the # first EOS token (the real one) pad_mask = shift_labels == pad_token_id - zeros = torch.zeros_like(shift_labels[...,:1]) - pad_mask = torch.cat((zeros, pad_mask[...,:-1]), dim=-1).bool() + zeros = torch.zeros_like(shift_labels[..., :1]) + pad_mask = torch.cat((zeros, pad_mask[..., :-1]), dim=-1).bool() # Set all the padded labels to -100, since the # CrossEntropyLoss ignores -100 labels by default. 
shift_labels[pad_mask] = -100 diff --git a/requirements.txt b/requirements.txt index d62cbdd9..f4cde013 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,4 @@ transformers==4.44.1 wandb datasets flash-attn -taoverse==1.0.4 +taoverse==1.0.5 diff --git a/tests/model/test_retry.py b/tests/model/test_retry.py new file mode 100644 index 00000000..a0c06f96 --- /dev/null +++ b/tests/model/test_retry.py @@ -0,0 +1,150 @@ +import math +import unittest + +from taoverse.model.competition.epsilon import FixedEpsilon +from taoverse.model.data import EvalResult + +from model.retry import should_retry_model + + +class RetryTests(unittest.TestCase): + def test_should_retry_model_empty_eval_results(self): + """Verifies that a model is retried if it has never been evaluated.""" + self.assertTrue(should_retry_model(FixedEpsilon(0.005), 10, [])) + + def test_should_retry_model_loss_worse_than_winning_model( + self, + ): + """Verifies that the model is not retried if the loss is worse than the winning model.""" + eval_history = [ + EvalResult( + block=1, score=0.2, winning_model_score=0.19, winning_model_block=1 + ) + ] + self.assertFalse( + should_retry_model( + FixedEpsilon(0.005), curr_block=10, eval_history=eval_history + ) + ) + + def test_should_retry_model_loss_better_than_winning_model(self): + """Verifies that the model is retried if the loss (accounting for epsilon) is within 0.999 of the winning model's loss.""" + + test_cases = [ + # Make loss the same as the winning model and make sure it's never retried. + (0.005, 1.0, False), + (0.001, 1.0, False), + (0.0001, 1.0, False), + # Make loss better than the winning model by 50% (for easy math) and adjust epsilon to test each interesting case. + (0.51, 0.5, False), + (0.5004, 0.5, False), + (0.49, 0.5, True), + ] + for tc in test_cases: + epsilon, model_loss, should_retry = tc + print( + f"Running test with epsilon: {epsilon}, model_loss: {model_loss}, should_retry: {should_retry}" + ) + eval_history = [ + EvalResult( + block=1, + score=model_loss, + winning_model_score=1.0, + winning_model_block=1, + ) + ] + self.assertEqual( + should_retry_model( + FixedEpsilon(epsilon), curr_block=10, eval_history=eval_history + ), + should_retry, + ) + + def test_should_retry_model_uses_last_successful_eval(self): + """Verifies that only the last successful evaluation is used to judge if the model should be retried.""" + + test_cases = [ + # Test case 1: The last successful eval is worse than the winning model. + ( + [ + EvalResult( + block=2, + score=0.9, + winning_model_score=1.0, + winning_model_block=1, + ), + EvalResult( + block=4, + score=1.1, + winning_model_score=1.0, + winning_model_block=1, + ), + ], + False, + ), + # Test case 2: The last successful eval is better than the winning model. + ( + [ + EvalResult( + block=2, + score=1.1, + winning_model_score=1.0, + winning_model_block=1, + ), + EvalResult( + block=4, + score=0.9, + winning_model_score=1.0, + winning_model_block=1, + ), + ], + True, + ), + ] + + for eval_history, expected in test_cases: + # Also inject eval failures into each position to make sure it doesn't impact the result. + for i, result in enumerate(eval_history): + eval_history_copy = eval_history.copy() + # .insert() inserts at the position before the given index. 
+ eval_history_copy.insert( + i, + EvalResult( + block=result.block + 1 if i > 0 else 1, + score=math.inf, + winning_model_score=1.0, + winning_model_block=1, + ), + ) + self.assertEqual( + should_retry_model(FixedEpsilon(0.005), 10, eval_history_copy), + expected, + ) + + def test_should_retry_model_only_failed_evals( + self, + ): + """Verifies that a model is retried if it has only failed evaluations and has not been retried before.""" + test_cases = [ + (1, True), + (2, False), + (3, False), + ] + for num_failures, expected in test_cases: + eval_history = [ + EvalResult( + block=i, + score=math.inf, + winning_model_score=1.0, + winning_model_block=1, + ) + for i in range(1, num_failures + 1) + ] + self.assertEqual( + should_retry_model(FixedEpsilon(0.005), 10, eval_history), + expected, + ) + + +if __name__ == "__main__": + unittest.main()
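Finally, a short sketch of how the reworked `iswin` and `compute_wins` signatures in `pretrain/validation.py` are called with a per-competition epsilon function. The blocks and losses are invented, only the batch count matters for the win loop shown in the hunks above, and the expected results in the comments assume `FixedEpsilon(x).compute_epsilon(...) == x` and pairwise comparison of every UID on every batch.

```python
from taoverse.model.competition.epsilon import FixedEpsilon

from pretrain.validation import compute_wins, iswin

uids = [1, 2]
uid_to_block = {1: 100, 2: 200}  # UID 1 published earlier, so it gets the epsilon advantage
losses_per_uid = {1: [2.00, 2.00], 2: [1.995, 1.98]}
batches = [None, None]           # only len(batches) is used by the win loop

epsilon_func = FixedEpsilon(0.005)  # 0.5% advantage for the earlier block
current_block = 1_000

# UID 2 is 0.25% better on the first batch: not enough to clear epsilon.
print(iswin(1.995, 2.00, 200, 100, epsilon_func, current_block))  # expected: False
# UID 2 is 1% better on the second batch: clears the 0.5% epsilon.
print(iswin(1.98, 2.00, 200, 100, epsilon_func, current_block))   # expected: True

wins, win_rate = compute_wins(
    uids, losses_per_uid, batches, uid_to_block, epsilon_func, current_block
)
print(wins, win_rate)  # expected: each UID wins one of the two batch comparisons
```

This mirrors the call the validator now makes in `run_step`, where `competition.constraints.epsilon_func` and `cur_block` replace the old global `timestamp_epsilon`.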