Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add fast transient module to the science pipeline #757

Merged
merged 2 commits into from
Nov 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion fink_broker/science.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
from fink_science.cats.processor import predict_nn
from fink_science.agn.processor import agn_elasticc
from fink_science.slsn.processor import slsn_elasticc
from fink_science.fast_transient_rate.processor import magnitude_rate
from fink_science.fast_transient_rate import rate_module_output_schema
# from fink_science.t2.processor import t2
except ImportError:
_LOG.warning("Fink science modules are not available. ")
Expand Down Expand Up @@ -208,7 +210,8 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame:
# Retrieve time-series information
to_expand = [
'jd', 'fid', 'magpsf', 'sigmapsf',
'magnr', 'sigmagnr', 'isdiffpos', 'distnr'
'magnr', 'sigmagnr', 'isdiffpos', 'distnr',
'diffmaglim'
]

# Append temp columns with historical + current measurements
Expand Down Expand Up @@ -373,6 +376,19 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame:
.withColumn("lc_features_r", df['lc_features'].getItem("2"))\
.drop('lc_features')

# Apply level one processor: fast transient
_LOG.info("New processor: magnitude rate for fast transient")
mag_rate_args = [
"candidate.magpsf", "candidate.sigmapsf", "candidate.jd",
"candidate.jdstarthist", "candidate.fid", "cmagpsf", "csigmapsf",
"cjd", "cfid", "cdiffmaglim", F.lit(10000), F.lit(None)
]
cols_before = df.columns
df = df.withColumn("ft_module", magnitude_rate(*mag_rate_args))
df = df.select(
cols_before + [df["ft_module"][k].alias(k) for k in rate_module_output_schema.keys()]
)

# Drop temp columns
df = df.drop(*expanded)

Expand Down
Loading