diff --git a/fink_filters/filter_tns_match/__init__.py b/fink_filters/filter_tns_match/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fink_filters/filter_tns_match/filter.py b/fink_filters/filter_tns_match/filter.py new file mode 100644 index 0000000..6076f6d --- /dev/null +++ b/fink_filters/filter_tns_match/filter.py @@ -0,0 +1,203 @@ +# Copyright 2019-2024 AstroLab Software +# Author: Julien Peloton +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql.types import BooleanType + +from fink_utils.tg_bot.utils import get_curve +from fink_utils.tg_bot.utils import get_cutout +from fink_utils.tg_bot.utils import msg_handler_tg + +from fink_filters.tester import spark_unit_tests + +import pandas as pd +import os + + +def extract_url_from_class(tns: str) -> str: + """ Wikipedia link based on the TNS tag + + Parameters + ---------- + tns: str + TNS tag + + Returns + ------- + out: str + Wikipedia URL + """ + if tns.startswith("SN Ia"): + return "https://en.wikipedia.org/wiki/Type_Ia_supernova" + elif tns.startswith("SN II"): + return "https://en.wikipedia.org/wiki/Type_II_supernova" + elif tns == "Impostor-SN": + return "https://en.wikipedia.org/wiki/Supernova_impostor" + elif tns.startswith("TDE"): + return "https://en.wikipedia.org/wiki/Tidal_disruption_event" + elif tns == "Varstar": + return "https://en.wikipedia.org/wiki/Variable_star" + elif tns.startswith("SN Ib"): + return "https://en.wikipedia.org/wiki/Type_Ib_and_Ic_supernovae" + elif tns.startswith("SN Ic"): + return "https://en.wikipedia.org/wiki/Type_Ib_and_Ic_supernovae" + elif tns == "Nova": + return "https://en.wikipedia.org/wiki/Nova" + elif tns == "Kilonova": + return "https://en.wikipedia.org/wiki/Kilonova" + elif tns == "LBV": + return "https://en.wikipedia.org/wiki/Luminous_blue_variable" + elif tns == "AGN": + return "https://en.wikipedia.org/wiki/Active_galactic_nucleus" + elif tns == "CV": + return "https://en.wikipedia.org/wiki/Cataclysmic_variable_star" + elif tns == "FRB": + return "https://en.wikipedia.org/wiki/Fast_radio_burst" + elif tns == "M dwarf": + return "https://en.wikipedia.org/wiki/Red_dwarf" + else: + return "https://en.wikipedia.org/wiki/Time-domain_astronomy" + + +def tns_match_( + tns, + jd, + jdstarthist, +) -> pd.Series: + """Return alerts with a counterpart in TNS + + Parameters + ---------- + tns: Pandas series + Column containing the TNS cross-match values + jd: Pandas series + Column containing observation Julian dates at start of exposure [days] + jdstarthist: Pandas series + Column containing earliest Julian dates corresponding to ndethist + + Returns + ---------- + out: pandas.Series of bool + Return a Pandas DataFrame with the appropriate flag: + false for bad alert, and true for good alert. + + Examples + ---------- + >>> pdf = pd.read_parquet('datatest/regular') + >>> fake_tns = ["" for i in range(len(pdf))] + >>> fake_tns[0] = "SN Ia" + >>> pdf["tns"] = fake_tns + >>> classification = tns_match_( + ... pdf['tns'], + ... pdf['candidate'].apply(lambda x: x['jd']) + ... pdf['candidate'].apply(lambda x: x['jdstarthist'])) + >>> print(len(pdf[classification]['objectId'].values)) + 1 + """ + is_in_tns = tns != "" + is_young = jd.astype(float) - jdstarthist.astype(float) <= 30 + + return is_in_tns & is_young + + +@pandas_udf(BooleanType(), PandasUDFType.SCALAR) +def tns_match( + objectId, + jd, + jdstarthist, + tns, +) -> pd.Series: + """Pandas UDF for tns_match_ + + Parameters + ---------- + objectId: Pandas series + Column with ZTF objectId + jd: Pandas series + Column containing observation Julian dates at start of exposure [days] + jdstarthist: Pandas series + Column containing earliest Julian dates corresponding to ndethist + tns: Pandas series + Column containing the TNS cross-match values + + Returns + ---------- + out: pandas.Series of bool + Return a Pandas DataFrame with the appropriate flag: + false for bad alert, and true for good alert. + + Examples + ---------- + >>> from fink_utils.spark.utils import apply_user_defined_filter + >>> import pyspark.sql.functions as F + >>> df = spark.read.format('parquet').load('datatest/regular') + + # Add a fake column + >>> df = df.withColumn("tns", F.lit("SN Ia")) + + >>> f = 'fink_filters.filter_tns_match.filter.tns_match' + >>> df = apply_user_defined_filter(df, f) + >>> print(df.count()) + 0 + """ + series = tns_match_(tns, jd, jdstarthist) + + pdf = pd.DataFrame( + { + "objectId": objectId, + "tns": tns, + "dt": jd - jdstarthist, + } + ) + + # Loop over matches + if ("FINK_TG_TOKEN" in os.environ) and os.environ["FINK_TG_TOKEN"] != "": + payloads = [] + for _, alert in pdf[series.values].iterrows(): + curve_png = get_curve( + objectId=alert["objectId"], + origin="API", + ) + + time.sleep(1) + + cutout = get_cutout(ztf_id=alert["objectId"], kind="Science", origin="API") + + time.sleep(2) + + text = """ +Appeared {:.0f} days ago! +*Object ID*: [{}](https://fink-portal.org/{}) +*Class*: [{}]({}) + """.format( + alert["dt"], + alert["objectId"], + alert["objectId"], + alert["tns"], + extract_url_from_class(alert["tns"]) + ) + + payloads.append((text, curve_png, cutout)) + + if len(payloads) > 0: + msg_handler_tg(payloads, channel_id="@fink_tns", init_msg="") + return series + + +if __name__ == "__main__": + """Execute the test suite""" + + # Run the test suite + globs = globals() + spark_unit_tests(globs)