diff --git a/fink_filters/filter_anomaly_notification/filter.py b/fink_filters/filter_anomaly_notification/filter.py index 767228b..2f97476 100755 --- a/fink_filters/filter_anomaly_notification/filter.py +++ b/fink_filters/filter_anomaly_notification/filter.py @@ -25,16 +25,10 @@ def anomaly_notification_( - df_proc, - threshold=10, - send_to_tg=False, - channel_id=None, - send_to_slack=False, - channel_name=None, - trick_par=10, - cut_coords=False, - history_period=90, -): + df_proc, threshold=10, + send_to_tg=False, channel_id=None, + send_to_slack=False, channel_name=None, + trick_par=10, cut_coords=False, history_period=90, send_to_anomaly_base=False, model=''): """ Create event notifications with a high `anomaly_score` value Notes @@ -74,6 +68,15 @@ def anomaly_notification_( history_period: int Time period in days for which the number of references is calculated + model: str + Name of the model used. + Name must start with a ‘_’ and be ‘_{user_name}’, + where user_name is the user name of the model at https://anomaly.fink-portal.org/. + send_to_anomaly_base: bool + If True, notifications are uploaded to + https://anomaly.fink-portal.org/ in the selected model's + account. Only works for model != ‘’ + Returns ---------- @@ -88,7 +91,7 @@ def anomaly_notification_( >>> from fink_science.anomaly_detection.processor import anomaly_score >>> df = spark.read.format('parquet').load('datatest/regular') - + >>> MODELS = ['', '_beta'] # '' corresponds to the model for a telegram channel >>> what = [ ... 'jd', 'fid', 'magpsf', 'sigmapsf', ... 'magnr', 'sigmagnr', 'isdiffpos', 'distnr'] @@ -108,15 +111,22 @@ def anomaly_notification_( ... 'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos'] >>> df = df.withColumn('lc_features', extract_features_ad(*ad_args)) - >>> df = df.withColumn("anomaly_score", anomaly_score("lc_features")) + >>> for model in MODELS: + ... df = df.withColumn(f'anomaly_score{model}', anomaly_score("lc_features", F.lit(model))) - >>> df_proc = df.select( - ... 'objectId', 'candidate.ra', - ... 'candidate.dec', 'candidate.rb', - ... 'anomaly_score', 'timestamp') - >>> df_out = anomaly_notification_(df_proc) + >>> for model in MODELS: + ... df_proc = df.select( + ... 'objectId', 'candidate.ra', + ... 'candidate.dec', 'candidate.rb', + ... f'anomaly_score{model}', 'timestamp') + ... df_out = anomaly_notification_(df_proc, send_to_tg=False, + ... send_to_slack=False, send_to_anomaly_base=True, model=model) # Disable communication + >>> df_proc = df.select( + ... 'objectId', 'candidate.ra', + ... 'candidate.dec', 'candidate.rb', + ... 'anomaly_score', 'timestamp') >>> pdf_anomalies = anomaly_notification_(df_proc, threshold=10, ... send_to_tg=False, channel_id=None, ... send_to_slack=False, channel_name=None) @@ -140,39 +150,37 @@ def anomaly_notification_( return pd.DataFrame() # Compute the median for the night - med = df_proc.select("anomaly_score").approxQuantile("anomaly_score", [0.5], 0.05) + buf_df = df_proc.select(f'anomaly_score{model}') + med = buf_df.approxQuantile(f'anomaly_score{model}', [0.5], 0.05) med = round(med[0], 2) # Extract anomalous objects - pdf_anomalies_ext = ( - df_proc.sort(["anomaly_score"], ascending=True) - .limit(trick_par * threshold) - .toPandas() - ) - pdf_anomalies_ext = pdf_anomalies_ext.drop_duplicates(["objectId"]) - upper_bound = np.max(pdf_anomalies_ext["anomaly_score"].values[:threshold]) - pdf_anomalies = pdf_anomalies_ext[pdf_anomalies_ext["anomaly_score"] <= upper_bound] + pdf_anomalies_ext = df_proc.sort([f'anomaly_score{model}'], ascending=True).limit(trick_par * threshold).toPandas() + pdf_anomalies_ext = pdf_anomalies_ext.drop_duplicates(['objectId']) + upper_bound = np.max(pdf_anomalies_ext[f'anomaly_score{model}'].values[:threshold]) + pdf_anomalies = pdf_anomalies_ext[pdf_anomalies_ext[f'anomaly_score{model}'] <= upper_bound].head(10) history_objects = filter_utils.get_an_history(history_period) - tg_data, slack_data = [], [] + tg_data, slack_data, base_data = [], [], [] + for _, row in pdf_anomalies.iterrows(): gal = SkyCoord( ra=row.ra * u.degree, dec=row.dec * u.degree, frame="icrs" ).galactic oid = filter_utils.get_OID(row.ra, row.dec) - t1a = f"ID: [{row.objectId}](https://fink-portal.org/{row.objectId})" - t1b = f"ID: " - t_oid_1a = f"DR OID (<1''): [{oid}](https://ztf.snad.space/view/{oid})" + t1a = f'**ID**: [{row.objectId}](https://fink-portal.org/{row.objectId})' + t1b = f'ID: ' + t_oid_1a = f"**DR OID (<1'')**: [{oid}](https://ztf.snad.space/view/{oid})" t_oid_1b = f"DR OID (<1''): " - t2_ = f"GAL coordinates: {round(gal.l.deg, 6)}, {round(gal.b.deg, 6)}" - t_ = f""" -EQU: {row.ra}, {row.dec}""" + t2_ = f'**GAL coordinates**: {round(gal.l.deg, 6)}, {round(gal.b.deg, 6)}' + t_ = f''' +**EQU**: {row.ra}, {row.dec}''' t2_ += t_ - t3_ = f"UTC: {str(row.timestamp)[:-3]}" - t4_ = f"Real bogus: {round(row.rb, 2)}" - t5_ = f"Anomaly score: {round(row.anomaly_score, 2)}" + t3_ = f'**UTC**: {str(row.timestamp)[:-3]}' + t4_ = f'**Real bogus**: {round(row.rb, 2)}' + t5_ = f'**Anomaly score**: {round(row[f"anomaly_score{model}"], 2)}' if row.objectId in history_objects: t5_ += f""" Detected as top-{threshold} in the last {history_period} days: {history_objects[row.objectId]} {'times' if history_objects[row.objectId] > 1 else 'time'}.""" @@ -183,39 +191,43 @@ def anomaly_notification_( cutout.seek(0) cutout_perml = f"<{cutout_perml}|{' '}>" curve_perml = f"<{curve_perml}|{' '}>" - tg_data.append( - ( - f"""{t1a} + if model == '': + tg_data.append((f'''{t1a} {t_oid_1a} {t2_} {t3_} {t4_} -{t5_}""", - cutout, - curve, - ) - ) - slack_data.append(f"""========================== +{t5_}''', cutout, curve)) + slack_data.append(f'''========================== {t1b} {t_oid_1b} {t2_} {t3_} {t4_} {t5_} -{cutout_perml}{curve_perml}""") - init_msg = f"Median anomaly score overnight: {med}." - if cut_coords: +{cutout_perml}{curve_perml}''') + base_data.append((row.objectId, f'''{t1a} +{t_oid_1a} +{t2_} +{t3_} +{t4_} +{t5_}'''.replace('\n', ' \n'), cutout, curve)) + + init_msg = f'Median anomaly score overnight: {med}.' + if cut_coords and model == '': init_msg += f""" (of the objects in the sky area) Sky area: - 1) delta <= 20° - 2) alpha ∈ (160°, 240°) +1) delta <= 20° +2) alpha ∈ (160°, 240°) Total number of objects per night in the area: {cut_count}. """ if send_to_slack: filter_utils.msg_handler_slack(slack_data, channel_name, init_msg) if send_to_tg: filter_utils.msg_handler_tg(tg_data, channel_id, init_msg) + if model != '': + filter_utils.load_to_anomaly_base(base_data, model) return pdf_anomalies diff --git a/fink_filters/filter_anomaly_notification/filter_utils.py b/fink_filters/filter_anomaly_notification/filter_utils.py index af64b31..f0eb42f 100755 --- a/fink_filters/filter_anomaly_notification/filter_utils.py +++ b/fink_filters/filter_anomaly_notification/filter_utils.py @@ -20,6 +20,7 @@ from collections import Counter import pandas as pd import numpy as np +import json import matplotlib.pyplot as plt @@ -52,11 +53,10 @@ def get_an_history(delta_date=90): } ) - if status_check(history_data): + if status_check(history_data, 'checking history'): res_obj = Counter(pd.read_json(io.BytesIO(history_data.content))['i:objectId'].values) return res_obj - else: - return Counter() + return Counter() def get_data_permalink_slack(ztf_id): @@ -117,7 +117,7 @@ def get_data_permalink_slack(ztf_id): return cutout, curve, result['files'][0]['permalink'], result['files'][1]['permalink'] -def status_check(res): +def status_check(res, source='not defined'): ''' Checks whether the request was successful. In case of an error, sends information about the error to the @fink_test telegram channel @@ -125,7 +125,7 @@ def status_check(res): Parameters ---------- res : Response object - + source : source of log Returns ------- result : bool @@ -141,13 +141,14 @@ def status_check(res): method, data={ "chat_id": "@fink_test", - "text": str(res.status_code) + "text": f'Source: {source}, error: {str(res.status_code)}, description: {res.text}' }, timeout=25 ) return False return True + def msg_handler_slack(slack_data, channel_name, init_msg): ''' Notes @@ -196,6 +197,7 @@ def msg_handler_slack(slack_data, channel_name, init_msg): timeout=25 ) + def msg_handler_tg(tg_data, channel_id, init_msg): ''' Notes @@ -234,8 +236,16 @@ def msg_handler_tg(tg_data, channel_id, init_msg): }, timeout=25 ) - status_check(res) + status_check(res, 'sending to tg_channel (init)') time.sleep(10) + inline_keyboard = { + "inline_keyboard": [ + [ + {"text": "Anomaly", "callback_data": "ANOMALY"}, + {"text": "Not anomaly", "callback_data": "NOTANOMALY"} + ] + ] + } for text_data, cutout, curve in tg_data: res = requests.post( method, @@ -252,7 +262,8 @@ def msg_handler_tg(tg_data, channel_id, init_msg): "type" : "photo", "media": "attach://first" }} - ]''' + ]''', + "reply_markup": inline_keyboard }, files={ "second": cutout, @@ -260,9 +271,113 @@ def msg_handler_tg(tg_data, channel_id, init_msg): }, timeout=25 ) - status_check(res) + status_check(res, 'sending to tg_channel (main messages)') time.sleep(10) +def load_to_anomaly_base(data, model): + ''' + + Parameters + ---------- + data: list + A list of tuples of 4 elements each: (ZTF identifier: str, + notification text: str, cutout: BytesIO, light curve: BytesIO) + model: str + Name of the model used. + Name must start with a ‘_’ and be ‘_{user_name}’, + where user_name is the user name of the model at https://anomaly.fink-portal.org/. + Returns + ------- + NONE + ''' + username = model[1:] + time.sleep(3) + res = requests.post('https://fink.matwey.name:443/user/signin', data={ + 'username': username, + 'password': os.environ['ANOMALY_TG_TOKEN'] + }) + if status_check(res, 'load_to_anomaly_base_login'): + access_token = json.loads(res.text)['access_token'] + tg_id_data = requests.get(url=f'https://fink.matwey.name:443/user/get_tgid/{username}') + if status_check(tg_id_data, 'tg_id loading'): + tg_id_data = tg_id_data.content.decode('utf-8') + tg_id_data = int(tg_id_data.replace('"', '')) + else: + tg_id_data = 'ND' + + for ztf_id, text_data, cutout, curve in data: + cutout.seek(0) + curve.seek(0) + files = { + "image1": cutout, + "image2": curve + } + data = { + "description": text_data + } + params = { + "ztf_id": ztf_id + } + headers = { + "Authorization": f"Bearer {access_token}" + } + response = requests.post('https://fink.matwey.name:443/images/upload', files=files, params=params, data=data, + headers=headers, timeout=10) + status_check(response, 'upload to anomaly base') + cutout.seek(0) + curve.seek(0) + # send in tg personal + if tg_id_data == 'ND': + continue + + inline_keyboard = { + "inline_keyboard": [ + [ + {"text": "Anomaly", "callback_data": f"A_{ztf_id}"}, + {"text": "Not anomaly", "callback_data": f"NA_{ztf_id}"} + ] + ] + } + + url = "https://api.telegram.org/bot" + url += os.environ['ANOMALY_TG_TOKEN'] + method = url + "/sendMediaGroup" + + res = requests.post( + method, + params={ + "chat_id": tg_id_data, + "media": f'''[ + {{ + "type" : "photo", + "media": "attach://second", + "caption" : "{text_data}", + "parse_mode": "markdown" + }}, + {{ + "type" : "photo", + "media": "attach://first" + }} + ]''' + }, + files={ + "second": cutout, + "first": curve, + }, + timeout=25 + ) + if status_check(res, f'individual sending to {tg_id_data}'): + res = requests.post( + f"https://api.telegram.org/bot{os.environ['ANOMALY_TG_TOKEN']}/sendMessage", + json={ + "chat_id": tg_id_data, + "text": f"Feedback for {ztf_id}:", + "reply_markup": inline_keyboard + }, + timeout=25 + ) + status_check(res, f'button individual sending to {tg_id_data}') + def get_OID(ra, dec): ''' @@ -282,9 +397,13 @@ def get_OID(ra, dec): out: str ZTF DR OID ''' - r = requests.get( - url=f'http://db.ztf.snad.space/api/v3/data/latest/circle/full/json?ra={ra}&dec={dec}&radius_arcsec=1') - if not status_check(r): + try: + r = requests.get( + url=f'http://db.ztf.snad.space/api/v3/data/latest/circle/full/json?ra={ra}&dec={dec}&radius_arcsec=1' + ) + except Exception: + return None + if not status_check(r, 'get cross from snad'): return None oids = [key for key, _ in r.json().items()] if oids: @@ -311,11 +430,11 @@ def get_cutout(ztf_id): 'https://fink-portal.org/api/v1/cutouts', json={ 'objectId': ztf_id, - 'kind': 'Difference', + 'kind': 'Science' }, timeout=25 ) - status_check(r) + status_check(r, 'get cutouts') return io.BytesIO(r.content) def get_curve(ztf_id): @@ -338,7 +457,7 @@ def get_curve(ztf_id): 'withupperlim': 'True' } ) - if not status_check(r): + if not status_check(r, 'getting curve'): return None # Format output in a DataFrame @@ -384,4 +503,5 @@ def get_curve(ztf_id): buf = io.BytesIO() plt.savefig(buf, format='png') buf.seek(0) + plt.close() return buf diff --git a/requirements.txt b/requirements.txt index 4023ac9..c9933bb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,5 +14,4 @@ astroquery pandas pyarrow - h5py