Skip to content

Commit

Permalink
[GSProcessing] Improve Spark config to better support EMR/EMRS
Browse files Browse the repository at this point in the history
  • Loading branch information
thvasilo committed May 14, 2024
1 parent 39e38d8 commit c08b3f9
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 142 deletions.
2 changes: 1 addition & 1 deletion graphstorm-processing/docker/push_gsprocessing_image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ usage() {
cat <<EOF
Usage: $(basename "${BASH_SOURCE[0]}") [-h] [-x] [--image ...] [--version ...] [--region ...] [--account ...]
Script description here.
Pushes GSProcessing image to ECR.
Available options:
Expand Down
32 changes: 31 additions & 1 deletion graphstorm-processing/graphstorm_processing/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
limitations under the License.
"""

from enum import Enum

from pyspark.sql.types import FloatType, DoubleType

################### Categorical Limits #######################
Expand All @@ -33,7 +35,7 @@
############## Spark-specific constants #####################
SPECIAL_CHARACTERS = {".", "+", "*", "?", "^", "$", "(", ")", "[", "]", "{", "}", "|", "\\"}

"""Configuration to define driver and executor memory for distributed"""
"""Configuration to define driver and executor memory for SageMaker PySpark"""
# Percentage of instance memory to allocate to the driver process
DRIVER_MEM_INSTANCE_MEM_RATIO = 0.9
# Fraction of driver memory to be allocated as additional non-heap memory per process
Expand All @@ -55,3 +57,31 @@
HUGGINGFACE_TRANFORM = "huggingface"
HUGGINGFACE_TOKENIZE = "tokenize_hf"
HUGGINGFACE_EMB = "embedding_hf"


################# Supported execution envs ##############
class ExecutionEnv(Enum):
"""Supported execution environments"""

LOCAL = 1
SAGEMAKER = 2
EMR_SERVERLESS = 3


################# Supported filesystem types#############
class FilesystemType(Enum):
"""Supported filesystem types"""

LOCAL = 1
S3 = 2


# NOTE: These need to be updated with each Spark release
# See the value for <hadoop.version> at the respective Spark version
# https://github.com/apache/spark/blob/v3.5.1/pom.xml#L125
# replace both Hadoop versions below with the one there
SPARK_HADOOP_VERSIONS = {
"3.5": "3.3.4",
"3.4": "3.3.4",
"3.3": "3.3.2",
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def tokenize(text):
logging.warning("The device to run huggingface transformation is %s", device)
tokenizer = AutoTokenizer.from_pretrained(hf_model)
if max_seq_length > tokenizer.model_max_length:
# TODO: Could we possibly raise this at config time?
raise RuntimeError(
f"max_seq_length {max_seq_length} is larger "
f"than expected {tokenizer.model_max_length}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@

import logging
import uuid
from typing import Tuple, Sequence
from typing import Optional, Tuple, Sequence

import psutil

import pyspark
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.util import VersionUtils
from graphstorm_processing import constants
from graphstorm_processing.constants import ExecutionEnv, FilesystemType, SPARK_HADOOP_VERSIONS

try:
from smspark.bootstrapper import Bootstrapper
Expand All @@ -31,13 +34,15 @@ def load_instance_type_info(self):
return None


def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSession:
def create_spark_session(
execution_env: ExecutionEnv, filesystem_type: FilesystemType
) -> SparkSession:
"""
Create a SparkSession with the appropriate configuration for the execution context.
Parameters
----------
sm_execution
execution_env
Whether or not this is being executed on a SageMaker instance.
filesystem_type
The filesystem type to use.
Expand All @@ -54,6 +59,71 @@ def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSessi
processing_job_config = bootstraper.load_processing_job_config()
instance_type_info = bootstraper.load_instance_type_info()

spark_builder = (
SparkSession.builder.appName("GSProcessing")
.config("spark.hadoop.validateOutputSpecs", "false")
.config("spark.logConf", "true")
)

if execution_env == ExecutionEnv.EMR_SERVERLESS:
pass
else:
spark_builder = _configure_spark_env(
spark_builder, processing_job_config, instance_type_info
)

major, minor = VersionUtils.majorMinorVersion(pyspark.__version__)
hadoop_ver = SPARK_HADOOP_VERSIONS[f"{major}.{minor}"]
# Only used for local testing and container execution
if execution_env == ExecutionEnv.LOCAL and filesystem_type == FilesystemType.S3:
logging.info("Setting up local Spark instance for S3 access...")
spark_builder.config(
"spark.jars.packages",
f"org.apache.hadoop:hadoop-aws:{hadoop_ver},"
f"org.apache.hadoop:hadoop-client:{hadoop_ver}",
).config("spark.jars.excludes", "com.google.guava:guava").config(
"spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
).config(
"spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
)

spark = spark_builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
logger = spark.sparkContext._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("py4j").setLevel(logger.Level.ERROR)
spark_logger = logging.getLogger("py4j.java_gateway")
spark_logger.setLevel(logging.ERROR)

hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
# This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
# hadoop_config.set(
# "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
# )
# See https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/
hadoop_config.set("fs.s3.maxConnections", "5000")
hadoop_config.set("fs.s3.maxRetries", "20")
hadoop_config.set("fs.s3a.connection.maximum", "150")

# Set up auth for local and EMR
if execution_env != ExecutionEnv.SAGEMAKER and filesystem_type == FilesystemType.S3:
hadoop_config.set(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
)
hadoop_config.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_config.set("fs.AbstractFileSystem.s3a.imp", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

return spark


def _configure_spark_env(
spark_builder: SparkSession.Builder,
processing_job_config: Optional[dict],
instance_type_info: Optional[dict],
) -> SparkSession.Builder:
if processing_job_config and instance_type_info:
instance_type = processing_job_config["ProcessingResources"]["ClusterConfig"][
"InstanceType"
Expand All @@ -70,8 +140,8 @@ def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSessi
else:
instance_mem_mb = int(psutil.virtual_memory().total / (1024 * 1024))
instance_cores = psutil.cpu_count(logical=True)
logging.warning(
"Failed to detect instance type config. Found total memory: %d MiB and total cores: %d",
logging.info(
"Configuring Spark execution env. Found total memory: %d MiB and total cores: %d",
instance_mem_mb,
instance_cores,
)
Expand All @@ -98,58 +168,16 @@ def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSessi
# Improve memory utilization
# Avoid timeout errors due to connection pool starving
# Allow sending large results to driver
# TODO: Only set config when running on SageMaker, allow EMR/EMR-S defaults
spark_builder = (
SparkSession.builder.appName("GSProcessing")
.config("spark.hadoop.validateOutputSpecs", "false")
.config("spark.driver.memory", f"{driver_mem_mb}m")
spark_builder.config("spark.driver.memory", f"{driver_mem_mb}m")
.config("spark.driver.memoryOverhead", f"{driver_mem_overhead_mb}m")
.config("spark.driver.maxResultSize", f"{driver_max_result}m")
.config("spark.executor.memory", f"{executor_mem_mb}m")
.config("spark.executor.memoryOverhead", f"{executor_mem_overhead_mb}m")
.config("spark.logConf", "true")
)

# TODO: These settings shouldn't be necessary for container execution,
# can we create such a Spark context only for testing?
if not sm_execution and filesystem_type == "s3":
spark_builder.config(
"spark.jars.packages",
"org.apache.hadoop:hadoop-aws:2.10.2," "org.apache.hadoop:hadoop-client:2.10.2",
).config("spark.jars.excludes", "com.google.guava:guava").config(
"spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
).config(
"spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
)

spark = spark_builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
logger = spark.sparkContext._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("py4j").setLevel(logger.Level.ERROR)
spark_logger = logging.getLogger("py4j.java_gateway")
spark_logger.setLevel(logging.ERROR)

hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
# This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
hadoop_config.set(
"mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
)
# See https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/
hadoop_config.set("fs.s3.maxConnections", "5000")
hadoop_config.set("fs.s3a.connection.maximum", "150")
# Only used for local testing and container execution
if not sm_execution and filesystem_type == "s3":
logging.info("Setting up local Spark instance for S3 access...")
hadoop_config.set(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
)
hadoop_config.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_config.set("fs.AbstractFileSystem.s3a.imp", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

return spark
return spark_builder


def safe_rename_column(
Expand Down
Loading

0 comments on commit c08b3f9

Please sign in to comment.