diff --git a/fink_broker/science.py b/fink_broker/science.py index b0a846d2..07d52a49 100644 --- a/fink_broker/science.py +++ b/fink_broker/science.py @@ -56,6 +56,8 @@ from fink_science.cats.processor import predict_nn from fink_science.agn.processor import agn_elasticc from fink_science.slsn.processor import slsn_elasticc + from fink_science.fast_transient_rate.processor import magnitude_rate + from fink_science.fast_transient_rate import rate_module_output_schema # from fink_science.t2.processor import t2 except ImportError: _LOG.warning("Fink science modules are not available. ") @@ -208,7 +210,8 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame: # Retrieve time-series information to_expand = [ 'jd', 'fid', 'magpsf', 'sigmapsf', - 'magnr', 'sigmagnr', 'isdiffpos', 'distnr' + 'magnr', 'sigmagnr', 'isdiffpos', 'distnr', + 'diffmaglim' ] # Append temp columns with historical + current measurements @@ -373,6 +376,19 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame: .withColumn("lc_features_r", df['lc_features'].getItem("2"))\ .drop('lc_features') + # Apply level one processor: fast transient + _LOG.info("New processor: magnitude rate for fast transient") + mag_rate_args = [ + "candidate.magpsf", "candidate.sigmapsf", "candidate.jd", + "candidate.jdstarthist", "candidate.fid", "cmagpsf", "csigmapsf", + "cjd", "cfid", "cdiffmaglim", F.lit(10000), F.lit(None) + ] + cols_before = df.columns + df = df.withColumn("ft_module", magnitude_rate(*mag_rate_args)) + df = df.select( + cols_before + [df["ft_module"][k].alias(k) for k in rate_module_output_schema.keys()] + ) + # Drop temp columns df = df.drop(*expanded)