Skip to content

Commit

Permalink
Multi-GPU tests on GKE
Browse files Browse the repository at this point in the history
Change-Id: I8b6ef0f096d965d6538e1de3b9699bb271e15232
  • Loading branch information
will-cromar committed Feb 12, 2024
1 parent 66f2fc4 commit 37893a5
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 0 deletions.
142 changes: 142 additions & 0 deletions dags/pytorch_xla/pytorchxla_multigpu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import datetime
from airflow import models

from xlml.utils import gke

# Kubernetes Job manifest for a 2-node PyTorch/XLA resnet50 test on GKE:
# an Indexed Job with completions=parallelism=2, each pod requesting two
# nvidia-tesla-v100 GPUs. Pods discover each other through the
# `headless-svc` subdomain and rendezvous with torchrun on port 12355.
#
# Fix (review): the `/src` mounts previously referenced the "dshm" volume
# (the /dev/shm memory emptyDir), leaving the dedicated "src" volume
# defined but unused — the source tree and shm shared one volume. The
# `/src` mounts now use the "src" volume as intended.
job_yaml = """
"apiVersion": "batch/v1"
"kind": "Job"
"metadata":
  "generateName": "pt-nightly-resnet50-mp-fake-v100-x2-"
  "labels":
    "accelerator": "v100-x2"
    "benchmarkId": "pt-nightly-resnet50-mp-fake-v100-x2"
    "frameworkVersion": "pt-nightly"
    "mode": "fake"
    "model": "resnet50-mp"
"spec":
  "activeDeadlineSeconds": 10800
  "backoffLimit": 0
  "completionMode": "Indexed"
  "completions": 2
  "parallelism": 2
  "template":
    metadata:
      labels:
        "headless-svc": 'true'
    "spec":
      "subdomain": headless-svc
      "containers":
      - command:
        - bash
        - -cxeu
        - |
          export PATH=/usr/local/nvidia/bin${PATH:+:${PATH}}
          export LD_LIBRARY_PATH=/usr/local/nvidia/lib64:/usr/local/nvidia/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}
          nvidia-smi
          pip3 uninstall -y torch torchvision
          pip3 install --pre torch torchvision --index-url https://download.pytorch.org/whl/nightly/cpu
          echo "${@:0}"
        "args":
        - "torchrun"
        - "--nnodes=2"
        - "--node_rank=$(JOB_COMPLETION_INDEX)"
        - "--nproc_per_node=2"
        - "--rdzv_endpoint=$(JOB_NAME)-0.headless-svc:12355"
        - "/src/xla-master/test/test_train_mp_imagenet.py"
        - "--model=resnet50"
        - "--log_steps=200"
        - "--fake_data"
        - "--nometrics_debug"
        - "--num_epochs=1"
        - "--pjrt_distributed"
        "env":
        - "name": "POD_NAME"
          "valueFrom":
            "fieldRef":
              "fieldPath": "metadata.name"
        - "name": "POD_NAMESPACE"
          "valueFrom":
            "fieldRef":
              "fieldPath": "metadata.namespace"
        - "name": "JOB_NAME"
          "valueFrom":
            "fieldRef":
              "fieldPath": "metadata.labels['job-name']"
        - name: PJRT_DEVICE
          value: CUDA
        "image": us-central1-docker.pkg.dev/tpu-pytorch-releases/docker/xla:nightly_3.8_cuda_12.1
        "imagePullPolicy": "Always"
        "name": "train"
        "resources":
          "limits":
            "nvidia.com/gpu": 2
          "requests":
            "cpu": "7.0"
            "memory": "40Gi"
        "volumeMounts":
        - "mountPath": "/dev/shm"
          "name": "dshm"
          "readOnly": false
        - "mountPath": "/src"
          "name": "src"
          "readOnly": false
      initContainers:
      - name: clone
        image: alpine
        command:
        - sh
        - -c
        - |
          cd /src
          wget https://github.com/pytorch/xla/archive/refs/heads/master.tar.gz -O - | tar xzf -
        volumeMounts:
        - "mountPath": "/src"
          "name": "src"
          "readOnly": false
      "nodeSelector":
        "cloud.google.com/gke-accelerator": "nvidia-tesla-v100"
      "restartPolicy": "Never"
      "volumes":
      - "emptyDir":
          "medium": "Memory"
        "name": "dshm"
      - "emptyDir":
          "medium": "Memory"
        "name": "src"
  "ttlSecondsAfterFinished": 604800
"""


