Skip to content

Commit

Permalink
add documentation, fix pep8
Browse files Browse the repository at this point in the history
  • Loading branch information
FusRoman committed Sep 20, 2023
1 parent 8e4bf00 commit b2b89c1
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 81 deletions.
17 changes: 5 additions & 12 deletions fink_science/fast_transient_rate/__init__.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
from pyspark.sql.types import *
from pyspark.sql.types import DoubleType, BooleanType

rate_module_output_schema = {
# the first 5-sigma detection in the alert history
"jd_first_real_det": DoubleType(),

# The delta-time between the current jd of the alert and the jd_first_real_det,
# if above 30, the jd_first_real_det is no longer reliable.
"jdstarthist_dt": DoubleType(),

# The magnitude rate computed between the last available measurement
# The magnitude rate computed between the last available measurement
# and the current alerts. The mag_rate band are the same of the current alert.
"mag_rate": DoubleType(),

# The magnitude rate error computed using a random sampling in the magnitude sigma of the two alerts.
"sigma_rate": DoubleType(),

# The lowest mag_rate computed from the random sampling
"lower_rate": DoubleType(),

# The highest mag_rate computed from the random sampling
"upper_rate": DoubleType(),

# The delta-time between the last alert and the current alert
# The delta-time between the last alert and the current alert
"delta_time": DoubleType(),

# if True, the magnitude rate has been computed with an upper limit.
"from_upper": BooleanType()
}
"from_upper": BooleanType(),
}
199 changes: 135 additions & 64 deletions fink_science/fast_transient_rate/processor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField

from pyspark.sql.functions import pandas_udf
import pyspark.sql.functions as F
Expand All @@ -9,22 +9,77 @@
from fink_science.fast_transient_rate import rate_module_output_schema
from fink_utils.spark.utils import concat_col

def get_last_alert(fid, cfid, cmagpsf, csigmapsf, cdiffmaglim, cjd):

def get_last_alert(
fid: int,
cfid: int,
cmagpsf: float,
csigmapsf: float,
cdiffmaglim: float,
cjd: float,
) -> list:
"""
Return the last measurement contains in the alert history used in the fast transient rate
Parameters
----------
fid : int
filter id
cfid : int
filter id in the history
cmagpsf : float
magnitude in the history
csigmapsf : float
magnitude error in the history
cdiffmaglim : float
upper limit in the history
cjd : float
julian date in the history
Returns
-------
list
list[0]: last magnitude in the history, nan if not available for the current filter
list[1]: last magnitude error in the history, nan if not available for the current filter
list[2]: last upper limit in the history, nan if not available for the current filter
list[3]: last julian date in the history, nan if not available for the current filter
list[4]: first 5-sigma julian date contains in the history, always available.
"""
jdstarthist5sigma = cjd[0]
for idx in range(len(cfid)-1):

for idx in range(len(cfid) - 1):
if cfid[idx] == fid:
if cmagpsf[idx] == None:
return [float('nan'), float('nan'), cdiffmaglim[idx], cjd[idx], jdstarthist5sigma]
if cmagpsf[idx] is None:
return [
float("nan"),
float("nan"),
cdiffmaglim[idx],
cjd[idx],
jdstarthist5sigma,
]
else:
return [cmagpsf[idx], csigmapsf[idx], cdiffmaglim[idx], cjd[idx], jdstarthist5sigma]
return [
cmagpsf[idx],
csigmapsf[idx],
cdiffmaglim[idx],
cjd[idx],
jdstarthist5sigma,
]

return [float('nan'), float('nan'), float('nan'), float('nan'), jdstarthist5sigma]
return [float("nan"), float("nan"), float("nan"), float("nan"), jdstarthist5sigma]


def return_last_alerts(*args):
"""
see get_last_alert documentation
Returns
-------
list
last measurements
"""
return [
get_last_alert(fid, cfid, cmagpsf, csigmapsf, cdiffmaglim, cjd)
get_last_alert(fid, cfid, cmagpsf, csigmapsf, cdiffmaglim, cjd)
for fid, cfid, cmagpsf, csigmapsf, cdiffmaglim, cjd in zip(*args)
]

