Commit
add ztf join gcn stream function to be used in fink raw2science (#111)
FusRoman authored Feb 9, 2024
1 parent f320493 commit d6c391a
Showing 2 changed files with 125 additions and 52 deletions.
2 changes: 1 addition & 1 deletion fink_mm/__init__.py
@@ -12,6 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "0.17.0"
__version__ = "0.18.0"
__distribution_schema_version__ = "1.3"
__observatory_schema_version__ = "1.1"
175 changes: 124 additions & 51 deletions fink_mm/ztf_join_gcn.py
@@ -412,6 +412,120 @@ def gcn_pre_join(
return gcn_dataframe, gcn_rawevent


def ztf_join_gcn_stream(
mm_mode: DataMode,
ztf_datapath_prefix: str,
gcn_datapath_prefix: str,
night: str,
NSIDE: int,
time_window: int,
hdfs_adress: str,
ast_dist: float,
pansstar_dist: float,
pansstar_star_score: float,
gaia_dist: float,
test: bool = False,
) -> Tuple[DataFrame, SparkSession]:
"""
    Perform the ZTF x GCN stream join and return the resulting dataframe.

    Parameters
    ----------
    mm_mode : DataMode
        run this function in streaming or offline mode.
    ztf_datapath_prefix : string
        the prefix path where the ztf alerts are stored.
    gcn_datapath_prefix : string
        the prefix path where the gcn alerts are stored.
    night : string
        the processing night, formatted as yyyymmdd
    NSIDE : int
        HEALPix map resolution, better if a power of 2
    time_window : int
        size of the time window used when loading the gcn alerts
    hdfs_adress : string
        HDFS address used to instantiate the hdfs client from the hdfs package
    ast_dist : float
        distance to the nearest known solar system object; set to -999.0 if none [arcsec]
        (ssdistnr field)
    pansstar_dist : float
        distance to the closest source from the PS1 catalog, if one exists within 30 arcsec [arcsec]
        (distpsnr1 field)
    pansstar_star_score : float
        star/galaxy score of the closest PS1 source; 0 <= sgscore <= 1, where a value
        closer to 1 implies a higher likelihood of being a star
        (sgscore1 field)
    gaia_dist : float
        distance to the closest source from the Gaia DR1 catalog, irrespective of
        magnitude, if one exists within 90 arcsec [arcsec]
        (neargaia field)
    test : bool, optional
        if True, run in test mode (default: False)

    Returns
    -------
    DataFrame
        the joined stream dataframe
    SparkSession
        the current spark session
"""
if mm_mode == DataMode.OFFLINE:
job_name = "offline"
elif mm_mode == DataMode.STREAMING:
job_name = "online"

spark = init_sparksession(
"science2mm_{}_{}{}{}".format(job_name, night[0:4], night[4:6], night[6:8])
)
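    # the spark application is named science2mm_<job>_<yyyymmdd>,
    # e.g. science2mm_online_20240209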

ztf_dataframe, gcn_dataframe = load_dataframe(
spark,
ztf_datapath_prefix,
gcn_datapath_prefix,
night,
int(time_window),
mm_mode,
)
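    # depending on mm_mode, load_dataframe returns either static (offline) or
    # streaming (online) dataframes for the night's ztf and gcn alerts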
ztf_dataframe = ztf_pre_join(
ztf_dataframe, ast_dist, pansstar_dist, pansstar_star_score, gaia_dist, NSIDE
)
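    # ztf_pre_join applies the quality cuts driven by ast_dist, pansstar_dist,
    # pansstar_star_score and gaia_dist, and attaches the hpix column used in
    # the join condition below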
gcn_dataframe, gcn_rawevent = gcn_pre_join(gcn_dataframe, NSIDE, test)

    # join the two streams according to the healpix columns.
    # A pixel id is assigned to each alert / gcn according to its position in the sky.
    # Alerts / gcns sharing the same pixel id lie in the same area of the sky.
join_condition = [
ztf_dataframe.hpix == gcn_dataframe.hpix,
ztf_dataframe.candidate.jdstarthist > gcn_dataframe.triggerTimejd,
]
    # multi-messenger join: combine the optical stream with the other messenger streams
df_join_mm = gcn_dataframe.join(F.broadcast(ztf_dataframe), join_condition, "inner")
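    # F.broadcast marks ztf_dataframe for a broadcast hash join: every
    # executor receives a full copy of it, so the join runs without
    # shuffling the gcn side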

    # join the multi-messenger matches back to the gcn raw events, which were
    # split off earlier to save memory
df_join_mm = (
gcn_rawevent.join(
F.broadcast(df_join_mm),
[df_join_mm.triggerId == gcn_rawevent.gcn_trigId],
"inner",
)
.drop("gcn_trigId")
.dropDuplicates(["objectId", "triggerId", "gcn_status"])
)
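    # dropDuplicates keeps a single row per (objectId, triggerId, gcn_status)
    # combination, removing duplicate matches produced by the join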

df_join_mm = join_post_process(df_join_mm, hdfs_adress, gcn_datapath_prefix)

# re-create partitioning columns if needed.
timecol = "jd"
converter = lambda x: convert_to_datetime(x) # noqa: E731
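    # convert_to_datetime turns the julian date column into a timestamp, from
    # which the year/month/day partition columns are derived below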
if "timestamp" not in df_join_mm.columns:
df_join_mm = df_join_mm.withColumn("timestamp", converter(df_join_mm[timecol]))

if "year" not in df_join_mm.columns:
df_join_mm = df_join_mm.withColumn("year", F.date_format("timestamp", "yyyy"))

if "month" not in df_join_mm.columns:
df_join_mm = df_join_mm.withColumn("month", F.date_format("timestamp", "MM"))

if "day" not in df_join_mm.columns:
df_join_mm = df_join_mm.withColumn("day", F.date_format("timestamp", "dd"))

return df_join_mm, spark
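
For context, the hpix values compared in the join condition above are HEALPix pixel ids computed from each alert's sky coordinates during the pre-join steps. A minimal sketch of how such a column can be built, assuming healpy and a pandas UDF (the ra / dec column names and the NSIDE value are illustrative, not necessarily what ztf_pre_join and gcn_pre_join use):

    import healpy as hp
    import pandas as pd
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    @F.pandas_udf(LongType())
    def ang2pix_udf(ra: pd.Series, dec: pd.Series) -> pd.Series:
        # lonlat=True lets healpy take ra / dec directly, in degrees;
        # NSIDE=128 is illustrative, fink_mm passes NSIDE as a parameter
        return pd.Series(hp.ang2pix(128, ra.values, dec.values, lonlat=True))

    df_with_pix = df.withColumn("hpix", ang2pix_udf("ra", "dec"))

Two dataframes pixelized with the same NSIDE can then be equi-joined on hpix, which restricts candidate pairs to sources falling in the same cell of the sky.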


def ztf_join_gcn(
mm_mode: DataMode,
ztf_datapath_prefix: str,
@@ -511,61 +625,20 @@ def ztf_join_gcn(
"""
logger = init_logging()

-    if mm_mode == DataMode.OFFLINE:
-        job_name = "offline"
-    elif mm_mode == DataMode.STREAMING:
-        job_name = "online"
-
-    spark = init_sparksession(
-        "science2mm_{}_{}{}{}".format(job_name, night[0:4], night[4:6], night[6:8])
-    )
-
-    ztf_dataframe, gcn_dataframe = load_dataframe(
-        spark,
-        ztf_datapath_prefix,
-        gcn_datapath_prefix,
-        night,
-        int(time_window),
-        mm_mode,
-    )
-    ztf_dataframe = ztf_pre_join(
-        ztf_dataframe, ast_dist, pansstar_dist, pansstar_star_score, gaia_dist, NSIDE
-    )
-    gcn_dataframe, gcn_rawevent = gcn_pre_join(gcn_dataframe, NSIDE, test)
-
-    # join the two streams according to the healpix columns.
-    # A pixel id is assigned to each alert / gcn according to its position in the sky.
-    # Alerts / gcns sharing the same pixel id lie in the same area of the sky.
-    join_condition = [
-        ztf_dataframe.hpix == gcn_dataframe.hpix,
-        ztf_dataframe.candidate.jdstarthist > gcn_dataframe.triggerTimejd,
-    ]
-    # multi-messenger join: combine the optical stream with the other messenger streams
-    df_join_mm = gcn_dataframe.join(F.broadcast(ztf_dataframe), join_condition, "inner")
-
-    # join the multi-messenger matches back to the gcn raw events, which were
-    # split off earlier to save memory
-    df_join_mm = gcn_rawevent.join(
-        F.broadcast(df_join_mm),
-        [df_join_mm.triggerId == gcn_rawevent.gcn_trigId],
-        "inner",
-    ).drop("gcn_trigId").dropDuplicates(["objectId", "triggerId", "gcn_status"])
-
-    df_join_mm = join_post_process(df_join_mm, hdfs_adress, gcn_datapath_prefix)
-
-    # re-create partitioning columns if needed.
-    timecol = "jd"
-    converter = lambda x: convert_to_datetime(x)  # noqa: E731
-    if "timestamp" not in df_join_mm.columns:
-        df_join_mm = df_join_mm.withColumn("timestamp", converter(df_join_mm[timecol]))
-
-    if "year" not in df_join_mm.columns:
-        df_join_mm = df_join_mm.withColumn("year", F.date_format("timestamp", "yyyy"))
-
-    if "month" not in df_join_mm.columns:
-        df_join_mm = df_join_mm.withColumn("month", F.date_format("timestamp", "MM"))
-
-    if "day" not in df_join_mm.columns:
-        df_join_mm = df_join_mm.withColumn("day", F.date_format("timestamp", "dd"))
+    df_join_mm, spark = ztf_join_gcn_stream(
+        mm_mode,
+        ztf_datapath_prefix,
+        gcn_datapath_prefix,
+        night,
+        NSIDE,
+        time_window,
+        hdfs_adress,
+        ast_dist,
+        pansstar_dist,
+        pansstar_star_score,
+        gaia_dist,
+        test,
+    )

write_dataframe(
spark,

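To illustrate the refactor, here is a minimal sketch of calling the new helper directly; the argument values are illustrative only, and DataMode is assumed to be importable alongside the function:

    from fink_mm.ztf_join_gcn import DataMode, ztf_join_gcn_stream

    df_join_mm, spark = ztf_join_gcn_stream(
        mm_mode=DataMode.STREAMING,
        ztf_datapath_prefix="ztf_alerts",     # illustrative paths
        gcn_datapath_prefix="gcn_alerts",
        night="20240209",
        NSIDE=128,
        time_window=7,
        hdfs_adress="hdfs://localhost:8020",  # the parameter is spelled this way in the code
        ast_dist=5.0,
        pansstar_dist=2.0,
        pansstar_star_score=0.0,
        gaia_dist=5.0,
    )
    # the caller then persists the result, e.g. ztf_join_gcn hands the
    # joined dataframe to write_dataframe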