From 91bbf7c98eafa6e88ed78a267e97c3d65ba4c206 Mon Sep 17 00:00:00 2001
From: Knispel <timofei.psheno@gmail.com>
Date: Fri, 13 Dec 2024 13:12:57 +0300
Subject: [PATCH 1/3] Calculating anomaly-score and sending notifications for
 user models

---
 bin/anomaly_archival.py | 10 +++++++++-
 fink_broker/science.py  |  6 ++++--
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/bin/anomaly_archival.py b/bin/anomaly_archival.py
index 61a636c9..005263b1 100644
--- a/bin/anomaly_archival.py
+++ b/bin/anomaly_archival.py
@@ -21,7 +21,7 @@
 from fink_broker.spark_utils import init_sparksession, load_parquet_files
 
 from fink_filters.filter_anomaly_notification.filter import anomaly_notification_
-
+from fink_science.anomaly_detection.processor import ANOMALY_MODELS
 from fink_broker.logging_utils import get_fink_logger, inspect_application
 
 from fink_broker.hbase_utils import push_full_df_to_hbase
@@ -81,6 +81,14 @@ def main():
         cut_coords=True,
     )
 
+    for model in ANOMALY_MODELS:
+        df_proc = df.select(
+            'objectId', 'candidate.ra',
+            'candidate.dec', 'candidate.rb',
+            f'anomaly_score{model}', 'timestamp')
+        anomaly_notification_(df_proc, send_to_tg=False,
+        send_to_slack=False, send_to_anomaly_base=True, model=model)
+
     # Keep only candidates of interest for all sky anomalies
     oids = [int(i) for i in pdf["candid"].to_numpy()]
     df_hbase = df.filter(df["candid"].isin(list(oids)))
diff --git a/fink_broker/science.py b/fink_broker/science.py
index 2314c0a7..19dd934b 100644
--- a/fink_broker/science.py
+++ b/fink_broker/science.py
@@ -43,6 +43,7 @@
 from fink_science.kilonova.processor import knscore
 from fink_science.ad_features.processor import extract_features_ad
 from fink_science.anomaly_detection.processor import anomaly_score
+from fink_science.anomaly_detection.processor import ANOMALY_MODELS
 
 from fink_science.random_forest_snia.processor import rfscore_rainbow_elasticc
 from fink_science.snn.processor import snn_ia_elasticc, snn_broad_elasticc
@@ -410,10 +411,11 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame:
     ]
 
     df = df.withColumn("lc_features", extract_features_ad(*ad_args))
-
     # Apply level one processor: anomaly_score
     _LOG.info("New processor: Anomaly score")
-    df = df.withColumn("anomaly_score", anomaly_score("lc_features"))
+    for model in ANOMALY_MODELS:
+        _LOG.info(f"...Anomaly score{model}")
+        df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model)))
 
     # split features
     df = (

From 23c9e2fab8a7812cce9cb66dc8410160cc86ca01 Mon Sep 17 00:00:00 2001
From: Timofei Pshenichnyy <93309519+Knispel2@users.noreply.github.com>
Date: Fri, 13 Dec 2024 13:36:14 +0300
Subject: [PATCH 2/3] add def. model

---
 fink_broker/science.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/fink_broker/science.py b/fink_broker/science.py
index 19dd934b..1b391643 100644
--- a/fink_broker/science.py
+++ b/fink_broker/science.py
@@ -413,6 +413,7 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame:
     df = df.withColumn("lc_features", extract_features_ad(*ad_args))
     # Apply level one processor: anomaly_score
     _LOG.info("New processor: Anomaly score")
+    ANOMALY_MODELS = [''] + ANOMALY_MODELS
     for model in ANOMALY_MODELS:
         _LOG.info(f"...Anomaly score{model}")
         df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model)))

From e41a8f79cc074d5d1e3e8dabc44eba624dc917bc Mon Sep 17 00:00:00 2001
From: Timofei Pshenichnyy <93309519+Knispel2@users.noreply.github.com>
Date: Fri, 13 Dec 2024 13:41:40 +0300
Subject: [PATCH 3/3] add comment

---
 fink_broker/science.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fink_broker/science.py b/fink_broker/science.py
index 1b391643..ea1cc787 100644
--- a/fink_broker/science.py
+++ b/fink_broker/science.py
@@ -413,7 +413,7 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame:
     df = df.withColumn("lc_features", extract_features_ad(*ad_args))
     # Apply level one processor: anomaly_score
     _LOG.info("New processor: Anomaly score")
-    ANOMALY_MODELS = [''] + ANOMALY_MODELS
+    ANOMALY_MODELS = [''] + ANOMALY_MODELS # '' - model for a public channel
     for model in ANOMALY_MODELS:
         _LOG.info(f"...Anomaly score{model}")
         df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model)))