Commit e2a934e
Update port strategy to random assignment (#301)
gwarmstrong authored Dec 19, 2024
Parent: 9bfc68f
Showing 5 changed files with 82 additions and 47 deletions.
nemo_skills/pipeline/check_contamination.py (2 changes: 1 addition & 1 deletion)

@@ -108,7 +108,7 @@ def check_contamination(

     if server_address is None:  # we need to host the model
         assert server_gpus is not None, "Need to specify server_gpus if hosting the model"
-        server_port = get_free_port()
+        server_port = get_free_port(strategy="random")
         server_address = f"localhost:{server_port}"

         server_config = {
nemo_skills/pipeline/eval.py (5 changes: 4 additions & 1 deletion)

@@ -153,7 +153,7 @@ def eval(

     if server_address is None:  # we need to host the model
         assert server_gpus is not None, "Need to specify server_gpus if hosting the model"
-        server_port = get_free_port()
+        server_port = get_free_port(strategy="random")
         server_address = f"localhost:{server_port}"

         server_config = {
@@ -164,7 +164,10 @@
             "server_args": server_args,
             "server_port": server_port,
         }
+        # += is okay here because the args have already been copied in this context
         extra_arguments += f" ++server.server_type={server_type} "
+        extra_arguments += f" ++server.host=localhost "
+        extra_arguments += f" ++server.port={server_port} "
     else:  # model is hosted elsewhere
         server_config = None
         extra_arguments += (
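For context, these ++server.* strings are Hydra-style command-line overrides that eval.py appends to the generation command. A minimal sketch of what the self-hosted branch builds, with hypothetical values standing in for what eval() computes at runtime:

server_type = "trtllm"  # hypothetical
server_port = 5123      # hypothetical result of get_free_port(strategy="random")

extra_arguments = "++prompt=default "  # pre-existing user arguments, also hypothetical
extra_arguments += f" ++server.server_type={server_type} "
extra_arguments += f" ++server.host=localhost "
extra_arguments += f" ++server.port={server_port} "

# extra_arguments now carries all three overrides; the extra whitespace is
# harmless since the string is later split into separate command-line args.
print(extra_arguments)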
nemo_skills/pipeline/generate.py (22 changes: 14 additions & 8 deletions)

@@ -60,6 +60,7 @@ def get_cmd(output_dir, extra_arguments, random_seed=None, eval_args=None):
 def get_rm_cmd(output_dir, extra_arguments, random_seed=None, eval_args=None):
     if eval_args is not None:
         raise ValueError("Cannot specify eval_args for reward model")
+
     cmd = (
         f"python -m nemo_skills.inference.reward_model "
         f" ++skip_filled=True "
@@ -126,7 +127,7 @@ def configure_client(
     extra_arguments,
 ):
     if server_address is None:  # we need to host the model
-        server_port = get_free_port()
+        server_port = get_free_port(strategy="random")
         assert server_gpus is not None, "Need to specify server_gpus if hosting the model"
         server_address = f"localhost:{server_port}"

@@ -138,11 +139,15 @@ def configure_client(
             "server_args": server_args,
             "server_port": server_port,
         }
-        extra_arguments += f" ++server.server_type={server_type} "
+        extra_arguments = (
+            f"{extra_arguments} ++server.server_type={server_type} "
+            f"++server.host=localhost ++server.port={server_port} "
+        )
     else:  # model is hosted elsewhere
         server_config = None
-        extra_arguments += (
-            f" ++server.server_type={server_type} ++server.base_url={server_address} ++server.model={model} "
+        extra_arguments = (
+            f"{extra_arguments} ++server.server_type={server_type} "
+            f"++server.base_url={server_address} ++server.model={model} "
         )
         server_port = None
     return server_config, extra_arguments, server_address, server_port
@@ -224,9 +229,10 @@ def generate(
     original_server_address = server_address

     with run.Experiment(expname) as exp:
+        extra_arguments_original = extra_arguments
         if random_seeds:
             for seed in random_seeds:
-                server_port = get_free_port()
+                server_port = get_free_port(strategy="random")
                 server_config, extra_arguments, server_address, server_port = configure_client(
                     generation_type=generation_type,
                     server_gpus=server_gpus,
@@ -236,7 +242,7 @@ def generate(
                     server_nodes=server_nodes,
                     model=model,
                     server_args=server_args,
-                    extra_arguments=extra_arguments,
+                    extra_arguments=extra_arguments_original,
                 )

                 cmd = get_cmd(
@@ -271,7 +277,7 @@ def generate(
                 )
                 prev_tasks = [new_task]
         else:
-            server_port = get_free_port()
+            server_port = get_free_port(strategy="random")
             server_config, extra_arguments, server_address, server_port = configure_client(
                 generation_type=generation_type,
                 server_gpus=server_gpus,
@@ -281,7 +287,7 @@ def generate(
                 server_nodes=server_nodes,
                 model=model,
                 server_args=server_args,
-                extra_arguments=extra_arguments,
+                extra_arguments=extra_arguments_original,
             )
             cmd = get_cmd(
                 random_seed=None,
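The new extra_arguments_original snapshot is the substantive fix here: configure_client returns a string with server overrides appended to whatever it received, so feeding it the accumulated extra_arguments inside the seed loop would duplicate the overrides on every iteration. A small illustration of the failure mode, with configure_client reduced to a stub and a hypothetical override value:

def configure_client_stub(extra_arguments):
    # Stands in for configure_client, which appends ++server.* overrides.
    return extra_arguments + "++server.port=5123 "

extra_arguments = "++prompt=default "
extra_arguments_original = extra_arguments  # snapshot taken before the loop

for seed in (0, 1):
    # Passing the snapshot keeps every seed's arguments identical; passing the
    # previous iteration's result would append the overrides a second time.
    extra_arguments = configure_client_stub(extra_arguments_original)

assert extra_arguments == "++prompt=default ++server.port=5123 "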
nemo_skills/pipeline/start_server.py (2 changes: 1 addition & 1 deletion)

@@ -72,7 +72,7 @@ def start_server(
         "num_gpus": server_gpus,
         "num_nodes": server_nodes,
         "server_args": server_args,
-        "server_port": get_free_port(),
+        "server_port": get_free_port(strategy="random"),
     }

     with run.Experiment("server") as exp:
nemo_skills/pipeline/utils.py (98 changes: 62 additions & 36 deletions)

@@ -18,6 +18,7 @@
 import subprocess
 import sys
 import tarfile
+from contextlib import contextmanager
 from dataclasses import dataclass
 from functools import lru_cache
 from pathlib import Path
@@ -92,13 +93,22 @@ def _get_handles(exp):
     raise ValueError(f"Experiment {expname} not found!")


-def get_free_port(exclude: list[int] | None = None):
+def get_free_port(exclude: list[int] | None = None, strategy: int | str = 5000) -> int:
     """Will return a free port on the host."""
     exclude = exclude or []
-    port = 5000
-    while port in exclude:
-        port += 1
-    return port
+    if isinstance(strategy, int):
+        port = strategy
+        while port in exclude:
+            port += 1
+        return port
+    elif strategy == "random":
+        import random
+        port = random.randint(1024, 65535)
+        while port in exclude:
+            port = random.randint(1024, 65535)
+        return port
+    else:
+        raise ValueError(f"Strategy {strategy} not supported.")


 def get_generation_command(server_address, generation_commands):
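A quick sketch of how the two get_free_port strategies behave. Note that neither variant asks the OS whether a port is actually bindable: "free" only means "not in the exclude list", so the random strategy makes port collisions between concurrently launched jobs unlikely rather than impossible:

# Integer strategy: scan upward from the given base port.
assert get_free_port(strategy=5000) == 5000
assert get_free_port(exclude=[5000, 5001], strategy=5000) == 5002

# Random strategy: redraw from [1024, 65535] until the port is not excluded.
port = get_free_port(strategy="random")
assert 1024 <= port <= 65535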
@@ -136,7 +146,7 @@ def get_reward_server_command(
     check_if_mounted(cluster_config, model_path)

     if server_type == 'nemo':
-        nemo_aligner_reward_model_port = get_free_port(exclude=[server_port])
+        nemo_aligner_reward_model_port = get_free_port(strategy="random", exclude=[server_port])
         server_start_cmd = (
             # Note: The order of the two commands is important as the reward model server
             # needs to be the first command so it can get the HF_TOKEN from the environment
@@ -693,6 +703,8 @@ def get_executor(
         # we need to be explicit about this in srun as commands might need to run in parallel
         f"--ntasks={tasks_per_node * num_nodes}",
         f"--nodes={num_nodes}",
+        # NeMo-run should take care of this, but we'll put it here temporarily
+        f"--container-env={','.join([k.strip() for k in env_vars.keys()])}",
     ]
     if not cluster_config.get("disable_gpus_per_node", False) and gpus_per_node is not None:
         srun_args.append(f"--gpus-per-node={gpus_per_node}")
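The new srun flag hands environment variable names through to the job's container; with the pyxis plugin (assumed here to be what provides container support for srun on these clusters), --container-env takes a comma-separated list of variables to set inside the container. An illustration of how the flag renders, with a hypothetical env_vars dict:

# Hypothetical env_vars; .strip() guards against stray whitespace in key names.
env_vars = {"NEMO_SKILLS_SANDBOX_PORT": "5123", "HF_TOKEN": "..."}
flag = f"--container-env={','.join([k.strip() for k in env_vars.keys()])}"
print(flag)  # --container-env=NEMO_SKILLS_SANDBOX_PORT,HF_TOKEN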
@@ -709,7 +721,6 @@
         container_mounts=mounts,
         time=timeout,
         additional_parameters=additional_parameters,
-        exclusive=True,  # TODO: remove after we fix port conflicts
         packager=packager,
         gpus_per_node=gpus_per_node if not cluster_config.get("disable_gpus_per_node", False) else None,
         srun_args=srun_args,
@@ -727,6 +738,17 @@
         **(slurm_kwargs or {}),
     )

+@contextmanager
+def temporary_env_update(cluster_config, updates):
+    original_env_vars = cluster_config.get("env_vars", []).copy()
+    updated_env_vars = original_env_vars.copy()
+    for key, value in updates.items():
+        updated_env_vars.append(f"{key}={value}")
+    cluster_config["env_vars"] = updated_env_vars
+    try:
+        yield
+    finally:
+        cluster_config["env_vars"] = original_env_vars

 def add_task(
     exp,
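A minimal usage sketch of the new context manager, with hypothetical cluster_config contents. The try/finally is the important part: env_vars is restored even if the body raises, so one task's temporary port assignments never leak into the shared config:

cluster_config = {"env_vars": ["EXISTING=1"]}

with temporary_env_update(cluster_config, {"LISTEN_PORT": 5123}):
    # The update is only visible inside the block.
    assert cluster_config["env_vars"] == ["EXISTING=1", "LISTEN_PORT=5123"]

# The original list is restored on exit.
assert cluster_config["env_vars"] == ["EXISTING=1"]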
@@ -787,6 +809,8 @@ def add_task(
     if not 'cpu' in (partition or cluster_config.get("partition", "")):
         num_gpus = 1

+    sandbox_port = get_free_port(strategy="random")
+
     commands = []
     executors = []
     # assuming server always has the largest resources request, so it needs to go first
@@ -818,45 +842,47 @@ def add_task(
     if cmd:
         if cluster_config["executor"] == "local" and num_tasks > 1:
             cmd = f"mpirun --allow-run-as-root -np {num_tasks} bash -c {shlex.quote(cmd)}"
-        commands.append(cmd)
-        executors.append(
-            get_executor(
-                cluster_config=cluster_config,
-                container=container,
-                num_nodes=num_nodes,
-                tasks_per_node=num_tasks,
-                gpus_per_node=num_gpus,
-                partition=partition,
-                time_min=time_min,
-                dependencies=dependencies,
-                job_name=task_name,
-                log_dir=log_dir,
-                log_prefix="main",
-                extra_package_dirs=extra_package_dirs,
-                slurm_kwargs=slurm_kwargs,
-            )
-        )
+        with temporary_env_update(cluster_config, {"NEMO_SKILLS_SANDBOX_PORT": sandbox_port}):
+            commands.append(cmd)
+            executors.append(
+                get_executor(
+                    cluster_config=cluster_config,
+                    container=container,
+                    num_nodes=num_nodes,
+                    tasks_per_node=num_tasks,
+                    gpus_per_node=num_gpus,
+                    partition=partition,
+                    time_min=time_min,
+                    dependencies=dependencies,
+                    job_name=task_name,
+                    log_dir=log_dir,
+                    log_prefix="main",
+                    extra_package_dirs=extra_package_dirs,
+                    slurm_kwargs=slurm_kwargs,
+                )
+            )

     # finally a sandbox if needed
     if with_sandbox:
-        sandbox_executor = get_executor(
-            cluster_config=cluster_config,
-            container=cluster_config["containers"]["sandbox"],
-            num_nodes=executors[0].nodes if cluster_config["executor"] == "slurm" else 1,
-            tasks_per_node=1,
-            gpus_per_node=num_gpus,
-            partition=partition,
-            time_min=time_min,
-            mounts=tuple(),  # we don't want to mount anything
-            dependencies=dependencies,
-            job_name=task_name,
-            log_dir=log_dir,
-            log_prefix="sandbox",
-            extra_package_dirs=extra_package_dirs,
-            slurm_kwargs=slurm_kwargs,
-        )
-        commands.append(get_sandox_command())
-        executors.append(sandbox_executor)
+        with temporary_env_update(cluster_config, {"LISTEN_PORT": sandbox_port}):
+            commands.append(get_sandox_command())
+            sandbox_executor = get_executor(
+                cluster_config=cluster_config,
+                container=cluster_config["containers"]["sandbox"],
+                num_nodes=executors[0].nodes if cluster_config["executor"] == "slurm" else 1,
+                tasks_per_node=1,
+                gpus_per_node=num_gpus,
+                partition=partition,
+                time_min=time_min,
+                mounts=tuple(),  # we don't want to mount anything
+                dependencies=dependencies,
+                job_name=task_name,
+                log_dir=log_dir,
+                log_prefix="sandbox",
+                extra_package_dirs=extra_package_dirs,
+                slurm_kwargs=slurm_kwargs,
+            )
+            executors.append(sandbox_executor)

     if reuse_code_exp is not None:
         tunnel = get_tunnel(cluster_config)
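Taken together, the add_task changes implement a simple rendezvous: one random port is drawn per task and injected into the main job as NEMO_SKILLS_SANDBOX_PORT and into the sandbox container as LISTEN_PORT, so both sides agree on a port without the fixed default that previously forced the exclusive=True workaround. A condensed sketch of the flow, eliding everything except the port wiring (the exact consumers of these variables are an assumption based on their names):

sandbox_port = get_free_port(strategy="random")

# Main job: the client presumably reads NEMO_SKILLS_SANDBOX_PORT to locate the sandbox.
with temporary_env_update(cluster_config, {"NEMO_SKILLS_SANDBOX_PORT": sandbox_port}):
    ...  # build and append the main command's executor

# Sandbox job: the sandbox server presumably binds to LISTEN_PORT.
with temporary_env_update(cluster_config, {"LISTEN_PORT": sandbox_port}):
    ...  # build and append the sandbox executor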