diff --git a/bin/fink b/bin/fink index 85d01580..d24d95af 100755 --- a/bin/fink +++ b/bin/fink @@ -505,6 +505,7 @@ elif [[ $service == "hostless" ]]; then ${FINK_HOME}/bin/hostless_detection.py ${HELP_ON_SERVICE} \ -agg_data_prefix ${AGG_DATA_PREFIX} \ -night ${NIGHT} \ + -hostless_folder "hostless_ids" \ -log_level ${LOG_LEVEL} ${EXIT_AFTER} elif [[ $service == "dwarf_agn" ]]; then spark-submit --master ${SPARK_MASTER} \ diff --git a/bin/generate_ssoft.py b/bin/generate_ssoft.py index a7e3de3e..182dc347 100644 --- a/bin/generate_ssoft.py +++ b/bin/generate_ssoft.py @@ -21,7 +21,7 @@ from fink_broker.logging_utils import get_fink_logger, inspect_application from fink_broker.spark_utils import init_sparksession -from fink_spins.ssoft import build_the_ssoft +from fink_science.ssoft.processor import build_the_ssoft def main(): @@ -81,7 +81,7 @@ def main(): # Initialise Spark session spark = init_sparksession( - name="ssoft_{}_{}".format(args.model, version), shuffle_partitions=20 + name="ssoft_{}_{}".format(args.model, version), shuffle_partitions=200 ) # The level here should be controlled by an argument. @@ -106,6 +106,7 @@ def main(): frac=args.frac, model=args.model, version=version, + sb_method="fastnifty", ) pdf.to_parquet("ssoft_{}_{}.parquet".format(args.model, version)) diff --git a/bin/hostless_detection.py b/bin/hostless_detection.py index c3a29b46..616700a0 100644 --- a/bin/hostless_detection.py +++ b/bin/hostless_detection.py @@ -17,7 +17,7 @@ import argparse import os - +import pandas as pd from pyspark.sql import functions as F @@ -33,6 +33,8 @@ from fink_utils.tg_bot.utils import get_cutout from fink_utils.tg_bot.utils import msg_handler_tg_cutouts +from fink_tns.utils import read_past_ids + from fink_science.hostless_detection.processor import run_potential_hostless @@ -119,10 +121,20 @@ def main(): cond_template = df["kstest_static"][1] <= 0.85 pdf = df.filter(cond_science).filter(cond_template).select(cols_).toPandas() + # load hostless IDs + past_ids = read_past_ids(args.hostless_folder) + + new_ids = [] # Loop over matches & send to Telegram if ("FINK_TG_TOKEN" in os.environ) and os.environ["FINK_TG_TOKEN"] != "": payloads = [] for _, alert in pdf.iterrows(): + # Do not send request if the object + # has been already reported by the bot + if alert["objectId"] in past_ids.to_numpy(): + print("{} already sent!".format(alert["objectId"])) + continue + curve_png = get_curve( objectId=alert["objectId"], origin="API", @@ -144,10 +156,17 @@ def main(): ) payloads.append((text, curve_png, [cutout_science, cutout_template])) + new_ids.append(alert["objectId"]) if len(payloads) > 0: + # Send to tg msg_handler_tg_cutouts(payloads, channel_id="@fink_hostless", init_msg="") + # Save ids on disk + pdf_ids = pd.DataFrame.from_dict({"id": new_ids}) + name = "{}{}{}".format(args.night[:4], args.night[4:6], args.night[6:8]) + pdf_ids.to_csv("{}/{}.csv".format(args.hostless_folder, name), index=False) + if __name__ == "__main__": main() diff --git a/bin/raw2science_batch.py b/bin/raw2science_batch.py index 33423c02..66347250 100644 --- a/bin/raw2science_batch.py +++ b/bin/raw2science_batch.py @@ -67,7 +67,7 @@ def main(): df = df.filter(df["candidate.nbad"] == 0).filter(df["candidate.rb"] >= 0.55) # Apply science modules - df = apply_science_modules(df, logger) + df = apply_science_modules(df) # Add tracklet information df_trck = spark.read.format("parquet").load(input_raw) diff --git a/deps/requirements-science.txt b/deps/requirements-science.txt index d499911c..e66a28e6 100644 --- a/deps/requirements-science.txt +++ b/deps/requirements-science.txt @@ -1,13 +1,13 @@ # Fink-science dependencies # WARNING: fink-science pip module is deprecated and fink-science is now installed from source -git+https://github.com/astrolabsoftware/fink-science@5.19.0 +git+https://github.com/astrolabsoftware/fink-science@5.20.0 # xmatch_cds line_profiler==4.1.3 # Active learning --e git+https://github.com/emilleishida/fink_sn_activelearning.git@4f46b3a1e29de45793125452974e71e92c1ea454#egg=actsnfink +-e git+https://github.com/emilleishida/fink_sn_activelearning.git@cb61bbb34630c7811862050389b0f993de9639c7#egg=actsnfink -e git+https://github.com/COINtoolbox/ActSNClass.git@2c61da91a9d13834d39804fc35aeb3245ba20755#egg=actsnclass joblib==1.2.0 @@ -31,8 +31,7 @@ iminuit==2.21.0 -e git+https://github.com/b-biswas/kndetect@kndetect#egg=kndetect # CATS dependencies -tensorflow==2.9.2 -tensorflow_addons==0.18.0 +tensorflow==2.15 # Anomalies onnxruntime diff --git a/deps/requirements.txt b/deps/requirements.txt index 6d90d85c..0da71837 100644 --- a/deps/requirements.txt +++ b/deps/requirements.txt @@ -19,7 +19,7 @@ fastavro==1.6.0 # Fink core fink_filters>=3.37 -fink-utils>=0.25.0 +fink-utils>=0.28.0 fink-spins>=0.3.8 fink-tns>=0.9 diff --git a/fink_broker/parser.py b/fink_broker/parser.py index 0f51a319..bf94a7b7 100644 --- a/fink_broker/parser.py +++ b/fink_broker/parser.py @@ -350,6 +350,15 @@ def getargs(parser: argparse.ArgumentParser) -> argparse.Namespace: [TNS_FOLDER] """, ) + parser.add_argument( + "-hostless_folder", + type=str, + default="hostless_ids", + help=""" + Folder to store ids for hostless detection + [HOSTLESS_FOLDER] + """, + ) parser.add_argument( "--tns_sandbox", action="store_true", diff --git a/fink_broker/science.py b/fink_broker/science.py index d97f9960..8e395e1c 100644 --- a/fink_broker/science.py +++ b/fink_broker/science.py @@ -46,8 +46,7 @@ from fink_science.random_forest_snia.processor import rfscore_rainbow_elasticc from fink_science.snn.processor import snn_ia_elasticc, snn_broad_elasticc from fink_science.cats.processor import predict_nn -from fink_science.agn.processor import agn_elasticc -from fink_science.slsn.processor import slsn_elasticc +from fink_science.slsn.processor import slsn_elasticc_with_md from fink_science.fast_transient_rate.processor import magnitude_rate from fink_science.fast_transient_rate import rate_module_output_schema # from fink_science.t2.processor import t2 @@ -599,7 +598,7 @@ def apply_science_modules_elasticc(df: DataFrame) -> DataFrame: df = df.withColumn("cats_broad_class", mapping_cats_general_expr[df["cats_argmax"]]) df = df.withColumnRenamed("cbpf_preds", "cats_broad_array_prob") - # AGN & SLSN + # SLSN args_forced = [ "diaObject.diaObjectId", "cmidPointTai", @@ -610,11 +609,9 @@ def apply_science_modules_elasticc(df: DataFrame) -> DataFrame: "diaSource.decl", "diaObject.hostgal_zphot", "diaObject.hostgal_zphot_err", - "diaObject.hostgal_ra", - "diaObject.hostgal_dec", + "diaObject.hostgal_snsep", ] - df = df.withColumn("rf_agn_vs_nonagn", agn_elasticc(*args_forced)) - df = df.withColumn("rf_slsn_vs_nonslsn", slsn_elasticc(*args_forced)) + df = df.withColumn("rf_slsn_vs_nonslsn", slsn_elasticc_with_md(*args_forced)) # Drop temp columns df = df.drop(*expanded)