From 561402b9535b4f8929fa1ce42664ff04da1ca805 Mon Sep 17 00:00:00 2001
From: FusRoman
Date: Wed, 11 Jan 2023 13:20:55 +0000
Subject: [PATCH] first work to enable the distribution

---
 .flake8                                    |   3 +-
 .../conf/fink_grb_schema_version_1.0.avsc  | 184 ++++++++++++++
 fink_grb/distribution/distribution.py      | 227 ++++++++++++++++++
 fink_grb/fink_grb_cli.py                   |  10 +-
 requirements.txt                           |   3 +-
 5 files changed, 424 insertions(+), 3 deletions(-)
 create mode 100644 fink_grb/conf/fink_grb_schema_version_1.0.avsc
 create mode 100644 fink_grb/distribution/distribution.py

diff --git a/.flake8 b/.flake8
index dda77f505..9aa40a169 100644
--- a/.flake8
+++ b/.flake8
@@ -12,4 +12,5 @@ exclude =
 per-file-ignores =
     ../Fink_GRB/fink_grb/online/ztf_join_gcn.py:W503,E402
     ../Fink_GRB/fink_grb/offline/spark_offline.py:W503,W605
-    ../Fink_GRB/fink_grb/utils/fun_utils.py:F811
\ No newline at end of file
+    ../Fink_GRB/fink_grb/utils/fun_utils.py:F811
+    ../Fink_GRB/fink_grb/distribution/distribution.py:W503
\ No newline at end of file

diff --git a/fink_grb/conf/fink_grb_schema_version_1.0.avsc b/fink_grb/conf/fink_grb_schema_version_1.0.avsc
new file mode 100644
index 000000000..fea5e03b8
--- /dev/null
+++ b/fink_grb/conf/fink_grb_schema_version_1.0.avsc
@@ -0,0 +1,184 @@
+{
+  "type": "record",
+  "name": "topLevelRecord",
+  "fields": [
+    {
+      "name": "objectId",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "candid",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "ztf_ra",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "ztf_dec",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "fid",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "jdstarthist",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "rb",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "jd",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "instrument_or_event",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "platform",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "triggerId",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "grb_ra",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "grb_dec",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "grb_loc_error",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "triggerTimeUTC",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "grb_proba",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "fink_class",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "delta_mag",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "rate",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "from_upper",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "start_vartime",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "diff_vartime",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "timestamp",
+      "type": [
+        {
+          "type": "long",
+          "logicalType": "timestamp-micros"
+        },
+        "null"
+      ]
+    },
+    {
+      "name": "month",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "day",
+      "type": [
+        "long",
+        "null"
+      ]
+    }
+  ]
+}
\ No newline at end of file
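The schema file above can be sanity-checked on its own before the distribution stream depends on it. A minimal sketch, assuming it is run from the repository root with the avro package pinned in requirements.txt below; the printed output is illustrative:

    import avro.schema

    # Parse the distribution schema added by this patch; avro.schema.parse
    # raises SchemaParseException if the JSON is not a valid Avro schema.
    with open("fink_grb/conf/fink_grb_schema_version_1.0.avsc", "rb") as f:
        schema = avro.schema.parse(f.read())

    # Every field is a ["<type>", "null"] union, so all columns are nullable.
    print(schema.name)  # topLevelRecord
    print([field.name for field in schema.fields])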
diff --git a/fink_grb/distribution/distribution.py b/fink_grb/distribution/distribution.py
new file mode 100644
index 000000000..eacbd7724
--- /dev/null
+++ b/fink_grb/distribution/distribution.py
@@ -0,0 +1,227 @@
+import time
+import avro.schema
+import os
+import subprocess
+
+from fink_utils.broker.sparkUtils import init_sparksession, connect_to_raw_database
+from fink_utils.broker.distributionUtils import write_to_kafka
+
+from fink_filters.filter_on_axis_grb.filter import (
+    f_bronze_events,
+    f_silver_events,
+    f_gold_events,
+)
+
+import fink_grb
+
+# from fink_grb.utils.fun_utils import return_verbose_level
+from fink_grb.init import get_config, init_logging
+from fink_grb.utils.fun_utils import build_spark_submit
+
+
+def grb_distribution(grbdatapath, night, tinterval, exit_after, kafka_broker_server):
+    """
+    Distribute the data returned by the online mode over Kafka.
+
+    Parameters
+    ----------
+    grbdatapath : string
+        path where the GRB data produced by the online mode are located
+    night : string
+        the processing night, e.g. "20191023"
+    tinterval : integer
+        time interval between two processing batches, in seconds
+    exit_after : int
+        maximum running time of the streaming process, in seconds
+    kafka_broker_server : string
+        address of the Kafka cluster
+
+    Returns
+    -------
+    None
+
+    Examples
+    --------
+
+    """
+    spark = init_sparksession(
+        "science2grb_distribution_{}{}{}".format(night[0:4], night[4:6], night[6:8])
+    )
+
+    logger = init_logging()
+
+    schema_path = "fink_grb/conf/fink_grb_schema_version_1.0.avsc"
+    schema = avro.schema.parse(open(schema_path, "rb").read())
+
+    checkpointpath_grb = grbdatapath + "/grb_checkpoint"
+
+    # connection to the grb database
+    df_grb_stream = connect_to_raw_database(
+        grbdatapath
+        + "/year={}/month={}/day={}".format(night[0:4], night[4:6], night[6:8]),
+        grbdatapath
+        + "/year={}/month={}/day={}".format(night[0:4], night[4:6], night[6:8]),
+        latestfirst=True,
+    )
+
+    df_grb_stream = df_grb_stream.drop("year").drop("month").drop("day")
+
+    df_bronze = (
+        df_grb_stream.withColumn(
+            "f_bronze",
+            f_bronze_events(df_grb_stream["fink_class"], df_grb_stream["rb"]),
+        )
+        .filter("f_bronze == True")
+        .drop("f_bronze")
+    )
+
+    df_silver = (
+        df_grb_stream.withColumn(
+            "f_silver",
+            f_silver_events(df_grb_stream["fink_class"], df_grb_stream["rb"]),
+        )
+        .filter("f_silver == True")
+        .drop("f_silver")
+    )
+
+    df_gold = (
+        df_grb_stream.withColumn(
+            "f_gold",
+            f_gold_events(df_grb_stream["fink_class"], df_grb_stream["rb"]),
+        )
+        .filter("f_gold == True")
+        .drop("f_gold")
+    )
+
+    grb_stream_queries = []
+    for df_filter, topicname in [
+        (df_bronze, "grb_bronze_samples"),
+        (df_silver, "grb_silver_samples"),
+        (df_gold, "grb_gold_samples"),
+    ]:
+        grb_stream_distribute = write_to_kafka(
+            df_filter,
+            str(schema.to_json()),
+            kafka_broker_server,
+            "",
+            "",
+            topicname,
+            # each streaming query needs its own checkpoint location
+            checkpointpath_grb + "/" + topicname,
+            tinterval,
+        )
+        grb_stream_queries.append(grb_stream_distribute)
+
+    # Keep the streaming queries running until something or someone ends them!
+    if exit_after is not None:
+        time.sleep(int(exit_after))
+        for query in grb_stream_queries:
+            query.stop()
+        logger.info("Exiting the science2grb distribution subprocess normally...")
+    else:  # pragma: no cover
+        # Wait for the end of the queries
+        spark.streams.awaitAnyTermination()
+
+
+def launch_distribution(arguments):
+    """
+    Launch the distribution of the GRB data via spark-submit.
+
+    Parameters
+    ----------
+    arguments : dictionary
+        command-line arguments parsed by docopt
+
+    Returns
+    -------
+    None
+
+    Examples
+    --------
+
+    """
+    config = get_config(arguments)
+    logger = init_logging()
+
+    # verbose = return_verbose_level(config, logger)
+
+    try:
+        master_manager = config["STREAM"]["manager"]
+        principal_group = config["STREAM"]["principal"]
+        secret = config["STREAM"]["secret"]
+        role = config["STREAM"]["role"]
+        executor_env = config["STREAM"]["exec_env"]
+        driver_mem = config["STREAM"]["driver_memory"]
+        exec_mem = config["STREAM"]["executor_memory"]
+        max_core = config["STREAM"]["max_core"]
+        exec_core = config["STREAM"]["executor_core"]
+
+        grb_datapath_prefix = config["PATH"]["online_grb_data_prefix"]
+        tinterval = config["STREAM"]["tinterval"]
+    except Exception as e:  # pragma: no cover
+        logger.error("Config entry not found \n\t {}".format(e))
+        exit(1)
+
+    try:
+        night = arguments["--night"]
+    except Exception as e:  # pragma: no cover
+        logger.error("Command line arguments not found: {}\n{}".format("--night", e))
+        exit(1)
+
+    try:
+        exit_after = arguments["--exit_after"]
+    except Exception as e:  # pragma: no cover
+        logger.error(
+            "Command line arguments not found: {}\n{}".format("--exit_after", e)
+        )
+        exit(1)
+
+    application = os.path.join(
+        os.path.dirname(fink_grb.__file__),
+        "online",
+        "ztf_join_gcn.py prod",
+    )
+
+    application += " " + grb_datapath_prefix
+    application += " " + night
+    application += " " + str(exit_after)
+    application += " " + tinterval
+
+    spark_submit = "spark-submit \
+        --master {} \
+        --conf spark.mesos.principal={} \
+        --conf spark.mesos.secret={} \
+        --conf spark.mesos.role={} \
+        --conf spark.executorEnv.HOME={} \
+        --driver-memory {}G \
+        --executor-memory {}G \
+        --conf spark.cores.max={} \
+        --conf spark.executor.cores={}".format(
+        master_manager,
+        principal_group,
+        secret,
+        role,
+        executor_env,
+        driver_mem,
+        exec_mem,
+        max_core,
+        exec_core,
+    )
+
+    spark_submit = build_spark_submit(spark_submit, application, "", "", "")
+
+    process = subprocess.Popen(
+        spark_submit,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        universal_newlines=True,
+        shell=True,
+    )
+
+    stdout, stderr = process.communicate()
+    if process.returncode != 0:  # pragma: no cover
+        logger.error(
+            "Fink_GRB joining stream spark application has ended with a non-zero returncode.\
+            \n\t cause:\n\t\t{}\n\t\t{}".format(
+                stdout, stderr
+            )
+        )
+        exit(1)
+
+    logger.info("Fink_GRB joining stream spark application ended normally")
+    return
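The bronze, silver and gold blocks above all follow the same withColumn/filter/drop pattern, which can be tried locally without the fink-filters dependency. In this sketch f_dummy is a hypothetical stand-in for f_bronze_events, not the real filter:

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    spark = SparkSession.builder.master("local[1]").appName("tier_sketch").getOrCreate()

    df = spark.createDataFrame(
        [("SN candidate", 0.9), ("Unknown", 0.2)], ["fink_class", "rb"]
    )

    # f_dummy stands in for f_bronze_events: keep alerts with a known
    # classification and a decent real-bogus score.
    f_dummy = (F.col("fink_class") != "Unknown") & (F.col("rb") > 0.5)

    df_bronze = (
        df.withColumn("f_bronze", f_dummy)
        .filter("f_bronze == True")
        .drop("f_bronze")
    )
    df_bronze.show()  # only the "SN candidate" row survives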
diff --git a/fink_grb/fink_grb_cli.py b/fink_grb/fink_grb_cli.py
index 2f41932e0..2c89403b1 100644
--- a/fink_grb/fink_grb_cli.py
+++ b/fink_grb/fink_grb_cli.py
@@ -2,6 +2,7 @@
 Usage:
     fink_grb gcn_stream (start|monitor) [options]
     fink_grb join_stream (offline|online) --night=<night> [--exit_after=<exit_after>] [options]
+    fink_grb distribute --night=<night> [--exit_after=<exit_after>] [options]
     fink_grb -h | --help
     fink_grb --version
 
@@ -13,10 +14,11 @@
 join_stream       launch the script that join the ztf stream and the gcn stream
 offline           launch the offline mode
 online            launch the online mode
+distribute        launch the distribution mode
 -h --help         Show help and quit.
 --version         Show version.
 --config FILE     Specify the config file.
---verbose         Print information and progress bar during the process.
+--verbose         Print information during the process.
 """
 
 from docopt import docopt
@@ -55,5 +57,11 @@ def main():
 
         launch_offline_mode(arguments)
 
+    elif arguments["distribute"]:
+
+        from fink_grb.distribution.distribution import launch_distribution
+
+        launch_distribution(arguments)
+
     else:
         exit(0)
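For reference, docopt turns the new distribute command into the plain dictionary that launch_distribution receives. A minimal sketch with an inlined usage string; the night and exit_after values are illustrative:

    from docopt import docopt

    usage = """
    Usage:
        fink_grb distribute --night=<night> [--exit_after=<exit_after>] [options]

    Options:
        --config FILE    Specify the config file.
    """

    arguments = docopt(usage, argv=["distribute", "--night=20230111", "--exit_after=300"])
    assert arguments["distribute"] is True
    print(arguments["--night"])       # 20230111
    print(arguments["--exit_after"])  # 300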
diff --git a/requirements.txt b/requirements.txt
index 678da270e..23a7d890e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,4 +3,5 @@ flake8
 black
 psutil==5.9.1
 fink-utils==0.8.0
-fink-filters==3.6
\ No newline at end of file
+fink-filters==3.6
+avro==1.11.1
\ No newline at end of file
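To check the distribution end to end, one can consume a few messages from the new topics and decode them with the schema shipped in this patch. A sketch under stated assumptions: the broker address and group id are placeholders, confluent-kafka is not pinned by this patch and must be installed separately, and it assumes write_to_kafka publishes each row as a schemaless Avro payload matching fink_grb_schema_version_1.0.avsc:

    import io

    import avro.schema
    from avro.io import BinaryDecoder, DatumReader
    from confluent_kafka import Consumer  # assumption: installed separately

    with open("fink_grb/conf/fink_grb_schema_version_1.0.avsc", "rb") as f:
        schema = avro.schema.parse(f.read())
    reader = DatumReader(schema)

    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",  # placeholder broker address
            "group.id": "fink_grb_smoke_test",      # placeholder group id
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe(["grb_bronze_samples"])  # topic name from the patch

    msg = consumer.poll(30.0)
    if msg is not None and msg.error() is None:
        record = reader.read(BinaryDecoder(io.BytesIO(msg.value())))
        print(record["objectId"], record["fink_class"], record["grb_proba"])
    consumer.close()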