
integrate fink-mm distribution to the broker
FusRoman committed Feb 23, 2024
1 parent 05944c9 commit 1d9eb07
Showing 4 changed files with 81 additions and 14 deletions.
38 changes: 36 additions & 2 deletions bin/distribute.py
@@ -23,17 +23,21 @@
"""
import argparse
import time
import os

from fink_utils.spark import schema_converter
from fink_broker.parser import getargs
from fink_broker.sparkUtils import init_sparksession, connect_to_raw_database
from fink_broker.sparkUtils import init_sparksession, connect_to_raw_database, path_exist
from fink_broker.distributionUtils import get_kafka_df
from fink_broker.loggingUtils import get_fink_logger, inspect_application
from fink_broker.mm2distribute import mm2distribute

from fink_utils.spark.utils import concat_col

from fink_utils.spark.utils import apply_user_defined_filter

from fink_mm.init import get_config

# User-defined topics
userfilters = [
'fink_filters.filter_early_sn_candidates.filter.early_sn_candidates',
@@ -141,8 +145,38 @@ def main():

# Keep the Streaming running until something or someone ends it!
if args.exit_after is not None:
time.sleep(args.exit_after)

count = 0
config = get_config({"--config": args.mmconfigpath})

while count < args.exit_after:

mm_path_output = config["PATH"]["online_grb_data_prefix"]
mmtmpdatapath = os.path.join(mm_path_output, "online")

# if there is gcn and ztf data
if path_exist(mmtmpdatapath):

t_before = time.time()
time.sleep(45)
logger.info("starting mm2distribute ...")
stream_distrib_list = mm2distribute(
spark,
config,
args
)
count += time.time() - t_before
break

count += 1
time.sleep(1.0)

remaining_time = args.exit_after - count
remaining_time = remaining_time if remaining_time > 0 else 0
time.sleep(remaining_time)
disquery.stop()
for stream in stream_distrib_list:
stream.stop()
logger.info("Exiting the distribute service normally...")
else:
# Wait for the end of queries
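The block above gives the distribution service a time-budgeted wait: within the exit_after window it polls once per second for the fink-mm online directory, starts the multi-messenger distribution via mm2distribute as soon as that directory exists, then sleeps out whatever is left of the budget before stopping the main query and the mm streams. A minimal sketch of that pattern, not taken from the commit (data_ready and start_streams are placeholders for path_exist(mmtmpdatapath) and mm2distribute(spark, config, args)):

# Sketch only; illustrates the wait/start/stop bookkeeping of the diff above.
import time

def wait_then_distribute(exit_after, data_ready, start_streams):
    count = 0
    streams = []
    while count < exit_after:
        if data_ready():
            t_before = time.time()
            streams = start_streams()
            count += time.time() - t_before
            break
        count += 1
        time.sleep(1.0)
    # Sleep out the rest of the budget; time.sleep must not receive a negative value.
    time.sleep(max(exit_after - count, 0))
    return streams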
1 change: 1 addition & 0 deletions bin/fink
@@ -303,6 +303,7 @@ elif [[ $service == "distribution" ]]; then
-distribution_schema ${DISTRIBUTION_SCHEMA} \
-substream_prefix ${SUBSTREAM_PREFIX} \
-tinterval ${FINK_TRIGGER_UPDATE} \
-mmconfigpath ${FINK_MM_CONFIG} \
-night ${NIGHT} \
-log_level ${LOG_LEVEL} ${EXIT_AFTER}
elif [[ $service == "merge" ]]; then
16 changes: 4 additions & 12 deletions bin/raw2science.py
@@ -136,24 +136,16 @@ def main():
gcndatapath + f"/year={args.night[0:4]}/month={args.night[4:6]}/day={args.night[6:8]}"
)

all_parquet_files = glob.glob(os.path.join(scitmpdatapath, "*.parquet"))

logger.info(f"""
cond1: {path_exist(gcn_path)}
cond2: {len(all_parquet_files)}
cond3: {os.path.getsize(all_parquet_files[0]) if len(all_parquet_files) > 0 else ""}
""")

# if there is gcn and ztf data
if path_exist(gcn_path) and len(all_parquet_files) > 0 and os.path.getsize(all_parquet_files[0]) > 0:
if path_exist(gcn_path) and path_exist(scitmpdatapath):
# Start the GCN x ZTF cross-match stream
t_before = time.time()
time.sleep(30)
time.sleep(45)
logger.info("starting science2mm ...")
countquery_mm = science2mm(
args, config, gcndatapath, scitmpdatapath
)
count += (time.time() - t_before)
count += time.time() - t_before
break
else:
# wait for incoming GCN
@@ -162,7 +154,7 @@

# If GCN arrived, wait for the remaining time since the launch of raw2science
remaining_time = args.exit_after - count
logger.info(f"time to the end: {remaining_time} sec")
remaining_time = remaining_time if remaining_time > 0 else 0
time.sleep(remaining_time)
countquery_science.stop()
if countquery_mm is not None:
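The raw2science changes replace the per-file parquet size check with a plain path_exist(scitmpdatapath) test, raise the settle delay before science2mm from 30 to 45 seconds, and clamp the final sleep so the remaining time can never be negative. The clamp matters because time.sleep rejects negative arguments; a short illustration with made-up numbers, not from the repository:

# time.sleep raises ValueError for negative input, hence the clamp above.
import time

exit_after, count = 100.0, 120.0      # illustrative values only
remaining_time = exit_after - count   # -20.0 here
time.sleep(max(remaining_time, 0))    # without the clamp this call would raise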
40 changes: 40 additions & 0 deletions fink_broker/mm2distribute.py
@@ -0,0 +1,40 @@
import os

from fink_mm.distribution.distribution import grb_distribution_stream


def mm2distribute(spark, config, args):
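    """Set up the fink-mm (GRB) distribution stream for the night given in args.night.

    The night's fink-mm online output is read once as a static DataFrame to
    recover its schema, a parquet file stream is opened over the same
    partition, and both are handed to grb_distribution_stream. The streaming
    queries created there are returned so the caller can stop them at shutdown.
    """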

mm_data_path = config["PATH"]["online_grb_data_prefix"]
kafka_broker = config["DISTRIBUTION"]["kafka_broker"]
username_writer = config["DISTRIBUTION"]["username_writer"]
password_writer = config["DISTRIBUTION"]["password_writer"]

year, month, day = args.night[0:4], args.night[4:6], args.night[6:8]
basepath = os.path.join(mm_data_path, "online", f"year={year}/month={month}/day={day}")
checkpointpath_mm = os.path.join(mm_data_path, "mm_distribute_checkpoint")

# read the data once to recover its schema (forcing struct columns such as mangrove to their proper type) for the streaming reader below
static_df = spark.read.parquet(basepath)

path = basepath
df_grb_stream = (
spark.readStream.format("parquet")
.schema(static_df.schema)
.option("basePath", basepath)
.option("path", path)
.option("latestFirst", True)
.load()
)

stream_distribute_list = grb_distribution_stream(
df_grb_stream,
static_df,
checkpointpath_mm,
args.tinterval,
kafka_broker,
username_writer,
password_writer,
)

return stream_distribute_list
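A note on the structured-streaming pattern used above: a Spark file-source stream needs an explicit schema, which is why the helper first reads the same directory as a static DataFrame and reuses its schema. A stand-alone sketch of that pattern, with a placeholder path instead of the one built from config["PATH"]["online_grb_data_prefix"] and args.night:

# Sketch of the read-static-then-stream pattern; the path is a placeholder.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
basepath = "/tmp/fink_mm/online/year=2024/month=02/day=23"

static_df = spark.read.parquet(basepath)      # one-off read, used only for its schema
df_grb_stream = (
    spark.readStream.format("parquet")
    .schema(static_df.schema)                 # file-source streams require an explicit schema
    .option("basePath", basepath)             # keep year/month/day as partition columns
    .option("latestFirst", True)              # process the newest files first
    .load(basepath)
)

In the commit, this stream and the static DataFrame are then passed to fink_mm's grb_distribution_stream along with the checkpoint path, the trigger interval, and the Kafka broker and writer credentials read from config["DISTRIBUTION"].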
