Skip to content

Commit

Permalink
Wait for schema in distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
fjammes committed Dec 13, 2024
1 parent 72f1353 commit 935cb91
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion fink_broker/spark_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,17 @@ def connect_to_raw_database(basepath: str, path: str, latestfirst: bool) -> Data
wait_sec = increase_wait_time(wait_sec)

# Create a DF from the database
userschema = spark.read.parquet(basepath).schema
# We need to wait for the schema to be available
while True:
try:
userschema = spark.read.parquet(basepath).schema
except Exception as e:
_LOG.error("Error while reading %s, %s", basepath, e)
time.sleep(wait_sec)
wait_sec = increase_wait_time(wait_sec)
continue
else:
break

df = (
spark.readStream.format("parquet")
Expand Down

0 comments on commit 935cb91

Please sign in to comment.