Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clone aotc and get metrics from gcs and set pythonpath #500

Merged
merged 21 commits into from
Dec 11, 2024
Merged
87 changes: 53 additions & 34 deletions dags/map_reproducibility/nemo_gpt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,29 @@
"""DAGs to run Aotc reproducibility benchmarks."""

import datetime
import sys
import tempfile

from airflow import models
from airflow.decorators import task
from airflow.hooks.subprocess import SubprocessHook
from dags import composer_env
from dags.map_reproducibility.aotc_reproducibility import get_metrics_cmds
from dags.map_reproducibility.aotc_reproducibility import set_variables_cmds
from dags.map_reproducibility.aotc_reproducibility import configure_project_and_cluster
from dags.map_reproducibility.aotc_reproducibility import install_helm_cmds
from dags.map_reproducibility.aotc_reproducibility import namespace_cmds
from dags.map_reproducibility.aotc_reproducibility import wait_for_jobs_cmds
from dags.map_reproducibility.aotc_reproducibility import copy_bucket_cmds
from dags.map_reproducibility.aotc_reproducibility import cleanup_cmds
from dags.map_reproducibility.aotc_reproducibility import git_cookie_authdaemon
from dags.map_reproducibility.aotc_reproducibility import clone_gob
from dags.map_reproducibility.aotc_reproducibility import helm_install_cmds
from dags.map_reproducibility.utils import get_metrics_cmds
from dags.map_reproducibility.utils import set_variables_cmds
from dags.map_reproducibility.utils import configure_project_and_cluster
from dags.map_reproducibility.utils import install_helm_cmds
from dags.map_reproducibility.utils import namespace_cmds
from dags.map_reproducibility.utils import wait_for_jobs_cmds
from dags.map_reproducibility.utils import copy_bucket_cmds
from dags.map_reproducibility.utils import cleanup_cmds
from dags.map_reproducibility.utils import git_cookie_authdaemon
from dags.map_reproducibility.utils import clone_gob
from dags.map_reproducibility.utils import helm_install_cmds
from dags.map_reproducibility.utils import get_metrics_from_gcs
from dags.map_reproducibility.utils import get_aotc_repo
from dags.map_reproducibility.utils import extract_bucket_file_name
from dags.map_reproducibility.utils import extract_python_path


# Run once a day at 2 pm UTC (6 am PST)
SCHEDULED_TIME = "0 14 * * *" if composer_env.is_prod_env() else None
Expand All @@ -50,29 +58,40 @@ def run_aotc_workload():
"export JOB_NAME=gpt3-xlml-$NOW-175b-nemo",
)

hook = SubprocessHook()
result = hook.run_command(
[
"bash",
"-c",
";".join(
set_variables_cmds()
+ configure_project_and_cluster()
+ git_cookie_authdaemon()
+ clone_gob()
+ gpu_recipe_cmd
+ install_helm_cmds()
+ namespace_cmds()
+ workload_cmds
+ helm_install_cmds()
+ wait_for_jobs_cmds()
+ copy_bucket_cmds()
+ get_metrics_cmds()
+ cleanup_cmds()
),
],
)
assert result.exit_code == 0, f"Command failed with code {result.exit_code}"
with tempfile.TemporaryDirectory() as tmpdir:
hook = SubprocessHook()
result = hook.run_command(
[
"bash",
"-c",
";".join(
set_variables_cmds()
+ configure_project_and_cluster()
+ git_cookie_authdaemon()
+ clone_gob()
+ gpu_recipe_cmd
+ install_helm_cmds()
+ namespace_cmds()
+ workload_cmds
gunjanj007 marked this conversation as resolved.
Show resolved Hide resolved
+ helm_install_cmds()
+ wait_for_jobs_cmds()
+ copy_bucket_cmds()
+ get_metrics_cmds()
+ cleanup_cmds()
+ get_aotc_repo()
),
],
cwd=tmpdir,
)
assert result.exit_code == 0, f"Command failed with code {result.exit_code}"

# Extract COMPLETE_JOB_NAME from the output
bucket_name, file_name, python_path = extract_bucket_file_name(
result.output
)
get_metrics_from_gcs(bucket_name, file_name)

sys.path.append(python_path)


with models.DAG(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

"Bash helper commands for AOTC artifacts"

import os
import re
from google.cloud import storage


def set_variables_cmds():
Expand All @@ -39,12 +40,16 @@ def configure_project_and_cluster():


# This is required to get auth to access
gunjanj007 marked this conversation as resolved.
Show resolved Hide resolved
# internal GoB repo
def git_cookie_authdaemon():
auth_cmds = (
"git clone https://gerrit.googlesource.com/gcompute-tools",
"echo 'trying to run git-cookie-authdaemon'",
"./gcompute-tools/git-cookie-authdaemon",
# Check if the daemon is already running
"if pgrep -f git-cookie-authdaemon; then "
" echo 'git-cookie-authdaemon is already running'; "
"else "
" ./gcompute-tools/git-cookie-authdaemon || echo 'Error running git-cookie-authdaemon'; " # Run if not running
"fi",
)
return auth_cmds

Expand All @@ -56,6 +61,7 @@ def clone_gob():
"reproducible-benchmark-recipes",
"cd reproducible-benchmark-recipes/projects",
"cd gpu-recipes",
"pwd",
gunjanj007 marked this conversation as resolved.
Show resolved Hide resolved
)
return gob_clone_cmds

