diff --git a/bin/anomaly_archival.py b/bin/anomaly_archival.py index 61a636c9..005263b1 100644 --- a/bin/anomaly_archival.py +++ b/bin/anomaly_archival.py @@ -21,7 +21,7 @@ from fink_broker.spark_utils import init_sparksession, load_parquet_files from fink_filters.filter_anomaly_notification.filter import anomaly_notification_ - +from fink_science.anomaly_detection.processor import ANOMALY_MODELS from fink_broker.logging_utils import get_fink_logger, inspect_application from fink_broker.hbase_utils import push_full_df_to_hbase @@ -81,6 +81,14 @@ def main(): cut_coords=True, ) + for model in ANOMALY_MODELS: + df_proc = df.select( + 'objectId', 'candidate.ra', + 'candidate.dec', 'candidate.rb', + f'anomaly_score{model}', 'timestamp') + anomaly_notification_(df_proc, send_to_tg=False, + send_to_slack=False, send_to_anomaly_base=True, model=model) + # Keep only candidates of interest for all sky anomalies oids = [int(i) for i in pdf["candid"].to_numpy()] df_hbase = df.filter(df["candid"].isin(list(oids))) diff --git a/fink_broker/science.py b/fink_broker/science.py index 2314c0a7..ea1cc787 100644 --- a/fink_broker/science.py +++ b/fink_broker/science.py @@ -43,6 +43,7 @@ from fink_science.kilonova.processor import knscore from fink_science.ad_features.processor import extract_features_ad from fink_science.anomaly_detection.processor import anomaly_score +from fink_science.anomaly_detection.processor import ANOMALY_MODELS from fink_science.random_forest_snia.processor import rfscore_rainbow_elasticc from fink_science.snn.processor import snn_ia_elasticc, snn_broad_elasticc @@ -410,10 +411,12 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame: ] df = df.withColumn("lc_features", extract_features_ad(*ad_args)) - # Apply level one processor: anomaly_score _LOG.info("New processor: Anomaly score") - df = df.withColumn("anomaly_score", anomaly_score("lc_features")) + ANOMALY_MODELS = [''] + ANOMALY_MODELS # '' - model for a public channel + for model in ANOMALY_MODELS: + _LOG.info(f"...Anomaly score{model}") + df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model))) # split features df = (