From 72c4afc2c98e18dbc2708615dcda23b248c2a293 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Mon, 12 Feb 2024 20:10:07 +0000 Subject: [PATCH 01/29] Multi-GPU tests on GKE Change-Id: I8b6ef0f096d965d6538e1de3b9699bb271e15232 --- dags/pytorch_xla/pytorchxla_multigpu.py | 142 ++++++++++++++++++++++++ deployment/clusters.tf | 77 +++++++++++++ xlml/utils/gke.py | 90 +++++++++++++++ 3 files changed, 309 insertions(+) create mode 100644 dags/pytorch_xla/pytorchxla_multigpu.py create mode 100644 deployment/clusters.tf create mode 100644 xlml/utils/gke.py diff --git a/dags/pytorch_xla/pytorchxla_multigpu.py b/dags/pytorch_xla/pytorchxla_multigpu.py new file mode 100644 index 00000000..bb7c28e2 --- /dev/null +++ b/dags/pytorch_xla/pytorchxla_multigpu.py @@ -0,0 +1,142 @@ +import datetime +from airflow import models + +from xlml.utils import gke + +job_yaml = """ +"apiVersion": "batch/v1" +"kind": "Job" +"metadata": + "generateName": "pt-nightly-resnet50-mp-fake-v100-x2-" + "labels": + "accelerator": "v100-x2" + "benchmarkId": "pt-nightly-resnet50-mp-fake-v100-x2" + "frameworkVersion": "pt-nightly" + "mode": "fake" + "model": "resnet50-mp" +"spec": + "activeDeadlineSeconds": 10800 + "backoffLimit": 0 + "completionMode": "Indexed" + "completions": 2 + "parallelism": 2 + "template": + metadata: + labels: + "headless-svc": 'true' + "spec": + "subdomain": headless-svc + "containers": + - command: + - bash + - -cxeu + - | + export PATH=/usr/local/nvidia/bin${PATH:+:${PATH}} + export LD_LIBRARY_PATH=/usr/local/nvidia/lib64:/usr/local/nvidia/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}} + + nvidia-smi + pip3 uninstall -y torch torchvision + pip3 install --pre torch torchvision --index-url https://download.pytorch.org/whl/nightly/cpu + + # job_name=$(JOB_NAME) + # ip=$(getent hosts ${job_name}-0.headless-svc | awk {'print $1'}) + # echo ip: ${ip} + + # torchrun --nnodes=2 --node_rank=$(JOB_COMPLETION_INDEX) --nproc_per_node=2 --rdzv_endpoint=${ip}:12355 /src/xla-master/test/test_train_mp_imagenet.py --model=resnet50 --log_steps=200 --fake_data --nometrics_debug --pjrt_distributed + + echo "${@:0}" + + # bash + "args": + # - "bash" + - "torchrun" + - "--nnodes=2" + - "--node_rank=$(JOB_COMPLETION_INDEX)" + - "--nproc_per_node=2" + - "--rdzv_endpoint=$(JOB_NAME)-0.headless-svc:12355" + - "/src/xla-master/test/test_train_mp_imagenet.py" + - "--model=resnet50" + - "--log_steps=200" + - "--fake_data" + - "--nometrics_debug" + - "--num_epochs=1" + - "--pjrt_distributed" + # stdin: true + # tty: true + "env": + - "name": "POD_NAME" + "valueFrom": + "fieldRef": + "fieldPath": "metadata.name" + # - "name": "POD_UID" + # "valueFrom": + # "fieldRef": + # "fieldPath": "metadata.uid" + - "name": "POD_NAMESPACE" + "valueFrom": + "fieldRef": + "fieldPath": "metadata.namespace" + - "name": "JOB_NAME" + "valueFrom": + "fieldRef": + "fieldPath": "metadata.labels['job-name']" + # - "name": "MODEL_DIR" + # "value": "$(OUTPUT_BUCKET)/pt-nightly/resnet50-mp/fake/v100-x2/$(JOB_NAME)" + # - "name": "GPU_NUM_DEVICES" + # "value": "2" + - name: PJRT_DEVICE + value: CUDA + # - "name": "XLA_USE_BF16" + # "value": "0" + "image": us-central1-docker.pkg.dev/tpu-pytorch-releases/docker/xla:nightly_3.8_cuda_12.1 + # "image": us-central1-docker.pkg.dev/tpu-pytorch-releases/docker/xla:r2.2.0_3.10_cuda_12.1 + "imagePullPolicy": "Always" + "name": "train" + "resources": + "limits": + "nvidia.com/gpu": 2 + "requests": + "cpu": "7.0" + "memory": "40Gi" + "volumeMounts": + - "mountPath": "/dev/shm" + "name": "dshm" + "readOnly": false + - "mountPath": "/src" + "name": "dshm" + "readOnly": false + initContainers: + - name: clone + image: alpine + command: + - sh + - -c + - | + cd /src + wget https://github.com/pytorch/xla/archive/refs/heads/master.tar.gz -O - | tar xzf - + volumeMounts: + - "mountPath": "/src" + "name": "dshm" + "readOnly": false + "nodeSelector": + "cloud.google.com/gke-accelerator": "nvidia-tesla-v100" + "restartPolicy": "Never" + "volumes": + - "emptyDir": + "medium": "Memory" + "name": "dshm" + - "emptyDir": + "medium": "Memory" + "name": "src" + "ttlSecondsAfterFinished": 604800 +""" + + +with models.DAG( + dag_id="pytorchxla-multigpu", + schedule=None, + tags=["pytorchxla", "latest", "supported", "xlml"], + catchup=False, + start_date=datetime.datetime(2023, 7, 12), +): + resnet_v100_2x2 = gke.deploy_job(job_yaml) diff --git a/deployment/clusters.tf b/deployment/clusters.tf new file mode 100644 index 00000000..51c524ad --- /dev/null +++ b/deployment/clusters.tf @@ -0,0 +1,77 @@ +resource "google_container_cluster" "gpu-uc1" { + name = "wcromar-test-cluster" + project = "cloud-ml-auto-solutions" + location = "us-central1" + + release_channel { + channel = "RAPID" + } + + # We can't create a cluster with no node pool defined, but we want to only use + # separately managed node pools. So we create the smallest possible default + # node pool and immediately delete it. + remove_default_node_pool = true + initial_node_count = 1 +} + +resource "google_container_node_pool" "primary" { + name = "primary-pool" + project = google_container_cluster.gpu-uc1.project + location = google_container_cluster.gpu-uc1.location + cluster = google_container_cluster.gpu-uc1.name + node_count = 1 + + management { + auto_repair = true + auto_upgrade = true + } + + node_config { + preemptible = true + machine_type = "e2-medium" + + # Google recommends custom service accounts that have cloud-platform scope and permissions granted via IAM Roles. + # TODO: custom service account? + # service_account = google_service_account.default.email + oauth_scopes = [ + "https://www.googleapis.com/auth/cloud-platform" + ] + } +} + + +resource "google_container_node_pool" "nvidia-v100x2" { + name = "nvidia-v100x2-pool" + project = google_container_cluster.gpu-uc1.project + location = google_container_cluster.gpu-uc1.location + cluster = google_container_cluster.gpu-uc1.name + node_count = 3 + + node_locations = [ + "us-central1-b" + ] + + management { + auto_repair = true + auto_upgrade = true + } + + node_config { + preemptible = true + machine_type = "n1-highmem-16" + + # Google recommends custom service accounts that have cloud-platform scope and permissions granted via IAM Roles. + # TODO: custom service account? + # service_account = google_service_account.default.email + oauth_scopes = [ + "https://www.googleapis.com/auth/cloud-platform" + ] + guest_accelerator { + type = "nvidia-tesla-v100" + count = 2 + gpu_driver_installation_config { + gpu_driver_version = "LATEST" + } + } + } +} diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py new file mode 100644 index 00000000..0e0cb543 --- /dev/null +++ b/xlml/utils/gke.py @@ -0,0 +1,90 @@ +import base64 +import concurrent.futures +import logging +import tempfile + +from airflow.decorators import task +import google.auth +import google.auth.transport.requests +from google.cloud import container_v1 +import kubernetes +import yaml + + +def get_authenticated_client(project_id: str, region: str, cluster_name: str) -> kubernetes.client.ApiClient: + container_client = container_v1.ClusterManagerClient() + cluster_path = f"projects/{project_id}/locations/{region}/clusters/{cluster_name}" + response = container_client.get_cluster(name=cluster_path) + creds, _ = google.auth.default() + auth_req = google.auth.transport.requests.Request() + creds.refresh(auth_req) + configuration = kubernetes.client.Configuration() + configuration.host = f"https://{response.endpoint}" + with tempfile.NamedTemporaryFile(delete=False) as ca_cert: + ca_cert.write(base64.b64decode(response.master_auth.cluster_ca_certificate)) + configuration.ssl_ca_cert = ca_cert.name + configuration.api_key_prefix["authorization"] = "Bearer" + configuration.api_key["authorization"] = creds.token + + return kubernetes.client.ApiClient(configuration) + +@task +def deploy_job(job_yaml: str): + client = get_authenticated_client('cloud-ml-auto-solutions', 'us-central1', 'wcromar-test-cluster') + + body = yaml.safe_load(job_yaml) + jobs_client = kubernetes.client.BatchV1Api(client) + resp = jobs_client.create_namespaced_job(namespace='default', body=body) + + print(resp) + print(type(resp)) + + core_v1 = kubernetes.client.CoreV1Api(client) + + pod_label_selector = "controller-uid=" + resp.metadata.uid + pods = core_v1.list_namespaced_pod(namespace='default', label_selector=pod_label_selector) + print(pods) + + + def _watch_pod(name, namespace): + logs_watcher = kubernetes.watch.Watch() + + while True: + logging.info('Waiting for pod %s to start...', name) + pod_watcher = kubernetes.watch.Watch() + for event in pod_watcher.stream(core_v1.list_namespaced_pod, namespace, + field_selector=f'metadata.name={name}'): + status = event['object'].status + logging.info('Pod %s status: %s', event['object'].metadata.name, status.phase) + if status.phase != 'Pending': + break + + if status.container_statuses: + container_status = status.container_statuses[0] + if status.container_statuses[0].state.terminated: + exit_code = container_status.state.terminated.exit_code + if exit_code: + logging.error('Pod %s had non-zero exit code %d', name, exit_code) + + return exit_code + + logging.info('Streaming pod logs for %s...', name) + for line in logs_watcher.stream(core_v1.read_namespaced_pod_log, + name, namespace, _request_timeout=3600): + logging.info('%s] %s', name, line) + + logging.warning('Lost logs stream for %s.', name) + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [] + for pod in pods.items: + f = executor.submit(_watch_pod, pod.metadata.name, pod.metadata.namespace) + futures.append(f) + + # Wait for pods to complete, and exit with the first non-zero exit code. + for f in concurrent.futures.as_completed(futures): + exit_code = f.result() + if exit_code: + print('bad', exit_code) + + From 6c4dcfdcd178c26e6e45f534987e52279ecebbb7 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 13 Feb 2024 22:15:33 +0000 Subject: [PATCH 02/29] Fit new test type into task API Change-Id: I0507c9d9275cf3b920adfa75a065492d66dc0658 --- .github/requirements.txt | 3 +- dags/legacy_test/templates/gpus.libsonnet | 11 +- .../tests/pytorch/nightly/common.libsonnet | 33 +++- .../pytorch/nightly/resnet50-mp.libsonnet | 19 ++- dags/pytorch_xla/pytorchxla_multigpu.py | 137 +-------------- xlml/apis/task.py | 159 +++++++++++++++++- xlml/apis/test_config.py | 83 +++++++-- xlml/utils/gke.py | 16 +- 8 files changed, 298 insertions(+), 163 deletions(-) diff --git a/.github/requirements.txt b/.github/requirements.txt index 2380d4f7..fb0380b7 100644 --- a/.github/requirements.txt +++ b/.github/requirements.txt @@ -7,4 +7,5 @@ google-cloud-container google-cloud-tpu>=1.16.0 jsonlines tensorflow-cpu -apache-airflow-providers-cncf-kubernetes \ No newline at end of file +apache-airflow-providers-cncf-kubernetes +kubernetes diff --git a/dags/legacy_test/templates/gpus.libsonnet b/dags/legacy_test/templates/gpus.libsonnet index 5de7b39f..112db455 100644 --- a/dags/legacy_test/templates/gpus.libsonnet +++ b/dags/legacy_test/templates/gpus.libsonnet @@ -18,11 +18,14 @@ local base = import 'base.libsonnet'; GPUSpec:: base.BaseAccelerator { local gpu = self, - name: '%(version)s-x%(count)d' % gpu, + name: '%(version)s-x%(count)dx%(num_hosts)d' % gpu, type: 'gpu', version: error 'Must specify GPUSpec `version`', count: 1, replicas: gpu.count, + num_hosts: 1, + // Label used in GCE API + accelerator_type: error 'Must specify GPUSpec `accelerator_type', // Ignore TPU settings. PodTemplate(_):: { @@ -43,8 +46,6 @@ local base = import 'base.libsonnet'; }, }, - teslaK80: self.GPUSpec { version: 'k80' }, - teslaV100: self.GPUSpec { version: 'v100' }, - teslaA100: self.GPUSpec { version: 'a100' }, - teslaT4: self.GPUSpec { version: 't4' }, + teslaV100: self.GPUSpec { version: 'v100', accelerator_type: 'nvidia-tesla-v100' }, + teslaA100: self.GPUSpec { version: 'a100', accelerator_type: 'nvidia-tesla-a100' }, } diff --git a/dags/legacy_test/tests/pytorch/nightly/common.libsonnet b/dags/legacy_test/tests/pytorch/nightly/common.libsonnet index 8dd45721..2a1a6ee5 100644 --- a/dags/legacy_test/tests/pytorch/nightly/common.libsonnet +++ b/dags/legacy_test/tests/pytorch/nightly/common.libsonnet @@ -24,7 +24,7 @@ local volumes = import 'templates/volumes.libsonnet'; tpuSettings+: { softwareVersion: 'pytorch-nightly', }, - imageTag: 'nightly_3.7', + imageTag: 'nightly_3.8', }, PyTorchTest:: common.PyTorchTest + Nightly { local config = self, @@ -120,7 +120,36 @@ local volumes = import 'templates/volumes.libsonnet'; }, GpuMixin:: { local config = self, - imageTag+: '_cuda_11.8', + imageTag+: '_cuda_12.1', + + entrypoint: [ + 'bash', + '-cxue', + ||| + export PATH=/usr/local/nvidia/bin${PATH:+:${PATH}} + export LD_LIBRARY_PATH=/usr/local/nvidia/lib64:/usr/local/nvidia/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}} + + nvidia-smi + pip3 uninstall -y torch torchvision + pip3 install --pre torch torchvision --index-url https://download.pytorch.org/whl/nightly/cpu + + mkdir pytorch + wget https://github.com/pytorch/xla/archive/refs/heads/master.tar.gz -O - | tar xzf - + mv xla-master pytorch/xla + + export PJRT_DEVICE=CUDA + + # Run whatever is in `command` here + "${@:0}" + |||, + ], + command: [ + 'torchrun', + '--nnodes=%d' % config.accelerator.num_hosts, + '--node_rank=$(JOB_COMPLETION_INDEX)', + '--nproc_per_node=%d' % config.accelerator.count, + '--rdzv_endpoint=$(JOB_NAME)-0.headless-svc:12355', + ] + super.command[1:], podTemplate+:: { spec+: { diff --git a/dags/legacy_test/tests/pytorch/nightly/resnet50-mp.libsonnet b/dags/legacy_test/tests/pytorch/nightly/resnet50-mp.libsonnet index b24e5bc5..cd496fc2 100644 --- a/dags/legacy_test/tests/pytorch/nightly/resnet50-mp.libsonnet +++ b/dags/legacy_test/tests/pytorch/nightly/resnet50-mp.libsonnet @@ -141,20 +141,30 @@ local tpus = import 'templates/tpus.libsonnet'; local gpu = self.gpu, gpu:: common.GpuMixin { + local config = self, + cpu: '7.0', memory: '40Gi', - // Disable XLA metrics report on GPU command+: [ + '--pjrt_distributed', '--nometrics_debug', ], flags+: { modelDir: null, }, + + jobTemplate+:: { + spec+: { + completionMode: 'Indexed', + completions: config.accelerator.num_hosts, + parallelism: config.accelerator.num_hosts, + } + }, }, - local v100x4 = self.v100x4, - v100x4:: gpu { - accelerator: gpus.teslaV100 { count: 4 }, + local v100x2x2 = self.v100x2x2, + v100x2x2:: gpu { + accelerator: gpus.teslaV100 { count: 2, num_hosts: 2 }, }, local pjrt_ddp = self.pjrt_ddp, @@ -194,6 +204,7 @@ local tpus = import 'templates/tpus.libsonnet'; }, configs: [ + resnet50 + fake_data + v100x2x2 + timeouts.Hours(3), // PJRT resnet50 + fake_data + v2_8 + timeouts.Hours(3) + pjrt, resnet50 + fake_data + v3_8 + timeouts.Hours(2) + pjrt, diff --git a/dags/pytorch_xla/pytorchxla_multigpu.py b/dags/pytorch_xla/pytorchxla_multigpu.py index bb7c28e2..f63b885b 100644 --- a/dags/pytorch_xla/pytorchxla_multigpu.py +++ b/dags/pytorch_xla/pytorchxla_multigpu.py @@ -1,135 +1,16 @@ import datetime from airflow import models -from xlml.utils import gke +from dags.vm_resource import Project +from xlml.apis import gcp_config, test_config, task -job_yaml = """ -"apiVersion": "batch/v1" -"kind": "Job" -"metadata": - "generateName": "pt-nightly-resnet50-mp-fake-v100-x2-" - "labels": - "accelerator": "v100-x2" - "benchmarkId": "pt-nightly-resnet50-mp-fake-v100-x2" - "frameworkVersion": "pt-nightly" - "mode": "fake" - "model": "resnet50-mp" -"spec": - "activeDeadlineSeconds": 10800 - "backoffLimit": 0 - "completionMode": "Indexed" - "completions": 2 - "parallelism": 2 - "template": - metadata: - labels: - "headless-svc": 'true' - "spec": - "subdomain": headless-svc - "containers": - - command: - - bash - - -cxeu - - | - export PATH=/usr/local/nvidia/bin${PATH:+:${PATH}} - export LD_LIBRARY_PATH=/usr/local/nvidia/lib64:/usr/local/nvidia/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}} - nvidia-smi - pip3 uninstall -y torch torchvision - pip3 install --pre torch torchvision --index-url https://download.pytorch.org/whl/nightly/cpu - - # job_name=$(JOB_NAME) - # ip=$(getent hosts ${job_name}-0.headless-svc | awk {'print $1'}) - # echo ip: ${ip} - - # torchrun --nnodes=2 --node_rank=$(JOB_COMPLETION_INDEX) --nproc_per_node=2 --rdzv_endpoint=${ip}:12355 /src/xla-master/test/test_train_mp_imagenet.py --model=resnet50 --log_steps=200 --fake_data --nometrics_debug --pjrt_distributed - - echo "${@:0}" - - # bash - "args": - # - "bash" - - "torchrun" - - "--nnodes=2" - - "--node_rank=$(JOB_COMPLETION_INDEX)" - - "--nproc_per_node=2" - - "--rdzv_endpoint=$(JOB_NAME)-0.headless-svc:12355" - - "/src/xla-master/test/test_train_mp_imagenet.py" - - "--model=resnet50" - - "--log_steps=200" - - "--fake_data" - - "--nometrics_debug" - - "--num_epochs=1" - - "--pjrt_distributed" - # stdin: true - # tty: true - "env": - - "name": "POD_NAME" - "valueFrom": - "fieldRef": - "fieldPath": "metadata.name" - # - "name": "POD_UID" - # "valueFrom": - # "fieldRef": - # "fieldPath": "metadata.uid" - - "name": "POD_NAMESPACE" - "valueFrom": - "fieldRef": - "fieldPath": "metadata.namespace" - - "name": "JOB_NAME" - "valueFrom": - "fieldRef": - "fieldPath": "metadata.labels['job-name']" - # - "name": "MODEL_DIR" - # "value": "$(OUTPUT_BUCKET)/pt-nightly/resnet50-mp/fake/v100-x2/$(JOB_NAME)" - # - "name": "GPU_NUM_DEVICES" - # "value": "2" - - name: PJRT_DEVICE - value: CUDA - # - "name": "XLA_USE_BF16" - # "value": "0" - "image": us-central1-docker.pkg.dev/tpu-pytorch-releases/docker/xla:nightly_3.8_cuda_12.1 - # "image": us-central1-docker.pkg.dev/tpu-pytorch-releases/docker/xla:r2.2.0_3.10_cuda_12.1 - "imagePullPolicy": "Always" - "name": "train" - "resources": - "limits": - "nvidia.com/gpu": 2 - "requests": - "cpu": "7.0" - "memory": "40Gi" - "volumeMounts": - - "mountPath": "/dev/shm" - "name": "dshm" - "readOnly": false - - "mountPath": "/src" - "name": "dshm" - "readOnly": false - initContainers: - - name: clone - image: alpine - command: - - sh - - -c - - | - cd /src - wget https://github.com/pytorch/xla/archive/refs/heads/master.tar.gz -O - | tar xzf - - volumeMounts: - - "mountPath": "/src" - "name": "dshm" - "readOnly": false - "nodeSelector": - "cloud.google.com/gke-accelerator": "nvidia-tesla-v100" - "restartPolicy": "Never" - "volumes": - - "emptyDir": - "medium": "Memory" - "name": "dshm" - - "emptyDir": - "medium": "Memory" - "name": "src" - "ttlSecondsAfterFinished": 604800 -""" +US_CENTRAL1 = gcp_config.GCPConfig( + Project.CLOUD_ML_AUTO_SOLUTIONS.value, + # HACK: use region in place of zone, since clusters are regional + zone='us-central1', + dataset_name=..., +) with models.DAG( @@ -139,4 +20,4 @@ catchup=False, start_date=datetime.datetime(2023, 7, 12), ): - resnet_v100_2x2 = gke.deploy_job(job_yaml) + resnet_v100_2x2 = task.GpuGkeTask(test_config.JSonnetGpuTest.from_pytorch('pt-nightly-resnet50-mp-fake-v100-x2x2'), US_CENTRAL1).run() diff --git a/xlml/apis/task.py b/xlml/apis/task.py index dd9a86d5..c94310a9 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -17,12 +17,13 @@ import abc import dataclasses import datetime -from typing import Optional, Tuple +import shlex +from typing import Any, Dict, Optional, Tuple import airflow from airflow.models.taskmixin import DAGNode from airflow.utils.task_group import TaskGroup from xlml.apis import gcp_config, metric_config, test_config -from xlml.utils import gpu, metric, name_format, ssh, tpu, xpk, startup_script +from xlml.utils import gpu, metric, name_format, ssh, tpu, xpk, gke, startup_script class BaseTask(abc.ABC): @@ -503,3 +504,157 @@ def clean_up(self, resource: airflow.XComArg, project_id: str, zone: str) -> DAG AirflowTaskTimeout: An error occurs when execution_timeout is breached. """ return gpu.delete_resource.override(group_id="clean_up")(resource, project_id, zone) + + +@dataclasses.dataclass +class GpuGkeTask(BaseTask): + """This is a class to set up tasks for GPU. + + Attributes: + image_project: the project that an image belongs to. + image_family: the family group that an image belongs to. + """ + task_test_config: test_config.JSonnetGpuTest + task_gcp_config: gcp_config.GCPConfig + # task_metric_config: Optional[metric_config.MetricConfig] = None + + def run(self) -> DAGNode: + """Run a test job. + + Returns: + A task group with the following tasks chained: provision, run_model, + post_process, clean_up. + """ + # piz: We skip the queued resource for GPU for now since there is no queued + # resource command for GPU. + with TaskGroup( + group_id=self.task_test_config.benchmark_id, prefix_group_id=True + ) as group: + self.run_job() + + return group + + def _get_job_manifest(self): + return { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "generateName": f"{self.task_test_config.benchmark_id}-", + "labels": { + "accelerator": self.task_test_config.accelerator.name, + "benchmarkId": self.task_test_config.benchmark_id, + } + }, + "spec": { + # "activeDeadlineSeconds": 10800, + "backoffLimit": 0, + "completionMode": "Indexed", + "completions": self.task_test_config.num_hosts, + "parallelism": self.task_test_config.num_hosts, + "template": { + "metadata": { + "labels": { + "headless-svc": "true" + }, + }, + "spec": { + "subdomain": "headless-svc", + "nodeSelector": { + "cloud.google.com/gke-accelerator": self.task_test_config.accelerator.accelerator_type, + }, + "restartPolicy": "Never", + # TODO: repo source code should be in test image + # "initContainers": [ + # { + # "name": "clone", + # "image": "alpine", + # "command": [ + # "sh", + # "-c", + # "cd /src\nwget https://github.com/pytorch/xla/archive/refs/heads/master.tar.gz -O - | tar xzf -\n" + # ], + # "volumeMounts": [ + # { + # "mountPath": "/src", + # "name": "dshm", + # "readOnly": False + # } + # ] + # } + # ], + "containers": [ + { + "name": "main", + "image": self.task_test_config.docker_image, + "imagePullPolicy": "Always", + "command": shlex.split(self.task_test_config.setup_script), + "args": shlex.split(self.task_test_config.test_script), + "resources": { + "limits": { + "nvidia.com/gpu": self.task_test_config.accelerator.count, + } + }, + "env": [ + { + "name": "POD_NAME", + "valueFrom": { + "fieldRef": { + "fieldPath": "metadata.name" + } + } + }, + { + "name": "POD_NAMESPACE", + "valueFrom": { + "fieldRef": { + "fieldPath": "metadata.namespace" + } + } + }, + { + "name": "JOB_NAME", + "valueFrom": { + "fieldRef": { + "fieldPath": "metadata.labels['job-name']" + } + } + }, + ], + "volumeMounts": [ + { + "mountPath": "/dev/shm", + "name": "dshm", + "readOnly": False + }, + # { + # "mountPath": "/src", + # "name": "dshm", + # "readOnly": False + # } + ], + }, + ], + "volumes": [ + { + "emptyDir": { + "medium": "Memory" + }, + "name": "dshm" + }, + # { + # "emptyDir": { + # "medium": "Memory" + # }, + # "name": "src" + # } + ] + } + }, + }, + } + + + def run_job(self) -> DAGNode: + job_body = self._get_job_manifest() + return gke.deploy_job(job_body, self.task_gcp_config) + diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index 8c289165..b8b8d5b0 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -250,9 +250,21 @@ def test_script(self) -> str: return ';'.join(('set -xue', *self.run_model_cmds)) +def _load_compiled_jsonnet(test_name: str) -> Any: + # TODO(wcromar): Parse GPU tests too + config_dir = os.environ.get( + 'XLMLTEST_CONFIGS', '/home/airflow/gcs/dags/dags/jsonnet' + ) + test_path = os.path.join(config_dir, test_name) + with open(test_path, 'r') as f: + test = json.load(f) + + return test + + @attrs.define class JSonnetTpuVmTest(TestConfig[Tpu]): - """Convert legacy JSonnet test configs into a Task. + """Convert legacy JSonnet test configs into a TestConfig. Do not construct directly. Instead, use the `from_*` factory functions which parse pre-compiled JSonnet test configs. @@ -271,17 +283,6 @@ class JSonnetTpuVmTest(TestConfig[Tpu]): test_command: List[str] num_slices: int = 1 - @staticmethod - def _load_compiled_jsonnet(test_name: str) -> Any: - # TODO(wcromar): Parse GPU tests too - config_dir = os.environ.get( - 'XLMLTEST_CONFIGS', '/home/airflow/gcs/dags/dags/jsonnet' - ) - test_path = os.path.join(config_dir, test_name) - with open(test_path, 'r') as f: - test = json.load(f) - - return test @staticmethod def _from_json_helper( @@ -309,7 +310,7 @@ def _from_json_helper( @staticmethod def from_jax(test_name: str, reserved_tpu: bool = True): """Parses a compiled legacy JSonnet config test from `tests/jax`.""" - test = JSonnetTpuVmTest._load_compiled_jsonnet(test_name) + test = _load_compiled_jsonnet(test_name) return JSonnetTpuVmTest._from_json_helper( test, # TODO(wcromar): make this less hacky @@ -322,7 +323,7 @@ def from_jax(test_name: str, reserved_tpu: bool = True): @staticmethod def from_pytorch(test_name: str, reserved_tpu: bool = True): """Parses a compiled legacy JSonnet test config from `tests/pytorch`.""" - test = JSonnetTpuVmTest._load_compiled_jsonnet(test_name) + test = _load_compiled_jsonnet(test_name) return JSonnetTpuVmTest._from_json_helper( test, setup=test['tpuSettings']['tpuVmPytorchSetup'] @@ -349,3 +350,57 @@ def test_script(self) -> str: self.exports, ' '.join(shlex.quote(s) for s in self.test_command), ]) + + +@attrs.define +class JSonnetGpuTest(TestConfig[Gpu]): + """Convert legacy JSonnet test configs into a TestConfig. + + Do not construct directly. Instead, use the `from_*` factory functions which + parse pre-compiled JSonnet test configs. + + Attributes: + test_name: Unique name of this test/model. + setup: Multi-line script that configures the TPU instance. + exports: Extra setup commands to run in same shell as test_command. + test_command: Command and arguments to execute on the TPU VM. + num_slices: Number of TPU slices. + """ + + test_name: str + entrypoint_script: List[str] + test_command: List[str] + docker_image: str + num_hosts: int = 1 + + @staticmethod + def from_pytorch(test_name: str): + """Parses a compiled legacy JSonnet test config from `tests/pytorch`.""" + test = _load_compiled_jsonnet(test_name) + + return JSonnetGpuTest( + test_name=test_name, + docker_image=f'{test["image"]}:{test["imageTag"]}', + accelerator=Gpu( + machine_type='n/a', + image_family='n/a', + runtime_version='n/a', + count=test['accelerator']['count'], + accelerator_type=test['accelerator']['accelerator_type'], + ), + entrypoint_script=test['entrypoint'], + test_command=test['command'], + num_hosts=test['accelerator']['num_hosts'], + ) + + @property + def benchmark_id(self) -> str: + return self.test_name + + @property + def setup_script(self) -> str: + return shlex.join(self.entrypoint_script) + + @property + def test_script(self) -> str: + return shlex.join(self.test_command) diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index 0e0cb543..f66b1b3d 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -2,18 +2,20 @@ import concurrent.futures import logging import tempfile +from typing import Dict, TypeAlias from airflow.decorators import task import google.auth import google.auth.transport.requests from google.cloud import container_v1 import kubernetes -import yaml +from xlml.apis import gcp_config -def get_authenticated_client(project_id: str, region: str, cluster_name: str) -> kubernetes.client.ApiClient: + +def get_authenticated_client(gcp: gcp_config.GCPConfig, cluster_name: str) -> kubernetes.client.ApiClient: container_client = container_v1.ClusterManagerClient() - cluster_path = f"projects/{project_id}/locations/{region}/clusters/{cluster_name}" + cluster_path = f"projects/{gcp.project_name}/locations/{gcp.zone}/clusters/{cluster_name}" response = container_client.get_cluster(name=cluster_path) creds, _ = google.auth.default() auth_req = google.auth.transport.requests.Request() @@ -28,11 +30,11 @@ def get_authenticated_client(project_id: str, region: str, cluster_name: str) -> return kubernetes.client.ApiClient(configuration) + @task -def deploy_job(job_yaml: str): - client = get_authenticated_client('cloud-ml-auto-solutions', 'us-central1', 'wcromar-test-cluster') +def deploy_job(body: Dict[str, object], gcp: gcp_config.GCPConfig): + client = get_authenticated_client(gcp, 'wcromar-test-cluster') - body = yaml.safe_load(job_yaml) jobs_client = kubernetes.client.BatchV1Api(client) resp = jobs_client.create_namespaced_job(namespace='default', body=body) @@ -85,6 +87,6 @@ def _watch_pod(name, namespace): for f in concurrent.futures.as_completed(futures): exit_code = f.result() if exit_code: - print('bad', exit_code) + raise RuntimeError(f'Non-zero exit code: {exit_code}') From dbb523f67fa630b354cc1f8fc1d20f501136255d Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Thu, 15 Feb 2024 22:53:59 +0000 Subject: [PATCH 03/29] split up `deploy_job` Change-Id: I7683d82b3f1ebce32b047f967c66b18b042373ca --- xlml/apis/task.py | 39 +-------------- xlml/apis/test_config.py | 2 + xlml/utils/gke.py | 101 ++++++++++++++++++++++++--------------- 3 files changed, 67 insertions(+), 75 deletions(-) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index c94310a9..f09c8417 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -530,7 +530,8 @@ def run(self) -> DAGNode: with TaskGroup( group_id=self.task_test_config.benchmark_id, prefix_group_id=True ) as group: - self.run_job() + job_body = self._get_job_manifest() + gke.run_job(job_body, self.task_gcp_config) return group @@ -563,25 +564,6 @@ def _get_job_manifest(self): "cloud.google.com/gke-accelerator": self.task_test_config.accelerator.accelerator_type, }, "restartPolicy": "Never", - # TODO: repo source code should be in test image - # "initContainers": [ - # { - # "name": "clone", - # "image": "alpine", - # "command": [ - # "sh", - # "-c", - # "cd /src\nwget https://github.com/pytorch/xla/archive/refs/heads/master.tar.gz -O - | tar xzf -\n" - # ], - # "volumeMounts": [ - # { - # "mountPath": "/src", - # "name": "dshm", - # "readOnly": False - # } - # ] - # } - # ], "containers": [ { "name": "main", @@ -626,11 +608,6 @@ def _get_job_manifest(self): "name": "dshm", "readOnly": False }, - # { - # "mountPath": "/src", - # "name": "dshm", - # "readOnly": False - # } ], }, ], @@ -641,20 +618,8 @@ def _get_job_manifest(self): }, "name": "dshm" }, - # { - # "emptyDir": { - # "medium": "Memory" - # }, - # "name": "src" - # } ] } }, }, } - - - def run_job(self) -> DAGNode: - job_body = self._get_job_manifest() - return gke.deploy_job(job_body, self.task_gcp_config) - diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index b8b8d5b0..1085bacd 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -397,6 +397,8 @@ def from_pytorch(test_name: str): def benchmark_id(self) -> str: return self.test_name + # HACK: setup script is used as the entrypoint in the test. Make sure it + # invokes the content of `test_script` at the end (e.g. "${@:0}"). @property def setup_script(self) -> str: return shlex.join(self.entrypoint_script) diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index f66b1b3d..0e6ccf8e 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -2,9 +2,9 @@ import concurrent.futures import logging import tempfile -from typing import Dict, TypeAlias +from typing import Any, Dict, Optional -from airflow.decorators import task +from airflow.decorators import task, task_group import google.auth import google.auth.transport.requests from google.cloud import container_v1 @@ -31,62 +31,87 @@ def get_authenticated_client(gcp: gcp_config.GCPConfig, cluster_name: str) -> ku return kubernetes.client.ApiClient(configuration) -@task -def deploy_job(body: Dict[str, object], gcp: gcp_config.GCPConfig): - client = get_authenticated_client(gcp, 'wcromar-test-cluster') +@task_group +def run_job(body: Dict[str, Any], gcp: gcp_config.GCPConfig): + """Run a batch job directly on a GKE cluster""" - jobs_client = kubernetes.client.BatchV1Api(client) - resp = jobs_client.create_namespaced_job(namespace='default', body=body) + @task + def deploy_job(): + client = get_authenticated_client(gcp, 'wcromar-test-cluster') - print(resp) - print(type(resp)) + jobs_client = kubernetes.client.BatchV1Api(client) + resp = jobs_client.create_namespaced_job(namespace='default', body=body) - core_v1 = kubernetes.client.CoreV1Api(client) + print(type(resp)) + print(resp) - pod_label_selector = "controller-uid=" + resp.metadata.uid - pods = core_v1.list_namespaced_pod(namespace='default', label_selector=pod_label_selector) - print(pods) + return resp.metadata.name + @task.sensor(poke_interval=60, timeout=3600, mode='reschedule') + def stream_logs(name: str): + client = get_authenticated_client(gcp, 'wcromar-test-cluster') - def _watch_pod(name, namespace): - logs_watcher = kubernetes.watch.Watch() + batch_v1 = kubernetes.client.BatchV1Api(client) + job = batch_v1.read_namespaced_job(namespace="default", name=name) - while True: - logging.info('Waiting for pod %s to start...', name) + # TODO: Handle other conditions (e.g. unschedulablility) + logging.info(f'Job status: {job.status}') + if job.status.failed: + raise RuntimeError(f'Job has {job.status.failed} failed pods.') + + core_v1 = kubernetes.client.CoreV1Api(client) + pod_label_selector = f'batch.kubernetes.io/job-name={name}' + pods = core_v1.list_namespaced_pod(namespace='default', label_selector=pod_label_selector) + + if len(pods.items) != body['spec']['parallelism']: + logging.info('Waiting for all pods to be created...') + return False + + def _watch_pod(name, namespace) -> Optional[int]: + logs_watcher = kubernetes.watch.Watch() + + logging.info(f'Waiting for pod {name} to start...') pod_watcher = kubernetes.watch.Watch() for event in pod_watcher.stream(core_v1.list_namespaced_pod, namespace, field_selector=f'metadata.name={name}'): status = event['object'].status - logging.info('Pod %s status: %s', event['object'].metadata.name, status.phase) + logging.info(f'Pod {event["object"].metadata.name} status: {status.phase}') if status.phase != 'Pending': break - if status.container_statuses: - container_status = status.container_statuses[0] - if status.container_statuses[0].state.terminated: + logging.info(f'Streaming pod logs for {name}...') + for line in logs_watcher.stream(core_v1.read_namespaced_pod_log, + name, namespace, _request_timeout=3600): + logging.info(f'{name}] {line}') + + logging.warning(f'Lost logs stream for {name}.') + + pod = core_v1.read_namespaced_pod(namespace="default", name=name) + if pod.status.container_statuses: + container_status = pod.status.container_statuses[0] + if pod.status.container_statuses[0].state.terminated: exit_code = container_status.state.terminated.exit_code if exit_code: - logging.error('Pod %s had non-zero exit code %d', name, exit_code) + logging.error(f'Pod {name} had non-zero exit code {exit_code}') return exit_code - logging.info('Streaming pod logs for %s...', name) - for line in logs_watcher.stream(core_v1.read_namespaced_pod_log, - name, namespace, _request_timeout=3600): - logging.info('%s] %s', name, line) - - logging.warning('Lost logs stream for %s.', name) + logging.warning(f'Unknown status for pod {name}') + return None - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [] - for pod in pods.items: - f = executor.submit(_watch_pod, pod.metadata.name, pod.metadata.namespace) - futures.append(f) + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [] + for pod in pods.items: + f = executor.submit(_watch_pod, pod.metadata.name, pod.metadata.namespace) + futures.append(f) - # Wait for pods to complete, and exit with the first non-zero exit code. - for f in concurrent.futures.as_completed(futures): - exit_code = f.result() - if exit_code: - raise RuntimeError(f'Non-zero exit code: {exit_code}') + # Wait for pods to complete, and exit with the first non-zero exit code. + for f in concurrent.futures.as_completed(futures): + exit_code = f.result() + if exit_code: + return RuntimeError('Non-zero exit code') + return True + name = deploy_job() + stream_logs(name) From 5eff68573ab4ee20c6f79829924c96b762fc966c Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Thu, 15 Feb 2024 22:54:06 +0000 Subject: [PATCH 04/29] limit epochs Change-Id: Ibfabea0833dfaf3e9f3922438c0d5dca97463592 --- dags/legacy_test/tests/pytorch/nightly/resnet50-mp.libsonnet | 1 + 1 file changed, 1 insertion(+) diff --git a/dags/legacy_test/tests/pytorch/nightly/resnet50-mp.libsonnet b/dags/legacy_test/tests/pytorch/nightly/resnet50-mp.libsonnet index cd496fc2..0a54ea4d 100644 --- a/dags/legacy_test/tests/pytorch/nightly/resnet50-mp.libsonnet +++ b/dags/legacy_test/tests/pytorch/nightly/resnet50-mp.libsonnet @@ -149,6 +149,7 @@ local tpus = import 'templates/tpus.libsonnet'; command+: [ '--pjrt_distributed', '--nometrics_debug', + '--num_epochs=2', ], flags+: { modelDir: null, From 4842cdffdbbff487f3d2137fda1d535c4e6e725d Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 18:42:27 +0000 Subject: [PATCH 05/29] Make cluster name a parameter Change-Id: I5efbd5b7b7e61a471f2756f81bb0a8d23c8df355 --- dags/pytorch_xla/pytorchxla_multigpu.py | 2 +- xlml/apis/task.py | 4 +++- xlml/apis/test_config.py | 1 + xlml/utils/gke.py | 4 ++-- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/dags/pytorch_xla/pytorchxla_multigpu.py b/dags/pytorch_xla/pytorchxla_multigpu.py index f63b885b..455e3848 100644 --- a/dags/pytorch_xla/pytorchxla_multigpu.py +++ b/dags/pytorch_xla/pytorchxla_multigpu.py @@ -20,4 +20,4 @@ catchup=False, start_date=datetime.datetime(2023, 7, 12), ): - resnet_v100_2x2 = task.GpuGkeTask(test_config.JSonnetGpuTest.from_pytorch('pt-nightly-resnet50-mp-fake-v100-x2x2'), US_CENTRAL1).run() + resnet_v100_2x2 = task.GpuGkeTask(test_config.JSonnetGpuTest.from_pytorch('pt-nightly-resnet50-mp-fake-v100-x2x2'), US_CENTRAL1, 'wcromar-test-cluster').run() diff --git a/xlml/apis/task.py b/xlml/apis/task.py index f09c8417..9f559746 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -516,6 +516,8 @@ class GpuGkeTask(BaseTask): """ task_test_config: test_config.JSonnetGpuTest task_gcp_config: gcp_config.GCPConfig + cluster_name: str + # TODO: metrics # task_metric_config: Optional[metric_config.MetricConfig] = None def run(self) -> DAGNode: @@ -531,7 +533,7 @@ def run(self) -> DAGNode: group_id=self.task_test_config.benchmark_id, prefix_group_id=True ) as group: job_body = self._get_job_manifest() - gke.run_job(job_body, self.task_gcp_config) + gke.run_job(job_body, self.task_gcp_config, self.cluster_name) return group diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index 1085bacd..1cfa9d36 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -371,6 +371,7 @@ class JSonnetGpuTest(TestConfig[Gpu]): entrypoint_script: List[str] test_command: List[str] docker_image: str + startup_time_out_in_sec: int = attrs.field(default=300, kw_only=True) num_hosts: int = 1 @staticmethod diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index 0e6ccf8e..d0549935 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -32,12 +32,12 @@ def get_authenticated_client(gcp: gcp_config.GCPConfig, cluster_name: str) -> ku @task_group -def run_job(body: Dict[str, Any], gcp: gcp_config.GCPConfig): +def run_job(body: Dict[str, Any], gcp: gcp_config.GCPConfig, cluster_name: str): """Run a batch job directly on a GKE cluster""" @task def deploy_job(): - client = get_authenticated_client(gcp, 'wcromar-test-cluster') + client = get_authenticated_client(gcp, cluster_name) jobs_client = kubernetes.client.BatchV1Api(client) resp = jobs_client.create_namespaced_job(namespace='default', body=body) From d3cd9c63a3613010cf8d3b0583461c5e7b3d5b82 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 19:41:16 +0000 Subject: [PATCH 06/29] remove terrform from this PR Change-Id: Ia14f9601cf5f254af19b339416e525962e6e1e33 --- deployment/clusters.tf | 77 ------------------------------------------ 1 file changed, 77 deletions(-) delete mode 100644 deployment/clusters.tf diff --git a/deployment/clusters.tf b/deployment/clusters.tf deleted file mode 100644 index 51c524ad..00000000 --- a/deployment/clusters.tf +++ /dev/null @@ -1,77 +0,0 @@ -resource "google_container_cluster" "gpu-uc1" { - name = "wcromar-test-cluster" - project = "cloud-ml-auto-solutions" - location = "us-central1" - - release_channel { - channel = "RAPID" - } - - # We can't create a cluster with no node pool defined, but we want to only use - # separately managed node pools. So we create the smallest possible default - # node pool and immediately delete it. - remove_default_node_pool = true - initial_node_count = 1 -} - -resource "google_container_node_pool" "primary" { - name = "primary-pool" - project = google_container_cluster.gpu-uc1.project - location = google_container_cluster.gpu-uc1.location - cluster = google_container_cluster.gpu-uc1.name - node_count = 1 - - management { - auto_repair = true - auto_upgrade = true - } - - node_config { - preemptible = true - machine_type = "e2-medium" - - # Google recommends custom service accounts that have cloud-platform scope and permissions granted via IAM Roles. - # TODO: custom service account? - # service_account = google_service_account.default.email - oauth_scopes = [ - "https://www.googleapis.com/auth/cloud-platform" - ] - } -} - - -resource "google_container_node_pool" "nvidia-v100x2" { - name = "nvidia-v100x2-pool" - project = google_container_cluster.gpu-uc1.project - location = google_container_cluster.gpu-uc1.location - cluster = google_container_cluster.gpu-uc1.name - node_count = 3 - - node_locations = [ - "us-central1-b" - ] - - management { - auto_repair = true - auto_upgrade = true - } - - node_config { - preemptible = true - machine_type = "n1-highmem-16" - - # Google recommends custom service accounts that have cloud-platform scope and permissions granted via IAM Roles. - # TODO: custom service account? - # service_account = google_service_account.default.email - oauth_scopes = [ - "https://www.googleapis.com/auth/cloud-platform" - ] - guest_accelerator { - type = "nvidia-tesla-v100" - count = 2 - gpu_driver_installation_config { - gpu_driver_version = "LATEST" - } - } - } -} From 57dc55d1ad2b9a883d3604a86e0ce4f0b6db4816 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 19:41:21 +0000 Subject: [PATCH 07/29] use new cluster Change-Id: Ieca0e21a6e093f1635b60d1e29bbb663183d76e9 --- dags/pytorch_xla/pytorchxla_multigpu.py | 2 +- xlml/utils/gke.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/pytorch_xla/pytorchxla_multigpu.py b/dags/pytorch_xla/pytorchxla_multigpu.py index 455e3848..15908df1 100644 --- a/dags/pytorch_xla/pytorchxla_multigpu.py +++ b/dags/pytorch_xla/pytorchxla_multigpu.py @@ -20,4 +20,4 @@ catchup=False, start_date=datetime.datetime(2023, 7, 12), ): - resnet_v100_2x2 = task.GpuGkeTask(test_config.JSonnetGpuTest.from_pytorch('pt-nightly-resnet50-mp-fake-v100-x2x2'), US_CENTRAL1, 'wcromar-test-cluster').run() + resnet_v100_2x2 = task.GpuGkeTask(test_config.JSonnetGpuTest.from_pytorch('pt-nightly-resnet50-mp-fake-v100-x2x2'), US_CENTRAL1, 'gpu-uc1').run() diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index d0549935..82a42094 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -49,7 +49,7 @@ def deploy_job(): @task.sensor(poke_interval=60, timeout=3600, mode='reschedule') def stream_logs(name: str): - client = get_authenticated_client(gcp, 'wcromar-test-cluster') + client = get_authenticated_client(gcp, cluster_name) batch_v1 = kubernetes.client.BatchV1Api(client) job = batch_v1.read_namespaced_job(namespace="default", name=name) From 074c4e2499645d2798ef7b0b59c4538a6b528390 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 21:00:32 +0000 Subject: [PATCH 08/29] comment Change-Id: Ib1ae393fec7b9e9fe8b88e87812cbd0a7a9e9ecb --- xlml/apis/task.py | 1 + 1 file changed, 1 insertion(+) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index 9f559746..5a4c9035 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -556,6 +556,7 @@ def _get_job_manifest(self): "parallelism": self.task_test_config.num_hosts, "template": { "metadata": { + # Matches `headless-svc` in GKE cluster. See deployments directory. "labels": { "headless-svc": "true" }, From 9d74f72423c8b9ee0a9ffb1fd998062faf3a04d5 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 21:11:45 +0000 Subject: [PATCH 09/29] set timeout Change-Id: Ib54bd01bd54c5c785beb2dab1cddcaede1720ca9 --- xlml/apis/task.py | 3 ++- xlml/apis/test_config.py | 1 - xlml/utils/gke.py | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index 5a4c9035..e7baf330 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -517,6 +517,7 @@ class GpuGkeTask(BaseTask): task_test_config: test_config.JSonnetGpuTest task_gcp_config: gcp_config.GCPConfig cluster_name: str + job_create_timeout: datetime.timedelta = datetime.timedelta(minutes=10) # TODO: metrics # task_metric_config: Optional[metric_config.MetricConfig] = None @@ -533,7 +534,7 @@ def run(self) -> DAGNode: group_id=self.task_test_config.benchmark_id, prefix_group_id=True ) as group: job_body = self._get_job_manifest() - gke.run_job(job_body, self.task_gcp_config, self.cluster_name) + gke.run_job(job_body, self.task_gcp_config, self.cluster_name, self.job_create_timeout) return group diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index 1cfa9d36..1085bacd 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -371,7 +371,6 @@ class JSonnetGpuTest(TestConfig[Gpu]): entrypoint_script: List[str] test_command: List[str] docker_image: str - startup_time_out_in_sec: int = attrs.field(default=300, kw_only=True) num_hosts: int = 1 @staticmethod diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index 82a42094..3d59cec9 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -1,5 +1,6 @@ import base64 import concurrent.futures +import datetime import logging import tempfile from typing import Any, Dict, Optional @@ -32,7 +33,7 @@ def get_authenticated_client(gcp: gcp_config.GCPConfig, cluster_name: str) -> ku @task_group -def run_job(body: Dict[str, Any], gcp: gcp_config.GCPConfig, cluster_name: str): +def run_job(body: Dict[str, Any], gcp: gcp_config.GCPConfig, cluster_name: str, job_create_timeout: datetime.timedelta): """Run a batch job directly on a GKE cluster""" @task @@ -42,12 +43,11 @@ def deploy_job(): jobs_client = kubernetes.client.BatchV1Api(client) resp = jobs_client.create_namespaced_job(namespace='default', body=body) - print(type(resp)) - print(resp) + logging.info(f'response: {resp}') return resp.metadata.name - @task.sensor(poke_interval=60, timeout=3600, mode='reschedule') + @task.sensor(poke_interval=60, timeout=job_create_timeout.total_seconds(), mode='reschedule') def stream_logs(name: str): client = get_authenticated_client(gcp, cluster_name) From 24d030c42c408e09e99d0c741545aa18f504284e Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 21:15:23 +0000 Subject: [PATCH 10/29] move to main dag Change-Id: I0f28c7e5a49ac6c8e2655f59d5b9b9db4a5f09e9 --- dags/pytorch_xla/pytorchxla_multigpu.py | 23 ---------------------- dags/pytorch_xla/pytorchxla_torchvision.py | 9 +++++++++ 2 files changed, 9 insertions(+), 23 deletions(-) delete mode 100644 dags/pytorch_xla/pytorchxla_multigpu.py diff --git a/dags/pytorch_xla/pytorchxla_multigpu.py b/dags/pytorch_xla/pytorchxla_multigpu.py deleted file mode 100644 index 15908df1..00000000 --- a/dags/pytorch_xla/pytorchxla_multigpu.py +++ /dev/null @@ -1,23 +0,0 @@ -import datetime -from airflow import models - -from dags.vm_resource import Project -from xlml.apis import gcp_config, test_config, task - - -US_CENTRAL1 = gcp_config.GCPConfig( - Project.CLOUD_ML_AUTO_SOLUTIONS.value, - # HACK: use region in place of zone, since clusters are regional - zone='us-central1', - dataset_name=..., -) - - -with models.DAG( - dag_id="pytorchxla-multigpu", - schedule=None, - tags=["pytorchxla", "latest", "supported", "xlml"], - catchup=False, - start_date=datetime.datetime(2023, 7, 12), -): - resnet_v100_2x2 = task.GpuGkeTask(test_config.JSonnetGpuTest.from_pytorch('pt-nightly-resnet50-mp-fake-v100-x2x2'), US_CENTRAL1, 'gpu-uc1').run() diff --git a/dags/pytorch_xla/pytorchxla_torchvision.py b/dags/pytorch_xla/pytorchxla_torchvision.py index b00a30c3..924db6c7 100644 --- a/dags/pytorch_xla/pytorchxla_torchvision.py +++ b/dags/pytorch_xla/pytorchxla_torchvision.py @@ -32,6 +32,13 @@ metric_config.DatasetOption.XLML_DATASET, ) +US_CENTRAL1 = gcp_config.GCPConfig( + Project.CLOUD_ML_AUTO_SOLUTIONS.value, + # HACK: use region in place of zone, since clusters are regional + zone='us-central1', + dataset_name=..., +) + with models.DAG( dag_id="pytorchxla-torchvision", @@ -59,3 +66,5 @@ mnist_v2_8 >> resnet_v2_8 mnist_v2_8 >> resnet_v4_8 + + resnet_v100_2x2 = task.GpuGkeTask(test_config.JSonnetGpuTest.from_pytorch('pt-nightly-resnet50-mp-fake-v100-x2x2'), US_CENTRAL1, 'gpu-uc1').run() From c2e0d3de19012a00b8bc3f383b1c5f1e17dd0141 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 21:17:23 +0000 Subject: [PATCH 11/29] format Change-Id: Ie32ce56bf74fb31b420d693251c52539d8d3f8c2 --- dags/pytorch_xla/pytorchxla_torchvision.py | 8 +- xlml/apis/task.py | 156 ++++++++++----------- xlml/apis/test_config.py | 41 +++--- xlml/utils/gke.py | 45 ++++-- xlml/utils/gpu.py | 14 +- 5 files changed, 138 insertions(+), 126 deletions(-) diff --git a/dags/pytorch_xla/pytorchxla_torchvision.py b/dags/pytorch_xla/pytorchxla_torchvision.py index 924db6c7..20cabafd 100644 --- a/dags/pytorch_xla/pytorchxla_torchvision.py +++ b/dags/pytorch_xla/pytorchxla_torchvision.py @@ -35,7 +35,7 @@ US_CENTRAL1 = gcp_config.GCPConfig( Project.CLOUD_ML_AUTO_SOLUTIONS.value, # HACK: use region in place of zone, since clusters are regional - zone='us-central1', + zone="us-central1", dataset_name=..., ) @@ -67,4 +67,8 @@ mnist_v2_8 >> resnet_v2_8 mnist_v2_8 >> resnet_v4_8 - resnet_v100_2x2 = task.GpuGkeTask(test_config.JSonnetGpuTest.from_pytorch('pt-nightly-resnet50-mp-fake-v100-x2x2'), US_CENTRAL1, 'gpu-uc1').run() + resnet_v100_2x2 = task.GpuGkeTask( + test_config.JSonnetGpuTest.from_pytorch("pt-nightly-resnet50-mp-fake-v100-x2x2"), + US_CENTRAL1, + "gpu-uc1", + ).run() diff --git a/xlml/apis/task.py b/xlml/apis/task.py index e7baf330..c9320cf2 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -514,6 +514,7 @@ class GpuGkeTask(BaseTask): image_project: the project that an image belongs to. image_family: the family group that an image belongs to. """ + task_test_config: test_config.JSonnetGpuTest task_gcp_config: gcp_config.GCPConfig cluster_name: str @@ -534,96 +535,87 @@ def run(self) -> DAGNode: group_id=self.task_test_config.benchmark_id, prefix_group_id=True ) as group: job_body = self._get_job_manifest() - gke.run_job(job_body, self.task_gcp_config, self.cluster_name, self.job_create_timeout) + gke.run_job( + job_body, self.task_gcp_config, self.cluster_name, self.job_create_timeout + ) return group def _get_job_manifest(self): return { - "apiVersion": "batch/v1", - "kind": "Job", - "metadata": { - "generateName": f"{self.task_test_config.benchmark_id}-", - "labels": { - "accelerator": self.task_test_config.accelerator.name, - "benchmarkId": self.task_test_config.benchmark_id, - } - }, - "spec": { - # "activeDeadlineSeconds": 10800, - "backoffLimit": 0, - "completionMode": "Indexed", - "completions": self.task_test_config.num_hosts, - "parallelism": self.task_test_config.num_hosts, - "template": { - "metadata": { - # Matches `headless-svc` in GKE cluster. See deployments directory. + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "generateName": f"{self.task_test_config.benchmark_id}-", "labels": { - "headless-svc": "true" - }, - }, - "spec": { - "subdomain": "headless-svc", - "nodeSelector": { - "cloud.google.com/gke-accelerator": self.task_test_config.accelerator.accelerator_type, + "accelerator": self.task_test_config.accelerator.name, + "benchmarkId": self.task_test_config.benchmark_id, }, - "restartPolicy": "Never", - "containers": [ - { - "name": "main", - "image": self.task_test_config.docker_image, - "imagePullPolicy": "Always", - "command": shlex.split(self.task_test_config.setup_script), - "args": shlex.split(self.task_test_config.test_script), - "resources": { - "limits": { - "nvidia.com/gpu": self.task_test_config.accelerator.count, - } + }, + "spec": { + # "activeDeadlineSeconds": 10800, + "backoffLimit": 0, + "completionMode": "Indexed", + "completions": self.task_test_config.num_hosts, + "parallelism": self.task_test_config.num_hosts, + "template": { + "metadata": { + # Matches `headless-svc` in GKE cluster. See deployments directory. + "labels": {"headless-svc": "true"}, }, - "env": [ - { - "name": "POD_NAME", - "valueFrom": { - "fieldRef": { - "fieldPath": "metadata.name" - } - } - }, - { - "name": "POD_NAMESPACE", - "valueFrom": { - "fieldRef": { - "fieldPath": "metadata.namespace" - } - } - }, - { - "name": "JOB_NAME", - "valueFrom": { - "fieldRef": { - "fieldPath": "metadata.labels['job-name']" - } - } - }, - ], - "volumeMounts": [ - { - "mountPath": "/dev/shm", - "name": "dshm", - "readOnly": False - }, - ], - }, - ], - "volumes": [ - { - "emptyDir": { - "medium": "Memory" + "spec": { + "subdomain": "headless-svc", + "nodeSelector": { + "cloud.google.com/gke-accelerator": self.task_test_config.accelerator.accelerator_type, + }, + "restartPolicy": "Never", + "containers": [ + { + "name": "main", + "image": self.task_test_config.docker_image, + "imagePullPolicy": "Always", + "command": shlex.split(self.task_test_config.setup_script), + "args": shlex.split(self.task_test_config.test_script), + "resources": { + "limits": { + "nvidia.com/gpu": self.task_test_config.accelerator.count, + } + }, + "env": [ + { + "name": "POD_NAME", + "valueFrom": { + "fieldRef": {"fieldPath": "metadata.name"} + }, + }, + { + "name": "POD_NAMESPACE", + "valueFrom": { + "fieldRef": {"fieldPath": "metadata.namespace"} + }, + }, + { + "name": "JOB_NAME", + "valueFrom": { + "fieldRef": { + "fieldPath": "metadata.labels['job-name']" + } + }, + }, + ], + "volumeMounts": [ + { + "mountPath": "/dev/shm", + "name": "dshm", + "readOnly": False, + }, + ], + }, + ], + "volumes": [ + {"emptyDir": {"medium": "Memory"}, "name": "dshm"}, + ], }, - "name": "dshm" - }, - ] - } + }, }, - }, } diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index 1085bacd..6aab900f 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -252,9 +252,7 @@ def test_script(self) -> str: def _load_compiled_jsonnet(test_name: str) -> Any: # TODO(wcromar): Parse GPU tests too - config_dir = os.environ.get( - 'XLMLTEST_CONFIGS', '/home/airflow/gcs/dags/dags/jsonnet' - ) + config_dir = os.environ.get('XLMLTEST_CONFIGS', '/home/airflow/gcs/dags/dags/jsonnet') test_path = os.path.join(config_dir, test_name) with open(test_path, 'r') as f: test = json.load(f) @@ -283,7 +281,6 @@ class JSonnetTpuVmTest(TestConfig[Tpu]): test_command: List[str] num_slices: int = 1 - @staticmethod def _from_json_helper( test: Any, @@ -345,11 +342,13 @@ def setup_script(self) -> Optional[str]: # TODO(wcromar): replace configmaps @property def test_script(self) -> str: - return '\n'.join([ - 'set -xue', - self.exports, - ' '.join(shlex.quote(s) for s in self.test_command), - ]) + return '\n'.join( + [ + 'set -xue', + self.exports, + ' '.join(shlex.quote(s) for s in self.test_command), + ] + ) @attrs.define @@ -379,18 +378,18 @@ def from_pytorch(test_name: str): test = _load_compiled_jsonnet(test_name) return JSonnetGpuTest( - test_name=test_name, - docker_image=f'{test["image"]}:{test["imageTag"]}', - accelerator=Gpu( - machine_type='n/a', - image_family='n/a', - runtime_version='n/a', - count=test['accelerator']['count'], - accelerator_type=test['accelerator']['accelerator_type'], - ), - entrypoint_script=test['entrypoint'], - test_command=test['command'], - num_hosts=test['accelerator']['num_hosts'], + test_name=test_name, + docker_image=f'{test["image"]}:{test["imageTag"]}', + accelerator=Gpu( + machine_type='n/a', + image_family='n/a', + runtime_version='n/a', + count=test['accelerator']['count'], + accelerator_type=test['accelerator']['accelerator_type'], + ), + entrypoint_script=test['entrypoint'], + test_command=test['command'], + num_hosts=test['accelerator']['num_hosts'], ) @property diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index 3d59cec9..81f13ab8 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -14,26 +14,35 @@ from xlml.apis import gcp_config -def get_authenticated_client(gcp: gcp_config.GCPConfig, cluster_name: str) -> kubernetes.client.ApiClient: +def get_authenticated_client( + gcp: gcp_config.GCPConfig, cluster_name: str +) -> kubernetes.client.ApiClient: container_client = container_v1.ClusterManagerClient() - cluster_path = f"projects/{gcp.project_name}/locations/{gcp.zone}/clusters/{cluster_name}" + cluster_path = ( + f'projects/{gcp.project_name}/locations/{gcp.zone}/clusters/{cluster_name}' + ) response = container_client.get_cluster(name=cluster_path) creds, _ = google.auth.default() auth_req = google.auth.transport.requests.Request() creds.refresh(auth_req) configuration = kubernetes.client.Configuration() - configuration.host = f"https://{response.endpoint}" + configuration.host = f'https://{response.endpoint}' with tempfile.NamedTemporaryFile(delete=False) as ca_cert: ca_cert.write(base64.b64decode(response.master_auth.cluster_ca_certificate)) configuration.ssl_ca_cert = ca_cert.name - configuration.api_key_prefix["authorization"] = "Bearer" - configuration.api_key["authorization"] = creds.token + configuration.api_key_prefix['authorization'] = 'Bearer' + configuration.api_key['authorization'] = creds.token - return kubernetes.client.ApiClient(configuration) + return kubernetes.client.ApiClient(configuration) @task_group -def run_job(body: Dict[str, Any], gcp: gcp_config.GCPConfig, cluster_name: str, job_create_timeout: datetime.timedelta): +def run_job( + body: Dict[str, Any], + gcp: gcp_config.GCPConfig, + cluster_name: str, + job_create_timeout: datetime.timedelta, +): """Run a batch job directly on a GKE cluster""" @task @@ -47,12 +56,14 @@ def deploy_job(): return resp.metadata.name - @task.sensor(poke_interval=60, timeout=job_create_timeout.total_seconds(), mode='reschedule') + @task.sensor( + poke_interval=60, timeout=job_create_timeout.total_seconds(), mode='reschedule' + ) def stream_logs(name: str): client = get_authenticated_client(gcp, cluster_name) batch_v1 = kubernetes.client.BatchV1Api(client) - job = batch_v1.read_namespaced_job(namespace="default", name=name) + job = batch_v1.read_namespaced_job(namespace='default', name=name) # TODO: Handle other conditions (e.g. unschedulablility) logging.info(f'Job status: {job.status}') @@ -61,7 +72,9 @@ def stream_logs(name: str): core_v1 = kubernetes.client.CoreV1Api(client) pod_label_selector = f'batch.kubernetes.io/job-name={name}' - pods = core_v1.list_namespaced_pod(namespace='default', label_selector=pod_label_selector) + pods = core_v1.list_namespaced_pod( + namespace='default', label_selector=pod_label_selector + ) if len(pods.items) != body['spec']['parallelism']: logging.info('Waiting for all pods to be created...') @@ -72,21 +85,23 @@ def _watch_pod(name, namespace) -> Optional[int]: logging.info(f'Waiting for pod {name} to start...') pod_watcher = kubernetes.watch.Watch() - for event in pod_watcher.stream(core_v1.list_namespaced_pod, namespace, - field_selector=f'metadata.name={name}'): + for event in pod_watcher.stream( + core_v1.list_namespaced_pod, namespace, field_selector=f'metadata.name={name}' + ): status = event['object'].status logging.info(f'Pod {event["object"].metadata.name} status: {status.phase}') if status.phase != 'Pending': break logging.info(f'Streaming pod logs for {name}...') - for line in logs_watcher.stream(core_v1.read_namespaced_pod_log, - name, namespace, _request_timeout=3600): + for line in logs_watcher.stream( + core_v1.read_namespaced_pod_log, name, namespace, _request_timeout=3600 + ): logging.info(f'{name}] {line}') logging.warning(f'Lost logs stream for {name}.') - pod = core_v1.read_namespaced_pod(namespace="default", name=name) + pod = core_v1.read_namespaced_pod(namespace='default', name=name) if pod.status.container_statuses: container_status = pod.status.container_statuses[0] if pod.status.container_statuses[0].state.terminated: diff --git a/xlml/utils/gpu.py b/xlml/utils/gpu.py index 70bf5136..f3dcaed4 100644 --- a/xlml/utils/gpu.py +++ b/xlml/utils/gpu.py @@ -307,12 +307,14 @@ def create_resource( image = get_image_from_family(project=image_project, family=image_family) disk_type = f"zones/{gcp.zone}/diskTypes/pd-ssd" disks = [disk_from_image(disk_type, 100, True, image.self_link)] - metadata = create_metadata({ - # "install-nvidia-driver": "True", - "install-nvidia-driver": "False", - "proxy-mode": "project_editors", - "ssh-keys": f"cloud-ml-auto-solutions:{ssh_keys.public}", - }) + metadata = create_metadata( + { + # "install-nvidia-driver": "True", + "install-nvidia-driver": "False", + "proxy-mode": "project_editors", + "ssh-keys": f"cloud-ml-auto-solutions:{ssh_keys.public}", + } + ) acceleratorConfig = compute_v1.AcceleratorConfig( accelerator_count=accelerator.count, accelerator_type=f"projects/{gcp.project_name}/zones/{gcp.zone}/acceleratorTypes/{accelerator.accelerator_type}", From 3a75b0a690700d75939c8fe5fb5ad780078b450f Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 21:23:09 +0000 Subject: [PATCH 12/29] factor out get_authenticated_client Change-Id: I2a7213cc67d11f4089409cbc52e29b27a0d5fffb --- xlml/utils/gke.py | 8 ++++---- xlml/utils/xpk.py | 24 +++--------------------- 2 files changed, 7 insertions(+), 25 deletions(-) diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index 81f13ab8..003e668b 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -15,11 +15,11 @@ def get_authenticated_client( - gcp: gcp_config.GCPConfig, cluster_name: str + project_name: str, region: str, cluster_name: str ) -> kubernetes.client.ApiClient: container_client = container_v1.ClusterManagerClient() cluster_path = ( - f'projects/{gcp.project_name}/locations/{gcp.zone}/clusters/{cluster_name}' + f'projects/{project_name}/locations/{region}/clusters/{cluster_name}' ) response = container_client.get_cluster(name=cluster_path) creds, _ = google.auth.default() @@ -47,7 +47,7 @@ def run_job( @task def deploy_job(): - client = get_authenticated_client(gcp, cluster_name) + client = get_authenticated_client(gcp.project_name, gcp.zone, cluster_name) jobs_client = kubernetes.client.BatchV1Api(client) resp = jobs_client.create_namespaced_job(namespace='default', body=body) @@ -60,7 +60,7 @@ def deploy_job(): poke_interval=60, timeout=job_create_timeout.total_seconds(), mode='reschedule' ) def stream_logs(name: str): - client = get_authenticated_client(gcp, cluster_name) + client = get_authenticated_client(gcp.project_name, gcp.zone, cluster_name) batch_v1 = kubernetes.client.BatchV1Api(client) job = batch_v1.read_namespaced_job(namespace='default', name=name) diff --git a/xlml/utils/xpk.py b/xlml/utils/xpk.py index de6bad3e..555349bd 100644 --- a/xlml/utils/xpk.py +++ b/xlml/utils/xpk.py @@ -14,17 +14,13 @@ """Utilities to run workloads with xpk (https://github.com/google/xpk).""" -import base64 -from tempfile import NamedTemporaryFile import uuid from absl import logging from airflow.decorators import task from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator -import google.auth -import google.auth.transport.requests -from google.cloud import container_v1 from kubernetes import client as k8s_client from kubernetes.client import models as k8s_models +from xlml.utils import gke @task @@ -97,24 +93,10 @@ def wait_for_workload_completion( workload_id: str, project_id: str, region: str, cluster_name: str ) -> bool: """Check the workload status.""" - - # Get cluster configuration - container_client = container_v1.ClusterManagerClient() - cluster_path = f"projects/{project_id}/locations/{region}/clusters/{cluster_name}" - response = container_client.get_cluster(name=cluster_path) - creds, _ = google.auth.default() - auth_req = google.auth.transport.requests.Request() - creds.refresh(auth_req) - configuration = k8s_client.Configuration() - configuration.host = f"https://{response.endpoint}" - with NamedTemporaryFile(delete=False) as ca_cert: - ca_cert.write(base64.b64decode(response.master_auth.cluster_ca_certificate)) - configuration.ssl_ca_cert = ca_cert.name - configuration.api_key_prefix["authorization"] = "Bearer" - configuration.api_key["authorization"] = creds.token + client = gke.get_authenticated_client(project_id, region, cluster_name) # Initilize the client - core_api = k8s_client.CoreV1Api(k8s_client.ApiClient(configuration)) + core_api = k8s_client.CoreV1Api(client) logging.info("Successful initilize k8s client from cluster response.") # Get pods for the workload From 61a28ddc8df855a5fdf8861d43c6442e2bc39d4a Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 21:24:40 +0000 Subject: [PATCH 13/29] naming Change-Id: Id0c6b916fb97ba4567caff6e7560cde21178e40c --- xlml/utils/gke.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index 003e668b..dd5d4367 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -62,17 +62,17 @@ def deploy_job(): def stream_logs(name: str): client = get_authenticated_client(gcp.project_name, gcp.zone, cluster_name) - batch_v1 = kubernetes.client.BatchV1Api(client) - job = batch_v1.read_namespaced_job(namespace='default', name=name) + batch_api = kubernetes.client.BatchV1Api(client) + job = batch_api.read_namespaced_job(namespace='default', name=name) # TODO: Handle other conditions (e.g. unschedulablility) logging.info(f'Job status: {job.status}') if job.status.failed: raise RuntimeError(f'Job has {job.status.failed} failed pods.') - core_v1 = kubernetes.client.CoreV1Api(client) + core_api = kubernetes.client.CoreV1Api(client) pod_label_selector = f'batch.kubernetes.io/job-name={name}' - pods = core_v1.list_namespaced_pod( + pods = core_api.list_namespaced_pod( namespace='default', label_selector=pod_label_selector ) @@ -86,7 +86,7 @@ def _watch_pod(name, namespace) -> Optional[int]: logging.info(f'Waiting for pod {name} to start...') pod_watcher = kubernetes.watch.Watch() for event in pod_watcher.stream( - core_v1.list_namespaced_pod, namespace, field_selector=f'metadata.name={name}' + core_api.list_namespaced_pod, namespace, field_selector=f'metadata.name={name}' ): status = event['object'].status logging.info(f'Pod {event["object"].metadata.name} status: {status.phase}') @@ -95,13 +95,13 @@ def _watch_pod(name, namespace) -> Optional[int]: logging.info(f'Streaming pod logs for {name}...') for line in logs_watcher.stream( - core_v1.read_namespaced_pod_log, name, namespace, _request_timeout=3600 + core_api.read_namespaced_pod_log, name, namespace, _request_timeout=3600 ): logging.info(f'{name}] {line}') logging.warning(f'Lost logs stream for {name}.') - pod = core_v1.read_namespaced_pod(namespace='default', name=name) + pod = core_api.read_namespaced_pod(namespace='default', name=name) if pod.status.container_statuses: container_status = pod.status.container_statuses[0] if pod.status.container_statuses[0].state.terminated: From bad8b382b741c8a4a179ae0a619e99c9bf8f296e Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 21:24:57 +0000 Subject: [PATCH 14/29] formatting Change-Id: Ie97fc0545a399a8d088907afbe3e570818e5dab0 --- xlml/utils/gke.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index dd5d4367..b77c659e 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -18,9 +18,7 @@ def get_authenticated_client( project_name: str, region: str, cluster_name: str ) -> kubernetes.client.ApiClient: container_client = container_v1.ClusterManagerClient() - cluster_path = ( - f'projects/{project_name}/locations/{region}/clusters/{cluster_name}' - ) + cluster_path = f'projects/{project_name}/locations/{region}/clusters/{cluster_name}' response = container_client.get_cluster(name=cluster_path) creds, _ = google.auth.default() auth_req = google.auth.transport.requests.Request() @@ -86,7 +84,9 @@ def _watch_pod(name, namespace) -> Optional[int]: logging.info(f'Waiting for pod {name} to start...') pod_watcher = kubernetes.watch.Watch() for event in pod_watcher.stream( - core_api.list_namespaced_pod, namespace, field_selector=f'metadata.name={name}' + core_api.list_namespaced_pod, + namespace, + field_selector=f'metadata.name={name}', ): status = event['object'].status logging.info(f'Pod {event["object"].metadata.name} status: {status.phase}') From b08623ee91908991a275d1c70f850aea6032f7f7 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Fri, 16 Feb 2024 21:49:32 +0000 Subject: [PATCH 15/29] format with correct version Change-Id: I76ed5ca9ca0cce013ede30a451039c39690f4f17 --- xlml/apis/test_config.py | 12 +++++------- xlml/utils/gpu.py | 14 ++++++-------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index 6aab900f..ffd5fe68 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -342,13 +342,11 @@ def setup_script(self) -> Optional[str]: # TODO(wcromar): replace configmaps @property def test_script(self) -> str: - return '\n'.join( - [ - 'set -xue', - self.exports, - ' '.join(shlex.quote(s) for s in self.test_command), - ] - ) + return '\n'.join([ + 'set -xue', + self.exports, + ' '.join(shlex.quote(s) for s in self.test_command), + ]) @attrs.define diff --git a/xlml/utils/gpu.py b/xlml/utils/gpu.py index f3dcaed4..70bf5136 100644 --- a/xlml/utils/gpu.py +++ b/xlml/utils/gpu.py @@ -307,14 +307,12 @@ def create_resource( image = get_image_from_family(project=image_project, family=image_family) disk_type = f"zones/{gcp.zone}/diskTypes/pd-ssd" disks = [disk_from_image(disk_type, 100, True, image.self_link)] - metadata = create_metadata( - { - # "install-nvidia-driver": "True", - "install-nvidia-driver": "False", - "proxy-mode": "project_editors", - "ssh-keys": f"cloud-ml-auto-solutions:{ssh_keys.public}", - } - ) + metadata = create_metadata({ + # "install-nvidia-driver": "True", + "install-nvidia-driver": "False", + "proxy-mode": "project_editors", + "ssh-keys": f"cloud-ml-auto-solutions:{ssh_keys.public}", + }) acceleratorConfig = compute_v1.AcceleratorConfig( accelerator_count=accelerator.count, accelerator_type=f"projects/{gcp.project_name}/zones/{gcp.zone}/acceleratorTypes/{accelerator.accelerator_type}", From c59a53f2e627936ebf120b3adebdb6506226aa85 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 17:51:45 +0000 Subject: [PATCH 16/29] add commented dep Change-Id: I79cb359c070b5bcd3b4d16188217c5ef9417b0c1 --- deployment/cloud_composer_template.tf | 1 + 1 file changed, 1 insertion(+) diff --git a/deployment/cloud_composer_template.tf b/deployment/cloud_composer_template.tf index 032c4372..a807f80c 100644 --- a/deployment/cloud_composer_template.tf +++ b/deployment/cloud_composer_template.tf @@ -141,6 +141,7 @@ resource "google_composer_environment" "example_environment" { # google-cloud-container = "" # tensorflow-cpu = "" # apache-airflow-providers-cncf-kubernetes = "" + # kubernetes = "" } } From eebc64825947a1b33fdbd009f51c0924255546b1 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 18:18:09 +0000 Subject: [PATCH 17/29] docstrings Change-Id: I433000f39c12dfb2d402ab445f9ba938048ebdb2 --- dags/pytorch_xla/pytorchxla_multigpu.py | 27 +++++++++++++++++++++++++ xlml/apis/task.py | 7 ++++--- xlml/apis/test_config.py | 9 +++++---- xlml/utils/gke.py | 9 ++++++++- 4 files changed, 44 insertions(+), 8 deletions(-) create mode 100644 dags/pytorch_xla/pytorchxla_multigpu.py diff --git a/dags/pytorch_xla/pytorchxla_multigpu.py b/dags/pytorch_xla/pytorchxla_multigpu.py new file mode 100644 index 00000000..6a9a6d23 --- /dev/null +++ b/dags/pytorch_xla/pytorchxla_multigpu.py @@ -0,0 +1,27 @@ +import datetime +from airflow import models + +from dags.vm_resource import Project +from xlml.apis import gcp_config, test_config, task + + +US_CENTRAL1 = gcp_config.GCPConfig( + Project.CLOUD_ML_AUTO_SOLUTIONS.value, + # HACK: use region in place of zone, since clusters are regional + zone="us-central1", + dataset_name=..., +) + + +with models.DAG( + dag_id="pytorchxla-multigpu", + schedule=None, + tags=["pytorchxla", "latest", "supported", "xlml"], + catchup=False, + start_date=datetime.datetime(2023, 7, 12), +): + resnet_v100_2x2 = task.GpuGkeTask( + test_config.JSonnetGpuTest.from_pytorch("pt-nightly-resnet50-mp-fake-v100-x2x2"), + US_CENTRAL1, + "gpu-uc1", + ).run() diff --git a/xlml/apis/task.py b/xlml/apis/task.py index c9320cf2..253298e2 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -508,11 +508,13 @@ def clean_up(self, resource: airflow.XComArg, project_id: str, zone: str) -> DAG @dataclasses.dataclass class GpuGkeTask(BaseTask): - """This is a class to set up tasks for GPU. + """This is a class to set up tasks for GPU on a GKE cluster. Attributes: image_project: the project that an image belongs to. image_family: the family group that an image belongs to. + cluster_name: Name of the GCP cluster. + job_create_timeout: Amount of time to wait for all pods to become active. """ task_test_config: test_config.JSonnetGpuTest @@ -526,8 +528,7 @@ def run(self) -> DAGNode: """Run a test job. Returns: - A task group with the following tasks chained: provision, run_model, - post_process, clean_up. + A task group that runs the given test config on a GKE cluster. """ # piz: We skip the queued resource for GPU for now since there is no queued # resource command for GPU. diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index ffd5fe68..0dce7fc0 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -358,15 +358,16 @@ class JSonnetGpuTest(TestConfig[Gpu]): Attributes: test_name: Unique name of this test/model. - setup: Multi-line script that configures the TPU instance. - exports: Extra setup commands to run in same shell as test_command. test_command: Command and arguments to execute on the TPU VM. - num_slices: Number of TPU slices. + entrypoint: Multi-line script that configures the GPU instance and invokes + `test_command`. + docker_image: Image for main test container. + num_hosts: Number of GPU hosts. """ test_name: str - entrypoint_script: List[str] test_command: List[str] + entrypoint_script: List[str] docker_image: str num_hosts: int = 1 diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index b77c659e..010a0513 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -41,7 +41,14 @@ def run_job( cluster_name: str, job_create_timeout: datetime.timedelta, ): - """Run a batch job directly on a GKE cluster""" + """Run a batch job directly on a GKE cluster. + + Args: + body: Dict that defines a Kubernetes `Job`. + gcp: GCP config with the project name and zone of the GKE cluster. + cluster_name: Name of the GCP cluster. + job_create_timeout: Amount of time to wait for all pods to become active. + """ @task def deploy_job(): From 33a7de45769783c2d0c426aa942392038d8fde66 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 18:19:32 +0000 Subject: [PATCH 18/29] update some comments Change-Id: I5f83e564b15c8a97cb9805702e8162e8e29ef326 --- xlml/apis/task.py | 2 -- xlml/utils/gke.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index 253298e2..2c4665d8 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -530,8 +530,6 @@ def run(self) -> DAGNode: Returns: A task group that runs the given test config on a GKE cluster. """ - # piz: We skip the queued resource for GPU for now since there is no queued - # resource command for GPU. with TaskGroup( group_id=self.task_test_config.benchmark_id, prefix_group_id=True ) as group: diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index 010a0513..0c97e816 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -70,7 +70,7 @@ def stream_logs(name: str): batch_api = kubernetes.client.BatchV1Api(client) job = batch_api.read_namespaced_job(namespace='default', name=name) - # TODO: Handle other conditions (e.g. unschedulablility) + # TODO(wcromar): Handle other conditions (e.g. unschedulablility) logging.info(f'Job status: {job.status}') if job.status.failed: raise RuntimeError(f'Job has {job.status.failed} failed pods.') From 191f582f5cc07859cca238736fc4f000b28d123a Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 18:45:17 +0000 Subject: [PATCH 19/29] implement timeout Change-Id: Iffc7b7d551e317e130f12c384213df8a5a56c3c9 --- xlml/apis/task.py | 2 +- xlml/apis/test_config.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index 2c4665d8..f0f4ef55 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -552,7 +552,7 @@ def _get_job_manifest(self): }, }, "spec": { - # "activeDeadlineSeconds": 10800, + "activeDeadlineSeconds": datetime.timedelta(minutes=self.task_test_config.time_out_in_min or 60).total_seconds(), "backoffLimit": 0, "completionMode": "Indexed", "completions": self.task_test_config.num_hosts, diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index 0dce7fc0..87c52e41 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -127,6 +127,7 @@ class TestConfig(abc.ABC, Generic[A]): """ accelerator: A + # TODO(wcromar): make this a datetime time_out_in_min: Optional[int] = attrs.field(default=None, kw_only=True) task_owner: str = attrs.field(default='unowned', kw_only=True) @@ -389,6 +390,8 @@ def from_pytorch(test_name: str): entrypoint_script=test['entrypoint'], test_command=test['command'], num_hosts=test['accelerator']['num_hosts'], + # `timeout` is in seconds + time_out_in_min=test['timeout'] // 60, ) @property From 9907376452da07384bd3832ed9d1ebf317eefcfa Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 18:46:43 +0000 Subject: [PATCH 20/29] typo Change-Id: I0b0716b19bfbade576be13df7cec92ef286e0543 --- xlml/apis/test_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index 87c52e41..7b6bba15 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -127,7 +127,7 @@ class TestConfig(abc.ABC, Generic[A]): """ accelerator: A - # TODO(wcromar): make this a datetime + # TODO(wcromar): make this a timedelta time_out_in_min: Optional[int] = attrs.field(default=None, kw_only=True) task_owner: str = attrs.field(default='unowned', kw_only=True) From bb2714870b6d558036609bdfece74fd78bee45df Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 18:46:58 +0000 Subject: [PATCH 21/29] formatting Change-Id: I4fd2f13f8faa6705c7217f321562a42a9bce5345 --- xlml/apis/task.py | 4 +++- xlml/apis/test_config.py | 12 +++++++----- xlml/utils/gpu.py | 14 ++++++++------ 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index f0f4ef55..1a70a3ba 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -552,7 +552,9 @@ def _get_job_manifest(self): }, }, "spec": { - "activeDeadlineSeconds": datetime.timedelta(minutes=self.task_test_config.time_out_in_min or 60).total_seconds(), + "activeDeadlineSeconds": datetime.timedelta( + minutes=self.task_test_config.time_out_in_min or 60 + ).total_seconds(), "backoffLimit": 0, "completionMode": "Indexed", "completions": self.task_test_config.num_hosts, diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index 7b6bba15..b545abba 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -343,11 +343,13 @@ def setup_script(self) -> Optional[str]: # TODO(wcromar): replace configmaps @property def test_script(self) -> str: - return '\n'.join([ - 'set -xue', - self.exports, - ' '.join(shlex.quote(s) for s in self.test_command), - ]) + return '\n'.join( + [ + 'set -xue', + self.exports, + ' '.join(shlex.quote(s) for s in self.test_command), + ] + ) @attrs.define diff --git a/xlml/utils/gpu.py b/xlml/utils/gpu.py index 70bf5136..f3dcaed4 100644 --- a/xlml/utils/gpu.py +++ b/xlml/utils/gpu.py @@ -307,12 +307,14 @@ def create_resource( image = get_image_from_family(project=image_project, family=image_family) disk_type = f"zones/{gcp.zone}/diskTypes/pd-ssd" disks = [disk_from_image(disk_type, 100, True, image.self_link)] - metadata = create_metadata({ - # "install-nvidia-driver": "True", - "install-nvidia-driver": "False", - "proxy-mode": "project_editors", - "ssh-keys": f"cloud-ml-auto-solutions:{ssh_keys.public}", - }) + metadata = create_metadata( + { + # "install-nvidia-driver": "True", + "install-nvidia-driver": "False", + "proxy-mode": "project_editors", + "ssh-keys": f"cloud-ml-auto-solutions:{ssh_keys.public}", + } + ) acceleratorConfig = compute_v1.AcceleratorConfig( accelerator_count=accelerator.count, accelerator_type=f"projects/{gcp.project_name}/zones/{gcp.zone}/acceleratorTypes/{accelerator.accelerator_type}", From 4fa2653128cb96ca7350f59c277a13a4103f80da Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 18:51:03 +0000 Subject: [PATCH 22/29] format with correct pyink version Change-Id: Ief7e35acc1dc5a54b2a8f1b2f6a442230d973731 --- xlml/apis/test_config.py | 12 +++++------- xlml/utils/gpu.py | 14 ++++++-------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index b545abba..7b6bba15 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -343,13 +343,11 @@ def setup_script(self) -> Optional[str]: # TODO(wcromar): replace configmaps @property def test_script(self) -> str: - return '\n'.join( - [ - 'set -xue', - self.exports, - ' '.join(shlex.quote(s) for s in self.test_command), - ] - ) + return '\n'.join([ + 'set -xue', + self.exports, + ' '.join(shlex.quote(s) for s in self.test_command), + ]) @attrs.define diff --git a/xlml/utils/gpu.py b/xlml/utils/gpu.py index f3dcaed4..70bf5136 100644 --- a/xlml/utils/gpu.py +++ b/xlml/utils/gpu.py @@ -307,14 +307,12 @@ def create_resource( image = get_image_from_family(project=image_project, family=image_family) disk_type = f"zones/{gcp.zone}/diskTypes/pd-ssd" disks = [disk_from_image(disk_type, 100, True, image.self_link)] - metadata = create_metadata( - { - # "install-nvidia-driver": "True", - "install-nvidia-driver": "False", - "proxy-mode": "project_editors", - "ssh-keys": f"cloud-ml-auto-solutions:{ssh_keys.public}", - } - ) + metadata = create_metadata({ + # "install-nvidia-driver": "True", + "install-nvidia-driver": "False", + "proxy-mode": "project_editors", + "ssh-keys": f"cloud-ml-auto-solutions:{ssh_keys.public}", + }) acceleratorConfig = compute_v1.AcceleratorConfig( accelerator_count=accelerator.count, accelerator_type=f"projects/{gcp.project_name}/zones/{gcp.zone}/acceleratorTypes/{accelerator.accelerator_type}", From 3c55699ad9a936b41e47266453ff11e12db1c910 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 19:07:31 +0000 Subject: [PATCH 23/29] remove extra dag Change-Id: I29eadfea4bb89fb2a77a6c8752b5f7da6635fdef --- dags/pytorch_xla/pytorchxla_multigpu.py | 27 ------------------------- 1 file changed, 27 deletions(-) delete mode 100644 dags/pytorch_xla/pytorchxla_multigpu.py diff --git a/dags/pytorch_xla/pytorchxla_multigpu.py b/dags/pytorch_xla/pytorchxla_multigpu.py deleted file mode 100644 index 6a9a6d23..00000000 --- a/dags/pytorch_xla/pytorchxla_multigpu.py +++ /dev/null @@ -1,27 +0,0 @@ -import datetime -from airflow import models - -from dags.vm_resource import Project -from xlml.apis import gcp_config, test_config, task - - -US_CENTRAL1 = gcp_config.GCPConfig( - Project.CLOUD_ML_AUTO_SOLUTIONS.value, - # HACK: use region in place of zone, since clusters are regional - zone="us-central1", - dataset_name=..., -) - - -with models.DAG( - dag_id="pytorchxla-multigpu", - schedule=None, - tags=["pytorchxla", "latest", "supported", "xlml"], - catchup=False, - start_date=datetime.datetime(2023, 7, 12), -): - resnet_v100_2x2 = task.GpuGkeTask( - test_config.JSonnetGpuTest.from_pytorch("pt-nightly-resnet50-mp-fake-v100-x2x2"), - US_CENTRAL1, - "gpu-uc1", - ).run() From bd4099b0e2fb0b67a355acf6fd5a1556e27dcd76 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 19:19:45 +0000 Subject: [PATCH 24/29] fix data type Change-Id: I4dfcedfbd71bb17b7144254c9e6594468539aecc --- xlml/apis/task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index 1a70a3ba..0de61430 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -552,9 +552,9 @@ def _get_job_manifest(self): }, }, "spec": { - "activeDeadlineSeconds": datetime.timedelta( + "activeDeadlineSeconds": int(datetime.timedelta( minutes=self.task_test_config.time_out_in_min or 60 - ).total_seconds(), + ).total_seconds()), "backoffLimit": 0, "completionMode": "Indexed", "completions": self.task_test_config.num_hosts, From e3de8895cb895acf3ba178eede7ee1540d2b7d8e Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 19:20:51 +0000 Subject: [PATCH 25/29] formatting Change-Id: Icfd3ec316e068d2d71d120685ac7857d37f43b17 --- xlml/apis/task.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index 0de61430..ebe9cd76 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -552,9 +552,11 @@ def _get_job_manifest(self): }, }, "spec": { - "activeDeadlineSeconds": int(datetime.timedelta( - minutes=self.task_test_config.time_out_in_min or 60 - ).total_seconds()), + "activeDeadlineSeconds": int( + datetime.timedelta( + minutes=self.task_test_config.time_out_in_min or 60 + ).total_seconds() + ), "backoffLimit": 0, "completionMode": "Indexed", "completions": self.task_test_config.num_hosts, From 163a809a651f71c765266fef5b864380a502c060 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Tue, 20 Feb 2024 19:38:58 +0000 Subject: [PATCH 26/29] return -> raise Change-Id: Ic607c47aafb439729268a645e8210a47a7ca7ad2 --- xlml/utils/gke.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py index 0c97e816..0ab72964 100644 --- a/xlml/utils/gke.py +++ b/xlml/utils/gke.py @@ -131,7 +131,7 @@ def _watch_pod(name, namespace) -> Optional[int]: for f in concurrent.futures.as_completed(futures): exit_code = f.result() if exit_code: - return RuntimeError('Non-zero exit code') + raise RuntimeError('Non-zero exit code') return True From c11f12029e274cb5a55355fe67eb1f8abc706157 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Thu, 22 Feb 2024 18:15:54 +0000 Subject: [PATCH 27/29] add myself to a TODO Change-Id: If3bc4d60890fb0545d269433c516db9dfd538306 --- xlml/apis/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index ebe9cd76..5d355e11 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -521,7 +521,7 @@ class GpuGkeTask(BaseTask): task_gcp_config: gcp_config.GCPConfig cluster_name: str job_create_timeout: datetime.timedelta = datetime.timedelta(minutes=10) - # TODO: metrics + # TODO(wcromar): job history metrics # task_metric_config: Optional[metric_config.MetricConfig] = None def run(self) -> DAGNode: From ce82e8264f4bb8bfa4cc443cd1be67c441648be5 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Thu, 22 Feb 2024 18:16:59 +0000 Subject: [PATCH 28/29] TODO for big class Change-Id: Iad49f733c866f5309b968097cc724571de2d9e8d --- xlml/apis/task.py | 1 + 1 file changed, 1 insertion(+) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index 5d355e11..3d80c589 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -506,6 +506,7 @@ def clean_up(self, resource: airflow.XComArg, project_id: str, zone: str) -> DAG return gpu.delete_resource.override(group_id="clean_up")(resource, project_id, zone) +# TODO(ranran): This class is big. Let's move it to a new file. @dataclasses.dataclass class GpuGkeTask(BaseTask): """This is a class to set up tasks for GPU on a GKE cluster. From 397d7cbfd0054c5bf856496746f47741598be5b6 Mon Sep 17 00:00:00 2001 From: Will Cromar Date: Thu, 22 Feb 2024 18:29:38 +0000 Subject: [PATCH 29/29] `run_model` Change-Id: I04bb3b098d789328bbf7749f70b08e5a056aad01 --- xlml/apis/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xlml/apis/task.py b/xlml/apis/task.py index 3d80c589..08244ec4 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -535,7 +535,7 @@ def run(self) -> DAGNode: group_id=self.task_test_config.benchmark_id, prefix_group_id=True ) as group: job_body = self._get_job_manifest() - gke.run_job( + gke.run_job.override(group_id="run_model")( job_body, self.task_gcp_config, self.cluster_name, self.job_create_timeout )