diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b51cd088..7e5fa439 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -69,14 +69,14 @@ jobs: cd $USRLIBS source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION} cd $FINK_HOME - fink_test -c conf/fink.conf.dev --unit-tests --db-integration --stream-integration + fink_test -c conf/fink.conf.dev --stream-integration --db-integration --mm-offline --unit-tests - name: Run test suites [prod] if: matrix.container == 'julienpeloton/fink-ci:prod' run: | cd $USRLIBS source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION} cd $FINK_HOME - fink_test -c conf/fink.conf.prod --unit-tests --db-integration --stream-integration + fink_test -c conf/fink.conf.prod --stream-integration --db-integration --mm-offline --unit-tests curl -s https://codecov.io/bash | bash - uses: act10ns/slack@v1 with: diff --git a/bin/distribute.py b/bin/distribute.py index 02e9040c..5d53a2ad 100644 --- a/bin/distribute.py +++ b/bin/distribute.py @@ -143,41 +143,53 @@ def main(): .trigger(processingTime='{} seconds'.format(args.tinterval)) \ .start() + config_path = args.mmconfigpath + count = 0 + stream_distrib_list = None + # Keep the Streaming running until something or someone ends it! 
if args.exit_after is not None: - count = 0 - config = get_config({"--config": args.mmconfigpath}) + if config_path != "no-config": + config = get_config({"--config": config_path}) - while count < args.exit_after: + while count < args.exit_after: - mm_path_output = config["PATH"]["online_grb_data_prefix"] - mmtmpdatapath = os.path.join(mm_path_output, "online") + mm_path_output = config["PATH"]["online_grb_data_prefix"] + mmtmpdatapath = os.path.join(mm_path_output, "online") - # if there is gcn and ztf data - if path_exist(mmtmpdatapath): + # if there is gcn and ztf data + if path_exist(mmtmpdatapath): - t_before = time.time() - logger.info("starting mm2distribute ...") - stream_distrib_list = mm2distribute( - spark, - config, - args - ) - count += time.time() - t_before - break + t_before = time.time() + logger.info("starting mm2distribute ...") + stream_distrib_list = mm2distribute( + spark, + config, + args + ) + count += time.time() - t_before + break - count += 1 - time.sleep(1.0) + count += 1 + time.sleep(1.0) remaining_time = args.exit_after - count remaining_time = remaining_time if remaining_time > 0 else 0 time.sleep(remaining_time) disquery.stop() - for stream in stream_distrib_list: - stream.stop() + if stream_distrib_list is not None: + for stream in stream_distrib_list: + stream.stop() logger.info("Exiting the distribute service normally...") else: + if config_path != "no-config": + config = get_config({"--config": config_path}) + stream_distrib_list = mm2distribute( + spark, + config, + args + ) # Wait for the end of queries spark.streams.awaitAnyTermination() diff --git a/bin/fink_test b/bin/fink_test index 35736392..30228849 100755 --- a/bin/fink_test +++ b/bin/fink_test @@ -47,6 +47,10 @@ while [ "$#" -gt 0 ]; do WITH_UNITS=true shift 1 ;; + --mm-offline) + WITH_MM_OFFLINE=true + shift 1 + ;; -h) echo -e $message_help exit @@ -112,9 +116,6 @@ if [[ "$WITH_DB" = true ]] ; then # merge data fink start merge -c $conf --night $NIGHT_STR - # fink-mm 
offline mode - fink_mm join_stream offline --night $NIGHT_INT --config $conf - # Update science portal fink start science_archival -c $conf --night $NIGHT_INT fink start index_archival -c $conf --night $NIGHT_INT --index_table jd_objectId @@ -135,6 +136,12 @@ if [[ "$WITH_DB" = true ]] ; then fink start check_science_portal -c $conf fi +# MM offline test (need "fink start merge" before) +if [[ "$WITH_MM_OFFLINE" = true ]] ; then + # fink-mm offline mode + fink_mm join_stream offline --night $NIGHT_INT --config ${FINK_HOME}/conf/fink_mm.conf +fi + if [[ "$WITH_UNITS" = true ]] ; then # Run the test suite on the modules assuming the integration # tests have been run (to build the databases) diff --git a/bin/raw2science.py b/bin/raw2science.py index 01406f56..c6714a42 100644 --- a/bin/raw2science.py +++ b/bin/raw2science.py @@ -68,9 +68,11 @@ def main(): # data path rawdatapath = os.path.join(args.online_data_prefix, "raw") - scitmpdatapath = os.path.join(args.online_data_prefix, f"science/{args.night}") + scitmpdatapath = os.path.join( + args.online_data_prefix, "science/{}".format(args.night) + ) checkpointpath_sci_tmp = os.path.join( - args.online_data_prefix, f"science_checkpoint/{args.night}" + args.online_data_prefix, "science_checkpoint/{}".format(args.night) ) if args.producer == "elasticc": @@ -78,8 +80,8 @@ def main(): else: # assume YYYYMMHH df = connect_to_raw_database( - os.path.join(rawdatapath, f"{args.night}"), - os.path.join(rawdatapath, f"{args.night}"), + os.path.join(rawdatapath, "{}".format(args.night)), + os.path.join(rawdatapath, "{}".format(args.night)), latestfirst=False, ) @@ -121,34 +123,37 @@ def main(): .start() ) + config_path = args.mmconfigpath count = 0 - config = get_config({"--config": args.mmconfigpath}) - gcndatapath = config["PATH"]["online_gcn_data_prefix"] countquery_mm = None if args.exit_after is not None: - # Keep the Streaming running until something or someone ends it! 
- # Wait for GCN comming - while count < args.exit_after: - gcn_path = ( - gcndatapath + f"/year={args.night[0:4]}/month={args.night[4:6]}/day={args.night[6:8]}" - ) - - # if there is gcn and ztf data - if path_exist(gcn_path) and path_exist(scitmpdatapath): - # Start the GCN x ZTF cross-match stream - t_before = time.time() - logger.info("starting science2mm ...") - countquery_mm = science2mm( - args, config, gcndatapath, scitmpdatapath + # Keep the Streaming running until something or someone ends it! + if config_path != "no-config": + config = get_config({"--config": config_path}) + gcndatapath = config["PATH"]["online_gcn_data_prefix"] + + # Wait for GCN coming + while count < args.exit_after: + gcn_path = gcndatapath + "/year={}/month={}/day={}".format( + args.night[0:4], args.night[4:6], args.night[6:8] ) - count += time.time() - t_before - break - else: - # wait for comming GCN - count += 1 - time.sleep(1) + + # if there is gcn and ztf data + if path_exist(gcn_path) and path_exist(scitmpdatapath): + # Start the GCN x ZTF cross-match stream + t_before = time.time() + logger.info("starting science2mm ...") + countquery_mm = science2mm( + args, config, gcndatapath, scitmpdatapath + ) + count += time.time() - t_before + break + else: + # wait for coming GCN + count += 1 + time.sleep(1) # If GCN arrived, wait for the remaining time since the launch of raw2science remaining_time = args.exit_after - count @@ -158,8 +163,11 @@ def main(): if countquery_mm is not None: countquery_mm.stop() else: + if config_path != "no-config": + config = get_config({"--config": config_path}) + gcndatapath = config["PATH"]["online_gcn_data_prefix"] - # Start the GCN x ZTF cross-match stream - countquery_mm = science2mm(args, config, gcndatapath, scitmpdatapath) + # Start the GCN x ZTF cross-match stream + countquery_mm = science2mm(args, config, gcndatapath, scitmpdatapath) # Wait for the end of queries spark.streams.awaitAnyTermination() diff --git a/fink_broker/mm2distribute.py 
b/fink_broker/mm2distribute.py index c9423351..8311c076 100644 --- a/fink_broker/mm2distribute.py +++ b/fink_broker/mm2distribute.py @@ -12,7 +12,7 @@ def mm2distribute(spark, config, args): password_writer = config["DISTRIBUTION"]["password_writer"] year, month, day = args.night[0:4], args.night[4:6], args.night[6:8] - basepath = os.path.join(mm_data_path, "online", f"year={year}/month={month}/day={day}") + basepath = os.path.join(mm_data_path, "online", "year={}/month={}/day={}".format(year, month, day)) checkpointpath_mm = os.path.join(mm_data_path, "mm_distribute_checkpoint") logger = get_fink_logger() @@ -35,7 +35,7 @@ def mm2distribute(spark, config, args): break except Exception: - logger.info(f"Exception occured: wait: {wait}", exc_info=1) + logger.info("Exception occured: wait: {}".format(wait), exc_info=1) time.sleep(wait) wait *= 1.2 if wait < 60 else 1 continue diff --git a/fink_broker/parser.py b/fink_broker/parser.py index de722d31..a9992327 100644 --- a/fink_broker/parser.py +++ b/fink_broker/parser.py @@ -231,7 +231,7 @@ def getargs(parser: argparse.ArgumentParser) -> argparse.Namespace: [FINK_FAT_OUTPUT] """) parser.add_argument( - '-mmconfigpath', type=str, default='', + '-mmconfigpath', type=str, default='no-config', help=""" Path to fink_mm configuration file [MMCONFIGPATH] diff --git a/fink_broker/science2mm.py b/fink_broker/science2mm.py index bd0e59c6..3e29f119 100644 --- a/fink_broker/science2mm.py +++ b/fink_broker/science2mm.py @@ -52,7 +52,7 @@ def science2mm( break except Exception: - logger.info(f"Exception occured: wait: {wait}", exc_info=1) + logger.info("Exception occured: wait: {}".format(wait), exc_info=1) time.sleep(wait) wait *= 1.2 if wait < 60 else 1 continue @@ -62,7 +62,7 @@ def science2mm( last_time = cur_time - timedelta(hours=7) # 17:00 Paris time yesterday end_time = cur_time + timedelta(hours=17) # 17:00 Paris time today gcn_dataframe = gcn_dataframe.filter( - f"triggerTimejd >= {last_time.jd} and triggerTimejd < 
{end_time.jd}" + "triggerTimejd >= {} and triggerTimejd < {}".format(last_time.jd, end_time.jd) ) df_multi_messenger, _ = ztf_join_gcn_stream( DataMode.STREAMING,