# Labels applied to the DAG in the Airflow UI.
_DAG_TAGS = ["pytorchxla", "latest", "supported", "xlml"]

# Manually-triggered DAG (schedule=None) that deploys the multi-GPU
# resnet50 Job manifest above to GKE and waits for it to finish.
with models.DAG(
    dag_id="pytorchxla-multigpu",
    start_date=datetime.datetime(2023, 7, 12),
    schedule=None,
    catchup=False,
    tags=_DAG_TAGS,
):
  resnet_v100_2x2 = gke.deploy_job(job_yaml)
77 changes: 77 additions & 0 deletions deployment/clusters.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# GKE cluster that hosts the multi-GPU PyTorch/XLA test jobs.
# NOTE(review): the name looks like a personal scratch cluster — it is
# hard-coded in xlml/utils/gke.py as well; rename both together if this
# becomes a shared resource.
resource "google_container_cluster" "gpu-uc1" {
  name     = "wcromar-test-cluster"
  project  = "cloud-ml-auto-solutions"
  location = "us-central1"

  # RAPID channel gets new GKE versions earliest.
  release_channel {
    channel = "RAPID"
  }

  # We can't create a cluster with no node pool defined, but we want to only use
  # separately managed node pools. So we create the smallest possible default
  # node pool and immediately delete it.
  remove_default_node_pool = true
  initial_node_count       = 1
}

# Small always-on pool for cluster system workloads (no GPUs).
resource "google_container_node_pool" "primary" {
  name       = "primary-pool"
  project    = google_container_cluster.gpu-uc1.project
  location   = google_container_cluster.gpu-uc1.location
  cluster    = google_container_cluster.gpu-uc1.name
  node_count = 1

  management {
    auto_repair  = true
    auto_upgrade = true
  }

  node_config {
    # Preemptible keeps cost down; acceptable for test infrastructure.
    preemptible  = true
    machine_type = "e2-medium"

    # Google recommends custom service accounts that have cloud-platform scope and permissions granted via IAM Roles.
    # TODO: custom service account?
    # service_account = google_service_account.default.email
    oauth_scopes = [
      "https://www.googleapis.com/auth/cloud-platform"
    ]
  }
}


# GPU pool: 3 preemptible n1-highmem-16 nodes, each with 2x V100.
# Matches the job manifest's nodeSelector
# (cloud.google.com/gke-accelerator: nvidia-tesla-v100) and its
# per-pod request of nvidia.com/gpu: 2.
resource "google_container_node_pool" "nvidia-v100x2" {
  name       = "nvidia-v100x2-pool"
  project    = google_container_cluster.gpu-uc1.project
  location   = google_container_cluster.gpu-uc1.location
  cluster    = google_container_cluster.gpu-uc1.name
  node_count = 3

  # V100s are only available in select zones of us-central1.
  node_locations = [
    "us-central1-b"
  ]

  management {
    auto_repair  = true
    auto_upgrade = true
  }

  node_config {
    preemptible  = true
    machine_type = "n1-highmem-16"

    # Google recommends custom service accounts that have cloud-platform scope and permissions granted via IAM Roles.
    # TODO: custom service account?
    # service_account = google_service_account.default.email
    oauth_scopes = [
      "https://www.googleapis.com/auth/cloud-platform"
    ]
    guest_accelerator {
      type  = "nvidia-tesla-v100"
      count = 2
      # Let GKE install the NVIDIA driver automatically.
      gpu_driver_installation_config {
        gpu_driver_version = "LATEST"
      }
    }
  }
}
90 changes: 90 additions & 0 deletions xlml/utils/gke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import base64
import concurrent.futures
import logging
import tempfile

from airflow.decorators import task
import google.auth
import google.auth.transport.requests
from google.cloud import container_v1
import kubernetes
import yaml


