Skip to content

Commit

Permalink
Ad/multiple_models (#184)
Browse files Browse the repository at this point in the history
* anomaly base

* login fix

* small fixes

* docstrings update

* flake8 fix

* docstring fix

* fix postfix row

* disable communication in doctest

* only beta model

* bold font + new_param

* pep8...

* new bold

* EQU bold

* Change basename for test

* Change adr in rqst

* tg webhooks

* flake8

* add sleep

* tg_id to int

* fix " symbol + more logs

* access token fix

* fix tg_id to int multiple, timeout snad cross 5 sec

* fix timeout exception

* add try/except in OID

* pep8

* science template + new colormap

* fix

* new cutout test

* add axis

* Template -> Science

* pep8 + exc

* fix log

* data is None fix

* fix

* fix

* remove

* Fix triple quote format

---------

Co-authored-by: Julien <[email protected]>
  • Loading branch information
Knispel2 and JulienPeloton authored Jul 30, 2024
1 parent 318271e commit c0d7211
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 66 deletions.
112 changes: 62 additions & 50 deletions fink_filters/filter_anomaly_notification/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,10 @@


def anomaly_notification_(
df_proc,
threshold=10,
send_to_tg=False,
channel_id=None,
send_to_slack=False,
channel_name=None,
trick_par=10,
cut_coords=False,
history_period=90,
):
df_proc, threshold=10,
send_to_tg=False, channel_id=None,
send_to_slack=False, channel_name=None,
trick_par=10, cut_coords=False, history_period=90, send_to_anomaly_base=False, model=''):
""" Create event notifications with a high `anomaly_score` value
Notes
Expand Down Expand Up @@ -74,6 +68,15 @@ def anomaly_notification_(
history_period: int
Time period in days for which the number
of references is calculated
model: str
Name of the model used.
Name must start with '_' and have the form '_{user_name}',
where user_name is the user name of the model at https://anomaly.fink-portal.org/.
send_to_anomaly_base: bool
If True, notifications are uploaded to
https://anomaly.fink-portal.org/ in the selected model's
account. Only works for model != ''
Returns
----------
Expand All @@ -88,7 +91,7 @@ def anomaly_notification_(
>>> from fink_science.anomaly_detection.processor import anomaly_score
>>> df = spark.read.format('parquet').load('datatest/regular')
>>> MODELS = ['', '_beta'] # '' corresponds to the model for a telegram channel
>>> what = [
... 'jd', 'fid', 'magpsf', 'sigmapsf',
... 'magnr', 'sigmagnr', 'isdiffpos', 'distnr']
Expand All @@ -108,15 +111,22 @@ def anomaly_notification_(
... 'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos']
>>> df = df.withColumn('lc_features', extract_features_ad(*ad_args))
>>> df = df.withColumn("anomaly_score", anomaly_score("lc_features"))
>>> for model in MODELS:
... df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model)))
>>> df_proc = df.select(
... 'objectId', 'candidate.ra',
... 'candidate.dec', 'candidate.rb',
... 'anomaly_score', 'timestamp')
>>> df_out = anomaly_notification_(df_proc)
>>> for model in MODELS:
... df_proc = df.select(
... 'objectId', 'candidate.ra',
... 'candidate.dec', 'candidate.rb',
... f'anomaly_score{model}', 'timestamp')
... df_out = anomaly_notification_(df_proc, send_to_tg=False,
... send_to_slack=False, send_to_anomaly_base=True, model=model)
# Disable communication
>>> df_proc = df.select(
... 'objectId', 'candidate.ra',
... 'candidate.dec', 'candidate.rb',
... 'anomaly_score', 'timestamp')
>>> pdf_anomalies = anomaly_notification_(df_proc, threshold=10,
... send_to_tg=False, channel_id=None,
... send_to_slack=False, channel_name=None)
Expand All @@ -140,39 +150,37 @@ def anomaly_notification_(
return pd.DataFrame()

# Compute the median for the night
med = df_proc.select("anomaly_score").approxQuantile("anomaly_score", [0.5], 0.05)
buf_df = df_proc.select(f'anomaly_score{model}')
med = buf_df.approxQuantile(f'anomaly_score{model}', [0.5], 0.05)
med = round(med[0], 2)

# Extract anomalous objects

pdf_anomalies_ext = (
df_proc.sort(["anomaly_score"], ascending=True)
.limit(trick_par * threshold)
.toPandas()
)
pdf_anomalies_ext = pdf_anomalies_ext.drop_duplicates(["objectId"])
upper_bound = np.max(pdf_anomalies_ext["anomaly_score"].values[:threshold])
pdf_anomalies = pdf_anomalies_ext[pdf_anomalies_ext["anomaly_score"] <= upper_bound]
pdf_anomalies_ext = df_proc.sort([f'anomaly_score{model}'], ascending=True).limit(trick_par * threshold).toPandas()
pdf_anomalies_ext = pdf_anomalies_ext.drop_duplicates(['objectId'])
upper_bound = np.max(pdf_anomalies_ext[f'anomaly_score{model}'].values[:threshold])
pdf_anomalies = pdf_anomalies_ext[pdf_anomalies_ext[f'anomaly_score{model}'] <= upper_bound].head(10)

history_objects = filter_utils.get_an_history(history_period)

tg_data, slack_data = [], []
tg_data, slack_data, base_data = [], [], []

for _, row in pdf_anomalies.iterrows():
gal = SkyCoord(
ra=row.ra * u.degree, dec=row.dec * u.degree, frame="icrs"
).galactic
oid = filter_utils.get_OID(row.ra, row.dec)
t1a = f"ID: [{row.objectId}](https://fink-portal.org/{row.objectId})"
t1b = f"ID: <https://fink-portal.org/{row.objectId}|{row.objectId}>"
t_oid_1a = f"DR OID (<1''): [{oid}](https://ztf.snad.space/view/{oid})"
t1a = f'**ID**: [{row.objectId}](https://fink-portal.org/{row.objectId})'
t1b = f'ID: <https://fink-portal.org/{row.objectId}|{row.objectId}>'
t_oid_1a = f"**DR OID (<1'')**: [{oid}](https://ztf.snad.space/view/{oid})"
t_oid_1b = f"DR OID (<1''): <https://ztf.snad.space/view/{oid}|{oid}>"
t2_ = f"GAL coordinates: {round(gal.l.deg, 6)}, {round(gal.b.deg, 6)}"
t_ = f"""
EQU: {row.ra}, {row.dec}"""
t2_ = f'**GAL coordinates**: {round(gal.l.deg, 6)}, {round(gal.b.deg, 6)}'
t_ = f'''
**EQU**: {row.ra}, {row.dec}'''
t2_ += t_
t3_ = f"UTC: {str(row.timestamp)[:-3]}"
t4_ = f"Real bogus: {round(row.rb, 2)}"
t5_ = f"Anomaly score: {round(row.anomaly_score, 2)}"
t3_ = f'**UTC**: {str(row.timestamp)[:-3]}'
t4_ = f'**Real bogus**: {round(row.rb, 2)}'
t5_ = f'**Anomaly score**: {round(row[f"anomaly_score{model}"], 2)}'
if row.objectId in history_objects:
t5_ += f"""
Detected as top-{threshold} in the last {history_period} days: {history_objects[row.objectId]} {'times' if history_objects[row.objectId] > 1 else 'time'}."""
Expand All @@ -183,39 +191,43 @@ def anomaly_notification_(
cutout.seek(0)
cutout_perml = f"<{cutout_perml}|{' '}>"
curve_perml = f"<{curve_perml}|{' '}>"
tg_data.append(
(
f"""{t1a}
if model == '':
tg_data.append((f'''{t1a}
{t_oid_1a}
{t2_}
{t3_}
{t4_}
{t5_}""",
cutout,
curve,
)
)
slack_data.append(f"""==========================
{t5_}''', cutout, curve))
slack_data.append(f'''==========================
{t1b}
{t_oid_1b}
{t2_}
{t3_}
{t4_}
{t5_}
{cutout_perml}{curve_perml}""")
init_msg = f"Median anomaly score overnight: {med}."
if cut_coords:
{cutout_perml}{curve_perml}''')
base_data.append((row.objectId, f'''{t1a}
{t_oid_1a}
{t2_}
{t3_}
{t4_}
{t5_}'''.replace('\n', ' \n'), cutout, curve))

init_msg = f'Median anomaly score overnight: {med}.'
if cut_coords and model == '':
init_msg += f"""
(of the objects in the sky area)
Sky area:
1) delta <= 20°
2) alpha ∈ (160°, 240°)
1) delta <= 20°
2) alpha ∈ (160°, 240°)
Total number of objects per night in the area: {cut_count}.
"""
if send_to_slack:
filter_utils.msg_handler_slack(slack_data, channel_name, init_msg)
if send_to_tg:
filter_utils.msg_handler_tg(tg_data, channel_id, init_msg)
if model != '':
filter_utils.load_to_anomaly_base(base_data, model)
return pdf_anomalies


Expand Down
Loading

0 comments on commit c0d7211

Please sign in to comment.