
Commit

pep8 requirements
FusRoman committed Sep 28, 2022
1 parent 87ba3e7 commit 47b2880
Showing 2 changed files with 39 additions and 29 deletions.
.flake8 (3 changes: 2 additions & 1 deletion)
@@ -10,4 +10,5 @@ exclude =
     build,
     dist
 per-file-ignores =
-    ../Fink_GRB/fink_grb/online/ztf_join_gcn.py:W503,E402
+    ../Fink_GRB/fink_grb/online/ztf_join_gcn.py:W503,E402
+    ../Fink_GRB/fink_grb/offline/spark_offline.py:W503
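Note: W503 ("line break before binary operator") and E402 ("module level import not at top of file") are the pycodestyle checks being suppressed here, and per-file-ignores limits that suppression to the two listed modules rather than the whole project. As a purely illustrative sketch (none of the names below come from the repository), the layout that W503 flags looks like this:

subtotal = 100.0
taxes = 20.0
# Breaking the line *before* the "+" operator is what flake8 reports as W503;
# black-style formatting tends to produce exactly this layout, hence the
# per-file-ignores entries above.
total = (
    subtotal
    + taxes
)
print(total)  # 120.0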
fink_grb/offline/spark_offline.py (65 changes: 37 additions & 28 deletions)
@@ -1,5 +1,5 @@
 import json
-from fink_grb.online.ztf_join_gcn import ztf_grb_filter, grb_assoc
+from fink_grb.online.ztf_join_gcn import grb_assoc
 from astropy.time import Time, TimeDelta
 
 from fink_utils.science.utils import ang2pix
@@ -11,6 +11,7 @@
 import sys
 import datetime
 
+
 def ztf_grb_filter(spark_ztf):
 
     spark_filter = (
@@ -39,15 +40,17 @@ def ztf_grb_filter(spark_ztf):
 
 def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window):
     """
     Parameters
     ----------
     time_window : int
         Number of day between now and now - time_window to join ztf alerts and gcn.
         time_window are in days.
     """
-    path_to_catalog = "/home/julien.peloton/fink-broker/ipynb/hbase_catalogs/ztf_season1.class.json"
+    path_to_catalog = (
+        "/home/julien.peloton/fink-broker/ipynb/hbase_catalogs/ztf_season1.class.json"
+    )
 
     with open(path_to_catalog) as f:
         catalog = json.load(f)
@@ -56,47 +59,53 @@ def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window):
         "science2grb_offline_{}{}{}".format(night[0:4], night[4:6], night[6:8])
     )
 
-    ztf_alert = spark.read.option("catalog", catalog)\
-        .format("org.apache.hadoop.hbase.spark")\
-        .option("hbase.spark.use.hbasecontext", False)\
-        .option("hbase.spark.pushdown.columnfilter", True)\
+    ztf_alert = (
+        spark.read.option("catalog", catalog)
+        .format("org.apache.hadoop.hbase.spark")
+        .option("hbase.spark.use.hbasecontext", False)
+        .option("hbase.spark.pushdown.columnfilter", True)
         .load()
+    )
 
     ztf_alert = ztf_alert.select(
         "objectId",
         "candid",
-        "ra", "dec",
-        "jd", "jdstarthist",
+        "ra",
+        "dec",
+        "jd",
+        "jdstarthist",
         "class_jd_objectId",
         "ssdistnr",
         "distpsnr1",
-        "neargaia"
+        "neargaia",
     )
 
     now = Time.now().jd
-    low_bound = now - TimeDelta(time_window*24*3600, format='sec').jd
+    low_bound = now - TimeDelta(time_window * 24 * 3600, format="sec").jd
 
-    request_class = ["SN candidate","Ambiguous","Unknown", "Solar System candidate"]
+    request_class = ["SN candidate", "Ambiguous", "Unknown", "Solar System candidate"]
     ztf_class = spark.createDataFrame([], ztf_alert.schema)
 
     for _class in request_class:
-        ztf_class = ztf_class.union(ztf_alert\
-            .filter(ztf_alert['class_jd_objectId'] >= '{}_{}'.format(_class, low_bound))\
-            .filter(ztf_alert['class_jd_objectId'] < '{}_{}'.format(_class, now)))
+        ztf_class = ztf_class.union(
+            ztf_alert.filter(
+                ztf_alert["class_jd_objectId"] >= "{}_{}".format(_class, low_bound)
+            ).filter(ztf_alert["class_jd_objectId"] < "{}_{}".format(_class, now))
+        )
 
     ztf_class.cache().count()
 
     ztf_class = ztf_grb_filter(ztf_class)
 
     grb_alert = spark.read.format("parquet").load(gcn_read_path)
 
-    grb_alert = grb_alert\
-        .filter(grb_alert.triggerTimejd >= low_bound)\
-        .filter(grb_alert.triggerTimejd <= now)
+    grb_alert = grb_alert.filter(grb_alert.triggerTimejd >= low_bound).filter(
+        grb_alert.triggerTimejd <= now
+    )
 
     grb_alert.cache().count()
 
-    NSIDE=4
+    NSIDE = 4
 
     ztf_class = ztf_class.withColumn(
         "hpix",
@@ -107,13 +116,13 @@ def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window):
         "hpix", ang2pix(grb_alert.ra, grb_alert.dec, F.lit(NSIDE))
     )
 
-    ztf_class = ztf_class\
-        .withColumnRenamed("ra", "ztf_ra")\
-        .withColumnRenamed("dec", "ztf_dec")
+    ztf_class = ztf_class.withColumnRenamed("ra", "ztf_ra").withColumnRenamed(
+        "dec", "ztf_dec"
+    )
 
-    grb_alert = grb_alert\
-        .withColumnRenamed("ra", "grb_ra")\
-        .withColumnRenamed("dec", "grb_dec")
+    grb_alert = grb_alert.withColumnRenamed("ra", "grb_ra").withColumnRenamed(
+        "dec", "grb_dec"
+    )
 
     join_condition = [
         ztf_class.hpix == grb_alert.hpix,
@@ -165,7 +174,7 @@ def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window):
     else:
         config_path = None
     d = datetime.datetime.today()
-    night = "{}{}{}".format(d.strftime('%Y'), d.strftime('%m'), d.strftime('%d'))
+    night = "{}{}{}".format(d.strftime("%Y"), d.strftime("%m"), d.strftime("%d"))
 
     config = get_config({"--config": config_path})
 
@@ -174,4 +183,4 @@ def spark_offline(gcn_read_path, grbxztf_write_path, night, time_window):
     grb_datapath_prefix = config["PATH"]["online_grb_data_prefix"]
     time_window = config["OFFLINE"]["time_window"]
 
-    spark_offline(gcn_datapath_prefix, grb_datapath_prefix, night, time_window)
+    spark_offline(gcn_datapath_prefix, grb_datapath_prefix, night, time_window)
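The recurring edit in this file is the same PEP 8 / black-style cleanup: backslash line continuations on chained DataFrame calls are replaced by chains wrapped in parentheses, single quotes become double quotes, and operators such as * and = gain surrounding spaces. A minimal, self-contained sketch of the continuation change, using a stand-in class that is illustrative only and not part of fink_grb:

class Frame:
    """Stand-in for a Spark DataFrame, just enough to chain .filter calls."""

    def __init__(self, rows):
        self.rows = rows

    def filter(self, predicate):
        return Frame([r for r in self.rows if predicate(r)])


frame = Frame([1, 5, 12, 30])

# Old layout (the style removed by this commit): explicit backslash continuations.
kept_old = frame\
    .filter(lambda r: r >= 5)\
    .filter(lambda r: r <= 20)

# New layout (the style introduced by this commit): implicit continuation
# inside parentheses.
kept_new = (
    frame
    .filter(lambda r: r >= 5)
    .filter(lambda r: r <= 20)
)

print(kept_old.rows, kept_new.rows)  # [5, 12] [5, 12]

Implicit continuation inside parentheses is the wrapping style PEP 8 recommends and the one black emits, whereas trailing backslashes are brittle (a stray space after the backslash breaks the statement).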
