diff --git a/nemo_skills/pipeline/check_contamination.py b/nemo_skills/pipeline/check_contamination.py index b7c9914e6..416472f81 100644 --- a/nemo_skills/pipeline/check_contamination.py +++ b/nemo_skills/pipeline/check_contamination.py @@ -108,7 +108,7 @@ def check_contamination( if server_address is None: # we need to host the model assert server_gpus is not None, "Need to specify server_gpus if hosting the model" - server_port = get_free_port() + server_port = get_free_port(strategy="random") server_address = f"localhost:{server_port}" server_config = { diff --git a/nemo_skills/pipeline/eval.py b/nemo_skills/pipeline/eval.py index cc42f2e96..9d2020974 100644 --- a/nemo_skills/pipeline/eval.py +++ b/nemo_skills/pipeline/eval.py @@ -153,7 +153,7 @@ def eval( if server_address is None: # we need to host the model assert server_gpus is not None, "Need to specify server_gpus if hosting the model" - server_port = get_free_port() + server_port = get_free_port(strategy="random") server_address = f"localhost:{server_port}" server_config = { @@ -164,7 +164,10 @@ def eval( "server_args": server_args, "server_port": server_port, } + # += is okay here because the args have already been copied in this context extra_arguments += f" ++server.server_type={server_type} " + extra_arguments += f" ++server.host=localhost " + extra_arguments += f" ++server.port={server_port} " else: # model is hosted elsewhere server_config = None extra_arguments += ( diff --git a/nemo_skills/pipeline/generate.py b/nemo_skills/pipeline/generate.py index fc8e31265..4e5a2662a 100644 --- a/nemo_skills/pipeline/generate.py +++ b/nemo_skills/pipeline/generate.py @@ -60,6 +60,7 @@ def get_cmd(output_dir, extra_arguments, random_seed=None, eval_args=None): def get_rm_cmd(output_dir, extra_arguments, random_seed=None, eval_args=None): if eval_args is not None: raise ValueError("Cannot specify eval_args for reward model") + cmd = ( f"python -m nemo_skills.inference.reward_model " f" ++skip_filled=True " @@ -126,7 +127,7 @@ def configure_client( extra_arguments, ): if server_address is None: # we need to host the model - server_port = get_free_port() + server_port = get_free_port(strategy="random") assert server_gpus is not None, "Need to specify server_gpus if hosting the model" server_address = f"localhost:{server_port}" @@ -138,11 +139,15 @@ def configure_client( "server_args": server_args, "server_port": server_port, } - extra_arguments += f" ++server.server_type={server_type} " + extra_arguments = ( + f"{extra_arguments} ++server.server_type={server_type} " + f"++server.host=localhost ++server.port={server_port} " + ) else: # model is hosted elsewhere server_config = None - extra_arguments += ( - f" ++server.server_type={server_type} ++server.base_url={server_address} ++server.model={model} " + extra_arguments = ( + f"{extra_arguments} ++server.server_type={server_type} " + f"++server.base_url={server_address} ++server.model={model} " ) server_port = None return server_config, extra_arguments, server_address, server_port @@ -224,9 +229,10 @@ def generate( original_server_address = server_address with run.Experiment(expname) as exp: + extra_arguments_original = extra_arguments if random_seeds: for seed in random_seeds: - server_port = get_free_port() + server_port = get_free_port(strategy="random") server_config, extra_arguments, server_address, server_port = configure_client( generation_type=generation_type, server_gpus=server_gpus, @@ -236,7 +242,7 @@ def generate( server_nodes=server_nodes, model=model, server_args=server_args, - extra_arguments=extra_arguments, + extra_arguments=extra_arguments_original, ) cmd = get_cmd( @@ -271,7 +277,7 @@ def generate( ) prev_tasks = [new_task] else: - server_port = get_free_port() + server_port = get_free_port(strategy="random") server_config, extra_arguments, server_address, server_port = configure_client( generation_type=generation_type, server_gpus=server_gpus, @@ -281,7 +287,7 @@ def generate( server_nodes=server_nodes, model=model, server_args=server_args, - extra_arguments=extra_arguments, + extra_arguments=extra_arguments_original, ) cmd = get_cmd( random_seed=None, diff --git a/nemo_skills/pipeline/start_server.py b/nemo_skills/pipeline/start_server.py index 63d54d0ca..dc17c892d 100644 --- a/nemo_skills/pipeline/start_server.py +++ b/nemo_skills/pipeline/start_server.py @@ -72,7 +72,7 @@ def start_server( "num_gpus": server_gpus, "num_nodes": server_nodes, "server_args": server_args, - "server_port": get_free_port(), + "server_port": get_free_port(strategy="random"), } with run.Experiment("server") as exp: diff --git a/nemo_skills/pipeline/utils.py b/nemo_skills/pipeline/utils.py index 6f96d021b..d9fe6a2db 100644 --- a/nemo_skills/pipeline/utils.py +++ b/nemo_skills/pipeline/utils.py @@ -18,6 +18,7 @@ import subprocess import sys import tarfile +from contextlib import contextmanager from dataclasses import dataclass from functools import lru_cache from pathlib import Path @@ -92,13 +93,22 @@ def _get_handles(exp): raise ValueError(f"Experiment {expname} not found!") -def get_free_port(exclude: list[int] | None = None): +def get_free_port(exclude: list[int] | None = None, strategy: int | str = 5000) -> int: """Will return a free port on the host.""" exclude = exclude or [] - port = 5000 - while port in exclude: - port += 1 - return port + if isinstance(strategy, int): + port = strategy + while port in exclude: + port += 1 + return port + elif strategy == "random": + import random + port = random.randint(1024, 65535) + while port in exclude: + port = random.randint(1024, 65535) + return port + else: + raise ValueError(f"Strategy {strategy} not supported.") def get_generation_command(server_address, generation_commands): @@ -136,7 +146,7 @@ def get_reward_server_command( check_if_mounted(cluster_config, model_path) if server_type == 'nemo': - nemo_aligner_reward_model_port = get_free_port(exclude=[server_port]) + nemo_aligner_reward_model_port = get_free_port(strategy="random", exclude=[server_port]) server_start_cmd = ( # Note: The order of the two commands is important as the reward model server # needs to be the first command so it can get the HF_TOKEN from the environment @@ -693,6 +703,8 @@ def get_executor( # we need to be explicit about this in srun as commands might need to run in parallel f"--ntasks={tasks_per_node * num_nodes}", f"--nodes={num_nodes}", + # NeMo-run should take care of this, but we'll put it here temporarily + f"--container-env={','.join([k.strip() for k in env_vars.keys()])}", ] if not cluster_config.get("disable_gpus_per_node", False) and gpus_per_node is not None: srun_args.append(f"--gpus-per-node={gpus_per_node}") @@ -709,7 +721,6 @@ def get_executor( container_mounts=mounts, time=timeout, additional_parameters=additional_parameters, - exclusive=True, # TODO: remove after we fix port conflicts packager=packager, gpus_per_node=gpus_per_node if not cluster_config.get("disable_gpus_per_node", False) else None, srun_args=srun_args, @@ -727,6 +738,17 @@ def get_executor( **(slurm_kwargs or {}), ) +@contextmanager +def temporary_env_update(cluster_config, updates): + original_env_vars = cluster_config.get("env_vars", []).copy() + updated_env_vars = original_env_vars.copy() + for key, value in updates.items(): + updated_env_vars.append(f"{key}={value}") + cluster_config["env_vars"] = updated_env_vars + try: + yield + finally: + cluster_config["env_vars"] = original_env_vars def add_task( exp, @@ -787,6 +809,8 @@ def add_task( if not 'cpu' in (partition or cluster_config.get("partition", "")): num_gpus = 1 + sandbox_port = get_free_port(strategy="random") + commands = [] executors = [] # assuming server always has the largest resources request, so it needs to go first @@ -818,45 +842,47 @@ def add_task( if cmd: if cluster_config["executor"] == "local" and num_tasks > 1: cmd = f"mpirun --allow-run-as-root -np {num_tasks} bash -c {shlex.quote(cmd)}" - commands.append(cmd) - executors.append( - get_executor( + with temporary_env_update(cluster_config, {"NEMO_SKILLS_SANDBOX_PORT": sandbox_port}): + commands.append(cmd) + executors.append( + get_executor( + cluster_config=cluster_config, + container=container, + num_nodes=num_nodes, + tasks_per_node=num_tasks, + gpus_per_node=num_gpus, + partition=partition, + time_min=time_min, + dependencies=dependencies, + job_name=task_name, + log_dir=log_dir, + log_prefix="main", + extra_package_dirs=extra_package_dirs, + slurm_kwargs=slurm_kwargs, + ) + ) + + # finally a sandbox if needed + if with_sandbox: + with temporary_env_update(cluster_config, {"LISTEN_PORT": sandbox_port}): + commands.append(get_sandox_command()) + sandbox_executor = get_executor( cluster_config=cluster_config, - container=container, - num_nodes=num_nodes, - tasks_per_node=num_tasks, + container=cluster_config["containers"]["sandbox"], + num_nodes=executors[0].nodes if cluster_config["executor"] == "slurm" else 1, + tasks_per_node=1, gpus_per_node=num_gpus, partition=partition, time_min=time_min, + mounts=tuple(), # we don't want to mount anything dependencies=dependencies, job_name=task_name, log_dir=log_dir, - log_prefix="main", + log_prefix="sandbox", extra_package_dirs=extra_package_dirs, slurm_kwargs=slurm_kwargs, ) - ) - - # finally a sandbox if needed - if with_sandbox: - sandbox_executor = get_executor( - cluster_config=cluster_config, - container=cluster_config["containers"]["sandbox"], - num_nodes=executors[0].nodes if cluster_config["executor"] == "slurm" else 1, - tasks_per_node=1, - gpus_per_node=num_gpus, - partition=partition, - time_min=time_min, - mounts=tuple(), # we don't want to mount anything - dependencies=dependencies, - job_name=task_name, - log_dir=log_dir, - log_prefix="sandbox", - extra_package_dirs=extra_package_dirs, - slurm_kwargs=slurm_kwargs, - ) - commands.append(get_sandox_command()) - executors.append(sandbox_executor) + executors.append(sandbox_executor) if reuse_code_exp is not None: tunnel = get_tunnel(cluster_config)