Skip to content

Commit

Permalink
Review modifications: fix the fink_mm offline test configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
FusRoman committed Feb 28, 2024
1 parent 2c2e6e7 commit b018caa
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 58 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ jobs:
cd $USRLIBS
source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
cd $FINK_HOME
fink_test -c conf/fink.conf.dev --unit-tests --db-integration --stream-integration
fink_test -c conf/fink.conf.dev --stream-integration --db-integration --mm-offline --unit-tests
- name: Run test suites [prod]
if: matrix.container == 'julienpeloton/fink-ci:prod'
run: |
cd $USRLIBS
source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
cd $FINK_HOME
fink_test -c conf/fink.conf.prod --unit-tests --db-integration --stream-integration
fink_test -c conf/fink.conf.prod --stream-integration --db-integration --mm-offline --unit-tests
curl -s https://codecov.io/bash | bash
- uses: act10ns/slack@v1
with:
Expand Down
52 changes: 32 additions & 20 deletions bin/distribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,41 +143,53 @@ def main():
.trigger(processingTime='{} seconds'.format(args.tinterval)) \
.start()

config_path = args.mmconfigpath
count = 0
stream_distrib_list = None

# Keep the Streaming running until something or someone ends it!
if args.exit_after is not None:

count = 0
config = get_config({"--config": args.mmconfigpath})
if config_path != "no-config":
config = get_config({"--config": config_path})

while count < args.exit_after:
while count < args.exit_after:

mm_path_output = config["PATH"]["online_grb_data_prefix"]
mmtmpdatapath = os.path.join(mm_path_output, "online")
mm_path_output = config["PATH"]["online_grb_data_prefix"]
mmtmpdatapath = os.path.join(mm_path_output, "online")

# if there is gcn and ztf data
if path_exist(mmtmpdatapath):
# if there is gcn and ztf data
if path_exist(mmtmpdatapath):

t_before = time.time()
logger.info("starting mm2distribute ...")
stream_distrib_list = mm2distribute(
spark,
config,
args
)
count += time.time() - t_before
break
t_before = time.time()
logger.info("starting mm2distribute ...")
stream_distrib_list = mm2distribute(
spark,
config,
args
)
count += time.time() - t_before
break

count += 1
time.sleep(1.0)
count += 1
time.sleep(1.0)

remaining_time = args.exit_after - count
remaining_time = remaining_time if remaining_time > 0 else 0
time.sleep(remaining_time)
disquery.stop()
for stream in stream_distrib_list:
stream.stop()
if stream_distrib_list is not None:
for stream in stream_distrib_list:
stream.stop()
logger.info("Exiting the distribute service normally...")
else:
if config_path != "no-config":
config = get_config({"--config": config_path})
stream_distrib_list = mm2distribute(
spark,
config,
args
)
# Wait for the end of queries
spark.streams.awaitAnyTermination()

Expand Down
13 changes: 10 additions & 3 deletions bin/fink_test
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ while [ "$#" -gt 0 ]; do
WITH_UNITS=true
shift 1
;;
--mm-offline)
WITH_MM_OFFLINE=true
shift 1
;;
-h)
echo -e $message_help
exit
Expand Down Expand Up @@ -112,9 +116,6 @@ if [[ "$WITH_DB" = true ]] ; then
# merge data
fink start merge -c $conf --night $NIGHT_STR

# fink-mm offline mode
fink_mm join_stream offline --night $NIGHT_INT --config $conf

# Update science portal
fink start science_archival -c $conf --night $NIGHT_INT
fink start index_archival -c $conf --night $NIGHT_INT --index_table jd_objectId
Expand All @@ -135,6 +136,12 @@ if [[ "$WITH_DB" = true ]] ; then
fink start check_science_portal -c $conf
fi

# MM offline test (requires "fink start merge" to have run first)
if [[ "$WITH_MM_OFFLINE" = true ]] ; then
# fink-mm offline mode
fink_mm join_stream offline --night $NIGHT_INT --config ${FINK_HOME}/conf/fink_mm.conf
fi

if [[ "$WITH_UNITS" = true ]] ; then
# Run the test suite on the modules assuming the integration
# tests have been run (to build the databases)
Expand Down
64 changes: 36 additions & 28 deletions bin/raw2science.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,20 @@ def main():

# data path
rawdatapath = os.path.join(args.online_data_prefix, "raw")
scitmpdatapath = os.path.join(args.online_data_prefix, f"science/{args.night}")
scitmpdatapath = os.path.join(
args.online_data_prefix, "science/{}".format(args.night)
)
checkpointpath_sci_tmp = os.path.join(
args.online_data_prefix, f"science_checkpoint/{args.night}"
args.online_data_prefix, "science_checkpoint/{}".format(args.night)
)

if args.producer == "elasticc":
df = connect_to_raw_database(rawdatapath, rawdatapath, latestfirst=False)
else:
# assume YYYYMMHH
df = connect_to_raw_database(
os.path.join(rawdatapath, f"{args.night}"),
os.path.join(rawdatapath, f"{args.night}"),
os.path.join(rawdatapath, "{}".format(args.night)),
os.path.join(rawdatapath, "{}".format(args.night)),
latestfirst=False,
)

