Release 2.14.2 (#490)
Co-authored-by: richwardle <[email protected]>
Co-authored-by: Hollyqui <[email protected]>
Co-authored-by: Hollyqui <[email protected]>
4 people authored Dec 11, 2024
1 parent c3d9c95 commit f6eeaf7
Showing 15 changed files with 408 additions and 372 deletions.
2 changes: 2 additions & 0 deletions .env.validator.example
@@ -1,5 +1,7 @@
# The network UID: 1 for mainnet, 61 for testnet.
NETUID = 61
DEPLOY_API = false
API_PORT = 8094

# The network name [test, finney, local].
SUBTENSOR_NETWORK = "test"
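
The two new keys configure the optional validator API (see the prompting/api/api.py change below). As a rough illustration of how these environment variables might be consumed (the repository's actual Settings loader in prompting/settings.py may parse and validate them differently):

import os

# Illustrative sketch only; not the repository's real Settings implementation.
NETUID = int(os.getenv("NETUID", "61"))
DEPLOY_API = os.getenv("DEPLOY_API", "false").lower() == "true"
API_PORT = int(os.getenv("API_PORT", "8094"))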
2 changes: 2 additions & 0 deletions .gitignore
@@ -5,6 +5,7 @@ __pycache__/
.DS_Store
**/.DS_Store

*.npy
*.npz
prompting/storage/

@@ -179,3 +180,4 @@ wandb
.vscode
api_keys.json
prompting/api/api_keys.json
AutoAWQ
232 changes: 26 additions & 206 deletions neurons/validator.py
@@ -1,242 +1,62 @@
# ruff: noqa: E402
import asyncio
import json
import time

from loguru import logger

from prompting import settings
from prompting.utils.profiling import profiler

settings.settings = settings.Settings.load(mode="validator")
settings = settings.settings

from loguru import logger

from prompting import mutable_globals
from prompting.api.api import start_api
from prompting.base.dendrite import DendriteResponseEvent
from prompting.base.epistula import query_miners
from prompting.base.forward import log_stream_results
from prompting.base.validator import BaseValidatorNeuron
from prompting.llms.model_manager import model_scheduler
from prompting.llms.utils import GPUInfo
from prompting.miner_availability.miner_availability import availability_checking_loop, miner_availabilities
from prompting.mutable_globals import scoring_queue
from prompting.miner_availability.miner_availability import availability_checking_loop
from prompting.rewards.scoring import task_scorer
from prompting.tasks.base_task import BaseTextTask
from prompting.tasks.task_creation import task_loop
from prompting.utils.logging import ErrorLoggingEvent, ValidatorLoggingEvent
from prompting.utils.timer import Timer
from prompting.tasks.task_sending import task_sender
from prompting.weight_setting.weight_setter import weight_setter

NEURON_SAMPLE_SIZE = 100


class Validator(BaseValidatorNeuron):
"""Text prompt validator neuron."""

def __init__(self, config=None):
super(Validator, self).__init__(config=config)
self.load_state()
self._lock = asyncio.Lock()
self.time_of_block_sync = None

@property
def estimate_block(self):
"""
Estimate the current block number based on the time since the last block sync.
Returns:
Optional[int]: The estimated block number or None if an error occurs.
"""

if self.time_of_block_sync is None:
block = self.block
return block

# Calculate the block based on the time since the last block
time_since_last_block = time.time() - self.time_of_block_sync
# A block happens every 12 seconds
blocks_since_last_block = time_since_last_block // 12
estimated_block = int(self._block + blocks_since_last_block)

return estimated_block
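# Illustrative example (not part of this diff): if the last sync stored
# self._block = 1000 and happened 60 seconds ago, then
# blocks_since_last_block = 60 // 12 = 5 and estimate_block returns 1005.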

async def run_step(self, k: int, timeout: float) -> ValidatorLoggingEvent | ErrorLoggingEvent | None:
"""Executes a single step of the agent, which consists of:
- Getting a list of uids to query
- Querying the network
- Rewarding the network
- Updating the scores
- Logging the event
Args:
k (int): The number of uids to query.
timeout (float): The timeout for the queries.
"""
while len(scoring_queue) > settings.SCORING_QUEUE_LENGTH_THRESHOLD:
# logger.debug("Scoring queue is full. Waiting 1 second...")
await asyncio.sleep(1)
while len(mutable_globals.task_queue) == 0:
# logger.warning("No tasks in queue. Waiting 1 second...")
await asyncio.sleep(1)
try:
# get task from the task queue
mutable_globals.task_queue: list[BaseTextTask]
task = mutable_globals.task_queue.pop(0)

# send the task to the miners and collect the responses
with Timer() as timer:
response_event = await self.collect_responses(task=task)
if response_event is None:
logger.warning("No response event collected. This should not be happening.")
return
logger.debug(f"Collected responses in {timer.final_time:.2f} seconds")

# scoring_manager will score the responses as and when the correct model is loaded
task_scorer.add_to_queue(
task=task,
response=response_event,
dataset_entry=task.dataset_entry,
block=self.estimate_block,
step=self.step,
task_id=task.task_id,
)

# Log the step event.
return ValidatorLoggingEvent(
block=self.estimate_block,
step=self.step,
step_time=timer.final_time,
response_event=response_event,
task_id=task.task_id,
)

except Exception as ex:
logger.exception(ex)
return ErrorLoggingEvent(
error=str(ex),
)

async def collect_responses(self, task: BaseTextTask) -> DendriteResponseEvent | None:
# Get the list of uids and their axons to query for this step.
uids = miner_availabilities.get_available_miners(task=task, model=task.llm_model_id, k=NEURON_SAMPLE_SIZE)
logger.debug(f"🔍 Querying uids: {uids}")
if len(uids) == 0:
logger.warning("No available miners. This should already have been caught earlier.")
return

body = {
"seed": task.seed,
"sampling_parameters": task.sampling_params,
"task": task.__class__.__name__,
"model": task.llm_model_id,
"messages": [
{"role": "user", "content": task.query},
],
}
body_bytes = json.dumps(body).encode("utf-8")
stream_results = await query_miners(uids, body_bytes)

log_stream_results(stream_results)

response_event = DendriteResponseEvent(
stream_results=stream_results, uids=uids, timeout=settings.NEURON_TIMEOUT
)
return response_event
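# Illustrative only (not part of this diff): with hypothetical task values, the
# payload built above would serialize to something like
#   {"seed": 42,
#    "sampling_parameters": {"temperature": 0.7, "max_tokens": 256},
#    "task": "SomeTextTask",
#    "model": "some-llm-model-id",
#    "messages": [{"role": "user", "content": "..."}]}
# before being UTF-8 encoded and broadcast to the selected uids via query_miners.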

async def forward(self):
"""
Encapsulates a full conversation between the validator and miners. Contains one or more rounds of request-response.
"""
logger.info("🚀 Starting forward loop...")
with Timer() as timer:
# in run_step, a task is generated and sent to the miners
async with self._lock:
event = await self.run_step(
k=NEURON_SAMPLE_SIZE,
timeout=settings.NEURON_TIMEOUT,
)

if not event:
return

event.forward_time = timer.final_time

def __enter__(self):
if settings.NO_BACKGROUND_THREAD:
logger.warning("Running validator in main thread.")
self.run()
else:
self.run_in_background_thread()

return self

def __exit__(self, exc_type, exc_value, traceback):
"""
Stops the validator's background operations upon exiting the context.
This method facilitates the use of the validator in a 'with' statement.
Args:
exc_type: The type of the exception that caused the context to be exited.
None if the context was exited without an exception.
exc_value: The instance of the exception that caused the context to be exited.
None if the context was exited without an exception.
traceback: A traceback object encoding the stack trace.
None if the context was exited without an exception.
"""
if self.is_running:
logger.debug("Stopping validator in background thread.")
self.should_exit = True
self.thread.join(5)
self.is_running = False
logger.debug("Stopped")


async def main():
# will start checking the availability of miners at regular intervals, needed for API and Validator
asyncio.create_task(availability_checking_loop.start())

if settings.DEPLOY_API:
asyncio.create_task(start_api())

GPUInfo.log_gpu_info()
# start profiling
asyncio.create_task(profiler.print_stats())
if settings.DEPLOY_VALIDATOR:
# start profiling
asyncio.create_task(profiler.print_stats())

# start rotating LLM models
asyncio.create_task(model_scheduler.start())
# start rotating LLM models
asyncio.create_task(model_scheduler.start())

# start creating tasks
asyncio.create_task(task_loop.start())
# start creating tasks
asyncio.create_task(task_loop.start())

# will start checking the availability of miners at regular intervals
asyncio.create_task(availability_checking_loop.start())
# start sending tasks to miners
asyncio.create_task(task_sender.start())

# sets weights at regular intervals (synchronised between all validators)
asyncio.create_task(weight_setter.start())

# sets weights at regular intervals (synchronised between all validators)
asyncio.create_task(weight_setter.start())
# start scoring tasks in separate loop
asyncio.create_task(task_scorer.start())

# start scoring tasks in separate loop
asyncio.create_task(task_scorer.start())
# # TODO: Think about whether we want to store the task queue locally in case of a crash
# # TODO: Possibly run task scorer & model scheduler with a lock so I don't unload a model whilst it's generating
# # TODO: Make weight setting happen as specific intervals as we load/unload models
with Validator() as v:
while True:
logger.info(
f"Validator running:: network: {settings.SUBTENSOR.network} "
f"| block: {v.estimate_block} "
f"| step: {v.step} "
f"| uid: {v.uid} "
f"| last updated: {v.estimate_block - settings.METAGRAPH.last_update[v.uid]} "
f"| vtrust: {settings.METAGRAPH.validator_trust[v.uid]:.3f} "
f"| emission {settings.METAGRAPH.emission[v.uid]:.3f}"
)
print(v.block)
time.sleep(5)

if v.should_exit:
logger.warning("Ending validator...")
start = time.time()
while True:
await asyncio.sleep(1)
time_diff = -start + (start := time.time())
logger.debug(f"Running {time_diff:.2f} seconds")


# The main function parses the configuration and runs the validator.
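
The rest of the file is truncated in this view. A typical entry point for an async main like the one above, shown here only as an illustrative sketch since the remaining lines are not part of the visible diff:

if __name__ == "__main__":
    asyncio.run(main())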
3 changes: 2 additions & 1 deletion prompting/api/api.py
@@ -7,6 +7,7 @@
from prompting.api.api_managements.api import router as api_management_router
from prompting.api.gpt_endpoints.api import router as gpt_router
from prompting.api.miner_availabilities.api import router as miner_availabilities_router
from prompting.settings import settings

app = FastAPI()

@@ -25,4 +26,4 @@ def health():
# if __name__ == "__main__":
async def start_api():
logger.info("Starting API...")
uvicorn.run("prompting.api.api:app", host="0.0.0.0", port=8004, loop="asyncio", reload=False)
uvicorn.run("prompting.api.api:app", host="0.0.0.0", port=settings.API_PORT, reload=False)
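
With DEPLOY_API = true and API_PORT = 8094 (as in .env.validator.example), the API started above listens on the configured port. A minimal smoke test, assuming the service is running locally and that the health endpoint shown above answers plain GET requests:

import requests

# Assumes the validator was started with DEPLOY_API = true and API_PORT = 8094.
resp = requests.get("http://localhost:8094/health")
print(resp.status_code, resp.text)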