Expand Down Expand Up @@ -66,7 +121,7 @@ def fast_transient_rate(df, N):
delta time between the the two measurement used for the magnitude rate
* from_upper: boolean
if true, the magnitude rate have been computed with the last upper limit
Examples
--------
Expand All @@ -88,12 +143,12 @@ def fast_transient_rate(df, N):

# get the last measurements for each alerts
tmp_last = return_last_alerts(
df["fid"],
df["cfid"],
df["cmagpsf"],
df["csigmapsf"],
df["cdiffmaglim"],
df["cjd"]
df["fid"],
df["cfid"],
df["cmagpsf"],
df["csigmapsf"],
df["cdiffmaglim"],
df["cjd"],
)
tmp_last = np.array(tmp_last)

Expand All @@ -118,24 +173,30 @@ def fast_transient_rate(df, N):
current_mag_sample = np.empty((N, len(df)), dtype=np.float64)
current_flux = u.to_flux(df["magpsf"][idx_valid_data])
current_mag_sample[:, idx_valid_data] = np.random.normal(
current_flux,
u.to_fluxerr(df["sigmapsf"][idx_valid_data], current_flux),
(N, len(idx_valid_data))
current_flux,
u.to_fluxerr(df["sigmapsf"][idx_valid_data], current_flux),
(N, len(idx_valid_data)),
)

last_flux = u.to_flux(tmp_last[:, 0][idx_last_mag])
last_mag_sample = np.random.normal(
last_flux,
u.to_fluxerr(tmp_last[:, 1][idx_last_mag], last_flux),
(N, len(idx_last_mag))
(N, len(idx_last_mag)),
)

# sample upper limit from a uniform distribution starting at 0 until the upper limit
uniform_upper = np.random.uniform(0, u.to_flux(tmp_last[:, 2][idx_last_upper]), (N, len(idx_last_upper)))
uniform_upper = np.random.uniform(
0, u.to_flux(tmp_last[:, 2][idx_last_upper]), (N, len(idx_last_upper))
)

# compute the rate from the flux difference and convert back to magnitude
sample_rate = (-2.5*np.log10(current_mag_sample[:, idx_last_mag] / last_mag_sample)).T / u.stack_column(dt[idx_last_mag], N)
sample_rate_upper = (-2.5*np.log10(current_mag_sample[:, idx_last_upper] / uniform_upper)).T / u.stack_column(dt[idx_last_upper], N)
sample_rate = (
-2.5 * np.log10(current_mag_sample[:, idx_last_mag] / last_mag_sample)
).T / u.stack_column(dt[idx_last_mag], N)
sample_rate_upper = (
-2.5 * np.log10(current_mag_sample[:, idx_last_upper] / uniform_upper)
).T / u.stack_column(dt[idx_last_upper], N)

# fill the result arrays and return a result dataframe
res_rate[idx_last_mag] = np.mean(sample_rate, axis=1)
Expand All @@ -150,22 +211,28 @@ def fast_transient_rate(df, N):
upper_rate[idx_last_mag] = np.percentile(sample_rate, 95.0, axis=1)
upper_rate[idx_last_upper] = np.percentile(sample_rate_upper, 95.0, axis=1)

return pd.DataFrame(np.array([
tmp_last[:, -1],
jdstarthist_dt,
res_rate,
res_sigmarate,
lower_rate,
upper_rate,
dt,
(~mask_finite_mag) & mask_finite_upper
]).T, columns = list(rate_module_output_schema.keys()))
return pd.DataFrame(
np.array(
[
tmp_last[:, -1],
jdstarthist_dt,
res_rate,
res_sigmarate,
lower_rate,
upper_rate,
dt,
(~mask_finite_mag) & mask_finite_upper,
]
).T,
columns=list(rate_module_output_schema.keys()),
)


ft_schema = StructType(
[StructField(k, v, True) for k, v in rate_module_output_schema.items()]
)


