Skip to content

Commit

Permalink
Add timings in the alert packets
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienPeloton committed Jan 18, 2024
1 parent 1bf32a0 commit ee5ba52
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
11 changes: 5 additions & 6 deletions bin/distribute.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# Copyright 2019-2023 AstroLab Software
# Copyright 2019-2024 AstroLab Software
# Author: Abhishek Chauhan, Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -66,14 +66,13 @@ def main():
inspect_application(logger)

# data path
scitmpdatapath = args.online_data_prefix + '/science'
scitmpdatapath = args.online_data_prefix + '/science/{}'.format(args.night)
checkpointpath_kafka = args.online_data_prefix + '/kafka_checkpoint/{}'.format(args.night)

# Connect to the TMP science database
input_sci = scitmpdatapath + "/{}".format(args.night)
df = connect_to_raw_database(
input_sci,
input_sci,
scitmpdatapath,
scitmpdatapath,
latestfirst=False
)

Expand All @@ -91,7 +90,7 @@ def main():
cnames[cnames.index('lc_features_r')] = 'struct(lc_features_r.*) as lc_features_r'

# Extract schema
df_schema = spark.read.format('parquet').load(input_sci)
df_schema = spark.read.format('parquet').load(scitmpdatapath)
df_schema = df_schema.selectExpr(cnames)

schema = schema_converter.to_avro(df_schema.coalesce(1).limit(1).schema)
Expand Down
22 changes: 20 additions & 2 deletions bin/raw2science.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ def main():
latestfirst=False
)

# Add ingestion timestamp
df = df.withColumn(
'brokerStartProcessTimestamp',
convert_to_millitime(
df['candidate.jd'],
F.lit('jd'),
F.lit(True)
)
)

# Add library versions
df = df.withColumn('fink_broker_version', F.lit(fbvsn))\
.withColumn('fink_science_version', F.lit(fsvsn))
Expand All @@ -94,8 +104,16 @@ def main():
.filter(df['candidate.rb'] >= 0.55)

df = apply_science_modules(df, args.noscience)
# timecol = 'candidate.jd'
# converter = lambda x: convert_to_datetime(x)

# Add ingestion timestamp
df = df.withColumn(
'brokerEndProcessTimestamp',
convert_to_millitime(
df['candidate.jd'],
F.lit('jd'),
F.lit(True)
)
)

# Append new rows in the tmp science database
countquery = df\
Expand Down
11 changes: 9 additions & 2 deletions bin/stream2raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,15 @@ def main():
df_decoded = df_decoded.selectExpr(cnames)

if 'candidate' in df_decoded.columns:
# timecol = 'candidate.jd'
# converter = lambda x: convert_to_datetime(x)
# Add ingestion timestamp
df_decoded = df_decoded.withColumn(
'brokerIngestTimestamp',
convert_to_millitime(
df_decoded['candidate.jd'],
F.lit('jd'),
F.lit(True)
)
)

# write unpartitioned data
countquery_tmp = df_decoded\
Expand Down

0 comments on commit ee5ba52

Please sign in to comment.