Expand Down Expand Up @@ -121,34 +123,37 @@ def main():
.start()
)

config_path = args.mmconfigpath
count = 0
config = get_config({"--config": args.mmconfigpath})
gcndatapath = config["PATH"]["online_gcn_data_prefix"]
countquery_mm = None

if args.exit_after is not None:
# Keep the Streaming running until something or someone ends it!

        # Wait for GCN coming
while count < args.exit_after:
gcn_path = (
gcndatapath + f"/year={args.night[0:4]}/month={args.night[4:6]}/day={args.night[6:8]}"
)

# if there is gcn and ztf data
if path_exist(gcn_path) and path_exist(scitmpdatapath):
# Start the GCN x ZTF cross-match stream
t_before = time.time()
logger.info("starting science2mm ...")
countquery_mm = science2mm(
args, config, gcndatapath, scitmpdatapath
# Keep the Streaming running until something or someone ends it!
if config_path != "no-config":
config = get_config({"--config": config_path})
gcndatapath = config["PATH"]["online_gcn_data_prefix"]

            # Wait for GCN coming
while count < args.exit_after:
gcn_path = gcndatapath + "/year={}/month={}/day={}".format(
args.night[0:4], args.night[4:6], args.night[6:8]
)
count += time.time() - t_before
break
else:
                # wait for incoming GCN
count += 1
time.sleep(1)

# if there is gcn and ztf data
if path_exist(gcn_path) and path_exist(scitmpdatapath):
# Start the GCN x ZTF cross-match stream
t_before = time.time()
logger.info("starting science2mm ...")
countquery_mm = science2mm(
args, config, gcndatapath, scitmpdatapath
)
count += time.time() - t_before
break
else:
                    # wait for incoming GCN
count += 1
time.sleep(1)

# If GCN arrived, wait for the remaining time since the launch of raw2science
remaining_time = args.exit_after - count
Expand All @@ -158,8 +163,11 @@ def main():
if countquery_mm is not None:
countquery_mm.stop()
else:
# Start the GCN x ZTF cross-match stream
countquery_mm = science2mm(args, config, gcndatapath, scitmpdatapath)
if config_path != "no-config":
config = get_config({"--config": config_path})
gcndatapath = config["PATH"]["online_gcn_data_prefix"]
# Start the GCN x ZTF cross-match stream
countquery_mm = science2mm(args, config, gcndatapath, scitmpdatapath)
# Wait for the end of queries
spark.streams.awaitAnyTermination()

Expand Down
4 changes: 2 additions & 2 deletions fink_broker/mm2distribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def mm2distribute(spark, config, args):
password_writer = config["DISTRIBUTION"]["password_writer"]

year, month, day = args.night[0:4], args.night[4:6], args.night[6:8]
basepath = os.path.join(mm_data_path, "online", f"year={year}/month={month}/day={day}")
basepath = os.path.join(mm_data_path, "online", "year={}/month={}/day={}".format(year, month, day))
checkpointpath_mm = os.path.join(mm_data_path, "mm_distribute_checkpoint")

logger = get_fink_logger()
Expand All @@ -35,7 +35,7 @@ def mm2distribute(spark, config, args):
break

except Exception:
logger.info(f"Exception occured: wait: {wait}", exc_info=1)
logger.info("Exception occured: wait: {}".format(wait), exc_info=1)
time.sleep(wait)
wait *= 1.2 if wait < 60 else 1
continue
Expand Down
2 changes: 1 addition & 1 deletion fink_broker/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def getargs(parser: argparse.ArgumentParser) -> argparse.Namespace:
[FINK_FAT_OUTPUT]
""")
parser.add_argument(
'-mmconfigpath', type=str, default='',
'-mmconfigpath', type=str, default='no-config',
help="""
Path to fink_mm configuration file
[MMCONFIGPATH]
Expand Down
4 changes: 2 additions & 2 deletions fink_broker/science2mm.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def science2mm(
break

except Exception:
logger.info(f"Exception occured: wait: {wait}", exc_info=1)
logger.info("Exception occured: wait: {}".format(wait), exc_info=1)
time.sleep(wait)
wait *= 1.2 if wait < 60 else 1
continue
Expand All @@ -62,7 +62,7 @@ def science2mm(
last_time = cur_time - timedelta(hours=7) # 17:00 Paris time yesterday
end_time = cur_time + timedelta(hours=17) # 17:00 Paris time today
gcn_dataframe = gcn_dataframe.filter(
f"triggerTimejd >= {last_time.jd} and triggerTimejd < {end_time.jd}"
"triggerTimejd >= {} and triggerTimejd < {}".format(last_time.jd, end_time.jd)
)
df_multi_messenger, _ = ztf_join_gcn_stream(
DataMode.STREAMING,
Expand Down

0 comments on commit b018caa

Please sign in to comment.