@pandas_udf(ft_schema)
def magnitude_rate(
magpsf,
Expand All @@ -178,7 +245,7 @@ def magnitude_rate(
cjd,
cfid,
cdiffmaglim,
N
N,
):
"""
Call the fast_transient_rate within a distributed context (spark pandas udf)
Expand Down Expand Up @@ -216,26 +283,28 @@ def magnitude_rate(
--------
"""
pdf = pd.DataFrame({
"magpsf": magpsf,
"sigmapsf": sigmapsf,
"jd": jd,
"jdstarthist": jdstarthist,
"fid": fid,
"cmagpsf": cmagpsf,
"csigmapsf": csigmapsf,
"cdiffmaglim": cdiffmaglim,
"cjd": cjd,
"cfid": cfid
})

pdf = pd.DataFrame(
{
"magpsf": magpsf,
"sigmapsf": sigmapsf,
"jd": jd,
"jdstarthist": jdstarthist,
"fid": fid,
"cmagpsf": cmagpsf,
"csigmapsf": csigmapsf,
"cdiffmaglim": cdiffmaglim,
"cjd": cjd,
"cfid": cfid,
}
)

return fast_transient_rate(pdf, N.values[0])


def fast_transient_module(spark_df, N):
"""
wrapper function to easily call the fast transient module
wrapper function to easily call the fast transient module
Parameters
----------
spark_df : spark dataframe
Expand Down Expand Up @@ -271,21 +340,23 @@ def fast_transient_module(spark_df, N):
df_concat = concat_col(df_concat, "jd")
df_concat = concat_col(df_concat, "fid")

df_ft = df_concat.withColumn("ft_module", magnitude_rate(
df_concat["candidate.magpsf"],
df_concat["candidate.sigmapsf"],
df_concat["candidate.jd"],
df_concat["candidate.jdstarthist"],
df_concat["candidate.fid"],
df_concat["cmagpsf"],
df_concat["csigmapsf"],
df_concat["cjd"],
df_concat["cfid"],
df_concat["cdiffmaglim"],
F.lit(N)
))
df_ft = df_concat.withColumn(
"ft_module",
magnitude_rate(
df_concat["candidate.magpsf"],
df_concat["candidate.sigmapsf"],
df_concat["candidate.jd"],
df_concat["candidate.jdstarthist"],
df_concat["candidate.fid"],
df_concat["cmagpsf"],
df_concat["csigmapsf"],
df_concat["cjd"],
df_concat["cfid"],
df_concat["cdiffmaglim"],
F.lit(N),
),
)

return df_ft.select(
cols_before +
[df_ft["ft_module"][k].alias(k) for k in rate_module_output_schema.keys()]
cols_before + [df_ft["ft_module"][k].alias(k) for k in rate_module_output_schema.keys()]
)
13 changes: 8 additions & 5 deletions fink_science/fast_transient_rate/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@

def to_flux(mag):
# from Serguey Karpov
return 10**(0.4*(27.5 - mag)) # FLUXCAL, mag = 27.5 - 2.5*np.log10(flux)
return 10 ** (0.4 * (27.5 - mag)) # FLUXCAL, mag = 27.5 - 2.5*np.log10(flux)


def to_fluxerr(magerr, flux):
return magerr * flux * np.log(10)/2.5 # magerr = 2.5/np.log(10) * fluxerr / flux
return magerr * flux * np.log(10) / 2.5 # magerr = 2.5/np.log(10) * fluxerr / flux


def to_mag(flux):
return 27.5 - 2.5*np.log10(flux)
return 27.5 - 2.5 * np.log10(flux)


def to_magerr(fluxerr, flux):
return 2.5/np.log(10) * fluxerr / flux
return 2.5 / np.log(10) * fluxerr / flux


def stack_columns(df, *cols):
return list(np.dstack([df[c] for c in cols])[0])


def stack_column(col, N):
return np.stack([col for _ in range(N)]).T
return np.stack([col for _ in range(N)]).T

0 comments on commit b2b89c1

Please sign in to comment.