diff --git a/.flake8 b/.flake8 index fa16103b8..c7291573a 100644 --- a/.flake8 +++ b/.flake8 @@ -10,4 +10,5 @@ exclude = build, dist per-file-ignores = - ../Fink_GRB/fink_grb/online/ztf_join_gcn.py:W503,E402 \ No newline at end of file + ../Fink_GRB/fink_grb/online/ztf_join_gcn.py:W503,E402 + ../Fink_GRB/fink_grb/offline/spark_offline.py:W503 \ No newline at end of file diff --git a/fink_grb/offline/spark_offline.py b/fink_grb/offline/spark_offline.py index e6d9e2c8d..cc6a40f04 100644 --- a/fink_grb/offline/spark_offline.py +++ b/fink_grb/offline/spark_offline.py @@ -1,5 +1,5 @@ import json -from fink_grb.online.ztf_join_gcn import ztf_grb_filter, grb_assoc +from fink_grb.online.ztf_join_gcn import grb_assoc from astropy.time import Time, TimeDelta from fink_utils.science.utils import ang2pix @@ -11,6 +11,7 @@ import sys import datetime + def ztf_grb_filter(spark_ztf): spark_filter = ( @@ -39,7 +40,7 @@ def ztf_grb_filter(spark_ztf): def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window): """ - + Parameters ---------- @@ -47,7 +48,9 @@ def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window): Number of day between now and now - time_window to join ztf alerts and gcn. time_window are in days. """ - path_to_catalog = "/home/julien.peloton/fink-broker/ipynb/hbase_catalogs/ztf_season1.class.json" + path_to_catalog = ( + "/home/julien.peloton/fink-broker/ipynb/hbase_catalogs/ztf_season1.class.json" + ) with open(path_to_catalog) as f: catalog = json.load(f) @@ -56,33 +59,39 @@ def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window): "science2grb_offline_{}{}{}".format(night[0:4], night[4:6], night[6:8]) ) - ztf_alert = spark.read.option("catalog", catalog)\ - .format("org.apache.hadoop.hbase.spark")\ - .option("hbase.spark.use.hbasecontext", False)\ - .option("hbase.spark.pushdown.columnfilter", True)\ + ztf_alert = ( + spark.read.option("catalog", catalog) + .format("org.apache.hadoop.hbase.spark") + .option("hbase.spark.use.hbasecontext", False) + .option("hbase.spark.pushdown.columnfilter", True) .load() + ) ztf_alert = ztf_alert.select( "objectId", "candid", - "ra", "dec", - "jd", "jdstarthist", + "ra", + "dec", + "jd", + "jdstarthist", "class_jd_objectId", "ssdistnr", "distpsnr1", - "neargaia" + "neargaia", ) now = Time.now().jd - low_bound = now - TimeDelta(time_window*24*3600, format='sec').jd + low_bound = now - TimeDelta(time_window * 24 * 3600, format="sec").jd - request_class = ["SN candidate","Ambiguous","Unknown", "Solar System candidate"] + request_class = ["SN candidate", "Ambiguous", "Unknown", "Solar System candidate"] ztf_class = spark.createDataFrame([], ztf_alert.schema) for _class in request_class: - ztf_class = ztf_class.union(ztf_alert\ - .filter(ztf_alert['class_jd_objectId'] >= '{}_{}'.format(_class, low_bound))\ - .filter(ztf_alert['class_jd_objectId'] < '{}_{}'.format(_class, now))) + ztf_class = ztf_class.union( + ztf_alert.filter( + ztf_alert["class_jd_objectId"] >= "{}_{}".format(_class, low_bound) + ).filter(ztf_alert["class_jd_objectId"] < "{}_{}".format(_class, now)) + ) ztf_class.cache().count() @@ -90,13 +99,13 @@ def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window): grb_alert = spark.read.format("parquet").load(gcn_read_path) - grb_alert = grb_alert\ - .filter(grb_alert.triggerTimejd >= low_bound)\ - .filter(grb_alert.triggerTimejd <= now) - + grb_alert = grb_alert.filter(grb_alert.triggerTimejd >= low_bound).filter( + grb_alert.triggerTimejd <= now + ) + grb_alert.cache().count() - NSIDE=4 + NSIDE = 4 ztf_class = ztf_class.withColumn( "hpix", @@ -107,13 +116,13 @@ def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window): "hpix", ang2pix(grb_alert.ra, grb_alert.dec, F.lit(NSIDE)) ) - ztf_class = ztf_class\ - .withColumnRenamed("ra", "ztf_ra")\ - .withColumnRenamed("dec", "ztf_dec") + ztf_class = ztf_class.withColumnRenamed("ra", "ztf_ra").withColumnRenamed( + "dec", "ztf_dec" + ) - grb_alert = grb_alert\ - .withColumnRenamed("ra", "grb_ra")\ - .withColumnRenamed("dec", "grb_dec") + grb_alert = grb_alert.withColumnRenamed("ra", "grb_ra").withColumnRenamed( + "dec", "grb_dec" + ) join_condition = [ ztf_class.hpix == grb_alert.hpix, @@ -165,7 +174,7 @@ def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window): else: config_path = None d = datetime.datetime.today() - night = "{}{}{}".format(d.strftime('%Y'), d.strftime('%m'), d.strftime('%d')) + night = "{}{}{}".format(d.strftime("%Y"), d.strftime("%m"), d.strftime("%d")) config = get_config({"--config": config_path}) @@ -174,4 +183,4 @@ def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window): grb_datapath_prefix = config["PATH"]["online_grb_data_prefix"] time_window = config["OFFLINE"]["time_window"] - spark_offline(gcn_datapath_prefix, grb_datapath_prefix, night, time_window) \ No newline at end of file + spark_offline(gcn_datapath_prefix, grb_datapath_prefix, night, time_window)