From 1d9eb0718e7db97a3dfb0a1edcf9f51299d0f8cd Mon Sep 17 00:00:00 2001
From: Roman
Date: Fri, 23 Feb 2024 15:37:13 +0100
Subject: [PATCH] integrate fink-mm distribution into the broker

Let the distribute service watch for fink-mm (GCN x ZTF) online data
and, once it appears, start the multi-messenger Kafka distribution
streams next to the regular one. The fink-mm configuration file is
passed through the new -mmconfigpath argument, wired in bin/fink via
FINK_MM_CONFIG. In raw2science, replace the parquet-file polling with
a simple path check, and clamp the remaining waiting time to zero so
the service cannot sleep for a negative duration.
---
 bin/distribute.py            | 39 ++++++++++++++++++++++++++++++++---
 bin/fink                     |  1 +
 bin/raw2science.py           | 16 ++++-----------
 fink_broker/mm2distribute.py | 40 ++++++++++++++++++++++++++++++++++++
 4 files changed, 82 insertions(+), 14 deletions(-)
 create mode 100644 fink_broker/mm2distribute.py

diff --git a/bin/distribute.py b/bin/distribute.py
index 6c89cb1a..88faa25f 100644
--- a/bin/distribute.py
+++ b/bin/distribute.py
@@ -23,17 +23,21 @@
 """
 import argparse
 import time
+import os
 
 from fink_utils.spark import schema_converter
 from fink_broker.parser import getargs
-from fink_broker.sparkUtils import init_sparksession, connect_to_raw_database
+from fink_broker.sparkUtils import init_sparksession, connect_to_raw_database, path_exist
 from fink_broker.distributionUtils import get_kafka_df
 from fink_broker.loggingUtils import get_fink_logger, inspect_application
+from fink_broker.mm2distribute import mm2distribute
 
 from fink_utils.spark.utils import concat_col
 from fink_utils.spark.utils import apply_user_defined_filter
 
+from fink_mm.init import get_config
+
 # User-defined topics
 userfilters = [
     'fink_filters.filter_early_sn_candidates.filter.early_sn_candidates',
@@ -141,8 +145,39 @@ def main():
 
     # Keep the Streaming running until something or someone ends it!
     if args.exit_after is not None:
-        time.sleep(args.exit_after)
+
+        count = 0
+        stream_distrib_list = []  # stays empty if no GCN data arrives
+        config = get_config({"--config": args.mmconfigpath})
+
+        while count < args.exit_after:
+
+            mm_path_output = config["PATH"]["online_grb_data_prefix"]
+            mmtmpdatapath = os.path.join(mm_path_output, "online")
+
+            # if fink-mm products (GCN x ZTF) are available
+            if path_exist(mmtmpdatapath):
+
+                t_before = time.time()
+                time.sleep(45)
+                logger.info("starting mm2distribute ...")
+                stream_distrib_list = mm2distribute(
+                    spark,
+                    config,
+                    args
+                )
+                count += time.time() - t_before
+                break
+
+            count += 1
+            time.sleep(1.0)
+
+        remaining_time = args.exit_after - count
+        remaining_time = remaining_time if remaining_time > 0 else 0
+        time.sleep(remaining_time)
         disquery.stop()
+        for stream in stream_distrib_list:
+            stream.stop()
         logger.info("Exiting the distribute service normally...")
     else:
         # Wait for the end of queries
diff --git a/bin/fink b/bin/fink
index 65125473..7f5c92bb 100755
--- a/bin/fink
+++ b/bin/fink
@@ -303,6 +303,7 @@ elif [[ $service == "distribution" ]]; then
     -distribution_schema ${DISTRIBUTION_SCHEMA} \
     -substream_prefix ${SUBSTREAM_PREFIX} \
     -tinterval ${FINK_TRIGGER_UPDATE} \
+    -mmconfigpath ${FINK_MM_CONFIG} \
     -night ${NIGHT} \
     -log_level ${LOG_LEVEL} ${EXIT_AFTER}
 elif [[ $service == "merge" ]]; then
diff --git a/bin/raw2science.py b/bin/raw2science.py
index e4ac7b2a..b6dc26dc 100644
--- a/bin/raw2science.py
+++ b/bin/raw2science.py
@@ -136,24 +136,16 @@ def main():
                 gcndatapath
                 + f"/year={args.night[0:4]}/month={args.night[4:6]}/day={args.night[6:8]}"
             )
-            all_parquet_files = glob.glob(os.path.join(scitmpdatapath, "*.parquet"))
-
-            logger.info(f"""
-            cond1: {path_exist(gcn_path)}
-            cond2: {len(all_parquet_files)}
-            cond3: {os.path.getsize(all_parquet_files[0]) if len(all_parquet_files) > 0 else ""}
-            """)
-
             # if there is gcn and ztf data
-            if path_exist(gcn_path) and len(all_parquet_files) > 0 and os.path.getsize(all_parquet_files[0]) > 0:
+            if path_exist(gcn_path) and path_exist(scitmpdatapath):
                 # Start the GCN x ZTF cross-match stream
                 t_before = time.time()
-                time.sleep(30)
+                time.sleep(45)
logger.info("starting science2mm ...") countquery_mm = science2mm( args, config, gcndatapath, scitmpdatapath ) - count += (time.time() - t_before) + count += time.time() - t_before break else: # wait for comming GCN @@ -162,7 +154,7 @@ def main(): # If GCN arrived, wait for the remaining time since the launch of raw2science remaining_time = args.exit_after - count - logger.info(f"time to the end: {remaining_time} sec") + remaining_time = remaining_time if remaining_time > 0 else 0 time.sleep(remaining_time) countquery_science.stop() if countquery_mm is not None: diff --git a/fink_broker/mm2distribute.py b/fink_broker/mm2distribute.py new file mode 100644 index 00000000..607bbf9a --- /dev/null +++ b/fink_broker/mm2distribute.py @@ -0,0 +1,40 @@ +import os + +from fink_mm.distribution.distribution import grb_distribution_stream + + +def mm2distribute(spark, config, args): + + mm_data_path = config["PATH"]["online_grb_data_prefix"] + kafka_broker = config["DISTRIBUTION"]["kafka_broker"] + username_writer = config["DISTRIBUTION"]["username_writer"] + password_writer = config["DISTRIBUTION"]["password_writer"] + + year, month, day = args.night[0:4], args.night[4:6], args.night[6:8] + basepath = os.path.join(mm_data_path, "online", f"year={year}/month={month}/day={day}") + checkpointpath_mm = os.path.join(mm_data_path, "mm_distribute_checkpoint") + + # force the mangrove columns to have the struct type + static_df = spark.read.parquet(basepath) + + path = basepath + df_grb_stream = ( + spark.readStream.format("parquet") + .schema(static_df.schema) + .option("basePath", basepath) + .option("path", path) + .option("latestFirst", True) + .load() + ) + + stream_distribute_list = grb_distribution_stream( + df_grb_stream, + static_df, + checkpointpath_mm, + args.tinterval, + kafka_broker, + username_writer, + password_writer, + ) + + return stream_distribute_list