From 91bbf7c98eafa6e88ed78a267e97c3d65ba4c206 Mon Sep 17 00:00:00 2001 From: Knispel <timofei.psheno@gmail.com> Date: Fri, 13 Dec 2024 13:12:57 +0300 Subject: [PATCH 1/3] Calculating anomaly-score and sending notifications for user models --- bin/anomaly_archival.py | 10 +++++++++- fink_broker/science.py | 6 ++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/bin/anomaly_archival.py b/bin/anomaly_archival.py index 61a636c9..005263b1 100644 --- a/bin/anomaly_archival.py +++ b/bin/anomaly_archival.py @@ -21,7 +21,7 @@ from fink_broker.spark_utils import init_sparksession, load_parquet_files from fink_filters.filter_anomaly_notification.filter import anomaly_notification_ - +from fink_science.anomaly_detection.processor import ANOMALY_MODELS from fink_broker.logging_utils import get_fink_logger, inspect_application from fink_broker.hbase_utils import push_full_df_to_hbase @@ -81,6 +81,14 @@ def main(): cut_coords=True, ) + for model in ANOMALY_MODELS: + df_proc = df.select( + 'objectId', 'candidate.ra', + 'candidate.dec', 'candidate.rb', + f'anomaly_score{model}', 'timestamp') + anomaly_notification_(df_proc, send_to_tg=False, + send_to_slack=False, send_to_anomaly_base=True, model=model) + # Keep only candidates of interest for all sky anomalies oids = [int(i) for i in pdf["candid"].to_numpy()] df_hbase = df.filter(df["candid"].isin(list(oids))) diff --git a/fink_broker/science.py b/fink_broker/science.py index 2314c0a7..19dd934b 100644 --- a/fink_broker/science.py +++ b/fink_broker/science.py @@ -43,6 +43,7 @@ from fink_science.kilonova.processor import knscore from fink_science.ad_features.processor import extract_features_ad from fink_science.anomaly_detection.processor import anomaly_score +from fink_science.anomaly_detection.processor import ANOMALY_MODELS from fink_science.random_forest_snia.processor import rfscore_rainbow_elasticc from fink_science.snn.processor import snn_ia_elasticc, snn_broad_elasticc @@ -410,10 +411,11 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame: ] df = df.withColumn("lc_features", extract_features_ad(*ad_args)) - # Apply level one processor: anomaly_score _LOG.info("New processor: Anomaly score") - df = df.withColumn("anomaly_score", anomaly_score("lc_features")) + for model in ANOMALY_MODELS: + _LOG.info(f"...Anomaly score{model}") + df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model))) # split features df = ( From 23c9e2fab8a7812cce9cb66dc8410160cc86ca01 Mon Sep 17 00:00:00 2001 From: Timofei Pshenichnyy <93309519+Knispel2@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:36:14 +0300 Subject: [PATCH 2/3] add def. model --- fink_broker/science.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fink_broker/science.py b/fink_broker/science.py index 19dd934b..1b391643 100644 --- a/fink_broker/science.py +++ b/fink_broker/science.py @@ -413,6 +413,7 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame: df = df.withColumn("lc_features", extract_features_ad(*ad_args)) # Apply level one processor: anomaly_score _LOG.info("New processor: Anomaly score") + ANOMALY_MODELS = [''] + ANOMALY_MODELS for model in ANOMALY_MODELS: _LOG.info(f"...Anomaly score{model}") df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model))) From e41a8f79cc074d5d1e3e8dabc44eba624dc917bc Mon Sep 17 00:00:00 2001 From: Timofei Pshenichnyy <93309519+Knispel2@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:41:40 +0300 Subject: [PATCH 3/3] add comment --- fink_broker/science.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fink_broker/science.py b/fink_broker/science.py index 1b391643..ea1cc787 100644 --- a/fink_broker/science.py +++ b/fink_broker/science.py @@ -413,7 +413,7 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame: df = df.withColumn("lc_features", extract_features_ad(*ad_args)) # Apply level one processor: anomaly_score _LOG.info("New processor: Anomaly score") - ANOMALY_MODELS = [''] + ANOMALY_MODELS + ANOMALY_MODELS = [''] + ANOMALY_MODELS # '' - model for a public channel for model in ANOMALY_MODELS: _LOG.info(f"...Anomaly score{model}") df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model)))