Skip to content

Commit

Permalink
Update filter with TG bot
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienPeloton committed Feb 28, 2024
1 parent 4740e84 commit 6d498f7
Showing 1 changed file with 49 additions and 26 deletions.
75 changes: 49 additions & 26 deletions fink_filters/filter_yso_spicy_candidates/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
from fink_filters.tester import spark_unit_tests

import pandas as pd
import os


@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
def yso_spicy_candidates(spicy_id, spicy_class, objectId, cjdc, cmagpsfc, csigmapsfc, cdiffmaglimc, cfidc) -> pd.Series:
""" Return alerts with a match in the SPICY catalog
def yso_spicy_candidates(
spicy_id, spicy_class, objectId, cjdc, cmagpsfc, csigmapsfc, cdiffmaglimc, cfidc
) -> pd.Series:
"""Return alerts with a match in the SPICY catalog
Parameters
----------
Expand All @@ -41,7 +44,19 @@ def yso_spicy_candidates(spicy_id, spicy_class, objectId, cjdc, cmagpsfc, csigma
Examples
----------
>>> from fink_utils.spark.utils import apply_user_defined_filter
>>> from fink_utils.spark.utils import concat_col
>>> df = spark.read.format('parquet').load('datatest/spicy_yso')
>>> to_expand = ['jd', 'fid', 'magpsf', 'sigmapsf', 'diffmaglim']
>>> prefix = 'c'
>>> for colname in to_expand:
... df = concat_col(df, colname, prefix=prefix)
# quick fix for https://github.com/astrolabsoftware/fink-broker/issues/457
>>> for colname in to_expand:
... df = df.withColumnRenamed('c' + colname, 'c' + colname + 'c')
>>> f = 'fink_filters.filter_yso_spicy_candidates.filter.yso_spicy_candidates'
>>> df = apply_user_defined_filter(df, f)
>>> print(df.count())
Expand All @@ -51,39 +66,47 @@ def yso_spicy_candidates(spicy_id, spicy_class, objectId, cjdc, cmagpsfc, csigma

pdf = pd.DataFrame(
{
'objectId': objectId,
'magpsf': magpsf,
'sigmapsf': sigmapsf,
'diffmaglim': diffmaglim,
'fid': fid,
'jd': jd
"objectId": objectId,
"magpsf": cmagpsfc,
"sigmapsf": csigmapsfc,
"diffmaglim": cdiffmaglimc,
"fid": cfidc,
"jd": cjdc,
"spicy_id": spicy_id,
"spicy_class": spicy_class,
}
)

# Loop over matches
payloads = []
for index, (_, alert) in enumerate(pdf[mask].iterrows()):
curve_png = get_curve(
jd=alert["jd"],
magpsf=alert["magpsf"],
sigmapsf=alert["sigmapsf"],
diffmaglim=alert["diffmaglim"],
fid=alert["fid"],
objectId=alert["objectId"],
origin="fields",
)
hyperlink = "[{}](https://fink-portal.org/{}): ID {} ({})".format(
alert["objectId"], alert["objectId"], spicy_id, spicy_class)
payloads.append((hyperlink, None, curve_png))

if len(payloads) > 0:
msg_handler_tg(payloads, channel_id="@spicy_fink", init_msg='')
if "FINK_TG_TOKEN" in os.environ:
payloads = []
for _, alert in pdf[mask].iterrows():
curve_png = get_curve(
jd=alert["jd"],
magpsf=alert["magpsf"],
sigmapsf=alert["sigmapsf"],
diffmaglim=alert["diffmaglim"],
fid=alert["fid"],
objectId=alert["objectId"],
origin="fields",
)
# pd.DataFrame({'magpsf': alert["magpsf"]})['magpsf']
hyperlink = "[{}](https://fink-portal.org/{}): ID {} ({})".format(
alert["objectId"],
alert["objectId"],
alert["spicy_id"],
alert["spicy_class"],
)
payloads.append((hyperlink, None, curve_png))

if len(payloads) > 0:
msg_handler_tg(payloads, channel_id="@spicy_fink", init_msg="")

return pd.Series(mask)


if __name__ == "__main__":
""" Execute the test suite """
"""Execute the test suite"""

# Run the test suite
globs = globals()
Expand Down

0 comments on commit 6d498f7

Please sign in to comment.