Commit
Remove partitioning for online data
JulienPeloton committed Jan 18, 2024
1 parent 9fcd4bf commit 1bf32a0
Showing 4 changed files with 96 additions and 84 deletions.
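
This commit drops the Hive-style year=/month=/day= partitioning for the online (per-night) data in favour of one plain folder per night, and scopes the streaming checkpoints to the night being processed. Below is a minimal sketch of the before/after path layout; it is illustrative only, and the prefix and night values are hypothetical stand-ins for the --online_data_prefix and --night arguments.

# Illustrative only, not part of the commit.
prefix = "/user/fink/online"   # hypothetical value
night = "20240118"             # hypothetical value, YYYYMMDD

# Old layout: Hive-style year/month/day partitions shared by all nights
old_science_path = "{}/science/year={}/month={}/day={}".format(
    prefix, night[0:4], night[4:6], night[6:8])

# New layout: one plain folder per night, plus per-night checkpoints
new_science_path = "{}/science/{}".format(prefix, night)
new_checkpoint = "{}/science_checkpoint/{}".format(prefix, night)

print(old_science_path)   # /user/fink/online/science/year=2024/month=01/day=18
print(new_science_path)   # /user/fink/online/science/20240118
print(new_checkpoint)     # /user/fink/online/science_checkpoint/20240118

With the night appended to the checkpoint paths, each night's streaming queries keep their own Structured Streaming progress records instead of sharing a single checkpoint across nights.
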
13 changes: 6 additions & 7 deletions bin/distribute.py
@@ -54,7 +54,10 @@ def main():
args = getargs(parser)

# Initialise Spark session
spark = init_sparksession(name="distribute_{}_{}".format(args.producer, args.night), shuffle_partitions=2)
spark = init_sparksession(
name="distribute_{}_{}".format(args.producer, args.night),
shuffle_partitions=2
)

# The level here should be controlled by an argument.
logger = get_fink_logger(spark.sparkContext.appName, args.log_level)
@@ -64,20 +67,16 @@ def main():

# data path
scitmpdatapath = args.online_data_prefix + '/science'
- checkpointpath_kafka = args.online_data_prefix + '/kafka_checkpoint'
+ checkpointpath_kafka = args.online_data_prefix + '/kafka_checkpoint/{}'.format(args.night)

# Connect to the TMP science database
- input_sci = scitmpdatapath + "/year={}/month={}/day={}".format(
-     args.night[0:4], args.night[4:6], args.night[6:8])
+ input_sci = scitmpdatapath + "/{}".format(args.night)
df = connect_to_raw_database(
input_sci,
input_sci,
latestfirst=False
)

- # Drop partitioning columns
- df = df.drop('year').drop('month').drop('day')

# Cast fields to ease the distribution
cnames = df.columns
cnames[cnames.index('timestamp')] = 'cast(timestamp as string) as timestamp'
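
With the new layout, bin/distribute.py points connect_to_raw_database at a single per-night folder and no longer needs to drop the year/month/day columns. A minimal sketch of reading such a folder back with plain PySpark follows; it is illustrative only, and the prefix and night values are hypothetical.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("inspect_online_science").getOrCreate()

night = "20240118"                                    # hypothetical night, YYYYMMDD
path = "/user/fink/online/science/{}".format(night)   # hypothetical prefix

# Batch read of the night's temporary science data; the folder is no longer
# partitioned, so there are no year/month/day columns to drop.
df = spark.read.parquet(path)
df.printSchema()
print(df.count())
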
12 changes: 3 additions & 9 deletions bin/merge_ztf_night.py
@@ -38,16 +38,10 @@ def main():
# debug statements
inspect_application(logger)

- year = args.night[:4]
- month = args.night[4:6]
- day = args.night[6:8]
+ print('Processing {}'.format(args.night))

- print('Processing {}/{}/{}'.format(year, month, day))

- input_raw = '{}/raw/year={}/month={}/day={}'.format(
-     args.online_data_prefix, year, month, day)
- input_science = '{}/science/year={}/month={}/day={}'.format(
-     args.online_data_prefix, year, month, day)
+ input_raw = '{}/raw/{}'.format(args.online_data_prefix, args.night)
+ input_science = '{}/science/{}'.format(args.online_data_prefix, args.night)

# basepath
output_raw = '{}/raw'.format(args.agg_data_prefix)
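
bin/merge_ztf_night.py now derives its input folders directly from the night string instead of rebuilding year=/month=/day= sub-paths. Here is a small illustrative helper (not from the repository) showing the same path construction, plus a sanity check on the night format.

from datetime import datetime

def night_paths(online_prefix, agg_prefix, night):
    """Return (raw input, science input, raw output base) for a given night.

    Illustrative helper only; merge_ztf_night.py builds these strings inline.
    """
    # Fail early if the night is not a valid YYYYMMDD date
    datetime.strptime(night, "%Y%m%d")

    input_raw = '{}/raw/{}'.format(online_prefix, night)
    input_science = '{}/science/{}'.format(online_prefix, night)
    output_raw = '{}/raw'.format(agg_prefix)
    return input_raw, input_science, output_raw

print(night_paths('/user/fink/online', '/user/fink/archive', '20240118'))
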
105 changes: 56 additions & 49 deletions bin/raw2science.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
- # Copyright 2019-2022 AstroLab Software
+ # Copyright 2019-2024 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -49,7 +49,11 @@ def main():
tz = None

# Initialise Spark session
spark = init_sparksession(name="raw2science_{}_{}".format(args.producer, args.night), shuffle_partitions=2, tz=tz)
spark = init_sparksession(
name="raw2science_{}_{}".format(args.producer, args.night),
shuffle_partitions=2,
tz=tz
)

# Logger to print useful debug statements
logger = get_fink_logger(spark.sparkContext.appName, args.log_level)
@@ -60,7 +64,7 @@
# data path
rawdatapath = args.online_data_prefix + '/raw'
scitmpdatapath = args.online_data_prefix + '/science'
- checkpointpath_sci_tmp = args.online_data_prefix + '/science_checkpoint'
+ checkpointpath_sci_tmp = args.online_data_prefix + '/science_checkpoint/{}'.format(args.night)

if args.producer == 'elasticc':
df = connect_to_raw_database(
@@ -69,19 +73,18 @@
else:
# assume YYYYMMDD
df = connect_to_raw_database(
rawdatapath + "/year={}/month={}/day={}".format(
args.night[0:4],
args.night[4:6],
args.night[6:8]
),
rawdatapath + "/year={}/month={}/day={}".format(
args.night[0:4],
args.night[4:6],
args.night[6:8]
),
rawdatapath + '/{}'.format(args.night),
rawdatapath + '/{}'.format(args.night),
latestfirst=False
)

+ # Add library versions
+ df = df.withColumn('fink_broker_version', F.lit(fbvsn))\
+     .withColumn('fink_science_version', F.lit(fsvsn))

+ # Switch publisher
+ df = df.withColumn('publisher', F.lit('Fink'))

# Apply science modules
if 'candidate' in df.columns:
# Apply quality cuts
@@ -91,47 +94,51 @@
.filter(df['candidate.rb'] >= 0.55)

df = apply_science_modules(df, args.noscience)
- timecol = 'candidate.jd'
- converter = lambda x: convert_to_datetime(x)
+ # timecol = 'candidate.jd'
+ # converter = lambda x: convert_to_datetime(x)

+ # Append new rows in the tmp science database
+ countquery = df\
+     .writeStream\
+     .outputMode("append") \
+     .format("parquet") \
+     .option("checkpointLocation", checkpointpath_sci_tmp) \
+     .option("path", scitmpdatapath)\
+     .trigger(processingTime='{} seconds'.format(args.tinterval)) \
+     .start()

elif 'diaSource' in df.columns:
df = apply_science_modules_elasticc(df)
timecol = 'diaSource.midPointTai'
converter = lambda x: convert_to_datetime(x, F.lit('mjd'))

- # Add library versions
- df = df.withColumn('fink_broker_version', F.lit(fbvsn))\
-     .withColumn('fink_science_version', F.lit(fsvsn))

- # Switch publisher
- df = df.withColumn('publisher', F.lit('Fink'))

- # re-create partitioning columns if needed.
- if 'timestamp' not in df.columns:
-     df = df\
-         .withColumn("timestamp", converter(df[timecol]))

- if "year" not in df.columns:
-     df = df\
-         .withColumn("year", F.date_format("timestamp", "yyyy"))

- if "month" not in df.columns:
-     df = df\
-         .withColumn("month", F.date_format("timestamp", "MM"))

- if "day" not in df.columns:
-     df = df\
-         .withColumn("day", F.date_format("timestamp", "dd"))

- # Append new rows in the tmp science database
- countquery = df\
-     .writeStream\
-     .outputMode("append") \
-     .format("parquet") \
-     .option("checkpointLocation", checkpointpath_sci_tmp) \
-     .option("path", scitmpdatapath)\
-     .partitionBy("year", "month", "day") \
-     .trigger(processingTime='{} seconds'.format(args.tinterval)) \
-     .start()
+     # re-create partitioning columns if needed.
+     if 'timestamp' not in df.columns:
+         df = df\
+             .withColumn("timestamp", converter(df[timecol]))

+     if "year" not in df.columns:
+         df = df\
+             .withColumn("year", F.date_format("timestamp", "yyyy"))

+     if "month" not in df.columns:
+         df = df\
+             .withColumn("month", F.date_format("timestamp", "MM"))

+     if "day" not in df.columns:
+         df = df\
+             .withColumn("day", F.date_format("timestamp", "dd"))

+     # Append new rows in the tmp science database
+     countquery = df\
+         .writeStream\
+         .outputMode("append") \
+         .format("parquet") \
+         .option("checkpointLocation", checkpointpath_sci_tmp) \
+         .option("path", scitmpdatapath)\
+         .partitionBy("year", "month", "day") \
+         .trigger(processingTime='{} seconds'.format(args.tinterval)) \
+         .start()

# Keep the Streaming running until something or someone ends it!
if args.exit_after is not None:
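
After this change, the ZTF (candidate) branch of bin/raw2science.py appends its micro-batches straight into the per-night science folder, while the elasticc (diaSource) branch still derives year/month/day columns and keeps a partitionBy sink. The following is a condensed, self-contained sketch of the two sink configurations; the rate source, paths and trigger interval are placeholders, not the broker's actual inputs.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("sink_sketch").getOrCreate()

# Stand-in streaming source; the real job reads decoded alerts from parquet.
# The rate source conveniently provides a `timestamp` column.
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

night = "20240118"                                              # hypothetical
path_plain = "/tmp/online/science/{}".format(night)             # hypothetical prefix
ckpt_plain = "/tmp/online/science_checkpoint/{}".format(night)

# ZTF-style sink after the change: append straight into the per-night folder
query_ztf = df.writeStream\
    .outputMode("append")\
    .format("parquet")\
    .option("checkpointLocation", ckpt_plain)\
    .option("path", path_plain)\
    .trigger(processingTime="30 seconds")\
    .start()

# elasticc-style sink: calendar columns derived from `timestamp`, then
# written with Hive-style partitioning, as before the change
df_part = df\
    .withColumn("year", F.date_format("timestamp", "yyyy"))\
    .withColumn("month", F.date_format("timestamp", "MM"))\
    .withColumn("day", F.date_format("timestamp", "dd"))

query_elasticc = df_part.writeStream\
    .outputMode("append")\
    .format("parquet")\
    .option("checkpointLocation", "/tmp/online/science_checkpoint_elasticc/{}".format(night))\
    .option("path", "/tmp/online/science_elasticc")\
    .partitionBy("year", "month", "day")\
    .trigger(processingTime="30 seconds")\
    .start()

query_ztf.awaitTermination(60)
query_ztf.stop()
query_elasticc.stop()

Each sketch query gets its own checkpoint location because Structured Streaming requires one checkpoint per query.
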
50 changes: 31 additions & 19 deletions bin/stream2raw.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
- # Copyright 2019-2022 AstroLab Software
+ # Copyright 2019-2024 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -51,7 +51,11 @@ def main():
tz = None

# Initialise Spark session
spark = init_sparksession(name="stream2raw_{}_{}".format(args.producer, args.night), shuffle_partitions=2, tz=tz)
spark = init_sparksession(
name="stream2raw_{}_{}".format(args.producer, args.night),
shuffle_partitions=2,
tz=tz
)

# The level here should be controlled by an argument.
logger = get_fink_logger(spark.sparkContext.appName, args.log_level)
@@ -61,7 +65,7 @@

# data path
rawdatapath = args.online_data_prefix + '/raw'
- checkpointpath_raw = args.online_data_prefix + '/raw_checkpoint'
+ checkpointpath_raw = args.online_data_prefix + '/raw_checkpoint/{}'.format(args.night)

# Create a streaming dataframe pointing to a Kafka stream
df = connect_to_kafka(
@@ -112,10 +116,18 @@ def main():
cnames[cnames.index('decoded')] = 'decoded.*'
df_decoded = df_decoded.selectExpr(cnames)

- # Partition the data hourly
if 'candidate' in df_decoded.columns:
-     timecol = 'candidate.jd'
-     converter = lambda x: convert_to_datetime(x)
+     # timecol = 'candidate.jd'
+     # converter = lambda x: convert_to_datetime(x)

+     # write unpartitioned data
+     countquery_tmp = df_decoded\
+         .writeStream\
+         .outputMode("append") \
+         .format("parquet") \
+         .option("checkpointLocation", checkpointpath_raw) \
+         .option("path", rawdatapath + '/raw/{}'.format(args.night))

elif 'diaSource' in df_decoded.columns:
timecol = 'diaSource.midPointTai'
converter = lambda x: convert_to_datetime(x, F.lit('mjd'))
@@ -130,19 +142,19 @@
)
)

- df_partitionedby = df_decoded\
-     .withColumn("timestamp", converter(df_decoded[timecol]))\
-     .withColumn("year", F.date_format("timestamp", "yyyy"))\
-     .withColumn("month", F.date_format("timestamp", "MM"))\
-     .withColumn("day", F.date_format("timestamp", "dd"))

- countquery_tmp = df_partitionedby\
-     .writeStream\
-     .outputMode("append") \
-     .format("parquet") \
-     .option("checkpointLocation", checkpointpath_raw) \
-     .option("path", rawdatapath)\
-     .partitionBy("year", "month", "day")
+     df_partitionedby = df_decoded\
+         .withColumn("timestamp", converter(df_decoded[timecol]))\
+         .withColumn("year", F.date_format("timestamp", "yyyy"))\
+         .withColumn("month", F.date_format("timestamp", "MM"))\
+         .withColumn("day", F.date_format("timestamp", "dd"))

+     countquery_tmp = df_partitionedby\
+         .writeStream\
+         .outputMode("append") \
+         .format("parquet") \
+         .option("checkpointLocation", checkpointpath_raw) \
+         .option("path", rawdatapath)\
+         .partitionBy("year", "month", "day")

# Fixed interval micro-batches or ASAP
if args.tinterval > 0:
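
As in bin/raw2science.py, the raw checkpoint is now scoped to the night, and the ZTF branch builds an unpartitioned writer whose trigger and start are attached later (the "Fixed interval micro-batches or ASAP" block). Below is a small sketch of that deferred-start pattern, with hypothetical paths and a rate source standing in for the Kafka stream.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("deferred_start_sketch").getOrCreate()

# Stand-in for the decoded Kafka stream
df_decoded = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

night = "20240118"                                         # hypothetical
rawdatapath = "/tmp/online/raw"                            # hypothetical prefix
checkpointpath_raw = "/tmp/online/raw_checkpoint/{}".format(night)

# Build the writer first; no trigger or start() is attached at this point
writer = df_decoded.writeStream\
    .outputMode("append")\
    .format("parquet")\
    .option("checkpointLocation", checkpointpath_raw)\
    .option("path", "{}/{}".format(rawdatapath, night))

# Later, choose between fixed-interval micro-batches and as-fast-as-possible
tinterval = 30  # hypothetical value standing in for args.tinterval
if tinterval > 0:
    query = writer.trigger(processingTime="{} seconds".format(tinterval)).start()
else:
    query = writer.start()

query.awaitTermination(60)
query.stop()
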
