
[GSProcessing] Increment version. Add a simplified launch script for EMR on EC2. #902

Merged 4 commits on Jul 2, 2024
187 changes: 49 additions & 138 deletions docs/source/gs-processing/usage/emr.rst
@@ -77,177 +77,94 @@ inline policy:
]
}


Launch an AWS EMR cluster with GSProcessing step
------------------------------------------------

Once our roles are set up, that is, we have an EMR EC2 instance role
and a user we can use to launch clusters, we can launch a cluster
configured so it will run a GSProcessing job with the GSProcessing EMR on EC2
Docker image, then terminate. We have tested GSProcessing with EMR 7.0.0 and EMR 6.10.0,
and the instructions should apply for any EMR version ``>6.0.0``.
If you have persistent clusters you want to
use to run GSProcessing, you'd have to modify the EMR Dockerfile
accordingly to use an appropriate EMR image as the source image.

We provide a wrapper script that performs most of the configuration
needed to launch the EMR cluster and submit the GSProcessing job
as an `EMR Step <https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-submit-step.html>`_.

For more information on running Spark jobs with custom Docker containers see the EMR
`Configure Docker documentation <https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-docker.html>`_
and how to
`run Spark applications with Docker on Amazon EMR <https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-docker.html>`_.

To launch a GSProcessing job with EMR on EC2 we will use the ``graphstorm-processing/scripts/submit_gsp_emr_step.py`` Python
script, which uses ``boto3`` to launch a cluster and run the corresponding GSProcessing job as a step.
The script has four required arguments:

* ``--entry-point-s3``: We need to upload the GSProcessing entry point,
  ``graphstorm-processing/graphstorm_processing/distributed_executor.py``, to a location
  on S3 from which our leader instance will be able to read it.
* ``--gsp-arguments``: Here we pass all the arguments to the entry point as one space-separated
  string. To ensure they are parsed as one string, enclose them in double quotes, e.g.
  ``--gsp-arguments "--input-config gsp-config.json --input-prefix s3://my-bucket/raw-data [...]"``.
* ``--instance-type``: The instance type to use for our cluster. Our script currently supports
  only a uniform instance type.
* ``--instance-count``: Number of worker instances to launch for the cluster.

Run ``python graphstorm-processing/scripts/submit_gsp_emr_step.py --help`` for more optional arguments.
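Conceptually, the single quoted string passed to ``--gsp-arguments`` is later split back into individual arguments using shell quoting rules. A minimal sketch of that mechanism with Python's standard ``shlex`` module (illustrative only, not the actual parsing code of ``submit_gsp_emr_step.py``):

```python
import shlex

# One space-separated string, as it would be passed to --gsp-arguments.
gsp_arguments = (
    "--config-filename gconstruct-config.json "
    "--input-prefix s3://my-bucket/raw-data "
    "--do-repartition True"
)

# shlex.split applies shell-like quoting rules, recovering
# one list entry per argument token.
tokens = shlex.split(gsp_arguments)
print(tokens)
```

Because the whole string is a single shell word, the spaces inside it survive the trip through the submitting shell and can be re-split on the cluster side.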

Let's demonstrate how we can launch an EMR cluster with a GSProcessing step
using the above Python script.

.. code-block:: bash

    INSTANCE_TYPE=m6i.4xlarge
    # INSTANCE_TYPE=m6g.4xlarge # Use for arm64 image
    REGION=us-east-1
    CORE_INSTANCE_COUNT=2

    # GSProcessing arguments
    MY_BUCKET="enter-your-bucket-name-here"
    INPUT_PREFIX="s3://${MY_BUCKET}/gsprocessing-input"
    OUTPUT_BUCKET=${MY_BUCKET}
    GRAPH_NAME="small-graph"
    CONFIG_FILE="gconstruct-config.json"
    DO_REPARTITION="true"
    GENERATE_REVERSE="true"

    # We assume this script is saved in the same path as submit_gsp_emr_step.py,
    # that is graphstorm-processing/scripts
    SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

    # Upload the entry point to S3
    PATH_TO_ENTRYPOINT="$SCRIPT_DIR/../graphstorm_processing/distributed_executor.py"
    S3_ENTRY_POINT="s3://${OUTPUT_BUCKET}/emr-scripts/distributed_executor.py"
    aws s3 cp "${PATH_TO_ENTRYPOINT}" "${S3_ENTRY_POINT}"

    OUTPUT_PREFIX="s3://${OUTPUT_BUCKET}/gsprocessing/emr/${GRAPH_NAME}/"

    python "${SCRIPT_DIR}/submit_gsp_emr_step.py" \
        --entry-point-s3 "${S3_ENTRY_POINT}" \
        --instance-type ${INSTANCE_TYPE} \
        --log-uri "${OUTPUT_PREFIX}spark-logs" \
        --worker-count ${CORE_INSTANCE_COUNT} \
        --gsp-arguments "--config-filename ${CONFIG_FILE} \
            --input-prefix ${INPUT_PREFIX} \
            --output-prefix ${OUTPUT_PREFIX} \
            --add-reverse-edges ${GENERATE_REVERSE} \
            --do-repartition ${DO_REPARTITION}"

Running the above will return a cluster ID, which you can use to monitor the
GSProcessing job execution.

We can also run a waiter to wait for the job to finish before checking logs:

.. code-block:: bash

    aws emr wait step-complete --cluster-id j-XXXXXXXXXX --region ${REGION} && echo "GSProcessing job complete."
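If you prefer generating the submit call from Python rather than bash, the ``--gsp-arguments`` string can be assembled from a plain dict. A small sketch, where the flag names mirror the example above and the helper function itself is hypothetical:

```python
def build_gsp_arguments(options: dict) -> str:
    """Serialize name/value pairs into the single space-separated
    string expected by --gsp-arguments. Illustrative helper only."""
    return " ".join(f"--{name} {value}" for name, value in options.items())

args_string = build_gsp_arguments({
    "config-filename": "gconstruct-config.json",
    "input-prefix": "s3://my-bucket/gsprocessing-input",
    "output-prefix": "s3://my-bucket/gsprocessing/emr/small-graph/",
    "do-repartition": "True",
})
print(args_string)
```

Keeping the options in one structure makes it harder to forget the quoting when the argument list grows.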

Ensure row counts are aligned and terminate the cluster
--------------------------------------------------------

@@ -277,12 +194,6 @@ which means our data are ready for distributed partitioning.
If the re-partitioning failed, we can run a separate job, see :doc:`row-count-alignment`
for details.

Once done, remember to clean up your cluster resources by terminating the cluster:

.. code-block:: bash

aws emr terminate-clusters --cluster-ids j-XXXXXXXXXX

Run distributed partitioning and training on Amazon SageMaker
-------------------------------------------------------------

55 changes: 55 additions & 0 deletions graphstorm-processing/docker/0.3.1/emr-serverless/Dockerfile.cpu
@@ -0,0 +1,55 @@
ARG ARCH=x86_64
FROM public.ecr.aws/emr-serverless/spark/emr-7.0.0:20240206-${ARCH} as base

USER root
ENV PYTHON_VERSION=3.9.18

# Python won’t try to write .pyc or .pyo files on the import of source modules
# Force stdin, stdout and stderr to be totally unbuffered. Good for logging
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONIOENCODING=UTF-8


FROM base AS arch-x86_64

FROM base AS arch-arm64
RUN yum install -y python3-devel && \
rm -rf /var/cache/yum

FROM arch-${ARCH} AS runtime


WORKDIR /usr/lib/spark/code/

# Install GSProcessing requirements to pyenv Python
COPY requirements.txt requirements.txt
# Use --mount=type=cache,target=/root/.cache when Buildkit CI issue is fixed:
# https://github.com/moby/buildkit/issues/1512
RUN pip install -r /usr/lib/spark/code/requirements.txt \
&& rm -rf /root/.cache

# Pre-download the Huggingface model cache if a model is specified
ARG MODEL=""
ENV HF_HOME=/home/hadoop/.cache/huggingface/hub
RUN if [ -z "${MODEL}" ]; then \
echo "Skip installing model cache"; \
else \
echo "Installing model cache for $MODEL" && \
python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \
python3 -c "from transformers import AutoModel; AutoModel.from_pretrained('${MODEL}')"; \
fi


# GSProcessing codebase
COPY code/ /usr/lib/spark/code/

FROM runtime AS prod
RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm_processing-*.whl && \
rm /usr/lib/spark/code/graphstorm_processing-*.whl && rm -rf /root/.cache

FROM runtime AS test
RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm-processing/ && rm -rf /root/.cache

USER hadoop:hadoop
WORKDIR /home/hadoop
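To build this image, the ``ARCH`` build argument selects the architecture-specific stage and ``--target`` selects the ``prod`` or ``test`` stage. A sketch of a local build, where the account, region, and repository name are placeholder assumptions you would replace with your own:

```shell
# Placeholder values -- substitute your own account and region.
ACCOUNT=123456789012
REGION=us-east-1
ARCH=x86_64
IMAGE="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/graphstorm-processing-emr-serverless:latest-${ARCH}"

# Build the production stage; skipped gracefully when docker or the
# Dockerfile is not available in the current directory.
if command -v docker >/dev/null 2>&1 && [ -f Dockerfile.cpu ]; then
    docker build --build-arg ARCH="${ARCH}" --target prod -t "${IMAGE}" -f Dockerfile.cpu .
else
    echo "docker not available, skipping build"
fi
echo "${IMAGE}"
```

The same command with ``--target test`` produces the test-stage image that installs GSProcessing from source instead of the wheel.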
78 changes: 78 additions & 0 deletions graphstorm-processing/docker/0.3.1/emr/Dockerfile.cpu
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# TODO: Pin image version
FROM public.ecr.aws/amazoncorretto/amazoncorretto:17 as base

ENV PYTHON_VERSION=3.9.18

# Python won’t try to write .pyc or .pyo files on the import of source modules
# Force stdin, stdout and stderr to be totally unbuffered. Good for logging
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONIOENCODING=UTF-8

ENV PYENV_ROOT="${HOME}/.pyenv"
ENV PATH="${PYENV_ROOT}/shims:${PYENV_ROOT}/bin:${PATH}"

ENV PYSPARK_DRIVER_PYTHON=${PYENV_ROOT}/shims/python
ENV PYSPARK_PYTHON=${PYENV_ROOT}/shims/python

# pyenv and Spark/YARN dependencies
RUN yum erase -y openssl-devel && \
    yum install -y \
    bzip2-devel \
    gcc \
    git \
    hostname \
    java-17-amazon-corretto-headless \
    libffi-devel \
    make \
    ncurses-devel \
    openssl11-devel \
    readline-devel \
    sqlite-devel \
    sudo \
    tar \
    xz-devel && \
    rm -rf /var/cache/yum

# Install Python through pyenv
RUN git clone https://github.com/pyenv/pyenv.git ${PYENV_ROOT} --single-branch && \
pyenv install ${PYTHON_VERSION} && \
pyenv global ${PYTHON_VERSION}

FROM base AS runtime

WORKDIR /usr/lib/spark/code/


# Install GSProcessing requirements to pyenv Python
COPY requirements.txt requirements.txt
# Use --mount=type=cache,target=/root/.cache when Buildkit CI issue is fixed:
# https://github.com/moby/buildkit/issues/1512
RUN pip3 install -r /usr/lib/spark/code/requirements.txt \
&& rm -rf /root/.cache

# Pre-download the Huggingface model cache if a model is specified.
# This needs to happen after the transformers library has been installed above
ARG MODEL=""
ENV HF_HOME=/usr/lib/spark/.cache/huggingface/hub
RUN if [ -z "${MODEL}" ]; then \
echo "Skip installing model cache"; \
else \
echo "Installing model cache for $MODEL" && \
python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \
python3 -c "from transformers import AutoModel; AutoModel.from_pretrained('${MODEL}')"; \
fi

# We use this file as an indicator of the execution environment
RUN touch /usr/lib/spark/code/EMR_EXECUTION

# GSProcessing codebase
COPY code/ /usr/lib/spark/code/

FROM runtime AS prod
RUN python3 -m pip install --no-deps /usr/lib/spark/code/graphstorm_processing-*.whl && \
rm /usr/lib/spark/code/graphstorm_processing-*.whl && rm -rf /root/.cache

FROM runtime AS test
RUN python3 -m pip install --no-deps /usr/lib/spark/code/graphstorm-processing/ && rm -rf /root/.cache
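The ``MODEL`` build argument controls whether a Huggingface model is baked into the image's cache at build time. A sketch of a build that pre-caches a model (the tag and model id here are illustrative):

```shell
REPOSITORY=graphstorm-processing-emr   # assumed local tag
MODEL="bert-base-uncased"              # any Huggingface model id
IMAGE_TAG="${REPOSITORY}:with-${MODEL}"

# Passing MODEL pre-downloads tokenizer and weights during the build;
# leaving it empty (the default) skips the cache entirely.
if command -v docker >/dev/null 2>&1 && [ -f Dockerfile.cpu ]; then
    docker build --build-arg MODEL="${MODEL}" --target prod -t "${IMAGE_TAG}" -f Dockerfile.cpu .
else
    echo "docker not available, skipping build"
fi
echo "${IMAGE_TAG}"
```

Baking the model into the image avoids a download from the Huggingface Hub on every Spark executor at job time.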