Skip to content

Commit

Permalink
New filter for TNS match
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienPeloton committed Nov 5, 2024
1 parent 50ccfcb commit 0088180
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 0 deletions.
Empty file.
203 changes: 203 additions & 0 deletions fink_filters/filter_tns_match/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
# Copyright 2019-2024 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import BooleanType

from fink_utils.tg_bot.utils import get_curve
from fink_utils.tg_bot.utils import get_cutout
from fink_utils.tg_bot.utils import msg_handler_tg

from fink_filters.tester import spark_unit_tests

import pandas as pd
import os


def extract_url_from_class(tns: str) -> str:
""" Wikipedia link based on the TNS tag
Parameters
----------
tns: str
TNS tag
Returns
-------
out: str
Wikipedia URL
"""
if tns.startswith("SN Ia"):
return "https://en.wikipedia.org/wiki/Type_Ia_supernova"
elif tns.startswith("SN II"):
return "https://en.wikipedia.org/wiki/Type_II_supernova"
elif tns == "Impostor-SN":
return "https://en.wikipedia.org/wiki/Supernova_impostor"
elif tns.startswith("TDE"):
return "https://en.wikipedia.org/wiki/Tidal_disruption_event"
elif tns == "Varstar":
return "https://en.wikipedia.org/wiki/Variable_star"
elif tns.startswith("SN Ib"):
return "https://en.wikipedia.org/wiki/Type_Ib_and_Ic_supernovae"
elif tns.startswith("SN Ic"):
return "https://en.wikipedia.org/wiki/Type_Ib_and_Ic_supernovae"
elif tns == "Nova":
return "https://en.wikipedia.org/wiki/Nova"
elif tns == "Kilonova":
return "https://en.wikipedia.org/wiki/Kilonova"
elif tns == "LBV":
return "https://en.wikipedia.org/wiki/Luminous_blue_variable"
elif tns == "AGN":
return "https://en.wikipedia.org/wiki/Active_galactic_nucleus"
elif tns == "CV":
return "https://en.wikipedia.org/wiki/Cataclysmic_variable_star"
elif tns == "FRB":
return "https://en.wikipedia.org/wiki/Fast_radio_burst"
elif tns == "M dwarf":
return "https://en.wikipedia.org/wiki/Red_dwarf"
else:
return "https://en.wikipedia.org/wiki/Time-domain_astronomy"


def tns_match_(
tns,
jd,
jdstarthist,
) -> pd.Series:
"""Return alerts with a counterpart in TNS
Parameters
----------
tns: Pandas series
Column containing the TNS cross-match values
jd: Pandas series
Column containing observation Julian dates at start of exposure [days]
jdstarthist: Pandas series
Column containing earliest Julian dates corresponding to ndethist
Returns
----------
out: pandas.Series of bool
Return a Pandas DataFrame with the appropriate flag:
false for bad alert, and true for good alert.
Examples
----------
>>> pdf = pd.read_parquet('datatest/regular')
>>> fake_tns = ["" for i in range(len(pdf))]
>>> fake_tns[0] = "SN Ia"
>>> pdf["tns"] = fake_tns
>>> classification = tns_match_(
... pdf['tns'],
... pdf['candidate'].apply(lambda x: x['jd'])
... pdf['candidate'].apply(lambda x: x['jdstarthist']))
>>> print(len(pdf[classification]['objectId'].values))
1
"""
is_in_tns = tns != ""
is_young = jd.astype(float) - jdstarthist.astype(float) <= 30

return is_in_tns & is_young


@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
def tns_match(
objectId,
jd,
jdstarthist,
tns,
) -> pd.Series:
"""Pandas UDF for tns_match_
Parameters
----------
objectId: Pandas series
Column with ZTF objectId
jd: Pandas series
Column containing observation Julian dates at start of exposure [days]
jdstarthist: Pandas series
Column containing earliest Julian dates corresponding to ndethist
tns: Pandas series
Column containing the TNS cross-match values
Returns
----------
out: pandas.Series of bool
Return a Pandas DataFrame with the appropriate flag:
false for bad alert, and true for good alert.
Examples
----------
>>> from fink_utils.spark.utils import apply_user_defined_filter
>>> import pyspark.sql.functions as F
>>> df = spark.read.format('parquet').load('datatest/regular')
# Add a fake column
>>> df = df.withColumn("tns", F.lit("SN Ia"))
>>> f = 'fink_filters.filter_tns_match.filter.tns_match'
>>> df = apply_user_defined_filter(df, f)
>>> print(df.count())
0
"""
series = tns_match_(tns, jd, jdstarthist)

pdf = pd.DataFrame(
{
"objectId": objectId,
"tns": tns,
"dt": jd - jdstarthist,
}
)

# Loop over matches
if ("FINK_TG_TOKEN" in os.environ) and os.environ["FINK_TG_TOKEN"] != "":
payloads = []
for _, alert in pdf[series.values].iterrows():
curve_png = get_curve(
objectId=alert["objectId"],
origin="API",
)

time.sleep(1)

cutout = get_cutout(ztf_id=alert["objectId"], kind="Science", origin="API")

time.sleep(2)

text = """
Appeared {:.0f} days ago!
*Object ID*: [{}](https://fink-portal.org/{})
*Class*: [{}]({})
""".format(
alert["dt"],
alert["objectId"],
alert["objectId"],
alert["tns"],
extract_url_from_class(alert["tns"])
)

payloads.append((text, curve_png, cutout))

if len(payloads) > 0:
msg_handler_tg(payloads, channel_id="@fink_tns", init_msg="")
return series


if __name__ == "__main__":
"""Execute the test suite"""

# Run the test suite
globs = globals()
spark_unit_tests(globs)

0 comments on commit 0088180

Please sign in to comment.