diff --git a/fink_mm/__init__.py b/fink_mm/__init__.py index c62c9e49..7cce5d9b 100644 --- a/fink_mm/__init__.py +++ b/fink_mm/__init__.py @@ -12,6 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.17.0" +__version__ = "0.18.0" __distribution_schema_version__ = "1.3" __observatory_schema_version__ = "1.1" diff --git a/fink_mm/ztf_join_gcn.py b/fink_mm/ztf_join_gcn.py index 83539c81..45f68031 100644 --- a/fink_mm/ztf_join_gcn.py +++ b/fink_mm/ztf_join_gcn.py @@ -412,6 +412,120 @@ def gcn_pre_join( return gcn_dataframe, gcn_rawevent +def ztf_join_gcn_stream( + mm_mode: DataMode, + ztf_datapath_prefix: str, + gcn_datapath_prefix: str, + night: str, + NSIDE: int, + time_window: int, + hdfs_adress: str, + ast_dist: float, + pansstar_dist: float, + pansstar_star_score: float, + gaia_dist: float, + test: bool = False, +) -> Tuple[DataFrame, SparkSession]: + """ + Perform the join stream and return the dataframe + + Parameters + ---------- + mm_mode : DataMode + run this function in streaming or offline mode. + ztf_datapath_prefix : string + the prefix path where are stored the ztf alerts. + gcn_datapath_prefix : string + the prefix path where are stored the gcn alerts. + night : string + the processing night + NSIDE: String + Healpix map resolution, better if a power of 2 + hdfs_adress: string + HDFS adress used to instanciate the hdfs client from the hdfs package + ast_dist: float + distance to nearest known solar system object; set to -999.0 if none [arcsec] + ssdistnr field + pansstar_dist: float + Distance of closest source from PS1 catalog; if exists within 30 arcsec [arcsec] + distpsnr1 field + pansstar_star_score: float + Star/Galaxy score of closest source from PS1 catalog 0 <= sgscore <= 1 where closer to 1 implies higher likelihood of being a star + sgscore1 field + gaia_dist: float + Distance to closest source from Gaia DR1 catalog irrespective of magnitude; if exists within 90 arcsec [arcsec] + neargaia field + + Returns + ------- + DataFrame + the join stream dataframe + SparkSession + the current spark session + """ + if mm_mode == DataMode.OFFLINE: + job_name = "offline" + elif mm_mode == DataMode.STREAMING: + job_name = "online" + + spark = init_sparksession( + "science2mm_{}_{}{}{}".format(job_name, night[0:4], night[4:6], night[6:8]) + ) + + ztf_dataframe, gcn_dataframe = load_dataframe( + spark, + ztf_datapath_prefix, + gcn_datapath_prefix, + night, + int(time_window), + mm_mode, + ) + ztf_dataframe = ztf_pre_join( + ztf_dataframe, ast_dist, pansstar_dist, pansstar_star_score, gaia_dist, NSIDE + ) + gcn_dataframe, gcn_rawevent = gcn_pre_join(gcn_dataframe, NSIDE, test) + + # join the two streams according to the healpix columns. + # A pixel id will be assign to each alerts / gcn according to their position in the sky. + # Each alerts / gcn with the same pixel id are in the same area of the sky. + join_condition = [ + ztf_dataframe.hpix == gcn_dataframe.hpix, + ztf_dataframe.candidate.jdstarthist > gcn_dataframe.triggerTimejd, + ] + # multi_messenger join to combine optical stream with other streams + df_join_mm = gcn_dataframe.join(F.broadcast(ztf_dataframe), join_condition, "inner") + + # combine the multi-messenger join with the raw_event removed previously to save memory + df_join_mm = ( + gcn_rawevent.join( + F.broadcast(df_join_mm), + [df_join_mm.triggerId == gcn_rawevent.gcn_trigId], + "inner", + ) + .drop("gcn_trigId") + .dropDuplicates(["objectId", "triggerId", "gcn_status"]) + ) + + df_join_mm = join_post_process(df_join_mm, hdfs_adress, gcn_datapath_prefix) + + # re-create partitioning columns if needed. + timecol = "jd" + converter = lambda x: convert_to_datetime(x) # noqa: E731 + if "timestamp" not in df_join_mm.columns: + df_join_mm = df_join_mm.withColumn("timestamp", converter(df_join_mm[timecol])) + + if "year" not in df_join_mm.columns: + df_join_mm = df_join_mm.withColumn("year", F.date_format("timestamp", "yyyy")) + + if "month" not in df_join_mm.columns: + df_join_mm = df_join_mm.withColumn("month", F.date_format("timestamp", "MM")) + + if "day" not in df_join_mm.columns: + df_join_mm = df_join_mm.withColumn("day", F.date_format("timestamp", "dd")) + + return df_join_mm, spark + + def ztf_join_gcn( mm_mode: DataMode, ztf_datapath_prefix: str, @@ -511,61 +625,20 @@ def ztf_join_gcn( """ logger = init_logging() - if mm_mode == DataMode.OFFLINE: - job_name = "offline" - elif mm_mode == DataMode.STREAMING: - job_name = "online" - - spark = init_sparksession( - "science2mm_{}_{}{}{}".format(job_name, night[0:4], night[4:6], night[6:8]) - ) - - ztf_dataframe, gcn_dataframe = load_dataframe( - spark, + df_join_mm, spark = ztf_join_gcn_stream( + mm_mode, ztf_datapath_prefix, gcn_datapath_prefix, night, - int(time_window), - mm_mode, - ) - ztf_dataframe = ztf_pre_join( - ztf_dataframe, ast_dist, pansstar_dist, pansstar_star_score, gaia_dist, NSIDE + NSIDE, + time_window, + hdfs_adress, + ast_dist, + pansstar_dist, + pansstar_star_score, + gaia_dist, + test, ) - gcn_dataframe, gcn_rawevent = gcn_pre_join(gcn_dataframe, NSIDE, test) - - # join the two streams according to the healpix columns. - # A pixel id will be assign to each alerts / gcn according to their position in the sky. - # Each alerts / gcn with the same pixel id are in the same area of the sky. - join_condition = [ - ztf_dataframe.hpix == gcn_dataframe.hpix, - ztf_dataframe.candidate.jdstarthist > gcn_dataframe.triggerTimejd, - ] - # multi_messenger join to combine optical stream with other streams - df_join_mm = gcn_dataframe.join(F.broadcast(ztf_dataframe), join_condition, "inner") - - # combine the multi-messenger join with the raw_event removed previously to save memory - df_join_mm = gcn_rawevent.join( - F.broadcast(df_join_mm), - [df_join_mm.triggerId == gcn_rawevent.gcn_trigId], - "inner", - ).drop("gcn_trigId").dropDuplicates(["objectId", "triggerId", "gcn_status"]) - - df_join_mm = join_post_process(df_join_mm, hdfs_adress, gcn_datapath_prefix) - - # re-create partitioning columns if needed. - timecol = "jd" - converter = lambda x: convert_to_datetime(x) # noqa: E731 - if "timestamp" not in df_join_mm.columns: - df_join_mm = df_join_mm.withColumn("timestamp", converter(df_join_mm[timecol])) - - if "year" not in df_join_mm.columns: - df_join_mm = df_join_mm.withColumn("year", F.date_format("timestamp", "yyyy")) - - if "month" not in df_join_mm.columns: - df_join_mm = df_join_mm.withColumn("month", F.date_format("timestamp", "MM")) - - if "day" not in df_join_mm.columns: - df_join_mm = df_join_mm.withColumn("day", F.date_format("timestamp", "dd")) write_dataframe( spark,