[GSProcessing] Initial support for EMR on EC2
thvasilo committed Mar 26, 2024
1 parent 0f7fab7 commit d8ce5be
Showing 8 changed files with 85 additions and 65 deletions.
@@ -87,14 +87,14 @@ Once Docker and Poetry are installed, and your AWS credentials are set up,
we can use the provided scripts
in the ``graphstorm-processing/docker`` directory to build the image.

GSProcessing supports Amazon SageMaker and EMR Serverless as
GSProcessing supports Amazon SageMaker, EMR, and EMR Serverless as
execution environments, so we need to choose which image we want
to build first.

The ``build_gsprocessing_image.sh`` script can build the image
locally and tag it, provided the intended execution environment,
using the ``-e/--environment`` argument. The supported environments
are ``sagemaker`` and ``emr-serverless``.
are ``sagemaker``, ``emr``, and ``emr-serverless``.
For example, assuming our current directory is where
we cloned ``graphstorm/graphstorm-processing``, we can use
the following to build the SageMaker image:
@@ -131,7 +131,7 @@ You can find detailed instructions on creating a VPC for EMR Serverless in the A
Support for arm64 architecture
------------------------------

For EMR Serverless images, it is possible to build images for the ``arm64`` architecture,
For EMR and EMR Serverless images, it is possible to build images for the ``arm64`` architecture,
which can lead to improved runtime and cost compared to ``x86_64``. For more details
on EMR Serverless architecture options see the
`official docs <https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/architecture.html>`_.
2 changes: 1 addition & 1 deletion docs/source/gs-processing/usage/emr-serverless.rst
@@ -69,7 +69,7 @@ you are using would be:
"ecr:BatchGetImage",
"ecr:DescribeImages"
],
"Resource": "<enter-ecr-repository-arn-here>"
"Resource": ""<ACCOUNT>.dkr.ecr.<REGION>.amazonaws.com/graphstorm-processing-emr-serverless"
}
]
}
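If you need the repository ARN to put in the Resource field above, one way to look it up is with boto3. This is only a sketch: the region is an example value, and it assumes the repository has already been created.

    import boto3

    # Fetch the ARN of the GSProcessing ECR repository (region is an example value)
    ecr = boto3.client("ecr", region_name="us-east-1")
    response = ecr.describe_repositories(
        repositoryNames=["graphstorm-processing-emr-serverless"]
    )
    print(response["repositories"][0]["repositoryArn"])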
2 changes: 1 addition & 1 deletion graphstorm-processing/docker/push_gsprocessing_image.sh
@@ -100,7 +100,7 @@ cleanup() {

parse_params "$@"

if [[ ${EXEC_ENV} == "sagemaker" || ${EXEC_ENV} == "emr-serverless" ]]; then
if [[ ${EXEC_ENV} == "sagemaker" || ${EXEC_ENV} == "emr-serverless" || ${EXEC_ENV} == "emr" ]]; then
: # Do nothing
else
die "--environment parameter needs to be one of 'emr', 'emr-serverless' or 'sagemaker', got ${EXEC_ENV}"
2 changes: 1 addition & 1 deletion graphstorm-processing/graphstorm_processing/constants.py
@@ -33,7 +33,7 @@
############## Spark-specific constants #####################
SPECIAL_CHARACTERS = {".", "+", "*", "?", "^", "$", "(", ")", "[", "]", "{", "}", "|", "\\"}

"""Configuration to define driver and executor memory for distributed"""
"""Configuration to define driver and executor memory for SageMaker PySpark"""
# Percentage of instance memory to allocate to the driver process
DRIVER_MEM_INSTANCE_MEM_RATIO = 0.9
# Fraction of driver memory to be allocated as additional non-heap memory per process
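A minimal sketch of how a ratio like DRIVER_MEM_INSTANCE_MEM_RATIO typically feeds into the spark.driver.memory and spark.driver.memoryOverhead settings applied later in this commit. The overhead constant name, the helper function, and the instance size below are illustrative assumptions, not values from this diff, and the real code may combine them differently.

    DRIVER_MEM_INSTANCE_MEM_RATIO = 0.9  # from constants.py above
    DRIVER_MEM_OVERHEAD_RATIO = 0.1      # hypothetical name for the elided overhead constant

    def sagemaker_driver_memory_mb(instance_mem_mb: int) -> tuple[int, int]:
        """Split instance memory into driver heap and non-heap overhead, in MiB."""
        driver_total_mb = int(instance_mem_mb * DRIVER_MEM_INSTANCE_MEM_RATIO)
        overhead_mb = int(driver_total_mb * DRIVER_MEM_OVERHEAD_RATIO)
        return driver_total_mb - overhead_mb, overhead_mb

    # e.g. an instance reporting 131072 MiB (128 GiB) yields roughly a 104 GiB heap
    heap_mb, overhead_mb = sagemaker_driver_memory_mb(131072)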
@@ -112,6 +112,7 @@ def tokenize(text):
logging.warning("The device to run huggingface transformation is %s", device)
tokenizer = AutoTokenizer.from_pretrained(hf_model)
if max_seq_length > tokenizer.model_max_length:
# TODO: Could we possibly raise this at config time?
raise RuntimeError(
f"max_seq_length {max_seq_length} is larger "
f"than expected {tokenizer.model_max_length}"
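One possible answer to the TODO above, sketched here: validate the requested max_seq_length against the tokenizer's limit while the transformation config is parsed, before any Spark work starts. The function name and its call site are assumptions, not part of this commit.

    from transformers import AutoTokenizer

    def validate_max_seq_length(hf_model: str, max_seq_length: int) -> None:
        """Fail fast at config-parsing time instead of inside a Spark executor."""
        tokenizer = AutoTokenizer.from_pretrained(hf_model)
        if max_seq_length > tokenizer.model_max_length:
            raise ValueError(
                f"max_seq_length {max_seq_length} is larger than the model's "
                f"limit of {tokenizer.model_max_length} for {hf_model}"
            )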
@@ -54,28 +54,85 @@ def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSessi
processing_job_config = bootstraper.load_processing_job_config()
instance_type_info = bootstraper.load_instance_type_info()

spark_builder = (
SparkSession.builder.appName("GSProcessing")
.config("spark.hadoop.validateOutputSpecs", "false")
.config("spark.logConf", "true")
)

if processing_job_config and instance_type_info:
instance_type = processing_job_config["ProcessingResources"]["ClusterConfig"][
"InstanceType"
].replace("ml.", "")
instance_type_info = instance_type_info[instance_type]
instance_mem_mb = instance_type_info["MemoryInfo"]["SizeInMiB"]
instance_cores = instance_type_info["VCpuInfo"]["DefaultVCpus"]
logging.info(
"Detected instance type: %s with " f"total memory: %dM and total cores: %d",
instance_type,
instance_mem_mb,
instance_cores,
spark_builder = _create_sagemaker_spark_builder(
spark_builder, processing_job_config, instance_type_info
)
else:
instance_mem_mb = int(psutil.virtual_memory().total / (1024 * 1024))
instance_cores = psutil.cpu_count(logical=True)
logging.warning(
"Failed to detect instance type config. " "Found total memory: %dM and total cores: %d",
"Failed to detect instance type config. Found total memory: %d MiB and total cores: %d",
instance_mem_mb,
instance_cores,
)

# TODO: These settings shouldn't be necessary for container execution,
# can we create such a Spark context only for testing?
# if not sm_execution and filesystem_type == "s3":
# spark_builder.config(
# "spark.jars.packages",
# "org.apache.hadoop:hadoop-aws:2.10.2," "org.apache.hadoop:hadoop-client:2.10.2",
# ).config("spark.jars.excludes", "com.google.guava:guava").config(
# "spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
# ).config(
# "spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
# )

spark = spark_builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
logger = spark.sparkContext._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("py4j").setLevel(logger.Level.ERROR)
spark_logger = logging.getLogger("py4j.java_gateway")
spark_logger.setLevel(logging.ERROR)

hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
# This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
# hadoop_config.set(
# "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
# )
# See https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/
hadoop_config.set("fs.s3.maxConnections", "5000")
hadoop_config.set("fs.s3.maxRetries", "20")
hadoop_config.set("fs.s3a.connection.maximum", "150")
# Only used for local testing and container execution
# if not sm_execution and filesystem_type == "s3":
# logging.info("Setting up local Spark instance for S3 access...")
# hadoop_config.set(
# "fs.s3a.aws.credentials.provider",
# "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
# )
# hadoop_config.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# hadoop_config.set("fs.AbstractFileSystem.s3a.imp", "org.apache.hadoop.fs.s3a.S3A")
# spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

return spark


def _create_sagemaker_spark_builder(
spark_builder: SparkSession.Builder, processing_job_config: dict, instance_type_info: dict
) -> SparkSession.Builder:
instance_type = processing_job_config["ProcessingResources"]["ClusterConfig"][
"InstanceType"
].replace("ml.", "")
instance_type_info = instance_type_info[instance_type]
instance_mem_mb = instance_type_info["MemoryInfo"]["SizeInMiB"]
instance_cores = instance_type_info["VCpuInfo"]["DefaultVCpus"]
logging.info(
"Detected instance type: %s with total memory: %d MiB and total cores: %d",
instance_type,
instance_mem_mb,
instance_cores,
)

executor_cores = instance_cores
executor_count_per_instance = int(instance_cores / executor_cores)

@@ -98,58 +98,16 @@ def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSessi
# Improve memory utilization
# Avoid timeout errors due to connection pool starving
# Allow sending large results to driver
# TODO: Only set config when running on SageMaker, allow EMR/EMR-S defaults
spark_builder = (
SparkSession.builder.appName("GraphlyticsGraphPreloading")
.config("spark.hadoop.validateOutputSpecs", "false")
.config("spark.driver.memory", f"{driver_mem_mb}m")
spark_builder.config("spark.driver.memory", f"{driver_mem_mb}m")
.config("spark.driver.memoryOverhead", f"{driver_mem_overhead_mb}m")
.config("spark.driver.maxResultSize", f"{driver_max_result}m")
.config("spark.executor.memory", f"{executor_mem_mb}m")
.config("spark.executor.memoryOverhead", f"{executor_mem_overhead_mb}m")
.config("spark.logConf", "true")
)

# TODO: These settings shouldn't be necessary for container execution,
# can we create such a Spark context only for testing?
if not sm_execution and filesystem_type == "s3":
spark_builder.config(
"spark.jars.packages",
"org.apache.hadoop:hadoop-aws:2.10.2," "org.apache.hadoop:hadoop-client:2.10.2",
).config("spark.jars.excludes", "com.google.guava:guava").config(
"spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
).config(
"spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
)

spark = spark_builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
logger = spark.sparkContext._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("py4j").setLevel(logger.Level.ERROR)
spark_logger = logging.getLogger("py4j.java_gateway")
spark_logger.setLevel(logging.ERROR)

hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
# This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
hadoop_config.set(
"mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
)
# See https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/
hadoop_config.set("fs.s3.maxConnections", "5000")
hadoop_config.set("fs.s3a.connection.maximum", "150")
# Only used for local testing and container execution
if not sm_execution and filesystem_type == "s3":
logging.info("Setting up local Spark instance for S3 access...")
hadoop_config.set(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
)
hadoop_config.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_config.set("fs.AbstractFileSystem.s3a.imp", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

return spark
return spark_builder


def safe_rename_column(
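A sketch of how the refactored create_spark_session is typically driven; the S3 path is a placeholder and the read call is only illustrative. With this change, the tuned driver/executor settings are applied only when a SageMaker processing job config is detected, while EMR and EMR Serverless keep Spark's own defaults, which is the intent of the TODO note in this hunk.

    import os

    # GSProcessing detects SageMaker execution via the processing job config file
    sm_execution = os.path.exists("/opt/ml/config/processingjobconfig.json")
    spark = create_spark_session(sm_execution=sm_execution, filesystem_type="s3")
    df = spark.read.parquet("s3://my-bucket/raw-graph-data/")  # placeholder path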
@@ -305,7 +305,10 @@ def main():
"""Main entry point for GSProcessing"""
# Allows us to get typed arguments from the command line
gsprocessing_args = GSProcessingArguments(**vars(parse_args()))
logging.basicConfig(level=gsprocessing_args.log_level)
logging.basicConfig(
level=gsprocessing_args.log_level,
format="[GSPROCESSING] %(asctime)s %(levelname)-8s %(message)s",
)

# Determine if we're running within a SageMaker container
is_sagemaker_execution = os.path.exists("/opt/ml/config/processingjobconfig.json")
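For reference, a standalone sketch of the new logging setup and the kind of line it produces; the timestamp and message are illustrative.

    import logging

    logging.basicConfig(
        level="INFO",
        format="[GSPROCESSING] %(asctime)s %(levelname)-8s %(message)s",
    )
    logging.info("Initializing Spark session")
    # -> [GSPROCESSING] 2024-03-26 18:45:12,345 INFO     Initializing Spark session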
@@ -16,6 +16,7 @@

import json
import logging
import math
import numbers
import os
from collections import Counter, defaultdict
@@ -1613,7 +1614,7 @@ def _create_split_files_from_rates(
split_rates = SplitRates(train_rate=0.8, val_rate=0.1, test_rate=0.1)
else:
# TODO: add support for sums <= 1.0, useful for large-scale link prediction
if sum(split_rates.tolist()) != 1.0:
if math.fsum(split_rates.tolist()) != 1.0:
raise RuntimeError(f"Provided split rates do not sum to 1: {split_rates}")

split_list = split_rates.tolist()
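The switch from the built-in sum to math.fsum matters because a naive left-to-right float sum of valid split rates can fail the equality check, while fsum tracks the partial sums exactly and rounds only once. A quick illustration with an example 0.7/0.2/0.1 split:

    import math

    rates = [0.7, 0.2, 0.1]           # example train/val/test split
    print(sum(rates))                 # 0.9999999999999999 -- accumulated rounding error
    print(math.fsum(rates))           # 1.0
    print(math.fsum(rates) == 1.0)    # True, so valid rates no longer trip the RuntimeError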
