Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculating anomaly_score and sending notifications for user models #929

Open
This pull request wants to merge 4 commits into base branch: master.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion bin/anomaly_archival.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from fink_broker.spark_utils import init_sparksession, load_parquet_files

from fink_filters.filter_anomaly_notification.filter import anomaly_notification_

from fink_science.anomaly_detection.processor import ANOMALY_MODELS
from fink_broker.logging_utils import get_fink_logger, inspect_application

from fink_broker.hbase_utils import push_full_df_to_hbase
Expand Down Expand Up @@ -81,6 +81,14 @@ def main():
cut_coords=True,
)

for model in ANOMALY_MODELS:
df_proc = df.select(
'objectId', 'candidate.ra',
'candidate.dec', 'candidate.rb',
f'anomaly_score{model}', 'timestamp')
anomaly_notification_(df_proc, send_to_tg=False,
send_to_slack=False, send_to_anomaly_base=True, model=model)

# Keep only candidates of interest for all sky anomalies
oids = [int(i) for i in pdf["candid"].to_numpy()]
df_hbase = df.filter(df["candid"].isin(list(oids)))
Expand Down
7 changes: 5 additions & 2 deletions fink_broker/science.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from fink_science.kilonova.processor import knscore
from fink_science.ad_features.processor import extract_features_ad
from fink_science.anomaly_detection.processor import anomaly_score
from fink_science.anomaly_detection.processor import ANOMALY_MODELS

from fink_science.random_forest_snia.processor import rfscore_rainbow_elasticc
from fink_science.snn.processor import snn_ia_elasticc, snn_broad_elasticc
Expand Down Expand Up @@ -410,10 +411,12 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame:
]

df = df.withColumn("lc_features", extract_features_ad(*ad_args))

# Apply level one processor: anomaly_score
_LOG.info("New processor: Anomaly score")
df = df.withColumn("anomaly_score", anomaly_score("lc_features"))
ANOMALY_MODELS = [''] + ANOMALY_MODELS # '' - model for a public channel
for model in ANOMALY_MODELS:
_LOG.info(f"...Anomaly score{model}")
df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model)))

# split features
df = (
Expand Down