
Add logging for distribute
fjammes committed Sep 4, 2024
1 parent db47782 commit 5e1379b
Showing 4 changed files with 89 additions and 77 deletions.
97 changes: 24 additions & 73 deletions bin/distribute.py
@@ -35,9 +35,8 @@
path_exist,
)
from fink_broker.distribution_utils import get_kafka_df

from fink_broker.logging_utils import init_logger
from fink_utils.spark.utils import concat_col

from fink_utils.spark.utils import apply_user_defined_filter


@@ -60,69 +59,13 @@
]


def launch_fink_mm(spark, args: dict):
"""Manage multimessenger operations
Parameters
----------
spark: SparkSession
Spark Session
args: dict
Arguments from Fink configuration file
Returns
-------
time_spent_in_wait: int
Time spent in waiting for GCN to come
before launching the streaming query.
stream_distrib_list: list of StreamingQuery
List of Spark Streaming queries
"""
if args.noscience:
_LOG.info("No science: fink-mm is not applied")
return 0, []
elif args.mmconfigpath != "no-config":
from fink_mm.init import get_config
from fink_broker.mm_utils import mm2distribute

_LOG.info("Fink-MM configuration file: {args.mmconfigpath}")
config = get_config({"--config": args.mmconfigpath})

# Wait for GCN coming
time_spent_in_wait = 0
stream_distrib_list = []
while time_spent_in_wait < args.exit_after:
mm_path_output = config["PATH"]["online_grb_data_prefix"]
mmtmpdatapath = os.path.join(mm_path_output, "online")

# if there is gcn and ztf data
if path_exist(mmtmpdatapath):
t_before = time.time()
_LOG.info("starting mm2distribute ...")
stream_distrib_list = mm2distribute(spark, config, args)
time_spent_in_wait += time.time() - t_before
break

time_spent_in_wait += 1
time.sleep(1.0)
if stream_distrib_list == []:
_LOG.warning(
f"{mmtmpdatapath} does not exist. mm2distribute could not start before the end of the job."
)
else:
_LOG.info("Time spent in waiting for Fink-MM: {time_spent_in_wait} seconds")
return time_spent_in_wait, stream_distrib_list

_LOG.warning("No configuration found for fink-mm -- not applied")
return 0, []


def main():
parser = argparse.ArgumentParser(description=__doc__)
args = getargs(parser)

# Initialise Spark session
logger = init_logger(args.log_level)

logger.debug("Initialise Spark session")
spark = init_sparksession(
name="distribute_{}_{}".format(args.producer, args.night),
shuffle_partitions=2,
Expand All @@ -135,10 +78,10 @@ def main():
args.night
)

# Connect to the TMP science database
logger.debug("Connect to the TMP science database")
df = connect_to_raw_database(scitmpdatapath, scitmpdatapath, latestfirst=False)

# Cast fields to ease the distribution
logger.debug("Cast fields to ease the distribution")
cnames = df.columns

if "brokerEndProcessTimestamp" in cnames:
@@ -173,7 +116,7 @@ def main():
"struct(lc_features_r.*) as lc_features_r"
)

# Retrieve time-series information
logger.debug("Retrieve time-series information")
to_expand = [
"jd",
"fid",
@@ -186,7 +129,7 @@
"diffmaglim",
]

# Append temp columns with historical + current measurements
logger.debug("Append temp columns with historical + current measurements")
prefix = "c"
for colname in to_expand:
df = concat_col(df, colname, prefix=prefix)
@@ -206,23 +149,25 @@
# The topic name is the filter name
topicname = args.substream_prefix + userfilter.split(".")[-1] + "_ztf"

# Apply user-defined filter

if args.noscience:
logger.debug("Do not apply user-defined filter in no-science mode")
df_tmp = df
else:
logger.debug("Apply user-defined filter")
df_tmp = apply_user_defined_filter(df, userfilter, _LOG)

# Wrap alert data
logger.debug("Wrap alert data")
df_tmp = df_tmp.selectExpr(cnames)

# get schema from the streaming dataframe to
# avoid non-nullable bug #852
schema = schema_converter.to_avro(df_tmp.schema)

# Get the DataFrame for publishing to Kafka (avro serialized)
logger.debug("Get the DataFrame for publishing to Kafka (avro serialized)")
df_kafka = get_kafka_df(df_tmp, key=schema, elasticc=False)

# Ensure that the topic(s) exist on the Kafka Server)
logger.debug("Ensure that the topic(s) exist on the Kafka Server")
disquery = (
df_kafka.writeStream.format("kafka")
.option("kafka.bootstrap.servers", broker_list)
Expand All @@ -239,20 +184,26 @@ def main():
.start()
)

time_spent_in_wait, stream_distrib_list = launch_fink_mm(spark, args)
if args.noscience:
logger.info("Do not perform multi-messenger operations")
time_spent_in_wait, stream_distrib_list = 0, []  # empty list keeps the shutdown loop below safe to iterate
else:
logger.debug("Perform multi-messenger operations")
from fink_broker.mm_utils import distribute_launch_fink_mm
time_spent_in_wait, stream_distrib_list = distribute_launch_fink_mm(spark, args)

# Keep the Streaming running until something or someone ends it!
if args.exit_after is not None:
logger.debug("Keep the Streaming running until something or someone ends it!")
remaining_time = args.exit_after - time_spent_in_wait
remaining_time = remaining_time if remaining_time > 0 else 0
time.sleep(remaining_time)
disquery.stop()
if stream_distrib_list != []:
for stream in stream_distrib_list:
stream.stop()
_LOG.info("Exiting the distribute service normally...")
logger.info("Exiting the distribute service normally...")
else:
# Wait for the end of queries
logger.debug("Wait for the end of queries")
spark.streams.awaitAnyTermination()


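Note on the new logging setup: the logger.debug calls above rely on init_logger from fink_broker.logging_utils, which is imported at the top of distribute.py but whose body is outside this diff. Below is a minimal sketch of what such a helper could look like, assuming it simply configures the standard logging module at the requested level; the format string and handler choices are illustrative assumptions, only the name init_logger and the log_level argument come from the code above.

import logging


def init_logger(log_level: str = "INFO") -> logging.Logger:
    """Sketch of a logger initialiser (hypothetical implementation).

    The real fink_broker.logging_utils.init_logger may differ; this only
    illustrates the expected behaviour: configure logging at the requested
    level and return a logger usable as logger.debug(...) in distribute.py.
    """
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=getattr(logging, log_level.upper(), logging.INFO),
    )
    return logging.getLogger("fink_broker")
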
4 changes: 2 additions & 2 deletions bin/raw2science.py
@@ -132,8 +132,8 @@ def main():
time_spent_in_wait, countquery_mm = 0, None
else:
logger.debug("Perform multi-messenger operations")
from fink_broker.mm_utils import launch_fink_mm
time_spent_in_wait, countquery_mm = launch_fink_mm(args, scitmpdatapath)
from fink_broker.mm_utils import raw2science_launch_fink_mm
time_spent_in_wait, countquery_mm = raw2science_launch_fink_mm(args, scitmpdatapath)


if args.exit_after is not None:
58 changes: 57 additions & 1 deletion fink_broker/mm_utils.py
@@ -19,6 +19,7 @@
import logging
import os
import time
from typing import Tuple, List

from fink_mm.ztf_join_gcn import ztf_join_gcn_stream, DataMode
from fink_mm.distribution.distribution import grb_distribution_stream
@@ -33,7 +34,62 @@

_LOG = logging.getLogger(__name__)

def launch_fink_mm(args: dict, scitmpdatapath: str):

def distribute_launch_fink_mm(spark, args: dict) -> Tuple[int, List]:
"""Manage multimessenger operations
Parameters
----------
spark: SparkSession
Spark Session
args: dict
Arguments from Fink configuration file
Returns
-------
time_spent_in_wait: int
Time spent in waiting for GCN to come
before launching the streaming query.
stream_distrib_list: list of StreamingQuery
List of Spark Streaming queries
"""
if args.mmconfigpath != "no-config":
from fink_mm.init import get_config
from fink_broker.mm_utils import mm2distribute

_LOG.info("Fink-MM configuration file: {args.mmconfigpath}")
config = get_config({"--config": args.mmconfigpath})

# Wait for GCN coming
time_spent_in_wait = 0
stream_distrib_list = []
while time_spent_in_wait < args.exit_after:
mm_path_output = config["PATH"]["online_grb_data_prefix"]
mmtmpdatapath = os.path.join(mm_path_output, "online")

# if there is gcn and ztf data
if path_exist(mmtmpdatapath):
t_before = time.time()
_LOG.info("starting mm2distribute ...")
stream_distrib_list = mm2distribute(spark, config, args)
time_spent_in_wait += time.time() - t_before
break

time_spent_in_wait += 1
time.sleep(1.0)
if stream_distrib_list == []:
_LOG.warning(
f"{mmtmpdatapath} does not exist. mm2distribute could not start before the end of the job."
)
else:
_LOG.info("Time spent in waiting for Fink-MM: {time_spent_in_wait} seconds")
return time_spent_in_wait, stream_distrib_list

_LOG.warning("No configuration found for fink-mm -- not applied")
return 0, []

def raw2science_launch_fink_mm(args: dict, scitmpdatapath: str) -> Tuple[int, StreamingQuery]:
"""Manage multimessenger operations
Parameters
7 changes: 6 additions & 1 deletion fink_broker/spark_utils.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import logging
from typing import Tuple
from pyspark import SparkContext
from pyspark.sql import SparkSession
@@ -26,6 +27,10 @@
from fink_broker.avro_utils import readschemafromavrofile
from fink_broker.tester import spark_unit_tests

# ---------------------------------
# Local non-exported definitions --
# ---------------------------------
_LOG = logging.getLogger(__name__)

def from_avro(dfcol: Column, jsonformatschema: str) -> Column:
"""Decode the Avro data contained in a DataFrame column into a struct.
@@ -324,7 +329,7 @@ def connect_to_raw_database(basepath: str, path: str, latestfirst: bool) -> DataFrame:

wait_sec = 5
while not path_exist(basepath):
print("Waiting for data to arrive in {}".format(basepath))
_LOG.info("Waiting for stream2raw to upload data to %s", basepath)
time.sleep(wait_sec)
# Sleep for longer and longer
wait_sec = increase_wait_time(wait_sec)
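The polling loop above calls increase_wait_time, which is defined elsewhere in spark_utils.py and not shown in this diff. A minimal sketch of such a backoff helper is given below; the doubling factor and the 60-second cap are assumptions for illustration, only the name and the wait_sec argument come from the code above.

def increase_wait_time(wait_sec: int, factor: int = 2, max_wait_sec: int = 60) -> int:
    """Return the next polling delay (hypothetical sketch).

    Doubles the current delay up to an assumed cap so that the
    "Waiting for stream2raw" loop polls less and less aggressively.
    """
    return min(wait_sec * factor, max_wait_sec)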
