From 9a21b7da91de2e4e1cfb8745bbcc7780817ff478 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 16:36:25 -0800 Subject: [PATCH 01/20] clone aotc and get metrics from gcs --- .../aotc_reproducibility.py | 107 +++++++++++++++--- dags/map_reproducibility/nemo_gpt3.py | 17 +++ 2 files changed, 111 insertions(+), 13 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index 64478503..d73c6f1a 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -15,10 +15,17 @@ "Bash helper commands for AOTC artifacts" import os - +import re +import sys +from google.cloud import storage +import logging +import uuid +from typing import Type +from airflow.hooks.subprocess import SubprocessHook def set_variables_cmds(): set_variables = ( + # "set -e", "export PROJECT=supercomputer-testing", "export CLUSTER=a3plus-benchmark", "export CLUSTER_REGION=australia-southeast1", @@ -37,14 +44,16 @@ def configure_project_and_cluster(): ) return set_project_command - -# This is required to get auth to access -# internal GoB repo def git_cookie_authdaemon(): auth_cmds = ( "git clone https://gerrit.googlesource.com/gcompute-tools", "echo 'trying to run git-cookie-authdaemon'", - "./gcompute-tools/git-cookie-authdaemon", + # Check if the daemon is already running + "if pgrep -f git-cookie-authdaemon; then " + " echo 'git-cookie-authdaemon is already running'; " + "else " + " ./gcompute-tools/git-cookie-authdaemon || echo 'Error running git-cookie-authdaemon'; " # Run if not running + "fi" ) return auth_cmds @@ -56,9 +65,17 @@ def clone_gob(): "reproducible-benchmark-recipes", "cd reproducible-benchmark-recipes/projects", "cd gpu-recipes", + "pwd", ) return gob_clone_cmds +def stop_git_daemon(): + cmd = ( + "git config --global --unset credential.helper", + "rm ~/.git-credentials", + ) + return cmd + def install_helm_cmds(): install_helm_cmd = ( @@ -101,9 +118,6 @@ def helm_install_cmds(): def wait_for_jobs_cmds(): wait_for_job = ( - "echo 'will wait for job to start running'", - "kubectl wait --for=condition=running job/$JOB_NAME" - " --namespace=default --timeout=10m", "echo 'will wait for jobs to finish'", "kubectl wait --for=condition=complete " "job/$JOB_NAME --namespace=default --timeout=100m", @@ -113,17 +127,15 @@ def wait_for_jobs_cmds(): def copy_bucket_cmds(): copy_bucket_contents = ( - "COMPLETE_JOB_NAME=$(gcloud storage ls " + "export COMPLETE_JOB_NAME=$(gcloud storage ls " "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)", - "echo 'copying from' ", - "echo $COMPLETE_JOB_NAME", + "echo 'COMPLETE_JOB_NAME=$COMPLETE_JOB_NAME'", "cd $REPO_ROOT/src/utils/training_metrics", "gcloud storage cp ${COMPLETE_JOB_NAME}" "dllogger/rank-0/dllogger.json .", ) return copy_bucket_contents - def get_metrics_cmds(): # TODO(gunjanj007): get these parameters from the recipe get_metrics = ( @@ -132,16 +144,85 @@ def get_metrics_cmds(): "--num_accelerators 256 " "--precision fp8 " "--model_type gpt3-175b " - "--accelerator_type h100 ", + "--accelerator_type h100 | " + "gsutil cp - ${COMPLETE_JOB_NAME}" + "/metrics.txt", ) return get_metrics +def get_aotc_repo(): + gob_clone_cmds = ( + "echo 'trying to clone GoB aotc repo'", + "git clone https://cmcs-perf-tooling-internal.googlesource.com/" + "benchmark-automation", + "cd benchmark-automation/aotc/src", + "export PYTHONPATH=$PWD", + "echo 'PYTHONPATH=$PYTHONPATH'", + ) + return gob_clone_cmds def cleanup_cmds(): cleanup = ( + 
"cd $REPO_ROOT", + "cd ../../..", "kubectl get pods " "--no-headers=true | awk '{print $1}' " "| grep $JOB_NAME | xargs kubectl delete pods", "helm uninstall $JOB_NAME", ) return cleanup + +def get_metrics_from_gcs(bucket_name, file_name): + # bucket_name = 'gunjanjalori-testing-xlml' + # file_name = 'nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/metrics.txt' + + # Initialize GCS and BigQuery clients + storage_client = storage.Client() + + # Get the bucket and file + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(file_name) + + # Download the file content + metrics_output = blob.download_as_string().decode('utf-8') + + # Parse the metrics (adjust based on your file format) + lines = metrics_output.splitlines() + average_step_time = float(lines[0].split(': ')[1]) + tflops_per_accelerator = float(lines[1].split(': ')[1]) + mfu = float(lines[2].split(': ')[1]) + + + print(f"Average Step Time: {average_step_time}") + print(f"TFLOPS/Accelerator: {tflops_per_accelerator}") + print(f"MFU: {mfu}") + + +def extract_bucket_file_name(bash_result_output): + complete_job_name = None + for line in bash_result_output.splitlines(): + if line.startswith("COMPLETE_JOB_NAME="): + complete_job_name = line.split("=", 1)[1] + break + if complete_job_name: + # Extract bucket_name and file_name + bucket_name = re.search(r'gs://([^/]+)/', complete_job_name).group(1) + file_name = re.search(r'gs://[^/]+/(.+)', complete_job_name).group(1) + + print(f"Bucket name: {bucket_name}") + print(f"File name: {file_name}") + + return bucket_name, file_name + +def extract_python_path(bash_result_output): + python_path = None + for line in bash_result_output.splitlines(): + if line.startswith("PYTHONPATH="): + python_path = line.split("=", 1)[1] + break + + print(f"Pyhon path name: {python_path}") + + return python_path + + diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 74d3cc0f..12c76ad3 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -15,6 +15,9 @@ """DAGs to run Aotc reproducibility benchmarks.""" import datetime +import re +import sys + from airflow import models from airflow.decorators import task from airflow.hooks.subprocess import SubprocessHook @@ -30,6 +33,12 @@ from dags.map_reproducibility.aotc_reproducibility import git_cookie_authdaemon from dags.map_reproducibility.aotc_reproducibility import clone_gob from dags.map_reproducibility.aotc_reproducibility import helm_install_cmds +from dags.map_reproducibility.aotc_reproducibility import get_metrics_from_gcs +from dags.map_reproducibility.aotc_reproducibility import stop_git_daemon +from dags.map_reproducibility.aotc_reproducibility import get_aotc_repo +from dags.map_reproducibility.aotc_reproducibility import extract_bucket_file_name +from dags.map_reproducibility.aotc_reproducibility import extract_python_path + # Run once a day at 2 pm UTC (6 am PST) SCHEDULED_TIME = "0 14 * * *" if composer_env.is_prod_env() else None @@ -69,11 +78,19 @@ def run_aotc_workload(): + copy_bucket_cmds() + get_metrics_cmds() + cleanup_cmds() + + get_aotc_repo() + + stop_git_daemon() ), ], ) assert result.exit_code == 0, f"Command failed with code {result.exit_code}" + # Extract COMPLETE_JOB_NAME from the output + bucket_name, file_name = extract_bucket_file_name(result.output) + python_path = extract_python_path(result.output) + sys.path.append(python_path) + + get_metrics_from_gcs(bucket_name, file_name) with models.DAG( 
dag_id="reproducibility_nemo_gpt3_nighly_dag", From 970b9caaa1d8e789dfb75713e08b4096db9d5d9e Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 16:43:32 -0800 Subject: [PATCH 02/20] reformat --- .../aotc_reproducibility.py | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index d73c6f1a..75291952 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -14,14 +14,8 @@ "Bash helper commands for AOTC artifacts" -import os import re -import sys from google.cloud import storage -import logging -import uuid -from typing import Type -from airflow.hooks.subprocess import SubprocessHook def set_variables_cmds(): set_variables = ( @@ -53,7 +47,7 @@ def git_cookie_authdaemon(): " echo 'git-cookie-authdaemon is already running'; " "else " " ./gcompute-tools/git-cookie-authdaemon || echo 'Error running git-cookie-authdaemon'; " # Run if not running - "fi" + "fi", ) return auth_cmds @@ -184,14 +178,13 @@ def get_metrics_from_gcs(bucket_name, file_name): blob = bucket.blob(file_name) # Download the file content - metrics_output = blob.download_as_string().decode('utf-8') + metrics_output = blob.download_as_string().decode("utf-8") # Parse the metrics (adjust based on your file format) lines = metrics_output.splitlines() - average_step_time = float(lines[0].split(': ')[1]) - tflops_per_accelerator = float(lines[1].split(': ')[1]) - mfu = float(lines[2].split(': ')[1]) - + average_step_time = float(lines[0].split(": ")[1]) + tflops_per_accelerator = float(lines[1].split(": ")[1]) + mfu = float(lines[2].split(": ")[1]) print(f"Average Step Time: {average_step_time}") print(f"TFLOPS/Accelerator: {tflops_per_accelerator}") @@ -206,8 +199,8 @@ def extract_bucket_file_name(bash_result_output): break if complete_job_name: # Extract bucket_name and file_name - bucket_name = re.search(r'gs://([^/]+)/', complete_job_name).group(1) - file_name = re.search(r'gs://[^/]+/(.+)', complete_job_name).group(1) + bucket_name = re.search(r"gs://([^/]+)/", complete_job_name).group(1) + file_name = re.search(r"gs://[^/]+/(.+)", complete_job_name).group(1) print(f"Bucket name: {bucket_name}") print(f"File name: {file_name}") From 5bdcef5034afbdd12d4252875d479ac8cdfb34e4 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 16:57:40 -0800 Subject: [PATCH 03/20] reformat --- dags/map_reproducibility/nemo_gpt3.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 12c76ad3..9586b809 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -87,10 +87,12 @@ def run_aotc_workload(): # Extract COMPLETE_JOB_NAME from the output bucket_name, file_name = extract_bucket_file_name(result.output) + get_metrics_from_gcs(bucket_name, file_name) + + # Extract PYTHONPATH from the output python_path = extract_python_path(result.output) sys.path.append(python_path) - get_metrics_from_gcs(bucket_name, file_name) with models.DAG( dag_id="reproducibility_nemo_gpt3_nighly_dag", From 257c148f8002031fae3699f452113c2b1cdd662a Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 16:59:20 -0800 Subject: [PATCH 04/20] reformat --- dags/map_reproducibility/aotc_reproducibility.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dags/map_reproducibility/aotc_reproducibility.py 
b/dags/map_reproducibility/aotc_reproducibility.py index 75291952..2eb1ef56 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -207,6 +207,7 @@ def extract_bucket_file_name(bash_result_output): return bucket_name, file_name + def extract_python_path(bash_result_output): python_path = None for line in bash_result_output.splitlines(): From fa6219a426a78681cdf2c938b77b8c2fc79d3980 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 17:18:17 -0800 Subject: [PATCH 05/20] reformat --- dags/map_reproducibility/aotc_reproducibility.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index 2eb1ef56..6e58f81a 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -133,14 +133,15 @@ def copy_bucket_cmds(): def get_metrics_cmds(): # TODO(gunjanj007): get these parameters from the recipe get_metrics = ( + "METRICS_FILE=$COMPLETE_JOB_NAME/metrics.txt", "python3 process_training_results.py --file" " dllogger.json --batch_size 2048 " "--num_accelerators 256 " "--precision fp8 " "--model_type gpt3-175b " "--accelerator_type h100 | " - "gsutil cp - ${COMPLETE_JOB_NAME}" - "/metrics.txt", + "gsutil cp - $METRICS_FILE", + "echo 'METRICS_FILE=$METRICS_FILE'", ) return get_metrics From 443d36977536ddf7040006c0a292abcff28c7502 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 17:21:45 -0800 Subject: [PATCH 06/20] reformat --- .../aotc_reproducibility.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index 6e58f81a..87b184d7 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -17,6 +17,7 @@ import re from google.cloud import storage + def set_variables_cmds(): set_variables = ( # "set -e", @@ -38,6 +39,7 @@ def configure_project_and_cluster(): ) return set_project_command + def git_cookie_authdaemon(): auth_cmds = ( "git clone https://gerrit.googlesource.com/gcompute-tools", @@ -63,6 +65,7 @@ def clone_gob(): ) return gob_clone_cmds + def stop_git_daemon(): cmd = ( "git config --global --unset credential.helper", @@ -130,6 +133,7 @@ def copy_bucket_cmds(): ) return copy_bucket_contents + def get_metrics_cmds(): # TODO(gunjanj007): get these parameters from the recipe get_metrics = ( @@ -145,6 +149,7 @@ def get_metrics_cmds(): ) return get_metrics + def get_aotc_repo(): gob_clone_cmds = ( "echo 'trying to clone GoB aotc repo'", @@ -156,6 +161,7 @@ def get_aotc_repo(): ) return gob_clone_cmds + def cleanup_cmds(): cleanup = ( "cd $REPO_ROOT", @@ -167,6 +173,7 @@ def cleanup_cmds(): ) return cleanup + def get_metrics_from_gcs(bucket_name, file_name): # bucket_name = 'gunjanjalori-testing-xlml' # file_name = 'nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/metrics.txt' @@ -193,15 +200,15 @@ def get_metrics_from_gcs(bucket_name, file_name): def extract_bucket_file_name(bash_result_output): - complete_job_name = None + metrics_file = None for line in bash_result_output.splitlines(): - if line.startswith("COMPLETE_JOB_NAME="): - complete_job_name = line.split("=", 1)[1] + if line.startswith("METRICS_FILE="): + metrics_file = line.split("=", 1)[1] break - if complete_job_name: + if metrics_file: # Extract bucket_name 
and file_name - bucket_name = re.search(r"gs://([^/]+)/", complete_job_name).group(1) - file_name = re.search(r"gs://[^/]+/(.+)", complete_job_name).group(1) + bucket_name = re.search(r"gs://([^/]+)/", metrics_file).group(1) + file_name = re.search(r"gs://[^/]+/(.+)", metrics_file).group(1) print(f"Bucket name: {bucket_name}") print(f"File name: {file_name}") @@ -219,5 +226,3 @@ def extract_python_path(bash_result_output): print(f"Pyhon path name: {python_path}") return python_path - - From b10cee05f0f0472a944928fe838ee54f5379f3f6 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 21:40:42 -0800 Subject: [PATCH 07/20] minor fix --- dags/map_reproducibility/aotc_reproducibility.py | 1 - dags/map_reproducibility/nemo_gpt3.py | 1 - 2 files changed, 2 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index 87b184d7..5cdf9f76 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -69,7 +69,6 @@ def clone_gob(): def stop_git_daemon(): cmd = ( "git config --global --unset credential.helper", - "rm ~/.git-credentials", ) return cmd diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 9586b809..99554f9d 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -15,7 +15,6 @@ """DAGs to run Aotc reproducibility benchmarks.""" import datetime -import re import sys from airflow import models From e1d5573a626bad02f418923bdd759b852f5a799e Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 21:51:20 -0800 Subject: [PATCH 08/20] reformat --- dags/map_reproducibility/aotc_reproducibility.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index 5cdf9f76..eb69e198 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -67,9 +67,7 @@ def clone_gob(): def stop_git_daemon(): - cmd = ( - "git config --global --unset credential.helper", - ) + cmd = ("git config --global --unset credential.helper",) return cmd From fae639b968f61ea51cc44f0897e6e2ac40b0f17b Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Fri, 6 Dec 2024 15:19:03 -0800 Subject: [PATCH 09/20] merge the directory of hook and python task --- dags/map_reproducibility/nemo_gpt3.py | 96 ++++++++++--------- .../{aotc_reproducibility.py => utils.py} | 49 ++++++---- 2 files changed, 78 insertions(+), 67 deletions(-) rename dags/map_reproducibility/{aotc_reproducibility.py => utils.py} (82%) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 99554f9d..f09ed07f 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -16,27 +16,27 @@ import datetime import sys +import tempfile from airflow import models from airflow.decorators import task from airflow.hooks.subprocess import SubprocessHook from dags import composer_env -from dags.map_reproducibility.aotc_reproducibility import get_metrics_cmds -from dags.map_reproducibility.aotc_reproducibility import set_variables_cmds -from dags.map_reproducibility.aotc_reproducibility import configure_project_and_cluster -from dags.map_reproducibility.aotc_reproducibility import install_helm_cmds -from dags.map_reproducibility.aotc_reproducibility import namespace_cmds -from 
dags.map_reproducibility.aotc_reproducibility import wait_for_jobs_cmds -from dags.map_reproducibility.aotc_reproducibility import copy_bucket_cmds -from dags.map_reproducibility.aotc_reproducibility import cleanup_cmds -from dags.map_reproducibility.aotc_reproducibility import git_cookie_authdaemon -from dags.map_reproducibility.aotc_reproducibility import clone_gob -from dags.map_reproducibility.aotc_reproducibility import helm_install_cmds -from dags.map_reproducibility.aotc_reproducibility import get_metrics_from_gcs -from dags.map_reproducibility.aotc_reproducibility import stop_git_daemon -from dags.map_reproducibility.aotc_reproducibility import get_aotc_repo -from dags.map_reproducibility.aotc_reproducibility import extract_bucket_file_name -from dags.map_reproducibility.aotc_reproducibility import extract_python_path +from dags.map_reproducibility.utils import get_metrics_cmds +from dags.map_reproducibility.utils import set_variables_cmds +from dags.map_reproducibility.utils import configure_project_and_cluster +from dags.map_reproducibility.utils import install_helm_cmds +from dags.map_reproducibility.utils import namespace_cmds +from dags.map_reproducibility.utils import wait_for_jobs_cmds +from dags.map_reproducibility.utils import copy_bucket_cmds +from dags.map_reproducibility.utils import cleanup_cmds +from dags.map_reproducibility.utils import git_cookie_authdaemon +from dags.map_reproducibility.utils import clone_gob +from dags.map_reproducibility.utils import helm_install_cmds +from dags.map_reproducibility.utils import get_metrics_from_gcs +from dags.map_reproducibility.utils import get_aotc_repo +from dags.map_reproducibility.utils import extract_bucket_file_name +from dags.map_reproducibility.utils import extract_python_path # Run once a day at 2 pm UTC (6 am PST) @@ -58,39 +58,41 @@ def run_aotc_workload(): "export JOB_NAME=gpt3-xlml-$NOW-175b-nemo", ) - hook = SubprocessHook() - result = hook.run_command( - [ - "bash", - "-c", - ";".join( - set_variables_cmds() - + configure_project_and_cluster() - + git_cookie_authdaemon() - + clone_gob() - + gpu_recipe_cmd - + install_helm_cmds() - + namespace_cmds() - + workload_cmds - + helm_install_cmds() - + wait_for_jobs_cmds() - + copy_bucket_cmds() - + get_metrics_cmds() - + cleanup_cmds() - + get_aotc_repo() - + stop_git_daemon() - ), - ], - ) - assert result.exit_code == 0, f"Command failed with code {result.exit_code}" + with tempfile.TemporaryDirectory() as tmpdir: + + hook = SubprocessHook() + result = hook.run_command( + [ + "bash", + "-c", + ";".join( + set_variables_cmds() + + configure_project_and_cluster() + + git_cookie_authdaemon() + + clone_gob() + + gpu_recipe_cmd + + install_helm_cmds() + + namespace_cmds() + + workload_cmds + + helm_install_cmds() + + wait_for_jobs_cmds() + + copy_bucket_cmds() + + get_metrics_cmds() + + cleanup_cmds() + + get_aotc_repo() + ), + ], + cwd=tmpdir, + ) + assert result.exit_code == 0, f"Command failed with code {result.exit_code}" - # Extract COMPLETE_JOB_NAME from the output - bucket_name, file_name = extract_bucket_file_name(result.output) - get_metrics_from_gcs(bucket_name, file_name) + # Extract COMPLETE_JOB_NAME from the output + bucket_name, file_name, python_path = extract_bucket_file_name(result.output) + get_metrics_from_gcs(bucket_name, file_name) - # Extract PYTHONPATH from the output - python_path = extract_python_path(result.output) - sys.path.append(python_path) + # # Extract PYTHONPATH from the output + # python_path = extract_python_path(result.output) + 
sys.path.append(python_path) with models.DAG( diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/utils.py similarity index 82% rename from dags/map_reproducibility/aotc_reproducibility.py rename to dags/map_reproducibility/utils.py index eb69e198..b5cc32d8 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/utils.py @@ -65,12 +65,6 @@ def clone_gob(): ) return gob_clone_cmds - -def stop_git_daemon(): - cmd = ("git config --global --unset credential.helper",) - return cmd - - def install_helm_cmds(): install_helm_cmd = ( "curl -fsSL -o get_helm.sh " @@ -123,7 +117,8 @@ def copy_bucket_cmds(): copy_bucket_contents = ( "export COMPLETE_JOB_NAME=$(gcloud storage ls " "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)", - "echo 'COMPLETE_JOB_NAME=$COMPLETE_JOB_NAME'", + # "COMPLETE_JOB_NAME=gs://gunjanjalori-testing-xlml/nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/", + 'echo "COMPLETE_JOB_NAME ${COMPLETE_JOB_NAME}"', "cd $REPO_ROOT/src/utils/training_metrics", "gcloud storage cp ${COMPLETE_JOB_NAME}" "dllogger/rank-0/dllogger.json .", @@ -142,7 +137,7 @@ def get_metrics_cmds(): "--model_type gpt3-175b " "--accelerator_type h100 | " "gsutil cp - $METRICS_FILE", - "echo 'METRICS_FILE=$METRICS_FILE'", + 'echo "METRICS_FILE=${METRICS_FILE}"', ) return get_metrics @@ -154,7 +149,7 @@ def get_aotc_repo(): "benchmark-automation", "cd benchmark-automation/aotc/src", "export PYTHONPATH=$PWD", - "echo 'PYTHONPATH=$PYTHONPATH'", + 'echo "PYTHONPATH=$PYTHONPATH and METRICS_FILE=$METRICS_FILE"', ) return gob_clone_cmds @@ -163,10 +158,10 @@ def cleanup_cmds(): cleanup = ( "cd $REPO_ROOT", "cd ../../..", - "kubectl get pods " - "--no-headers=true | awk '{print $1}' " - "| grep $JOB_NAME | xargs kubectl delete pods", - "helm uninstall $JOB_NAME", + # "kubectl get pods " + # "--no-headers=true | awk '{print $1}' " + # "| grep $JOB_NAME | xargs kubectl delete pods", + # "helm uninstall $JOB_NAME", ) return cleanup @@ -196,12 +191,24 @@ def get_metrics_from_gcs(bucket_name, file_name): print(f"MFU: {mfu}") -def extract_bucket_file_name(bash_result_output): +def extract_bucket_file_name(last_line): metrics_file = None - for line in bash_result_output.splitlines(): - if line.startswith("METRICS_FILE="): - metrics_file = line.split("=", 1)[1] - break + # for line in bash_result_output.splitlines(): + # print(f"Line: {line}") + # if line.startswith("METRICS_FILE"): + # print(f"Line: {line} with metrics file") + # metrics_file = line.split("=", 1)[1] + # break + + match = re.search(r'PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)', last_line) + if match: + python_path = match.group(1) + metrics_file = match.group(2) + print(f"PYTHONPATH in python: {python_path}") + print(f"METRICS_FILE: {metrics_file}") + else: + print("Error: Could not extract PYTHONPATH and METRICS_FILE") + print(f"Metrics file name: {metrics_file}") if metrics_file: # Extract bucket_name and file_name bucket_name = re.search(r"gs://([^/]+)/", metrics_file).group(1) @@ -209,14 +216,16 @@ def extract_bucket_file_name(bash_result_output): print(f"Bucket name: {bucket_name}") print(f"File name: {file_name}") + else: + print("Metrics file not found in the output.") - return bucket_name, file_name + return bucket_name, file_name, python_path def extract_python_path(bash_result_output): python_path = None for line in bash_result_output.splitlines(): - if line.startswith("PYTHONPATH="): + if line.startswith("PYTHONPATH"): python_path = line.split("=", 1)[1] break 
From b305ecc11f1149191d0a7c33d3343d33e8f8d42b Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Fri, 6 Dec 2024 15:35:17 -0800 Subject: [PATCH 10/20] reformat --- dags/map_reproducibility/utils.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index b5cc32d8..fa6586ba 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -193,21 +193,15 @@ def get_metrics_from_gcs(bucket_name, file_name): def extract_bucket_file_name(last_line): metrics_file = None - # for line in bash_result_output.splitlines(): - # print(f"Line: {line}") - # if line.startswith("METRICS_FILE"): - # print(f"Line: {line} with metrics file") - # metrics_file = line.split("=", 1)[1] - # break - - match = re.search(r'PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)', last_line) + + match = re.search(r"PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)", last_line) if match: - python_path = match.group(1) - metrics_file = match.group(2) - print(f"PYTHONPATH in python: {python_path}") - print(f"METRICS_FILE: {metrics_file}") + python_path = match.group(1) + metrics_file = match.group(2) + print(f"PYTHONPATH in python: {python_path}") + print(f"METRICS_FILE: {metrics_file}") else: - print("Error: Could not extract PYTHONPATH and METRICS_FILE") + print("Error: Could not extract PYTHONPATH and METRICS_FILE") print(f"Metrics file name: {metrics_file}") if metrics_file: # Extract bucket_name and file_name From 697e7598f90a048f37920759b8f9515f91b7c40a Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Fri, 6 Dec 2024 15:38:46 -0800 Subject: [PATCH 11/20] reformat --- dags/map_reproducibility/nemo_gpt3.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index f09ed07f..fa9cf82c 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -87,7 +87,9 @@ def run_aotc_workload(): assert result.exit_code == 0, f"Command failed with code {result.exit_code}" # Extract COMPLETE_JOB_NAME from the output - bucket_name, file_name, python_path = extract_bucket_file_name(result.output) + bucket_name, file_name, python_path = extract_bucket_file_name( + result.output + ) get_metrics_from_gcs(bucket_name, file_name) # # Extract PYTHONPATH from the output From 1d811aeea138171b21dafdfc4962ee26a7cbd065 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Fri, 6 Dec 2024 15:39:31 -0800 Subject: [PATCH 12/20] reformat --- dags/map_reproducibility/nemo_gpt3.py | 1 - dags/map_reproducibility/utils.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index fa9cf82c..4535aa48 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -59,7 +59,6 @@ def run_aotc_workload(): ) with tempfile.TemporaryDirectory() as tmpdir: - hook = SubprocessHook() result = hook.run_command( [ diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index fa6586ba..56a0f9ba 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -65,6 +65,7 @@ def clone_gob(): ) return gob_clone_cmds + def install_helm_cmds(): install_helm_cmd = ( "curl -fsSL -o get_helm.sh " From 0a3c5f7ab1d9a3350579da6489cee154b04f04c4 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Fri, 6 Dec 2024 16:58:48 -0800 Subject: [PATCH 13/20] reformat --- 
dags/map_reproducibility/utils.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index 56a0f9ba..977888e3 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -159,17 +159,15 @@ def cleanup_cmds(): cleanup = ( "cd $REPO_ROOT", "cd ../../..", - # "kubectl get pods " - # "--no-headers=true | awk '{print $1}' " - # "| grep $JOB_NAME | xargs kubectl delete pods", - # "helm uninstall $JOB_NAME", + "kubectl get pods " + "--no-headers=true | awk '{print $1}' " + "| grep $JOB_NAME | xargs kubectl delete pods", + "helm uninstall $JOB_NAME", ) return cleanup def get_metrics_from_gcs(bucket_name, file_name): - # bucket_name = 'gunjanjalori-testing-xlml' - # file_name = 'nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/metrics.txt' # Initialize GCS and BigQuery clients storage_client = storage.Client() From 4ef3684e3f107734e5e46b33ebf6be893216f8d0 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Sun, 8 Dec 2024 10:00:09 -0800 Subject: [PATCH 14/20] reformat --- dags/map_reproducibility/nemo_gpt3.py | 3 ++- dags/map_reproducibility/utils.py | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 4535aa48..9ba82db6 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -87,7 +87,7 @@ def run_aotc_workload(): # Extract COMPLETE_JOB_NAME from the output bucket_name, file_name, python_path = extract_bucket_file_name( - result.output + result.output ) get_metrics_from_gcs(bucket_name, file_name) @@ -96,6 +96,7 @@ def run_aotc_workload(): sys.path.append(python_path) + with models.DAG( dag_id="reproducibility_nemo_gpt3_nighly_dag", schedule=SCHEDULED_TIME, diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index 977888e3..3bc7ca08 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -168,7 +168,6 @@ def cleanup_cmds(): def get_metrics_from_gcs(bucket_name, file_name): - # Initialize GCS and BigQuery clients storage_client = storage.Client() From bfdb497dd16bf260b94c9ca34982b933ca8bf3ea Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 9 Dec 2024 07:24:22 -0800 Subject: [PATCH 15/20] reformat --- dags/map_reproducibility/nemo_gpt3.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 9ba82db6..116e8a7a 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -96,7 +96,6 @@ def run_aotc_workload(): sys.path.append(python_path) - with models.DAG( dag_id="reproducibility_nemo_gpt3_nighly_dag", schedule=SCHEDULED_TIME, From 19c144fbc2fdcddc5f5f43b7f7810586690c81ea Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 9 Dec 2024 07:27:54 -0800 Subject: [PATCH 16/20] resolve comments --- dags/map_reproducibility/utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index 3bc7ca08..beda3029 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -20,7 +20,6 @@ def set_variables_cmds(): set_variables = ( - # "set -e", "export PROJECT=supercomputer-testing", "export CLUSTER=a3plus-benchmark", "export CLUSTER_REGION=australia-southeast1", @@ -118,7 +117,6 @@ def copy_bucket_cmds(): copy_bucket_contents = ( "export 
COMPLETE_JOB_NAME=$(gcloud storage ls " "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)", - # "COMPLETE_JOB_NAME=gs://gunjanjalori-testing-xlml/nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/", 'echo "COMPLETE_JOB_NAME ${COMPLETE_JOB_NAME}"', "cd $REPO_ROOT/src/utils/training_metrics", "gcloud storage cp ${COMPLETE_JOB_NAME}" From db99876bbd96c33d8e6d0115ad55b58d4e24ea35 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 9 Dec 2024 21:34:44 -0800 Subject: [PATCH 17/20] resolve comments --- dags/map_reproducibility/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index beda3029..e9957282 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -38,7 +38,7 @@ def configure_project_and_cluster(): ) return set_project_command - +# This is required to get auth to access def git_cookie_authdaemon(): auth_cmds = ( "git clone https://gerrit.googlesource.com/gcompute-tools", @@ -190,6 +190,7 @@ def get_metrics_from_gcs(bucket_name, file_name): def extract_bucket_file_name(last_line): metrics_file = None + # We match here because subprocesshook only outputs the last line. match = re.search(r"PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)", last_line) if match: python_path = match.group(1) From d1dbb73904fd2fb8f1ffb78a6718e37d18529307 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 9 Dec 2024 21:37:19 -0800 Subject: [PATCH 18/20] resolve comments --- dags/map_reproducibility/nemo_gpt3.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 116e8a7a..cf20b601 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -91,8 +91,6 @@ def run_aotc_workload(): ) get_metrics_from_gcs(bucket_name, file_name) - # # Extract PYTHONPATH from the output - # python_path = extract_python_path(result.output) sys.path.append(python_path) From 24b0cc28bdde6ba32d6236f043fc508b499ed1f9 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 9 Dec 2024 21:40:54 -0800 Subject: [PATCH 19/20] reformat --- dags/map_reproducibility/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index e9957282..89fcb8ab 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -38,6 +38,7 @@ def configure_project_and_cluster(): ) return set_project_command + # This is required to get auth to access def git_cookie_authdaemon(): auth_cmds = ( From 757e6ee319c9e448c5bf41b1b6b1e4b7a3cd76d0 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Wed, 11 Dec 2024 14:34:55 -0800 Subject: [PATCH 20/20] Add Dan and Di as owners for aotc --- .github/CODEOWNERS | 2 ++ .github/requirements.txt | 1 + 2 files changed, 3 insertions(+) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index e68ed421..07ad9a96 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -16,3 +16,5 @@ dags/sparsity_diffusion_devx/project_bite* @RissyRan @parambole @jiangjy1982 @ai dags/sparsity_diffusion_devx/configs/project_bite* @RissyRan @parambole @jiangjy1982 @aireenmei @michelle-yooh @jiya-zhang dags/inference @yeandy @vipannalla @morgandu @mailvijayasingh @sixiang-google @joezijunzhou @singh-mitali + +dags/map_reproducibility @crankshaw-google @polydier1 diff --git a/.github/requirements.txt b/.github/requirements.txt index ebf875bf..1c283211 100644 --- a/.github/requirements.txt +++ 
b/.github/requirements.txt
@@ -10,3 +10,4 @@ tensorflow-cpu
 kubernetes
 pyarrow
 apache-airflow-providers-google
+dacite