From d23e5d706c91bff122a83eaa078fcfb802b52a86 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Thu, 14 Mar 2024 12:45:18 -0700 Subject: [PATCH 01/27] copy launcher.py from composer repo --- scripts/train/launcher.py | 584 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 584 insertions(+) create mode 100755 scripts/train/launcher.py diff --git a/scripts/train/launcher.py b/scripts/train/launcher.py new file mode 100755 index 0000000000..e20626265f --- /dev/null +++ b/scripts/train/launcher.py @@ -0,0 +1,584 @@ +#!/usr/bin/env python3 +# Copyright 2022 MosaicML Composer authors +# SPDX-License-Identifier: Apache-2.0 + +"""The Composer CLI launcher for distributed training.""" + +import contextlib +import datetime +import logging +import os +import signal +import subprocess +import sys +import tempfile +import time +import traceback +from argparse import ArgumentParser +from typing import Any, Dict, List, Union + +import psutil +import torch + +import composer +from composer.loggers.mosaicml_logger import ( + MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR, + MOSAICML_LOG_DIR_ENV_VAR, + MOSAICML_PLATFORM_ENV_VAR, +) +from composer.utils import get_free_tcp_port + +CLEANUP_TIMEOUT = datetime.timedelta(seconds=30) + +log = logging.getLogger(__name__) + + +def _get_parser(): + parser = ArgumentParser(description='Utility for launching distributed machine learning jobs.') + + parser.add_argument('--version', action='version', version=f'MosaicML Composer {composer.__version__}') + + required_args = parser.add_argument_group('required arguments') + + parser.add_argument( + '-n', + '--nproc', + type=int, + help=( + 'The number of processes to launch on this node. Overrides env var `LOCAL_WORLD_SIZE` if specified; ' + 'otherwise, defaults to `max(1, torch.cuda.device_count())`.' + ), + ) + + parser.add_argument( + '--stdout', + type=str, + default=None, + help=( + 'Format string for a filename to dump the STDOUT from the non-local-rank-zero processes. ' + 'The local rank zero process will be piped through to STDOUT. The available format variables are: ' + "'{rank}', '{local_rank}', '{world_size}', '{node_rank}', and '{local_world_size}'. If specified, " + "it is recommended to include '{rank}' or '{local_rank}' in the filename so each rank will write to its " + 'own file. By default, the STDOUT of the non-local-rank-zero processes is discarded; instead, use the ' + 'FileLogger within Composer. This logger captures and saves the STDOUT of each process.' + ), + ) + parser.add_argument( + '--stderr', + type=str, + default=None, + help=( + 'Format string for a filename to dump the STDERR from the non-local-rank-zero processes. ' + 'The local rank zero process will be piped through to STDERR. The available format variables are: ' + "'{rank}', '{local_rank}', '{world_size}', '{node_rank}', and '{local_world_size}'. If specified, " + "it is recommended to include '{rank}' or '{local_rank}' in the filename so each rank will write to its " + 'own file. By default, the STDERR of the non-local-rank-zero processes is discarded; instead, use the ' + 'FileLogger within Composer. This logger captures and saves the STDERR of each process.' + ), + ) + parser.add_argument('-v', '--verbose', action='store_true', help='If set, print verbose messages') + parser.add_argument( + '-m', + '--module_mode', + action='store_true', + help=( + 'If set, run the training script as a module instead of as a script. ' + 'Cannot be used in conjunction with `command_mode`' + ), + ) + parser.add_argument( + '-c', + '--command_mode', + action='store_true', + help=( + 'If set, run the training script as a command (i.e. without `python`). ' + 'Cannot be used in conjunction with `module_mode`.' + ), + ) + + multinode_args = parser.add_argument_group( + 'multi-node arguments', + description=( + 'These arguments generally only need to be set when training in a multi-node ' + 'environment, i.e. when the world_size is bigger than nproc.' + ), + ) + multinode_args.add_argument( + '--world_size', + type=int, + help=( + 'The total number of processes to launch across all nodes. ' + 'Setting this to a value greater than nproc indicates a multi-node ' + 'environment. Overrides env var WORLD_SIZE. Defaults to nproc.' + ), + ) + multinode_args.add_argument( + '--base_rank', + type=int, + help=( + 'The rank of the lowest ranked process to launch on this node. ' + 'Specifying a base_rank B and an nproc N will spawn processes with ' + 'global ranks [B, B+1, ... B+N-1]. In a multi-node environment, ' + 'at least one of base_rank and node_rank must be specified. ' + 'If only one of base_rank and node_rank are provided, it is assumed ' + 'that all nodes have the same amount of processes, and that the two ' + 'values are related as node_rank * nproc = base_rank. If this is ' + 'not the case, both base_rank and node_rank must be provided. ' + 'Overrides env var BASE_RANK. Defaults to 0 in a single-node ' + 'environment.' + ), + ) + multinode_args.add_argument( + '--node_rank', + type=int, + help=( + 'The rank of this node. See base_rank for information on when ' + 'this must be provided. Overrides env var NODE_RANK. Defaults to 0 ' + 'in a single-node environment.' + ), + ) + multinode_args.add_argument( + '--master_addr', + type=str, + help=( + 'The FQDN of the node hosting the C10d TCP store. For single-node ' + 'operation, this can generally be left as 127.0.0.1. Overrides env var ' + 'MASTER_ADDR. Defaults to 127.0.0.1 in a single-node environment.' + ), + ) + multinode_args.add_argument( + '--master_port', + type=int, + help=( + 'The port on the master hosting the C10d TCP store. If you are ' + 'running multiple trainers on a single node, this generally needs ' + 'to be unique for each one. Overrides env var MASTER_PORT. Defaults ' + 'to a random free port in a single-node environment.' + ), + ) + + required_args.add_argument( + 'training_script', + type=str, + help=( + 'The path to the training script used to initialize a single training ' + 'process. Should be followed by any command-line arguments the script ' + 'should be launched with.' + ), + ) + required_args.add_argument( + 'training_script_args', + nargs='...', + help='Any arguments for the training script, given in the expected order.', + ) + + return parser + + +def _parse_args(): + parser = _get_parser() + + args = parser.parse_args() + + # Default values to env vars if they are not provided + if args.nproc is None: + if 'LOCAL_WORLD_SIZE' in os.environ: + args.nproc = int(os.environ['LOCAL_WORLD_SIZE']) + else: + args.nproc = torch.cuda.device_count() + + if args.nproc == 0: + # This could happen if doing cpu-only training, + # which could cause torch.cuda.device_count() to return 0, + # and LOCAL_WORLD_SIZE (as set by MCLI) to be zero + args.nproc = 1 + + if args.nproc < 1: + raise ValueError('The nproc must be 1 or greater') + + if args.world_size is None and 'WORLD_SIZE' in os.environ: + args.world_size = int(os.environ['WORLD_SIZE']) + + if args.base_rank is None and 'BASE_RANK' in os.environ: + args.base_rank = int(os.environ['BASE_RANK']) + + if args.node_rank is None and 'NODE_RANK' in os.environ: + args.node_rank = int(os.environ['NODE_RANK']) + + if args.master_addr is None and 'MASTER_ADDR' in os.environ: + args.master_addr = os.environ['MASTER_ADDR'] + + if args.master_port is None and 'MASTER_PORT' in os.environ: + args.master_port = int(os.environ['MASTER_PORT']) + + if args.world_size is None: + args.world_size = args.nproc + + if args.world_size < args.nproc: + raise ValueError(f'world_size({args.world_size}) cannot be less than nproc({args.nproc})') + + if args.world_size < 1: + raise ValueError('The world_size must be 1 or greater') + + is_multinode = args.world_size > args.nproc + + if is_multinode: + if args.base_rank is None and args.node_rank is None: + raise ValueError(f'In a multi-node environment, at least one of node_rank and base_rank must be provided.') + + if args.node_rank is None: + if args.world_size % args.nproc != 0 or args.base_rank % args.nproc != 0: + raise ValueError( + 'node_rank not provided, but unable to infer from base_rank since nodes appear to ' + 'have different amounts of processes. Please also specify node_rank.', + ) + args.node_rank = args.base_rank // args.nproc + + if args.base_rank is None: + if args.world_size % args.nproc != 0: + raise ValueError( + 'base_rank not provided, but unable to infer from node_rank since nodes appear to ' + 'have different amounts of processes. Please also provide base_rank.', + ) + args.base_rank = args.node_rank * args.nproc + + if args.base_rank + args.nproc > args.world_size: + raise ValueError( + f'Cannot initialize processes for node with base_rank({args.base_rank}) and ' + f'nproc({args.nproc}) because this would mean creating a process with ' + f'rank({args.base_rank + args.nproc - 1}), and all processes must have smaller rank than ' + f'the world_size({args.world_size}).', + ) + + if args.master_addr is None: + raise ValueError('In a multi-node environment, master_addr is required.') + + if args.master_port is None: + raise ValueError('In a multi-node environment, master_port is required.') + + else: + if args.base_rank is not None and args.base_rank != 0: + raise ValueError(f'base_rank({args.base_rank}) != 0 is not valid in a single-node environment.') + args.base_rank = 0 + + if args.node_rank is not None and args.node_rank != 0: + raise ValueError(f'node_rank({args.node_rank}) != 0 is not valid in a single-node environment.') + args.node_rank = 0 + + if args.master_addr is None: + args.master_addr = '127.0.0.1' + + if args.master_port is None: + args.master_port = get_free_tcp_port() + + return args + + +@contextlib.contextmanager +def _patch_env(**environs: str): + """Returns a context manager that patches ``os.environ`` with ``environs``. + + The original ``os.environ`` values are restored at the end. + """ + # Adapted loosely from https://stackoverflow.com/a/34333710 + # Capture the original environ values + original_environs = {k: os.environ.get(k) for k in environs} + + # Patch the environment + for k, v in environs.items(): + os.environ[k] = v + try: + # Run the context manager + yield + finally: + # Restore the original environ values + for k, v in original_environs.items(): + if v is None: + del os.environ[k] + else: + os.environ[k] = v + + +def _launch_processes( + nproc: int, + world_size: int, + base_rank: int, + node_rank: int, + master_addr: str, + master_port: int, + module_mode: bool, + command_mode: bool, + training_script: str, + stdout_file_format: str, + stderr_file_format: Union[str, None], + training_script_args: List[Any], + processes: Dict[int, subprocess.Popen], +): + log.info('Starting distributed environment on local node for global_rank(%s-%s)', base_rank, base_rank + nproc - 1) + log.info('Distributed KV store: tcp://%s:%s', master_addr, master_port) + + for local_rank in range(nproc): + global_rank = base_rank + local_rank + if command_mode and module_mode: + raise ValueError('Either `command_mode` or `module_mode` should be set, but not both.') + cmd = [] + if not command_mode: + cmd.append(sys.executable) + if module_mode: + cmd.append('-m') + + cmd.append(training_script) + + # Update the env with the distributed variables + with _patch_env( + RANK=str(global_rank), + WORLD_SIZE=str(world_size), + LOCAL_RANK=str(local_rank), + LOCAL_WORLD_SIZE=str(nproc), + NODE_RANK=str(node_rank), + MASTER_ADDR=master_addr, + MASTER_PORT=str(master_port), + PYTHONUNBUFFERED='1', + NCCL_ASYNC_ERROR_HANDLING='1', + ): + # Populate the distributed variables in all launcher args + for arg in training_script_args: + cmd.append(os.path.expandvars(os.path.expanduser(arg))) + + log.info( + 'Launching process for local_rank(%s), global_rank(%s) with command(%s)', + local_rank, + global_rank, + cmd, + ) + + if local_rank == 0: + process = subprocess.Popen( + cmd, + text=True, + ) + else: + + def _get_file(format: str): + filename = format.format( + rank=global_rank, + world_size=world_size, + local_rank=local_rank, + local_world_size=nproc, + node_rank=node_rank, + ) + return open(filename, 'x+') + + stdout_file = _get_file(stdout_file_format) + stderr_file = _get_file(stderr_file_format) if stderr_file_format is not None else None + + process = subprocess.Popen( + cmd, + stdout=stdout_file, + stderr=stderr_file if stderr_file is not None else subprocess.STDOUT, + text=True, + ) + process.stdout = stdout_file + if stderr_file is not None: + process.stderr = stderr_file + processes[global_rank] = process + + +def _monitor_processes(processes: Dict[int, subprocess.Popen]): + try: + while True: + process_has_crashed = False + all_processes_finished = True + for global_rank, process in processes.items(): + if process.poll() is None: + # the process is still running + all_processes_finished = False + continue + else: + # return code of 0 implies clean exit + if process.returncode != 0: + log.error(f'Rank {global_rank} crashed with exit code {process.returncode}.') + process_has_crashed = True + break + else: + # exited cleanly + log.info(f'Rank {global_rank} finished successfully.') + if process_has_crashed or all_processes_finished: + break + time.sleep(0.1) + except KeyboardInterrupt: + print('Ctrl-C received; terminating training processes.') + pass + + +def _print_process_exit_status(global_rank: int, process: subprocess.Popen): + stdOutLabel = 'STDOUT' + if process.stdout is None: + output = None + else: + process.stdout.seek(0) + output = process.stdout.read() + + if process.stderr is None: + stderr = None + stdOutLabel = 'logs' + else: + process.stderr.seek(0) + stderr = process.stderr.read() + exc = subprocess.CalledProcessError( + process.returncode, + cmd=process.args, + output=output, + stderr=stderr, + ) + + error_msg = [f'Global rank {global_rank} (PID {process.pid}) exited with code {process.returncode}'] + if output is not None: + error_msg.extend([ + f'----------Begin global rank {global_rank} {stdOutLabel}----------', + output, + f'----------End global rank {global_rank} {stdOutLabel}----------', + ]) + + if stderr is not None: + error_msg.extend([ + f'----------Begin global rank {global_rank} STDERR----------', + exc.stderr, + f'----------End global rank {global_rank} STDERR----------', + ]) + print('\n'.join(error_msg)) + + +def _cleanup_processes(processes: Dict[int, subprocess.Popen]): + for global_rank, process in processes.items(): + process.poll() + if process.returncode is None: + log.info('Killing global rank %s (PID %s) with SIGTERM', global_rank, process.pid) + # Assuming that child processes correctly handle SIGTERM to cleanup any children + try: + os.kill(process.pid, signal.SIGTERM) + except ProcessLookupError: + pass + + current_time = datetime.datetime.now() + + try: + print(( + f'Waiting up to {CLEANUP_TIMEOUT.seconds} seconds for all training processes to terminate. ' + 'Press Ctrl-C to exit immediately.' + )) + while datetime.datetime.now() - current_time < CLEANUP_TIMEOUT: + for process in processes.values(): + process.poll() + if all(process.returncode is not None for process in processes.values()): + break + time.sleep(0.1) + except KeyboardInterrupt: + pass + + for global_rank, process in processes.items(): + process.poll() + if process.returncode is None: + log.warning( + 'Failed to kill global rank %s (PID %s) with SIGTERM; terminating with SIGKILL instead', + global_rank, + process.pid, + ) + try: + proc = psutil.Process(process.pid) + except psutil.NoSuchProcess: + pass + else: + # If using SIGKILL, manually kill all child processes, since the main training process + # likely won't be able to intercept the signal and clean up its children. + for psutil_proc in [proc, *proc.children(recursive=True)]: + try: + os.kill(psutil_proc.pid, signal.SIGKILL) + except ProcessLookupError: + pass + for global_rank, process in processes.items(): + process.poll() + if process.returncode is not None and process.returncode != 0: + if -process.returncode in (signal.SIGKILL, signal.SIGTERM): + # Negative return codes indicate the process was killed via a signal + # If the launcher script killed the training process (which would happen via SIGKILL or SIGTERM), + # then do not print the stack trace. + continue + # only print the processes that have actually crashed, + # not the ones that were killed + _print_process_exit_status(global_rank, process) + + +def _aggregate_process_returncode(processes: Dict[int, subprocess.Popen]) -> int: + for global_rank, process in processes.items(): + process.poll() + if process.returncode is None: + log.error('Global rank %s (PID %s) has still not exited; return exit code 1.', global_rank, process.pid) + return 1 + if process.returncode != 0: + log.error('Global rank %s (PID %s) exited with code %s', global_rank, process.pid, process.returncode) + return process.returncode + + return 0 + + +def main(): + """Entrypoint into the Composer CLI.""" + args = _parse_args() + + logging.basicConfig() + log.setLevel(logging.INFO if args.verbose else logging.WARNING) + + processes = {} + + log_tmpdir = tempfile.TemporaryDirectory() + if args.stdout is None: + args.stdout = f'{log_tmpdir.name}/rank{{rank}}.stdout.txt' + if args.stderr is None: + args.stderr = f'{log_tmpdir.name}/rank{{rank}}.stderr.txt' + + # If running on the Mosaic platform, log all gpu ranks' stderr and stdout to Mosaic platform + if os.environ.get(MOSAICML_PLATFORM_ENV_VAR, 'false').lower() == 'true' and str( + os.environ.get(MOSAICML_LOG_DIR_ENV_VAR, 'false'), + ).lower() != 'false' and os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR, 'false').lower() != 'false': + log.info('Logging all GPU ranks to Mosaic Platform.') + log_file_format = f'{os.environ.get(MOSAICML_LOG_DIR_ENV_VAR)}/{os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR)}{{local_rank}}.txt' + if args.stderr is not None or args.stdout is not None: + log.info( + 'Logging to Mosaic Platform. Ignoring provided stdout and stderr args. To use provided stdout and stderr, set MOSAICML_LOG_DIR=false.', + ) + args.stdout = log_file_format + args.stderr = None + + try: + _launch_processes( + nproc=args.nproc, + world_size=args.world_size, + base_rank=args.base_rank, + node_rank=args.node_rank, + master_addr=args.master_addr, + master_port=args.master_port, + module_mode=args.module_mode, + command_mode=args.command_mode, + stdout_file_format=args.stdout, + stderr_file_format=args.stderr, + training_script=args.training_script, + training_script_args=args.training_script_args, + processes=processes, + ) + _monitor_processes(processes) + except: + # Print the exception first, then kill the training processes, since killing + # may take up to CLEANUP_TIMEOUT seconds, and the user should know immediately + # what failed. No need to re-raise the exception, as `aggregate_process_returncode` + # will return an appropriate error code, which will cause the script to exit. + traceback.print_exc() + print('Killing training processes') + finally: + _cleanup_processes(processes) + log_tmpdir.cleanup() + return _aggregate_process_returncode(processes) + + +if __name__ == '__main__': + sys.exit(main()) From 48a7ccc20e7cb0e4e96a012da16b3cdf1a164d73 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Tue, 19 Mar 2024 17:49:44 -0700 Subject: [PATCH 02/27] case 1 can run successfully --- .gitignore | 2 + llmfoundry/composerpatch/MLFlowLogger.py | 33 +++++ llmfoundry/composerpatch/__init__.py | 0 llmfoundry/utils/builders.py | 7 +- scripts/train/launcher.py | 63 +++++++-- scripts/train/train.py | 3 +- setup.py | 7 +- ygong/__init__.py | 0 ygong/mosaic/__init__.py | 6 + ygong/mosaic/mpt125mConfig.py | 167 +++++++++++++++++++++++ ygong/mosaic/scaling_config.py | 14 ++ ygong/mosaic/submit.py | 145 ++++++++++++++++++++ 12 files changed, 431 insertions(+), 16 deletions(-) create mode 100644 llmfoundry/composerpatch/MLFlowLogger.py create mode 100644 llmfoundry/composerpatch/__init__.py create mode 100644 ygong/__init__.py create mode 100644 ygong/mosaic/__init__.py create mode 100644 ygong/mosaic/mpt125mConfig.py create mode 100644 ygong/mosaic/scaling_config.py create mode 100644 ygong/mosaic/submit.py diff --git a/.gitignore b/.gitignore index d041a25c22..72f965ce63 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ my-copy-c4*/ my-copy-arxiv*/ *.jsonl* +ygong/notebook/* + # WandB wandb/ diff --git a/llmfoundry/composerpatch/MLFlowLogger.py b/llmfoundry/composerpatch/MLFlowLogger.py new file mode 100644 index 0000000000..e9f73d7767 --- /dev/null +++ b/llmfoundry/composerpatch/MLFlowLogger.py @@ -0,0 +1,33 @@ +from composer.loggers import MLFlowLogger as ComposerMLFlowLogger +from composer.utils import MissingConditionalImportError, dist +import json +import os +from composer.core.state import State +from composer.loggers.logger import Logger +from composer.loggers.logger_destination import LoggerDestination +from composer.utils import MissingConditionalImportError, dist + + + +CONFIG_FILE = "/tmp/mlflow_config.yaml" +EXPERIMENT_ID_FIELD = "experiment_id" +RUN_ID_FIELD = "run_id" +TRACKING_URI_FIELD = "tracking_uri" + + +class MLFlowLogger(ComposerMLFlowLogger): + + def init(self, state: State, logger: Logger) -> None: + super().init(state, logger) + + if self._enabled and dist.get_local_rank() == 0: + if os.path.exists(CONFIG_FILE): + os.remove(CONFIG_FILE) + + with open(CONFIG_FILE, "w") as f: + data = { + EXPERIMENT_ID_FIELD: self._experiment_id, + RUN_ID_FIELD: self._run_id, + TRACKING_URI_FIELD : self.tracking_uri, + } + json.dump(data, f) \ No newline at end of file diff --git a/llmfoundry/composerpatch/__init__.py b/llmfoundry/composerpatch/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/llmfoundry/utils/builders.py b/llmfoundry/utils/builders.py index 328705d2a5..6195152199 100644 --- a/llmfoundry/utils/builders.py +++ b/llmfoundry/utils/builders.py @@ -18,7 +18,7 @@ from composer.core import Algorithm, Callback, Evaluator from composer.datasets.in_context_learning_evaluation import \ get_icl_task_dataloader -from composer.loggers import (InMemoryLogger, LoggerDestination, MLFlowLogger, +from composer.loggers import (InMemoryLogger, LoggerDestination, TensorboardLogger, WandBLogger) from composer.optim import DecoupledAdamW from composer.optim.scheduler import (ComposerScheduler, @@ -31,6 +31,9 @@ from torch.optim.optimizer import Optimizer from transformers import AutoTokenizer, PreTrainedTokenizerBase +from llmfoundry.composerpatch import MLFlowLogger + + from llmfoundry.callbacks import (AsyncEval, CurriculumLearning, EvalGauntlet, FDiffMetrics, GlobalLRScaling, HuggingFaceCheckpointer, LayerFreezing, @@ -240,7 +243,7 @@ def build_logger(name: str, kwargs: Dict[str, Any]) -> LoggerDestination: elif name == 'in_memory_logger': return InMemoryLogger(**kwargs) elif name == 'mlflow': - return MLFlowLogger(**kwargs) + return MLFlowLogger.MLFlowLogger(**kwargs) elif name == 'inmemory': return InMemoryLogger(**kwargs) else: diff --git a/scripts/train/launcher.py b/scripts/train/launcher.py index e20626265f..3d0ee3b47b 100755 --- a/scripts/train/launcher.py +++ b/scripts/train/launcher.py @@ -27,6 +27,8 @@ MOSAICML_PLATFORM_ENV_VAR, ) from composer.utils import get_free_tcp_port +from llmfoundry.composerpatch import MLFlowLogger + CLEANUP_TIMEOUT = datetime.timedelta(seconds=30) @@ -313,9 +315,11 @@ def _launch_processes( stderr_file_format: Union[str, None], training_script_args: List[Any], processes: Dict[int, subprocess.Popen], + log_dirs: set[str], ): log.info('Starting distributed environment on local node for global_rank(%s-%s)', base_rank, base_rank + nproc - 1) log.info('Distributed KV store: tcp://%s:%s', master_addr, master_port) + log.warning(f"ygong: stdout_file_format={stdout_file_format}, stderr_file_format={stderr_file_format}") for local_rank in range(nproc): global_rank = base_rank + local_rank @@ -359,7 +363,7 @@ def _launch_processes( ) else: - def _get_file(format: str): + def _get_file__(format: str): filename = format.format( rank=global_rank, world_size=world_size, @@ -367,10 +371,13 @@ def _get_file(format: str): local_world_size=nproc, node_rank=node_rank, ) + dir = os.path.normpath(os.path.dirname(filename)) + os.makedirs(dir, exist_ok=True) + log_dirs.add(dir) return open(filename, 'x+') - stdout_file = _get_file(stdout_file_format) - stderr_file = _get_file(stderr_file_format) if stderr_file_format is not None else None + stdout_file = _get_file__(stdout_file_format) + stderr_file = _get_file__(stderr_file_format) if stderr_file_format is not None else None process = subprocess.Popen( cmd, @@ -384,7 +391,15 @@ def _get_file(format: str): processes[global_rank] = process -def _monitor_processes(processes: Dict[int, subprocess.Popen]): +def _monitor_processes( + processes: Dict[int, subprocess.Popen], + log_dirs: set[str], + launcher_log: str,): + import mlflow + log_frequency = 200 + cycle = 0 + + mlflow_runid = None try: while True: process_has_crashed = False @@ -405,7 +420,29 @@ def _monitor_processes(processes: Dict[int, subprocess.Popen]): log.info(f'Rank {global_rank} finished successfully.') if process_has_crashed or all_processes_finished: break + + if cycle == 0: + if mlflow_runid is None: + if os.path.exists(MLFlowLogger.CONFIG_FILE): + import json + with open(MLFlowLogger.CONFIG_FILE, "r") as f: + data = json.load(f) + log.error(f"ygong:Started mlflow run {data}") + mlflow_runid = data[MLFlowLogger.RUN_ID_FIELD] + mlflow.set_tracking_uri(data[MLFlowLogger.TRACKING_URI_FIELD]) + mlflow.start_run(run_id=data[MLFlowLogger.RUN_ID_FIELD], experiment_id=data[MLFlowLogger.EXPERIMENT_ID_FIELD]) + else: + try: + for log_dir in log_dirs: + log.warning(f"ygong: Logging directory: {log_dir}") + mlflow.log_artifacts(log_dir, log_dir.lstrip('/')) + mlflow.log_artifact(launcher_log) + except Exception as e: + log.error(f"ygong:Failed to log artifacts to mlflow: {e}") + cycle = (cycle + 1) % log_frequency + time.sleep(0.1) + except KeyboardInterrupt: print('Ctrl-C received; terminating training processes.') pass @@ -526,12 +563,13 @@ def main(): """Entrypoint into the Composer CLI.""" args = _parse_args() - logging.basicConfig() - log.setLevel(logging.INFO if args.verbose else logging.WARNING) - + log_tmpdir = tempfile.TemporaryDirectory() + launcher_log = f"{log_tmpdir.name}/launcher{args.node_rank}.log" + logging.basicConfig(filename=launcher_log, level=logging.INFO if args.verbose else logging.WARNING) + processes = {} + log_dirs = set() - log_tmpdir = tempfile.TemporaryDirectory() if args.stdout is None: args.stdout = f'{log_tmpdir.name}/rank{{rank}}.stdout.txt' if args.stderr is None: @@ -542,13 +580,13 @@ def main(): os.environ.get(MOSAICML_LOG_DIR_ENV_VAR, 'false'), ).lower() != 'false' and os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR, 'false').lower() != 'false': log.info('Logging all GPU ranks to Mosaic Platform.') - log_file_format = f'{os.environ.get(MOSAICML_LOG_DIR_ENV_VAR)}/{os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR)}{{local_rank}}.txt' if args.stderr is not None or args.stdout is not None: log.info( 'Logging to Mosaic Platform. Ignoring provided stdout and stderr args. To use provided stdout and stderr, set MOSAICML_LOG_DIR=false.', ) - args.stdout = log_file_format - args.stderr = None + + args.stdout = f'{os.environ.get(MOSAICML_LOG_DIR_ENV_VAR)}/stdout/{os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR)}{{rank}}.txt' + args.stderr = f'{os.environ.get(MOSAICML_LOG_DIR_ENV_VAR)}/stderr/{os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR)}{{rank}}.txt' try: _launch_processes( @@ -565,8 +603,9 @@ def main(): training_script=args.training_script, training_script_args=args.training_script_args, processes=processes, + log_dirs=log_dirs, ) - _monitor_processes(processes) + _monitor_processes(processes, log_dirs, launcher_log) except: # Print the exception first, then kill the training processes, since killing # may take up to CLEANUP_TIMEOUT seconds, and the user should know immediately diff --git a/scripts/train/train.py b/scripts/train/train.py index 92c6afa128..8784f4e2a0 100644 --- a/scripts/train/train.py +++ b/scripts/train/train.py @@ -12,7 +12,7 @@ import torch from composer import Trainer from composer.core.callback import Callback -from composer.loggers import MosaicMLLogger +from composer.loggers import MosaicMLLogger, MLFlowLogger from composer.loggers.mosaicml_logger import (MOSAICML_ACCESS_TOKEN_ENV_VAR, MOSAICML_PLATFORM_ENV_VAR) from composer.metrics.nlp import InContextLearningMetric @@ -459,6 +459,7 @@ def main(cfg: DictConfig) -> Trainer: mosaicml_logger = MosaicMLLogger() loggers.append(mosaicml_logger) + if metadata is not None: # Flatten the metadata for logging logged_cfg.pop('metadata', None) diff --git a/setup.py b/setup.py index 4ecd34861a..faa27e7556 100644 --- a/setup.py +++ b/setup.py @@ -68,11 +68,12 @@ 'onnxruntime==1.15.1', 'cmake>=3.25.0,<=3.26.3', # required for triton-pre-mlir below # PyPI does not support direct dependencies, so we remove this line before uploading from PyPI - 'triton-pre-mlir@git+https://github.com/vchiley/triton.git@triton_pre_mlir_sm90#subdirectory=python', + # 'triton-pre-mlir@git+https://github.com/vchiley/triton.git@triton_pre_mlir_sm90#subdirectory=python', 'boto3>=1.21.45,<2', 'huggingface-hub>=0.17.0,<1.0', 'beautifulsoup4>=4.12.2,<5', # required for model download utils 'tenacity>=8.2.3,<9', + 'ipywidgets', ] extra_deps = {} @@ -88,6 +89,10 @@ 'hf_transfer==0.1.3', ] +extra_deps['ygong'] = [ + 'databricks-genai', +] + extra_deps['databricks'] = [ 'mosaicml[databricks]>=0.20.1,<0.21', 'databricks-sql-connector>=3,<4', diff --git a/ygong/__init__.py b/ygong/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ygong/mosaic/__init__.py b/ygong/mosaic/__init__.py new file mode 100644 index 0000000000..c5bd21dde7 --- /dev/null +++ b/ygong/mosaic/__init__.py @@ -0,0 +1,6 @@ +from .submit import submit +from .submit import _set_up_environment +from .scaling_config import ScalingConfig +from .mpt125mConfig import MPT125MConfig + +__all__ = ['submit', 'ScalingConfig', "MPT125MConfig", "_set_up_environment"] \ No newline at end of file diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py new file mode 100644 index 0000000000..78c9c057bc --- /dev/null +++ b/ygong/mosaic/mpt125mConfig.py @@ -0,0 +1,167 @@ +from mcli import RunConfig +from ygong.mosaic.scaling_config import ScalingConfig +from typing import Optional +import os +# import databricks_genai.api.config as cfg + + +class MPT125MConfig: + def __init__(self, experimentName: str, data: str): + # TODO: validate the inputs and remove the yu.gong hardcode + self.mlflow_experimentName = f"/Users/yu.gong@databricks.com/{experimentName}" + workspace_url = os.environ.get('WORKSPACE_URL') + self.mlflow_trackingUri = "databricks" + # self.mlflow_trackingUri = "databricks" if workspace_url is None else workspace_url + + self.data = data + + # the run name is pre-configured for all config-driven pretrain runs + self.name = "mpt125m-config-driven-pretrain" + + ######################################## + # model parameters + ######################################## + self.max_seq_len = 2048 + self.global_seed = 17 + # TODO: hardcode, need to respect self.data + self.data_remote = "s3://aaron-mlflow-demo/ygong-c4-process/" + self.data_local = "./my-copy-c4" + self.commands = [ + "cd llm-foundry/scripts", + "train/launcher.py train/train.py /mnt/config/parameters.yaml train_loader.dataset.split=train_small eval_loader.dataset.split=val" + ] + # api_token, endpoint = cfg.get_config_from_env() + # self.workspace_url = "https://dbc-04ac0685-8857.staging.cloud.databricks.com/" + + + def toRunConfig(self, scalingConfig: ScalingConfig): + return RunConfig( + name=self.name, + image='mosaicml/llm-foundry:2.2.1_cu121_flash2-latest', + command="\n".join(self.commands), + compute=scalingConfig.toCompute, + scheduling={}, + integrations=[ + { + 'integration_type': 'git_repo', + 'git_repo': 'ygong1/llm-foundry', + 'git_branch': 'prototype', + 'pip_install': '-e .[gpu]', + 'ssh_clone': False + }, + { + 'integration_type': 'pip_packages', + 'packages': ['pynvml'], + }, + ], + parameters=self.parameters(), + env_variables={}, + ) + + def parameters(self): + return { + "data_local": "./my-copy-c4", + "data_remote": "s3://aaron-mlflow-demo/ygong-c4-process/", + "max_seq_len": self.max_seq_len, + "global_seed": self.global_seed, + "run_name": None, + "model": { + "name": "mpt_causal_lm", + "init_device": "meta", + "d_model": 768, + "n_heads": 12, + "n_layers": 12, + "expansion_ratio": 4, + "max_seq_len": self.max_seq_len, + "vocab_size": 50368, + "attn_config": { + "attn_impl": "triton" + } + }, + "tokenizer": { + "name": "EleutherAI/gpt-neox-20b", + "kwargs": { + "model_max_length": self.max_seq_len + } + }, + "train_loader": { + "name": "text", + "dataset": { + "local": f"{self.data_local}", + "remote": f"{self.data_remote}", + "split": "train", + "shuffle": True, + "max_seq_len": self.max_seq_len, + "shuffle_seed": self.global_seed + }, + "drop_last": True, + "num_workers": 8 + }, + "eval_loader": { + "name": "text", + "dataset": { + "local": f"{self.data_local}", + "remote": f"{self.data_remote}", + "split": "val", + "shuffle": False, + "max_seq_len": self.max_seq_len, + "shuffle_seed": self.global_seed + }, + "drop_last": False, + "num_workers": 8 + }, + "scheduler": { + "name": "cosine_with_warmup", + "t_warmup": "100ba", + "alpha_f": 0.1 + }, + "optimizer": { + "name": "decoupled_adamw", + "lr": 6.0e-4, + "betas": [0.9, 0.95], + "eps": 1.0e-08, + "weight_decay": 0.0 + }, + "algorithms": { + "gradient_clipping": { + "clipping_type": "norm", + "clipping_threshold": 1.0 + } + }, + "max_duration": "480ba", # ~ 2.5B tokens, original + "eval_interval": "50ba", # original 500 + "eval_first": False, + "eval_subset_num_batches": -1, + "global_train_batch_size": 256, + "seed": self.global_seed, + "device_eval_batch_size": 16, + "device_train_microbatch_size": 16, + "precision": "amp_bf16", + "fsdp_config": { + "sharding_strategy": "FULL_SHARD", + "mixed_precision": "PURE", + "activation_checkpointing": False, + "activation_checkpointing_reentrant": False, + "activation_cpu_offload": False, + "limit_all_gathers": True + }, + "progress_bar": False, + "log_to_console": True, + "console_log_interval": "10ba", + "callbacks": { + "speed_monitor": { + "window_size": 10 + }, + "lr_monitor": {}, + "memory_monitor": {}, + "runtime_estimator": {} + }, + "loggers": { + "mlflow": { + "experiment_name": self.mlflow_experimentName, + "tracking_uri": "databricks", + "synchronous": False, + "log_system_metrics": True + } + } + } \ No newline at end of file diff --git a/ygong/mosaic/scaling_config.py b/ygong/mosaic/scaling_config.py new file mode 100644 index 0000000000..f22e8e7e4c --- /dev/null +++ b/ygong/mosaic/scaling_config.py @@ -0,0 +1,14 @@ +class ScalingConfig: + def __init__(self, gpusNum: int, gpuType: str, poolName: str): + # TODO: validate the inputs + self.gpusNum = gpusNum + self.gpuType = gpuType + self.poolName = poolName + + @property + def toCompute(self): + return { + 'gpus': self.gpusNum, + 'gpu_type': self.gpuType, + 'cluster': self.poolName + } diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py new file mode 100644 index 0000000000..91ad41feb1 --- /dev/null +++ b/ygong/mosaic/submit.py @@ -0,0 +1,145 @@ +from ygong.mosaic.scaling_config import ScalingConfig +from ygong.mosaic.mpt125mConfig import MPT125MConfig +from mcli import wait_for_run_status, Run, RunConfig, RunStatus, create_run +import pandas as pd +import ipywidgets as widgets +from IPython.display import display, clear_output, HTML +import mlflow +import os +from typing import Optional +import time +import base64 +import json +import hashlib +from mcli.api.engine.engine import MAPIConnection +from mcli.config import MCLIConfig + +def _set_up_environment(content): + data = json.loads(base64.b64decode(content).decode('utf-8')) + workspace_url = data.get("workspace_url", None) + token = data.get("token", None) + mosaic_token = data.get("mosaic_token", None) + + if token is None: + from databricks.sdk import WorkspaceClient + wc = WorkspaceClient() + ctx = wc.dbutils.entry_point.getDbutils().notebook().getContext() + token = ctx.apiToken().get() + + if workspace_url is None or mosaic_token is None: + raise ValueError("workspace_url and token must be provided") + os.environ['WORKSPACE_URL'] = workspace_url + os.environ['MLFLOW_TRACKING_TOKEN'] = token + + # set up the mosaic token + conf = MCLIConfig.load_config() + conf.api_key = mosaic_token + conf.save_config() + MAPIConnection.reset_connection() + + + hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8] + databricks_secret_name = f"databricks-{hash}" + + # clean up the old secret. MosaicML doesn't support multiple databricks secrets + # would have to clean up the old secret if it exists + from mcli.api.secrets.api_get_secrets import get_secrets + from mcli.api.secrets.api_delete_secrets import delete_secrets + from mcli.models.mcli_secret import SecretType + s = get_secrets(secret_types=[SecretType.databricks]) + if len(s) == 1: + if s[0].name != databricks_secret_name: + delete_secrets(s) + else: + print("databricks secret already exists") + return + from mcli.objects.secrets.create.databricks import DatabricksSecretCreator + from mcli.api.secrets.api_create_secret import create_secret + s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token) + print(f"successfully created databricks secret: {databricks_secret_name}") + create_secret(s) + + + +def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str): + if tracking_uri is None: + raise ValueError("tracking_uri must be provided") + mlflow.set_tracking_uri(tracking_uri) + tracking_uri = tracking_uri.rstrip("/") + experiment = mlflow.get_experiment_by_name(name=experiment_name) + if experiment is None: + raise ValueError(f"experiment {experiment_name} does not exist") + experiment_id = experiment.experiment_id + runs = mlflow.search_runs(experiment_ids=[experiment_id], + filter_string=f'tags.composer_run_name = "{run_name}"', + output_format='list') + if len(runs) == 0: + raise ValueError(f"run {run_name} does not exist in experiment {experiment_name}") + elif len(runs) > 1: + raise ValueError(f"multiple runs {run_name} exist in experiment {experiment_name}") + else: + run_id = runs[0].info.run_id + return f"{tracking_uri}/ml/experiments/{experiment_id}/runs/{run_id}" + +def _get_run_summary(run: Run, experiment_name: Optional[str] = None): + url = None + if run.status == RunStatus.RUNNING and experiment_name is not None: + url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name) + + df = pd.DataFrame({ + 'Run Name': [run.name], + 'Run ID': [run.run_uid], + "Status": [str(run.status)], + 'Experiment Run': [f'Link' if url is not None else ""], + }) + return df + +def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.Button]): + clear_output(wait=True) + if cancel_button is not None: + display(cancel_button) + display(HTML(summary.to_html(escape=False))) + + + +def submit(model, config: any, scalingConfig: ScalingConfig): + mlflow_experiment_name = None + if model == "mpt125m": + if not isinstance(config, MPT125MConfig): + raise ValueError("config must be an instance of MPT125MConfig") + mlflow_experiment_name = config.mlflow_experimentName + runConfig = config.toRunConfig(scalingConfig) + else: + raise ValueError(f"model {model} is not supported") + + + run = create_run(runConfig) + # Create a button + button = widgets.Button(description="cancel the run") + def on_button_clicked(b): + clear_output(wait=False) + run.stop() + wait_for_run_status(run, RunStatus.TERMINATING) + summary = _get_run_summary(run, mlflow_experiment_name) + display(HTML(summary.to_html(escape=False))) + button.on_click(on_button_clicked) + + _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) + + wait_for_run_status(run, RunStatus.RUNNING) + # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet + # when the run just starts running + _display_run_summary(_get_run_summary(run, None), button) + + + try_count = 0 + while try_count < 10: + try_count += 1 + time.sleep(20) + try: + summary = _get_run_summary(run, mlflow_experiment_name) + _display_run_summary(summary, button) + break + except ValueError: + print("waiting for the run to be ready...") + pass \ No newline at end of file From fbf6783de0aa15790169d83d2577c61c28e72259 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Thu, 21 Mar 2024 22:21:18 -0700 Subject: [PATCH 03/27] set up the credential to work with datbricks environment --- ygong/mosaic/mpt125mConfig.py | 23 +++---- ygong/mosaic/submit.py | 119 ++++++++++++++++++++++------------ 2 files changed, 88 insertions(+), 54 deletions(-) diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py index 78c9c057bc..322fc181e3 100644 --- a/ygong/mosaic/mpt125mConfig.py +++ b/ygong/mosaic/mpt125mConfig.py @@ -6,14 +6,14 @@ class MPT125MConfig: - def __init__(self, experimentName: str, data: str): + def __init__(self, experimentName: str, data: str, priority: str = 'high'): # TODO: validate the inputs and remove the yu.gong hardcode self.mlflow_experimentName = f"/Users/yu.gong@databricks.com/{experimentName}" - workspace_url = os.environ.get('WORKSPACE_URL') self.mlflow_trackingUri = "databricks" # self.mlflow_trackingUri = "databricks" if workspace_url is None else workspace_url self.data = data + self.priority = priority # the run name is pre-configured for all config-driven pretrain runs self.name = "mpt125m-config-driven-pretrain" @@ -23,16 +23,13 @@ def __init__(self, experimentName: str, data: str): ######################################## self.max_seq_len = 2048 self.global_seed = 17 - # TODO: hardcode, need to respect self.data - self.data_remote = "s3://aaron-mlflow-demo/ygong-c4-process/" - self.data_local = "./my-copy-c4" + self.data_remote = self.data + self.data_local = "" self.commands = [ "cd llm-foundry/scripts", - "train/launcher.py train/train.py /mnt/config/parameters.yaml train_loader.dataset.split=train_small eval_loader.dataset.split=val" + "train/launcher.py train/train.py /mnt/config/parameters.yaml train_loader.dataset.split=train eval_loader.dataset.split=val" ] - # api_token, endpoint = cfg.get_config_from_env() - # self.workspace_url = "https://dbc-04ac0685-8857.staging.cloud.databricks.com/" - + def toRunConfig(self, scalingConfig: ScalingConfig): return RunConfig( @@ -40,7 +37,7 @@ def toRunConfig(self, scalingConfig: ScalingConfig): image='mosaicml/llm-foundry:2.2.1_cu121_flash2-latest', command="\n".join(self.commands), compute=scalingConfig.toCompute, - scheduling={}, + scheduling={'priority': self.priority}, integrations=[ { 'integration_type': 'git_repo', @@ -51,7 +48,7 @@ def toRunConfig(self, scalingConfig: ScalingConfig): }, { 'integration_type': 'pip_packages', - 'packages': ['pynvml'], + 'packages': ['pynvml', 'mosaicml-streaming[databricks]'], }, ], parameters=self.parameters(), @@ -60,8 +57,8 @@ def toRunConfig(self, scalingConfig: ScalingConfig): def parameters(self): return { - "data_local": "./my-copy-c4", - "data_remote": "s3://aaron-mlflow-demo/ygong-c4-process/", + "data_local": self.data_local, + "data_remote": self.data, "max_seq_len": self.max_seq_len, "global_seed": self.global_seed, "run_name": None, diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 91ad41feb1..0d92125c87 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -13,52 +13,76 @@ import hashlib from mcli.api.engine.engine import MAPIConnection from mcli.config import MCLIConfig +from databricks.sdk import WorkspaceClient +# from databricks_genai.api.config import configure_request +from mcli import config +from mcli.api.runs.api_get_runs import get_run + + +def _set_up_environment(content: str): + os.environ['CREDENTIALS'] = content -def _set_up_environment(content): - data = json.loads(base64.b64decode(content).decode('utf-8')) - workspace_url = data.get("workspace_url", None) - token = data.get("token", None) - mosaic_token = data.get("mosaic_token", None) - if token is None: - from databricks.sdk import WorkspaceClient +def _init_connection(): + def _is_local(): + try: + wc = WorkspaceClient() + wc.dbutils.entry_point.getDbutils().notebook().getContext() + return False + except: + return True + + if _is_local(): + if os.environ.get('CREDENTIALS') is None: + raise ValueError("_set_up_environment must be manually called to configure credentials for local runs") + data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8')) + workspace_url = data.get("workspace_url", None) + token = data.get("token", None) + mosaic_token = data.get("mosaic_token", None) + # set up the mosaic token + conf = MCLIConfig.load_config() + conf.api_key = mosaic_token + conf.save_config() + MAPIConnection.reset_connection() + + + hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8] + databricks_secret_name = f"databricks-{hash}" + + # clean up the old secret. MosaicML doesn't support multiple databricks secrets + # would have to clean up the old secret if it exists + from mcli.api.secrets.api_get_secrets import get_secrets + from mcli.api.secrets.api_delete_secrets import delete_secrets + from mcli.models.mcli_secret import SecretType + s = get_secrets(secret_types=[SecretType.databricks]) + if len(s) == 1: + if s[0].name != databricks_secret_name: + delete_secrets(s) + else: + print("databricks secret already exists") + return + from mcli.objects.secrets.create.databricks import DatabricksSecretCreator + from mcli.api.secrets.api_create_secret import create_secret + s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token) + print(f"successfully created databricks secret: {databricks_secret_name}") + create_secret(s) + + else: wc = WorkspaceClient() + import mlflow.utils.databricks_utils as databricks_utils + workspace_url = databricks_utils.get_workspace_info_from_dbutils()[0] ctx = wc.dbutils.entry_point.getDbutils().notebook().getContext() token = ctx.apiToken().get() + api_url = ctx.apiUrl().get() + endpoint = f'{api_url}/api/2.0/genai-mapi/graphql' + os.environ[config.MOSAICML_API_KEY_ENV] = f'Bearer {token}' + os.environ[config.MOSAICML_API_ENDPOINT_ENV] = endpoint - if workspace_url is None or mosaic_token is None: - raise ValueError("workspace_url and token must be provided") + # needed to set up the MLFlow query for experiment runs os.environ['WORKSPACE_URL'] = workspace_url os.environ['MLFLOW_TRACKING_TOKEN'] = token - # set up the mosaic token - conf = MCLIConfig.load_config() - conf.api_key = mosaic_token - conf.save_config() - MAPIConnection.reset_connection() - - hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8] - databricks_secret_name = f"databricks-{hash}" - - # clean up the old secret. MosaicML doesn't support multiple databricks secrets - # would have to clean up the old secret if it exists - from mcli.api.secrets.api_get_secrets import get_secrets - from mcli.api.secrets.api_delete_secrets import delete_secrets - from mcli.models.mcli_secret import SecretType - s = get_secrets(secret_types=[SecretType.databricks]) - if len(s) == 1: - if s[0].name != databricks_secret_name: - delete_secrets(s) - else: - print("databricks secret already exists") - return - from mcli.objects.secrets.create.databricks import DatabricksSecretCreator - from mcli.api.secrets.api_create_secret import create_secret - s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token) - print(f"successfully created databricks secret: {databricks_secret_name}") - create_secret(s) - def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str): @@ -85,6 +109,7 @@ def _get_run_summary(run: Run, experiment_name: Optional[str] = None): url = None if run.status == RunStatus.RUNNING and experiment_name is not None: url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name) + print(f"ygong: url {url}") df = pd.DataFrame({ 'Run Name': [run.name], @@ -100,9 +125,17 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets. display(cancel_button) display(HTML(summary.to_html(escape=False))) +def _wait_for_run_status(run: Run, status: RunStatus): + run_name = run.name + while not run.status.after(status, inclusive=True): + time.sleep(5) + run = get_run(run_name) + return run + def submit(model, config: any, scalingConfig: ScalingConfig): + _init_connection() mlflow_experiment_name = None if model == "mpt125m": if not isinstance(config, MPT125MConfig): @@ -114,21 +147,22 @@ def submit(model, config: any, scalingConfig: ScalingConfig): run = create_run(runConfig) + run_name = run.name # Create a button button = widgets.Button(description="cancel the run") def on_button_clicked(b): clear_output(wait=False) + run = get_run(run_name) run.stop() - wait_for_run_status(run, RunStatus.TERMINATING) + run = _wait_for_run_status(run, RunStatus.TERMINATING) summary = _get_run_summary(run, mlflow_experiment_name) display(HTML(summary.to_html(escape=False))) button.on_click(on_button_clicked) - _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) - - wait_for_run_status(run, RunStatus.RUNNING) + # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet # when the run just starts running + run = _wait_for_run_status(run, RunStatus.RUNNING) _display_run_summary(_get_run_summary(run, None), button) @@ -137,9 +171,12 @@ def on_button_clicked(b): try_count += 1 time.sleep(20) try: + run = get_run(run) + if run.status.after(RunStatus.TERMINATING, inclusive=True): + break summary = _get_run_summary(run, mlflow_experiment_name) _display_run_summary(summary, button) break except ValueError: - print("waiting for the run to be ready...") + print(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}") pass \ No newline at end of file From 717568cc7693201306857deb245c3816ac6d2cee Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Thu, 21 Mar 2024 22:24:15 -0700 Subject: [PATCH 04/27] add sync logic --- ygong/mosaic/submit.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 0d92125c87..e757f34330 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -134,7 +134,7 @@ def _wait_for_run_status(run: Run, status: RunStatus): -def submit(model, config: any, scalingConfig: ScalingConfig): +def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False): _init_connection() mlflow_experiment_name = None if model == "mpt125m": @@ -179,4 +179,9 @@ def on_button_clicked(b): break except ValueError: print(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}") - pass \ No newline at end of file + pass + + if sync: + print(f"DEBUG: synchronously waiting for the run to finish.") + run = _wait_for_run_status(run, RunStatus.TERMINATING) + _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) \ No newline at end of file From ca72575d705b6580d6a002a7dc3907956bdc773d Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Fri, 22 Mar 2024 10:02:07 -0700 Subject: [PATCH 05/27] fix the log artifact upload bug --- scripts/train/launcher.py | 43 +++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/scripts/train/launcher.py b/scripts/train/launcher.py index 3d0ee3b47b..e86e083e9c 100755 --- a/scripts/train/launcher.py +++ b/scripts/train/launcher.py @@ -390,6 +390,29 @@ def _get_file__(format: str): process.stderr = stderr_file processes[global_rank] = process +def _logs_upload_to_mlflow(log_dirs: set[str], launcher_log: str): + import mlflow + # intialize mlflow experiment. Need to wait for processors to start the experiment + if os.environ.get("mlflow_runid") is None and os.path.exists(MLFlowLogger.CONFIG_FILE): + import json + with open(MLFlowLogger.CONFIG_FILE, "r") as f: + data = json.load(f) + log.error(f"ygong:Started mlflow run {data}") + os.environ['mlflow_runid'] = data[MLFlowLogger.RUN_ID_FIELD] + + mlflow.set_tracking_uri(data[MLFlowLogger.TRACKING_URI_FIELD]) + mlflow.start_run(run_id=data[MLFlowLogger.RUN_ID_FIELD], experiment_id=data[MLFlowLogger.EXPERIMENT_ID_FIELD]) + + # once the mlflow experiment is started, upload the logs + if os.environ.get("mlflow_runid") is not None: + try: + for log_dir in log_dirs: + log.warning(f"ygong: Logging directory: {log_dir}") + mlflow.log_artifacts(log_dir, log_dir.lstrip('/')) + mlflow.log_artifact(launcher_log) + except Exception as e: + log.error(f"ygong:Failed to log artifacts to mlflow: {e}") + def _monitor_processes( processes: Dict[int, subprocess.Popen], @@ -422,23 +445,7 @@ def _monitor_processes( break if cycle == 0: - if mlflow_runid is None: - if os.path.exists(MLFlowLogger.CONFIG_FILE): - import json - with open(MLFlowLogger.CONFIG_FILE, "r") as f: - data = json.load(f) - log.error(f"ygong:Started mlflow run {data}") - mlflow_runid = data[MLFlowLogger.RUN_ID_FIELD] - mlflow.set_tracking_uri(data[MLFlowLogger.TRACKING_URI_FIELD]) - mlflow.start_run(run_id=data[MLFlowLogger.RUN_ID_FIELD], experiment_id=data[MLFlowLogger.EXPERIMENT_ID_FIELD]) - else: - try: - for log_dir in log_dirs: - log.warning(f"ygong: Logging directory: {log_dir}") - mlflow.log_artifacts(log_dir, log_dir.lstrip('/')) - mlflow.log_artifact(launcher_log) - except Exception as e: - log.error(f"ygong:Failed to log artifacts to mlflow: {e}") + _logs_upload_to_mlflow(log_dirs, launcher_log) cycle = (cycle + 1) % log_frequency time.sleep(0.1) @@ -614,6 +621,8 @@ def main(): traceback.print_exc() print('Killing training processes') finally: + # upload the logs before exit + _logs_upload_to_mlflow(log_dirs=log_dirs, launcher_log=launcher_log) _cleanup_processes(processes) log_tmpdir.cleanup() return _aggregate_process_returncode(processes) From 9acc479f2759dd27669a3588b391e90f2bb7de01 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Fri, 22 Mar 2024 10:32:37 -0700 Subject: [PATCH 06/27] aa --- ygong/mosaic/submit.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index e757f34330..7e3cb5953b 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -17,6 +17,7 @@ # from databricks_genai.api.config import configure_request from mcli import config from mcli.api.runs.api_get_runs import get_run +import logging def _set_up_environment(content: str): @@ -125,16 +126,21 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets. display(cancel_button) display(HTML(summary.to_html(escape=False))) -def _wait_for_run_status(run: Run, status: RunStatus): +def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): run_name = run.name - while not run.status.after(status, inclusive=True): + while not run.status.after(status, inclusive=inclusive): time.sleep(5) - run = get_run(run_name) + run = get_run(run_name) + logging.debug(f"DEBUG: run status {run.status}, expected status {status}") + logging.debug(f"DEBUG: finish waiting run reached expected status {status}") return run -def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False): +def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False): + if debug: + logging.basicConfig(level=logging.DEBUG) + _init_connection() mlflow_experiment_name = None if model == "mpt125m": @@ -154,6 +160,7 @@ def on_button_clicked(b): clear_output(wait=False) run = get_run(run_name) run.stop() + logging.debug(f"DEBUG: run {run_name} is cancelled") run = _wait_for_run_status(run, RunStatus.TERMINATING) summary = _get_run_summary(run, mlflow_experiment_name) display(HTML(summary.to_html(escape=False))) @@ -173,15 +180,19 @@ def on_button_clicked(b): try: run = get_run(run) if run.status.after(RunStatus.TERMINATING, inclusive=True): + logging.debug(f"DEBUG: run {run_name} is terminated. Status {run.status}") break summary = _get_run_summary(run, mlflow_experiment_name) _display_run_summary(summary, button) break except ValueError: - print(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}") + + logging.debug(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}") pass if sync: - print(f"DEBUG: synchronously waiting for the run to finish.") - run = _wait_for_run_status(run, RunStatus.TERMINATING) - _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) \ No newline at end of file + logging.debug(f"DEBUG: synchronously waiting for the run to finish.") + run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False) + _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) + + return run \ No newline at end of file From 18bd472e83e3f9da78e1d35ea76fba28027bb05a Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Fri, 22 Mar 2024 10:39:00 -0700 Subject: [PATCH 07/27] bb --- ygong/mosaic/submit.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 7e3cb5953b..bd3cc8337b 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -18,7 +18,8 @@ from mcli import config from mcli.api.runs.api_get_runs import get_run import logging - + +logger = logging.getLogger('ygong.mosaic.submit') def _set_up_environment(content: str): os.environ['CREDENTIALS'] = content @@ -139,7 +140,7 @@ def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False): if debug: - logging.basicConfig(level=logging.DEBUG) + logger.setLevel(logging.DEBUG) _init_connection() mlflow_experiment_name = None From 0368ccaa76adc55344d4b275bd389911500112c4 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Fri, 22 Mar 2024 10:41:48 -0700 Subject: [PATCH 08/27] cc --- ygong/mosaic/submit.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index bd3cc8337b..379bf80eb8 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -132,8 +132,8 @@ def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): while not run.status.after(status, inclusive=inclusive): time.sleep(5) run = get_run(run_name) - logging.debug(f"DEBUG: run status {run.status}, expected status {status}") - logging.debug(f"DEBUG: finish waiting run reached expected status {status}") + logger.debug(f"DEBUG: run status {run.status}, expected status {status}") + logger.debug(f"DEBUG: finish waiting run reached expected status {status}") return run @@ -161,7 +161,7 @@ def on_button_clicked(b): clear_output(wait=False) run = get_run(run_name) run.stop() - logging.debug(f"DEBUG: run {run_name} is cancelled") + logger.debug(f"DEBUG: run {run_name} is cancelled") run = _wait_for_run_status(run, RunStatus.TERMINATING) summary = _get_run_summary(run, mlflow_experiment_name) display(HTML(summary.to_html(escape=False))) @@ -181,18 +181,18 @@ def on_button_clicked(b): try: run = get_run(run) if run.status.after(RunStatus.TERMINATING, inclusive=True): - logging.debug(f"DEBUG: run {run_name} is terminated. Status {run.status}") + logger.debug(f"DEBUG: run {run_name} is terminated. Status {run.status}") break summary = _get_run_summary(run, mlflow_experiment_name) _display_run_summary(summary, button) break except ValueError: - logging.debug(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}") + logger.debug(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}") pass if sync: - logging.debug(f"DEBUG: synchronously waiting for the run to finish.") + logger.debug(f"DEBUG: synchronously waiting for the run to finish.") run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False) _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) From 24db5fa90702764189efe6fb39e14b620c2b3c62 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Fri, 22 Mar 2024 10:50:51 -0700 Subject: [PATCH 09/27] dd --- ygong/mosaic/submit.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 379bf80eb8..8a542475e2 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -18,8 +18,10 @@ from mcli import config from mcli.api.runs.api_get_runs import get_run import logging +import sys logger = logging.getLogger('ygong.mosaic.submit') + def _set_up_environment(content: str): os.environ['CREDENTIALS'] = content @@ -132,15 +134,24 @@ def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): while not run.status.after(status, inclusive=inclusive): time.sleep(5) run = get_run(run_name) - logger.debug(f"DEBUG: run status {run.status}, expected status {status}") - logger.debug(f"DEBUG: finish waiting run reached expected status {status}") + logger.debug(f"run status {run.status}, expected status {status}") + logger.debug(f"finish waiting run reached expected status {status}") return run def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False): if debug: + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setLevel(logging.DEBUG) # Set minimum log level for the handler + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + stdout_handler.setFormatter(formatter) + + # Add the handler to the logger + logger.addHandler(stdout_handler) logger.setLevel(logging.DEBUG) + + logger.info("set the logger to debug mode") _init_connection() mlflow_experiment_name = None @@ -161,7 +172,7 @@ def on_button_clicked(b): clear_output(wait=False) run = get_run(run_name) run.stop() - logger.debug(f"DEBUG: run {run_name} is cancelled") + logger.debug(f"run {run_name} is cancelled") run = _wait_for_run_status(run, RunStatus.TERMINATING) summary = _get_run_summary(run, mlflow_experiment_name) display(HTML(summary.to_html(escape=False))) @@ -181,18 +192,18 @@ def on_button_clicked(b): try: run = get_run(run) if run.status.after(RunStatus.TERMINATING, inclusive=True): - logger.debug(f"DEBUG: run {run_name} is terminated. Status {run.status}") + logger.debug(f"run {run_name} is terminated. Status {run.status}") break summary = _get_run_summary(run, mlflow_experiment_name) _display_run_summary(summary, button) break except ValueError: - logger.debug(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}") + logger.debug(f"waiting for the MLFLow experiment run to be ready, run status{run.status}") pass if sync: - logger.debug(f"DEBUG: synchronously waiting for the run to finish.") + logger.debug(f"synchronously waiting for the run to finish.") run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False) _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) From 4c08f3b9d7b87b149dc3d6bdf21f9393c0a388cd Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Fri, 22 Mar 2024 12:13:21 -0700 Subject: [PATCH 10/27] remove packaging --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index faa27e7556..2c32f79894 100644 --- a/setup.py +++ b/setup.py @@ -85,7 +85,7 @@ 'pytest-cov>=4,<5', 'pyright==1.1.256', 'toml>=0.10.2,<0.11', - 'packaging>=21,<23', + # 'packaging>=21,<23', 'hf_transfer==0.1.3', ] From d8ba721f2d8c280f5e373d2b106bf572bfa940b0 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Fri, 22 Mar 2024 15:06:43 -0700 Subject: [PATCH 11/27] temprarily avoid displaying the button so that it can be runnable by jobs --- setup.py | 2 +- ygong/mosaic/submit.py | 31 +++++++++++++++++-------------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/setup.py b/setup.py index 2c32f79894..018f067f72 100644 --- a/setup.py +++ b/setup.py @@ -85,7 +85,7 @@ 'pytest-cov>=4,<5', 'pyright==1.1.256', 'toml>=0.10.2,<0.11', - # 'packaging>=21,<23', + 'packaging>=21', 'hf_transfer==0.1.3', ] diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 8a542475e2..d7ae51b66a 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -167,23 +167,25 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, run = create_run(runConfig) run_name = run.name # Create a button - button = widgets.Button(description="cancel the run") - def on_button_clicked(b): - clear_output(wait=False) - run = get_run(run_name) - run.stop() - logger.debug(f"run {run_name} is cancelled") - run = _wait_for_run_status(run, RunStatus.TERMINATING) - summary = _get_run_summary(run, mlflow_experiment_name) - display(HTML(summary.to_html(escape=False))) - button.on_click(on_button_clicked) - _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) + # button = widgets.Button(description="cancel the run") + # def on_button_clicked(b): + # clear_output(wait=False) + # run = get_run(run_name) + # run.stop() + # logger.debug(f"run {run_name} is cancelled") + # run = _wait_for_run_status(run, RunStatus.TERMINATING) + # summary = _get_run_summary(run, mlflow_experiment_name) + # display(HTML(summary.to_html(escape=False))) + # button.on_click(on_button_clicked) + # _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) + _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet # when the run just starts running run = _wait_for_run_status(run, RunStatus.RUNNING) - _display_run_summary(_get_run_summary(run, None), button) - + # _display_run_summary(_get_run_summary(run, None), button) + _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) + try_count = 0 while try_count < 10: @@ -195,7 +197,8 @@ def on_button_clicked(b): logger.debug(f"run {run_name} is terminated. Status {run.status}") break summary = _get_run_summary(run, mlflow_experiment_name) - _display_run_summary(summary, button) + # _display_run_summary(summary, button) + _display_run_summary(summary, None) break except ValueError: From 766a638e015652e57ad1355caf10b97683d3a53e Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Fri, 22 Mar 2024 15:19:43 -0700 Subject: [PATCH 12/27] add more debuggin infor --- ygong/mosaic/submit.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index d7ae51b66a..4f47f6a36c 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -37,6 +37,7 @@ def _is_local(): return True if _is_local(): + logger.debug("init_connection in local mode") if os.environ.get('CREDENTIALS') is None: raise ValueError("_set_up_environment must be manually called to configure credentials for local runs") data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8')) @@ -72,6 +73,7 @@ def _is_local(): create_secret(s) else: + logger.debug("init_connection in databricks environment") wc = WorkspaceClient() import mlflow.utils.databricks_utils as databricks_utils workspace_url = databricks_utils.get_workspace_info_from_dbutils()[0] @@ -79,6 +81,7 @@ def _is_local(): token = ctx.apiToken().get() api_url = ctx.apiUrl().get() endpoint = f'{api_url}/api/2.0/genai-mapi/graphql' + logger.debug(f"init_connection token: {token}, api_url: {api_url}, endpoint: {endpoint}, workspace_url: {workspace_url}") os.environ[config.MOSAICML_API_KEY_ENV] = f'Bearer {token}' os.environ[config.MOSAICML_API_ENDPOINT_ENV] = endpoint From 46734a15066ba517f55e432fa33f9f118f184414 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Fri, 22 Mar 2024 16:25:57 -0700 Subject: [PATCH 13/27] cc --- ygong/mosaic/submit.py | 46 +++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 4f47f6a36c..ed59909069 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -71,27 +71,27 @@ def _is_local(): s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token) print(f"successfully created databricks secret: {databricks_secret_name}") create_secret(s) + os.environ['IS_JOBS'] = False else: logger.debug("init_connection in databricks environment") wc = WorkspaceClient() - import mlflow.utils.databricks_utils as databricks_utils - workspace_url = databricks_utils.get_workspace_info_from_dbutils()[0] ctx = wc.dbutils.entry_point.getDbutils().notebook().getContext() token = ctx.apiToken().get() api_url = ctx.apiUrl().get() endpoint = f'{api_url}/api/2.0/genai-mapi/graphql' - logger.debug(f"init_connection token: {token}, api_url: {api_url}, endpoint: {endpoint}, workspace_url: {workspace_url}") + workspace_url = api_url os.environ[config.MOSAICML_API_KEY_ENV] = f'Bearer {token}' os.environ[config.MOSAICML_API_ENDPOINT_ENV] = endpoint + os.environ['IS_JOBS'] = ctx.jobId() is not None # needed to set up the MLFlow query for experiment runs os.environ['WORKSPACE_URL'] = workspace_url os.environ['MLFLOW_TRACKING_TOKEN'] = token + logger.debug(f"init_connection token: {os.environ['MLFLOW_TRACKING_TOKEN']}, workspace: {os.environ['WORKSPACE_URL']}, is_jobs:{ os.environ['IS_JOBS']}") + - - def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str): if tracking_uri is None: raise ValueError("tracking_uri must be provided") @@ -170,25 +170,26 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, run = create_run(runConfig) run_name = run.name # Create a button - # button = widgets.Button(description="cancel the run") - # def on_button_clicked(b): - # clear_output(wait=False) - # run = get_run(run_name) - # run.stop() - # logger.debug(f"run {run_name} is cancelled") - # run = _wait_for_run_status(run, RunStatus.TERMINATING) - # summary = _get_run_summary(run, mlflow_experiment_name) - # display(HTML(summary.to_html(escape=False))) - # button.on_click(on_button_clicked) - # _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) - _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) - + if os.environ['IS_JOBS']: + button = None + else: + button = widgets.Button(description="cancel the run") + def on_button_clicked(b): + clear_output(wait=False) + run = get_run(run_name) + run.stop() + logger.debug(f"run {run_name} is cancelled") + run = _wait_for_run_status(run, RunStatus.TERMINATING) + summary = _get_run_summary(run, mlflow_experiment_name) + display(HTML(summary.to_html(escape=False))) + button.on_click(on_button_clicked) + _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) + # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet # when the run just starts running run = _wait_for_run_status(run, RunStatus.RUNNING) - # _display_run_summary(_get_run_summary(run, None), button) - _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) - + _display_run_summary(_get_run_summary(run, None), button) + try_count = 0 while try_count < 10: @@ -200,8 +201,7 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, logger.debug(f"run {run_name} is terminated. Status {run.status}") break summary = _get_run_summary(run, mlflow_experiment_name) - # _display_run_summary(summary, button) - _display_run_summary(summary, None) + _display_run_summary(summary, button) break except ValueError: From f28cfbc8b5ba9e6f22122d032e429be299d874f6 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Fri, 22 Mar 2024 16:46:39 -0700 Subject: [PATCH 14/27] dd --- ygong/mosaic/submit.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index ed59909069..9ae7b2536c 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -71,7 +71,6 @@ def _is_local(): s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token) print(f"successfully created databricks secret: {databricks_secret_name}") create_secret(s) - os.environ['IS_JOBS'] = False else: logger.debug("init_connection in databricks environment") @@ -83,12 +82,16 @@ def _is_local(): workspace_url = api_url os.environ[config.MOSAICML_API_KEY_ENV] = f'Bearer {token}' os.environ[config.MOSAICML_API_ENDPOINT_ENV] = endpoint - os.environ['IS_JOBS'] = ctx.jobId() is not None + try: + jobs_id = ctx.jobId().get() + os.environ['JOB_ID'] = jobs_id + except: + pass # needed to set up the MLFlow query for experiment runs os.environ['WORKSPACE_URL'] = workspace_url os.environ['MLFLOW_TRACKING_TOKEN'] = token - logger.debug(f"init_connection token: {os.environ['MLFLOW_TRACKING_TOKEN']}, workspace: {os.environ['WORKSPACE_URL']}, is_jobs:{ os.environ['IS_JOBS']}") + logger.debug(f"init_connection token: {os.environ['MLFLOW_TRACKING_TOKEN']}, workspace: {os.environ['WORKSPACE_URL']}, is_jobs: {os.environ.get('JOB_ID')}") @@ -116,7 +119,6 @@ def _get_run_summary(run: Run, experiment_name: Optional[str] = None): url = None if run.status == RunStatus.RUNNING and experiment_name is not None: url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name) - print(f"ygong: url {url}") df = pd.DataFrame({ 'Run Name': [run.name], @@ -170,7 +172,8 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, run = create_run(runConfig) run_name = run.name # Create a button - if os.environ['IS_JOBS']: + if os.environ.get('JOB_ID') is not None: + # running in jobs workflow, no need to cancel the run and doesn't support widgets button = None else: button = widgets.Button(description="cancel the run") From 0737de7aa8dad006d2b546059aa7c6aa0c895527 Mon Sep 17 00:00:00 2001 From: Shitao Li Date: Mon, 25 Mar 2024 17:09:38 +0000 Subject: [PATCH 15/27] add support for retry and preemption --- ygong/mosaic/mpt125mConfig.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py index 322fc181e3..a213fce2d5 100644 --- a/ygong/mosaic/mpt125mConfig.py +++ b/ygong/mosaic/mpt125mConfig.py @@ -6,14 +6,24 @@ class MPT125MConfig: - def __init__(self, experimentName: str, data: str, priority: str = 'high'): + def __init__( + self, + experimentName: str, + data: str, + priority: str = 'high', + preemptible: bool = False, + retry_on_system_failure: bool = False): # TODO: validate the inputs and remove the yu.gong hardcode self.mlflow_experimentName = f"/Users/yu.gong@databricks.com/{experimentName}" self.mlflow_trackingUri = "databricks" # self.mlflow_trackingUri = "databricks" if workspace_url is None else workspace_url self.data = data + + # Scheudling parameters self.priority = priority + self.preemptible = preemptible + self.retry_on_system_failure = retry_on_system_failure # the run name is pre-configured for all config-driven pretrain runs self.name = "mpt125m-config-driven-pretrain" @@ -37,7 +47,11 @@ def toRunConfig(self, scalingConfig: ScalingConfig): image='mosaicml/llm-foundry:2.2.1_cu121_flash2-latest', command="\n".join(self.commands), compute=scalingConfig.toCompute, - scheduling={'priority': self.priority}, + scheduling={ + 'priority': self.priority, + 'preemptible': self.preemptible, + 'retry_on_system_failure': self.retry_on_system_failure + }, integrations=[ { 'integration_type': 'git_repo', From 69375aba57bb3e00d5061172b131271ccebc2a6a Mon Sep 17 00:00:00 2001 From: Shitao Li Date: Mon, 25 Mar 2024 21:23:16 +0000 Subject: [PATCH 16/27] use mcli style status --- ygong/mosaic/submit.py | 120 +++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 72 deletions(-) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 8a542475e2..97135f9974 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -1,23 +1,21 @@ from ygong.mosaic.scaling_config import ScalingConfig from ygong.mosaic.mpt125mConfig import MPT125MConfig -from mcli import wait_for_run_status, Run, RunConfig, RunStatus, create_run -import pandas as pd -import ipywidgets as widgets + +from databricks.sdk import WorkspaceClient +from mcli import config, Run, RunStatus, create_run +from mcli.api.runs.api_get_runs import get_run +from mcli.cli.m_get.runs import RunDisplayItem from IPython.display import display, clear_output, HTML +import ipywidgets as widgets import mlflow -import os +import pandas as pd + from typing import Optional -import time import base64 +import time import json -import hashlib -from mcli.api.engine.engine import MAPIConnection -from mcli.config import MCLIConfig -from databricks.sdk import WorkspaceClient -# from databricks_genai.api.config import configure_request -from mcli import config -from mcli.api.runs.api_get_runs import get_run import logging +import os import sys logger = logging.getLogger('ygong.mosaic.submit') @@ -42,35 +40,9 @@ def _is_local(): data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8')) workspace_url = data.get("workspace_url", None) token = data.get("token", None) - mosaic_token = data.get("mosaic_token", None) # set up the mosaic token - conf = MCLIConfig.load_config() - conf.api_key = mosaic_token - conf.save_config() - MAPIConnection.reset_connection() - - - hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8] - databricks_secret_name = f"databricks-{hash}" - - # clean up the old secret. MosaicML doesn't support multiple databricks secrets - # would have to clean up the old secret if it exists - from mcli.api.secrets.api_get_secrets import get_secrets - from mcli.api.secrets.api_delete_secrets import delete_secrets - from mcli.models.mcli_secret import SecretType - s = get_secrets(secret_types=[SecretType.databricks]) - if len(s) == 1: - if s[0].name != databricks_secret_name: - delete_secrets(s) - else: - print("databricks secret already exists") - return - from mcli.objects.secrets.create.databricks import DatabricksSecretCreator - from mcli.api.secrets.api_create_secret import create_secret - s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token) - print(f"successfully created databricks secret: {databricks_secret_name}") - create_secret(s) - + os.environ[config.MCLI_MODE_ENV] = config.MCLIMode.DBX_AWS_STAGING.value + os.environ[config.MOSAICML_ACCESS_TOKEN_FILE_ENV] = "/home/shitao.li/e2_token" else: wc = WorkspaceClient() import mlflow.utils.databricks_utils as databricks_utils @@ -86,8 +58,6 @@ def _is_local(): os.environ['WORKSPACE_URL'] = workspace_url os.environ['MLFLOW_TRACKING_TOKEN'] = token - - def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str): if tracking_uri is None: @@ -108,19 +78,22 @@ def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, ru else: run_id = runs[0].info.run_id return f"{tracking_uri}/ml/experiments/{experiment_id}/runs/{run_id}" - + + def _get_run_summary(run: Run, experiment_name: Optional[str] = None): url = None - if run.status == RunStatus.RUNNING and experiment_name is not None: - url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name) - print(f"ygong: url {url}") + + run_rows = [] + + # Copy pasted from mcli to display the the resumption status of the run. + for row_raw in RunDisplayItem.from_run(run, [], True): + row = row_raw.to_dict() + if row['Status'].startswith('Running') and experiment_name is not None: + url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name) + row['Experiment Run'] =f'Link' if url is not None else "" + run_rows.append(row) - df = pd.DataFrame({ - 'Run Name': [run.name], - 'Run ID': [run.run_uid], - "Status": [str(run.status)], - 'Experiment Run': [f'Link' if url is not None else ""], - }) + df = pd.DataFrame(run_rows) return df def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.Button]): @@ -129,17 +102,6 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets. display(cancel_button) display(HTML(summary.to_html(escape=False))) -def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): - run_name = run.name - while not run.status.after(status, inclusive=inclusive): - time.sleep(5) - run = get_run(run_name) - logger.debug(f"run status {run.status}, expected status {status}") - logger.debug(f"finish waiting run reached expected status {status}") - return run - - - def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False): if debug: stdout_handler = logging.StreamHandler(sys.stdout) @@ -177,13 +139,28 @@ def on_button_clicked(b): summary = _get_run_summary(run, mlflow_experiment_name) display(HTML(summary.to_html(escape=False))) button.on_click(on_button_clicked) - _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) - # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet - # when the run just starts running + def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): + run_name = run.name + while not run.status.after(status, inclusive=inclusive) and not run.status.is_terminal(): + run = get_run(run_name) + # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet + # when the run just starts running + _display_run_summary(_get_run_summary(run, None), button) + time.sleep(5) + logger.debug(f"finish waiting run reached expected status {status}") + return run + + def _wait_for_run_finish(run: Run): + run_name = run.name + while not run.status.is_terminal(): + run = get_run(run_name) + _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) + time.sleep(5) + logger.debug(f"finish waiting run reached terminal") + return run + run = _wait_for_run_status(run, RunStatus.RUNNING) - _display_run_summary(_get_run_summary(run, None), button) - try_count = 0 while try_count < 10: @@ -191,20 +168,19 @@ def on_button_clicked(b): time.sleep(20) try: run = get_run(run) - if run.status.after(RunStatus.TERMINATING, inclusive=True): - logger.debug(f"run {run_name} is terminated. Status {run.status}") + if run.status.is_terminal(): + logger.debug(f"run {run_name} is in terminal state. Status {run.status}") break summary = _get_run_summary(run, mlflow_experiment_name) _display_run_summary(summary, button) break except ValueError: - logger.debug(f"waiting for the MLFLow experiment run to be ready, run status{run.status}") pass if sync: logger.debug(f"synchronously waiting for the run to finish.") - run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False) + run = _wait_for_run_finish(run) _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) return run \ No newline at end of file From 3abd4fa57165c503bad1190948a587c309778a80 Mon Sep 17 00:00:00 2001 From: Shitao Li Date: Tue, 26 Mar 2024 23:31:19 +0000 Subject: [PATCH 17/27] revert some change --- ygong/mosaic/submit.py | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 62c1f002bb..9767259c81 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -41,9 +41,34 @@ def _is_local(): data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8')) workspace_url = data.get("workspace_url", None) token = data.get("token", None) + mosaic_token = data.get("mosaic_token", None) # set up the mosaic token - os.environ[config.MCLI_MODE_ENV] = config.MCLIMode.DBX_AWS_STAGING.value - os.environ[config.MOSAICML_ACCESS_TOKEN_FILE_ENV] = "/home/shitao.li/e2_token" + conf = MCLIConfig.load_config() + conf.api_key = mosaic_token + conf.save_config() + MAPIConnection.reset_connection() + + + hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8] + databricks_secret_name = f"databricks-{hash}" + + # clean up the old secret. MosaicML doesn't support multiple databricks secrets + # would have to clean up the old secret if it exists + from mcli.api.secrets.api_get_secrets import get_secrets + from mcli.api.secrets.api_delete_secrets import delete_secrets + from mcli.models.mcli_secret import SecretType + s = get_secrets(secret_types=[SecretType.databricks]) + if len(s) == 1: + if s[0].name != databricks_secret_name: + delete_secrets(s) + else: + print("databricks secret already exists") + return + from mcli.objects.secrets.create.databricks import DatabricksSecretCreator + from mcli.api.secrets.api_create_secret import create_secret + s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token) + print(f"successfully created databricks secret: {databricks_secret_name}") + create_secret(s) else: logger.debug("init_connection in databricks environment") wc = WorkspaceClient() From 1359e58f319587a725a7f49217c56bebd246c2b4 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Wed, 27 Mar 2024 10:39:21 -0700 Subject: [PATCH 18/27] make it works in job workflow --- setup.py | 2 +- ygong/mosaic/submit.py | 80 +++++++++++++++++++++++++++++------------- 2 files changed, 57 insertions(+), 25 deletions(-) diff --git a/setup.py b/setup.py index faa27e7556..018f067f72 100644 --- a/setup.py +++ b/setup.py @@ -85,7 +85,7 @@ 'pytest-cov>=4,<5', 'pyright==1.1.256', 'toml>=0.10.2,<0.11', - 'packaging>=21,<23', + 'packaging>=21', 'hf_transfer==0.1.3', ] diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index e757f34330..9ae7b2536c 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -17,7 +17,11 @@ # from databricks_genai.api.config import configure_request from mcli import config from mcli.api.runs.api_get_runs import get_run - +import logging +import sys + +logger = logging.getLogger('ygong.mosaic.submit') + def _set_up_environment(content: str): os.environ['CREDENTIALS'] = content @@ -33,6 +37,7 @@ def _is_local(): return True if _is_local(): + logger.debug("init_connection in local mode") if os.environ.get('CREDENTIALS') is None: raise ValueError("_set_up_environment must be manually called to configure credentials for local runs") data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8')) @@ -68,23 +73,28 @@ def _is_local(): create_secret(s) else: + logger.debug("init_connection in databricks environment") wc = WorkspaceClient() - import mlflow.utils.databricks_utils as databricks_utils - workspace_url = databricks_utils.get_workspace_info_from_dbutils()[0] ctx = wc.dbutils.entry_point.getDbutils().notebook().getContext() token = ctx.apiToken().get() api_url = ctx.apiUrl().get() endpoint = f'{api_url}/api/2.0/genai-mapi/graphql' + workspace_url = api_url os.environ[config.MOSAICML_API_KEY_ENV] = f'Bearer {token}' os.environ[config.MOSAICML_API_ENDPOINT_ENV] = endpoint + try: + jobs_id = ctx.jobId().get() + os.environ['JOB_ID'] = jobs_id + except: + pass # needed to set up the MLFlow query for experiment runs os.environ['WORKSPACE_URL'] = workspace_url os.environ['MLFLOW_TRACKING_TOKEN'] = token + logger.debug(f"init_connection token: {os.environ['MLFLOW_TRACKING_TOKEN']}, workspace: {os.environ['WORKSPACE_URL']}, is_jobs: {os.environ.get('JOB_ID')}") + - - def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str): if tracking_uri is None: raise ValueError("tracking_uri must be provided") @@ -109,7 +119,6 @@ def _get_run_summary(run: Run, experiment_name: Optional[str] = None): url = None if run.status == RunStatus.RUNNING and experiment_name is not None: url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name) - print(f"ygong: url {url}") df = pd.DataFrame({ 'Run Name': [run.name], @@ -125,16 +134,30 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets. display(cancel_button) display(HTML(summary.to_html(escape=False))) -def _wait_for_run_status(run: Run, status: RunStatus): +def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): run_name = run.name - while not run.status.after(status, inclusive=True): + while not run.status.after(status, inclusive=inclusive): time.sleep(5) - run = get_run(run_name) + run = get_run(run_name) + logger.debug(f"run status {run.status}, expected status {status}") + logger.debug(f"finish waiting run reached expected status {status}") return run -def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False): +def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False): + if debug: + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setLevel(logging.DEBUG) # Set minimum log level for the handler + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + stdout_handler.setFormatter(formatter) + + # Add the handler to the logger + logger.addHandler(stdout_handler) + logger.setLevel(logging.DEBUG) + + logger.info("set the logger to debug mode") + _init_connection() mlflow_experiment_name = None if model == "mpt125m": @@ -149,17 +172,22 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False) run = create_run(runConfig) run_name = run.name # Create a button - button = widgets.Button(description="cancel the run") - def on_button_clicked(b): - clear_output(wait=False) - run = get_run(run_name) - run.stop() - run = _wait_for_run_status(run, RunStatus.TERMINATING) - summary = _get_run_summary(run, mlflow_experiment_name) - display(HTML(summary.to_html(escape=False))) - button.on_click(on_button_clicked) + if os.environ.get('JOB_ID') is not None: + # running in jobs workflow, no need to cancel the run and doesn't support widgets + button = None + else: + button = widgets.Button(description="cancel the run") + def on_button_clicked(b): + clear_output(wait=False) + run = get_run(run_name) + run.stop() + logger.debug(f"run {run_name} is cancelled") + run = _wait_for_run_status(run, RunStatus.TERMINATING) + summary = _get_run_summary(run, mlflow_experiment_name) + display(HTML(summary.to_html(escape=False))) + button.on_click(on_button_clicked) _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) - + # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet # when the run just starts running run = _wait_for_run_status(run, RunStatus.RUNNING) @@ -173,15 +201,19 @@ def on_button_clicked(b): try: run = get_run(run) if run.status.after(RunStatus.TERMINATING, inclusive=True): + logger.debug(f"run {run_name} is terminated. Status {run.status}") break summary = _get_run_summary(run, mlflow_experiment_name) _display_run_summary(summary, button) break except ValueError: - print(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}") + + logger.debug(f"waiting for the MLFLow experiment run to be ready, run status{run.status}") pass if sync: - print(f"DEBUG: synchronously waiting for the run to finish.") - run = _wait_for_run_status(run, RunStatus.TERMINATING) - _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) \ No newline at end of file + logger.debug(f"synchronously waiting for the run to finish.") + run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False) + _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) + + return run \ No newline at end of file From 4cd30cdf1632be7c07e9373cc0ec34536b2da732 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Wed, 27 Mar 2024 10:52:39 -0700 Subject: [PATCH 19/27] port shitao's change https://github.com/ygong1/llm-foundry/pull/1/files --- ygong/mosaic/mpt125mConfig.py | 18 ++++++- ygong/mosaic/submit.py | 97 ++++++++++++++++++----------------- 2 files changed, 66 insertions(+), 49 deletions(-) diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py index 322fc181e3..a213fce2d5 100644 --- a/ygong/mosaic/mpt125mConfig.py +++ b/ygong/mosaic/mpt125mConfig.py @@ -6,14 +6,24 @@ class MPT125MConfig: - def __init__(self, experimentName: str, data: str, priority: str = 'high'): + def __init__( + self, + experimentName: str, + data: str, + priority: str = 'high', + preemptible: bool = False, + retry_on_system_failure: bool = False): # TODO: validate the inputs and remove the yu.gong hardcode self.mlflow_experimentName = f"/Users/yu.gong@databricks.com/{experimentName}" self.mlflow_trackingUri = "databricks" # self.mlflow_trackingUri = "databricks" if workspace_url is None else workspace_url self.data = data + + # Scheudling parameters self.priority = priority + self.preemptible = preemptible + self.retry_on_system_failure = retry_on_system_failure # the run name is pre-configured for all config-driven pretrain runs self.name = "mpt125m-config-driven-pretrain" @@ -37,7 +47,11 @@ def toRunConfig(self, scalingConfig: ScalingConfig): image='mosaicml/llm-foundry:2.2.1_cu121_flash2-latest', command="\n".join(self.commands), compute=scalingConfig.toCompute, - scheduling={'priority': self.priority}, + scheduling={ + 'priority': self.priority, + 'preemptible': self.preemptible, + 'retry_on_system_failure': self.retry_on_system_failure + }, integrations=[ { 'integration_type': 'git_repo', diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 9ae7b2536c..9767259c81 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -1,23 +1,21 @@ from ygong.mosaic.scaling_config import ScalingConfig from ygong.mosaic.mpt125mConfig import MPT125MConfig -from mcli import wait_for_run_status, Run, RunConfig, RunStatus, create_run -import pandas as pd -import ipywidgets as widgets + +from databricks.sdk import WorkspaceClient +from mcli import config, Run, RunStatus, create_run +from mcli.api.runs.api_get_runs import get_run +from mcli.cli.m_get.runs import RunDisplayItem from IPython.display import display, clear_output, HTML +import ipywidgets as widgets import mlflow -import os +import pandas as pd + from typing import Optional -import time import base64 +import time import json -import hashlib -from mcli.api.engine.engine import MAPIConnection -from mcli.config import MCLIConfig -from databricks.sdk import WorkspaceClient -# from databricks_genai.api.config import configure_request -from mcli import config -from mcli.api.runs.api_get_runs import get_run import logging +import os import sys logger = logging.getLogger('ygong.mosaic.submit') @@ -50,10 +48,10 @@ def _is_local(): conf.save_config() MAPIConnection.reset_connection() - + hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8] databricks_secret_name = f"databricks-{hash}" - + # clean up the old secret. MosaicML doesn't support multiple databricks secrets # would have to clean up the old secret if it exists from mcli.api.secrets.api_get_secrets import get_secrets @@ -71,7 +69,6 @@ def _is_local(): s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token) print(f"successfully created databricks secret: {databricks_secret_name}") create_secret(s) - else: logger.debug("init_connection in databricks environment") wc = WorkspaceClient() @@ -93,8 +90,7 @@ def _is_local(): os.environ['MLFLOW_TRACKING_TOKEN'] = token logger.debug(f"init_connection token: {os.environ['MLFLOW_TRACKING_TOKEN']}, workspace: {os.environ['WORKSPACE_URL']}, is_jobs: {os.environ.get('JOB_ID')}") - - + def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str): if tracking_uri is None: raise ValueError("tracking_uri must be provided") @@ -114,18 +110,22 @@ def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, ru else: run_id = runs[0].info.run_id return f"{tracking_uri}/ml/experiments/{experiment_id}/runs/{run_id}" - + + def _get_run_summary(run: Run, experiment_name: Optional[str] = None): url = None - if run.status == RunStatus.RUNNING and experiment_name is not None: - url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name) + + run_rows = [] + + # Copy pasted from mcli to display the the resumption status of the run. + for row_raw in RunDisplayItem.from_run(run, [], True): + row = row_raw.to_dict() + if row['Status'].startswith('Running') and experiment_name is not None: + url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name) + row['Experiment Run'] =f'Link' if url is not None else "" + run_rows.append(row) - df = pd.DataFrame({ - 'Run Name': [run.name], - 'Run ID': [run.run_uid], - "Status": [str(run.status)], - 'Experiment Run': [f'Link' if url is not None else ""], - }) + df = pd.DataFrame(run_rows) return df def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.Button]): @@ -134,17 +134,6 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets. display(cancel_button) display(HTML(summary.to_html(escape=False))) -def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): - run_name = run.name - while not run.status.after(status, inclusive=inclusive): - time.sleep(5) - run = get_run(run_name) - logger.debug(f"run status {run.status}, expected status {status}") - logger.debug(f"finish waiting run reached expected status {status}") - return run - - - def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False): if debug: stdout_handler = logging.StreamHandler(sys.stdout) @@ -168,7 +157,6 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, else: raise ValueError(f"model {model} is not supported") - run = create_run(runConfig) run_name = run.name # Create a button @@ -187,12 +175,28 @@ def on_button_clicked(b): display(HTML(summary.to_html(escape=False))) button.on_click(on_button_clicked) _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) - - # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet - # when the run just starts running + + def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): + run_name = run.name + while not run.status.after(status, inclusive=inclusive) and not run.status.is_terminal(): + run = get_run(run_name) + # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet + # when the run just starts running + _display_run_summary(_get_run_summary(run, None), button) + time.sleep(5) + logger.debug(f"finish waiting run reached expected status {status}") + return run + + def _wait_for_run_finish(run: Run): + run_name = run.name + while not run.status.is_terminal(): + run = get_run(run_name) + _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) + time.sleep(5) + logger.debug(f"finish waiting run reached terminal") + return run + run = _wait_for_run_status(run, RunStatus.RUNNING) - _display_run_summary(_get_run_summary(run, None), button) - try_count = 0 while try_count < 10: @@ -200,20 +204,19 @@ def on_button_clicked(b): time.sleep(20) try: run = get_run(run) - if run.status.after(RunStatus.TERMINATING, inclusive=True): - logger.debug(f"run {run_name} is terminated. Status {run.status}") + if run.status.is_terminal(): + logger.debug(f"run {run_name} is in terminal state. Status {run.status}") break summary = _get_run_summary(run, mlflow_experiment_name) _display_run_summary(summary, button) break except ValueError: - logger.debug(f"waiting for the MLFLow experiment run to be ready, run status{run.status}") pass if sync: logger.debug(f"synchronously waiting for the run to finish.") - run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False) + run = _wait_for_run_finish(run) _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) return run \ No newline at end of file From 461cb232da8323d6b6c1da256dbdb81b68a613a5 Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Wed, 27 Mar 2024 10:49:17 -0700 Subject: [PATCH 20/27] chagne --- .gitignore | 2 ++ ygong/mosaic/submit.py | 45 +++++++++++++++++------------------------- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/.gitignore b/.gitignore index 72f965ce63..5da48d2cc0 100644 --- a/.gitignore +++ b/.gitignore @@ -158,3 +158,5 @@ notebooks/ **/mlruns/* **/tokenizer-save-dir-*/** **/.downloaded_finetuning/ + +.databricks diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 9767259c81..dcf3cf5c40 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -17,6 +17,8 @@ import logging import os import sys +from mcli import RunConfig +import hashlib logger = logging.getLogger('ygong.mosaic.submit') @@ -134,7 +136,16 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets. display(cancel_button) display(HTML(summary.to_html(escape=False))) -def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False): +def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): + run_name = run.name + while not run.status.after(status, inclusive=inclusive): + time.sleep(5) + run = get_run(run_name) + logger.debug(f"run status {run.status}, expected status {status}") + logger.debug(f"finish waiting run reached expected status {status}") + return run + +def submit(config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False): if debug: stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setLevel(logging.DEBUG) # Set minimum log level for the handler @@ -149,13 +160,14 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, _init_connection() mlflow_experiment_name = None - if model == "mpt125m": - if not isinstance(config, MPT125MConfig): - raise ValueError("config must be an instance of MPT125MConfig") + if isinstance(config, MPT125MConfig): mlflow_experiment_name = config.mlflow_experimentName runConfig = config.toRunConfig(scalingConfig) + elif isinstance(config, RunConfig): + runConfig = config + mlflow_experiment_name = runConfig.name else: - raise ValueError(f"model {model} is not supported") + raise ValueError(f"config type {type(config)} is not supported") run = create_run(runConfig) run_name = run.name @@ -175,27 +187,6 @@ def on_button_clicked(b): display(HTML(summary.to_html(escape=False))) button.on_click(on_button_clicked) _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) - - def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True): - run_name = run.name - while not run.status.after(status, inclusive=inclusive) and not run.status.is_terminal(): - run = get_run(run_name) - # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet - # when the run just starts running - _display_run_summary(_get_run_summary(run, None), button) - time.sleep(5) - logger.debug(f"finish waiting run reached expected status {status}") - return run - - def _wait_for_run_finish(run: Run): - run_name = run.name - while not run.status.is_terminal(): - run = get_run(run_name) - _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) - time.sleep(5) - logger.debug(f"finish waiting run reached terminal") - return run - run = _wait_for_run_status(run, RunStatus.RUNNING) try_count = 0 @@ -216,7 +207,7 @@ def _wait_for_run_finish(run: Run): if sync: logger.debug(f"synchronously waiting for the run to finish.") - run = _wait_for_run_finish(run) + run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False) _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) return run \ No newline at end of file From a090962d7631506d54af79ec43fdd2f34a16f20d Mon Sep 17 00:00:00 2001 From: Yu Gong <129110170+ygong1@users.noreply.github.com> Date: Wed, 27 Mar 2024 15:53:05 -0700 Subject: [PATCH 21/27] aa --- ygong/mosaic/submit.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index dcf3cf5c40..76bf633408 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -19,16 +19,19 @@ import sys from mcli import RunConfig import hashlib +from mcli.config import MCLIConfig +from mcli.api.engine.engine import MAPIConnection logger = logging.getLogger('ygong.mosaic.submit') - def _set_up_environment(content: str): os.environ['CREDENTIALS'] = content def _init_connection(): def _is_local(): + if os.environ.get('CREDENTIALS') is not None: + return True try: wc = WorkspaceClient() wc.dbutils.entry_point.getDbutils().notebook().getContext() @@ -157,7 +160,8 @@ def submit(config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: logger.setLevel(logging.DEBUG) logger.info("set the logger to debug mode") - + + # MTC + AWS Dogfood _init_connection() mlflow_experiment_name = None if isinstance(config, MPT125MConfig): @@ -178,6 +182,7 @@ def submit(config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: else: button = widgets.Button(description="cancel the run") def on_button_clicked(b): + logger.debug(f"cancel button clicked") clear_output(wait=False) run = get_run(run_name) run.stop() From b245134c5ecaf5067e12278c79d691ad3b8ffafc Mon Sep 17 00:00:00 2001 From: Shitao Li Date: Wed, 27 Mar 2024 23:20:57 +0000 Subject: [PATCH 22/27] init --- ygong/mosaic/__init__.py | 4 +- ygong/mosaic/mpt125mConfig.py | 80 ++++++++++++++++++++++++++++++++--- ygong/mosaic/submit.py | 29 +------------ 3 files changed, 78 insertions(+), 35 deletions(-) diff --git a/ygong/mosaic/__init__.py b/ygong/mosaic/__init__.py index c5bd21dde7..1142627308 100644 --- a/ygong/mosaic/__init__.py +++ b/ygong/mosaic/__init__.py @@ -1,6 +1,6 @@ from .submit import submit from .submit import _set_up_environment from .scaling_config import ScalingConfig -from .mpt125mConfig import MPT125MConfig +from .mpt125mConfig import MPT125MConfig, WSFSIntegration -__all__ = ['submit', 'ScalingConfig', "MPT125MConfig", "_set_up_environment"] \ No newline at end of file +__all__ = ['submit', 'ScalingConfig', "MPT125MConfig", "WSFSIntegration", "_set_up_environment"] \ No newline at end of file diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py index a213fce2d5..8c4cf01929 100644 --- a/ygong/mosaic/mpt125mConfig.py +++ b/ygong/mosaic/mpt125mConfig.py @@ -1,9 +1,47 @@ from mcli import RunConfig from ygong.mosaic.scaling_config import ScalingConfig -from typing import Optional +from typing import Dict, List, Optional import os +import shlex # import databricks_genai.api.config as cfg +class WSFSIntegration: + def __init__( + self, + wsfs_path: str, + entry_point: Optional[str] = None, + args: Optional[List[str]] = None): + """ + Class to represent the integration with Databricks WSFS. + + :params: wsfs_path: str Absolute path + :params: entry_point: str Required if the wsfs_path is a directory + """ + self.wsfs_path = wsfs_path + self.entry_point = entry_point + self.args = args + + def get_entry_command(self): + entry_file_path = "" + if self.entry_point is not None: + if self.entry_point.startswith("/Workspace"): + entry_file_path = self.entry_point + else: + entry_file_path = os.path.join(self.wsfs_path, self.entry_point) + else: + entry_file_path = self.wsfs_path + if self.args is None: + return f"python3 {shlex.quote(entry_file_path)}" + return f"python3 {shlex.quote(entry_file_path)} {' '.join(self.args)}" + + def toDict(self): + return { + "integration_type": "wsfs", + "wsfs_path": self.wsfs_path, + "entrypoint": self.entry_point, + "args": self.args, + } + class MPT125MConfig: def __init__( @@ -12,7 +50,8 @@ def __init__( data: str, priority: str = 'high', preemptible: bool = False, - retry_on_system_failure: bool = False): + retry_on_system_failure: bool = False, + wsfs_integration: Optional[WSFSIntegration] = None): # TODO: validate the inputs and remove the yu.gong hardcode self.mlflow_experimentName = f"/Users/yu.gong@databricks.com/{experimentName}" self.mlflow_trackingUri = "databricks" @@ -35,10 +74,39 @@ def __init__( self.global_seed = 17 self.data_remote = self.data self.data_local = "" - self.commands = [ - "cd llm-foundry/scripts", - "train/launcher.py train/train.py /mnt/config/parameters.yaml train_loader.dataset.split=train eval_loader.dataset.split=val" - ] + self.commands = [] + if wsfs_integration is not None: + # The first group of commands are to download the object(file or directory) from + # databricks WSFS using PAT token and url. + # The second command try to unzip if the object from WSFS is directory. + # TODO: Read the token and host name from env vars or /mnt/jwt-secret/.databrickscfg + self.commands = [ + f""" + DATABRICKS_HOST="https://oregon.staging.cloud.databricks.com" + DATABRICKS_TOKEN="dapid5af61ff89674be90c3e86ae9fc10c2e" + WSFS_PATH="{wsfs_integration.wsfs_path}" + DIR_NAME=$(dirname "$WSFS_PATH") + ENCODED_WSFS_PATH=$(python3 -c "import urllib.parse; print(urllib.parse.quote('$WSFS_PATH'))") + mkdir -p "$DIR_NAME" + curl -X GET -o "$WSFS_PATH" "${{DATABRICKS_HOST}}/api/2.0/workspace/export?path=${{ENCODED_WSFS_PATH}}&direct_download=true" \ + -H "Authorization: Bearer $DATABRICKS_TOKEN" + + if file "$WSFS_PATH" | grep -q "Zip archive data"; then + mv "$WSFS_PATH" "${{WSFS_PATH}}.zip" + apt update && apt install unzip + unzip -d "$DIR_NAME" "${{WSFS_PATH}}.zip" + rm -f "${{WSFS_PATH}}.zip" + else + echo "$WSFS_PATH is not a ZIP file." + fi + """ + ] + self.commands.append(wsfs_integration.get_entry_command()) + else: + self.commands = [ + "cd llm-foundry/scripts", + "train/launcher.py train/train.py /mnt/config/parameters.yaml train_loader.dataset.split=train eval_loader.dataset.split=val" + ] def toRunConfig(self, scalingConfig: ScalingConfig): diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 9767259c81..62c1f002bb 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -41,34 +41,9 @@ def _is_local(): data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8')) workspace_url = data.get("workspace_url", None) token = data.get("token", None) - mosaic_token = data.get("mosaic_token", None) # set up the mosaic token - conf = MCLIConfig.load_config() - conf.api_key = mosaic_token - conf.save_config() - MAPIConnection.reset_connection() - - - hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8] - databricks_secret_name = f"databricks-{hash}" - - # clean up the old secret. MosaicML doesn't support multiple databricks secrets - # would have to clean up the old secret if it exists - from mcli.api.secrets.api_get_secrets import get_secrets - from mcli.api.secrets.api_delete_secrets import delete_secrets - from mcli.models.mcli_secret import SecretType - s = get_secrets(secret_types=[SecretType.databricks]) - if len(s) == 1: - if s[0].name != databricks_secret_name: - delete_secrets(s) - else: - print("databricks secret already exists") - return - from mcli.objects.secrets.create.databricks import DatabricksSecretCreator - from mcli.api.secrets.api_create_secret import create_secret - s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token) - print(f"successfully created databricks secret: {databricks_secret_name}") - create_secret(s) + os.environ[config.MCLI_MODE_ENV] = config.MCLIMode.DBX_AWS_STAGING.value + os.environ[config.MOSAICML_ACCESS_TOKEN_FILE_ENV] = "/home/shitao.li/e2_token" else: logger.debug("init_connection in databricks environment") wc = WorkspaceClient() From aee64cc61b96655111d6306f686b9961d8b51c64 Mon Sep 17 00:00:00 2001 From: Shitao Li Date: Thu, 28 Mar 2024 06:36:53 +0000 Subject: [PATCH 23/27] tmp --- llmfoundry/composerpatch/MLFlowLogger.py | 4 +- ygong/mosaic/submit.py | 47 ++++++++++++++---------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/llmfoundry/composerpatch/MLFlowLogger.py b/llmfoundry/composerpatch/MLFlowLogger.py index e9f73d7767..32635c6f7d 100644 --- a/llmfoundry/composerpatch/MLFlowLogger.py +++ b/llmfoundry/composerpatch/MLFlowLogger.py @@ -1,11 +1,9 @@ from composer.loggers import MLFlowLogger as ComposerMLFlowLogger -from composer.utils import MissingConditionalImportError, dist +from composer.utils import dist import json import os from composer.core.state import State from composer.loggers.logger import Logger -from composer.loggers.logger_destination import LoggerDestination -from composer.utils import MissingConditionalImportError, dist diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py index 7d418ca795..c1a25245a6 100644 --- a/ygong/mosaic/submit.py +++ b/ygong/mosaic/submit.py @@ -72,24 +72,24 @@ def _is_local(): def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str): - if tracking_uri is None: - raise ValueError("tracking_uri must be provided") - mlflow.set_tracking_uri(tracking_uri) - tracking_uri = tracking_uri.rstrip("/") - experiment = mlflow.get_experiment_by_name(name=experiment_name) - if experiment is None: - raise ValueError(f"experiment {experiment_name} does not exist") - experiment_id = experiment.experiment_id - runs = mlflow.search_runs(experiment_ids=[experiment_id], - filter_string=f'tags.composer_run_name = "{run_name}"', - output_format='list') - if len(runs) == 0: - raise ValueError(f"run {run_name} does not exist in experiment {experiment_name}") - elif len(runs) > 1: - raise ValueError(f"multiple runs {run_name} exist in experiment {experiment_name}") - else: - run_id = runs[0].info.run_id - return f"{tracking_uri}/ml/experiments/{experiment_id}/runs/{run_id}" + if tracking_uri is None: + raise ValueError("tracking_uri must be provided") + mlflow.set_tracking_uri(tracking_uri) + tracking_uri = tracking_uri.rstrip("/") + experiment = mlflow.get_experiment_by_name(name=experiment_name) + if experiment is None: + raise ValueError(f"experiment {experiment_name} does not exist") + experiment_id = experiment.experiment_id + runs = mlflow.search_runs(experiment_ids=[experiment_id], + filter_string=f'tags.composer_run_name = "{run_name}"', + output_format='list') + if len(runs) == 0: + return None + elif len(runs) > 1: + raise ValueError(f"multiple runs {run_name} exist in experiment {experiment_name}") + else: + run_id = runs[0].info.run_id + return f"{tracking_uri}/ml/experiments/{experiment_id}/runs/{run_id}" def _get_run_summary(run: Run, experiment_name: Optional[str] = None): @@ -169,6 +169,15 @@ def on_button_clicked(b): _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) run = _wait_for_run_status(run, RunStatus.RUNNING) + def _wait_for_run_finish(run: Run): + run_name = run.name + while not run.status.is_terminal(): + run = get_run(run_name) + _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button) + time.sleep(5) + logger.debug(f"finish waiting run reached terminal") + return run + try_count = 0 while try_count < 10: try_count += 1 @@ -187,7 +196,7 @@ def on_button_clicked(b): if sync: logger.debug(f"synchronously waiting for the run to finish.") - run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False) + run = _wait_for_run_finish(run) _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None) return run \ No newline at end of file From 65a7f306326ce914597d7c13b4b7fbded1183c32 Mon Sep 17 00:00:00 2001 From: Shitao Li Date: Thu, 28 Mar 2024 06:58:02 +0000 Subject: [PATCH 24/27] fix --- ygong/mosaic/mpt125mConfig.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py index 8c4cf01929..8bdbc88182 100644 --- a/ygong/mosaic/mpt125mConfig.py +++ b/ygong/mosaic/mpt125mConfig.py @@ -123,8 +123,8 @@ def toRunConfig(self, scalingConfig: ScalingConfig): integrations=[ { 'integration_type': 'git_repo', - 'git_repo': 'ygong1/llm-foundry', - 'git_branch': 'prototype', + 'git_repo': 'shitaoli-db/llm-foundry', + 'git_branch': 'prototype-shitao', 'pip_install': '-e .[gpu]', 'ssh_clone': False }, From d0bc1214eadf5470260f580633d3f02802632837 Mon Sep 17 00:00:00 2001 From: Shitao Li Date: Thu, 28 Mar 2024 06:58:19 +0000 Subject: [PATCH 25/27] fix --- ygong/mosaic/mpt125mConfig.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py index 8bdbc88182..8a23a0053e 100644 --- a/ygong/mosaic/mpt125mConfig.py +++ b/ygong/mosaic/mpt125mConfig.py @@ -124,7 +124,7 @@ def toRunConfig(self, scalingConfig: ScalingConfig): { 'integration_type': 'git_repo', 'git_repo': 'shitaoli-db/llm-foundry', - 'git_branch': 'prototype-shitao', + 'git_branch': 'shitao.li@databricks.com/prototype-shitao', 'pip_install': '-e .[gpu]', 'ssh_clone': False }, From 6c399151a15b1a726b40066a85d39603f2b2595d Mon Sep 17 00:00:00 2001 From: Shitao Li Date: Thu, 28 Mar 2024 07:04:59 +0000 Subject: [PATCH 26/27] fix --- setup.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/setup.py b/setup.py index 571b921b8b..b49b80d6ef 100644 --- a/setup.py +++ b/setup.py @@ -66,22 +66,12 @@ 'mosaicml-cli>=0.6.10,<1', 'onnx==1.14.0', 'onnxruntime==1.15.1', -<<<<<<< HEAD - 'cmake>=3.25.0,<=3.26.3', # required for triton-pre-mlir below - # PyPI does not support direct dependencies, so we remove this line before uploading from PyPI - # 'triton-pre-mlir@git+https://github.com/vchiley/triton.git@triton_pre_mlir_sm90#subdirectory=python', -======= ->>>>>>> 28467bbad5e8e9e2fb070ddc011367310b5721e7 'boto3>=1.21.45,<2', 'huggingface-hub>=0.17.0,<1.0', 'beautifulsoup4>=4.12.2,<5', # required for model download utils 'tenacity>=8.2.3,<9', -<<<<<<< HEAD - 'ipywidgets', -======= 'catalogue>=2,<3', 'typer[all]<1', ->>>>>>> 28467bbad5e8e9e2fb070ddc011367310b5721e7 ] extra_deps = {} From eea721800e3cb17234a3280f8c1cb9f1227adc8a Mon Sep 17 00:00:00 2001 From: Shitao Li Date: Thu, 28 Mar 2024 07:39:26 +0000 Subject: [PATCH 27/27] fix triton --- scripts/train/train.py | 3 --- ygong/mosaic/mpt125mConfig.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/scripts/train/train.py b/scripts/train/train.py index 4462fd8c4f..dabf5e4a22 100644 --- a/scripts/train/train.py +++ b/scripts/train/train.py @@ -12,9 +12,6 @@ import torch from composer import Trainer from composer.core.callback import Callback -from composer.loggers import MosaicMLLogger, MLFlowLogger -from composer.loggers.mosaicml_logger import (MOSAICML_ACCESS_TOKEN_ENV_VAR, - MOSAICML_PLATFORM_ENV_VAR) from composer.metrics.nlp import InContextLearningMetric from composer.profiler import (JSONTraceHandler, Profiler, TraceHandler, cyclic_schedule) diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py index 8a23a0053e..b2d5c54efd 100644 --- a/ygong/mosaic/mpt125mConfig.py +++ b/ygong/mosaic/mpt125mConfig.py @@ -154,7 +154,7 @@ def parameters(self): "max_seq_len": self.max_seq_len, "vocab_size": 50368, "attn_config": { - "attn_impl": "triton" + "attn_impl": "flash" } }, "tokenizer": {