From d23e5d706c91bff122a83eaa078fcfb802b52a86 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Thu, 14 Mar 2024 12:45:18 -0700
Subject: [PATCH 01/27] copy launcher.py from composer repo
---
scripts/train/launcher.py | 584 ++++++++++++++++++++++++++++++++++++++
1 file changed, 584 insertions(+)
create mode 100755 scripts/train/launcher.py
diff --git a/scripts/train/launcher.py b/scripts/train/launcher.py
new file mode 100755
index 0000000000..e20626265f
--- /dev/null
+++ b/scripts/train/launcher.py
@@ -0,0 +1,584 @@
+#!/usr/bin/env python3
+# Copyright 2022 MosaicML Composer authors
+# SPDX-License-Identifier: Apache-2.0
+
+"""The Composer CLI launcher for distributed training."""
+
+import contextlib
+import datetime
+import logging
+import os
+import signal
+import subprocess
+import sys
+import tempfile
+import time
+import traceback
+from argparse import ArgumentParser
+from typing import Any, Dict, List, Union
+
+import psutil
+import torch
+
+import composer
+from composer.loggers.mosaicml_logger import (
+ MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR,
+ MOSAICML_LOG_DIR_ENV_VAR,
+ MOSAICML_PLATFORM_ENV_VAR,
+)
+from composer.utils import get_free_tcp_port
+
+CLEANUP_TIMEOUT = datetime.timedelta(seconds=30)
+
+log = logging.getLogger(__name__)
+
+
+def _get_parser():
+ parser = ArgumentParser(description='Utility for launching distributed machine learning jobs.')
+
+ parser.add_argument('--version', action='version', version=f'MosaicML Composer {composer.__version__}')
+
+ required_args = parser.add_argument_group('required arguments')
+
+ parser.add_argument(
+ '-n',
+ '--nproc',
+ type=int,
+ help=(
+ 'The number of processes to launch on this node. Overrides env var `LOCAL_WORLD_SIZE` if specified; '
+ 'otherwise, defaults to `max(1, torch.cuda.device_count())`.'
+ ),
+ )
+
+ parser.add_argument(
+ '--stdout',
+ type=str,
+ default=None,
+ help=(
+ 'Format string for a filename to dump the STDOUT from the non-local-rank-zero processes. '
+ 'The local rank zero process will be piped through to STDOUT. The available format variables are: '
+ "'{rank}', '{local_rank}', '{world_size}', '{node_rank}', and '{local_world_size}'. If specified, "
+ "it is recommended to include '{rank}' or '{local_rank}' in the filename so each rank will write to its "
+ 'own file. By default, the STDOUT of the non-local-rank-zero processes is discarded; instead, use the '
+ 'FileLogger within Composer. This logger captures and saves the STDOUT of each process.'
+ ),
+ )
+ parser.add_argument(
+ '--stderr',
+ type=str,
+ default=None,
+ help=(
+ 'Format string for a filename to dump the STDERR from the non-local-rank-zero processes. '
+ 'The local rank zero process will be piped through to STDERR. The available format variables are: '
+ "'{rank}', '{local_rank}', '{world_size}', '{node_rank}', and '{local_world_size}'. If specified, "
+ "it is recommended to include '{rank}' or '{local_rank}' in the filename so each rank will write to its "
+ 'own file. By default, the STDERR of the non-local-rank-zero processes is discarded; instead, use the '
+ 'FileLogger within Composer. This logger captures and saves the STDERR of each process.'
+ ),
+ )
+ parser.add_argument('-v', '--verbose', action='store_true', help='If set, print verbose messages')
+ parser.add_argument(
+ '-m',
+ '--module_mode',
+ action='store_true',
+ help=(
+ 'If set, run the training script as a module instead of as a script. '
+ 'Cannot be used in conjunction with `command_mode`'
+ ),
+ )
+ parser.add_argument(
+ '-c',
+ '--command_mode',
+ action='store_true',
+ help=(
+ 'If set, run the training script as a command (i.e. without `python`). '
+ 'Cannot be used in conjunction with `module_mode`.'
+ ),
+ )
+
+ multinode_args = parser.add_argument_group(
+ 'multi-node arguments',
+ description=(
+ 'These arguments generally only need to be set when training in a multi-node '
+ 'environment, i.e. when the world_size is bigger than nproc.'
+ ),
+ )
+ multinode_args.add_argument(
+ '--world_size',
+ type=int,
+ help=(
+ 'The total number of processes to launch across all nodes. '
+ 'Setting this to a value greater than nproc indicates a multi-node '
+ 'environment. Overrides env var WORLD_SIZE. Defaults to nproc.'
+ ),
+ )
+ multinode_args.add_argument(
+ '--base_rank',
+ type=int,
+ help=(
+ 'The rank of the lowest ranked process to launch on this node. '
+ 'Specifying a base_rank B and an nproc N will spawn processes with '
+ 'global ranks [B, B+1, ... B+N-1]. In a multi-node environment, '
+ 'at least one of base_rank and node_rank must be specified. '
+ 'If only one of base_rank and node_rank are provided, it is assumed '
+ 'that all nodes have the same amount of processes, and that the two '
+ 'values are related as node_rank * nproc = base_rank. If this is '
+ 'not the case, both base_rank and node_rank must be provided. '
+ 'Overrides env var BASE_RANK. Defaults to 0 in a single-node '
+ 'environment.'
+ ),
+ )
+ multinode_args.add_argument(
+ '--node_rank',
+ type=int,
+ help=(
+ 'The rank of this node. See base_rank for information on when '
+ 'this must be provided. Overrides env var NODE_RANK. Defaults to 0 '
+ 'in a single-node environment.'
+ ),
+ )
+ multinode_args.add_argument(
+ '--master_addr',
+ type=str,
+ help=(
+ 'The FQDN of the node hosting the C10d TCP store. For single-node '
+ 'operation, this can generally be left as 127.0.0.1. Overrides env var '
+ 'MASTER_ADDR. Defaults to 127.0.0.1 in a single-node environment.'
+ ),
+ )
+ multinode_args.add_argument(
+ '--master_port',
+ type=int,
+ help=(
+ 'The port on the master hosting the C10d TCP store. If you are '
+ 'running multiple trainers on a single node, this generally needs '
+ 'to be unique for each one. Overrides env var MASTER_PORT. Defaults '
+ 'to a random free port in a single-node environment.'
+ ),
+ )
+
+ required_args.add_argument(
+ 'training_script',
+ type=str,
+ help=(
+ 'The path to the training script used to initialize a single training '
+ 'process. Should be followed by any command-line arguments the script '
+ 'should be launched with.'
+ ),
+ )
+ required_args.add_argument(
+ 'training_script_args',
+ nargs='...',
+ help='Any arguments for the training script, given in the expected order.',
+ )
+
+ return parser
+
+
+def _parse_args():
+ parser = _get_parser()
+
+ args = parser.parse_args()
+
+ # Default values to env vars if they are not provided
+ if args.nproc is None:
+ if 'LOCAL_WORLD_SIZE' in os.environ:
+ args.nproc = int(os.environ['LOCAL_WORLD_SIZE'])
+ else:
+ args.nproc = torch.cuda.device_count()
+
+ if args.nproc == 0:
+ # This could happen if doing cpu-only training,
+ # which could cause torch.cuda.device_count() to return 0,
+ # and LOCAL_WORLD_SIZE (as set by MCLI) to be zero
+ args.nproc = 1
+
+ if args.nproc < 1:
+ raise ValueError('The nproc must be 1 or greater')
+
+ if args.world_size is None and 'WORLD_SIZE' in os.environ:
+ args.world_size = int(os.environ['WORLD_SIZE'])
+
+ if args.base_rank is None and 'BASE_RANK' in os.environ:
+ args.base_rank = int(os.environ['BASE_RANK'])
+
+ if args.node_rank is None and 'NODE_RANK' in os.environ:
+ args.node_rank = int(os.environ['NODE_RANK'])
+
+ if args.master_addr is None and 'MASTER_ADDR' in os.environ:
+ args.master_addr = os.environ['MASTER_ADDR']
+
+ if args.master_port is None and 'MASTER_PORT' in os.environ:
+ args.master_port = int(os.environ['MASTER_PORT'])
+
+ if args.world_size is None:
+ args.world_size = args.nproc
+
+ if args.world_size < args.nproc:
+ raise ValueError(f'world_size({args.world_size}) cannot be less than nproc({args.nproc})')
+
+ if args.world_size < 1:
+ raise ValueError('The world_size must be 1 or greater')
+
+ is_multinode = args.world_size > args.nproc
+
+ if is_multinode:
+ if args.base_rank is None and args.node_rank is None:
+ raise ValueError(f'In a multi-node environment, at least one of node_rank and base_rank must be provided.')
+
+ if args.node_rank is None:
+ if args.world_size % args.nproc != 0 or args.base_rank % args.nproc != 0:
+ raise ValueError(
+ 'node_rank not provided, but unable to infer from base_rank since nodes appear to '
+ 'have different amounts of processes. Please also specify node_rank.',
+ )
+ args.node_rank = args.base_rank // args.nproc
+
+ if args.base_rank is None:
+ if args.world_size % args.nproc != 0:
+ raise ValueError(
+ 'base_rank not provided, but unable to infer from node_rank since nodes appear to '
+ 'have different amounts of processes. Please also provide base_rank.',
+ )
+ args.base_rank = args.node_rank * args.nproc
+
+ if args.base_rank + args.nproc > args.world_size:
+ raise ValueError(
+ f'Cannot initialize processes for node with base_rank({args.base_rank}) and '
+ f'nproc({args.nproc}) because this would mean creating a process with '
+ f'rank({args.base_rank + args.nproc - 1}), and all processes must have smaller rank than '
+ f'the world_size({args.world_size}).',
+ )
+
+ if args.master_addr is None:
+ raise ValueError('In a multi-node environment, master_addr is required.')
+
+ if args.master_port is None:
+ raise ValueError('In a multi-node environment, master_port is required.')
+
+ else:
+ if args.base_rank is not None and args.base_rank != 0:
+ raise ValueError(f'base_rank({args.base_rank}) != 0 is not valid in a single-node environment.')
+ args.base_rank = 0
+
+ if args.node_rank is not None and args.node_rank != 0:
+ raise ValueError(f'node_rank({args.node_rank}) != 0 is not valid in a single-node environment.')
+ args.node_rank = 0
+
+ if args.master_addr is None:
+ args.master_addr = '127.0.0.1'
+
+ if args.master_port is None:
+ args.master_port = get_free_tcp_port()
+
+ return args
+
+
+@contextlib.contextmanager
+def _patch_env(**environs: str):
+ """Returns a context manager that patches ``os.environ`` with ``environs``.
+
+ The original ``os.environ`` values are restored at the end.
+ """
+ # Adapted loosely from https://stackoverflow.com/a/34333710
+ # Capture the original environ values
+ original_environs = {k: os.environ.get(k) for k in environs}
+
+ # Patch the environment
+ for k, v in environs.items():
+ os.environ[k] = v
+ try:
+ # Run the context manager
+ yield
+ finally:
+ # Restore the original environ values
+ for k, v in original_environs.items():
+ if v is None:
+ del os.environ[k]
+ else:
+ os.environ[k] = v
+
+
+def _launch_processes(
+ nproc: int,
+ world_size: int,
+ base_rank: int,
+ node_rank: int,
+ master_addr: str,
+ master_port: int,
+ module_mode: bool,
+ command_mode: bool,
+ training_script: str,
+ stdout_file_format: str,
+ stderr_file_format: Union[str, None],
+ training_script_args: List[Any],
+ processes: Dict[int, subprocess.Popen],
+):
+ log.info('Starting distributed environment on local node for global_rank(%s-%s)', base_rank, base_rank + nproc - 1)
+ log.info('Distributed KV store: tcp://%s:%s', master_addr, master_port)
+
+ for local_rank in range(nproc):
+ global_rank = base_rank + local_rank
+ if command_mode and module_mode:
+ raise ValueError('Either `command_mode` or `module_mode` should be set, but not both.')
+ cmd = []
+ if not command_mode:
+ cmd.append(sys.executable)
+ if module_mode:
+ cmd.append('-m')
+
+ cmd.append(training_script)
+
+ # Update the env with the distributed variables
+ with _patch_env(
+ RANK=str(global_rank),
+ WORLD_SIZE=str(world_size),
+ LOCAL_RANK=str(local_rank),
+ LOCAL_WORLD_SIZE=str(nproc),
+ NODE_RANK=str(node_rank),
+ MASTER_ADDR=master_addr,
+ MASTER_PORT=str(master_port),
+ PYTHONUNBUFFERED='1',
+ NCCL_ASYNC_ERROR_HANDLING='1',
+ ):
+ # Populate the distributed variables in all launcher args
+ for arg in training_script_args:
+ cmd.append(os.path.expandvars(os.path.expanduser(arg)))
+
+ log.info(
+ 'Launching process for local_rank(%s), global_rank(%s) with command(%s)',
+ local_rank,
+ global_rank,
+ cmd,
+ )
+
+ if local_rank == 0:
+ process = subprocess.Popen(
+ cmd,
+ text=True,
+ )
+ else:
+
+ def _get_file(format: str):
+ filename = format.format(
+ rank=global_rank,
+ world_size=world_size,
+ local_rank=local_rank,
+ local_world_size=nproc,
+ node_rank=node_rank,
+ )
+ return open(filename, 'x+')
+
+ stdout_file = _get_file(stdout_file_format)
+ stderr_file = _get_file(stderr_file_format) if stderr_file_format is not None else None
+
+ process = subprocess.Popen(
+ cmd,
+ stdout=stdout_file,
+ stderr=stderr_file if stderr_file is not None else subprocess.STDOUT,
+ text=True,
+ )
+ process.stdout = stdout_file
+ if stderr_file is not None:
+ process.stderr = stderr_file
+ processes[global_rank] = process
+
+
+def _monitor_processes(processes: Dict[int, subprocess.Popen]):
+ try:
+ while True:
+ process_has_crashed = False
+ all_processes_finished = True
+ for global_rank, process in processes.items():
+ if process.poll() is None:
+ # the process is still running
+ all_processes_finished = False
+ continue
+ else:
+ # return code of 0 implies clean exit
+ if process.returncode != 0:
+ log.error(f'Rank {global_rank} crashed with exit code {process.returncode}.')
+ process_has_crashed = True
+ break
+ else:
+ # exited cleanly
+ log.info(f'Rank {global_rank} finished successfully.')
+ if process_has_crashed or all_processes_finished:
+ break
+ time.sleep(0.1)
+ except KeyboardInterrupt:
+ print('Ctrl-C received; terminating training processes.')
+ pass
+
+
+def _print_process_exit_status(global_rank: int, process: subprocess.Popen):
+ stdOutLabel = 'STDOUT'
+ if process.stdout is None:
+ output = None
+ else:
+ process.stdout.seek(0)
+ output = process.stdout.read()
+
+ if process.stderr is None:
+ stderr = None
+ stdOutLabel = 'logs'
+ else:
+ process.stderr.seek(0)
+ stderr = process.stderr.read()
+ exc = subprocess.CalledProcessError(
+ process.returncode,
+ cmd=process.args,
+ output=output,
+ stderr=stderr,
+ )
+
+ error_msg = [f'Global rank {global_rank} (PID {process.pid}) exited with code {process.returncode}']
+ if output is not None:
+ error_msg.extend([
+ f'----------Begin global rank {global_rank} {stdOutLabel}----------',
+ output,
+ f'----------End global rank {global_rank} {stdOutLabel}----------',
+ ])
+
+ if stderr is not None:
+ error_msg.extend([
+ f'----------Begin global rank {global_rank} STDERR----------',
+ exc.stderr,
+ f'----------End global rank {global_rank} STDERR----------',
+ ])
+ print('\n'.join(error_msg))
+
+
+def _cleanup_processes(processes: Dict[int, subprocess.Popen]):
+ for global_rank, process in processes.items():
+ process.poll()
+ if process.returncode is None:
+ log.info('Killing global rank %s (PID %s) with SIGTERM', global_rank, process.pid)
+ # Assuming that child processes correctly handle SIGTERM to cleanup any children
+ try:
+ os.kill(process.pid, signal.SIGTERM)
+ except ProcessLookupError:
+ pass
+
+ current_time = datetime.datetime.now()
+
+ try:
+ print((
+ f'Waiting up to {CLEANUP_TIMEOUT.seconds} seconds for all training processes to terminate. '
+ 'Press Ctrl-C to exit immediately.'
+ ))
+ while datetime.datetime.now() - current_time < CLEANUP_TIMEOUT:
+ for process in processes.values():
+ process.poll()
+ if all(process.returncode is not None for process in processes.values()):
+ break
+ time.sleep(0.1)
+ except KeyboardInterrupt:
+ pass
+
+ for global_rank, process in processes.items():
+ process.poll()
+ if process.returncode is None:
+ log.warning(
+ 'Failed to kill global rank %s (PID %s) with SIGTERM; terminating with SIGKILL instead',
+ global_rank,
+ process.pid,
+ )
+ try:
+ proc = psutil.Process(process.pid)
+ except psutil.NoSuchProcess:
+ pass
+ else:
+ # If using SIGKILL, manually kill all child processes, since the main training process
+ # likely won't be able to intercept the signal and clean up its children.
+ for psutil_proc in [proc, *proc.children(recursive=True)]:
+ try:
+ os.kill(psutil_proc.pid, signal.SIGKILL)
+ except ProcessLookupError:
+ pass
+ for global_rank, process in processes.items():
+ process.poll()
+ if process.returncode is not None and process.returncode != 0:
+ if -process.returncode in (signal.SIGKILL, signal.SIGTERM):
+ # Negative return codes indicate the process was killed via a signal
+ # If the launcher script killed the training process (which would happen via SIGKILL or SIGTERM),
+ # then do not print the stack trace.
+ continue
+ # only print the processes that have actually crashed,
+ # not the ones that were killed
+ _print_process_exit_status(global_rank, process)
+
+
+def _aggregate_process_returncode(processes: Dict[int, subprocess.Popen]) -> int:
+ for global_rank, process in processes.items():
+ process.poll()
+ if process.returncode is None:
+ log.error('Global rank %s (PID %s) has still not exited; return exit code 1.', global_rank, process.pid)
+ return 1
+ if process.returncode != 0:
+ log.error('Global rank %s (PID %s) exited with code %s', global_rank, process.pid, process.returncode)
+ return process.returncode
+
+ return 0
+
+
+def main():
+ """Entrypoint into the Composer CLI."""
+ args = _parse_args()
+
+ logging.basicConfig()
+ log.setLevel(logging.INFO if args.verbose else logging.WARNING)
+
+ processes = {}
+
+ log_tmpdir = tempfile.TemporaryDirectory()
+ if args.stdout is None:
+ args.stdout = f'{log_tmpdir.name}/rank{{rank}}.stdout.txt'
+ if args.stderr is None:
+ args.stderr = f'{log_tmpdir.name}/rank{{rank}}.stderr.txt'
+
+ # If running on the Mosaic platform, log all gpu ranks' stderr and stdout to Mosaic platform
+ if os.environ.get(MOSAICML_PLATFORM_ENV_VAR, 'false').lower() == 'true' and str(
+ os.environ.get(MOSAICML_LOG_DIR_ENV_VAR, 'false'),
+ ).lower() != 'false' and os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR, 'false').lower() != 'false':
+ log.info('Logging all GPU ranks to Mosaic Platform.')
+ log_file_format = f'{os.environ.get(MOSAICML_LOG_DIR_ENV_VAR)}/{os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR)}{{local_rank}}.txt'
+ if args.stderr is not None or args.stdout is not None:
+ log.info(
+ 'Logging to Mosaic Platform. Ignoring provided stdout and stderr args. To use provided stdout and stderr, set MOSAICML_LOG_DIR=false.',
+ )
+ args.stdout = log_file_format
+ args.stderr = None
+
+ try:
+ _launch_processes(
+ nproc=args.nproc,
+ world_size=args.world_size,
+ base_rank=args.base_rank,
+ node_rank=args.node_rank,
+ master_addr=args.master_addr,
+ master_port=args.master_port,
+ module_mode=args.module_mode,
+ command_mode=args.command_mode,
+ stdout_file_format=args.stdout,
+ stderr_file_format=args.stderr,
+ training_script=args.training_script,
+ training_script_args=args.training_script_args,
+ processes=processes,
+ )
+ _monitor_processes(processes)
+ except:
+ # Print the exception first, then kill the training processes, since killing
+ # may take up to CLEANUP_TIMEOUT seconds, and the user should know immediately
+ # what failed. No need to re-raise the exception, as `aggregate_process_returncode`
+ # will return an appropriate error code, which will cause the script to exit.
+ traceback.print_exc()
+ print('Killing training processes')
+ finally:
+ _cleanup_processes(processes)
+ log_tmpdir.cleanup()
+ return _aggregate_process_returncode(processes)
+
+
+if __name__ == '__main__':
+ sys.exit(main())
From 48a7ccc20e7cb0e4e96a012da16b3cdf1a164d73 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Tue, 19 Mar 2024 17:49:44 -0700
Subject: [PATCH 02/27] case 1 can run successfully
---
.gitignore | 2 +
llmfoundry/composerpatch/MLFlowLogger.py | 33 +++++
llmfoundry/composerpatch/__init__.py | 0
llmfoundry/utils/builders.py | 7 +-
scripts/train/launcher.py | 63 +++++++--
scripts/train/train.py | 3 +-
setup.py | 7 +-
ygong/__init__.py | 0
ygong/mosaic/__init__.py | 6 +
ygong/mosaic/mpt125mConfig.py | 167 +++++++++++++++++++++++
ygong/mosaic/scaling_config.py | 14 ++
ygong/mosaic/submit.py | 145 ++++++++++++++++++++
12 files changed, 431 insertions(+), 16 deletions(-)
create mode 100644 llmfoundry/composerpatch/MLFlowLogger.py
create mode 100644 llmfoundry/composerpatch/__init__.py
create mode 100644 ygong/__init__.py
create mode 100644 ygong/mosaic/__init__.py
create mode 100644 ygong/mosaic/mpt125mConfig.py
create mode 100644 ygong/mosaic/scaling_config.py
create mode 100644 ygong/mosaic/submit.py
diff --git a/.gitignore b/.gitignore
index d041a25c22..72f965ce63 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,8 @@ my-copy-c4*/
my-copy-arxiv*/
*.jsonl*
+ygong/notebook/*
+
# WandB
wandb/
diff --git a/llmfoundry/composerpatch/MLFlowLogger.py b/llmfoundry/composerpatch/MLFlowLogger.py
new file mode 100644
index 0000000000..e9f73d7767
--- /dev/null
+++ b/llmfoundry/composerpatch/MLFlowLogger.py
@@ -0,0 +1,33 @@
+from composer.loggers import MLFlowLogger as ComposerMLFlowLogger
+from composer.utils import MissingConditionalImportError, dist
+import json
+import os
+from composer.core.state import State
+from composer.loggers.logger import Logger
+from composer.loggers.logger_destination import LoggerDestination
+from composer.utils import MissingConditionalImportError, dist
+
+
+
+CONFIG_FILE = "/tmp/mlflow_config.yaml"
+EXPERIMENT_ID_FIELD = "experiment_id"
+RUN_ID_FIELD = "run_id"
+TRACKING_URI_FIELD = "tracking_uri"
+
+
+class MLFlowLogger(ComposerMLFlowLogger):
+
+ def init(self, state: State, logger: Logger) -> None:
+ super().init(state, logger)
+
+ if self._enabled and dist.get_local_rank() == 0:
+ if os.path.exists(CONFIG_FILE):
+ os.remove(CONFIG_FILE)
+
+ with open(CONFIG_FILE, "w") as f:
+ data = {
+ EXPERIMENT_ID_FIELD: self._experiment_id,
+ RUN_ID_FIELD: self._run_id,
+ TRACKING_URI_FIELD : self.tracking_uri,
+ }
+ json.dump(data, f)
\ No newline at end of file
diff --git a/llmfoundry/composerpatch/__init__.py b/llmfoundry/composerpatch/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/llmfoundry/utils/builders.py b/llmfoundry/utils/builders.py
index 328705d2a5..6195152199 100644
--- a/llmfoundry/utils/builders.py
+++ b/llmfoundry/utils/builders.py
@@ -18,7 +18,7 @@
from composer.core import Algorithm, Callback, Evaluator
from composer.datasets.in_context_learning_evaluation import \
get_icl_task_dataloader
-from composer.loggers import (InMemoryLogger, LoggerDestination, MLFlowLogger,
+from composer.loggers import (InMemoryLogger, LoggerDestination,
TensorboardLogger, WandBLogger)
from composer.optim import DecoupledAdamW
from composer.optim.scheduler import (ComposerScheduler,
@@ -31,6 +31,9 @@
from torch.optim.optimizer import Optimizer
from transformers import AutoTokenizer, PreTrainedTokenizerBase
+from llmfoundry.composerpatch import MLFlowLogger
+
+
from llmfoundry.callbacks import (AsyncEval, CurriculumLearning, EvalGauntlet,
FDiffMetrics, GlobalLRScaling,
HuggingFaceCheckpointer, LayerFreezing,
@@ -240,7 +243,7 @@ def build_logger(name: str, kwargs: Dict[str, Any]) -> LoggerDestination:
elif name == 'in_memory_logger':
return InMemoryLogger(**kwargs)
elif name == 'mlflow':
- return MLFlowLogger(**kwargs)
+ return MLFlowLogger.MLFlowLogger(**kwargs)
elif name == 'inmemory':
return InMemoryLogger(**kwargs)
else:
diff --git a/scripts/train/launcher.py b/scripts/train/launcher.py
index e20626265f..3d0ee3b47b 100755
--- a/scripts/train/launcher.py
+++ b/scripts/train/launcher.py
@@ -27,6 +27,8 @@
MOSAICML_PLATFORM_ENV_VAR,
)
from composer.utils import get_free_tcp_port
+from llmfoundry.composerpatch import MLFlowLogger
+
CLEANUP_TIMEOUT = datetime.timedelta(seconds=30)
@@ -313,9 +315,11 @@ def _launch_processes(
stderr_file_format: Union[str, None],
training_script_args: List[Any],
processes: Dict[int, subprocess.Popen],
+ log_dirs: set[str],
):
log.info('Starting distributed environment on local node for global_rank(%s-%s)', base_rank, base_rank + nproc - 1)
log.info('Distributed KV store: tcp://%s:%s', master_addr, master_port)
+ log.warning(f"ygong: stdout_file_format={stdout_file_format}, stderr_file_format={stderr_file_format}")
for local_rank in range(nproc):
global_rank = base_rank + local_rank
@@ -359,7 +363,7 @@ def _launch_processes(
)
else:
- def _get_file(format: str):
+ def _get_file__(format: str):
filename = format.format(
rank=global_rank,
world_size=world_size,
@@ -367,10 +371,13 @@ def _get_file(format: str):
local_world_size=nproc,
node_rank=node_rank,
)
+ dir = os.path.normpath(os.path.dirname(filename))
+ os.makedirs(dir, exist_ok=True)
+ log_dirs.add(dir)
return open(filename, 'x+')
- stdout_file = _get_file(stdout_file_format)
- stderr_file = _get_file(stderr_file_format) if stderr_file_format is not None else None
+ stdout_file = _get_file__(stdout_file_format)
+ stderr_file = _get_file__(stderr_file_format) if stderr_file_format is not None else None
process = subprocess.Popen(
cmd,
@@ -384,7 +391,15 @@ def _get_file(format: str):
processes[global_rank] = process
-def _monitor_processes(processes: Dict[int, subprocess.Popen]):
+def _monitor_processes(
+ processes: Dict[int, subprocess.Popen],
+ log_dirs: set[str],
+ launcher_log: str,):
+ import mlflow
+ log_frequency = 200
+ cycle = 0
+
+ mlflow_runid = None
try:
while True:
process_has_crashed = False
@@ -405,7 +420,29 @@ def _monitor_processes(processes: Dict[int, subprocess.Popen]):
log.info(f'Rank {global_rank} finished successfully.')
if process_has_crashed or all_processes_finished:
break
+
+ if cycle == 0:
+ if mlflow_runid is None:
+ if os.path.exists(MLFlowLogger.CONFIG_FILE):
+ import json
+ with open(MLFlowLogger.CONFIG_FILE, "r") as f:
+ data = json.load(f)
+ log.error(f"ygong:Started mlflow run {data}")
+ mlflow_runid = data[MLFlowLogger.RUN_ID_FIELD]
+ mlflow.set_tracking_uri(data[MLFlowLogger.TRACKING_URI_FIELD])
+ mlflow.start_run(run_id=data[MLFlowLogger.RUN_ID_FIELD], experiment_id=data[MLFlowLogger.EXPERIMENT_ID_FIELD])
+ else:
+ try:
+ for log_dir in log_dirs:
+ log.warning(f"ygong: Logging directory: {log_dir}")
+ mlflow.log_artifacts(log_dir, log_dir.lstrip('/'))
+ mlflow.log_artifact(launcher_log)
+ except Exception as e:
+ log.error(f"ygong:Failed to log artifacts to mlflow: {e}")
+ cycle = (cycle + 1) % log_frequency
+
time.sleep(0.1)
+
except KeyboardInterrupt:
print('Ctrl-C received; terminating training processes.')
pass
@@ -526,12 +563,13 @@ def main():
"""Entrypoint into the Composer CLI."""
args = _parse_args()
- logging.basicConfig()
- log.setLevel(logging.INFO if args.verbose else logging.WARNING)
-
+ log_tmpdir = tempfile.TemporaryDirectory()
+ launcher_log = f"{log_tmpdir.name}/launcher{args.node_rank}.log"
+ logging.basicConfig(filename=launcher_log, level=logging.INFO if args.verbose else logging.WARNING)
+
processes = {}
+ log_dirs = set()
- log_tmpdir = tempfile.TemporaryDirectory()
if args.stdout is None:
args.stdout = f'{log_tmpdir.name}/rank{{rank}}.stdout.txt'
if args.stderr is None:
@@ -542,13 +580,13 @@ def main():
os.environ.get(MOSAICML_LOG_DIR_ENV_VAR, 'false'),
).lower() != 'false' and os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR, 'false').lower() != 'false':
log.info('Logging all GPU ranks to Mosaic Platform.')
- log_file_format = f'{os.environ.get(MOSAICML_LOG_DIR_ENV_VAR)}/{os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR)}{{local_rank}}.txt'
if args.stderr is not None or args.stdout is not None:
log.info(
'Logging to Mosaic Platform. Ignoring provided stdout and stderr args. To use provided stdout and stderr, set MOSAICML_LOG_DIR=false.',
)
- args.stdout = log_file_format
- args.stderr = None
+
+ args.stdout = f'{os.environ.get(MOSAICML_LOG_DIR_ENV_VAR)}/stdout/{os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR)}{{rank}}.txt'
+ args.stderr = f'{os.environ.get(MOSAICML_LOG_DIR_ENV_VAR)}/stderr/{os.environ.get(MOSAICML_GPU_LOG_FILE_PREFIX_ENV_VAR)}{{rank}}.txt'
try:
_launch_processes(
@@ -565,8 +603,9 @@ def main():
training_script=args.training_script,
training_script_args=args.training_script_args,
processes=processes,
+ log_dirs=log_dirs,
)
- _monitor_processes(processes)
+ _monitor_processes(processes, log_dirs, launcher_log)
except:
# Print the exception first, then kill the training processes, since killing
# may take up to CLEANUP_TIMEOUT seconds, and the user should know immediately
diff --git a/scripts/train/train.py b/scripts/train/train.py
index 92c6afa128..8784f4e2a0 100644
--- a/scripts/train/train.py
+++ b/scripts/train/train.py
@@ -12,7 +12,7 @@
import torch
from composer import Trainer
from composer.core.callback import Callback
-from composer.loggers import MosaicMLLogger
+from composer.loggers import MosaicMLLogger, MLFlowLogger
from composer.loggers.mosaicml_logger import (MOSAICML_ACCESS_TOKEN_ENV_VAR,
MOSAICML_PLATFORM_ENV_VAR)
from composer.metrics.nlp import InContextLearningMetric
@@ -459,6 +459,7 @@ def main(cfg: DictConfig) -> Trainer:
mosaicml_logger = MosaicMLLogger()
loggers.append(mosaicml_logger)
+
if metadata is not None:
# Flatten the metadata for logging
logged_cfg.pop('metadata', None)
diff --git a/setup.py b/setup.py
index 4ecd34861a..faa27e7556 100644
--- a/setup.py
+++ b/setup.py
@@ -68,11 +68,12 @@
'onnxruntime==1.15.1',
'cmake>=3.25.0,<=3.26.3', # required for triton-pre-mlir below
# PyPI does not support direct dependencies, so we remove this line before uploading from PyPI
- 'triton-pre-mlir@git+https://github.com/vchiley/triton.git@triton_pre_mlir_sm90#subdirectory=python',
+ # 'triton-pre-mlir@git+https://github.com/vchiley/triton.git@triton_pre_mlir_sm90#subdirectory=python',
'boto3>=1.21.45,<2',
'huggingface-hub>=0.17.0,<1.0',
'beautifulsoup4>=4.12.2,<5', # required for model download utils
'tenacity>=8.2.3,<9',
+ 'ipywidgets',
]
extra_deps = {}
@@ -88,6 +89,10 @@
'hf_transfer==0.1.3',
]
+extra_deps['ygong'] = [
+ 'databricks-genai',
+]
+
extra_deps['databricks'] = [
'mosaicml[databricks]>=0.20.1,<0.21',
'databricks-sql-connector>=3,<4',
diff --git a/ygong/__init__.py b/ygong/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/ygong/mosaic/__init__.py b/ygong/mosaic/__init__.py
new file mode 100644
index 0000000000..c5bd21dde7
--- /dev/null
+++ b/ygong/mosaic/__init__.py
@@ -0,0 +1,6 @@
+from .submit import submit
+from .submit import _set_up_environment
+from .scaling_config import ScalingConfig
+from .mpt125mConfig import MPT125MConfig
+
+__all__ = ['submit', 'ScalingConfig', "MPT125MConfig", "_set_up_environment"]
\ No newline at end of file
diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py
new file mode 100644
index 0000000000..78c9c057bc
--- /dev/null
+++ b/ygong/mosaic/mpt125mConfig.py
@@ -0,0 +1,167 @@
+from mcli import RunConfig
+from ygong.mosaic.scaling_config import ScalingConfig
+from typing import Optional
+import os
+# import databricks_genai.api.config as cfg
+
+
+class MPT125MConfig:
+ def __init__(self, experimentName: str, data: str):
+ # TODO: validate the inputs and remove the yu.gong hardcode
+ self.mlflow_experimentName = f"/Users/yu.gong@databricks.com/{experimentName}"
+ workspace_url = os.environ.get('WORKSPACE_URL')
+ self.mlflow_trackingUri = "databricks"
+ # self.mlflow_trackingUri = "databricks" if workspace_url is None else workspace_url
+
+ self.data = data
+
+ # the run name is pre-configured for all config-driven pretrain runs
+ self.name = "mpt125m-config-driven-pretrain"
+
+ ########################################
+ # model parameters
+ ########################################
+ self.max_seq_len = 2048
+ self.global_seed = 17
+ # TODO: hardcode, need to respect self.data
+ self.data_remote = "s3://aaron-mlflow-demo/ygong-c4-process/"
+ self.data_local = "./my-copy-c4"
+ self.commands = [
+ "cd llm-foundry/scripts",
+ "train/launcher.py train/train.py /mnt/config/parameters.yaml train_loader.dataset.split=train_small eval_loader.dataset.split=val"
+ ]
+ # api_token, endpoint = cfg.get_config_from_env()
+ # self.workspace_url = "https://dbc-04ac0685-8857.staging.cloud.databricks.com/"
+
+
+ def toRunConfig(self, scalingConfig: ScalingConfig):
+ return RunConfig(
+ name=self.name,
+ image='mosaicml/llm-foundry:2.2.1_cu121_flash2-latest',
+ command="\n".join(self.commands),
+ compute=scalingConfig.toCompute,
+ scheduling={},
+ integrations=[
+ {
+ 'integration_type': 'git_repo',
+ 'git_repo': 'ygong1/llm-foundry',
+ 'git_branch': 'prototype',
+ 'pip_install': '-e .[gpu]',
+ 'ssh_clone': False
+ },
+ {
+ 'integration_type': 'pip_packages',
+ 'packages': ['pynvml'],
+ },
+ ],
+ parameters=self.parameters(),
+ env_variables={},
+ )
+
+ def parameters(self):
+ return {
+ "data_local": "./my-copy-c4",
+ "data_remote": "s3://aaron-mlflow-demo/ygong-c4-process/",
+ "max_seq_len": self.max_seq_len,
+ "global_seed": self.global_seed,
+ "run_name": None,
+ "model": {
+ "name": "mpt_causal_lm",
+ "init_device": "meta",
+ "d_model": 768,
+ "n_heads": 12,
+ "n_layers": 12,
+ "expansion_ratio": 4,
+ "max_seq_len": self.max_seq_len,
+ "vocab_size": 50368,
+ "attn_config": {
+ "attn_impl": "triton"
+ }
+ },
+ "tokenizer": {
+ "name": "EleutherAI/gpt-neox-20b",
+ "kwargs": {
+ "model_max_length": self.max_seq_len
+ }
+ },
+ "train_loader": {
+ "name": "text",
+ "dataset": {
+ "local": f"{self.data_local}",
+ "remote": f"{self.data_remote}",
+ "split": "train",
+ "shuffle": True,
+ "max_seq_len": self.max_seq_len,
+ "shuffle_seed": self.global_seed
+ },
+ "drop_last": True,
+ "num_workers": 8
+ },
+ "eval_loader": {
+ "name": "text",
+ "dataset": {
+ "local": f"{self.data_local}",
+ "remote": f"{self.data_remote}",
+ "split": "val",
+ "shuffle": False,
+ "max_seq_len": self.max_seq_len,
+ "shuffle_seed": self.global_seed
+ },
+ "drop_last": False,
+ "num_workers": 8
+ },
+ "scheduler": {
+ "name": "cosine_with_warmup",
+ "t_warmup": "100ba",
+ "alpha_f": 0.1
+ },
+ "optimizer": {
+ "name": "decoupled_adamw",
+ "lr": 6.0e-4,
+ "betas": [0.9, 0.95],
+ "eps": 1.0e-08,
+ "weight_decay": 0.0
+ },
+ "algorithms": {
+ "gradient_clipping": {
+ "clipping_type": "norm",
+ "clipping_threshold": 1.0
+ }
+ },
+ "max_duration": "480ba", # ~ 2.5B tokens, original
+ "eval_interval": "50ba", # original 500
+ "eval_first": False,
+ "eval_subset_num_batches": -1,
+ "global_train_batch_size": 256,
+ "seed": self.global_seed,
+ "device_eval_batch_size": 16,
+ "device_train_microbatch_size": 16,
+ "precision": "amp_bf16",
+ "fsdp_config": {
+ "sharding_strategy": "FULL_SHARD",
+ "mixed_precision": "PURE",
+ "activation_checkpointing": False,
+ "activation_checkpointing_reentrant": False,
+ "activation_cpu_offload": False,
+ "limit_all_gathers": True
+ },
+ "progress_bar": False,
+ "log_to_console": True,
+ "console_log_interval": "10ba",
+ "callbacks": {
+ "speed_monitor": {
+ "window_size": 10
+ },
+ "lr_monitor": {},
+ "memory_monitor": {},
+ "runtime_estimator": {}
+ },
+ "loggers": {
+ "mlflow": {
+ "experiment_name": self.mlflow_experimentName,
+ "tracking_uri": "databricks",
+ "synchronous": False,
+ "log_system_metrics": True
+ }
+ }
+ }
\ No newline at end of file
diff --git a/ygong/mosaic/scaling_config.py b/ygong/mosaic/scaling_config.py
new file mode 100644
index 0000000000..f22e8e7e4c
--- /dev/null
+++ b/ygong/mosaic/scaling_config.py
@@ -0,0 +1,14 @@
+class ScalingConfig:
+ def __init__(self, gpusNum: int, gpuType: str, poolName: str):
+ # TODO: validate the inputs
+ self.gpusNum = gpusNum
+ self.gpuType = gpuType
+ self.poolName = poolName
+
+ @property
+ def toCompute(self):
+ return {
+ 'gpus': self.gpusNum,
+ 'gpu_type': self.gpuType,
+ 'cluster': self.poolName
+ }
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
new file mode 100644
index 0000000000..91ad41feb1
--- /dev/null
+++ b/ygong/mosaic/submit.py
@@ -0,0 +1,145 @@
+from ygong.mosaic.scaling_config import ScalingConfig
+from ygong.mosaic.mpt125mConfig import MPT125MConfig
+from mcli import wait_for_run_status, Run, RunConfig, RunStatus, create_run
+import pandas as pd
+import ipywidgets as widgets
+from IPython.display import display, clear_output, HTML
+import mlflow
+import os
+from typing import Optional
+import time
+import base64
+import json
+import hashlib
+from mcli.api.engine.engine import MAPIConnection
+from mcli.config import MCLIConfig
+
+def _set_up_environment(content):
+ data = json.loads(base64.b64decode(content).decode('utf-8'))
+ workspace_url = data.get("workspace_url", None)
+ token = data.get("token", None)
+ mosaic_token = data.get("mosaic_token", None)
+
+ if token is None:
+ from databricks.sdk import WorkspaceClient
+ wc = WorkspaceClient()
+ ctx = wc.dbutils.entry_point.getDbutils().notebook().getContext()
+ token = ctx.apiToken().get()
+
+ if workspace_url is None or mosaic_token is None:
+ raise ValueError("workspace_url and token must be provided")
+ os.environ['WORKSPACE_URL'] = workspace_url
+ os.environ['MLFLOW_TRACKING_TOKEN'] = token
+
+ # set up the mosaic token
+ conf = MCLIConfig.load_config()
+ conf.api_key = mosaic_token
+ conf.save_config()
+ MAPIConnection.reset_connection()
+
+
+ hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8]
+ databricks_secret_name = f"databricks-{hash}"
+
+ # clean up the old secret. MosaicML doesn't support multiple databricks secrets
+ # would have to clean up the old secret if it exists
+ from mcli.api.secrets.api_get_secrets import get_secrets
+ from mcli.api.secrets.api_delete_secrets import delete_secrets
+ from mcli.models.mcli_secret import SecretType
+ s = get_secrets(secret_types=[SecretType.databricks])
+ if len(s) == 1:
+ if s[0].name != databricks_secret_name:
+ delete_secrets(s)
+ else:
+ print("databricks secret already exists")
+ return
+ from mcli.objects.secrets.create.databricks import DatabricksSecretCreator
+ from mcli.api.secrets.api_create_secret import create_secret
+ s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token)
+ print(f"successfully created databricks secret: {databricks_secret_name}")
+ create_secret(s)
+
+
+
+def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str):
+ if tracking_uri is None:
+ raise ValueError("tracking_uri must be provided")
+ mlflow.set_tracking_uri(tracking_uri)
+ tracking_uri = tracking_uri.rstrip("/")
+ experiment = mlflow.get_experiment_by_name(name=experiment_name)
+ if experiment is None:
+ raise ValueError(f"experiment {experiment_name} does not exist")
+ experiment_id = experiment.experiment_id
+ runs = mlflow.search_runs(experiment_ids=[experiment_id],
+ filter_string=f'tags.composer_run_name = "{run_name}"',
+ output_format='list')
+ if len(runs) == 0:
+ raise ValueError(f"run {run_name} does not exist in experiment {experiment_name}")
+ elif len(runs) > 1:
+ raise ValueError(f"multiple runs {run_name} exist in experiment {experiment_name}")
+ else:
+ run_id = runs[0].info.run_id
+ return f"{tracking_uri}/ml/experiments/{experiment_id}/runs/{run_id}"
+
+def _get_run_summary(run: Run, experiment_name: Optional[str] = None):
+ url = None
+ if run.status == RunStatus.RUNNING and experiment_name is not None:
+ url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name)
+
+ df = pd.DataFrame({
+ 'Run Name': [run.name],
+ 'Run ID': [run.run_uid],
+ "Status": [str(run.status)],
+ 'Experiment Run': [f'Link' if url is not None else ""],
+ })
+ return df
+
+def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.Button]):
+ clear_output(wait=True)
+ if cancel_button is not None:
+ display(cancel_button)
+ display(HTML(summary.to_html(escape=False)))
+
+
+
+def submit(model, config: any, scalingConfig: ScalingConfig):
+ mlflow_experiment_name = None
+ if model == "mpt125m":
+ if not isinstance(config, MPT125MConfig):
+ raise ValueError("config must be an instance of MPT125MConfig")
+ mlflow_experiment_name = config.mlflow_experimentName
+ runConfig = config.toRunConfig(scalingConfig)
+ else:
+ raise ValueError(f"model {model} is not supported")
+
+
+ run = create_run(runConfig)
+ # Create a button
+ button = widgets.Button(description="cancel the run")
+ def on_button_clicked(b):
+ clear_output(wait=False)
+ run.stop()
+ wait_for_run_status(run, RunStatus.TERMINATING)
+ summary = _get_run_summary(run, mlflow_experiment_name)
+ display(HTML(summary.to_html(escape=False)))
+ button.on_click(on_button_clicked)
+
+ _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
+
+ wait_for_run_status(run, RunStatus.RUNNING)
+ # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet
+ # when the run just starts running
+ _display_run_summary(_get_run_summary(run, None), button)
+
+
+ try_count = 0
+ while try_count < 10:
+ try_count += 1
+ time.sleep(20)
+ try:
+ summary = _get_run_summary(run, mlflow_experiment_name)
+ _display_run_summary(summary, button)
+ break
+ except ValueError:
+ print("waiting for the run to be ready...")
+ pass
\ No newline at end of file
From fbf6783de0aa15790169d83d2577c61c28e72259 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Thu, 21 Mar 2024 22:21:18 -0700
Subject: [PATCH 03/27] set up the credential to work with datbricks
environment
---
ygong/mosaic/mpt125mConfig.py | 23 +++----
ygong/mosaic/submit.py | 119 ++++++++++++++++++++++------------
2 files changed, 88 insertions(+), 54 deletions(-)
diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py
index 78c9c057bc..322fc181e3 100644
--- a/ygong/mosaic/mpt125mConfig.py
+++ b/ygong/mosaic/mpt125mConfig.py
@@ -6,14 +6,14 @@
class MPT125MConfig:
- def __init__(self, experimentName: str, data: str):
+ def __init__(self, experimentName: str, data: str, priority: str = 'high'):
# TODO: validate the inputs and remove the yu.gong hardcode
self.mlflow_experimentName = f"/Users/yu.gong@databricks.com/{experimentName}"
- workspace_url = os.environ.get('WORKSPACE_URL')
self.mlflow_trackingUri = "databricks"
# self.mlflow_trackingUri = "databricks" if workspace_url is None else workspace_url
self.data = data
+ self.priority = priority
# the run name is pre-configured for all config-driven pretrain runs
self.name = "mpt125m-config-driven-pretrain"
@@ -23,16 +23,13 @@ def __init__(self, experimentName: str, data: str):
########################################
self.max_seq_len = 2048
self.global_seed = 17
- # TODO: hardcode, need to respect self.data
- self.data_remote = "s3://aaron-mlflow-demo/ygong-c4-process/"
- self.data_local = "./my-copy-c4"
+ self.data_remote = self.data
+ self.data_local = ""
self.commands = [
"cd llm-foundry/scripts",
- "train/launcher.py train/train.py /mnt/config/parameters.yaml train_loader.dataset.split=train_small eval_loader.dataset.split=val"
+ "train/launcher.py train/train.py /mnt/config/parameters.yaml train_loader.dataset.split=train eval_loader.dataset.split=val"
]
- # api_token, endpoint = cfg.get_config_from_env()
- # self.workspace_url = "https://dbc-04ac0685-8857.staging.cloud.databricks.com/"
-
+
def toRunConfig(self, scalingConfig: ScalingConfig):
return RunConfig(
@@ -40,7 +37,7 @@ def toRunConfig(self, scalingConfig: ScalingConfig):
image='mosaicml/llm-foundry:2.2.1_cu121_flash2-latest',
command="\n".join(self.commands),
compute=scalingConfig.toCompute,
- scheduling={},
+ scheduling={'priority': self.priority},
integrations=[
{
'integration_type': 'git_repo',
@@ -51,7 +48,7 @@ def toRunConfig(self, scalingConfig: ScalingConfig):
},
{
'integration_type': 'pip_packages',
- 'packages': ['pynvml'],
+ 'packages': ['pynvml', 'mosaicml-streaming[databricks]'],
},
],
parameters=self.parameters(),
@@ -60,8 +57,8 @@ def toRunConfig(self, scalingConfig: ScalingConfig):
def parameters(self):
return {
- "data_local": "./my-copy-c4",
- "data_remote": "s3://aaron-mlflow-demo/ygong-c4-process/",
+ "data_local": self.data_local,
+ "data_remote": self.data,
"max_seq_len": self.max_seq_len,
"global_seed": self.global_seed,
"run_name": None,
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 91ad41feb1..0d92125c87 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -13,52 +13,76 @@
import hashlib
from mcli.api.engine.engine import MAPIConnection
from mcli.config import MCLIConfig
+from databricks.sdk import WorkspaceClient
+# from databricks_genai.api.config import configure_request
+from mcli import config
+from mcli.api.runs.api_get_runs import get_run
+
+
+def _set_up_environment(content: str):
+ os.environ['CREDENTIALS'] = content
-def _set_up_environment(content):
- data = json.loads(base64.b64decode(content).decode('utf-8'))
- workspace_url = data.get("workspace_url", None)
- token = data.get("token", None)
- mosaic_token = data.get("mosaic_token", None)
- if token is None:
- from databricks.sdk import WorkspaceClient
+def _init_connection():
+ def _is_local():
+ try:
+ wc = WorkspaceClient()
+ wc.dbutils.entry_point.getDbutils().notebook().getContext()
+ return False
+ except:
+ return True
+
+ if _is_local():
+ if os.environ.get('CREDENTIALS') is None:
+ raise ValueError("_set_up_environment must be manually called to configure credentials for local runs")
+ data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8'))
+ workspace_url = data.get("workspace_url", None)
+ token = data.get("token", None)
+ mosaic_token = data.get("mosaic_token", None)
+ # set up the mosaic token
+ conf = MCLIConfig.load_config()
+ conf.api_key = mosaic_token
+ conf.save_config()
+ MAPIConnection.reset_connection()
+
+
+ hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8]
+ databricks_secret_name = f"databricks-{hash}"
+
+ # clean up the old secret. MosaicML doesn't support multiple databricks secrets
+ # would have to clean up the old secret if it exists
+ from mcli.api.secrets.api_get_secrets import get_secrets
+ from mcli.api.secrets.api_delete_secrets import delete_secrets
+ from mcli.models.mcli_secret import SecretType
+ s = get_secrets(secret_types=[SecretType.databricks])
+ if len(s) == 1:
+ if s[0].name != databricks_secret_name:
+ delete_secrets(s)
+ else:
+ print("databricks secret already exists")
+ return
+ from mcli.objects.secrets.create.databricks import DatabricksSecretCreator
+ from mcli.api.secrets.api_create_secret import create_secret
+ s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token)
+ print(f"successfully created databricks secret: {databricks_secret_name}")
+ create_secret(s)
+
+ else:
wc = WorkspaceClient()
+ import mlflow.utils.databricks_utils as databricks_utils
+ workspace_url = databricks_utils.get_workspace_info_from_dbutils()[0]
ctx = wc.dbutils.entry_point.getDbutils().notebook().getContext()
token = ctx.apiToken().get()
+ api_url = ctx.apiUrl().get()
+ endpoint = f'{api_url}/api/2.0/genai-mapi/graphql'
+ os.environ[config.MOSAICML_API_KEY_ENV] = f'Bearer {token}'
+ os.environ[config.MOSAICML_API_ENDPOINT_ENV] = endpoint
- if workspace_url is None or mosaic_token is None:
- raise ValueError("workspace_url and token must be provided")
+ # needed to set up the MLFlow query for experiment runs
os.environ['WORKSPACE_URL'] = workspace_url
os.environ['MLFLOW_TRACKING_TOKEN'] = token
- # set up the mosaic token
- conf = MCLIConfig.load_config()
- conf.api_key = mosaic_token
- conf.save_config()
- MAPIConnection.reset_connection()
-
- hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8]
- databricks_secret_name = f"databricks-{hash}"
-
- # clean up the old secret. MosaicML doesn't support multiple databricks secrets
- # would have to clean up the old secret if it exists
- from mcli.api.secrets.api_get_secrets import get_secrets
- from mcli.api.secrets.api_delete_secrets import delete_secrets
- from mcli.models.mcli_secret import SecretType
- s = get_secrets(secret_types=[SecretType.databricks])
- if len(s) == 1:
- if s[0].name != databricks_secret_name:
- delete_secrets(s)
- else:
- print("databricks secret already exists")
- return
- from mcli.objects.secrets.create.databricks import DatabricksSecretCreator
- from mcli.api.secrets.api_create_secret import create_secret
- s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token)
- print(f"successfully created databricks secret: {databricks_secret_name}")
- create_secret(s)
-
def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str):
@@ -85,6 +109,7 @@ def _get_run_summary(run: Run, experiment_name: Optional[str] = None):
url = None
if run.status == RunStatus.RUNNING and experiment_name is not None:
url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name)
+ print(f"ygong: url {url}")
df = pd.DataFrame({
'Run Name': [run.name],
@@ -100,9 +125,17 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.
display(cancel_button)
display(HTML(summary.to_html(escape=False)))
+def _wait_for_run_status(run: Run, status: RunStatus):
+ run_name = run.name
+ while not run.status.after(status, inclusive=True):
+ time.sleep(5)
+ run = get_run(run_name)
+ return run
+
def submit(model, config: any, scalingConfig: ScalingConfig):
+ _init_connection()
mlflow_experiment_name = None
if model == "mpt125m":
if not isinstance(config, MPT125MConfig):
@@ -114,21 +147,22 @@ def submit(model, config: any, scalingConfig: ScalingConfig):
run = create_run(runConfig)
+ run_name = run.name
# Create a button
button = widgets.Button(description="cancel the run")
def on_button_clicked(b):
clear_output(wait=False)
+ run = get_run(run_name)
run.stop()
- wait_for_run_status(run, RunStatus.TERMINATING)
+ run = _wait_for_run_status(run, RunStatus.TERMINATING)
summary = _get_run_summary(run, mlflow_experiment_name)
display(HTML(summary.to_html(escape=False)))
button.on_click(on_button_clicked)
-
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
-
- wait_for_run_status(run, RunStatus.RUNNING)
+
# setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet
# when the run just starts running
+ run = _wait_for_run_status(run, RunStatus.RUNNING)
_display_run_summary(_get_run_summary(run, None), button)
@@ -137,9 +171,12 @@ def on_button_clicked(b):
try_count += 1
time.sleep(20)
try:
+ run = get_run(run)
+ if run.status.after(RunStatus.TERMINATING, inclusive=True):
+ break
summary = _get_run_summary(run, mlflow_experiment_name)
_display_run_summary(summary, button)
break
except ValueError:
- print("waiting for the run to be ready...")
+ print(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}")
pass
\ No newline at end of file
From 717568cc7693201306857deb245c3816ac6d2cee Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Thu, 21 Mar 2024 22:24:15 -0700
Subject: [PATCH 04/27] add sync logic
---
ygong/mosaic/submit.py | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 0d92125c87..e757f34330 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -134,7 +134,7 @@ def _wait_for_run_status(run: Run, status: RunStatus):
-def submit(model, config: any, scalingConfig: ScalingConfig):
+def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False):
_init_connection()
mlflow_experiment_name = None
if model == "mpt125m":
@@ -179,4 +179,9 @@ def on_button_clicked(b):
break
except ValueError:
print(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}")
- pass
\ No newline at end of file
+ pass
+
+ if sync:
+ print(f"DEBUG: synchronously waiting for the run to finish.")
+ run = _wait_for_run_status(run, RunStatus.TERMINATING)
+ _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
\ No newline at end of file
From ca72575d705b6580d6a002a7dc3907956bdc773d Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Fri, 22 Mar 2024 10:02:07 -0700
Subject: [PATCH 05/27] fix the log artifact upload bug
---
scripts/train/launcher.py | 43 +++++++++++++++++++++++----------------
1 file changed, 26 insertions(+), 17 deletions(-)
diff --git a/scripts/train/launcher.py b/scripts/train/launcher.py
index 3d0ee3b47b..e86e083e9c 100755
--- a/scripts/train/launcher.py
+++ b/scripts/train/launcher.py
@@ -390,6 +390,29 @@ def _get_file__(format: str):
process.stderr = stderr_file
processes[global_rank] = process
+def _logs_upload_to_mlflow(log_dirs: set[str], launcher_log: str):
+ import mlflow
+ # intialize mlflow experiment. Need to wait for processors to start the experiment
+ if os.environ.get("mlflow_runid") is None and os.path.exists(MLFlowLogger.CONFIG_FILE):
+ import json
+ with open(MLFlowLogger.CONFIG_FILE, "r") as f:
+ data = json.load(f)
+ log.error(f"ygong:Started mlflow run {data}")
+ os.environ['mlflow_runid'] = data[MLFlowLogger.RUN_ID_FIELD]
+
+ mlflow.set_tracking_uri(data[MLFlowLogger.TRACKING_URI_FIELD])
+ mlflow.start_run(run_id=data[MLFlowLogger.RUN_ID_FIELD], experiment_id=data[MLFlowLogger.EXPERIMENT_ID_FIELD])
+
+ # once the mlflow experiment is started, upload the logs
+ if os.environ.get("mlflow_runid") is not None:
+ try:
+ for log_dir in log_dirs:
+ log.warning(f"ygong: Logging directory: {log_dir}")
+ mlflow.log_artifacts(log_dir, log_dir.lstrip('/'))
+ mlflow.log_artifact(launcher_log)
+ except Exception as e:
+ log.error(f"ygong:Failed to log artifacts to mlflow: {e}")
+
def _monitor_processes(
processes: Dict[int, subprocess.Popen],
@@ -422,23 +445,7 @@ def _monitor_processes(
break
if cycle == 0:
- if mlflow_runid is None:
- if os.path.exists(MLFlowLogger.CONFIG_FILE):
- import json
- with open(MLFlowLogger.CONFIG_FILE, "r") as f:
- data = json.load(f)
- log.error(f"ygong:Started mlflow run {data}")
- mlflow_runid = data[MLFlowLogger.RUN_ID_FIELD]
- mlflow.set_tracking_uri(data[MLFlowLogger.TRACKING_URI_FIELD])
- mlflow.start_run(run_id=data[MLFlowLogger.RUN_ID_FIELD], experiment_id=data[MLFlowLogger.EXPERIMENT_ID_FIELD])
- else:
- try:
- for log_dir in log_dirs:
- log.warning(f"ygong: Logging directory: {log_dir}")
- mlflow.log_artifacts(log_dir, log_dir.lstrip('/'))
- mlflow.log_artifact(launcher_log)
- except Exception as e:
- log.error(f"ygong:Failed to log artifacts to mlflow: {e}")
+ _logs_upload_to_mlflow(log_dirs, launcher_log)
cycle = (cycle + 1) % log_frequency
time.sleep(0.1)
@@ -614,6 +621,8 @@ def main():
traceback.print_exc()
print('Killing training processes')
finally:
+ # upload the logs before exit
+ _logs_upload_to_mlflow(log_dirs=log_dirs, launcher_log=launcher_log)
_cleanup_processes(processes)
log_tmpdir.cleanup()
return _aggregate_process_returncode(processes)
From 9acc479f2759dd27669a3588b391e90f2bb7de01 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Fri, 22 Mar 2024 10:32:37 -0700
Subject: [PATCH 06/27] aa
---
ygong/mosaic/submit.py | 27 +++++++++++++++++++--------
1 file changed, 19 insertions(+), 8 deletions(-)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index e757f34330..7e3cb5953b 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -17,6 +17,7 @@
# from databricks_genai.api.config import configure_request
from mcli import config
from mcli.api.runs.api_get_runs import get_run
+import logging
def _set_up_environment(content: str):
@@ -125,16 +126,21 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.
display(cancel_button)
display(HTML(summary.to_html(escape=False)))
-def _wait_for_run_status(run: Run, status: RunStatus):
+def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
run_name = run.name
- while not run.status.after(status, inclusive=True):
+ while not run.status.after(status, inclusive=inclusive):
time.sleep(5)
- run = get_run(run_name)
+ run = get_run(run_name)
+ logging.debug(f"DEBUG: run status {run.status}, expected status {status}")
+ logging.debug(f"DEBUG: finish waiting run reached expected status {status}")
return run
-def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False):
+def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False):
+ if debug:
+ logging.basicConfig(level=logging.DEBUG)
+
_init_connection()
mlflow_experiment_name = None
if model == "mpt125m":
@@ -154,6 +160,7 @@ def on_button_clicked(b):
clear_output(wait=False)
run = get_run(run_name)
run.stop()
+ logging.debug(f"DEBUG: run {run_name} is cancelled")
run = _wait_for_run_status(run, RunStatus.TERMINATING)
summary = _get_run_summary(run, mlflow_experiment_name)
display(HTML(summary.to_html(escape=False)))
@@ -173,15 +180,19 @@ def on_button_clicked(b):
try:
run = get_run(run)
if run.status.after(RunStatus.TERMINATING, inclusive=True):
+ logging.debug(f"DEBUG: run {run_name} is terminated. Status {run.status}")
break
summary = _get_run_summary(run, mlflow_experiment_name)
_display_run_summary(summary, button)
break
except ValueError:
- print(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}")
+
+ logging.debug(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}")
pass
if sync:
- print(f"DEBUG: synchronously waiting for the run to finish.")
- run = _wait_for_run_status(run, RunStatus.TERMINATING)
- _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
\ No newline at end of file
+ logging.debug(f"DEBUG: synchronously waiting for the run to finish.")
+ run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False)
+ _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
+
+ return run
\ No newline at end of file
From 18bd472e83e3f9da78e1d35ea76fba28027bb05a Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Fri, 22 Mar 2024 10:39:00 -0700
Subject: [PATCH 07/27] bb
---
ygong/mosaic/submit.py | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 7e3cb5953b..bd3cc8337b 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -18,7 +18,8 @@
from mcli import config
from mcli.api.runs.api_get_runs import get_run
import logging
-
+
+logger = logging.getLogger('ygong.mosaic.submit')
def _set_up_environment(content: str):
os.environ['CREDENTIALS'] = content
@@ -139,7 +140,7 @@ def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False):
if debug:
- logging.basicConfig(level=logging.DEBUG)
+ logger.setLevel(logging.DEBUG)
_init_connection()
mlflow_experiment_name = None
From 0368ccaa76adc55344d4b275bd389911500112c4 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Fri, 22 Mar 2024 10:41:48 -0700
Subject: [PATCH 08/27] cc
---
ygong/mosaic/submit.py | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index bd3cc8337b..379bf80eb8 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -132,8 +132,8 @@ def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
while not run.status.after(status, inclusive=inclusive):
time.sleep(5)
run = get_run(run_name)
- logging.debug(f"DEBUG: run status {run.status}, expected status {status}")
- logging.debug(f"DEBUG: finish waiting run reached expected status {status}")
+ logger.debug(f"DEBUG: run status {run.status}, expected status {status}")
+ logger.debug(f"DEBUG: finish waiting run reached expected status {status}")
return run
@@ -161,7 +161,7 @@ def on_button_clicked(b):
clear_output(wait=False)
run = get_run(run_name)
run.stop()
- logging.debug(f"DEBUG: run {run_name} is cancelled")
+ logger.debug(f"DEBUG: run {run_name} is cancelled")
run = _wait_for_run_status(run, RunStatus.TERMINATING)
summary = _get_run_summary(run, mlflow_experiment_name)
display(HTML(summary.to_html(escape=False)))
@@ -181,18 +181,18 @@ def on_button_clicked(b):
try:
run = get_run(run)
if run.status.after(RunStatus.TERMINATING, inclusive=True):
- logging.debug(f"DEBUG: run {run_name} is terminated. Status {run.status}")
+ logger.debug(f"DEBUG: run {run_name} is terminated. Status {run.status}")
break
summary = _get_run_summary(run, mlflow_experiment_name)
_display_run_summary(summary, button)
break
except ValueError:
- logging.debug(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}")
+ logger.debug(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}")
pass
if sync:
- logging.debug(f"DEBUG: synchronously waiting for the run to finish.")
+ logger.debug(f"DEBUG: synchronously waiting for the run to finish.")
run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False)
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
From 24db5fa90702764189efe6fb39e14b620c2b3c62 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Fri, 22 Mar 2024 10:50:51 -0700
Subject: [PATCH 09/27] dd
---
ygong/mosaic/submit.py | 23 +++++++++++++++++------
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 379bf80eb8..8a542475e2 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -18,8 +18,10 @@
from mcli import config
from mcli.api.runs.api_get_runs import get_run
import logging
+import sys
logger = logging.getLogger('ygong.mosaic.submit')
+
def _set_up_environment(content: str):
os.environ['CREDENTIALS'] = content
@@ -132,15 +134,24 @@ def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
while not run.status.after(status, inclusive=inclusive):
time.sleep(5)
run = get_run(run_name)
- logger.debug(f"DEBUG: run status {run.status}, expected status {status}")
- logger.debug(f"DEBUG: finish waiting run reached expected status {status}")
+ logger.debug(f"run status {run.status}, expected status {status}")
+ logger.debug(f"finish waiting run reached expected status {status}")
return run
def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False):
if debug:
+ stdout_handler = logging.StreamHandler(sys.stdout)
+ stdout_handler.setLevel(logging.DEBUG) # Set minimum log level for the handler
+ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ stdout_handler.setFormatter(formatter)
+
+ # Add the handler to the logger
+ logger.addHandler(stdout_handler)
logger.setLevel(logging.DEBUG)
+
+ logger.info("set the logger to debug mode")
_init_connection()
mlflow_experiment_name = None
@@ -161,7 +172,7 @@ def on_button_clicked(b):
clear_output(wait=False)
run = get_run(run_name)
run.stop()
- logger.debug(f"DEBUG: run {run_name} is cancelled")
+ logger.debug(f"run {run_name} is cancelled")
run = _wait_for_run_status(run, RunStatus.TERMINATING)
summary = _get_run_summary(run, mlflow_experiment_name)
display(HTML(summary.to_html(escape=False)))
@@ -181,18 +192,18 @@ def on_button_clicked(b):
try:
run = get_run(run)
if run.status.after(RunStatus.TERMINATING, inclusive=True):
- logger.debug(f"DEBUG: run {run_name} is terminated. Status {run.status}")
+ logger.debug(f"run {run_name} is terminated. Status {run.status}")
break
summary = _get_run_summary(run, mlflow_experiment_name)
_display_run_summary(summary, button)
break
except ValueError:
- logger.debug(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}")
+ logger.debug(f"waiting for the MLFLow experiment run to be ready, run status{run.status}")
pass
if sync:
- logger.debug(f"DEBUG: synchronously waiting for the run to finish.")
+ logger.debug(f"synchronously waiting for the run to finish.")
run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False)
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
From 4c08f3b9d7b87b149dc3d6bdf21f9393c0a388cd Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Fri, 22 Mar 2024 12:13:21 -0700
Subject: [PATCH 10/27] remove packaging
---
setup.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/setup.py b/setup.py
index faa27e7556..2c32f79894 100644
--- a/setup.py
+++ b/setup.py
@@ -85,7 +85,7 @@
'pytest-cov>=4,<5',
'pyright==1.1.256',
'toml>=0.10.2,<0.11',
- 'packaging>=21,<23',
+ # 'packaging>=21,<23',
'hf_transfer==0.1.3',
]
From d8ba721f2d8c280f5e373d2b106bf572bfa940b0 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Fri, 22 Mar 2024 15:06:43 -0700
Subject: [PATCH 11/27] temprarily avoid displaying the button so that it can
be runnable by jobs
---
setup.py | 2 +-
ygong/mosaic/submit.py | 31 +++++++++++++++++--------------
2 files changed, 18 insertions(+), 15 deletions(-)
diff --git a/setup.py b/setup.py
index 2c32f79894..018f067f72 100644
--- a/setup.py
+++ b/setup.py
@@ -85,7 +85,7 @@
'pytest-cov>=4,<5',
'pyright==1.1.256',
'toml>=0.10.2,<0.11',
- # 'packaging>=21,<23',
+ 'packaging>=21',
'hf_transfer==0.1.3',
]
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 8a542475e2..d7ae51b66a 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -167,23 +167,25 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False,
run = create_run(runConfig)
run_name = run.name
# Create a button
- button = widgets.Button(description="cancel the run")
- def on_button_clicked(b):
- clear_output(wait=False)
- run = get_run(run_name)
- run.stop()
- logger.debug(f"run {run_name} is cancelled")
- run = _wait_for_run_status(run, RunStatus.TERMINATING)
- summary = _get_run_summary(run, mlflow_experiment_name)
- display(HTML(summary.to_html(escape=False)))
- button.on_click(on_button_clicked)
- _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
+ # button = widgets.Button(description="cancel the run")
+ # def on_button_clicked(b):
+ # clear_output(wait=False)
+ # run = get_run(run_name)
+ # run.stop()
+ # logger.debug(f"run {run_name} is cancelled")
+ # run = _wait_for_run_status(run, RunStatus.TERMINATING)
+ # summary = _get_run_summary(run, mlflow_experiment_name)
+ # display(HTML(summary.to_html(escape=False)))
+ # button.on_click(on_button_clicked)
+ # _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
+ _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
# setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet
# when the run just starts running
run = _wait_for_run_status(run, RunStatus.RUNNING)
- _display_run_summary(_get_run_summary(run, None), button)
-
+ # _display_run_summary(_get_run_summary(run, None), button)
+ _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
+
try_count = 0
while try_count < 10:
@@ -195,7 +197,8 @@ def on_button_clicked(b):
logger.debug(f"run {run_name} is terminated. Status {run.status}")
break
summary = _get_run_summary(run, mlflow_experiment_name)
- _display_run_summary(summary, button)
+ # _display_run_summary(summary, button)
+ _display_run_summary(summary, None)
break
except ValueError:
From 766a638e015652e57ad1355caf10b97683d3a53e Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Fri, 22 Mar 2024 15:19:43 -0700
Subject: [PATCH 12/27] add more debuggin infor
---
ygong/mosaic/submit.py | 3 +++
1 file changed, 3 insertions(+)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index d7ae51b66a..4f47f6a36c 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -37,6 +37,7 @@ def _is_local():
return True
if _is_local():
+ logger.debug("init_connection in local mode")
if os.environ.get('CREDENTIALS') is None:
raise ValueError("_set_up_environment must be manually called to configure credentials for local runs")
data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8'))
@@ -72,6 +73,7 @@ def _is_local():
create_secret(s)
else:
+ logger.debug("init_connection in databricks environment")
wc = WorkspaceClient()
import mlflow.utils.databricks_utils as databricks_utils
workspace_url = databricks_utils.get_workspace_info_from_dbutils()[0]
@@ -79,6 +81,7 @@ def _is_local():
token = ctx.apiToken().get()
api_url = ctx.apiUrl().get()
endpoint = f'{api_url}/api/2.0/genai-mapi/graphql'
+ logger.debug(f"init_connection token: {token}, api_url: {api_url}, endpoint: {endpoint}, workspace_url: {workspace_url}")
os.environ[config.MOSAICML_API_KEY_ENV] = f'Bearer {token}'
os.environ[config.MOSAICML_API_ENDPOINT_ENV] = endpoint
From 46734a15066ba517f55e432fa33f9f118f184414 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Fri, 22 Mar 2024 16:25:57 -0700
Subject: [PATCH 13/27] cc
---
ygong/mosaic/submit.py | 46 +++++++++++++++++++++---------------------
1 file changed, 23 insertions(+), 23 deletions(-)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 4f47f6a36c..ed59909069 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -71,27 +71,27 @@ def _is_local():
s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token)
print(f"successfully created databricks secret: {databricks_secret_name}")
create_secret(s)
+ os.environ['IS_JOBS'] = False
else:
logger.debug("init_connection in databricks environment")
wc = WorkspaceClient()
- import mlflow.utils.databricks_utils as databricks_utils
- workspace_url = databricks_utils.get_workspace_info_from_dbutils()[0]
ctx = wc.dbutils.entry_point.getDbutils().notebook().getContext()
token = ctx.apiToken().get()
api_url = ctx.apiUrl().get()
endpoint = f'{api_url}/api/2.0/genai-mapi/graphql'
- logger.debug(f"init_connection token: {token}, api_url: {api_url}, endpoint: {endpoint}, workspace_url: {workspace_url}")
+ workspace_url = api_url
os.environ[config.MOSAICML_API_KEY_ENV] = f'Bearer {token}'
os.environ[config.MOSAICML_API_ENDPOINT_ENV] = endpoint
+ os.environ['IS_JOBS'] = ctx.jobId() is not None
# needed to set up the MLFlow query for experiment runs
os.environ['WORKSPACE_URL'] = workspace_url
os.environ['MLFLOW_TRACKING_TOKEN'] = token
+ logger.debug(f"init_connection token: {os.environ['MLFLOW_TRACKING_TOKEN']}, workspace: {os.environ['WORKSPACE_URL']}, is_jobs:{ os.environ['IS_JOBS']}")
+
-
-
def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str):
if tracking_uri is None:
raise ValueError("tracking_uri must be provided")
@@ -170,25 +170,26 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False,
run = create_run(runConfig)
run_name = run.name
# Create a button
- # button = widgets.Button(description="cancel the run")
- # def on_button_clicked(b):
- # clear_output(wait=False)
- # run = get_run(run_name)
- # run.stop()
- # logger.debug(f"run {run_name} is cancelled")
- # run = _wait_for_run_status(run, RunStatus.TERMINATING)
- # summary = _get_run_summary(run, mlflow_experiment_name)
- # display(HTML(summary.to_html(escape=False)))
- # button.on_click(on_button_clicked)
- # _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
- _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
-
+ if os.environ['IS_JOBS']:
+ button = None
+ else:
+ button = widgets.Button(description="cancel the run")
+ def on_button_clicked(b):
+ clear_output(wait=False)
+ run = get_run(run_name)
+ run.stop()
+ logger.debug(f"run {run_name} is cancelled")
+ run = _wait_for_run_status(run, RunStatus.TERMINATING)
+ summary = _get_run_summary(run, mlflow_experiment_name)
+ display(HTML(summary.to_html(escape=False)))
+ button.on_click(on_button_clicked)
+ _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
+
# setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet
# when the run just starts running
run = _wait_for_run_status(run, RunStatus.RUNNING)
- # _display_run_summary(_get_run_summary(run, None), button)
- _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
-
+ _display_run_summary(_get_run_summary(run, None), button)
+
try_count = 0
while try_count < 10:
@@ -200,8 +201,7 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False,
logger.debug(f"run {run_name} is terminated. Status {run.status}")
break
summary = _get_run_summary(run, mlflow_experiment_name)
- # _display_run_summary(summary, button)
- _display_run_summary(summary, None)
+ _display_run_summary(summary, button)
break
except ValueError:
From f28cfbc8b5ba9e6f22122d032e429be299d874f6 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Fri, 22 Mar 2024 16:46:39 -0700
Subject: [PATCH 14/27] dd
---
ygong/mosaic/submit.py | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index ed59909069..9ae7b2536c 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -71,7 +71,6 @@ def _is_local():
s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token)
print(f"successfully created databricks secret: {databricks_secret_name}")
create_secret(s)
- os.environ['IS_JOBS'] = False
else:
logger.debug("init_connection in databricks environment")
@@ -83,12 +82,16 @@ def _is_local():
workspace_url = api_url
os.environ[config.MOSAICML_API_KEY_ENV] = f'Bearer {token}'
os.environ[config.MOSAICML_API_ENDPOINT_ENV] = endpoint
- os.environ['IS_JOBS'] = ctx.jobId() is not None
+ try:
+ jobs_id = ctx.jobId().get()
+ os.environ['JOB_ID'] = jobs_id
+ except:
+ pass
# needed to set up the MLFlow query for experiment runs
os.environ['WORKSPACE_URL'] = workspace_url
os.environ['MLFLOW_TRACKING_TOKEN'] = token
- logger.debug(f"init_connection token: {os.environ['MLFLOW_TRACKING_TOKEN']}, workspace: {os.environ['WORKSPACE_URL']}, is_jobs:{ os.environ['IS_JOBS']}")
+ logger.debug(f"init_connection token: {os.environ['MLFLOW_TRACKING_TOKEN']}, workspace: {os.environ['WORKSPACE_URL']}, is_jobs: {os.environ.get('JOB_ID')}")
@@ -116,7 +119,6 @@ def _get_run_summary(run: Run, experiment_name: Optional[str] = None):
url = None
if run.status == RunStatus.RUNNING and experiment_name is not None:
url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name)
- print(f"ygong: url {url}")
df = pd.DataFrame({
'Run Name': [run.name],
@@ -170,7 +172,8 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False,
run = create_run(runConfig)
run_name = run.name
# Create a button
- if os.environ['IS_JOBS']:
+ if os.environ.get('JOB_ID') is not None:
+ # running in jobs workflow, no need to cancel the run and doesn't support widgets
button = None
else:
button = widgets.Button(description="cancel the run")
From 0737de7aa8dad006d2b546059aa7c6aa0c895527 Mon Sep 17 00:00:00 2001
From: Shitao Li
Date: Mon, 25 Mar 2024 17:09:38 +0000
Subject: [PATCH 15/27] add support for retry and preemption
---
ygong/mosaic/mpt125mConfig.py | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py
index 322fc181e3..a213fce2d5 100644
--- a/ygong/mosaic/mpt125mConfig.py
+++ b/ygong/mosaic/mpt125mConfig.py
@@ -6,14 +6,24 @@
class MPT125MConfig:
- def __init__(self, experimentName: str, data: str, priority: str = 'high'):
+ def __init__(
+ self,
+ experimentName: str,
+ data: str,
+ priority: str = 'high',
+ preemptible: bool = False,
+ retry_on_system_failure: bool = False):
# TODO: validate the inputs and remove the yu.gong hardcode
self.mlflow_experimentName = f"/Users/yu.gong@databricks.com/{experimentName}"
self.mlflow_trackingUri = "databricks"
# self.mlflow_trackingUri = "databricks" if workspace_url is None else workspace_url
self.data = data
+
+ # Scheudling parameters
self.priority = priority
+ self.preemptible = preemptible
+ self.retry_on_system_failure = retry_on_system_failure
# the run name is pre-configured for all config-driven pretrain runs
self.name = "mpt125m-config-driven-pretrain"
@@ -37,7 +47,11 @@ def toRunConfig(self, scalingConfig: ScalingConfig):
image='mosaicml/llm-foundry:2.2.1_cu121_flash2-latest',
command="\n".join(self.commands),
compute=scalingConfig.toCompute,
- scheduling={'priority': self.priority},
+ scheduling={
+ 'priority': self.priority,
+ 'preemptible': self.preemptible,
+ 'retry_on_system_failure': self.retry_on_system_failure
+ },
integrations=[
{
'integration_type': 'git_repo',
From 69375aba57bb3e00d5061172b131271ccebc2a6a Mon Sep 17 00:00:00 2001
From: Shitao Li
Date: Mon, 25 Mar 2024 21:23:16 +0000
Subject: [PATCH 16/27] use mcli style status
---
ygong/mosaic/submit.py | 120 +++++++++++++++++------------------------
1 file changed, 48 insertions(+), 72 deletions(-)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 8a542475e2..97135f9974 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -1,23 +1,21 @@
from ygong.mosaic.scaling_config import ScalingConfig
from ygong.mosaic.mpt125mConfig import MPT125MConfig
-from mcli import wait_for_run_status, Run, RunConfig, RunStatus, create_run
-import pandas as pd
-import ipywidgets as widgets
+
+from databricks.sdk import WorkspaceClient
+from mcli import config, Run, RunStatus, create_run
+from mcli.api.runs.api_get_runs import get_run
+from mcli.cli.m_get.runs import RunDisplayItem
from IPython.display import display, clear_output, HTML
+import ipywidgets as widgets
import mlflow
-import os
+import pandas as pd
+
from typing import Optional
-import time
import base64
+import time
import json
-import hashlib
-from mcli.api.engine.engine import MAPIConnection
-from mcli.config import MCLIConfig
-from databricks.sdk import WorkspaceClient
-# from databricks_genai.api.config import configure_request
-from mcli import config
-from mcli.api.runs.api_get_runs import get_run
import logging
+import os
import sys
logger = logging.getLogger('ygong.mosaic.submit')
@@ -42,35 +40,9 @@ def _is_local():
data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8'))
workspace_url = data.get("workspace_url", None)
token = data.get("token", None)
- mosaic_token = data.get("mosaic_token", None)
# set up the mosaic token
- conf = MCLIConfig.load_config()
- conf.api_key = mosaic_token
- conf.save_config()
- MAPIConnection.reset_connection()
-
-
- hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8]
- databricks_secret_name = f"databricks-{hash}"
-
- # clean up the old secret. MosaicML doesn't support multiple databricks secrets
- # would have to clean up the old secret if it exists
- from mcli.api.secrets.api_get_secrets import get_secrets
- from mcli.api.secrets.api_delete_secrets import delete_secrets
- from mcli.models.mcli_secret import SecretType
- s = get_secrets(secret_types=[SecretType.databricks])
- if len(s) == 1:
- if s[0].name != databricks_secret_name:
- delete_secrets(s)
- else:
- print("databricks secret already exists")
- return
- from mcli.objects.secrets.create.databricks import DatabricksSecretCreator
- from mcli.api.secrets.api_create_secret import create_secret
- s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token)
- print(f"successfully created databricks secret: {databricks_secret_name}")
- create_secret(s)
-
+ os.environ[config.MCLI_MODE_ENV] = config.MCLIMode.DBX_AWS_STAGING.value
+ os.environ[config.MOSAICML_ACCESS_TOKEN_FILE_ENV] = "/home/shitao.li/e2_token"
else:
wc = WorkspaceClient()
import mlflow.utils.databricks_utils as databricks_utils
@@ -86,8 +58,6 @@ def _is_local():
os.environ['WORKSPACE_URL'] = workspace_url
os.environ['MLFLOW_TRACKING_TOKEN'] = token
-
-
def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str):
if tracking_uri is None:
@@ -108,19 +78,22 @@ def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, ru
else:
run_id = runs[0].info.run_id
return f"{tracking_uri}/ml/experiments/{experiment_id}/runs/{run_id}"
-
+
+
def _get_run_summary(run: Run, experiment_name: Optional[str] = None):
url = None
- if run.status == RunStatus.RUNNING and experiment_name is not None:
- url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name)
- print(f"ygong: url {url}")
+
+ run_rows = []
+
+ # Copy pasted from mcli to display the the resumption status of the run.
+ for row_raw in RunDisplayItem.from_run(run, [], True):
+ row = row_raw.to_dict()
+ if row['Status'].startswith('Running') and experiment_name is not None:
+ url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name)
+ row['Experiment Run'] =f'Link' if url is not None else ""
+ run_rows.append(row)
- df = pd.DataFrame({
- 'Run Name': [run.name],
- 'Run ID': [run.run_uid],
- "Status": [str(run.status)],
- 'Experiment Run': [f'Link' if url is not None else ""],
- })
+ df = pd.DataFrame(run_rows)
return df
def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.Button]):
@@ -129,17 +102,6 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.
display(cancel_button)
display(HTML(summary.to_html(escape=False)))
-def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
- run_name = run.name
- while not run.status.after(status, inclusive=inclusive):
- time.sleep(5)
- run = get_run(run_name)
- logger.debug(f"run status {run.status}, expected status {status}")
- logger.debug(f"finish waiting run reached expected status {status}")
- return run
-
-
-
def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False):
if debug:
stdout_handler = logging.StreamHandler(sys.stdout)
@@ -177,13 +139,28 @@ def on_button_clicked(b):
summary = _get_run_summary(run, mlflow_experiment_name)
display(HTML(summary.to_html(escape=False)))
button.on_click(on_button_clicked)
- _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
- # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet
- # when the run just starts running
+ def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
+ run_name = run.name
+ while not run.status.after(status, inclusive=inclusive) and not run.status.is_terminal():
+ run = get_run(run_name)
+ # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet
+ # when the run just starts running
+ _display_run_summary(_get_run_summary(run, None), button)
+ time.sleep(5)
+ logger.debug(f"finish waiting run reached expected status {status}")
+ return run
+
+ def _wait_for_run_finish(run: Run):
+ run_name = run.name
+ while not run.status.is_terminal():
+ run = get_run(run_name)
+ _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
+ time.sleep(5)
+ logger.debug(f"finish waiting run reached terminal")
+ return run
+
run = _wait_for_run_status(run, RunStatus.RUNNING)
- _display_run_summary(_get_run_summary(run, None), button)
-
try_count = 0
while try_count < 10:
@@ -191,20 +168,19 @@ def on_button_clicked(b):
time.sleep(20)
try:
run = get_run(run)
- if run.status.after(RunStatus.TERMINATING, inclusive=True):
- logger.debug(f"run {run_name} is terminated. Status {run.status}")
+ if run.status.is_terminal():
+ logger.debug(f"run {run_name} is in terminal state. Status {run.status}")
break
summary = _get_run_summary(run, mlflow_experiment_name)
_display_run_summary(summary, button)
break
except ValueError:
-
logger.debug(f"waiting for the MLFLow experiment run to be ready, run status{run.status}")
pass
if sync:
logger.debug(f"synchronously waiting for the run to finish.")
- run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False)
+ run = _wait_for_run_finish(run)
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
return run
\ No newline at end of file
From 3abd4fa57165c503bad1190948a587c309778a80 Mon Sep 17 00:00:00 2001
From: Shitao Li
Date: Tue, 26 Mar 2024 23:31:19 +0000
Subject: [PATCH 17/27] revert some change
---
ygong/mosaic/submit.py | 29 +++++++++++++++++++++++++++--
1 file changed, 27 insertions(+), 2 deletions(-)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 62c1f002bb..9767259c81 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -41,9 +41,34 @@ def _is_local():
data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8'))
workspace_url = data.get("workspace_url", None)
token = data.get("token", None)
+ mosaic_token = data.get("mosaic_token", None)
# set up the mosaic token
- os.environ[config.MCLI_MODE_ENV] = config.MCLIMode.DBX_AWS_STAGING.value
- os.environ[config.MOSAICML_ACCESS_TOKEN_FILE_ENV] = "/home/shitao.li/e2_token"
+ conf = MCLIConfig.load_config()
+ conf.api_key = mosaic_token
+ conf.save_config()
+ MAPIConnection.reset_connection()
+
+
+ hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8]
+ databricks_secret_name = f"databricks-{hash}"
+
+ # clean up the old secret. MosaicML doesn't support multiple databricks secrets
+ # would have to clean up the old secret if it exists
+ from mcli.api.secrets.api_get_secrets import get_secrets
+ from mcli.api.secrets.api_delete_secrets import delete_secrets
+ from mcli.models.mcli_secret import SecretType
+ s = get_secrets(secret_types=[SecretType.databricks])
+ if len(s) == 1:
+ if s[0].name != databricks_secret_name:
+ delete_secrets(s)
+ else:
+ print("databricks secret already exists")
+ return
+ from mcli.objects.secrets.create.databricks import DatabricksSecretCreator
+ from mcli.api.secrets.api_create_secret import create_secret
+ s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token)
+ print(f"successfully created databricks secret: {databricks_secret_name}")
+ create_secret(s)
else:
logger.debug("init_connection in databricks environment")
wc = WorkspaceClient()
From 1359e58f319587a725a7f49217c56bebd246c2b4 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Wed, 27 Mar 2024 10:39:21 -0700
Subject: [PATCH 18/27] make it works in job workflow
---
setup.py | 2 +-
ygong/mosaic/submit.py | 80 +++++++++++++++++++++++++++++-------------
2 files changed, 57 insertions(+), 25 deletions(-)
diff --git a/setup.py b/setup.py
index faa27e7556..018f067f72 100644
--- a/setup.py
+++ b/setup.py
@@ -85,7 +85,7 @@
'pytest-cov>=4,<5',
'pyright==1.1.256',
'toml>=0.10.2,<0.11',
- 'packaging>=21,<23',
+ 'packaging>=21',
'hf_transfer==0.1.3',
]
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index e757f34330..9ae7b2536c 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -17,7 +17,11 @@
# from databricks_genai.api.config import configure_request
from mcli import config
from mcli.api.runs.api_get_runs import get_run
-
+import logging
+import sys
+
+logger = logging.getLogger('ygong.mosaic.submit')
+
def _set_up_environment(content: str):
os.environ['CREDENTIALS'] = content
@@ -33,6 +37,7 @@ def _is_local():
return True
if _is_local():
+ logger.debug("init_connection in local mode")
if os.environ.get('CREDENTIALS') is None:
raise ValueError("_set_up_environment must be manually called to configure credentials for local runs")
data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8'))
@@ -68,23 +73,28 @@ def _is_local():
create_secret(s)
else:
+ logger.debug("init_connection in databricks environment")
wc = WorkspaceClient()
- import mlflow.utils.databricks_utils as databricks_utils
- workspace_url = databricks_utils.get_workspace_info_from_dbutils()[0]
ctx = wc.dbutils.entry_point.getDbutils().notebook().getContext()
token = ctx.apiToken().get()
api_url = ctx.apiUrl().get()
endpoint = f'{api_url}/api/2.0/genai-mapi/graphql'
+ workspace_url = api_url
os.environ[config.MOSAICML_API_KEY_ENV] = f'Bearer {token}'
os.environ[config.MOSAICML_API_ENDPOINT_ENV] = endpoint
+ try:
+ jobs_id = ctx.jobId().get()
+ os.environ['JOB_ID'] = jobs_id
+ except:
+ pass
# needed to set up the MLFlow query for experiment runs
os.environ['WORKSPACE_URL'] = workspace_url
os.environ['MLFLOW_TRACKING_TOKEN'] = token
+ logger.debug(f"init_connection token: {os.environ['MLFLOW_TRACKING_TOKEN']}, workspace: {os.environ['WORKSPACE_URL']}, is_jobs: {os.environ.get('JOB_ID')}")
+
-
-
def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str):
if tracking_uri is None:
raise ValueError("tracking_uri must be provided")
@@ -109,7 +119,6 @@ def _get_run_summary(run: Run, experiment_name: Optional[str] = None):
url = None
if run.status == RunStatus.RUNNING and experiment_name is not None:
url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name)
- print(f"ygong: url {url}")
df = pd.DataFrame({
'Run Name': [run.name],
@@ -125,16 +134,30 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.
display(cancel_button)
display(HTML(summary.to_html(escape=False)))
-def _wait_for_run_status(run: Run, status: RunStatus):
+def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
run_name = run.name
- while not run.status.after(status, inclusive=True):
+ while not run.status.after(status, inclusive=inclusive):
time.sleep(5)
- run = get_run(run_name)
+ run = get_run(run_name)
+ logger.debug(f"run status {run.status}, expected status {status}")
+ logger.debug(f"finish waiting run reached expected status {status}")
return run
-def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False):
+def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False):
+ if debug:
+ stdout_handler = logging.StreamHandler(sys.stdout)
+ stdout_handler.setLevel(logging.DEBUG) # Set minimum log level for the handler
+ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ stdout_handler.setFormatter(formatter)
+
+ # Add the handler to the logger
+ logger.addHandler(stdout_handler)
+ logger.setLevel(logging.DEBUG)
+
+ logger.info("set the logger to debug mode")
+
_init_connection()
mlflow_experiment_name = None
if model == "mpt125m":
@@ -149,17 +172,22 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False)
run = create_run(runConfig)
run_name = run.name
# Create a button
- button = widgets.Button(description="cancel the run")
- def on_button_clicked(b):
- clear_output(wait=False)
- run = get_run(run_name)
- run.stop()
- run = _wait_for_run_status(run, RunStatus.TERMINATING)
- summary = _get_run_summary(run, mlflow_experiment_name)
- display(HTML(summary.to_html(escape=False)))
- button.on_click(on_button_clicked)
+ if os.environ.get('JOB_ID') is not None:
+ # running in jobs workflow, no need to cancel the run and doesn't support widgets
+ button = None
+ else:
+ button = widgets.Button(description="cancel the run")
+ def on_button_clicked(b):
+ clear_output(wait=False)
+ run = get_run(run_name)
+ run.stop()
+ logger.debug(f"run {run_name} is cancelled")
+ run = _wait_for_run_status(run, RunStatus.TERMINATING)
+ summary = _get_run_summary(run, mlflow_experiment_name)
+ display(HTML(summary.to_html(escape=False)))
+ button.on_click(on_button_clicked)
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
-
+
# setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet
# when the run just starts running
run = _wait_for_run_status(run, RunStatus.RUNNING)
@@ -173,15 +201,19 @@ def on_button_clicked(b):
try:
run = get_run(run)
if run.status.after(RunStatus.TERMINATING, inclusive=True):
+ logger.debug(f"run {run_name} is terminated. Status {run.status}")
break
summary = _get_run_summary(run, mlflow_experiment_name)
_display_run_summary(summary, button)
break
except ValueError:
- print(f"DEBUG: waiting for the MLFLow experiment run to be ready, run status{run.status}")
+
+ logger.debug(f"waiting for the MLFLow experiment run to be ready, run status{run.status}")
pass
if sync:
- print(f"DEBUG: synchronously waiting for the run to finish.")
- run = _wait_for_run_status(run, RunStatus.TERMINATING)
- _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
\ No newline at end of file
+ logger.debug(f"synchronously waiting for the run to finish.")
+ run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False)
+ _display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
+
+ return run
\ No newline at end of file
From 4cd30cdf1632be7c07e9373cc0ec34536b2da732 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Wed, 27 Mar 2024 10:52:39 -0700
Subject: [PATCH 19/27] port shitao's change
https://github.com/ygong1/llm-foundry/pull/1/files
---
ygong/mosaic/mpt125mConfig.py | 18 ++++++-
ygong/mosaic/submit.py | 97 ++++++++++++++++++-----------------
2 files changed, 66 insertions(+), 49 deletions(-)
diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py
index 322fc181e3..a213fce2d5 100644
--- a/ygong/mosaic/mpt125mConfig.py
+++ b/ygong/mosaic/mpt125mConfig.py
@@ -6,14 +6,24 @@
class MPT125MConfig:
- def __init__(self, experimentName: str, data: str, priority: str = 'high'):
+ def __init__(
+ self,
+ experimentName: str,
+ data: str,
+ priority: str = 'high',
+ preemptible: bool = False,
+ retry_on_system_failure: bool = False):
# TODO: validate the inputs and remove the yu.gong hardcode
self.mlflow_experimentName = f"/Users/yu.gong@databricks.com/{experimentName}"
self.mlflow_trackingUri = "databricks"
# self.mlflow_trackingUri = "databricks" if workspace_url is None else workspace_url
self.data = data
+
+ # Scheudling parameters
self.priority = priority
+ self.preemptible = preemptible
+ self.retry_on_system_failure = retry_on_system_failure
# the run name is pre-configured for all config-driven pretrain runs
self.name = "mpt125m-config-driven-pretrain"
@@ -37,7 +47,11 @@ def toRunConfig(self, scalingConfig: ScalingConfig):
image='mosaicml/llm-foundry:2.2.1_cu121_flash2-latest',
command="\n".join(self.commands),
compute=scalingConfig.toCompute,
- scheduling={'priority': self.priority},
+ scheduling={
+ 'priority': self.priority,
+ 'preemptible': self.preemptible,
+ 'retry_on_system_failure': self.retry_on_system_failure
+ },
integrations=[
{
'integration_type': 'git_repo',
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 9ae7b2536c..9767259c81 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -1,23 +1,21 @@
from ygong.mosaic.scaling_config import ScalingConfig
from ygong.mosaic.mpt125mConfig import MPT125MConfig
-from mcli import wait_for_run_status, Run, RunConfig, RunStatus, create_run
-import pandas as pd
-import ipywidgets as widgets
+
+from databricks.sdk import WorkspaceClient
+from mcli import config, Run, RunStatus, create_run
+from mcli.api.runs.api_get_runs import get_run
+from mcli.cli.m_get.runs import RunDisplayItem
from IPython.display import display, clear_output, HTML
+import ipywidgets as widgets
import mlflow
-import os
+import pandas as pd
+
from typing import Optional
-import time
import base64
+import time
import json
-import hashlib
-from mcli.api.engine.engine import MAPIConnection
-from mcli.config import MCLIConfig
-from databricks.sdk import WorkspaceClient
-# from databricks_genai.api.config import configure_request
-from mcli import config
-from mcli.api.runs.api_get_runs import get_run
import logging
+import os
import sys
logger = logging.getLogger('ygong.mosaic.submit')
@@ -50,10 +48,10 @@ def _is_local():
conf.save_config()
MAPIConnection.reset_connection()
-
+
hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8]
databricks_secret_name = f"databricks-{hash}"
-
+
# clean up the old secret. MosaicML doesn't support multiple databricks secrets
# would have to clean up the old secret if it exists
from mcli.api.secrets.api_get_secrets import get_secrets
@@ -71,7 +69,6 @@ def _is_local():
s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token)
print(f"successfully created databricks secret: {databricks_secret_name}")
create_secret(s)
-
else:
logger.debug("init_connection in databricks environment")
wc = WorkspaceClient()
@@ -93,8 +90,7 @@ def _is_local():
os.environ['MLFLOW_TRACKING_TOKEN'] = token
logger.debug(f"init_connection token: {os.environ['MLFLOW_TRACKING_TOKEN']}, workspace: {os.environ['WORKSPACE_URL']}, is_jobs: {os.environ.get('JOB_ID')}")
-
-
+
def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str):
if tracking_uri is None:
raise ValueError("tracking_uri must be provided")
@@ -114,18 +110,22 @@ def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, ru
else:
run_id = runs[0].info.run_id
return f"{tracking_uri}/ml/experiments/{experiment_id}/runs/{run_id}"
-
+
+
def _get_run_summary(run: Run, experiment_name: Optional[str] = None):
url = None
- if run.status == RunStatus.RUNNING and experiment_name is not None:
- url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name)
+
+ run_rows = []
+
+ # Copy pasted from mcli to display the the resumption status of the run.
+ for row_raw in RunDisplayItem.from_run(run, [], True):
+ row = row_raw.to_dict()
+ if row['Status'].startswith('Running') and experiment_name is not None:
+ url = get_experiment_run_url(os.environ.get('WORKSPACE_URL'), experiment_name, run.name)
+ row['Experiment Run'] =f'Link' if url is not None else ""
+ run_rows.append(row)
- df = pd.DataFrame({
- 'Run Name': [run.name],
- 'Run ID': [run.run_uid],
- "Status": [str(run.status)],
- 'Experiment Run': [f'Link' if url is not None else ""],
- })
+ df = pd.DataFrame(run_rows)
return df
def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.Button]):
@@ -134,17 +134,6 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.
display(cancel_button)
display(HTML(summary.to_html(escape=False)))
-def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
- run_name = run.name
- while not run.status.after(status, inclusive=inclusive):
- time.sleep(5)
- run = get_run(run_name)
- logger.debug(f"run status {run.status}, expected status {status}")
- logger.debug(f"finish waiting run reached expected status {status}")
- return run
-
-
-
def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False):
if debug:
stdout_handler = logging.StreamHandler(sys.stdout)
@@ -168,7 +157,6 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False,
else:
raise ValueError(f"model {model} is not supported")
-
run = create_run(runConfig)
run_name = run.name
# Create a button
@@ -187,12 +175,28 @@ def on_button_clicked(b):
display(HTML(summary.to_html(escape=False)))
button.on_click(on_button_clicked)
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
-
- # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet
- # when the run just starts running
+
+ def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
+ run_name = run.name
+ while not run.status.after(status, inclusive=inclusive) and not run.status.is_terminal():
+ run = get_run(run_name)
+ # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet
+ # when the run just starts running
+ _display_run_summary(_get_run_summary(run, None), button)
+ time.sleep(5)
+ logger.debug(f"finish waiting run reached expected status {status}")
+ return run
+
+ def _wait_for_run_finish(run: Run):
+ run_name = run.name
+ while not run.status.is_terminal():
+ run = get_run(run_name)
+ _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
+ time.sleep(5)
+ logger.debug(f"finish waiting run reached terminal")
+ return run
+
run = _wait_for_run_status(run, RunStatus.RUNNING)
- _display_run_summary(_get_run_summary(run, None), button)
-
try_count = 0
while try_count < 10:
@@ -200,20 +204,19 @@ def on_button_clicked(b):
time.sleep(20)
try:
run = get_run(run)
- if run.status.after(RunStatus.TERMINATING, inclusive=True):
- logger.debug(f"run {run_name} is terminated. Status {run.status}")
+ if run.status.is_terminal():
+ logger.debug(f"run {run_name} is in terminal state. Status {run.status}")
break
summary = _get_run_summary(run, mlflow_experiment_name)
_display_run_summary(summary, button)
break
except ValueError:
-
logger.debug(f"waiting for the MLFLow experiment run to be ready, run status{run.status}")
pass
if sync:
logger.debug(f"synchronously waiting for the run to finish.")
- run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False)
+ run = _wait_for_run_finish(run)
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
return run
\ No newline at end of file
From 461cb232da8323d6b6c1da256dbdb81b68a613a5 Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Wed, 27 Mar 2024 10:49:17 -0700
Subject: [PATCH 20/27] chagne
---
.gitignore | 2 ++
ygong/mosaic/submit.py | 45 +++++++++++++++++-------------------------
2 files changed, 20 insertions(+), 27 deletions(-)
diff --git a/.gitignore b/.gitignore
index 72f965ce63..5da48d2cc0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -158,3 +158,5 @@ notebooks/
**/mlruns/*
**/tokenizer-save-dir-*/**
**/.downloaded_finetuning/
+
+.databricks
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 9767259c81..dcf3cf5c40 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -17,6 +17,8 @@
import logging
import os
import sys
+from mcli import RunConfig
+import hashlib
logger = logging.getLogger('ygong.mosaic.submit')
@@ -134,7 +136,16 @@ def _display_run_summary(summary: pd.DataFrame, cancel_button: Optional[widgets.
display(cancel_button)
display(HTML(summary.to_html(escape=False)))
-def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False):
+def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
+ run_name = run.name
+ while not run.status.after(status, inclusive=inclusive):
+ time.sleep(5)
+ run = get_run(run_name)
+ logger.debug(f"run status {run.status}, expected status {status}")
+ logger.debug(f"finish waiting run reached expected status {status}")
+ return run
+
+def submit(config: any, scalingConfig: ScalingConfig, sync: bool = False, debug: bool = False):
if debug:
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.DEBUG) # Set minimum log level for the handler
@@ -149,13 +160,14 @@ def submit(model, config: any, scalingConfig: ScalingConfig, sync: bool = False,
_init_connection()
mlflow_experiment_name = None
- if model == "mpt125m":
- if not isinstance(config, MPT125MConfig):
- raise ValueError("config must be an instance of MPT125MConfig")
+ if isinstance(config, MPT125MConfig):
mlflow_experiment_name = config.mlflow_experimentName
runConfig = config.toRunConfig(scalingConfig)
+ elif isinstance(config, RunConfig):
+ runConfig = config
+ mlflow_experiment_name = runConfig.name
else:
- raise ValueError(f"model {model} is not supported")
+ raise ValueError(f"config type {type(config)} is not supported")
run = create_run(runConfig)
run_name = run.name
@@ -175,27 +187,6 @@ def on_button_clicked(b):
display(HTML(summary.to_html(escape=False)))
button.on_click(on_button_clicked)
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
-
- def _wait_for_run_status(run: Run, status: RunStatus, inclusive: bool = True):
- run_name = run.name
- while not run.status.after(status, inclusive=inclusive) and not run.status.is_terminal():
- run = get_run(run_name)
- # setting mlflow_experiment_name to be None, since its very likely mlflow run isn't ready yet
- # when the run just starts running
- _display_run_summary(_get_run_summary(run, None), button)
- time.sleep(5)
- logger.debug(f"finish waiting run reached expected status {status}")
- return run
-
- def _wait_for_run_finish(run: Run):
- run_name = run.name
- while not run.status.is_terminal():
- run = get_run(run_name)
- _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
- time.sleep(5)
- logger.debug(f"finish waiting run reached terminal")
- return run
-
run = _wait_for_run_status(run, RunStatus.RUNNING)
try_count = 0
@@ -216,7 +207,7 @@ def _wait_for_run_finish(run: Run):
if sync:
logger.debug(f"synchronously waiting for the run to finish.")
- run = _wait_for_run_finish(run)
+ run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False)
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
return run
\ No newline at end of file
From a090962d7631506d54af79ec43fdd2f34a16f20d Mon Sep 17 00:00:00 2001
From: Yu Gong <129110170+ygong1@users.noreply.github.com>
Date: Wed, 27 Mar 2024 15:53:05 -0700
Subject: [PATCH 21/27] aa
---
ygong/mosaic/submit.py | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index dcf3cf5c40..76bf633408 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -19,16 +19,19 @@
import sys
from mcli import RunConfig
import hashlib
+from mcli.config import MCLIConfig
+from mcli.api.engine.engine import MAPIConnection
logger = logging.getLogger('ygong.mosaic.submit')
-
def _set_up_environment(content: str):
os.environ['CREDENTIALS'] = content
def _init_connection():
def _is_local():
+ if os.environ.get('CREDENTIALS') is not None:
+ return True
try:
wc = WorkspaceClient()
wc.dbutils.entry_point.getDbutils().notebook().getContext()
@@ -157,7 +160,8 @@ def submit(config: any, scalingConfig: ScalingConfig, sync: bool = False, debug:
logger.setLevel(logging.DEBUG)
logger.info("set the logger to debug mode")
-
+
+ # MTC + AWS Dogfood
_init_connection()
mlflow_experiment_name = None
if isinstance(config, MPT125MConfig):
@@ -178,6 +182,7 @@ def submit(config: any, scalingConfig: ScalingConfig, sync: bool = False, debug:
else:
button = widgets.Button(description="cancel the run")
def on_button_clicked(b):
+ logger.debug(f"cancel button clicked")
clear_output(wait=False)
run = get_run(run_name)
run.stop()
From b245134c5ecaf5067e12278c79d691ad3b8ffafc Mon Sep 17 00:00:00 2001
From: Shitao Li
Date: Wed, 27 Mar 2024 23:20:57 +0000
Subject: [PATCH 22/27] init
---
ygong/mosaic/__init__.py | 4 +-
ygong/mosaic/mpt125mConfig.py | 80 ++++++++++++++++++++++++++++++++---
ygong/mosaic/submit.py | 29 +------------
3 files changed, 78 insertions(+), 35 deletions(-)
diff --git a/ygong/mosaic/__init__.py b/ygong/mosaic/__init__.py
index c5bd21dde7..1142627308 100644
--- a/ygong/mosaic/__init__.py
+++ b/ygong/mosaic/__init__.py
@@ -1,6 +1,6 @@
from .submit import submit
from .submit import _set_up_environment
from .scaling_config import ScalingConfig
-from .mpt125mConfig import MPT125MConfig
+from .mpt125mConfig import MPT125MConfig, WSFSIntegration
-__all__ = ['submit', 'ScalingConfig', "MPT125MConfig", "_set_up_environment"]
\ No newline at end of file
+__all__ = ['submit', 'ScalingConfig', "MPT125MConfig", "WSFSIntegration", "_set_up_environment"]
\ No newline at end of file
diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py
index a213fce2d5..8c4cf01929 100644
--- a/ygong/mosaic/mpt125mConfig.py
+++ b/ygong/mosaic/mpt125mConfig.py
@@ -1,9 +1,47 @@
from mcli import RunConfig
from ygong.mosaic.scaling_config import ScalingConfig
-from typing import Optional
+from typing import Dict, List, Optional
import os
+import shlex
# import databricks_genai.api.config as cfg
+class WSFSIntegration:
+ def __init__(
+ self,
+ wsfs_path: str,
+ entry_point: Optional[str] = None,
+ args: Optional[List[str]] = None):
+ """
+ Class to represent the integration with Databricks WSFS.
+
+ :params: wsfs_path: str Absolute path
+ :params: entry_point: str Required if the wsfs_path is a directory
+ """
+ self.wsfs_path = wsfs_path
+ self.entry_point = entry_point
+ self.args = args
+
+ def get_entry_command(self):
+ entry_file_path = ""
+ if self.entry_point is not None:
+ if self.entry_point.startswith("/Workspace"):
+ entry_file_path = self.entry_point
+ else:
+ entry_file_path = os.path.join(self.wsfs_path, self.entry_point)
+ else:
+ entry_file_path = self.wsfs_path
+ if self.args is None:
+ return f"python3 {shlex.quote(entry_file_path)}"
+ return f"python3 {shlex.quote(entry_file_path)} {' '.join(self.args)}"
+
+ def toDict(self):
+ return {
+ "integration_type": "wsfs",
+ "wsfs_path": self.wsfs_path,
+ "entrypoint": self.entry_point,
+ "args": self.args,
+ }
+
class MPT125MConfig:
def __init__(
@@ -12,7 +50,8 @@ def __init__(
data: str,
priority: str = 'high',
preemptible: bool = False,
- retry_on_system_failure: bool = False):
+ retry_on_system_failure: bool = False,
+ wsfs_integration: Optional[WSFSIntegration] = None):
# TODO: validate the inputs and remove the yu.gong hardcode
self.mlflow_experimentName = f"/Users/yu.gong@databricks.com/{experimentName}"
self.mlflow_trackingUri = "databricks"
@@ -35,10 +74,39 @@ def __init__(
self.global_seed = 17
self.data_remote = self.data
self.data_local = ""
- self.commands = [
- "cd llm-foundry/scripts",
- "train/launcher.py train/train.py /mnt/config/parameters.yaml train_loader.dataset.split=train eval_loader.dataset.split=val"
- ]
+ self.commands = []
+ if wsfs_integration is not None:
+ # The first group of commands are to download the object(file or directory) from
+ # databricks WSFS using PAT token and url.
+ # The second command try to unzip if the object from WSFS is directory.
+ # TODO: Read the token and host name from env vars or /mnt/jwt-secret/.databrickscfg
+ self.commands = [
+ f"""
+ DATABRICKS_HOST="https://oregon.staging.cloud.databricks.com"
+ DATABRICKS_TOKEN="dapid5af61ff89674be90c3e86ae9fc10c2e"
+ WSFS_PATH="{wsfs_integration.wsfs_path}"
+ DIR_NAME=$(dirname "$WSFS_PATH")
+ ENCODED_WSFS_PATH=$(python3 -c "import urllib.parse; print(urllib.parse.quote('$WSFS_PATH'))")
+ mkdir -p "$DIR_NAME"
+ curl -X GET -o "$WSFS_PATH" "${{DATABRICKS_HOST}}/api/2.0/workspace/export?path=${{ENCODED_WSFS_PATH}}&direct_download=true" \
+ -H "Authorization: Bearer $DATABRICKS_TOKEN"
+
+ if file "$WSFS_PATH" | grep -q "Zip archive data"; then
+ mv "$WSFS_PATH" "${{WSFS_PATH}}.zip"
+ apt update && apt install unzip
+ unzip -d "$DIR_NAME" "${{WSFS_PATH}}.zip"
+ rm -f "${{WSFS_PATH}}.zip"
+ else
+ echo "$WSFS_PATH is not a ZIP file."
+ fi
+ """
+ ]
+ self.commands.append(wsfs_integration.get_entry_command())
+ else:
+ self.commands = [
+ "cd llm-foundry/scripts",
+ "train/launcher.py train/train.py /mnt/config/parameters.yaml train_loader.dataset.split=train eval_loader.dataset.split=val"
+ ]
def toRunConfig(self, scalingConfig: ScalingConfig):
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 9767259c81..62c1f002bb 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -41,34 +41,9 @@ def _is_local():
data = json.loads(base64.b64decode(os.environ.get('CREDENTIALS')).decode('utf-8'))
workspace_url = data.get("workspace_url", None)
token = data.get("token", None)
- mosaic_token = data.get("mosaic_token", None)
# set up the mosaic token
- conf = MCLIConfig.load_config()
- conf.api_key = mosaic_token
- conf.save_config()
- MAPIConnection.reset_connection()
-
-
- hash = hashlib.sha256(f"{workspace_url}-{token}-{mosaic_token}".encode()).hexdigest()[:8]
- databricks_secret_name = f"databricks-{hash}"
-
- # clean up the old secret. MosaicML doesn't support multiple databricks secrets
- # would have to clean up the old secret if it exists
- from mcli.api.secrets.api_get_secrets import get_secrets
- from mcli.api.secrets.api_delete_secrets import delete_secrets
- from mcli.models.mcli_secret import SecretType
- s = get_secrets(secret_types=[SecretType.databricks])
- if len(s) == 1:
- if s[0].name != databricks_secret_name:
- delete_secrets(s)
- else:
- print("databricks secret already exists")
- return
- from mcli.objects.secrets.create.databricks import DatabricksSecretCreator
- from mcli.api.secrets.api_create_secret import create_secret
- s = DatabricksSecretCreator().create(name=databricks_secret_name, host=workspace_url, token=token)
- print(f"successfully created databricks secret: {databricks_secret_name}")
- create_secret(s)
+ os.environ[config.MCLI_MODE_ENV] = config.MCLIMode.DBX_AWS_STAGING.value
+ os.environ[config.MOSAICML_ACCESS_TOKEN_FILE_ENV] = "/home/shitao.li/e2_token"
else:
logger.debug("init_connection in databricks environment")
wc = WorkspaceClient()
From aee64cc61b96655111d6306f686b9961d8b51c64 Mon Sep 17 00:00:00 2001
From: Shitao Li
Date: Thu, 28 Mar 2024 06:36:53 +0000
Subject: [PATCH 23/27] tmp
---
llmfoundry/composerpatch/MLFlowLogger.py | 4 +-
ygong/mosaic/submit.py | 47 ++++++++++++++----------
2 files changed, 29 insertions(+), 22 deletions(-)
diff --git a/llmfoundry/composerpatch/MLFlowLogger.py b/llmfoundry/composerpatch/MLFlowLogger.py
index e9f73d7767..32635c6f7d 100644
--- a/llmfoundry/composerpatch/MLFlowLogger.py
+++ b/llmfoundry/composerpatch/MLFlowLogger.py
@@ -1,11 +1,9 @@
from composer.loggers import MLFlowLogger as ComposerMLFlowLogger
-from composer.utils import MissingConditionalImportError, dist
+from composer.utils import dist
import json
import os
from composer.core.state import State
from composer.loggers.logger import Logger
-from composer.loggers.logger_destination import LoggerDestination
-from composer.utils import MissingConditionalImportError, dist
diff --git a/ygong/mosaic/submit.py b/ygong/mosaic/submit.py
index 7d418ca795..c1a25245a6 100644
--- a/ygong/mosaic/submit.py
+++ b/ygong/mosaic/submit.py
@@ -72,24 +72,24 @@ def _is_local():
def get_experiment_run_url(tracking_uri: Optional[str], experiment_name: str, run_name: str):
- if tracking_uri is None:
- raise ValueError("tracking_uri must be provided")
- mlflow.set_tracking_uri(tracking_uri)
- tracking_uri = tracking_uri.rstrip("/")
- experiment = mlflow.get_experiment_by_name(name=experiment_name)
- if experiment is None:
- raise ValueError(f"experiment {experiment_name} does not exist")
- experiment_id = experiment.experiment_id
- runs = mlflow.search_runs(experiment_ids=[experiment_id],
- filter_string=f'tags.composer_run_name = "{run_name}"',
- output_format='list')
- if len(runs) == 0:
- raise ValueError(f"run {run_name} does not exist in experiment {experiment_name}")
- elif len(runs) > 1:
- raise ValueError(f"multiple runs {run_name} exist in experiment {experiment_name}")
- else:
- run_id = runs[0].info.run_id
- return f"{tracking_uri}/ml/experiments/{experiment_id}/runs/{run_id}"
+ if tracking_uri is None:
+ raise ValueError("tracking_uri must be provided")
+ mlflow.set_tracking_uri(tracking_uri)
+ tracking_uri = tracking_uri.rstrip("/")
+ experiment = mlflow.get_experiment_by_name(name=experiment_name)
+ if experiment is None:
+ raise ValueError(f"experiment {experiment_name} does not exist")
+ experiment_id = experiment.experiment_id
+ runs = mlflow.search_runs(experiment_ids=[experiment_id],
+ filter_string=f'tags.composer_run_name = "{run_name}"',
+ output_format='list')
+ if len(runs) == 0:
+ return None
+ elif len(runs) > 1:
+ raise ValueError(f"multiple runs {run_name} exist in experiment {experiment_name}")
+ else:
+ run_id = runs[0].info.run_id
+ return f"{tracking_uri}/ml/experiments/{experiment_id}/runs/{run_id}"
def _get_run_summary(run: Run, experiment_name: Optional[str] = None):
@@ -169,6 +169,15 @@ def on_button_clicked(b):
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
run = _wait_for_run_status(run, RunStatus.RUNNING)
+ def _wait_for_run_finish(run: Run):
+ run_name = run.name
+ while not run.status.is_terminal():
+ run = get_run(run_name)
+ _display_run_summary(_get_run_summary(run, mlflow_experiment_name), button)
+ time.sleep(5)
+ logger.debug(f"finish waiting run reached terminal")
+ return run
+
try_count = 0
while try_count < 10:
try_count += 1
@@ -187,7 +196,7 @@ def on_button_clicked(b):
if sync:
logger.debug(f"synchronously waiting for the run to finish.")
- run = _wait_for_run_status(run, RunStatus.TERMINATING, inclusive=False)
+ run = _wait_for_run_finish(run)
_display_run_summary(_get_run_summary(run, mlflow_experiment_name), None)
return run
\ No newline at end of file
From 65a7f306326ce914597d7c13b4b7fbded1183c32 Mon Sep 17 00:00:00 2001
From: Shitao Li
Date: Thu, 28 Mar 2024 06:58:02 +0000
Subject: [PATCH 24/27] fix
---
ygong/mosaic/mpt125mConfig.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py
index 8c4cf01929..8bdbc88182 100644
--- a/ygong/mosaic/mpt125mConfig.py
+++ b/ygong/mosaic/mpt125mConfig.py
@@ -123,8 +123,8 @@ def toRunConfig(self, scalingConfig: ScalingConfig):
integrations=[
{
'integration_type': 'git_repo',
- 'git_repo': 'ygong1/llm-foundry',
- 'git_branch': 'prototype',
+ 'git_repo': 'shitaoli-db/llm-foundry',
+ 'git_branch': 'prototype-shitao',
'pip_install': '-e .[gpu]',
'ssh_clone': False
},
From d0bc1214eadf5470260f580633d3f02802632837 Mon Sep 17 00:00:00 2001
From: Shitao Li
Date: Thu, 28 Mar 2024 06:58:19 +0000
Subject: [PATCH 25/27] fix
---
ygong/mosaic/mpt125mConfig.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py
index 8bdbc88182..8a23a0053e 100644
--- a/ygong/mosaic/mpt125mConfig.py
+++ b/ygong/mosaic/mpt125mConfig.py
@@ -124,7 +124,7 @@ def toRunConfig(self, scalingConfig: ScalingConfig):
{
'integration_type': 'git_repo',
'git_repo': 'shitaoli-db/llm-foundry',
- 'git_branch': 'prototype-shitao',
+ 'git_branch': 'shitao.li@databricks.com/prototype-shitao',
'pip_install': '-e .[gpu]',
'ssh_clone': False
},
From 6c399151a15b1a726b40066a85d39603f2b2595d Mon Sep 17 00:00:00 2001
From: Shitao Li
Date: Thu, 28 Mar 2024 07:04:59 +0000
Subject: [PATCH 26/27] fix
---
setup.py | 10 ----------
1 file changed, 10 deletions(-)
diff --git a/setup.py b/setup.py
index 571b921b8b..b49b80d6ef 100644
--- a/setup.py
+++ b/setup.py
@@ -66,22 +66,12 @@
'mosaicml-cli>=0.6.10,<1',
'onnx==1.14.0',
'onnxruntime==1.15.1',
-<<<<<<< HEAD
- 'cmake>=3.25.0,<=3.26.3', # required for triton-pre-mlir below
- # PyPI does not support direct dependencies, so we remove this line before uploading from PyPI
- # 'triton-pre-mlir@git+https://github.com/vchiley/triton.git@triton_pre_mlir_sm90#subdirectory=python',
-=======
->>>>>>> 28467bbad5e8e9e2fb070ddc011367310b5721e7
'boto3>=1.21.45,<2',
'huggingface-hub>=0.17.0,<1.0',
'beautifulsoup4>=4.12.2,<5', # required for model download utils
'tenacity>=8.2.3,<9',
-<<<<<<< HEAD
- 'ipywidgets',
-=======
'catalogue>=2,<3',
'typer[all]<1',
->>>>>>> 28467bbad5e8e9e2fb070ddc011367310b5721e7
]
extra_deps = {}
From eea721800e3cb17234a3280f8c1cb9f1227adc8a Mon Sep 17 00:00:00 2001
From: Shitao Li
Date: Thu, 28 Mar 2024 07:39:26 +0000
Subject: [PATCH 27/27] fix triton
---
scripts/train/train.py | 3 ---
ygong/mosaic/mpt125mConfig.py | 2 +-
2 files changed, 1 insertion(+), 4 deletions(-)
diff --git a/scripts/train/train.py b/scripts/train/train.py
index 4462fd8c4f..dabf5e4a22 100644
--- a/scripts/train/train.py
+++ b/scripts/train/train.py
@@ -12,9 +12,6 @@
import torch
from composer import Trainer
from composer.core.callback import Callback
-from composer.loggers import MosaicMLLogger, MLFlowLogger
-from composer.loggers.mosaicml_logger import (MOSAICML_ACCESS_TOKEN_ENV_VAR,
- MOSAICML_PLATFORM_ENV_VAR)
from composer.metrics.nlp import InContextLearningMetric
from composer.profiler import (JSONTraceHandler, Profiler, TraceHandler,
cyclic_schedule)
diff --git a/ygong/mosaic/mpt125mConfig.py b/ygong/mosaic/mpt125mConfig.py
index 8a23a0053e..b2d5c54efd 100644
--- a/ygong/mosaic/mpt125mConfig.py
+++ b/ygong/mosaic/mpt125mConfig.py
@@ -154,7 +154,7 @@ def parameters(self):
"max_seq_len": self.max_seq_len,
"vocab_size": 50368,
"attn_config": {
- "attn_impl": "triton"
+ "attn_impl": "flash"
}
},
"tokenizer": {