Commit 87ba3e7

Add a GRB offline mode using Spark and an HBase connection

FusRoman committed Sep 28, 2022
1 parent 5447f1c commit 87ba3e7
Showing 4 changed files with 228 additions and 12 deletions.
36 changes: 24 additions & 12 deletions fink_grb/conf/fink_grb.conf
@@ -3,31 +3,43 @@ id=
secret=

[PATH]
# Prefix path on disk where GCN live data are saved.
online_gcn_data_prefix=fink_grb/test/test_data/gcn_test
# Prefix path on disk where GCN live data are saved. Used by the joining stream.
online_gcn_data_prefix=hdfs://134.158.75.222:8020///user/roman.le-montagner/gcn_storage

# Same path as online_gcn_data_prefix without the URI part. Used by the gcn_stream monitor.
hdfs_gcn_storage=/user/roman.le-montagner/gcn_storage/raw

# Prefix path on disk where ZTF live data are saved.
# They can be in local FS (/path/ or files:///path/) or
# in distributed FS (e.g. hdfs:///path/).
# Be careful though to have enough disk space!
online_ztf_data_prefix=fink_grb/test/test_data/ztf_test/online
online_ztf_data_prefix=hdfs://134.158.75.222:8020///user/julien.peloton/online

# Prefix path on disk to save the GRB x ZTF joined live data.
online_grb_data_prefix=fink_grb/test/test_output
online_grb_data_prefix=ztfxgcn_storage


[HDFS]
host=134.158.75.222
port=8020
user=roman.le-montagner


[STREAM]
tinterval=5
manager=local[2]
tinterval=30
manager=mesos://vm-75063.lal.in2p3.fr:5050
principal=lsst
secret=secret
role=lsst
exec_env='/home'
exec_env=/home/roman.le-montagner
driver_memory=4
executor_memory=16
max_core=64
executor_memory=8
max_core=16
executor_core=8
# Must be a path to a .py, .zip or .egg file; multiple files must be separated by ',' without spaces
external_python_libs=
external_python_libs=Fink_GRB/dist/fink_grb-0.1.0-py3.7.egg,Fink_GRB/fink_utils-0.8.0-py3.7.egg

[ADMIN]
verbose=False # or True, case sensitive
verbose=True

[OFFLINE]
time_window=7
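
The [OFFLINE] block above is read at startup by spark_offline.py below. The config["SECTION"]["key"] access pattern in that script matches Python's standard configparser, so here is a minimal sketch of how these values can be loaded (assuming get_config wraps configparser, which is not confirmed by this commit):

    import configparser

    config = configparser.ConfigParser()
    config.read("fink_grb/conf/fink_grb.conf")

    gcn_prefix = config["PATH"]["online_gcn_data_prefix"]
    grb_prefix = config["PATH"]["online_grb_data_prefix"]
    # configparser returns strings, so numeric options must be cast explicitly
    time_window = int(config["OFFLINE"]["time_window"])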
177 changes: 177 additions & 0 deletions fink_grb/offline/spark_offline.py
@@ -0,0 +1,177 @@
import sys
import datetime
import json

from astropy.time import Time, TimeDelta

from pyspark.sql import functions as F
from pyspark.sql.functions import col

from fink_utils.science.utils import ang2pix
from fink_utils.broker.sparkUtils import init_sparksession

# grb_assoc is reused from the online mode; ztf_grb_filter is redefined below
from fink_grb.online.ztf_join_gcn import grb_assoc
from fink_grb.init import get_config

def ztf_grb_filter(spark_ztf):
    """
    Remove the ZTF alerts matching a known solar system object or lying close
    to a Pan-STARRS1 or Gaia DR1 source. A value of -999.0 means that no
    counterpart was found within the search radius.
    """
    spark_filter = (
        spark_ztf.filter(
            (spark_ztf.ssdistnr > 5)
            | (
                spark_ztf.ssdistnr == -999.0
            )  # distance to the nearest known SSO above 5 arcseconds, or unknown
        )
        .filter(
            (spark_ztf.distpsnr1 > 2)
            | (
                spark_ztf.distpsnr1 == -999.0
            )  # distance to the closest Pan-STARRS1 source above 2 arcseconds, or unknown
        )
        .filter(
            (spark_ztf.neargaia > 5)
            | (
                spark_ztf.neargaia == -999.0
            )  # distance to the closest Gaia DR1 source above 5 arcseconds, or unknown
        )
    )

return spark_filter
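
A self-contained sketch of how this filter behaves on toy data (the SparkSession and the values are illustrative only; column names are taken from the function above):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    toy = spark.createDataFrame(
        [
            ("far", 10.0, 5.0, 8.0),  # far from any catalogued source: kept
            ("sso", 1.0, 5.0, 8.0),  # close to a known SSO: dropped
            ("none", -999.0, -999.0, -999.0),  # no counterpart found: kept
        ],
        ["objectId", "ssdistnr", "distpsnr1", "neargaia"],
    )
    ztf_grb_filter(toy).show()  # keeps 'far' and 'none'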


def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window):
"""
Parameters
----------
time_window : int
Number of day between now and now - time_window to join ztf alerts and gcn.
time_window are in days.
"""
path_to_catalog = "/home/julien.peloton/fink-broker/ipynb/hbase_catalogs/ztf_season1.class.json"

with open(path_to_catalog) as f:
catalog = json.load(f)

spark = init_sparksession(
"science2grb_offline_{}{}{}".format(night[0:4], night[4:6], night[6:8])
)

ztf_alert = spark.read.option("catalog", catalog)\
.format("org.apache.hadoop.hbase.spark")\
.option("hbase.spark.use.hbasecontext", False)\
.option("hbase.spark.pushdown.columnfilter", True)\
.load()
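
The catalog option tells the hbase-spark connector how to map HBase cells to DataFrame columns. The actual ztf_season1.class.json file is not part of this commit; a hypothetical, abridged catalog following the connector's JSON convention could look like this (table and column family names are illustrative):

    # hypothetical catalog sketch, not the real ztf_season1.class.json
    catalog = json.dumps(
        {
            "table": {"namespace": "default", "name": "ztf.season1.class"},
            "rowkey": "key",
            "columns": {
                "class_jd_objectId": {"cf": "rowkey", "col": "key", "type": "string"},
                "objectId": {"cf": "d", "col": "objectId", "type": "string"},
                "ra": {"cf": "d", "col": "ra", "type": "double"},
                "dec": {"cf": "d", "col": "dec", "type": "double"},
            },
        }
    )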

ztf_alert = ztf_alert.select(
"objectId",
"candid",
"ra", "dec",
"jd", "jdstarthist",
"class_jd_objectId",
"ssdistnr",
"distpsnr1",
"neargaia"
)

now = Time.now().jd
low_bound = now - TimeDelta(time_window*24*3600, format='sec').jd

request_class = ["SN candidate","Ambiguous","Unknown", "Solar System candidate"]
ztf_class = spark.createDataFrame([], ztf_alert.schema)

for _class in request_class:
ztf_class = ztf_class.union(ztf_alert\
.filter(ztf_alert['class_jd_objectId'] >= '{}_{}'.format(_class, low_bound))\
.filter(ztf_alert['class_jd_objectId'] < '{}_{}'.format(_class, now)))

ztf_class.cache().count()
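
The loop above relies on the shape of the HBase row key: assuming class_jd_objectId concatenates the classification, the Julian date and the objectId with underscores (which the string comparisons imply), one lexicographic range per class selects the alerts of that class inside the time window, and the connector can push the bounds down to HBase. A plain-Python illustration of the bounds:

    _class, low_bound, now = "SN candidate", 2459840.5, 2459847.5  # illustrative jd values
    lower = "{}_{}".format(_class, low_bound)  # 'SN candidate_2459840.5'
    upper = "{}_{}".format(_class, now)  # 'SN candidate_2459847.5'
    # rows with lower <= class_jd_objectId < upper fall inside the window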

ztf_class = ztf_grb_filter(ztf_class)

grb_alert = spark.read.format("parquet").load(gcn_read_path)

grb_alert = grb_alert\
.filter(grb_alert.triggerTimejd >= low_bound)\
.filter(grb_alert.triggerTimejd <= now)

grb_alert.cache().count()

    NSIDE = 4

ztf_class = ztf_class.withColumn(
"hpix",
ang2pix(ztf_class.ra, ztf_class.dec, F.lit(NSIDE)),
)

grb_alert = grb_alert.withColumn(
"hpix", ang2pix(grb_alert.ra, grb_alert.dec, F.lit(NSIDE))
)
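
Joining on equal HEALPix pixel is only a coarse spatial prefilter: at NSIDE=4 the sphere is split into 192 pixels roughly 15 degrees across, so the precise association is left to grb_assoc after the join. A quick check of the pixel scale, assuming healpy (on which ang2pix-style helpers are typically built):

    import healpy as hp

    print(hp.nside2npix(4))  # 192 pixels
    print(hp.nside2resol(4, arcmin=True) / 60.0)  # mean pixel spacing, about 14.7 degrees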

ztf_class = ztf_class\
.withColumnRenamed("ra", "ztf_ra")\
.withColumnRenamed("dec", "ztf_dec")

grb_alert = grb_alert\
.withColumnRenamed("ra", "grb_ra")\
.withColumnRenamed("dec", "grb_dec")

join_condition = [
ztf_class.hpix == grb_alert.hpix,
ztf_class.jdstarthist > grb_alert.triggerTimejd,
]
join_ztf_grb = ztf_class.join(grb_alert, join_condition, "inner")

df_grb = join_ztf_grb.withColumn(
"grb_proba",
grb_assoc(
join_ztf_grb.ztf_ra,
join_ztf_grb.ztf_dec,
join_ztf_grb.jdstarthist,
join_ztf_grb.platform,
join_ztf_grb.triggerTimeUTC,
join_ztf_grb.grb_ra,
join_ztf_grb.grb_dec,
join_ztf_grb.err,
join_ztf_grb.units,
),
)

df_grb = df_grb.select(
[
"objectId",
"candid",
"ztf_ra",
"ztf_dec",
"jd",
"instrument_or_event",
"platform",
"triggerId",
"grb_ra",
"grb_dec",
col("err").alias("grb_loc_error"),
"triggerTimeUTC",
"grb_proba",
]
).filter(df_grb.grb_proba != -1.0)

df_grb.write.parquet(grbxztf_write_path)
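
A minimal sketch for inspecting the result afterwards (path and column names as above):

    matches = spark.read.parquet(grbxztf_write_path)
    matches.select("objectId", "triggerId", "grb_proba").orderBy(
        col("grb_proba").desc()
    ).show(10)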


if __name__ == "__main__":

if len(sys.argv) > 2:
config_path = sys.argv[1]
night = sys.argv[2]
else:
config_path = None
d = datetime.datetime.today()
        night = d.strftime('%Y%m%d')

config = get_config({"--config": config_path})

# ztf_datapath_prefix = config["PATH"]["online_ztf_data_prefix"]
gcn_datapath_prefix = config["PATH"]["online_gcn_data_prefix"]
grb_datapath_prefix = config["PATH"]["online_grb_data_prefix"]
    # configparser returns a string: cast before the arithmetic in spark_offline
    time_window = int(config["OFFLINE"]["time_window"])

spark_offline(gcn_datapath_prefix, grb_datapath_prefix, night, time_window)
1 change: 1 addition & 0 deletions scheduler/science2grb.sh
@@ -14,6 +14,7 @@ ZTF_ONLINE="/user/julien.peloton/online"
GCN_ONLINE="/user/roman.le-montagner/gcn_storage"

HDFS_HOME="/opt/hadoop-2/bin/"
JAVA_HOME="/etc/alternatives/java_sdk_openjdk"

while true; do
$(${HDFS_HOME}hdfs dfs -test -d ${ZTF_ONLINE}/science/year=${YEAR}/month=${MONTH}/day=${DAY})
26 changes: 26 additions & 0 deletions script/launch_offline_grb.sh
@@ -0,0 +1,26 @@
#!/bin/bash

# Other dependencies (incl. Scala part of Fink)
FINK_JARS=${FINK_HOME}/libs/fink-broker_2.11-1.2.jar,\
${FINK_HOME}/libs/hbase-spark-hbase2.2_spark3_scala2.11_hadoop2.7.jar,\
${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.2_spark3_scala2.11_hadoop2.7.jar

FINK_PACKAGES=org.apache.hbase:hbase-shaded-mapreduce:2.2.7

CONFIG=${FINK_GRB_HOME}/fink_grb/conf/fink_grb.conf

NIGHT=$(date +"%Y%m%d" -d "now")
YEAR=${NIGHT:0:4}
MONTH=${NIGHT:4:2}
DAY=${NIGHT:6:2}

spark-submit \
--master mesos://vm-75063.lal.in2p3.fr:5050 \
--conf spark.mesos.principal=lsst \
--conf spark.mesos.secret=secret \
--conf spark.mesos.role=lsst \
--conf spark.executorEnv.HOME='/home/roman.le-montagner' \
--driver-memory 4G --executor-memory 8G --conf spark.cores.max=16 --conf spark.executor.cores=8 \
--jars $FINK_JARS --packages $FINK_PACKAGES \
--py-files /home/roman.le-montagner/Doctorat/GRB/Fink_GRB_test/Fink_GRB/dist/fink_grb-0.1.0-py3.7.egg \
${FINK_GRB_HOME}/fink_grb/offline/spark_offline.py ${CONFIG} ${NIGHT}
