Skip to content

Commit

Permalink
set spark session timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Tianhao-Gu committed May 31, 2024
1 parent f53c262 commit 529d5f8
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/spark/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from threading import Timer

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
Expand All @@ -8,6 +9,7 @@
HADOOP_AWS_VER = os.getenv('HADOOP_AWS_VER')
DELTA_SPARK_VER = os.getenv('DELTA_SPARK_VER')
SCALA_VER = os.getenv('SCALA_VER')
SPARK_TIMEOUT_SECONDS = os.getenv('SPARK_TIMEOUT_SECONDS', 4 * 60 * 60)


def _get_jars(jar_names: list) -> str:
Expand Down Expand Up @@ -51,6 +53,11 @@ def _get_delta_lake_conf(jars_str: str) -> dict:
}


def _stop_spark_session(spark):
print("Stopping Spark session after timeout...")
spark.stop()


def get_base_spark_conf(app_name: str) -> SparkConf:
"""
Helper function to get the base Spark configuration.
Expand Down Expand Up @@ -93,4 +100,7 @@ def get_spark_session(
for key, value in delta_conf.items():
spark_conf.set(key, value)

return SparkSession.builder.config(conf=spark_conf).getOrCreate()
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
Timer(int(SPARK_TIMEOUT_SECONDS), _stop_spark_session, [spark]).start()

return spark

0 comments on commit 529d5f8

Please sign in to comment.