diff --git a/README.md b/README.md index 7a81be2..e88aa71 100644 --- a/README.md +++ b/README.md @@ -387,7 +387,68 @@ modal serve modal_server_vllm ```shell Python modal deploy modal_server_vllm ``` - + +## Quick Cloud Deployment + +We use [Skypilot](https://skypilot.readthedocs.io/en/latest/) to deploy Functionary models onto various clouds. Currently, we support the following clouds: +- Lambdalabs +- RunPod + +### Get Started + +1. Install night version of Skypilot (we currently use the 2024-10-23 version): + +```bash +pip install skypilot-nightly[all]==1.0.0.dev20241023 +``` + +2. Set up your cloud credentials by following the instructions [here](https://skypilot.readthedocs.io/en/latest/getting-started/installation.html#cloud-account-setup) + +### Inference + +Use the `deploy_skypilot.py` script to deploy a Functionary model onto various clouds using Skypilot. + +#### Usage + +1. Run the following command to check the available arguments: +```bash +python deploy_skypilot.py --help +``` + +2. For Lambdalabs, please expose the port for the server manually first [here](https://cloud.lambdalabs.com/firewall) before running `deploy_skypilot.py`. + +3. By default, `args.detach_run` is enabled. To stream the job logs, enter `sky logs ` If you want to run the command in the foreground, please set `args.detach_run` to `False`. + +4. SkyPilot does not support stopping instances both Lambdalabs and RunPod currently. To terminate the cluster, run the following command: +```bash +sky down +``` + +### Training + +Use the `train_skypilot.py` script to train a Functionary model using Skypilot. It performs all the steps mentioned in the [Training](functionary/train/README.md) section. In addition, it automatically uploads the trained model to Hugging Face at the end of the training job. + +#### Usage + +1. Run the following command to check the available arguments: + +```bash +python train_skypilot.py --help +``` + +2. `train_skypilot.py` accepts the same training command in [Training](functionary/train/README.md) section. Please write the training command into a shell script file, e.g., `train.sh`, and pass it to `train_skypilot.py` using the `--train-command-file` argument. + +3. When using Skypilot, we will mount the data files to the cluster. Therefore, you should not specify the `train_data_path` and `eval_data_path` in the training command. Instead, you should specify the paths to the data files in the `train_skypilot.py` script using the `--train-data-path` and `--eval-data-path` arguments. + +4. To successfully upload the trained model to Hugging Face post-training as well as log the training process to Weights & Biases, please provide your WandB and Hugging Face tokens to the `train_skypilot.py` script using the `--wandb-token` and `--hf-token` arguments. The HF repository will be in `args.hf_organization` and with the name stated in `output_dir` in the training command file. + +**Example Command** + +```bash +python train_skypilot.py --cluster-name train-cluster --method lora --cloud runpod --accelerators A100-80GB-SXM --num-accelerators 8 --train-command-file train.sh --train-data-path train_dataset.jsonl --eval-data-path eval_dataset.jsonl --wandb-token --hf-token --hf-organization +``` + + # Use Cases Here are a few examples of how you can use this function calling system: diff --git a/deploy_skypilot.py b/deploy_skypilot.py new file mode 100644 index 0000000..ff2e373 --- /dev/null +++ b/deploy_skypilot.py @@ -0,0 +1,240 @@ +import argparse +import logging + +import sky + +from functionary.skypilot_utils import ( + CLOUD_MAPPING, + check_features, + form_setup, + get_cloud_provider, +) + +# Set up logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +def form_command() -> str: + """ + Form the command to run the vLLM server. + + This function constructs the command string to start the vLLM server + based on the provided arguments. It includes the model, port, host, + and optional parameters like max_model_len and tensor_parallel_size. + + Returns: + str: The formatted command string to run the vLLM server. + """ + if args.docker_image: + command = f"sudo docker run --gpus all --shm-size 1g {args.docker_image}" + else: + command = "cd functionary && " + if args.backend == "vllm": + command += f"python server_vllm.py" + else: + command += f"python server_sglang.py" + + command += f" --model {args.model} --port {args.port} --host {args.host}" + if args.max_model_len is not None: + if args.backend == "vllm": + command += f" --max-model-len {args.max_model_len}" + else: + command += f" --context-length {args.max_model_len}" + if args.tensor_parallel_size is not None: + command += f" --tensor-parallel-size {args.tensor_parallel_size}" + + return command + + +def main(): + """ + Main function to deploy a Functionary model using Skypilot. + + This function performs the following steps: + 1. Retrieves the cloud provider based on the specified argument. + 2. Checks the features supported by the cloud provider. + 3. Creates a Skypilot Task with the necessary setup and run commands. + 4. Sets the resources for the task, including cloud, accelerators, ports, and disk size. + 5. Launches the task using Skypilot, with specified cluster name and optional timeout settings. + + Side effects: + - Modifies global 'args' object based on cloud provider features. + - Launches a Skypilot task, which may create or modify cloud resources. + + Raises: + Any exceptions raised by Skypilot during task creation or launch. + """ + cloud = get_cloud_provider(cloud_name=args.cloud) + check_features(cloud=cloud, args=args, logger=logger) + + envs = {} + + if args.docker_image: + envs["DOCKER_USERNAME"] = args.docker_username + envs["DOCKER_PASSWORD"] = args.docker_password + setup = f"docker login --username $DOCKER_USERNAME --password $DOCKER_PASSWORD" + else: + setup = form_setup(args=args) + if args.backend == "vllm": + setup += "pip install -e .[vllm]" + else: + setup += "pip install -e .[sglang] --find-links https://flashinfer.ai/whl/cu121/torch2.4/flashinfer/" + + # Authenticate HF if token is provided + if args.hf_token: + envs["HF_TOKEN"] = args.hf_token + if args.docker_image is None: + setup += f" && huggingface-cli login --token $HF_TOKEN" + + task = sky.Task( + setup=setup, + run=form_command(), + envs=envs, + workdir=None, + ) + + task.set_resources( + sky.Resources( + cloud=cloud, + accelerators=f"{args.accelerators}:{args.num_accelerators}", + ports=args.port_to_open, + disk_size=args.disk_size, + region=args.region, + ) + ) + + sky.launch( + task, + cluster_name=args.cluster_name, + idle_minutes_to_autostop=args.idle_timeout, + down=args.down, + detach_run=args.detach_run, + ) + + +def parse_args(): + parser = argparse.ArgumentParser(description="Deploy Skypilot") + parser.add_argument( + "--cluster-name", type=str, required=True, help="Name of the cluster" + ) + parser.add_argument( + "--docker-image", + type=str, + default=None, + help="Docker image to run. If None, setup and run commands will be used instead.", + ) + parser.add_argument( + "--docker-username", + type=str, + default=None, + help="Docker username to use. Only used if docker-image is provided.", + ) + parser.add_argument( + "--docker-password", + type=str, + default=None, + help="Docker password to use. Only used if docker-image is provided.", + ) + parser.add_argument( + "--commit", + type=str, + default=None, + help="Provide a commit hash to deploy a specific version of Functionary. If None, the latest commit in the main branch will be deployed.", + ) + parser.add_argument( + "--backend", + type=str, + choices=["vllm", "sglang"], + default="vllm", + help="Backend inference framework to use. (Currently either `vllm` or `sglang`)", + ) + parser.add_argument( + "--cloud", + type=str, + default=None, + help=f"Cloud provider (default: None). Currently only supports {list(CLOUD_MAPPING.keys())}", + ) + parser.add_argument( + "--accelerators", + type=str, + default="A100", + help="Accelerator type. Check available types with `sky show-gpus --all`", + ) + parser.add_argument( + "--num-accelerators", + type=int, + default=1, + help="Number of accelerators. Check available values with `sky show-gpus --all`", + ) + parser.add_argument( + "--disk-size", + type=str, + default=256, + help="The size of the OS disk in GiB. If None, defaults to 256 GiB", + ) + parser.add_argument( + "--region", type=str, default=None, help="Region (default: None)" + ) + parser.add_argument( + "--idle-timeout", + type=int, + default=-1, + help="Idle timeout in minutes. `-1` means no timeout", + ) + parser.add_argument( + "--down", + type=bool, + default=False, + help="Whether to tear down the cluster when timeout", + ) + parser.add_argument( + "--model", + type=str, + default="meetkai/functionary-small-v3.2", + help="Model to use", + ) + parser.add_argument("--max-model-len", type=int, default=None, help="Model to use") + parser.add_argument( + "--tensor-parallel-size", type=int, default=1, help="Tensor parallel size" + ) + parser.add_argument("--port", type=int, default=8000, help="Port to use") + parser.add_argument("--host", type=str, default="0.0.0.0", help="host to use") + parser.add_argument( + "--detach-run", + type=bool, + default=True, + help="Detach run upon job to run server is submitted.", + ) + parser.add_argument( + "--hf-token", + type=str, + default=None, + help="Hugging Face token for downloading models. Only use this is the model is gated or private.", + ) + + args = parser.parse_args() + + if args.docker_image: + if args.docker_username is None or args.docker_password is None: + raise ValueError( + "Docker username and password must be provided if docker-image is used." + ) + if args.cloud == "runpod": + raise ValueError("Runpod does not support docker images.") + + if args.disk_size is None: + args.disk_size = 256 + args.disk_size = min(int(args.disk_size), 1024) # Set max disk size to 1TB + if args.idle_timeout == -1: + args.idle_timeout = None + args.port_to_open = args.port + + return args + + +if __name__ == "__main__": + args = parse_args() + main() diff --git a/functionary/skypilot_utils.py b/functionary/skypilot_utils.py new file mode 100644 index 0000000..6a57e66 --- /dev/null +++ b/functionary/skypilot_utils.py @@ -0,0 +1,82 @@ +import argparse +import logging + +import sky + +CLOUD_MAPPING = { + "lambda": sky.Lambda(), + "runpod": sky.RunPod(), +} + + +def get_cloud_provider(cloud_name: str) -> sky.clouds.Cloud: + """ + Get the cloud provider object based on the given cloud name. + + Args: + cloud_name (str): The name of the cloud provider. + + Returns: + sky.clouds.Cloud: The corresponding cloud provider object. + + Raises: + AssertionError: If an invalid cloud provider name is given. + """ + assert cloud_name.lower() in CLOUD_MAPPING, f"Invalid cloud provider: {cloud_name}" + return CLOUD_MAPPING[cloud_name.lower()] + + +def check_features( + cloud: sky.clouds.Cloud, args: argparse.Namespace, logger: logging.Logger +): + """ + Check if the cloud provider supports certain features and update arguments accordingly. + + This function checks if the given cloud provider supports stopping instances and opening ports. + If these features are not supported, it updates the corresponding arguments and logs warnings. + + Args: + cloud (sky.clouds.Cloud): The cloud provider object to check. + args (argparse.Namespace): The parsed command line arguments to update. + logger (logging.Logger): Logger instance for outputting warnings. + + Side effects: + - Modifies args.idle_timeout and args.down if stopping is not supported + - Modifies args.port_to_open if opening ports is not supported + - Logs warnings for unsupported features + """ + unsupported_features = cloud._unsupported_features_for_resources(None) + + if sky.clouds.CloudImplementationFeatures.STOP in unsupported_features: + logger.warning( + f"Stopping is not supported on {repr(cloud)}. Setting args.idle_timeout and args.down to None." + ) + args.idle_timeout = None + args.down = None + if sky.clouds.CloudImplementationFeatures.OPEN_PORTS in unsupported_features: + logger.warning( + f"Opening port is not supported on {repr(cloud)}. Setting args.port_to_open to None. Please open port manually." + ) + args.port_to_open = None + + +def form_setup(args: argparse.Namespace) -> str: + """ + Form the setup command string for initializing the environment. + + This function constructs the setup command string that handles cloning the repository + and checking out a specific commit if specified. + + Args: + args (argparse.Namespace): The parsed command line arguments containing: + - commit (str, optional): Git commit hash to checkout. If None, uses latest main branch. + + Returns: + str: The formatted setup command string. + """ + setup = "if [ ! -d 'functionary' ]; then git clone https://github.com/meetkai/functionary.git && cd functionary" + if args.commit is not None: + setup += f" && git checkout {args.commit}" + setup += "; else cd functionary; fi && " + + return setup diff --git a/train_skypilot.py b/train_skypilot.py new file mode 100644 index 0000000..74db453 --- /dev/null +++ b/train_skypilot.py @@ -0,0 +1,330 @@ +import argparse +import logging +from typing import Any, Dict + +import sky + +from functionary.skypilot_utils import ( + CLOUD_MAPPING, + check_features, + form_setup, + get_cloud_provider, +) + +# Set up logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +def parse_train_command(train_command_file: str) -> dict: + """ + Parse a training command file and extract arguments into a dictionary. + + This function reads a shell script containing training commands and parses the command line + arguments into a dictionary format. It handles both flag arguments (--flag) and key-value + arguments (--key value). + + Args: + train_command_file (str): Path to the shell script containing the training command. + + Returns: + dict: Dictionary mapping argument names (without -- prefix) to their values. + For flag arguments, the value will be True. + For key-value arguments, the value will be the string value provided. + + Example: + For a command file containing: + ``` + deepspeed train.py --model_name model1 --bf16 --epochs 3 + ``` + Returns: + { + 'model_name': 'model1', + 'bf16': True, + 'epochs': '3' + } + """ + # Read train command file and parse arguments into dictionary + train_args = {} + with open(train_command_file, "r") as f: + cmd = f.read() + # Split on newlines and backslashes to get all parts + parts = [p.strip() for p in cmd.replace("\\\n", " ").split()] + for i, part in enumerate(parts): + if part.startswith("--"): + key = part[2:] # Remove -- prefix + # If this is the last argument or next is another flag, treat as boolean flag + if i == len(parts) - 1 or parts[i + 1].startswith("--"): + train_args[key] = True + else: + # Otherwise take the next part as the value + train_args[key] = parts[i + 1] + return train_args + + +def form_command(train_file_args: Dict[str, Any]) -> str: + """ + Form the command string for training and model uploading. + + This function constructs a multi-step command string that: + 1. Changes to the functionary directory + 2. Creates a training script with the provided arguments + 3. Runs the training script + 4. If using LoRA, merges the learned weights with the base model + 5. Creates a private repo on Hugging Face + 6. Uploads the trained model to Hugging Face + + Args: + train_file_args (Dict[str, Any]): Dictionary of training arguments parsed from + the training command file, containing keys like 'model_name_or_path', + 'output_dir', etc. + + Returns: + str: The complete command string to execute all training and upload steps. + """ + command = "cd functionary && " + # Write the training command to a file to be run + with open(args.train_command_file, "r") as f: + train_command = f.read() + command += "cat > train.sh << 'EOL'\n" + command += train_command + " \\" + command += f"\n --train_data_path train.jsonl \\" + command += f"\n --eval_data_path val.jsonl" + command += "\nEOL\n" + command += "bash train.sh" + # Merge the learned model with the base model + if args.method == "lora": + command += f" && python -m functionary.train.merge_lora_weight merged_model {train_file_args['model_name_or_path']} {train_file_args['output_dir']} {train_file_args['model_max_length']} {train_file_args['prompt_template_version']}" + model_name = "merged_model" + else: + model_name = train_file_args["output_dir"] + # Create new private repo on Hugging Face + command += f' && python -c \'from huggingface_hub import create_repo; create_repo("{args.hf_organization}/{train_file_args["output_dir"]}", repo_type="model", private=True)\'' + # Push the model to the new repo + command += f' && python -c \'from huggingface_hub import HfApi; api = HfApi(); api.upload_folder(repo_id="{args.hf_organization}/{train_file_args["output_dir"]}", folder_path="{model_name}", repo_type="model")\'' + return command + + +def main(): + """ + Main function to train a Functionary model using Skypilot. + + This function performs the following steps: + 1. Gets the cloud provider based on the specified argument + 2. Checks the features supported by the cloud provider + 3. Parses the training command file to extract arguments + 4. Forms the setup command to initialize the environment with: + - Moving data files into place + - Installing PyTorch and other dependencies + - Installing optional components (LoRA, Liger) if specified + - Logging into Hugging Face and Weights & Biases if tokens provided + 5. Creates a Skypilot Task with the setup and training commands + 6. Sets the task resources including cloud, accelerators, and disk size + 7. Launches the task with specified cluster name and timeout settings + + The function handles both regular training and LoRA fine-tuning, with automatic + model merging and uploading to Hugging Face Hub after training completes. + + Side effects: + - Creates or modifies cloud resources via Skypilot + - Uploads trained model to Hugging Face Hub + - Logs training metrics to Weights & Biases if configured + + Raises: + Any exceptions raised by Skypilot during task creation or launch. + """ + cloud = get_cloud_provider(cloud_name=args.cloud) + check_features(cloud=cloud, args=args, logger=logger) + train_file_args = parse_train_command(args.train_command_file) + + envs = {} + + # Form setup command + setup = form_setup(args=args) + setup += ( + "mv ../train.jsonl ./train.jsonl && mv ../val.jsonl ./val.jsonl" + " && cd functionary/train" + " && pip install torch==2.4.0 torchvision==0.19.0 torchaudio==2.4.0" + " --index-url https://download.pytorch.org/whl/cu124 && pip install -e ." + ) + sections = [] + if args.method == "lora": + sections.append("lora") + if train_file_args.get("use_liger", False): + sections.append("liger") + if sections: + setup += f"[{','.join(sections)}]" + + # Authenticate HF and WandB if tokens are provided + if args.hf_token: + envs["HF_TOKEN"] = args.hf_token + setup += f" && huggingface-cli login --token $HF_TOKEN" + if args.wandb_token: + envs["WANDB_API_KEY"] = args.wandb_token + setup += f" && wandb login $WANDB_API_KEY" + + # Define task with setup and run commands + task = sky.Task( + setup=setup, + run=form_command(train_file_args=train_file_args), + envs=envs, + workdir=None, + ) + + # Set task resources and move data files into instance + task.set_resources( + sky.Resources( + cloud=cloud, + accelerators=f"{args.accelerators}:{args.num_accelerators}", + disk_size=args.disk_size, + image_id=args.runpod_image_id if isinstance(cloud, sky.RunPod) else None, + region=args.region, + ) + ).set_file_mounts( + { + "./train.jsonl": args.train_data_path, + "./val.jsonl": args.eval_data_path, + } + ) + + # Launch the task + sky.launch( + task, + cluster_name=args.cluster_name, + idle_minutes_to_autostop=args.idle_timeout, + down=args.down, + detach_run=args.detach_run, + ) + + +def parse_args(): + parser = argparse.ArgumentParser(description="Deploy training via Skypilot") + + # Cluster arguments + parser.add_argument( + "--cluster-name", type=str, required=True, help="Name of the cluster" + ) + parser.add_argument( + "--commit", + type=str, + default=None, + help=( + "Provide a commit hash to deploy a specific version of Functionary. " + "If None, the latest commit in the main branch will be deployed." + ), + ) + parser.add_argument( + "--cloud", + type=str, + default=None, + help=f"Cloud provider (default: None). Currently only supports {list(CLOUD_MAPPING.keys())}", + ) + parser.add_argument( + "--accelerators", + type=str, + default="A100", + help="Accelerator type. Check available types with `sky show-gpus --all`", + ) + parser.add_argument( + "--num-accelerators", + type=int, + default=1, + help="Number of accelerators. Check available values with `sky show-gpus --all`", + ) + parser.add_argument( + "--disk-size", + type=str, + default=256, + help="The size of the OS disk in GiB. If None, defaults to 256 GiB", + ) + parser.add_argument( + "--runpod-image-id", + type=str, + default="runpod/pytorch:2.4.0-py3.11-cuda12.4.1-devel-ubuntu22.04", + help="The image id to run the runpod instance on. (Only used if cloud is RunPod)", + ) + parser.add_argument( + "--region", type=str, default=None, help="Region (default: None)" + ) + parser.add_argument( + "--idle-timeout", + type=int, + default=-1, + help="Idle timeout in minutes. `-1` means no timeout", + ) + parser.add_argument( + "--down", + type=bool, + default=False, + help="Whether to tear down the cluster when timeout", + ) + parser.add_argument( + "--detach-run", + type=bool, + default=True, + help="Detach run upon job to run server is submitted.", + ) + + # Training arguments + parser.add_argument( + "--method", + type=str, + choices=["full", "lora"], + default="full", + help="Training method to use. (Currently either `full` or `lora`)", + ) + parser.add_argument( + "--train-command-file", + type=str, + default="train.sh", + help="Path to the command to train.", + ) + parser.add_argument( + "--train-data-path", + type=str, + required=True, + help="Path to the local training data.", + ) + parser.add_argument( + "--eval-data-path", + type=str, + required=True, + help="Path to the local validation data.", + ) + + # Uploading to Hugging Face and Weights & Biases + parser.add_argument( + "--hf-token", + type=str, + default=None, + help="Hugging Face token for downloading models. Only use this is the model is gated.", + ) + parser.add_argument( + "--wandb-token", + type=str, + default=None, + help="Wandb token for logging into wandb. Use this if you want to log into wandb.", + ) + parser.add_argument( + "--hf-organization", + type=str, + default="meetkai", + help="Hugging Face organization to push the model to.", + ) + + args = parser.parse_args() + + if args.disk_size is None: + args.disk_size = 256 + args.disk_size = min(int(args.disk_size), 1024) # Set max disk size to 1TB + if args.idle_timeout == -1: + args.idle_timeout = None + + return args + + +if __name__ == "__main__": + args = parse_args() + main()