diff --git a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst index 4158a69a95..7cbb9039d9 100644 --- a/docs/source/gs-processing/developer/input-configuration.rst +++ b/docs/source/gs-processing/developer/input-configuration.rst @@ -381,8 +381,9 @@ arguments. Valid values are: ``none`` (Default), ``mean``, ``median``, and ``most_frequent``. Missing values will be replaced with the respective value computed from the data. - - ``normalizer`` (String, optional): Applies a normalization to the data, after - imputation. Can take the following values: + - ``normalizer`` (String, optional): Applies a normalization to the data, after imputation. + Can take the following values: + - ``none``: (Default) Don't normalize the numerical values during encoding. - ``min-max``: Normalize each value by subtracting the minimum value from it, and then dividing it by the difference between the maximum value and the minimum. @@ -399,7 +400,7 @@ arguments. - ``kwargs``: - ``imputer`` (String, optional): Same as for ``numerical`` transformation, will - apply the ``mean`` transformation by default. + apply no imputation by default. - ``normalizer`` (String, optional): Same as for ``numerical`` transformation, no normalization is applied by default. - ``separator`` (String, optional): Same as for ``no-op`` transformation, used to separate numerical diff --git a/docs/source/gs-processing/gs-processing-getting-started.rst b/docs/source/gs-processing/gs-processing-getting-started.rst index 70d5fd3e9e..97f63086b2 100644 --- a/docs/source/gs-processing/gs-processing-getting-started.rst +++ b/docs/source/gs-processing/gs-processing-getting-started.rst @@ -138,34 +138,41 @@ make sure the produced data matches the assumptions of DGL [#f1]_. .. code-block:: bash - gs-repartition \ - --input-prefix /path/to/output/data + gs-repartition --input-prefix /path/to/output/data Once this script completes, the data are ready to be fed into DGL's distributed partitioning pipeline. -See `this guide `_ -for more details on how to use GraphStorm distributed partitioning on SageMaker. +See `this guide `_ +for more details on how to use GraphStorm distributed partitioning and training on SageMaker. See :doc:`usage/example` for a detailed walkthrough of using GSProcessing to -wrangle data into a format that's ready to be consumed by the GraphStorm/DGL -partitioning pipeline. +wrangle data into a format that's ready to be consumed by the GraphStorm +distributed training pipeline. -Using with Amazon SageMaker ---------------------------- +Running on AWS resources +------------------------ -To run distributed jobs on Amazon SageMaker we will have to build a Docker image +GSProcessing supports Amazon SageMaker and EMR Serverless as execution environments. +To run distributed jobs on AWS resources we will have to build a Docker image and push it to the Amazon Elastic Container Registry, which we cover in :doc:`usage/distributed-processing-setup` and run a SageMaker Processing -job which we describe in :doc:`usage/amazon-sagemaker`. +job which we describe in :doc:`usage/amazon-sagemaker`, or EMR Serverless +job that is covered in :doc:`usage/emr-serverless`. + + +Input configuration +------------------- + +GSProcessing supports both the GConstruct JSON configuration format, +as well as its own GSProcessing config. You can learn about the +GSProcessing JSON configuration in :doc:`developer/input-configuration`. 
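To make the two formats more concrete, the snippet below sketches what a single numerical feature entry could look like in a GSProcessing-style JSON configuration, exercising the ``imputer`` and ``normalizer`` kwargs described above. The column name and surrounding field names are illustrative assumptions only; see :doc:`developer/input-configuration` for the authoritative schema.

.. code-block:: bash

    # Illustrative sketch only: the field names are assumptions, check the
    # input-configuration doc for the exact schema before using.
    cat > numerical-feature-example.json << 'EOF'
    {
        "column": "age",
        "transformation": {
            "name": "numerical",
            "kwargs": {
                "imputer": "median",
                "normalizer": "min-max"
            }
        }
    }
    EOF
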
Developer guide --------------- To get started with developing the package refer to :doc:`developer/developer-guide`. -To see the input configuration format that GSProcessing uses internally see -:doc:`developer/input-configuration`. .. rubric:: Footnotes diff --git a/docs/source/gs-processing/usage/amazon-sagemaker.rst b/docs/source/gs-processing/usage/amazon-sagemaker.rst index 8ab8f65bec..96522d9cf1 100644 --- a/docs/source/gs-processing/usage/amazon-sagemaker.rst +++ b/docs/source/gs-processing/usage/amazon-sagemaker.rst @@ -1,7 +1,7 @@ Running distributed jobs on Amazon SageMaker ============================================ -Once the :doc:`distributed processing setup ` is complete, we can +Once the :doc:`Amazon SageMaker setup ` is complete, we can use the Amazon SageMaker launch scripts to launch distributed processing jobs that use AWS resources. @@ -9,35 +9,7 @@ To demonstrate the usage of GSProcessing on Amazon SageMaker, we will execute th execution example, but this time use Amazon SageMaker to provide the compute resources instead of our local machine. -Upload data to S3 ------------------ - -Amazon SageMaker uses S3 as its storage target, so before starting -we'll need to upload our test data to S3. To do so you will need -to have read/write access to an S3 bucket, and the requisite AWS credentials -and permissions. - -We will use the AWS CLI to upload data so make sure it is -`installed `_ -and `configured `_ -in you local environment. - -Assuming ``graphstorm/graphstorm-processing`` is our current working -directory we can upload the test data to S3 using: - -.. code-block:: bash - - MY_BUCKET="enter-your-bucket-name-here" - REGION="bucket-region" # e.g. us-west-2 - aws --region ${REGION} s3 sync ./tests/resources/small_heterogeneous_graph/ \ - "${MY_BUCKET}/gsprocessing-input" - -.. note:: - - Make sure you are uploading your data to a bucket - that was created in the same region as the ECR image - you pushed in :doc:`distributed-processing-setup`. - +Before starting make sure you have uploaded the input data as described in :ref:`gsp-upload-data-ref`. Launch the GSProcessing job on Amazon SageMaker ----------------------------------------------- @@ -49,7 +21,7 @@ to run a GSProcessing job on Amazon SageMaker. For this example we'll use a SageMaker Spark cluster with 2 ``ml.t3.xlarge`` instances since this is a tiny dataset. Using SageMaker you'll be able to create clusters of up to 20 instances, allowing you to scale your processing to massive graphs, -using larger instances like `ml.r5.24xlarge`. +using larger instances like ``ml.r5.24xlarge``. Since we're now executing on AWS, we'll need access to an execution role for SageMaker and the ECR image URI we created in :doc:`distributed-processing-setup`. @@ -65,31 +37,22 @@ job, followed by the re-partitioning job, both on SageMaker: MY_BUCKET="enter-your-bucket-name-here" SAGEMAKER_ROLE_NAME="enter-your-sagemaker-execution-role-name-here" REGION="bucket-region" # e.g. 
us-west-2 - DATASET_S3_PATH="s3://${MY_BUCKET}/gsprocessing-input" + INPUT_PREFIX="s3://${MY_BUCKET}/gsprocessing-input" OUTPUT_BUCKET=${MY_BUCKET} - DATASET_NAME="small-graph" + GRAPH_NAME="small-graph" CONFIG_FILE="gconstruct-config.json" INSTANCE_COUNT="2" INSTANCE_TYPE="ml.t3.xlarge" NUM_FILES="4" - IMAGE_URI="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/graphstorm-processing:0.1.0" + IMAGE_URI="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/graphstorm-processing-sagemaker:0.2.1" ROLE="arn:aws:iam::${ACCOUNT}:role/service-role/${SAGEMAKER_ROLE_NAME}" - OUTPUT_PREFIX="s3://${OUTPUT_BUCKET}/gsprocessing/${DATASET_NAME}/${INSTANCE_COUNT}x-${INSTANCE_TYPE}-${NUM_FILES}files/" - - # Conditionally delete data at output - echo "Delete all data under output path? ${OUTPUT_PREFIX}" - select yn in "Yes" "No"; do - case $yn in - Yes ) aws s3 rm --recursive ${OUTPUT_PREFIX} --quiet; break;; - No ) break;; - esac - done + OUTPUT_PREFIX="s3://${OUTPUT_BUCKET}/gsprocessing/sagemaker/${GRAPH_NAME}/${INSTANCE_COUNT}x-${INSTANCE_TYPE}-${NUM_FILES}files/" # This will run and block until the GSProcessing job is done python scripts/run_distributed_processing.py \ - --s3-input-prefix ${DATASET_S3_PATH} \ + --s3-input-prefix ${INPUT_PREFIX} \ --s3-output-prefix ${OUTPUT_PREFIX} \ --role ${ROLE} \ --image ${IMAGE_URI} \ @@ -97,7 +60,7 @@ job, followed by the re-partitioning job, both on SageMaker: --config-filename ${CONFIG_FILE} \ --instance-count ${INSTANCE_COUNT} \ --instance-type ${INSTANCE_TYPE} \ - --job-name "${DATASET_NAME}-${INSTANCE_COUNT}x-${INSTANCE_TYPE//./-}-${NUM_FILES}files" \ + --job-name "${GRAPH_NAME}-${INSTANCE_COUNT}x-${INSTANCE_TYPE//./-}-${NUM_FILES}files" \ --num-output-files ${NUM_FILES} \ --wait-for-job @@ -150,5 +113,6 @@ Run distributed partitioning and training on Amazon SageMaker ------------------------------------------------------------- With the data now processed you can follow the -`GraphStorm Amazon SageMaker guide `_ +`GraphStorm Amazon SageMaker guide +`_ to partition your data and run training on AWS. diff --git a/docs/source/gs-processing/usage/distributed-processing-setup.rst b/docs/source/gs-processing/usage/distributed-processing-setup.rst index e6ca745bba..d003b93579 100644 --- a/docs/source/gs-processing/usage/distributed-processing-setup.rst +++ b/docs/source/gs-processing/usage/distributed-processing-setup.rst @@ -1,5 +1,5 @@ -GraphStorm Processing setup for Amazon SageMaker -================================================ +GraphStorm Processing Distributed Setup +======================================= In this guide we'll demonstrate how to prepare your environment to run GraphStorm Processing (GSProcessing) jobs on Amazon SageMaker. @@ -15,7 +15,7 @@ The steps required are: - Set up AWS access. - Build the GraphStorm Processing image using Docker. - Push the image to the Amazon Elastic Container Registry (ECR). -- Launch a SageMaker Processing job using the example scripts. +- Launch a SageMaker Processing or EMR Serverless job using the example scripts. Clone the GraphStorm repository ------------------------------- @@ -87,17 +87,25 @@ Once Docker and Poetry are installed, and your AWS credentials are set up, we can use the provided scripts in the ``graphstorm-processing/docker`` directory to build the image. +GSProcessing supports Amazon SageMaker and EMR Serverless as +execution environments, so we need to choose which image we want +to build first. + The ``build_gsprocessing_image.sh`` script can build the image -locally and tag it. 
For example, assuming our current directory is where -we cloned ``graphstorm/graphstorm-processing``: +locally and tag it, provided the intended execution environment, +using the ``-e/--environment`` argument. The supported environments +are ``sagemaker`` and ``emr-serverless``. +For example, assuming our current directory is where +we cloned ``graphstorm/graphstorm-processing``, we can use +the following to build the SageMaker image: .. code-block:: bash - bash docker/build_gsprocessing_image.sh + bash docker/build_gsprocessing_image.sh --environment sagemaker -The above will use the Dockerfile of the latest available GSProcessing version, -build an image and tag it as ``graphstorm-processing:${VERSION}`` where -``${VERSION}`` will take be the latest available GSProcessing version (e.g. ``0.1.0``). +The above will use the SageMaker-specific Dockerfile of the latest available GSProcessing version, +build an image and tag it as ``graphstorm-processing-sagemaker:${VERSION}`` where +``${VERSION}`` will take be the latest available GSProcessing version (e.g. ``0.2.1``). The script also supports other arguments to customize the image name, tag and other aspects of the build. See ``bash docker/build_gsprocessing_image.sh --help`` @@ -109,28 +117,71 @@ Push the image to the Amazon Elastic Container Registry (ECR) Once the image is built we can use the ``push_gsprocessing_image.sh`` script that will create an ECR repository if needed and push the image we just built. -The script does not require any arguments and by default will -create a repository named ``graphstorm-processing`` in the ``us-west-2`` region, +The script again requires us to provide the intended execution environment using +the ``-e/--environment`` argument, +and by default will create a repository named ``graphstorm-processing-`` in the ``us-west-2`` region, on the default AWS account ``aws-cli`` is configured for, and push the image tagged with the latest version of GSProcessing. The script supports 4 optional arguments: -1. Image name/repository. (``-i/--image``) Default: ``graphstorm-processing`` -2. Image tag. Default: (``-v/--version``) ```` e.g. ``0.1.0``. -3. ECR region. Default: (``-r/--region``) ``us-west-2``. +1. Image name/repository. (``-i/--image``) Default: ``graphstorm-processing-`` +2. Image tag. (``-v/--version``) Default: ```` e.g. ``0.2.1``. +3. ECR region. (``-r/--region``) Default: ``us-west-2``. 4. AWS Account ID. (``-a/--account``) Default: Uses the account ID detected by the ``aws-cli``. Example: .. code-block:: bash - bash push_gsprocessing_image.sh -i "graphstorm-processing" -v "0.1.0" -r "us-west-2" -a "1234567890" + bash docker/push_gsprocessing_image.sh -e sagemaker -i "graphstorm-processing" -v "0.2.1" -r "us-west-2" -a "1234567890" + +.. _gsp-upload-data-ref: + +Upload data to S3 +----------------- + +For distributed jobs we use S3 as our storage source and target, so before +running any example +we'll need to upload our data to S3. To do so you will need +to have read/write access to an S3 bucket, and the requisite AWS credentials +and permissions. + +We will use the AWS CLI to upload data so make sure it is +`installed `_ +and `configured `_ +in you local environment. + +Assuming ``graphstorm/graphstorm-processing`` is our current working +directory we can upload the data to S3 using: + +.. code-block:: bash + + MY_BUCKET="enter-your-bucket-name-here" + REGION="bucket-region" # e.g. 
us-west-2 + aws --region ${REGION} s3 sync ./tests/resources/small_heterogeneous_graph/ \ + "s3://${MY_BUCKET}/gsprocessing-input" + +.. note:: + Make sure you are uploading your data to a bucket + that was created in the same region as the ECR image + you pushed. Launch a SageMaker Processing job using the example scripts. ------------------------------------------------------------ Once the setup is complete, you can follow the :doc:`SageMaker Processing job guide ` -to launch your distributed processing job using AWS resources. +to launch your distributed processing job using Amazon SageMaker resources. + +Launch an EMR Serverless job using the example scripts. +------------------------------------------------------------ + +In addition to Amazon SageMaker you can also use EMR Serverless +as an execution environment to allow you to scale to even larger datasets +(recommended when your graph has 30B+ edges). +Its setup is more involved than Amazon SageMaker, so we only recommend +it for experienced AWS users. +Follow the :doc:`EMR Serverless job guide ` +to launch your distributed processing job using EMR Serverless resources. diff --git a/docs/source/gs-processing/usage/emr-serverless.rst b/docs/source/gs-processing/usage/emr-serverless.rst new file mode 100644 index 0000000000..af987a2519 --- /dev/null +++ b/docs/source/gs-processing/usage/emr-serverless.rst @@ -0,0 +1,277 @@ +Running distributed jobs on EMR Serverless +========================================== + +Once the :doc:`distributed processing setup ` is complete, +and we have built and pushed an EMR Serverless image tagged as ``graphstorm-processing-emr-serverless``, we can +set up our execution environment for EMR Serverless (EMR-S). If you're not familiar with EMR-S +we suggest going through its `introductory documentation `_ +to familiarize yourself with its concepts. + +In summary, we will set up an EMR-S `Application`, which we will configure to use our EMR-S +image, and then we'll demonstrate how we can launch jobs using the EMR-S application we created. + +.. note:: + + Because the set-up of EMR-S involves role creation and modifying the permissions of our ECR repository, + we will need access to a role with IAM access, usually an administrative role. + +Follow EMR Serverless set-up +---------------------------- + +To get started with EMR-S we will need to have an administrative user, +and use it to create the required roles and policies for EMR-S. +To do so follow the EMR-S `Setting up guide +`_. + +Create an job runtime role for EMR Serverless +--------------------------------------------- + +To be able to run EMR-S jobs we will need access to a role that +is configured with access to the S3 bucket we will use. + +Follow the `Create a job runtime role +`_ +guide to create such a role. You can replace ``DOC-EXAMPLE-BUCKET`` with the bucket you used +to upload your test data in :ref:`gsp-upload-data-ref`. + +Ensure EMR-S service role can access the ECR repository +------------------------------------------------------- + +To ensure we can create EMR-S applications and run jobs +using our custom image, we need to give the EMR-S service +role the ability to pull the image from our ECR repository. + +To do so we need to add ECR actions to the entity that +creates the EMR-S applications, and configure our ECR +repository to provide access to our +EMR-S application. 
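The repository policy you will need is shown further below; once you have saved it to a local JSON file, one way to attach it to the repository is through the AWS CLI. This is only a sketch: the repository name, region, and policy file name are assumptions to adapt to your setup.

.. code-block:: bash

    # Sketch: attach the EMR Serverless pull policy (see the JSON further below)
    # to the image repository. Adjust the repository name, region and file name.
    aws ecr set-repository-policy \
        --repository-name graphstorm-processing-emr-serverless \
        --region us-west-2 \
        --policy-text file://emr-serverless-ecr-policy.json
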
+ +To ensure the entity that creates the EMR-S application +can perform ECR actions, follow the +`Prerequisites `_ +part of the `Customizing an image` EMR-S guide. If you're using +an administrative user to work through this process you might +already have full ECR access. + +If not using an administrative user, the relevant policy to attach to the role/user +you are using would be: + +.. code-block:: json + + { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "ECRRepositoryListGetPolicy", + "Effect": "Allow", + "Action": [ + "ecr:GetDownloadUrlForLayer", + "ecr:BatchGetImage", + "ecr:DescribeImages" + ], + "Resource": "" + } + ] + } + +Create an EMR-S application that uses our custom image +------------------------------------------------------ + +Next we will need to create an EMR-S application that +uses our custom image. +For a general guide see the +`official docs `_. + +Here we will just show the custom image application creation using the AWS CLI: + +.. code-block:: bash + + aws emr-serverless create-application \ + --name gsprocessing-0.2.1 \ + --release-label emr-6.11.0 \ + --type SPARK \ + --image-configuration '{ + "imageUri": ".dkr.ecr..amazonaws.com/graphstorm-processing-emr-serverless:0.2.1" + }' + +Here you will need to replace ```` and ```` with the correct values +from the image you just created. GSProcessing version ``0.2.1`` uses ``emr-6.11.0`` as its +base image, so we need to ensure our application uses the same release. + + +Allow EMR Serverless to access the custom image repository +---------------------------------------------------------- + +Finally we need to provide the EMR-S service Principal access +to the `graphstorm-processing-emr-serverless` ECR image repository, +for which we will need to modify the repository's policy statement. + +As shown in the +`EMR docs `_, +once we have the EMR-S Application ID (from creating the application in the previous step) +we can use it to limit access to the repository to that particular application. + +The policy we need to set would be the following: + +.. code-block:: json + + { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "Emr Serverless Custom Image Support", + "Effect": "Allow", + "Principal": { + "Service": "emr-serverless.amazonaws.com" + }, + "Action": [ + "ecr:BatchGetImage", + "ecr:DescribeImages", + "ecr:GetDownloadUrlForLayer" + ], + "Condition":{ + "StringEquals":{ + "aws:SourceArn": "arn:aws:emr-serverless:::/applications/" + } + } + } + ] + } + +Where you would need to replace values for ````, ````, and ````. + +See `Setting a private repository policy statement `_ +for how to set a repository policy. + + +Running GSProcessing jobs on EMR Serverless +------------------------------------------- + +With all the setup complete we should now have the following: + +* An ECR repository where we have pushed the GSProcessing EMR-S image, + and to which we have provided access to the EMR-S application we just created. +* An EMR-S application that uses our custom image. +* An execution role that our EMR-S jobs will use when we launch them. + +To launch the same example job as we demonstrate in the :doc:`SageMaker Processing job guide ` +you can use the following ``bash`` snippet. Note that we use ``jq`` to wrangle JSON data, +which you can download from its `official website `_, +install using your package manager, or by running ``pip install jq``. + +Before starting the job, make sure you have uploaded the input data +as described in :ref:`gsp-upload-data-ref`. + +.. 
code-block:: bash + + APPLICATION_ID="enter-your-application-id-here" + ACCOUNT=$(aws sts get-caller-identity --query Account --output text) + MY_BUCKET="enter-your-bucket-name-here" + EMR_S_ROLE_NAME="enter-your-emr-serverless-execution-role-name-here" + REGION="bucket-region" # e.g. us-west-2 + INPUT_PREFIX="s3://${MY_BUCKET}/gsprocessing-input" + OUTPUT_BUCKET=${MY_BUCKET} + GRAPH_NAME="small-graph" + CONFIG_FILE="gconstruct-config.json" + NUM_FILES="4" + GSP_HOME="enter/path/to/graphstorm/graphstorm-processing/" + + LOCAL_ENTRY_POINT=$GSP_HOME/graphstorm_processing/distributed_executor.py + S3_ENTRY_POINT="s3://${OUTPUT_BUCKET}/emr-serverless-scripts/distributed_executor.py" + + ROLE="arn:aws:iam::${ACCOUNT}:role/${EMR_S_ROLE_NAME}" + + export OUTPUT_PREFIX="s3://${OUTPUT_BUCKET}/gsprocessing/emr-s/${GRAPH_NAME}/${NUM_FILES}files/" + + # Copy entry point script to S3 to ensure latest version is used + aws s3 cp $LOCAL_ENTRY_POINT $S3_ENTRY_POINT + + # Construct arguments JSON string using jq + ARGS_JSON=$( jq -n \ + --arg entry "$S3_ENTRY_POINT" \ + --arg in "$INPUT_PREFIX" \ + --arg out "$OUTPUT_PREFIX" \ + --arg cfg "$CONFIG_FILE" \ + --arg nfiles "$NUM_FILES" \ + --arg gname "$GRAPH_NAME" \ + '{ + sparkSubmit: { + entryPoint: $entry, + entryPointArguments: + ["--input-prefix", $in, + "--output-prefix", $out, + "--config-file", $cfg, + "--num-output-files", $nfiles, + "--graph-name", $gname] + } + }' ) + + echo "Arguments JSON:" + echo $ARGS_JSON | jq -r + + echo "Starting EMR-S job..." + aws --region $REGION emr-serverless start-job-run \ + --name "gsprocessing-emr-s-example" \ + --application-id $APPLICATION_ID \ + --execution-role-arn $ROLE \ + --job-driver "${ARGS_JSON}" # Need to surround ARGS_JSON with quotes here to maintain JSON formatting + +Similar to the SageMaker example, we need to run a follow-up job to align the output with the +expectations of the DistDGL partitioning pipeline. The easiest is to run the job locally +on an instance with S3 access (where we installed GSProcessing): + +.. code-block:: bash + + gs-repartition --input-prefix ${OUTPUT_PREFIX} + +Or if your data are too large for the re-partitioning job to run locally, you can +launch a SageMaker job as below after following the :doc:`distributed processing setup ` +and building the GSProcessing SageMaker ECR image: + +.. code-block:: bash + + bash docker/build_gsprocessing_image.sh --environment sagemaker --region ${REGION} + bash docker/push_gsprocessing_image.sh --environment sagemaker --region ${REGION} + + SAGEMAKER_ROLE_NAME="enter-your-sagemaker-execution-role-name-here" + IMAGE_URI="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/graphstorm-processing-sagemaker:0.2.1" + ROLE="arn:aws:iam::${ACCOUNT}:role/service-role/${SAGEMAKER_ROLE_NAME}" + INSTANCE_TYPE="ml.t3.xlarge" + + python scripts/run_repartitioning.py --s3-input-prefix ${OUTPUT_PREFIX} \ + --role ${ROLE} --image ${IMAGE_URI} --config-filename "metadata.json" \ + --instance-type ${INSTANCE_TYPE} --wait-for-job + + + +Note that ``${OUTPUT_PREFIX}`` here will need to match the value assigned when launching +the EMR-S job, i.e. ``"s3://${OUTPUT_BUCKET}/gsprocessing/emr-s/small-graph/4files/"`` + +Examine the output +------------------ + +Once both jobs are finished we can examine the output created, which +should match the output we saw when running the same jobs locally +in :ref:`gsp-examining-output`. + + +.. 
code-block:: bash + + $ aws s3 ls ${OUTPUT_PREFIX} + + PRE edges/ + PRE node_data/ + PRE node_id_mappings/ + 2023-08-05 00:47:36 804 launch_arguments.json + 2023-08-05 00:47:36 11914 metadata.json + 2023-08-05 00:47:37 545 perf_counters.json + 2023-08-05 00:47:37 12082 updated_row_counts_metadata.json + + +Run distributed partitioning and training on Amazon SageMaker +------------------------------------------------------------- + +With the data now processed you can follow the +`GraphStorm Amazon SageMaker guide +`_ +to partition your data and run training on AWS. diff --git a/docs/source/gs-processing/usage/example.rst b/docs/source/gs-processing/usage/example.rst index 98c2327cbb..0034e6aa8c 100644 --- a/docs/source/gs-processing/usage/example.rst +++ b/docs/source/gs-processing/usage/example.rst @@ -258,7 +258,7 @@ to manage your own infrastructure, we recommend using GraphStorm's `SageMaker wrappers `_ that do all the hard work for you and allow you to focus on model development. In particular you can follow the GraphStorm documentation to run -`distributed partititioning on SageMaker `_. +`distributed partitioning on SageMaker `_. To run GSProcessing jobs on Amazon SageMaker we'll need to follow diff --git a/docs/source/index.rst b/docs/source/index.rst index 01da81661d..f69d323915 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -23,6 +23,7 @@ Welcome to the GraphStorm Documentation and Tutorials gs-processing/usage/example gs-processing/usage/distributed-processing-setup gs-processing/usage/amazon-sagemaker + gs-processing/usage/emr-serverless gs-processing/developer/input-configuration .. toctree:: diff --git a/graphstorm-processing/docker/0.1.0/Dockerfile.cpu b/graphstorm-processing/docker/0.1.0/sagemaker/Dockerfile.cpu similarity index 100% rename from graphstorm-processing/docker/0.1.0/Dockerfile.cpu rename to graphstorm-processing/docker/0.1.0/sagemaker/Dockerfile.cpu diff --git a/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu b/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu new file mode 100644 index 0000000000..267f986358 --- /dev/null +++ b/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu @@ -0,0 +1,55 @@ +FROM public.ecr.aws/emr-serverless/spark/emr-6.11.0:20230629-x86_64 as runtime +USER root +ENV PYTHON_VERSION=3.9.18 + +# Python won’t try to write .pyc or .pyo files on the import of source modules +# Force stdin, stdout and stderr to be totally unbuffered. Good for logging +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +ENV PYTHONIOENCODING=UTF-8 + +# Set up pyenv +ENV PYENV_ROOT="${HOME}/.pyenv" +ENV PATH="${PYENV_ROOT}/shims:${PYENV_ROOT}/bin:${PATH}" +ENV PYSPARK_DRIVER_PYTHON=${PYENV_ROOT}/shims/python +ENV PYSPARK_PYTHON=${PYENV_ROOT}/shims/python + +# TODO: These can probably all go to another builder stage? 
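+# The two RUN steps below install the toolchain and headers needed to build
+# CPython from source, then use pyenv to build Python ${PYTHON_VERSION} and set
+# it as the global interpreter, which the PYSPARK_PYTHON variables above point Spark at.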
+RUN yum erase -y openssl-devel && \ + yum install -y \ + bzip2-devel\ + gcc \ + git \ + libffi-devel \ + ncurses-devel \ + openssl11-devel \ + readline-devel \ + sqlite-devel \ + sudo \ + xz-devel && \ + rm -rf /var/cache/yum +RUN git clone https://github.com/pyenv/pyenv.git ${PYENV_ROOT} && \ + pyenv install ${PYTHON_VERSION} && \ + pyenv global ${PYTHON_VERSION} + +WORKDIR /usr/lib/spark/code/ + +# Install GSProcessing requirements to pyenv Python +COPY requirements.txt requirements.txt +# Use --mount=type=cache,target=/root/.cache when Buildkit CI issue is fixed: +# https://github.com/moby/buildkit/issues/1512 +RUN pip install -r /usr/lib/spark/code/requirements.txt \ + && rm -rf /root/.cache + +# GSProcessing codebase +COPY code/ /usr/lib/spark/code/ + +FROM runtime AS prod +RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm_processing-*.whl && \ + rm /usr/lib/spark/code/graphstorm_processing-*.whl && rm -rf /root/.cache + +FROM runtime AS test +RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm-processing/ && rm -rf /root/.cache + +USER hadoop:hadoop +WORKDIR /home/hadoop diff --git a/graphstorm-processing/docker/build_gsprocessing_image.sh b/graphstorm-processing/docker/build_gsprocessing_image.sh index d9ffe30316..7ecf1e3094 100644 --- a/graphstorm-processing/docker/build_gsprocessing_image.sh +++ b/graphstorm-processing/docker/build_gsprocessing_image.sh @@ -13,13 +13,14 @@ Script description here. Available options: --h, --help Print this help and exit --x, --verbose Print script debug info (set -x) --t, --target Docker image target, must be one of 'prod' or 'test'. Default is 'test'. --p, --path Path to graphstorm-processing directory, default is one level above this script. --i, --image Docker image name, default is 'graphstorm-processing'. --v, --version Docker version tag, default is the library's current version (`poetry version --short`) --b, --build Docker build directory, default is '/tmp/` +-h, --help Print this help and exit +-x, --verbose Print script debug info (set -x) +-e, --environment Image execution environment. Must be one of 'emr-serverless' or 'sagemaker'. Required. +-t, --target Docker image target, must be one of 'prod' or 'test'. Default is 'test'. +-p, --path Path to graphstorm-processing directory, default is the current directory. +-i, --image Docker image name, default is 'graphstorm-processing'. 
+-v, --version Docker version tag, default is the library's current version (`poetry version --short`) +-b, --build Docker build directory, default is '/tmp/` EOF exit } @@ -52,6 +53,10 @@ parse_params() { TARGET="${2-}" shift ;; + -e | --environment) + EXEC_ENV="${2-}" + shift + ;; -p | --path) GSP_HOME="${2-}" shift @@ -76,6 +81,9 @@ parse_params() { args=("$@") + # check required params and arguments + [[ -z "${EXEC_ENV-}" ]] && die "Missing required parameter: -e/--environment [emr-serverless|sagemaker]" + return 0 } @@ -92,11 +100,18 @@ parse_params "$@" if [[ ${TARGET} == "prod" || ${TARGET} == "test" ]]; then : # Do nothing else - die "target parameter needs to be one of 'prod' or 'test', got ${TARGET}" + die "--target parameter needs to be one of 'prod' or 'test', got ${TARGET}" +fi + +if [[ ${EXEC_ENV} == "sagemaker" || ${EXEC_ENV} == "emr-serverless" ]]; then + : # Do nothing +else + die "--environment parameter needs to be one of 'emr-serverless' or 'sagemaker', got ${EXEC_ENV}" fi # script logic here msg "Execution parameters:" +msg "- ENVIRONMENT: ${EXEC_ENV}" msg "- TARGET: ${TARGET}" msg "- GSP_HOME: ${GSP_HOME}" msg "- IMAGE_NAME: ${IMAGE_NAME}" @@ -120,12 +135,22 @@ fi # Copy Docker entry point to build folder cp ${GSP_HOME}/docker-entry.sh "${BUILD_DIR}/docker/code/" -DOCKER_FULLNAME="${IMAGE_NAME}:${VERSION}" +# Export Poetry requirements to requirements.txt file +poetry export -f requirements.txt --output "${BUILD_DIR}/docker/requirements.txt" + +# Set image name +DOCKER_FULLNAME="${IMAGE_NAME}-${EXEC_ENV}:${VERSION}" # Login to ECR to be able to pull source SageMaker image -aws ecr get-login-password --region us-west-2 \ - | docker login --username AWS --password-stdin 153931337802.dkr.ecr.us-west-2.amazonaws.com +if [[ ${EXEC_ENV} == "sagemaker" ]]; then + aws ecr get-login-password --region us-west-2 \ + | docker login --username AWS --password-stdin 153931337802.dkr.ecr.us-west-2.amazonaws.com +else + aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws + # aws ecr get-login-password --region us-west-2 \ + # | docker login --username AWS --password-stdin 895885662937.dkr.ecr.us-west-2.amazonaws.com +fi echo "Build a Docker image ${DOCKER_FULLNAME}" -DOCKER_BUILDKIT=1 docker build -f "${GSP_HOME}/docker/${VERSION}/Dockerfile.cpu" \ +DOCKER_BUILDKIT=1 docker build -f "${GSP_HOME}/docker/${VERSION}/${EXEC_ENV}/Dockerfile.cpu" \ "${BUILD_DIR}/docker/" -t $DOCKER_FULLNAME --target ${TARGET} diff --git a/graphstorm-processing/docker/push_gsprocessing_image.sh b/graphstorm-processing/docker/push_gsprocessing_image.sh index 64450ee448..5d6753d083 100644 --- a/graphstorm-processing/docker/push_gsprocessing_image.sh +++ b/graphstorm-processing/docker/push_gsprocessing_image.sh @@ -13,12 +13,13 @@ Script description here. Available options: --h, --help Print this help and exit --x, --verbose Print script debug info --i, --image Docker image name, default is 'graphstorm-processing'. --v, --version Docker version tag, default is the library's current version (`poetry version --short`) --r, --region AWS Region to which we'll push the image. By default will get from aws-cli configuration. --a, --account AWS Account ID. By default will get from aws-cli configuration. +-h, --help Print this help and exit +-x, --verbose Print script debug info +-e, --environment Image execution environment. Must be one of 'emr-serverless' or 'sagemaker'. Required. +-i, --image Docker image name, default is 'graphstorm-processing'. 
+-v, --version Docker version tag, default is the library's current version (`poetry version --short`) +-r, --region AWS Region to which we'll push the image. By default will get from aws-cli configuration. +-a, --account AWS Account ID. By default will get from aws-cli configuration. EOF exit } @@ -49,6 +50,10 @@ parse_params() { -h | --help) usage ;; -x | --verbose) set -x ;; --no-color) NO_COLOR=1 ;; + -e | --environment) + EXEC_ENV="${2-}" + shift + ;; -i | --image) IMAGE="${2-}" shift @@ -71,6 +76,8 @@ parse_params() { shift done + [[ -z "${EXEC_ENV-}" ]] && die "Missing required parameter: -e/--environment [emr|emr-serverless|sagemaker]" + return 0 } @@ -81,9 +88,16 @@ cleanup() { parse_params "$@" +if [[ ${EXEC_ENV} == "sagemaker" || ${EXEC_ENV} == "emr-serverless" ]]; then + : # Do nothing +else + die "--environment parameter needs to be one of 'emr', 'emr-serverless' or 'sagemaker', got ${EXEC_ENV}" +fi + # script logic here msg "Execution parameters: " +msg "- ENVIRONMENT: ${EXEC_ENV}" msg "- IMAGE: ${IMAGE}" msg "- VERSION: ${VERSION}" msg "- REGION: ${REGION}" @@ -91,16 +105,17 @@ msg "- ACCOUNT: ${ACCOUNT}" SUFFIX="${VERSION}" LATEST_SUFFIX="latest" +IMAGE_WITH_ENV="${IMAGE}-${EXEC_ENV}" -FULLNAME="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE}:${SUFFIX}" -LATEST_TAG="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE}:${LATEST_SUFFIX}" +FULLNAME="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE_WITH_ENV}:${SUFFIX}" +LATEST_TAG="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE_WITH_ENV}:${LATEST_SUFFIX}" # If the repository doesn't exist in ECR, create it. -echo "Getting or creating container repository: ${IMAGE}" -if ! $(aws ecr describe-repositories --repository-names "${IMAGE}" --region ${REGION} > /dev/null 2>&1); then - echo "Container repository ${IMAGE} does not exist. Creating" - aws ecr create-repository --repository-name "${IMAGE}" --region ${REGION} > /dev/null +echo "Getting or creating container repository: ${IMAGE_WITH_ENV}" +if ! $(aws ecr describe-repositories --repository-names "${IMAGE_WITH_ENV}" --region ${REGION} > /dev/null 2>&1); then + echo >&2 "WARNING: ECR repository ${IMAGE_WITH_ENV} does not exist in region ${REGION}. Creating..." 
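+    # Create the environment-specific repository so the docker push below has a
+    # target; the calling credentials need ecr:CreateRepository permission for this.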
+ aws ecr create-repository --repository-name "${IMAGE_WITH_ENV}" --region ${REGION} > /dev/null fi echo "Logging into ECR with local credentials" @@ -109,11 +124,11 @@ aws ecr get-login-password --region ${REGION} | \ echo "Pushing image to ${FULLNAME}" -docker tag ${IMAGE}:${SUFFIX} ${FULLNAME} +docker tag ${IMAGE_WITH_ENV}:${SUFFIX} ${FULLNAME} docker push ${FULLNAME} if [ ${VERSION} = ${LATEST_VERSION} ]; then - docker tag ${IMAGE}:${SUFFIX} ${LATEST_TAG} + docker tag ${IMAGE_WITH_ENV}:${SUFFIX} ${LATEST_TAG} docker push ${LATEST_TAG} fi \ No newline at end of file diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_label_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_label_transformation.py index ac9c9673a2..9e32f06953 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_label_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_label_transformation.py @@ -59,7 +59,7 @@ def apply(self, input_df: DataFrame) -> DataFrame: self.label_column, ) - # Labels that were and were assigned the value numLabels by the StringIndexer + # Labels that were missing and were assigned the value numLabels by the StringIndexer # are converted to None long_class_label = indexed_df.select(F.col(self.label_column).cast("long")).select( F.when( diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/s3_utils.py b/graphstorm-processing/graphstorm_processing/data_transformations/s3_utils.py index e999f4744c..4bdfd2be93 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/s3_utils.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/s3_utils.py @@ -43,7 +43,7 @@ def list_s3_objects(bucket: str, prefix: str, s3_boto_client: boto3.client = Non """ assert not prefix.startswith( "s3://" - ), f"Prefix should not start with 's3://' but be relative to bucket, got {prefix}" + ), f"Prefix should not start with 's3://' but be relative to the bucket, {bucket}, got {prefix}" s3_boto_client = get_high_retry_s3_client() if s3_boto_client is None else s3_boto_client paginator = s3_boto_client.get_paginator("list_objects_v2") pages = paginator.paginate(Bucket=bucket, Prefix=prefix) @@ -82,14 +82,15 @@ def extract_bucket_and_key( path_with_bucket: str, relative_path: Optional[str] = None ) -> tuple[str, str]: """Given an S3 path that includes a bucket, and a relative path, - extracts the bucket name and full key path. + extracts the bucket name and full key path. If only `path_with_bucket` + is provided, will split that path into bucket name and prefix path. Parameters ---------- path_with_bucket : str An S3 path that can include a bucket name and a key prefix, e.g. 's3://my-bucket/my/prefix/'. - relative_path : Optional[str], optional + relative_path : Optional[str] An S3 key path that's relative to `path_with_bucket`, e.g. 'rest/of/path/to/key'. If not provided only `path_with_bucket` will be split. @@ -99,9 +100,21 @@ def extract_bucket_and_key( str A tuple whose first element is the bucket name and the second the full path to the key. + + Example + ------- + .. 
code:: + + >>> extract_bucket_and_key("s3://my-bucket/prefix", "rest/of/path/to/key") + ("my_bucket", "prefix/rest/of/path/to/key") + >>> extract_bucket_and_key("s3://my-bucket/prefix/key") + ("my_bucket", "prefix/key") + >>> extract_bucket_and_key("s3://my-bucket/") + ("my_bucket", "") """ if not path_with_bucket.startswith("s3://"): path_with_bucket = f"s3://{path_with_bucket}" + path_with_bucket = s3_path_remove_trailing(path_with_bucket) if relative_path: if relative_path.startswith("/"): relative_path = relative_path[1:] @@ -111,7 +124,11 @@ def extract_bucket_and_key( # We split on '/' to get the bucket, as it's always the third split element in an S3 URI file_bucket = file_s3_uri.split("/")[2] # Similarly, by having maxsplit=3 we get the S3 key value as the fourth element - file_key = file_s3_uri.split("/", 3)[3] + file_parts = file_s3_uri.split("/", maxsplit=3) + if len(file_parts) == 4: + file_key = file_parts[3] + else: + file_key = "" # We remove any trailing '/' from the key file_key = s3_path_remove_trailing(file_key) diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index 93a5d08109..f02d04ede4 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -28,6 +28,7 @@ StructField, StringType, IntegerType, + LongType, ArrayType, ByteType, ) @@ -122,6 +123,7 @@ def __init__( if num_output_files and num_output_files > 0 else int(spark.sparkContext.defaultParallelism) ) + assert self.num_output_files > 0 # Mapping from node type to filepath, each file is a node-str to node-int-id mapping self.node_mapping_paths = {} # type: Dict[str, Sequence[str]] # Mapping from label name to value counts @@ -703,7 +705,7 @@ def _extend_mapping_from_edges( map_schema = StructType( [ StructField(join_col, StringType(), True), - StructField(index_col, IntegerType(), True), + StructField(index_col, LongType(), True), ] ) @@ -742,7 +744,7 @@ def create_node_id_map_from_nodes_df(self, node_df: DataFrame, node_col: str) -> node_rdd_with_ids = node_df.rdd.zipWithIndex() node_id_col = f"{node_col}-int_id" - new_schema = original_schema.add(StructField(node_id_col, IntegerType(), False)) + new_schema = original_schema.add(StructField(node_id_col, LongType(), False)) node_rdd_with_ids = node_rdd_with_ids.map( lambda rdd_row: (list(rdd_row[0]) + [rdd_row[1]]) # type: ignore @@ -1064,6 +1066,11 @@ def write_edge_structure( The first list contains the original edge files, the second is the reversed edge files, will be empty if `self.add_reverse_edges` is False. """ + # TODO: An option for dealing with skewed data: + # Find the heavy hitter, collect, do broadcast join with just them, + # (for both sides of the edge and filter the rows?) 
then do the join with + # the rest, then concat the two (or write to storage separately, concat at read-time) + src_col = edge_config.src_col src_ntype = edge_config.src_ntype dst_col = edge_config.dst_col @@ -1084,8 +1091,10 @@ def write_edge_structure( ) .withColumnRenamed(NODE_MAPPING_INT, "src_int_id") .withColumnRenamed(NODE_MAPPING_STR, "src_str_id") + .repartition(self.num_output_files, F.col("src_str_id")) ) # Join incoming edge df with mapping df to transform source str-ids to int ids + edge_df = edge_df.repartition(self.num_output_files, F.col(src_col)) edge_df_with_int_src = src_node_id_mapping.join( edge_df, src_node_id_mapping["src_str_id"] == edge_df[src_col], @@ -1115,9 +1124,13 @@ def write_edge_structure( ) .withColumnRenamed(NODE_MAPPING_INT, "dst_int_id") .withColumnRenamed(NODE_MAPPING_STR, "dst_str_id") + .repartition(self.num_output_files, F.col("dst_str_id")) ) # Join the newly created src-int-id edge df with mapping # df to transform destination str-ids to int ids + edge_df_with_int_src = edge_df_with_int_src.repartition( + self.num_output_files, F.col(dst_col) + ) edge_df_with_int_ids = dst_node_id_mapping.join( edge_df_with_int_src, dst_node_id_mapping["dst_str_id"] == edge_df_with_int_src[dst_col], diff --git a/graphstorm-processing/tests/test_s3_utils.py b/graphstorm-processing/tests/test_s3_utils.py new file mode 100644 index 0000000000..5b5eb1340b --- /dev/null +++ b/graphstorm-processing/tests/test_s3_utils.py @@ -0,0 +1,36 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from graphstorm_processing.data_transformations import s3_utils + + +def test_extract_bucket_and_key(): + """Test various conditions for extracting bucket and key""" + bucket, key = s3_utils.extract_bucket_and_key("s3://my-bucket/my-key") + assert bucket == "my-bucket" + assert key == "my-key" + + bucket, key = s3_utils.extract_bucket_and_key("s3://my-bucket") + assert bucket == "my-bucket" + assert key == "" + + bucket, key = s3_utils.extract_bucket_and_key("s3://my-bucket", "/my-key/subkey") + assert bucket == "my-bucket" + assert key == "my-key/subkey" + + bucket, key = s3_utils.extract_bucket_and_key("s3://my-bucket/my-key/", "subkey/") + assert bucket == "my-bucket" + assert key == "my-key/subkey"
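
A quick way to exercise the new unit test locally, assuming ``pytest`` is available in the environment where ``graphstorm-processing`` and its dev dependencies are installed:

.. code-block:: bash

    # Run the new S3 utilities test from the graphstorm-processing directory.
    # Assumes pytest is installed, e.g. via `pip install pytest`.
    cd graphstorm-processing
    python -m pytest tests/test_s3_utils.py -v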