fink-mm: various updates (#906)
* Fix log. Closes #900

* Remove checkpoints for fink-mm. Closes #902

* Wait 5 minutes before launching fink-mm to give time for raw2science to produce data
JulienPeloton authored Oct 24, 2024
1 parent 9ee658b commit d6b5881
Showing 3 changed files with 16 additions and 9 deletions.
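The last bullet replaces the previous 1-second polling in fink_broker/mm_utils.py with a 5-minute (300 s) cadence, and adds a 300 s pause before launching the Fink-MM streams once the input data path exists. A minimal sketch of that wait-then-launch pattern, with hypothetical names (a local path check and a generic launch callable stand in for the HDFS check and the fink-mm entry points; the actual changes are in the diffs below):

import os
import time

def wait_then_launch(data_path, launch, exit_after=3600):
    """Poll every 300 s for data_path; once it exists, wait another 300 s, then launch.

    Sketch only: argument names and the exit_after budget are assumptions,
    not the exact fink-broker implementation shown in the diffs below.
    """
    time_spent_in_wait = 0
    handle = None
    while time_spent_in_wait < exit_after:
        if os.path.exists(data_path):
            t_before = time.time()
            time.sleep(300)  # give raw2science time to produce data
            handle = launch()
            time_spent_in_wait += time.time() - t_before
            break
        # data not there yet: wait 300 s (was 1 s before this commit)
        time_spent_in_wait += 300
        time.sleep(300)
    return time_spent_in_wait, handle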
bin/raw2science.py (2 changes: 1 addition & 1 deletion)
@@ -132,7 +132,7 @@ def main():
         logger.info("Do not perform multi-messenger operations")
         time_spent_in_wait, countquery_mm = 0, None
     else:
-        logger.debug("Perform multi-messenger operations")
+        logger.info("Perform multi-messenger operations")
         from fink_broker.mm_utils import raw2science_launch_fink_mm
 
         time_spent_in_wait, countquery_mm = raw2science_launch_fink_mm(
fink_broker/mm_utils.py (18 changes: 10 additions & 8 deletions)
Expand Up @@ -58,7 +58,7 @@ def distribute_launch_fink_mm(spark, args: dict) -> Tuple[int, List]:
from fink_mm.init import get_config
from fink_broker.mm_utils import mm2distribute

_LOG.info("Fink-MM configuration file: {args.mmconfigpath}")
_LOG.info(f"Fink-MM configuration file: {args.mmconfigpath}")
config = get_config({"--config": args.mmconfigpath})

# Wait for GCN comming
@@ -71,13 +71,14 @@ def distribute_launch_fink_mm(spark, args: dict) -> Tuple[int, List]:
         # if there is gcn and ztf data
         if path_exist(mmtmpdatapath):
             t_before = time.time()
-            _LOG.info("starting mm2distribute ...")
+            _LOG.info("starting mm2distribute in 300 seconds...")
+            time.sleep(300)
             stream_distrib_list = mm2distribute(spark, config, args)
             time_spent_in_wait += time.time() - t_before
             break
 
-        time_spent_in_wait += 1
-        time.sleep(1.0)
+        time_spent_in_wait += 300
+        time.sleep(300)
     if stream_distrib_list == []:
         _LOG.warning(
             f"{mmtmpdatapath} does not exist. mm2distribute could not start before the end of the job."
@@ -115,7 +116,7 @@ def raw2science_launch_fink_mm(
     from fink_mm.init import get_config
     from fink_broker.mm_utils import science2mm
 
-    _LOG.info("Fink-MM configuration file: {args.mmconfigpath}")
+    _LOG.info(f"Fink-MM configuration file: {args.mmconfigpath}")
     config = get_config({"--config": args.mmconfigpath})
     gcndatapath = config["PATH"]["online_gcn_data_prefix"]
     gcn_path = gcndatapath + "/year={}/month={}/day={}".format(
@@ -130,14 +131,15 @@
         if path_exist(gcn_path) and path_exist(scitmpdatapath):
             # Start the GCN x ZTF cross-match stream
             t_before = time.time()
-            _LOG.info("starting science2mm ...")
+            _LOG.info("starting science2mm in 300 seconds...")
+            time.sleep(300)
             countquery_mm = science2mm(args, config, gcn_path, scitmpdatapath)
             time_spent_in_wait += time.time() - t_before
             break
         else:
             # wait for comming GCN
-            time_spent_in_wait += 1
-            time.sleep(1)
+            time_spent_in_wait += 300
+            time.sleep(300)
 
     if countquery_mm is None:
         _LOG.warning("science2mm could not start before the end of the job.")
scheduler/database_service.sh (5 changes: 5 additions & 0 deletions)
@@ -55,4 +55,9 @@ if [[ $? == 0 ]]; then
     hdfs dfs -rm -r /user/julien.peloton/online/raw_checkpoint/${NIGHT}
     hdfs dfs -rm -r /user/julien.peloton/online/science_checkpoint/${NIGHT}
     hdfs dfs -rm -r /user/julien.peloton/online/kafka_checkpoint/${NIGHT}
+
+    # Remove checkpoints for fink-mm
+    hdfs dfs -rm -r /user/julien.peloton/fink_mm/gcn_x_ztf/online/_spark_metadata
+    hdfs dfs -rm -r /user/julien.peloton/fink_mm/gcn_x_ztf/online_checkpoint
+
 fi
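For context, the two directories removed above are what Spark Structured Streaming leaves behind: the checkpointLocation holds the query's progress log, and a parquet file sink writes a _spark_metadata folder inside its output path. If they survive from a previous night, a restarted query resumes from that stale state instead of starting fresh. A self-contained, hypothetical example of such a checkpointed query (a rate source stands in for the GCN x ZTF stream; paths and options are illustrative, not the fink-mm configuration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

# Hypothetical stream: the real queries are created by fink-mm (mm2distribute / science2mm).
query = (
    spark.readStream.format("rate").option("rowsPerSecond", 1).load()
    .writeStream
    .format("parquet")
    .option("path", "/tmp/gcn_x_ztf/online")                           # sink: gets a _spark_metadata subdir
    .option("checkpointLocation", "/tmp/gcn_x_ztf/online_checkpoint")  # progress log: reused on restart
    .start()
)

query.awaitTermination(30)  # run briefly, then stop
query.stop()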
