[GSProcessing] Improve Spark config to better support EMR/EMRS, small optimizations #838

Merged · 5 commits · May 29, 2024
Changes from 2 commits
2 changes: 1 addition & 1 deletion graphstorm-processing/docker/push_gsprocessing_image.sh
@@ -9,7 +9,7 @@ usage() {
cat <<EOF
Usage: $(basename "${BASH_SOURCE[0]}") [-h] [-x] [--image ...] [--version ...] [--region ...] [--account ...]

Script description here.
Pushes GSProcessing image to ECR.

Available options:

32 changes: 31 additions & 1 deletion graphstorm-processing/graphstorm_processing/constants.py
@@ -14,6 +14,8 @@
limitations under the License.
"""

from enum import Enum

from pyspark.sql.types import FloatType, DoubleType

################### Categorical Limits #######################
@@ -33,7 +35,7 @@
############## Spark-specific constants #####################
SPECIAL_CHARACTERS = {".", "+", "*", "?", "^", "$", "(", ")", "[", "]", "{", "}", "|", "\\"}

"""Configuration to define driver and executor memory for distributed"""
"""Configuration to define driver and executor memory for SageMaker PySpark"""
# Percentage of instance memory to allocate to the driver process
DRIVER_MEM_INSTANCE_MEM_RATIO = 0.9
# Fraction of driver memory to be allocated as additional non-heap memory per process
@@ -55,3 +57,31 @@
HUGGINGFACE_TRANFORM = "huggingface"
HUGGINGFACE_TOKENIZE = "tokenize_hf"
HUGGINGFACE_EMB = "embedding_hf"


################# Supported execution envs ##############
class ExecutionEnv(Enum):
"""Supported execution environments"""

LOCAL = 1
SAGEMAKER = 2
EMR_SERVERLESS = 3


################# Supported filesystem types #############
class FilesystemType(Enum):
"""Supported filesystem types"""

LOCAL = 1
S3 = 2


# NOTE: These need to be updated with each Spark release
# See the value for <hadoop.version> at the respective Spark version
# https://github.com/apache/spark/blob/v3.5.1/pom.xml#L125
# replace both Hadoop versions below with the one there
SPARK_HADOOP_VERSIONS = {
"3.5": "3.3.4",
"3.4": "3.3.4",
"3.3": "3.3.2",
}
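
For reference, the spark_utils changes later in this diff consume this mapping by resolving the running PySpark version to its bundled Hadoop version. Condensed from that code (assumes the installed PySpark major.minor has an entry in the dict):

    import pyspark
    from pyspark.util import VersionUtils

    from graphstorm_processing.constants import SPARK_HADOOP_VERSIONS

    # e.g. PySpark 3.5.1 -> (3, 5) -> Hadoop 3.3.4
    major, minor = VersionUtils.majorMinorVersion(pyspark.__version__)
    hadoop_ver = SPARK_HADOOP_VERSIONS[f"{major}.{minor}"]
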
@@ -112,6 +112,7 @@ def tokenize(text):
logging.warning("The device to run huggingface transformation is %s", device)
tokenizer = AutoTokenizer.from_pretrained(hf_model)
if max_seq_length > tokenizer.model_max_length:
# TODO: Could we possibly raise this at config time?
raise RuntimeError(
f"max_seq_length {max_seq_length} is larger "
f"than expected {tokenizer.model_max_length}"
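
The TODO above asks whether this check could be raised at config time instead of inside the Spark task. A minimal sketch of such a config-time validation (hypothetical helper, not part of this PR):

    from transformers import AutoTokenizer

    def validate_max_seq_length(hf_model: str, max_seq_length: int) -> None:
        """Hypothetical config-time check mirroring the runtime guard above."""
        tokenizer = AutoTokenizer.from_pretrained(hf_model)
        if max_seq_length > tokenizer.model_max_length:
            raise ValueError(
                f"max_seq_length {max_seq_length} is larger than the model's "
                f"limit of {tokenizer.model_max_length} for {hf_model}"
            )

The trade-off is that the tokenizer would be downloaded once on the driver during config parsing rather than only inside the transformation.
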
@@ -11,12 +11,15 @@

import logging
import uuid
from typing import Tuple, Sequence
from typing import Optional, Tuple, Sequence

import psutil

import pyspark
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.util import VersionUtils
from graphstorm_processing import constants
from graphstorm_processing.constants import ExecutionEnv, FilesystemType, SPARK_HADOOP_VERSIONS

try:
from smspark.bootstrapper import Bootstrapper
@@ -31,13 +34,15 @@ def load_instance_type_info(self):
return None


def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSession:
def create_spark_session(
execution_env: ExecutionEnv, filesystem_type: FilesystemType
) -> SparkSession:
"""
Create a SparkSession with the appropriate configuration for the execution context.

Parameters
----------
sm_execution
execution_env
The environment this is executing in: local, SageMaker, or EMR Serverless.
filesystem_type
The filesystem type to use.
@@ -54,6 +59,71 @@ def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSessi
processing_job_config = bootstraper.load_processing_job_config()
instance_type_info = bootstraper.load_instance_type_info()

spark_builder = (
SparkSession.builder.appName("GSProcessing")
.config("spark.hadoop.validateOutputSpecs", "false")
.config("spark.logConf", "true")
)

if execution_env == ExecutionEnv.EMR_SERVERLESS:
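# EMR Serverless branch: skip the SageMaker-style instance memory setup below
# (assumption: EMR Serverless supplies its own driver/executor sizing via job config)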
pass
else:
spark_builder = _configure_spark_env(
spark_builder, processing_job_config, instance_type_info
)

major, minor = VersionUtils.majorMinorVersion(pyspark.__version__)
hadoop_ver = SPARK_HADOOP_VERSIONS[f"{major}.{minor}"]
# Only used for local testing and container execution
if execution_env == ExecutionEnv.LOCAL and filesystem_type == FilesystemType.S3:
logging.info("Setting up local Spark instance for S3 access...")
spark_builder.config(
"spark.jars.packages",
f"org.apache.hadoop:hadoop-aws:{hadoop_ver},"
f"org.apache.hadoop:hadoop-client:{hadoop_ver}",
).config("spark.jars.excludes", "com.google.guava:guava").config(
"spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
).config(
"spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
)

spark = spark_builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
logger = spark.sparkContext._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("py4j").setLevel(logger.Level.ERROR)
spark_logger = logging.getLogger("py4j.java_gateway")
spark_logger.setLevel(logging.ERROR)

hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
# This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
# hadoop_config.set(
# "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
# )
# See https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/
hadoop_config.set("fs.s3.maxConnections", "5000")
hadoop_config.set("fs.s3.maxRetries", "20")
hadoop_config.set("fs.s3a.connection.maximum", "150")

# Set up auth for local and EMR
if execution_env != ExecutionEnv.SAGEMAKER and filesystem_type == FilesystemType.S3:
hadoop_config.set(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
)
hadoop_config.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_config.set("fs.AbstractFileSystem.s3a.imp", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

return spark
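
To illustrate the new signature, a hedged usage sketch (the enum choices shown here are assumptions; the real mapping from CLI arguments to these enums happens in the executor entry point, outside this diff):

    from graphstorm_processing.constants import ExecutionEnv, FilesystemType

    # Hypothetical call sites for create_spark_session as defined above
    spark = create_spark_session(
        execution_env=ExecutionEnv.EMR_SERVERLESS,
        filesystem_type=FilesystemType.S3,
    )
    local_spark = create_spark_session(
        execution_env=ExecutionEnv.LOCAL,
        filesystem_type=FilesystemType.LOCAL,
    )

Compared to the previous sm_execution: bool flag, the enum makes the EMR Serverless path explicit instead of folding it into the "not SageMaker" case.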


def _configure_spark_env(
spark_builder: SparkSession.Builder,
processing_job_config: Optional[dict],
instance_type_info: Optional[dict],
) -> SparkSession.Builder:
if processing_job_config and instance_type_info:
instance_type = processing_job_config["ProcessingResources"]["ClusterConfig"][
"InstanceType"
@@ -70,8 +140,8 @@ def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSessi
else:
instance_mem_mb = int(psutil.virtual_memory().total / (1024 * 1024))
instance_cores = psutil.cpu_count(logical=True)
logging.warning(
"Failed to detect instance type config. Found total memory: %d MiB and total cores: %d",
logging.info(
"Configuring Spark execution env. Found total memory: %d MiB and total cores: %d",
instance_mem_mb,
instance_cores,
)
@@ -99,57 +169,14 @@ def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSessi
# Avoid timeout errors due to connection pool starving
# Allow sending large results to driver
spark_builder = (
SparkSession.builder.appName("GSProcessing")
.config("spark.hadoop.validateOutputSpecs", "false")
.config("spark.driver.memory", f"{driver_mem_mb}m")
spark_builder.config("spark.driver.memory", f"{driver_mem_mb}m")
.config("spark.driver.memoryOverhead", f"{driver_mem_overhead_mb}m")
.config("spark.driver.maxResultSize", f"{driver_max_result}m")
.config("spark.executor.memory", f"{executor_mem_mb}m")
.config("spark.executor.memoryOverhead", f"{executor_mem_overhead_mb}m")
.config("spark.logConf", "true")
)

# TODO: These settings shouldn't be necessary for container execution,
# can we create such a Spark context only for testing?
if not sm_execution and filesystem_type == "s3":
spark_builder.config(
"spark.jars.packages",
"org.apache.hadoop:hadoop-aws:2.10.2," "org.apache.hadoop:hadoop-client:2.10.2",
).config("spark.jars.excludes", "com.google.guava:guava").config(
"spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
).config(
"spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"
)

spark = spark_builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
logger = spark.sparkContext._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("py4j").setLevel(logger.Level.ERROR)
spark_logger = logging.getLogger("py4j.java_gateway")
spark_logger.setLevel(logging.ERROR)

hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
# This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
hadoop_config.set(
"mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
)
# See https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/
hadoop_config.set("fs.s3.maxConnections", "5000")
hadoop_config.set("fs.s3a.connection.maximum", "150")
# Only used for local testing and container execution
if not sm_execution and filesystem_type == "s3":
logging.info("Setting up local Spark instance for S3 access...")
hadoop_config.set(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
)
hadoop_config.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_config.set("fs.AbstractFileSystem.s3a.imp", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

return spark
return spark_builder


def safe_rename_column(