
config executor cores #23

Merged (6 commits), Jun 6, 2024
Changes shown below are from 2 of the 6 commits.
src/spark/utils.py (15 changes: 12 additions & 3 deletions)
@@ -10,6 +10,9 @@
HADOOP_AWS_VER = os.getenv('HADOOP_AWS_VER')
DELTA_SPARK_VER = os.getenv('DELTA_SPARK_VER')
SCALA_VER = os.getenv('SCALA_VER')
+# The default number of CPU cores that each Spark executor will use.
+# If not specified, Spark will typically use all available cores on the worker nodes.
+DEFAULT_EXECUTOR_CORES = 1


def _get_jars(jar_names: list) -> str:
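
For orientation (not part of this diff): spark.executor.cores is the standard Spark property behind this constant. On a cluster manager such as standalone or YARN it caps each executor at that many cores, while leaving it unset lets standalone executors use all available worker cores. A minimal sketch of the equivalent raw setting, outside the helpers in this file:

    from pyspark import SparkConf

    # Same effect as passing DEFAULT_EXECUTOR_CORES through the helpers below,
    # or as "spark-submit --conf spark.executor.cores=1".
    conf = SparkConf().set("spark.executor.cores", "1")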
@@ -29,11 +32,14 @@
return ", ".join(jars)


-def _get_delta_lake_conf(jars_str: str) -> dict:
+def _get_delta_lake_conf(
+jars_str: str,
+executor_cores: int) -> dict:
"""
Helper function to get Delta Lake specific Spark configuration.

:param jars_str: A comma-separated string of JAR file paths
+:param executor_cores: The number of CPU cores that each Spark executor will use
Review comment (Member):
So this only applies if Delta Lake is true and the user doesn't call the get base conf method. It seems like it should always apply.

Reply (Collaborator, Author):
👍

:return: A dictionary of Delta Lake specific Spark configuration

@@ -50,6 +56,7 @@
"spark.hadoop.fs.s3a.path.style.access": "true",
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.sql.catalogImplementation": "hive",
"spark.executor.cores": executor_cores,
}


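For illustration only (not part of the PR): assuming the helper simply assembles and returns the dictionary shown above, the new argument surfaces like this. The jar file names and version numbers here are made up:

    conf = _get_delta_lake_conf(
        "delta-spark_2.12-3.2.0.jar, hadoop-aws-3.3.4.jar",  # hypothetical jars_str
        executor_cores=2)
    assert conf["spark.executor.cores"] == 2
    assert conf["spark.sql.catalogImplementation"] == "hive"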
@@ -76,14 +83,16 @@
app_name: str = None,
local: bool = False,
delta_lake: bool = True,
-timeout_sec: int = 4 * 60 * 60) -> SparkSession:
+timeout_sec: int = 4 * 60 * 60,
+executor_cores: int = DEFAULT_EXECUTOR_CORES) -> SparkSession:
"""
Helper to get and manage the SparkSession and keep all of our spark configuration params in one place.

:param app_name: The name of the application. If not provided, a default name will be generated.
:param local: Whether to run the spark session locally or not. Default is False.
:param delta_lake: Build the spark session with Delta Lake support. Default is True.
:param timeout_sec: The timeout in seconds to stop the Spark session forcefully. Default is 4 hours.
+:param executor_cores: The number of CPU cores that each Spark executor will use. Default is 1.

:return: A SparkSession object
"""
@@ -101,7 +110,7 @@
jar_names = [f"delta-spark_{SCALA_VER}-{DELTA_SPARK_VER}.jar",
f"hadoop-aws-{HADOOP_AWS_VER}.jar"]
jars_str = _get_jars(jar_names)
-delta_conf = _get_delta_lake_conf(jars_str)
+delta_conf = _get_delta_lake_conf(jars_str, executor_cores)

Codecov / codecov/patch: check warning on line 113 in src/spark/utils.py. Added line #L113 was not covered by tests.
for key, value in delta_conf.items():
spark_conf.set(key, value)

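On the Codecov warning: a sketch of one way the new call could be exercised without a real cluster. This is only a sketch; the import path and the patched names (SparkSession, _get_jars) are assumptions about parts of utils.py that are not visible in this diff, so it may need adapting:

    from unittest.mock import MagicMock, patch

    import spark.utils as utils  # assumed import path for src/spark/utils.py

    def test_executor_cores_is_passed_to_delta_conf():
        # Stub the session machinery and jar lookup so no Spark cluster, jars or
        # env vars are needed, and spy on _get_delta_lake_conf to check the wiring.
        with patch.object(utils, "SparkSession", MagicMock()), \
             patch.object(utils, "_get_jars", return_value="fake.jar"), \
             patch.object(utils, "_get_delta_lake_conf", return_value={}) as delta_conf:
            utils.get_spark_session(app_name="t", delta_lake=True, executor_cores=2)
        delta_conf.assert_called_once_with("fake.jar", 2)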