Skip to content

Commit

Permalink
[SageMaker] Move SageMaker partition job to use SageMaker Processing instead of Training (#627)
Browse files Browse the repository at this point in the history

*Issue #, if available:*

*Description of changes:*

* Convert the SageMaker partition jobs from training to processing.
* Add Docker login to SM Docker build script
* Add and pin dependencies to SM Docker container


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
  • Loading branch information
thvasilo authored Nov 9, 2023
1 parent 6f8f12f commit 6563c05
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 28 deletions.
10 changes: 7 additions & 3 deletions docker/build_docker_sagemaker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,16 @@ DOCKER_FULLNAME="${IMAGE_NAME}:${TAG}"

echo "Build a sagemaker docker image ${DOCKER_FULLNAME}"

# Log in to ECR to pull Docker image
aws ecr get-login-password --region us-east-1 \
| docker login --username AWS --password-stdin 763104351884.dkr.ecr.us-east-1.amazonaws.com

if [ $IMAGE_TYPE = "gpu" ] || [ $IMAGE_TYPE = "cpu" ]; then
# User Buildkit to avoid pulling both CPU and GPU images
# Use Buildkit to avoid pulling both CPU and GPU images
DOCKER_BUILDKIT=1 docker build --build-arg DEVICE=$IMAGE_TYPE \
-f $GSF_HOME"docker/sagemaker/Dockerfile.sm" . -t $DOCKER_FULLNAME
-f "${GSF_HOME}/docker/sagemaker/Dockerfile.sm" . -t $DOCKER_FULLNAME
else
echo "Image type can only be \"gpu\" or \"cpu\", but get \""$IMAGE_TYPE"\""
echo "Image type can only be \"gpu\" or \"cpu\", but got \""$IMAGE_TYPE"\""
# remove the temporary code folder
rm -rf code
exit 1
Expand Down
10 changes: 9 additions & 1 deletion docker/sagemaker/Dockerfile.sm
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ FROM branch-${DEVICE} AS final
LABEL maintainer="Amazon AI Graph ML team"

# Install related Python packages
RUN pip3 install ogb==1.3.6 scipy pyarrow boto3 scikit-learn transformers==4.28.1 \
RUN pip3 install \
boto3 \
numba==0.58.1 \
numpy==1.26.1 \
ogb==1.3.6 \
pyarrow \
scikit-learn \
scipy \
transformers==4.28.1 \
&& rm -rf /root/.cache

# Install MPI etc needed by DistDGL
Expand Down
5 changes: 3 additions & 2 deletions python/graphstorm/sagemaker/sagemaker_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ def run_partition(job_config: PartitionJobConfig):
metadata_filename = job_config.metadata_filename
skip_partitioning = job_config.skip_partitioning == 'true'

sm_env = json.loads(os.environ['SM_TRAINING_ENV'])
with open("/opt/ml/config/resourceconfig.json", "r", encoding="utf-8") as f:
sm_env = json.load(f)
hosts = sm_env['hosts']
current_host = sm_env['current_host']
world_size = len(hosts)
Expand All @@ -239,7 +240,7 @@ def run_partition(job_config: PartitionJobConfig):
for key, val in os.environ.items():
logging.debug("%s: %s", key, val)

leader_addr = os.environ['MASTER_ADDR']
leader_addr = socket.gethostbyname('algo-1')
# sync with all instances in the cluster
if host_rank == 0:
# sync with workers
Expand Down
46 changes: 24 additions & 22 deletions sagemaker/launch/launch_partition.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
""" Launch SageMaker training task
"""
import os
import logging
from time import strftime, gmtime

import boto3 # pylint: disable=import-error
from sagemaker.pytorch.estimator import PyTorch
from sagemaker.processing import ScriptProcessor
import sagemaker

from common_parser import ( # pylint: disable=wrong-import-order
Expand All @@ -30,7 +29,7 @@ def run_job(input_args, image):
sm_task_name = input_args.task_name if input_args.task_name else timestamp
role = input_args.role # SageMaker ARN role
instance_type = input_args.instance_type # SageMaker instance type
instance_count = input_args.instance_count # Number of infernece instances
instance_count = input_args.instance_count # Number of partition instances
region = input_args.region # AWS region
entry_point = input_args.entry_point # GraphStorm training entry_point
num_parts = input_args.num_parts # Number of partitions
Expand All @@ -45,40 +44,43 @@ def run_job(input_args, image):
sagemaker_session = sagemaker.Session(boto3.Session(region_name=region))

skip_partitioning_str = "true" if input_args.skip_partitioning else "false"
params = {"graph-data-s3": graph_data_s3,
"metadata-filename": metadata_filename,
"num-parts": num_parts,
"output-data-s3": output_data_s3,
"skip-partitioning": skip_partitioning_str,
"log-level": input_args.log_level,
"partition-algorithm": input_args.partition_algorithm,}

print(f"Parameters {params}")
arguments = [
"--graph-data-s3", graph_data_s3,
"--metadata-filename", metadata_filename,
"--num-parts", num_parts,
"--output-data-s3", output_data_s3,
"--skip-partitioning", skip_partitioning_str,
"--log-level", input_args.log_level,
"--partition-algorithm", input_args.partition_algorithm,
]
arguments = [str(x) for x in arguments]

print(f"Parameters {arguments}")
if input_args.sm_estimator_parameters:
print(f"SageMaker Estimator parameters: '{input_args.sm_estimator_parameters}'")

estimator_kwargs = parse_estimator_kwargs(input_args.sm_estimator_parameters)

est = PyTorch(
disable_profiler=True,
debugger_hook_config=False,
entry_point=os.path.basename(entry_point),
source_dir=os.path.dirname(entry_point),
script_processor = ScriptProcessor(
image_uri=image,
role=role,
instance_count=instance_count,
instance_type=instance_type,
py_version="py3",
hyperparameters=params,
sagemaker_session=sagemaker_session,
command=["python3"],
base_job_name=f"gs-partition-{sm_task_name}",
sagemaker_session=sagemaker_session,
tags=[{"Key":"GraphStorm", "Value":"beta"},
{"Key":"GraphStorm_Task", "Value":"Partition"}],
container_log_level=logging.getLevelName(input_args.log_level),
**estimator_kwargs
)

est.fit(wait=not input_args.async_execution)
script_processor.run(
code=entry_point,
arguments=arguments,
inputs=[],
outputs=[],
wait=not input_args.async_execution
)

def get_partition_parser():
"""
Expand Down

0 comments on commit 6563c05

Please sign in to comment.