Expand Down Expand Up @@ -101,9 +107,6 @@ def helm_install_cmds():

def wait_for_jobs_cmds():
wait_for_job = (
"echo 'will wait for job to start running'",
"kubectl wait --for=condition=running job/$JOB_NAME"
" --namespace=default --timeout=10m",
"echo 'will wait for jobs to finish'",
"kubectl wait --for=condition=complete "
"job/$JOB_NAME --namespace=default --timeout=100m",
Expand All @@ -113,10 +116,9 @@ def wait_for_jobs_cmds():

def copy_bucket_cmds():
copy_bucket_contents = (
"COMPLETE_JOB_NAME=$(gcloud storage ls "
"export COMPLETE_JOB_NAME=$(gcloud storage ls "
"gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)",
gunjanj007 marked this conversation as resolved.
Show resolved Hide resolved
"echo 'copying from' ",
"echo $COMPLETE_JOB_NAME",
'echo "COMPLETE_JOB_NAME ${COMPLETE_JOB_NAME}"',
"cd $REPO_ROOT/src/utils/training_metrics",
"gcloud storage cp ${COMPLETE_JOB_NAME}"
"dllogger/rank-0/dllogger.json .",
Expand All @@ -127,21 +129,98 @@ def copy_bucket_cmds():
def get_metrics_cmds():
# TODO(gunjanj007): get these parameters from the recipe
get_metrics = (
"METRICS_FILE=$COMPLETE_JOB_NAME/metrics.txt",
"python3 process_training_results.py --file"
" dllogger.json --batch_size 2048 "
"--num_accelerators 256 "
"--precision fp8 "
"--model_type gpt3-175b "
gunjanj007 marked this conversation as resolved.
Show resolved Hide resolved
"--accelerator_type h100 ",
"--accelerator_type h100 | "
"gsutil cp - $METRICS_FILE",
'echo "METRICS_FILE=${METRICS_FILE}"',
)
return get_metrics


def get_aotc_repo():
gob_clone_cmds = (
"echo 'trying to clone GoB aotc repo'",
"git clone https://cmcs-perf-tooling-internal.googlesource.com/"
"benchmark-automation",
"cd benchmark-automation/aotc/src",
"export PYTHONPATH=$PWD",
'echo "PYTHONPATH=$PYTHONPATH and METRICS_FILE=$METRICS_FILE"',
)
return gob_clone_cmds


def cleanup_cmds():
cleanup = (
"cd $REPO_ROOT",
"cd ../../..",
"kubectl get pods "
"--no-headers=true | awk '{print $1}' "
"| grep $JOB_NAME | xargs kubectl delete pods",
"helm uninstall $JOB_NAME",
)
return cleanup


def get_metrics_from_gcs(bucket_name, file_name):
# Initialize GCS and BigQuery clients
storage_client = storage.Client()

# Get the bucket and file
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)

# Download the file content
metrics_output = blob.download_as_string().decode("utf-8")

# Parse the metrics (adjust based on your file format)
lines = metrics_output.splitlines()
average_step_time = float(lines[0].split(": ")[1])
tflops_per_accelerator = float(lines[1].split(": ")[1])
mfu = float(lines[2].split(": ")[1])

print(f"Average Step Time: {average_step_time}")
print(f"TFLOPS/Accelerator: {tflops_per_accelerator}")
print(f"MFU: {mfu}")


def extract_bucket_file_name(last_line):
metrics_file = None

# We match here because subprocesshook only outputs the last line.
match = re.search(r"PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)", last_line)
gunjanj007 marked this conversation as resolved.
Show resolved Hide resolved
if match:
python_path = match.group(1)
metrics_file = match.group(2)
print(f"PYTHONPATH in python: {python_path}")
print(f"METRICS_FILE: {metrics_file}")
else:
print("Error: Could not extract PYTHONPATH and METRICS_FILE")
print(f"Metrics file name: {metrics_file}")
if metrics_file:
# Extract bucket_name and file_name
bucket_name = re.search(r"gs://([^/]+)/", metrics_file).group(1)
file_name = re.search(r"gs://[^/]+/(.+)", metrics_file).group(1)

print(f"Bucket name: {bucket_name}")
print(f"File name: {file_name}")
else:
print("Metrics file not found in the output.")

return bucket_name, file_name, python_path


def extract_python_path(bash_result_output):
python_path = None
for line in bash_result_output.splitlines():
if line.startswith("PYTHONPATH"):
python_path = line.split("=", 1)[1]
break

print(f"Pyhon path name: {python_path}")

return python_path
Loading