diff --git a/dags/pytorch_xla/pytorchxla_multigpu.py b/dags/pytorch_xla/pytorchxla_multigpu.py new file mode 100644 index 000000000..bb7c28e2f --- /dev/null +++ b/dags/pytorch_xla/pytorchxla_multigpu.py @@ -0,0 +1,142 @@ +import datetime +from airflow import models + +from xlml.utils import gke + +job_yaml = """ +"apiVersion": "batch/v1" +"kind": "Job" +"metadata": + "generateName": "pt-nightly-resnet50-mp-fake-v100-x2-" + "labels": + "accelerator": "v100-x2" + "benchmarkId": "pt-nightly-resnet50-mp-fake-v100-x2" + "frameworkVersion": "pt-nightly" + "mode": "fake" + "model": "resnet50-mp" +"spec": + "activeDeadlineSeconds": 10800 + "backoffLimit": 0 + "completionMode": "Indexed" + "completions": 2 + "parallelism": 2 + "template": + metadata: + labels: + "headless-svc": 'true' + "spec": + "subdomain": headless-svc + "containers": + - command: + - bash + - -cxeu + - | + export PATH=/usr/local/nvidia/bin${PATH:+:${PATH}} + export LD_LIBRARY_PATH=/usr/local/nvidia/lib64:/usr/local/nvidia/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}} + + nvidia-smi + pip3 uninstall -y torch torchvision + pip3 install --pre torch torchvision --index-url https://download.pytorch.org/whl/nightly/cpu + + # job_name=$(JOB_NAME) + # ip=$(getent hosts ${job_name}-0.headless-svc | awk {'print $1'}) + # echo ip: ${ip} + + # torchrun --nnodes=2 --node_rank=$(JOB_COMPLETION_INDEX) --nproc_per_node=2 --rdzv_endpoint=${ip}:12355 /src/xla-master/test/test_train_mp_imagenet.py --model=resnet50 --log_steps=200 --fake_data --nometrics_debug --pjrt_distributed + + echo "${@:0}" + + # bash + "args": + # - "bash" + - "torchrun" + - "--nnodes=2" + - "--node_rank=$(JOB_COMPLETION_INDEX)" + - "--nproc_per_node=2" + - "--rdzv_endpoint=$(JOB_NAME)-0.headless-svc:12355" + - "/src/xla-master/test/test_train_mp_imagenet.py" + - "--model=resnet50" + - "--log_steps=200" + - "--fake_data" + - "--nometrics_debug" + - "--num_epochs=1" + - "--pjrt_distributed" + # stdin: true + # tty: true + "env": + - "name": "POD_NAME" + "valueFrom": + "fieldRef": + "fieldPath": "metadata.name" + # - "name": "POD_UID" + # "valueFrom": + # "fieldRef": + # "fieldPath": "metadata.uid" + - "name": "POD_NAMESPACE" + "valueFrom": + "fieldRef": + "fieldPath": "metadata.namespace" + - "name": "JOB_NAME" + "valueFrom": + "fieldRef": + "fieldPath": "metadata.labels['job-name']" + # - "name": "MODEL_DIR" + # "value": "$(OUTPUT_BUCKET)/pt-nightly/resnet50-mp/fake/v100-x2/$(JOB_NAME)" + # - "name": "GPU_NUM_DEVICES" + # "value": "2" + - name: PJRT_DEVICE + value: CUDA + # - "name": "XLA_USE_BF16" + # "value": "0" + "image": us-central1-docker.pkg.dev/tpu-pytorch-releases/docker/xla:nightly_3.8_cuda_12.1 + # "image": us-central1-docker.pkg.dev/tpu-pytorch-releases/docker/xla:r2.2.0_3.10_cuda_12.1 + "imagePullPolicy": "Always" + "name": "train" + "resources": + "limits": + "nvidia.com/gpu": 2 + "requests": + "cpu": "7.0" + "memory": "40Gi" + "volumeMounts": + - "mountPath": "/dev/shm" + "name": "dshm" + "readOnly": false + - "mountPath": "/src" + "name": "dshm" + "readOnly": false + initContainers: + - name: clone + image: alpine + command: + - sh + - -c + - | + cd /src + wget https://github.com/pytorch/xla/archive/refs/heads/master.tar.gz -O - | tar xzf - + volumeMounts: + - "mountPath": "/src" + "name": "dshm" + "readOnly": false + "nodeSelector": + "cloud.google.com/gke-accelerator": "nvidia-tesla-v100" + "restartPolicy": "Never" + "volumes": + - "emptyDir": + "medium": "Memory" + "name": "dshm" + - "emptyDir": + "medium": "Memory" + "name": "src" + "ttlSecondsAfterFinished": 604800 +""" + + +with models.DAG( + dag_id="pytorchxla-multigpu", + schedule=None, + tags=["pytorchxla", "latest", "supported", "xlml"], + catchup=False, + start_date=datetime.datetime(2023, 7, 12), +): + resnet_v100_2x2 = gke.deploy_job(job_yaml) diff --git a/deployment/clusters.tf b/deployment/clusters.tf new file mode 100644 index 000000000..51c524ade --- /dev/null +++ b/deployment/clusters.tf @@ -0,0 +1,77 @@ +resource "google_container_cluster" "gpu-uc1" { + name = "wcromar-test-cluster" + project = "cloud-ml-auto-solutions" + location = "us-central1" + + release_channel { + channel = "RAPID" + } + + # We can't create a cluster with no node pool defined, but we want to only use + # separately managed node pools. So we create the smallest possible default + # node pool and immediately delete it. + remove_default_node_pool = true + initial_node_count = 1 +} + +resource "google_container_node_pool" "primary" { + name = "primary-pool" + project = google_container_cluster.gpu-uc1.project + location = google_container_cluster.gpu-uc1.location + cluster = google_container_cluster.gpu-uc1.name + node_count = 1 + + management { + auto_repair = true + auto_upgrade = true + } + + node_config { + preemptible = true + machine_type = "e2-medium" + + # Google recommends custom service accounts that have cloud-platform scope and permissions granted via IAM Roles. + # TODO: custom service account? + # service_account = google_service_account.default.email + oauth_scopes = [ + "https://www.googleapis.com/auth/cloud-platform" + ] + } +} + + +resource "google_container_node_pool" "nvidia-v100x2" { + name = "nvidia-v100x2-pool" + project = google_container_cluster.gpu-uc1.project + location = google_container_cluster.gpu-uc1.location + cluster = google_container_cluster.gpu-uc1.name + node_count = 3 + + node_locations = [ + "us-central1-b" + ] + + management { + auto_repair = true + auto_upgrade = true + } + + node_config { + preemptible = true + machine_type = "n1-highmem-16" + + # Google recommends custom service accounts that have cloud-platform scope and permissions granted via IAM Roles. + # TODO: custom service account? + # service_account = google_service_account.default.email + oauth_scopes = [ + "https://www.googleapis.com/auth/cloud-platform" + ] + guest_accelerator { + type = "nvidia-tesla-v100" + count = 2 + gpu_driver_installation_config { + gpu_driver_version = "LATEST" + } + } + } +} diff --git a/xlml/utils/gke.py b/xlml/utils/gke.py new file mode 100644 index 000000000..0e0cb5436 --- /dev/null +++ b/xlml/utils/gke.py @@ -0,0 +1,90 @@ +import base64 +import concurrent.futures +import logging +import tempfile + +from airflow.decorators import task +import google.auth +import google.auth.transport.requests +from google.cloud import container_v1 +import kubernetes +import yaml + + +def get_authenticated_client(project_id: str, region: str, cluster_name: str) -> kubernetes.client.ApiClient: + container_client = container_v1.ClusterManagerClient() + cluster_path = f"projects/{project_id}/locations/{region}/clusters/{cluster_name}" + response = container_client.get_cluster(name=cluster_path) + creds, _ = google.auth.default() + auth_req = google.auth.transport.requests.Request() + creds.refresh(auth_req) + configuration = kubernetes.client.Configuration() + configuration.host = f"https://{response.endpoint}" + with tempfile.NamedTemporaryFile(delete=False) as ca_cert: + ca_cert.write(base64.b64decode(response.master_auth.cluster_ca_certificate)) + configuration.ssl_ca_cert = ca_cert.name + configuration.api_key_prefix["authorization"] = "Bearer" + configuration.api_key["authorization"] = creds.token + + return kubernetes.client.ApiClient(configuration) + +@task +def deploy_job(job_yaml: str): + client = get_authenticated_client('cloud-ml-auto-solutions', 'us-central1', 'wcromar-test-cluster') + + body = yaml.safe_load(job_yaml) + jobs_client = kubernetes.client.BatchV1Api(client) + resp = jobs_client.create_namespaced_job(namespace='default', body=body) + + print(resp) + print(type(resp)) + + core_v1 = kubernetes.client.CoreV1Api(client) + + pod_label_selector = "controller-uid=" + resp.metadata.uid + pods = core_v1.list_namespaced_pod(namespace='default', label_selector=pod_label_selector) + print(pods) + + + def _watch_pod(name, namespace): + logs_watcher = kubernetes.watch.Watch() + + while True: + logging.info('Waiting for pod %s to start...', name) + pod_watcher = kubernetes.watch.Watch() + for event in pod_watcher.stream(core_v1.list_namespaced_pod, namespace, + field_selector=f'metadata.name={name}'): + status = event['object'].status + logging.info('Pod %s status: %s', event['object'].metadata.name, status.phase) + if status.phase != 'Pending': + break + + if status.container_statuses: + container_status = status.container_statuses[0] + if status.container_statuses[0].state.terminated: + exit_code = container_status.state.terminated.exit_code + if exit_code: + logging.error('Pod %s had non-zero exit code %d', name, exit_code) + + return exit_code + + logging.info('Streaming pod logs for %s...', name) + for line in logs_watcher.stream(core_v1.read_namespaced_pod_log, + name, namespace, _request_timeout=3600): + logging.info('%s] %s', name, line) + + logging.warning('Lost logs stream for %s.', name) + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [] + for pod in pods.items: + f = executor.submit(_watch_pod, pod.metadata.name, pod.metadata.namespace) + futures.append(f) + + # Wait for pods to complete, and exit with the first non-zero exit code. + for f in concurrent.futures.as_completed(futures): + exit_code = f.result() + if exit_code: + print('bad', exit_code) + +