def get_authenticated_client(project_id: str, region: str, cluster_name: str) -> kubernetes.client.ApiClient:
  """Builds a Kubernetes API client authenticated against a GKE cluster.

  Looks up the cluster's endpoint and CA certificate through the GKE
  control plane, then attaches a bearer token from the application
  default credentials.

  Args:
    project_id: GCP project containing the cluster.
    region: Location (region) of the cluster.
    cluster_name: Name of the GKE cluster.

  Returns:
    A `kubernetes.client.ApiClient` ready to talk to the cluster.
  """
  cluster_manager = container_v1.ClusterManagerClient()
  cluster = cluster_manager.get_cluster(
      name=f"projects/{project_id}/locations/{region}/clusters/{cluster_name}"
  )

  credentials, _ = google.auth.default()
  credentials.refresh(google.auth.transport.requests.Request())

  config = kubernetes.client.Configuration()
  config.host = f"https://{cluster.endpoint}"
  # The kubernetes client wants the CA cert as a file path, so write it to
  # a temp file. delete=False keeps the file alive after this function
  # returns, since the client reads it lazily (the file is never cleaned
  # up — acceptable for short-lived Airflow workers).
  with tempfile.NamedTemporaryFile(delete=False) as ca_file:
    ca_file.write(base64.b64decode(cluster.master_auth.cluster_ca_certificate))
    config.ssl_ca_cert = ca_file.name
  # Bearer-token auth using the refreshed ADC token.
  config.api_key_prefix["authorization"] = "Bearer"
  config.api_key["authorization"] = credentials.token

  return kubernetes.client.ApiClient(config)

@task
def deploy_job(job_yaml: str):
  """Deploys a Kubernetes Job to GKE and waits for all of its pods to finish.

  Creates the Job from `job_yaml`, then concurrently streams every pod's
  logs until each pod's first container terminates.

  Args:
    job_yaml: Complete Kubernetes Job manifest as a YAML string.

  Raises:
    RuntimeError: if any pod terminates with a non-zero exit code. This is
      the fix for the original behavior, which only printed the bad exit
      code and let the Airflow task succeed.
  """
  client = get_authenticated_client(
      'cloud-ml-auto-solutions', 'us-central1', 'wcromar-test-cluster')

  body = yaml.safe_load(job_yaml)
  jobs_client = kubernetes.client.BatchV1Api(client)
  resp = jobs_client.create_namespaced_job(namespace='default', body=body)
  logging.info('Created job %s', resp.metadata.name)

  core_v1 = kubernetes.client.CoreV1Api(client)

  # Find the pods the Job controller created for this Job.
  # NOTE(review): `controller-uid` is the legacy label key; newer Kubernetes
  # releases use `batch.kubernetes.io/controller-uid` — confirm against the
  # cluster's version.
  pod_label_selector = "controller-uid=" + resp.metadata.uid
  pods = core_v1.list_namespaced_pod(
      namespace='default', label_selector=pod_label_selector)
  logging.info('Found %d pod(s) for job %s',
               len(pods.items), resp.metadata.name)

  def _watch_pod(name, namespace):
    """Waits for a pod to start, streams its logs, returns its exit code."""
    logs_watcher = kubernetes.watch.Watch()

    while True:
      logging.info('Waiting for pod %s to start...', name)
      pod_watcher = kubernetes.watch.Watch()
      for event in pod_watcher.stream(core_v1.list_namespaced_pod, namespace,
                                      field_selector=f'metadata.name={name}'):
        status = event['object'].status
        logging.info('Pod %s status: %s', event['object'].metadata.name,
                     status.phase)
        if status.phase != 'Pending':
          break

      # Only the first container's status is inspected; the manifests this
      # runs have a single main container.
      if status.container_statuses:
        container_status = status.container_statuses[0]
        if container_status.state.terminated:
          exit_code = container_status.state.terminated.exit_code
          if exit_code:
            logging.error('Pod %s had non-zero exit code %d', name, exit_code)

          return exit_code

      logging.info('Streaming pod logs for %s...', name)
      for line in logs_watcher.stream(core_v1.read_namespaced_pod_log,
                                      name, namespace, _request_timeout=3600):
        logging.info('%s] %s', name, line)

      # The log stream can drop (e.g. node preemption); loop back and
      # re-check the pod's phase before reattaching.
      logging.warning('Lost logs stream for %s.', name)

  with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [
        executor.submit(_watch_pod, pod.metadata.name, pod.metadata.namespace)
        for pod in pods.items
    ]

    # Wait for every pod to complete and collect its exit code.
    exit_codes = [f.result() for f in concurrent.futures.as_completed(futures)]

  failed = [code for code in exit_codes if code]
  if failed:
    # Fail the Airflow task so a broken test run is surfaced.
    raise RuntimeError(f'Pod(s) exited with non-zero exit code(s): {failed}')

0 comments on commit 37893a5

Please sign in to comment.