data products addition done
FusRoman committed Nov 22, 2022
1 parent ea0c051 commit d0f556f
Showing 7 changed files with 253 additions and 239 deletions.
3 changes: 2 additions & 1 deletion .flake8
@@ -11,4 +11,5 @@ exclude =
     dist
 per-file-ignores =
     ../Fink_GRB/fink_grb/online/ztf_join_gcn.py:W503,E402
-    ../Fink_GRB/fink_grb/offline/spark_offline.py:W503,W605
+    ../Fink_GRB/fink_grb/offline/spark_offline.py:W503,W605
+    ../Fink_GRB/fink_grb/utils/fun_utils.py:F811
62 changes: 6 additions & 56 deletions fink_grb/offline/spark_offline.py
@@ -1,20 +1,22 @@
import json
from fink_grb.online.ztf_join_gcn import grb_assoc
from astropy.time import Time, TimeDelta

from fink_utils.science.utils import ang2pix
from fink_utils.broker.sparkUtils import init_sparksession

from pyspark.sql import functions as F
from pyspark.sql.functions import col
import os
import sys
import subprocess

from fink_utils.spark.partitioning import convert_to_datetime

import fink_grb
from fink_grb.utils.fun_utils import return_verbose_level, build_spark_submit
from fink_grb.utils.fun_utils import (
return_verbose_level,
build_spark_submit,
join_post_process,
)
from fink_grb.init import get_config, init_logging


@@ -83,7 +85,6 @@ def spark_offline(hbase_catalog, gcn_read_path, grbxztf_write_path, night, time_
"""
Cross-match Fink and the GCN in order to find the optical alerts falling in the error box of a GCN.
Parameters
----------
hbase_catalog : string
@@ -177,37 +178,7 @@ def spark_offline(hbase_catalog, gcn_read_path, grbxztf_write_path, night, time_
]
join_ztf_grb = ztf_alert.join(grb_alert, join_condition, "inner")

df_grb = join_ztf_grb.withColumn(
"grb_proba",
grb_assoc(
join_ztf_grb.ztf_ra,
join_ztf_grb.ztf_dec,
join_ztf_grb.jdstarthist,
join_ztf_grb.platform,
join_ztf_grb.triggerTimeUTC,
join_ztf_grb.grb_ra,
join_ztf_grb.grb_dec,
join_ztf_grb.err_arcmin,
),
)

df_grb = df_grb.select(
[
"objectId",
"candid",
"ztf_ra",
"ztf_dec",
"jd",
"instrument_or_event",
"platform",
"triggerId",
"grb_ra",
"grb_dec",
col("err_arcmin").alias("grb_loc_error"),
"triggerTimeUTC",
"grb_proba",
]
).filter(df_grb.grb_proba != -1.0)
df_grb = join_post_process(join_ztf_grb)

timecol = "jd"
converter = lambda x: convert_to_datetime(x) # noqa: E731
@@ -397,24 +368,3 @@ def launch_offline_mode(arguments):
spark_offline(
hbase_catalog, gcn_datapath_prefix, grb_datapath_prefix, night, time_window
)

# if len(sys.argv) > 2:
# config_path = sys.argv[1]
# night = sys.argv[2]
# else:
# config_path = None
# d = datetime.datetime.today()
# night = "{}{}{}".format(d.strftime("%Y"), d.strftime("%m"), d.strftime("%d"))

# config = get_config({"--config": config_path})

# # ztf_datapath_prefix = config["PATH"]["online_ztf_data_prefix"]

# hbase_catalog = config["PATH"]["hbase_catalog"]
# gcn_datapath_prefix = config["PATH"]["online_gcn_data_prefix"]
# grb_datapath_prefix = config["PATH"]["online_grb_data_prefix"]
# time_window = int(config["OFFLINE"]["time_window"])

# spark_offline(
# hbase_catalog, gcn_datapath_prefix, grb_datapath_prefix, night, time_window
# )
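
Both the offline and the online pipelines now delegate the post-join step to a single helper, join_post_process, imported from fink_grb.utils.fun_utils; the diff of fun_utils.py is not expanded on this page. The following is only a minimal sketch of what that helper presumably does, reconstructed from the inline code removed above: the function name comes from the new imports, but the body, the column names and the assumption that the grb_assoc pandas UDF now lives in the same module are inferred rather than taken from the actual commit.

from pyspark.sql import DataFrame
from pyspark.sql.functions import col


def join_post_process(df_grb: DataFrame) -> DataFrame:
    """
    Sketch of the shared post-join step (assumed content of fink_grb/utils/fun_utils.py).
    grb_assoc is the pandas UDF removed from ztf_join_gcn.py in this commit and is
    assumed here to be defined in the same module.
    """
    # score each (ZTF alert, GCN event) pair with the serendipitous probability
    df_grb = df_grb.withColumn(
        "grb_proba",
        grb_assoc(  # noqa: F821 -- assumed to live alongside this helper
            df_grb.ztf_ra,
            df_grb.ztf_dec,
            df_grb.jdstarthist,
            df_grb.platform,
            df_grb.triggerTimeUTC,
            df_grb.grb_ra,
            df_grb.grb_dec,
            df_grb.err_arcmin,
        ),
    )

    # keep only the data-product columns and drop the pairs rejected by the cuts (-1.0)
    return df_grb.select(
        [
            "objectId",
            "candid",
            "ztf_ra",
            "ztf_dec",
            "jd",
            "instrument_or_event",
            "platform",
            "triggerId",
            "grb_ra",
            "grb_dec",
            col("err_arcmin").alias("grb_loc_error"),
            "triggerTimeUTC",
            "grb_proba",
        ]
    ).filter(col("grb_proba") != -1.0)

With such a helper in place, both call sites reduce to df_grb = join_post_process(join_ztf_grb), as the hunks above and below show.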
178 changes: 4 additions & 174 deletions fink_grb/online/ztf_join_gcn.py
@@ -1,164 +1,25 @@
import warnings

from fink_grb.utils.fun_utils import return_verbose_level

warnings.filterwarnings("ignore")

import pandas as pd
import numpy as np
import pandas as pd # noqa: F401
import time
import os
import subprocess
import sys

import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.time import Time

from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import DoubleType

from fink_utils.science.utils import ang2pix
from fink_utils.spark.partitioning import convert_to_datetime
from fink_utils.broker.sparkUtils import init_sparksession, connect_to_raw_database

import fink_grb
from fink_grb.utils.grb_prob import p_ser_grb_vect
from fink_grb.utils.fun_utils import build_spark_submit
from fink_grb.utils.fun_utils import build_spark_submit, join_post_process
from fink_grb.init import get_config, init_logging


@pandas_udf(DoubleType())
def grb_assoc(
ztf_ra: pd.Series,
ztf_dec: pd.Series,
jdstarthist: pd.Series,
platform: pd.Series,
trigger_time: pd.Series,
grb_ra: pd.Series,
grb_dec: pd.Series,
grb_error: pd.Series,
) -> pd.Series:
"""
Find the ztf alerts falling in the error box of the notices and emitted after the trigger time.
Then compute an association serendipitous probability for each of them and return it.
Parameters
----------
ztf_ra : double spark column
right ascension coordinates of the ztf alerts
ztf_dec : double spark column
declination coordinates of the ztf alerts
jdstarthist : double spark column
Earliest Julian date of epoch corresponding to ndethist [days]
ndethist : Number of spatially-coincident detections falling within 1.5 arcsec
going back to beginning of survey;
only detections that fell on the same field and readout-channel ID
where the input candidate was observed are counted.
All raw detections down to a photometric S/N of ~ 3 are included.
platform : string spark column
voevent emitting platform
trigger_time : double spark column
grb trigger time (UTC)
grb_ra : double spark column
grb right ascension
grb_dec : double spark column
grb declination
grb_error : double spark column
grb error radius (in arcminute)
Returns
-------
grb_proba : pandas Series
the serendipitous probability for each ztf alert.
Examples
--------
>>> sparkDF = spark.read.format('parquet').load(join_data)
>>> df_grb = sparkDF.withColumn(
... "grb_proba",
... grb_assoc(
... sparkDF.candidate.ra,
... sparkDF.candidate.dec,
... sparkDF.candidate.jdstarthist,
... sparkDF.platform,
... sparkDF.timeUTC,
... sparkDF.ra,
... sparkDF.dec,
... sparkDF.err
... ),
... )
>>> df_grb = df_grb.select([
... "objectId",
... "candid",
... col("candidate.ra").alias("ztf_ra"),
... col("candidate.dec").alias("ztf_dec"),
... "candidate.jd",
... "platform",
... "instrument",
... "trigger_id",
... col("ra").alias("grb_ra"),
... col("dec").alias("grb_dec"),
... col("err_arcmin").alias("grb_loc_error"),
... "timeUTC",
... "grb_proba"
... ])
>>> grb_prob = df_grb.toPandas()
>>> grb_test = pd.read_parquet("fink_grb/test/test_data/grb_prob_test.parquet")
>>> assert_frame_equal(grb_prob, grb_test)
"""
grb_proba = np.ones_like(ztf_ra.values, dtype=float) * -1.0
platform = platform.values

# array of events detection rates in events/years
# depending of the instruments
condition = [
np.equal(platform, "Fermi"),
np.equal(platform, "SWIFT"),
np.equal(platform, "INTEGRAL"),
np.equal(platform, "ICECUBE"),
]
choice_grb_rate = [250, 100, 60, 8]
grb_det_rate = np.select(condition, choice_grb_rate)

# array of error box
grb_error = grb_error.values

trigger_time = Time(
pd.to_datetime(trigger_time.values, utc=True), format="datetime"
).jd

# alerts emitted after the grb
delay = jdstarthist - trigger_time
time_condition = delay > 0

ztf_coords = SkyCoord(ztf_ra, ztf_dec, unit=u.degree)
grb_coord = SkyCoord(grb_ra, grb_dec, unit=u.degree)

# alerts falling within the grb_error_box
spatial_condition = (
ztf_coords.separation(grb_coord).arcminute < 1.5 * grb_error
) # 63.5 * grb_error

# convert the delay in year
delay_year = delay[time_condition & spatial_condition] / 365.25

# compute serendipitous probability
p_ser = p_ser_grb_vect(
grb_error[time_condition & spatial_condition] / 60,
delay_year.values,
grb_det_rate[time_condition & spatial_condition],
)

grb_proba[time_condition & spatial_condition] = p_ser[0]

return pd.Series(grb_proba)


def ztf_grb_filter(spark_ztf):
"""
Filter the ztf alerts using the cross-match values from ztf.
@@ -320,39 +181,7 @@ def ztf_join_gcn_stream(
]
df_grb = df_ztf_stream.join(df_grb_stream, join_condition, "inner")

# refine the association and compute the serendipitous probability
df_grb = df_grb.withColumn(
"grb_proba",
grb_assoc(
df_grb.candidate.ra,
df_grb.candidate.dec,
df_grb.candidate.jdstarthist,
df_grb.platform,
df_grb.triggerTimeUTC,
df_grb.ra,
df_grb.dec,
df_grb.err_arcmin,
),
)

# select a subset of columns before the writing
df_grb = df_grb.select(
[
"objectId",
"candid",
col("candidate.ra").alias("ztf_ra"),
col("candidate.dec").alias("ztf_dec"),
"candidate.jd",
"instrument_or_event",
"platform",
"triggerId",
col("ra").alias("grb_ra"),
col("dec").alias("grb_dec"),
col("err_arcmin").alias("grb_loc_error"),
"triggerTimeUTC",
"grb_proba",
]
)
df_grb = join_post_process(df_grb)

# re-create partitioning columns if needed.
timecol = "jd"
@@ -420,6 +249,7 @@ def launch_joining_stream(arguments):
>>> datatest = pd.read_parquet("fink_grb/test/test_data/grb_join_output.parquet")
>>> datajoin = pd.read_parquet(grb_datatest + "/grb/year=2019")
>>> assert_frame_equal(datatest, datajoin, check_dtype=False, check_column_type=False, check_categorical=False)
>>> shutil.rmtree(grb_datatest + "/grb/_spark_metadata")
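
The grb_assoc UDF removed above applies two cuts before calling p_ser_grb_vect with a per-platform event rate (Fermi 250, SWIFT 100, INTEGRAL 60, ICECUBE 8 events/year): the alert history must start after the GCN trigger time, and the alert must lie within 1.5 times the GCN error radius. Below is a small, self-contained sketch of just those two cuts on invented values; it mirrors the selection logic of the removed code and leaves the probability itself to p_ser_grb_vect, whose implementation is not part of this diff.

import pandas as pd
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.time import Time

# toy inputs -- the values are invented, only the selection logic mirrors grb_assoc
ztf_ra = pd.Series([210.1, 33.0])                 # degrees
ztf_dec = pd.Series([-12.5, 5.0])                 # degrees
jdstarthist = pd.Series([2459905.6, 2459900.0])   # Julian date of the first detection

grb_ra, grb_dec = 210.0, -12.4                    # degrees
err_arcmin = 30.0                                 # GCN error radius, arcminutes
trigger_time_jd = Time("2022-11-21T12:00:00", format="isot").jd

# (1) temporal cut: the alert history must start after the trigger
delay = jdstarthist - trigger_time_jd
time_condition = delay > 0

# (2) spatial cut: the alert must fall within 1.5 * the error radius
ztf_coords = SkyCoord(ztf_ra.values, ztf_dec.values, unit=u.degree)
grb_coord = SkyCoord(grb_ra, grb_dec, unit=u.degree)
spatial_condition = ztf_coords.separation(grb_coord).arcminute < 1.5 * err_arcmin

selected = time_condition & spatial_condition
print(selected.tolist())  # [True, False] for these invented values

Only the pairs passing both cuts receive a probability from p_ser_grb_vect(grb_error / 60, delay_year, grb_det_rate); all other pairs keep the sentinel value -1.0 and are filtered out downstream.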
Binary file modified fink_grb/test/test_data/grb_join_output.parquet
Binary file not shown.
Binary file not shown.
