From 1bf32a0eeb83e6e40248ff9f795297440edbabe0 Mon Sep 17 00:00:00 2001
From: JulienPeloton
Date: Thu, 18 Jan 2024 10:01:43 +0100
Subject: [PATCH] Remove partitioning for online data

---
 bin/distribute.py      |  13 +++--
 bin/merge_ztf_night.py |  12 ++---
 bin/raw2science.py     | 105 ++++++++++++++++++++++-------------
 bin/stream2raw.py      |  50 ++++++++++++--------
 4 files changed, 96 insertions(+), 84 deletions(-)

diff --git a/bin/distribute.py b/bin/distribute.py
index d8b10200..ff2fecd8 100644
--- a/bin/distribute.py
+++ b/bin/distribute.py
@@ -54,7 +54,10 @@ def main():
     args = getargs(parser)

     # Initialise Spark session
-    spark = init_sparksession(name="distribute_{}_{}".format(args.producer, args.night), shuffle_partitions=2)
+    spark = init_sparksession(
+        name="distribute_{}_{}".format(args.producer, args.night),
+        shuffle_partitions=2
+    )

     # The level here should be controlled by an argument.
     logger = get_fink_logger(spark.sparkContext.appName, args.log_level)
@@ -64,20 +67,16 @@ def main():

     # data path
     scitmpdatapath = args.online_data_prefix + '/science'
-    checkpointpath_kafka = args.online_data_prefix + '/kafka_checkpoint'
+    checkpointpath_kafka = args.online_data_prefix + '/kafka_checkpoint/{}'.format(args.night)

     # Connect to the TMP science database
-    input_sci = scitmpdatapath + "/year={}/month={}/day={}".format(
-        args.night[0:4], args.night[4:6], args.night[6:8])
+    input_sci = scitmpdatapath + "/{}".format(args.night)
     df = connect_to_raw_database(
         input_sci,
         input_sci,
         latestfirst=False
     )

-    # Drop partitioning columns
-    df = df.drop('year').drop('month').drop('day')
-
     # Cast fields to ease the distribution
     cnames = df.columns
     cnames[cnames.index('timestamp')] = 'cast(timestamp as string) as timestamp'
diff --git a/bin/merge_ztf_night.py b/bin/merge_ztf_night.py
index 54de0635..681d6e0b 100644
--- a/bin/merge_ztf_night.py
+++ b/bin/merge_ztf_night.py
@@ -38,16 +38,10 @@ def main():
     # debug statements
     inspect_application(logger)

-    year = args.night[:4]
-    month = args.night[4:6]
-    day = args.night[6:8]
+    print('Processing {}'.format(args.night))

-    print('Processing {}/{}/{}'.format(year, month, day))
-
-    input_raw = '{}/raw/year={}/month={}/day={}'.format(
-        args.online_data_prefix, year, month, day)
-    input_science = '{}/science/year={}/month={}/day={}'.format(
-        args.online_data_prefix, year, month, day)
+    input_raw = '{}/raw/{}'.format(args.online_data_prefix, args.night)
+    input_science = '{}/science/{}'.format(args.online_data_prefix, args.night)

     # basepath
     output_raw = '{}/raw'.format(args.agg_data_prefix)
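The pattern in the two files above repeats throughout the patch: input paths that used to be assembled from Hive-style year=/month=/day= partition directories are now assembled from the night string itself. Below is a minimal, self-contained sketch of the two conventions; the prefix and night values are made-up placeholders, not values from a real Fink configuration.

    # Hypothetical values, for illustration only.
    online_data_prefix = "online"
    night = "20240118"

    # Old convention: Hive-style partition directories derived from the night.
    input_old = "{}/raw/year={}/month={}/day={}".format(
        online_data_prefix, night[0:4], night[4:6], night[6:8])

    # New convention: a single directory per observing night.
    input_new = "{}/raw/{}".format(online_data_prefix, night)

    print(input_old)  # online/raw/year=2024/month=01/day=18
    print(input_new)  # online/raw/20240118

Since the online data are no longer written as partitioned parquet, the year, month and day columns are not present in the files, which is presumably why distribute.py no longer needs to drop them after reading.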
diff --git a/bin/raw2science.py b/bin/raw2science.py
index 81320027..751f9f15 100644
--- a/bin/raw2science.py
+++ b/bin/raw2science.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-# Copyright 2019-2022 AstroLab Software
+# Copyright 2019-2024 AstroLab Software
 # Author: Julien Peloton
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -49,7 +49,11 @@ def main():
         tz = None

     # Initialise Spark session
-    spark = init_sparksession(name="raw2science_{}_{}".format(args.producer, args.night), shuffle_partitions=2, tz=tz)
+    spark = init_sparksession(
+        name="raw2science_{}_{}".format(args.producer, args.night),
+        shuffle_partitions=2,
+        tz=tz
+    )

     # Logger to print useful debug statements
     logger = get_fink_logger(spark.sparkContext.appName, args.log_level)
@@ -60,7 +64,7 @@ def main():
     # data path
     rawdatapath = args.online_data_prefix + '/raw'
     scitmpdatapath = args.online_data_prefix + '/science'
-    checkpointpath_sci_tmp = args.online_data_prefix + '/science_checkpoint'
+    checkpointpath_sci_tmp = args.online_data_prefix + '/science_checkpoint/{}'.format(args.night)

     if args.producer == 'elasticc':
         df = connect_to_raw_database(
@@ -69,19 +73,18 @@ def main():
     else:
         # assume YYYYMMHH
         df = connect_to_raw_database(
-            rawdatapath + "/year={}/month={}/day={}".format(
-                args.night[0:4],
-                args.night[4:6],
-                args.night[6:8]
-            ),
-            rawdatapath + "/year={}/month={}/day={}".format(
-                args.night[0:4],
-                args.night[4:6],
-                args.night[6:8]
-            ),
+            rawdatapath + '/{}'.format(args.night),
+            rawdatapath + '/{}'.format(args.night),
             latestfirst=False
         )

+    # Add library versions
+    df = df.withColumn('fink_broker_version', F.lit(fbvsn))\
+        .withColumn('fink_science_version', F.lit(fsvsn))
+
+    # Switch publisher
+    df = df.withColumn('publisher', F.lit('Fink'))
+
     # Apply science modules
     if 'candidate' in df.columns:
         # Apply quality cuts
@@ -91,47 +94,51 @@ def main():
             .filter(df['candidate.rb'] >= 0.55)

         df = apply_science_modules(df, args.noscience)
-        timecol = 'candidate.jd'
-        converter = lambda x: convert_to_datetime(x)
+        # timecol = 'candidate.jd'
+        # converter = lambda x: convert_to_datetime(x)
+
+        # Append new rows in the tmp science database
+        countquery = df\
+            .writeStream\
+            .outputMode("append") \
+            .format("parquet") \
+            .option("checkpointLocation", checkpointpath_sci_tmp) \
+            .option("path", scitmpdatapath)\
+            .trigger(processingTime='{} seconds'.format(args.tinterval)) \
+            .start()
+
     elif 'diaSource' in df.columns:
         df = apply_science_modules_elasticc(df)
         timecol = 'diaSource.midPointTai'
         converter = lambda x: convert_to_datetime(x, F.lit('mjd'))

-    # Add library versions
-    df = df.withColumn('fink_broker_version', F.lit(fbvsn))\
-        .withColumn('fink_science_version', F.lit(fsvsn))
-
-    # Switch publisher
-    df = df.withColumn('publisher', F.lit('Fink'))
-
-    # re-create partitioning columns if needed.
-    if 'timestamp' not in df.columns:
-        df = df\
-            .withColumn("timestamp", converter(df[timecol]))
-
-    if "year" not in df.columns:
-        df = df\
-            .withColumn("year", F.date_format("timestamp", "yyyy"))
-
-    if "month" not in df.columns:
-        df = df\
-            .withColumn("month", F.date_format("timestamp", "MM"))
-
-    if "day" not in df.columns:
-        df = df\
-            .withColumn("day", F.date_format("timestamp", "dd"))
-
-    # Append new rows in the tmp science database
-    countquery = df\
-        .writeStream\
-        .outputMode("append") \
-        .format("parquet") \
-        .option("checkpointLocation", checkpointpath_sci_tmp) \
-        .option("path", scitmpdatapath)\
-        .partitionBy("year", "month", "day") \
-        .trigger(processingTime='{} seconds'.format(args.tinterval)) \
-        .start()
+        # re-create partitioning columns if needed.
+        if 'timestamp' not in df.columns:
+            df = df\
+                .withColumn("timestamp", converter(df[timecol]))
+
+        if "year" not in df.columns:
+            df = df\
+                .withColumn("year", F.date_format("timestamp", "yyyy"))
+
+        if "month" not in df.columns:
+            df = df\
+                .withColumn("month", F.date_format("timestamp", "MM"))
+
+        if "day" not in df.columns:
+            df = df\
+                .withColumn("day", F.date_format("timestamp", "dd"))
+
+        # Append new rows in the tmp science database
+        countquery = df\
+            .writeStream\
+            .outputMode("append") \
+            .format("parquet") \
+            .option("checkpointLocation", checkpointpath_sci_tmp) \
+            .option("path", scitmpdatapath)\
+            .partitionBy("year", "month", "day") \
+            .trigger(processingTime='{} seconds'.format(args.tinterval)) \
+            .start()

     # Keep the Streaming running until something or someone ends it!
     if args.exit_after is not None:
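In raw2science.py the parquet sink moves inside the ZTF ('candidate') branch and loses its partitionBy("year", "month", "day") call, while the elasticc ('diaSource') branch keeps the partitioned write. Below is a minimal standalone sketch of the new unpartitioned sink, assuming a local Spark session and using the built-in rate source as a stand-in for the decoded alert stream; the paths, trigger interval and night value are placeholders, not the broker's actual configuration.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("unpartitioned_sink_sketch").getOrCreate()

    night = "20240118"  # placeholder night, YYYYMMDD
    scitmpdatapath = "online/science/{}".format(night)                     # one directory per night
    checkpointpath_sci_tmp = "online/science_checkpoint/{}".format(night)  # per-night checkpoint

    # Any streaming DataFrame would do here; the rate source stands in
    # for the stream returned by connect_to_raw_database in the broker.
    df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    # Append-mode parquet sink without partitionBy: the per-night directory
    # itself replaces the year/month/day partition columns.
    query = df.writeStream \
        .outputMode("append") \
        .format("parquet") \
        .option("checkpointLocation", checkpointpath_sci_tmp) \
        .option("path", scitmpdatapath) \
        .trigger(processingTime="30 seconds") \
        .start()

    query.awaitTermination(60)  # let the sketch run briefly, then stop it
    query.stop()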
diff --git a/bin/stream2raw.py b/bin/stream2raw.py
index 395e5e81..8273faaf 100644
--- a/bin/stream2raw.py
+++ b/bin/stream2raw.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-# Copyright 2019-2022 AstroLab Software
+# Copyright 2019-2024 AstroLab Software
 # Author: Julien Peloton
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -51,7 +51,11 @@ def main():
         tz = None

     # Initialise Spark session
-    spark = init_sparksession(name="stream2raw_{}_{}".format(args.producer, args.night), shuffle_partitions=2, tz=tz)
+    spark = init_sparksession(
+        name="stream2raw_{}_{}".format(args.producer, args.night),
+        shuffle_partitions=2,
+        tz=tz
+    )

     # The level here should be controlled by an argument.
     logger = get_fink_logger(spark.sparkContext.appName, args.log_level)
@@ -61,7 +65,7 @@ def main():

     # data path
     rawdatapath = args.online_data_prefix + '/raw'
-    checkpointpath_raw = args.online_data_prefix + '/raw_checkpoint'
+    checkpointpath_raw = args.online_data_prefix + '/raw_checkpoint/{}'.format(args.night)

     # Create a streaming dataframe pointing to a Kafka stream
     df = connect_to_kafka(
@@ -112,10 +116,18 @@ def main():
     cnames[cnames.index('decoded')] = 'decoded.*'
     df_decoded = df_decoded.selectExpr(cnames)

-    # Partition the data hourly
     if 'candidate' in df_decoded.columns:
-        timecol = 'candidate.jd'
-        converter = lambda x: convert_to_datetime(x)
+        # timecol = 'candidate.jd'
+        # converter = lambda x: convert_to_datetime(x)
+
+        # write unpartitioned data
+        countquery_tmp = df_decoded\
+            .writeStream\
+            .outputMode("append") \
+            .format("parquet") \
+            .option("checkpointLocation", checkpointpath_raw) \
+            .option("path", rawdatapath + '/raw/{}'.format(args.night))
+
     elif 'diaSource' in df_decoded.columns:
         timecol = 'diaSource.midPointTai'
         converter = lambda x: convert_to_datetime(x, F.lit('mjd'))
@@ -130,19 +142,19 @@ def main():
             )
         )

-    df_partitionedby = df_decoded\
-        .withColumn("timestamp", converter(df_decoded[timecol]))\
-        .withColumn("year", F.date_format("timestamp", "yyyy"))\
-        .withColumn("month", F.date_format("timestamp", "MM"))\
-        .withColumn("day", F.date_format("timestamp", "dd"))
-
-    countquery_tmp = df_partitionedby\
-        .writeStream\
-        .outputMode("append") \
-        .format("parquet") \
-        .option("checkpointLocation", checkpointpath_raw) \
-        .option("path", rawdatapath)\
-        .partitionBy("year", "month", "day")
+        df_partitionedby = df_decoded\
+            .withColumn("timestamp", converter(df_decoded[timecol]))\
+            .withColumn("year", F.date_format("timestamp", "yyyy"))\
+            .withColumn("month", F.date_format("timestamp", "MM"))\
+            .withColumn("day", F.date_format("timestamp", "dd"))
+
+        countquery_tmp = df_partitionedby\
+            .writeStream\
+            .outputMode("append") \
+            .format("parquet") \
+            .option("checkpointLocation", checkpointpath_raw) \
+            .option("path", rawdatapath)\
+            .partitionBy("year", "month", "day")

     # Fixed interval micro-batches or ASAP
     if args.tinterval > 0:
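Taken together, the four changes key the ZTF online tree by night rather than by partition columns: raw alerts under <prefix>/raw/<night>, science data under <prefix>/science/<night>, and streaming checkpoints under per-night directories such as <prefix>/raw_checkpoint/<night>. One detail worth noting in the stream2raw.py hunk above: rawdatapath already ends in '/raw', so the sink path written there expands to <prefix>/raw/raw/<night>, which is not the <prefix>/raw/<night> location read by merge_ztf_night.py; the patch itself does not say whether the extra segment is intended. A short sketch of selecting one night under the new layout, again with placeholder values:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("read_night_sketch").getOrCreate()

    online_data_prefix = "online"   # placeholder prefix
    night = "20240118"              # placeholder night

    # Selecting a night is now just pointing at its directory; there are
    # no year/month/day partition filters to apply for ZTF online data.
    df_night = spark.read.parquet("{}/science/{}".format(online_data_prefix, night))
    df_night.printSchema()
    print(df_night.count())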