diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index e68ed421..07ad9a96 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -16,3 +16,5 @@ dags/sparsity_diffusion_devx/project_bite* @RissyRan @parambole @jiangjy1982 @ai
 dags/sparsity_diffusion_devx/configs/project_bite* @RissyRan @parambole @jiangjy1982 @aireenmei @michelle-yooh @jiya-zhang
 dags/inference @yeandy @vipannalla @morgandu @mailvijayasingh @sixiang-google @joezijunzhou @singh-mitali
+
+dags/map_reproducibility @crankshaw-google @polydier1
diff --git a/.github/requirements.txt b/.github/requirements.txt
index ebf875bf..1c283211 100644
--- a/.github/requirements.txt
+++ b/.github/requirements.txt
@@ -10,3 +10,4 @@ tensorflow-cpu
 kubernetes
 pyarrow
 apache-airflow-providers-google
+dacite
diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py
index 74d3cc0f..cf20b601 100644
--- a/dags/map_reproducibility/nemo_gpt3.py
+++ b/dags/map_reproducibility/nemo_gpt3.py
@@ -15,21 +15,29 @@
 """DAGs to run Aotc reproducibility benchmarks."""
 
 import datetime
+import sys
+import tempfile
+
 from airflow import models
 from airflow.decorators import task
 from airflow.hooks.subprocess import SubprocessHook
 from dags import composer_env
-from dags.map_reproducibility.aotc_reproducibility import get_metrics_cmds
-from dags.map_reproducibility.aotc_reproducibility import set_variables_cmds
-from dags.map_reproducibility.aotc_reproducibility import configure_project_and_cluster
-from dags.map_reproducibility.aotc_reproducibility import install_helm_cmds
-from dags.map_reproducibility.aotc_reproducibility import namespace_cmds
-from dags.map_reproducibility.aotc_reproducibility import wait_for_jobs_cmds
-from dags.map_reproducibility.aotc_reproducibility import copy_bucket_cmds
-from dags.map_reproducibility.aotc_reproducibility import cleanup_cmds
-from dags.map_reproducibility.aotc_reproducibility import git_cookie_authdaemon
-from dags.map_reproducibility.aotc_reproducibility import clone_gob
-from dags.map_reproducibility.aotc_reproducibility import helm_install_cmds
+from dags.map_reproducibility.utils import get_metrics_cmds
+from dags.map_reproducibility.utils import set_variables_cmds
+from dags.map_reproducibility.utils import configure_project_and_cluster
+from dags.map_reproducibility.utils import install_helm_cmds
+from dags.map_reproducibility.utils import namespace_cmds
+from dags.map_reproducibility.utils import wait_for_jobs_cmds
+from dags.map_reproducibility.utils import copy_bucket_cmds
+from dags.map_reproducibility.utils import cleanup_cmds
+from dags.map_reproducibility.utils import git_cookie_authdaemon
+from dags.map_reproducibility.utils import clone_gob
+from dags.map_reproducibility.utils import helm_install_cmds
+from dags.map_reproducibility.utils import get_metrics_from_gcs
+from dags.map_reproducibility.utils import get_aotc_repo
+from dags.map_reproducibility.utils import extract_bucket_file_name
+from dags.map_reproducibility.utils import extract_python_path
+
 
 # Run once a day at 2 pm UTC (6 am PST)
 SCHEDULED_TIME = "0 14 * * *" if composer_env.is_prod_env() else None
@@ -50,29 +58,40 @@ def run_aotc_workload():
       "export JOB_NAME=gpt3-xlml-$NOW-175b-nemo",
   )
 
-  hook = SubprocessHook()
-  result = hook.run_command(
-      [
-          "bash",
-          "-c",
-          ";".join(
-              set_variables_cmds()
-              + configure_project_and_cluster()
-              + git_cookie_authdaemon()
-              + clone_gob()
-              + gpu_recipe_cmd
-              + install_helm_cmds()
-              + namespace_cmds()
-              + workload_cmds
-              + helm_install_cmds()
-              + wait_for_jobs_cmds()
-              + copy_bucket_cmds()
-              + get_metrics_cmds()
-              + cleanup_cmds()
-          ),
-      ],
-  )
-  assert result.exit_code == 0, f"Command failed with code {result.exit_code}"
+  with tempfile.TemporaryDirectory() as tmpdir:
+    hook = SubprocessHook()
+    result = hook.run_command(
+        [
+            "bash",
+            "-c",
+            ";".join(
+                set_variables_cmds()
+                + configure_project_and_cluster()
+                + git_cookie_authdaemon()
+                + clone_gob()
+                + gpu_recipe_cmd
+                + install_helm_cmds()
+                + namespace_cmds()
+                + workload_cmds
+                + helm_install_cmds()
+                + wait_for_jobs_cmds()
+                + copy_bucket_cmds()
+                + get_metrics_cmds()
+                + cleanup_cmds()
+                + get_aotc_repo()
+            ),
+        ],
+        cwd=tmpdir,
+    )
+    assert result.exit_code == 0, f"Command failed with code {result.exit_code}"
+
+    # Extract the bucket name, metrics file name and PYTHONPATH from the output
+    bucket_name, file_name, python_path = extract_bucket_file_name(
+        result.output
+    )
+    get_metrics_from_gcs(bucket_name, file_name)
+
+    sys.path.append(python_path)
 
 
 with models.DAG(
diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/utils.py
similarity index 57%
rename from dags/map_reproducibility/aotc_reproducibility.py
rename to dags/map_reproducibility/utils.py
index 64478503..89fcb8ab 100644
--- a/dags/map_reproducibility/aotc_reproducibility.py
+++ b/dags/map_reproducibility/utils.py
@@ -14,7 +14,8 @@
 "Bash helper commands for AOTC artifacts"
 
-import os
+import re
+from google.cloud import storage
 
 
 def set_variables_cmds():
@@ -39,12 +40,16 @@ def configure_project_and_cluster():
 
 
 # This is required to get auth to access
-# internal GoB repo
 def git_cookie_authdaemon():
   auth_cmds = (
       "git clone https://gerrit.googlesource.com/gcompute-tools",
       "echo 'trying to run git-cookie-authdaemon'",
-      "./gcompute-tools/git-cookie-authdaemon",
+      # Check if the daemon is already running
+      "if pgrep -f git-cookie-authdaemon; then "
+      "  echo 'git-cookie-authdaemon is already running'; "
+      "else "
+      "  ./gcompute-tools/git-cookie-authdaemon || echo 'Error running git-cookie-authdaemon'; "  # Run it if not running
+      "fi",
   )
   return auth_cmds
 
@@ -56,6 +61,7 @@ def clone_gob():
       "reproducible-benchmark-recipes",
       "cd reproducible-benchmark-recipes/projects",
       "cd gpu-recipes",
+      "pwd",
   )
   return gob_clone_cmds
 
@@ -101,9 +107,6 @@ def helm_install_cmds():
 
 def wait_for_jobs_cmds():
   wait_for_job = (
-      "echo 'will wait for job to start running'",
-      "kubectl wait --for=condition=running job/$JOB_NAME"
-      " --namespace=default --timeout=10m",
       "echo 'will wait for jobs to finish'",
       "kubectl wait --for=condition=complete "
       "job/$JOB_NAME --namespace=default --timeout=100m",
@@ -113,10 +116,9 @@
 
 def copy_bucket_cmds():
   copy_bucket_contents = (
-      "COMPLETE_JOB_NAME=$(gcloud storage ls "
+      "export COMPLETE_JOB_NAME=$(gcloud storage ls "
       "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)",
-      "echo 'copying from' ",
-      "echo $COMPLETE_JOB_NAME",
+      'echo "COMPLETE_JOB_NAME ${COMPLETE_JOB_NAME}"',
       "cd $REPO_ROOT/src/utils/training_metrics",
       "gcloud storage cp ${COMPLETE_JOB_NAME}"
       "dllogger/rank-0/dllogger.json .",
@@ -127,21 +129,98 @@ def get_metrics_cmds():
   # TODO(gunjanj007): get these parameters from the recipe
   get_metrics = (
+      "METRICS_FILE=$COMPLETE_JOB_NAME/metrics.txt",
       "python3 process_training_results.py --file"
       " dllogger.json --batch_size 2048 "
       "--num_accelerators 256 "
       "--precision fp8 "
       "--model_type gpt3-175b "
-      "--accelerator_type h100 ",
+      "--accelerator_type h100 | "
+      "gsutil cp - $METRICS_FILE",
+      'echo "METRICS_FILE=${METRICS_FILE}"',
   )
   return get_metrics
+
+
+def get_aotc_repo():
+  gob_clone_cmds = (
+      "echo 'trying to clone GoB aotc repo'",
+      "git clone https://cmcs-perf-tooling-internal.googlesource.com/"
+      "benchmark-automation",
+      "cd benchmark-automation/aotc/src",
+      "export PYTHONPATH=$PWD",
+      # Keep this format in sync with the regex in extract_bucket_file_name()
+      'echo "PYTHONPATH=$PYTHONPATH METRICS_FILE=$METRICS_FILE"',
+  )
+  return gob_clone_cmds
+
+
 def cleanup_cmds():
   cleanup = (
+      "cd $REPO_ROOT",
+      "cd ../../..",
       "kubectl get pods "
       "--no-headers=true | awk '{print $1}' "
       "| grep $JOB_NAME | xargs kubectl delete pods",
       "helm uninstall $JOB_NAME",
   )
   return cleanup
+
+
+def get_metrics_from_gcs(bucket_name, file_name):
+  # Initialize the GCS client
+  storage_client = storage.Client()
+
+  # Get the bucket and file
+  bucket = storage_client.bucket(bucket_name)
+  blob = bucket.blob(file_name)
+
+  # Download the file content
+  metrics_output = blob.download_as_string().decode("utf-8")
+
+  # Parse the metrics (adjust based on the metrics file format)
+  lines = metrics_output.splitlines()
+  average_step_time = float(lines[0].split(": ")[1])
+  tflops_per_accelerator = float(lines[1].split(": ")[1])
+  mfu = float(lines[2].split(": ")[1])
+
+  print(f"Average Step Time: {average_step_time}")
+  print(f"TFLOPS/Accelerator: {tflops_per_accelerator}")
+  print(f"MFU: {mfu}")
+
+
+def extract_bucket_file_name(last_line):
+  # Initialize all outputs so the return below is safe when parsing fails.
+  bucket_name = file_name = python_path = metrics_file = None
+
+  # We parse the last line because SubprocessHook only returns the final
+  # line of the command's output.
+  match = re.search(r"PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)", last_line)
+  if match:
+    python_path = match.group(1)
+    metrics_file = match.group(2)
+    print(f"PYTHONPATH: {python_path}")
+    print(f"METRICS_FILE: {metrics_file}")
+  else:
+    print("Error: Could not extract PYTHONPATH and METRICS_FILE")
+  print(f"Metrics file name: {metrics_file}")
+  if metrics_file:
+    # Extract bucket_name and file_name from the gs:// URI
+    bucket_name = re.search(r"gs://([^/]+)/", metrics_file).group(1)
+    file_name = re.search(r"gs://[^/]+/(.+)", metrics_file).group(1)
+
+    print(f"Bucket name: {bucket_name}")
+    print(f"File name: {file_name}")
+  else:
+    print("Metrics file not found in the output.")
+
+  return bucket_name, file_name, python_path
+
+
+def extract_python_path(bash_result_output):
+  python_path = None
+  for line in bash_result_output.splitlines():
+    if line.startswith("PYTHONPATH"):
+      python_path = line.split("=", 1)[1]
+      break
+
+  print(f"Python path: {python_path}")
+
+  return python_path
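
A minimal usage sketch of the new parsing helpers (not part of the patch), assuming the repository root is on PYTHONPATH. It shows how extract_bucket_file_name() and get_metrics_from_gcs() are expected to chain together outside Airflow. The sample line, paths and bucket below are hypothetical placeholders standing in for the final echo emitted by get_aotc_repo(), which SubprocessHook surfaces as the last line of result.output:

    from dags.map_reproducibility.utils import (
        extract_bucket_file_name,
        get_metrics_from_gcs,
    )

    # Hypothetical last line of the bash pipeline's output.
    last_line = (
        "PYTHONPATH=/tmp/benchmark-automation/aotc/src "
        "METRICS_FILE=gs://example-bucket/nemo-experiments/job-123/metrics.txt"
    )

    bucket_name, file_name, python_path = extract_bucket_file_name(last_line)
    assert bucket_name == "example-bucket"
    assert file_name == "nemo-experiments/job-123/metrics.txt"
    assert python_path == "/tmp/benchmark-automation/aotc/src"

    # Requires GCS credentials and an existing object whose first three lines
    # follow the "name: value" format parsed by get_metrics_from_gcs().
    # get_metrics_from_gcs(bucket_name, file_name)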