From 0eda41a3c43b7892f38609d1a6537809f0114c1b Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 17 Nov 2023 11:22:35 +0100 Subject: [PATCH 1/2] add fast transient module to the science pipeline --- fink_broker/science.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/fink_broker/science.py b/fink_broker/science.py index b0a846d2..be680cee 100644 --- a/fink_broker/science.py +++ b/fink_broker/science.py @@ -56,6 +56,8 @@ from fink_science.cats.processor import predict_nn from fink_science.agn.processor import agn_elasticc from fink_science.slsn.processor import slsn_elasticc + from fink_science.fast_transient_rate.processor import magnitude_rate + from fink_science.fast_transient_rate import rate_module_output_schema # from fink_science.t2.processor import t2 except ImportError: _LOG.warning("Fink science modules are not available. ") @@ -208,7 +210,8 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame: # Retrieve time-series information to_expand = [ 'jd', 'fid', 'magpsf', 'sigmapsf', - 'magnr', 'sigmagnr', 'isdiffpos', 'distnr' + 'magnr', 'sigmagnr', 'isdiffpos', 'distnr', + 'diffmaglim' ] # Append temp columns with historical + current measurements @@ -373,6 +376,19 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame: .withColumn("lc_features_r", df['lc_features'].getItem("2"))\ .drop('lc_features') + # Apply level one processor: fast transient + _LOG.info("New processor: magnitude rate for fast transient") + mag_rate_args = [ + "candidate.magpsf", "candidate.sigmapsf", "candidate.jd", + "candidate.jdstarthist", "candidate.fid", "cmagpsf", "csigmapsf", + "cjd", "cfid", "cdiffmaglim", F.lit(10000), F.lit(None) + ] + cols_before = df.columns + df = DataFrame.withColumn("ft_module", magnitude_rate(*mag_rate_args)) + df = df.select( + cols_before + [df["ft_module"][k].alias(k) for k in rate_module_output_schema.keys()] + ) + # Drop temp columns df = df.drop(*expanded) From 72659b204fdcc1c37b0c18c066bae5e42461736c Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 17 Nov 2023 11:47:00 +0100 Subject: [PATCH 2/2] fix DataFrame --- fink_broker/science.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fink_broker/science.py b/fink_broker/science.py index be680cee..07d52a49 100644 --- a/fink_broker/science.py +++ b/fink_broker/science.py @@ -384,7 +384,7 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame: "cjd", "cfid", "cdiffmaglim", F.lit(10000), F.lit(None) ] cols_before = df.columns - df = DataFrame.withColumn("ft_module", magnitude_rate(*mag_rate_args)) + df = df.withColumn("ft_module", magnitude_rate(*mag_rate_args)) df = df.select( cols_before + [df["ft_module"][k].alias(k) for k in rate_module_output_schema.keys()] )