diff --git a/fink_broker/spark_utils.py b/fink_broker/spark_utils.py index 0df4470b..ebe2d6b3 100644 --- a/fink_broker/spark_utils.py +++ b/fink_broker/spark_utils.py @@ -336,7 +336,17 @@ def connect_to_raw_database(basepath: str, path: str, latestfirst: bool) -> Data wait_sec = increase_wait_time(wait_sec) # Create a DF from the database - userschema = spark.read.parquet(basepath).schema + # We need to wait for the schema to be available + while True: + try: + userschema = spark.read.parquet(basepath).schema + except Exception as e: + _LOG.error("Error while reading %s, %s", basepath, e) + time.sleep(wait_sec) + wait_sec = increase_wait_time(wait_sec) + continue + else: + break df = ( spark.readStream.format("parquet")