From ac70c224712cfe41c58b252263cbaaabbe5c2586 Mon Sep 17 00:00:00 2001
From: Tianhao-Gu
Date: Fri, 31 May 2024 13:50:18 -0500
Subject: [PATCH] add timeout arg

---
 src/spark/utils.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/spark/utils.py b/src/spark/utils.py
index 18d3521..2a22b0e 100644
--- a/src/spark/utils.py
+++ b/src/spark/utils.py
@@ -9,7 +9,6 @@
 HADOOP_AWS_VER = os.getenv('HADOOP_AWS_VER')
 DELTA_SPARK_VER = os.getenv('DELTA_SPARK_VER')
 SCALA_VER = os.getenv('SCALA_VER')
-SPARK_TIMEOUT_SECONDS = os.getenv('SPARK_TIMEOUT_SECONDS', 4 * 60 * 60)
 
 
 def _get_jars(jar_names: list) -> str:
@@ -75,13 +74,15 @@ def get_base_spark_conf(app_name: str) -> SparkConf:
 def get_spark_session(
         app_name: str,
         local: bool = False,
-        delta_lake: bool = False) -> SparkSession:
+        delta_lake: bool = False,
+        timeout_sec: int = 4 * 60 * 60) -> SparkSession:
     """
     Helper to get and manage the SparkSession and keep all of our spark configuration params in one place.
 
     :param app_name: The name of the application
     :param local: Whether to run the spark session locally or not
     :param delta_lake: Build the spark session with Delta Lake support
+    :param timeout_sec: The timeout in seconds to stop the Spark session forcefully
 
     :return: A SparkSession object
     """
@@ -101,6 +102,7 @@ def get_spark_session(
         spark_conf.set(key, value)
 
     spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
-    Timer(int(SPARK_TIMEOUT_SECONDS), _stop_spark_session, [spark]).start()
+    timeout_sec = os.getenv('SPARK_TIMEOUT_SECONDS', timeout_sec)
+    Timer(int(timeout_sec), _stop_spark_session, [spark]).start()
 
     return spark
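For context, a minimal usage sketch of the patched helper. The import path, application name, and timeout value below are assumptions for illustration, not part of the patch; note that per the patched logic, a `SPARK_TIMEOUT_SECONDS` environment variable still overrides the argument.

```python
# Hypothetical usage of the patched get_spark_session (illustration only;
# assumes src/ is on PYTHONPATH so the module imports as spark.utils).
from spark.utils import get_spark_session

# Request a 30-minute session instead of the 4-hour default. A Timer will
# invoke _stop_spark_session after timeout_sec seconds, unless the
# SPARK_TIMEOUT_SECONDS environment variable overrides the argument.
spark = get_spark_session("example-app", local=True, timeout_sec=30 * 60)
```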