Commit f320493
add magpsf, sigmapsf and fid to the output, bump to 0.17.0 (#109)

* add magpsf, sigmapsf and fid to the output, bump to 0.17.0

* fix test columns

* fix test columns

* replace the old fast transient rate computation with the new one from the fast transient science module (a toy illustration follows the apply_filters diff below)

* documentation updates

* update test data, many improvements; replace the HDFS request for the GW skymap with a double join that recovers the raw_event (see the sketch after this list)

* remove fink call in CI

* remove fink call in CI

* add test data

* increase join time for tests
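
The double join mentioned above replaces a per-alert HDFS read (see the commented-out `gcn_from_hdfs` path in `LVK.py` below) with a second Spark join that carries the `raw_event` payload, and with it the embedded skymap, along with each matched row. A minimal sketch of the idea; all paths and DataFrame names here are hypothetical, and the actual implementation lives in the fink-mm source rather than in this diff:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

# Raw GCN stream: carries triggerId, gcn_status and the full raw_event payload.
gcn_raw = spark.read.parquet("path/to/gcn/raw")  # hypothetical path

# Result of the ZTF x GCN cross-match, which no longer carries raw_event.
matched = spark.read.parquet("path/to/ztf_x_gcn")  # hypothetical path

# The second join recovers raw_event (and the base64 skymap embedded in it)
# without issuing an HDFS client request per alert.
matched = matched.join(
    gcn_raw.select("triggerId", "gcn_status", "raw_event"),
    on=["triggerId", "gcn_status"],
    how="left",
)
```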
FusRoman authored Feb 9, 2024
1 parent 56b812d commit f320493
Showing 245 changed files with 133 additions and 100 deletions.
4 changes: 4 additions & 0 deletions .flake8
@@ -9,6 +9,10 @@ exclude =
old,
build,
dist
../fink-mm/test_dir/
../fink-mm/prep_ztf_mm.py
../fink-mm/inspect_data.py
../fink-mm/generate_test_data.py
per-file-ignores =
../fink-mm/fink_mm/ztf_join_gcn.py:W503,E402
../fink-mm/fink_mm/offline/spark_offline.py:W503,W605
12 changes: 6 additions & 6 deletions .github/workflows/run_test.yml
@@ -85,13 +85,13 @@ jobs:
source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
cd $FINK_MM
- name: Call raw2science
run: |
fink start raw2science -c ${FINK_MM}/fink_mm/test/test_data/with_hbase/fink_test.conf --night $NIGHT --exit_after 90
# - name: Call raw2science
# run: |
# fink start raw2science -c ${FINK_MM}/fink_mm/test/test_data/with_hbase/fink_test.conf --night $NIGHT --exit_after 90

- name: Merge data
run: |
fink start merge -c ${FINK_MM}/fink_mm/test/test_data/with_hbase/fink_test.conf --night $NIGHT
# - name: Merge data
# run: |
# fink start merge -c ${FINK_MM}/fink_mm/test/test_data/with_hbase/fink_test.conf --night $NIGHT

- name: Test preamble
run: |
1 change: 1 addition & 0 deletions .gitignore
@@ -142,3 +142,4 @@ science2mm_locally.sh
clean_it.sh
# export_unit_test.sh
it_test.sh
test_dir
2 changes: 1 addition & 1 deletion fink_mm/__init__.py
@@ -12,6 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "0.16.13"
__version__ = "0.17.0"
__distribution_schema_version__ = "1.3"
__observatory_schema_version__ = "1.1"
4 changes: 2 additions & 2 deletions fink_mm/conf/distribute_for_test.conf
@@ -15,8 +15,8 @@ hdfs_gcn_storage=/user/roman.le-montagner/gcn_storage/raw
# Be careful though to have enough disk space!
online_ztf_data_prefix=fink_mm/test/test_data/distribution_test_data/

# Prefix path on disk to save GRB join ZTF data (work for both online and offline).
online_grb_data_prefix=fink_mm/test/test_data
# Prefix path on disk to save GRB join ZTF data (is used by online, offline and distribution).
online_grb_data_prefix=fink_mm/test/test_data/distribution_test_data/

# Path where are store the hbase catalog in order to query the hbase database
hbase_catalog=/home/roman.le-montagner/fink-broker/catalogs_hbase/ztf.jd.json
2 changes: 1 addition & 1 deletion fink_mm/conf/fink_mm.conf
@@ -48,7 +48,7 @@ gaia_dist=5

[STREAM]
tinterval=30
manager=local[2]
manager=local[8]
principal=
secret=
role=
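
For context: the `manager` value is handed to Spark as the master URL (compare the `--master local[8]` string in the `read_and_build_spark_submit` doctest further down), so this change runs the stream locally with 8 worker threads instead of 2. A minimal PySpark sketch of the equivalent; the app name is hypothetical:

```python
from pyspark.sql import SparkSession

# local[8]: Spark local mode with 8 worker threads (local[2] used only 2).
spark = (
    SparkSession.builder
    .master("local[8]")
    .appName("fink-mm")  # hypothetical app name
    .getOrCreate()
)
```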
4 changes: 2 additions & 2 deletions fink_mm/conftest.py
@@ -177,11 +177,11 @@ def init_spark(doctest_namespace):
doctest_namespace["DataMode"] = DataMode
doctest_namespace["sql_func"] = sql_func

grb_data = "fink_mm/test/test_data/gcn_test/raw/year=2019/month=09/day=03"
grb_data = "fink_mm/test/test_data/gcn_test/raw/year=2024/month=01/day=15"
gw_data = "fink_mm/test/test_data/S230518h_0_test"
join_data = "fink_mm/test/test_data/join_raw_datatest.parquet"
alert_data = (
"fink_mm/test/test_data/ztf_test/online/science/year=2019/month=09/day=03/"
"fink_mm/test/test_data/ztf_test/online/science/year=2024/month=01/day=15/"
)

doctest_namespace["grb_data"] = grb_data
2 changes: 1 addition & 1 deletion fink_mm/distribution/apply_filters.py
@@ -77,7 +77,7 @@ def apply_filters(
df_stream["rb"],
df_stream["gcn_loc_error"],
df_stream["p_assoc"],
df_stream["rate"],
df_stream["mag_rate"],
),
)
.filter("f_gold == True")
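
The `rate` column becomes `mag_rate` because the rate is now produced by the fink fast transient science module (together with `sigma_rate`), as announced in the commit message. As a toy illustration of what a magnitude rate is (a mag/day slope over the photometric history), here is a naive version; it is not the module's actual estimator:

```python
import numpy as np

def naive_mag_rate(cmagpsf: np.ndarray, cjd: np.ndarray) -> float:
    """Toy magnitude rate (mag/day) from the last two valid detections.

    Illustration only: the real mag_rate / sigma_rate values come from
    the fink fast transient science module, not from this function.
    """
    valid = ~np.isnan(cmagpsf)
    mags, jds = cmagpsf[valid], cjd[valid]
    if len(mags) < 2:
        return float("nan")
    return float((mags[-1] - mags[-2]) / (jds[-1] - jds[-2]))

# Example: brightening by 0.5 mag over half a day -> -1.0 mag/day
print(naive_mag_rate(
    np.array([18.5, np.nan, 18.0]),
    np.array([2460324.5, 2460324.7, 2460325.0]),
))
```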
10 changes: 6 additions & 4 deletions fink_mm/distribution/distribution.py
@@ -110,7 +110,7 @@ def grb_distribution(
--------
>>> grb_distribution(
... ztfxgcn_test,
... "20190903",
... "20240115",
... 30, 40,
... "localhost:9092",
... "toto", "tata"
Expand All @@ -124,7 +124,8 @@ def grb_distribution(
... topic,
... alert["objectId"],
... alert["fink_class"],
... alert["rate"]
... alert["mag_rate"],
... alert["sigma_rate"]
... ]]
>>> len(table)
@@ -223,7 +224,7 @@ def launch_distribution(arguments):
--------
>>> launch_distribution({
... "--config" : "fink_mm/conf/distribute_for_test.conf",
... "--night" : "20190903",
... "--night" : "20240115",
... "--exit_after" : 30,
... "--verbose" : False
... })
Expand All @@ -236,7 +237,8 @@ def launch_distribution(arguments):
... topic,
... alert["objectId"],
... alert["fink_class"],
... alert["rate"]
... alert["mag_rate"],
... alert["sigma_rate"]
... ]]
>>> len(table)
56 changes: 36 additions & 20 deletions fink_mm/observatory/LVK/LVK.py
@@ -21,7 +21,13 @@
from datetime import datetime


def gcn_from_hdfs(client: InsecureClient, root_path: str, triggerId: str, triggerTime: datetime, gcn_status: str) -> pd.DataFrame:
def gcn_from_hdfs(
client: InsecureClient,
root_path: str,
triggerId: str,
triggerTime: datetime,
gcn_status: str,
) -> pd.DataFrame:
path_date = os.path.join(
root_path,
f"year={triggerTime.year:04d}/month={triggerTime.month:02d}/day={triggerTime.day:02d}",
@@ -32,7 +38,10 @@ def gcn_from_hdfs(client: InsecureClient, root_path: str, triggerId: str, trigge
with client.read(path_to_load) as reader:
content = reader.read()
pdf = pd.read_parquet(io.BytesIO(content))
if triggerId in pdf["triggerId"].values and gcn_status in pdf["gcn_status"].values:
if (
triggerId in pdf["triggerId"].values
and gcn_status in pdf["gcn_status"].values
):
return pdf[
(pdf["triggerId"] == triggerId)
& (pdf["gcn_status"] == gcn_status)
@@ -82,19 +91,22 @@ def get_skymap(self, **kwargs) -> QTable:
>>> np.array(lvk_initial.get_skymap()["UNIQ"])
array([ 1285, 1287, 1296, ..., 162369, 162370, 162371])
"""
if "skymap" in self.voevent["event"]:
skymap_str = self.voevent["event"]["skymap"]
else:
hdfs_adress = kwargs["hdfs_adress"]
hdfs_client = InsecureClient(f"http://{hdfs_adress}:50070")
triggerId = self.get_trigger_id()
gcn_status = kwargs["gcn_status"]
root_path = kwargs["root_path"]
t_obs = Time(self.get_trigger_time()[1], format="jd").to_datetime()
gcn_pdf = gcn_from_hdfs(
hdfs_client, root_path, triggerId, t_obs, gcn_status
)
skymap_str = json.loads(gcn_pdf["raw_event"].iloc[0])["event"]["skymap"]

skymap_str = self.voevent["event"]["skymap"]

# if "skymap" in self.voevent["event"]:
# skymap_str = self.voevent["event"]["skymap"]
# else:
# hdfs_adress = kwargs["hdfs_adress"]
# hdfs_client = InsecureClient(f"http://{hdfs_adress}:50070")
# triggerId = self.get_trigger_id()
# gcn_status = kwargs["gcn_status"]
# root_path = kwargs["root_path"]
# t_obs = Time(self.get_trigger_time()[1], format="jd").to_datetime()
# gcn_pdf = gcn_from_hdfs(
# hdfs_client, root_path, triggerId, t_obs, gcn_status
# )
# skymap_str = json.loads(gcn_pdf["raw_event"].iloc[0])["event"]["skymap"]

# Decode and parse skymap
skymap_bytes = b64decode(skymap_str)
@@ -136,7 +148,8 @@ def is_observation(self, is_test: bool) -> bool:
or self.voevent["superevent_id"][0] == "M"
)
return (
self.voevent["superevent_id"][0] == "S"
self.voevent["superevent_id"][0]
== "S"
# comment the significant filter (enable cross-match with 'burst' and subthreshold events)
# and self.voevent["event"]["significant"]
)
@@ -432,10 +445,13 @@ def association_proba(
>>> lvk_initial.association_proba(95.712890625, -10.958863307027668, 0)
0.0054008620296433045
"""
if "hdfs_adress" in kwargs and "gcn_status" in kwargs and "root_path" in kwargs:
skymap = self.get_skymap(**kwargs)
else:
skymap = self.get_skymap()

skymap = self.get_skymap()

# if "hdfs_adress" in kwargs and "gcn_status" in kwargs and "root_path" in kwargs:
# skymap = self.get_skymap(**kwargs)
# else:
# skymap = self.get_skymap()

max_level = 29
max_nside = ah.level_to_nside(max_level)
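
With the HDFS fallback commented out, `get_skymap` now always reads the base64-encoded skymap embedded in the notice itself. The decode step that follows in the unchanged code is the standard LVK pattern; a self-contained sketch, where the placeholder string stands in for a real notice payload and must be replaced before running:

```python
import io
from base64 import b64decode

from astropy.table import QTable

# skymap_str would come from voevent["event"]["skymap"]; the value below
# is a placeholder, not a real base64-encoded multi-order FITS skymap.
skymap_str = "..."

skymap_bytes = b64decode(skymap_str)
skymap = QTable.read(io.BytesIO(skymap_bytes))  # multi-order map with a UNIQ column
print(skymap["UNIQ"][:5])
```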
(many binary test-data files changed; contents not shown)
Binary file modified fink_mm/test/test_data/grb_prob_test.parquet
2 changes: 1 addition & 1 deletion fink_mm/test/test_data/with_hbase/fink_test.conf
@@ -32,7 +32,7 @@ KAFKA_IPPORT=""
KAFKA_STARTING_OFFSET="earliest"

# Apache Spark mode
SPARK_MASTER="local[*]"
SPARK_MASTER="local[5]"

# Should be Spark options actually (cluster resources, ...)
EXTRA_SPARK_CONFIG='--driver-memory 2g --executor-memory 2g'
(more binary test-data files changed; contents not shown)
14 changes: 7 additions & 7 deletions fink_mm/test/utils_integration_test.py
@@ -63,17 +63,17 @@ def align_ztf(
Examples
--------
>>> path_ztf_raw = ("fink_mm/test/test_data/ztf_test/online/raw/year=2019/month=09/day=03/")
>>> path_ztf_raw = ("fink_mm/test/test_data/ztf_test/online/science/year=2024/month=01/day=15/")
>>> ztf_pdf = pd.read_parquet(path_ztf_raw)
>>> random = np.random.default_rng(0)
>>> align_ztf(ztf_pdf, 5, 0, 10, random)
>>> ztf_pdf.loc[0]["candidate"]["ra"]
289.4610443
62.2777961
>>> ztf_pdf.loc[0]["candidate"]["dec"]
-11.0504023
57.4803954
>>> ztf_pdf.loc[len(ztf_pdf)-1]["candidate"]["ra"]
0
@@ -82,11 +82,11 @@
10
>>> ztf_pdf.loc[0]["candidate"]["jdstarthist"]
2458729.6881481
2458422.9036806
>>> ztf_pdf.loc[0]["candidate"]["jd"]
2458729.6881481
2460324.7135301
>>> ztf_pdf.loc[0]["prv_candidates"][-1]["jd"]
2458725.7316204
2460324.6389931
>>> ztf_pdf.loc[len(ztf_pdf)-1]["candidate"]["jdstarthist"]
5.020833333333333
@@ -262,7 +262,7 @@ def align_ztf_and_gcn(
>>> path_gcn = "fink_mm/test/test_data/683571622_0_test"
>>> gcn = pd.read_parquet(path_gcn)
>>> path_ztf_raw = ("fink_mm/test/test_data/ztf_test/online/raw/year=2019/month=09/day=03/")
>>> path_ztf_raw = ("fink_mm/test/test_data/ztf_test/online/science/year=2024/month=01/day=15/")
>>> ztf_pdf = pd.read_parquet(path_ztf_raw)
>>> random = np.random.default_rng(0)
41 changes: 22 additions & 19 deletions fink_mm/utils/fun_utils.py
@@ -287,7 +287,7 @@ def get_observatory(obsname: str, rawEvent: str): # -> Observatory
-------
>>> pdf = pd.read_parquet(grb_data)
>>> type(get_observatory(pdf["observatory"].iloc[0], pdf["raw_event"].iloc[0]))
<class 'Fermi.Fermi'>
<class 'Integral.Integral'>
>>> pdf = pd.read_parquet(gw_data)
>>> type(get_observatory(pdf["observatory"].iloc[0], pdf["raw_event"].iloc[0]))
<class 'LVK.LVK'>
@@ -331,7 +331,7 @@ def get_pixels(obsname: pd.Series, rawEvent: pd.Series, NSIDE: pd.Series) -> pd.
>>> grb_pixs.withColumn("hpix", explode("hpix_circle"))\
.orderBy(["triggerId", "hpix"])\
.select(["triggerId", "hpix"]).head(5)
[Row(triggerId='683476673', hpix=42), Row(triggerId='683476673', hpix=42), Row(triggerId='683476673', hpix=62), Row(triggerId='683476673', hpix=62), Row(triggerId='683476673', hpix=63)]
[Row(triggerId='10472', hpix=609), Row(triggerId='727009399', hpix=177), Row(triggerId='727009399', hpix=177), Row(triggerId='727009399', hpix=177), Row(triggerId='727009399', hpix=178)]
"""
return pd.Series(
[
@@ -757,21 +757,22 @@ def join_post_process(df_grb: DataFrame, hdfs_adress: str, root_path: str) -> Da
df_grb = concat_col(df_grb, "jd")
df_grb = concat_col(df_grb, "fid")

df_grb = df_grb.withColumn(
"c_rate",
compute_rate(
df_grb["candidate.magpsf"],
df_grb["candidate.jdstarthist"],
df_grb["candidate.jd"],
df_grb["candidate.fid"],
df_grb["cmagpsf"],
df_grb["cdiffmaglim"],
df_grb["cjd"],
df_grb["cfid"],
),
)

df_grb = format_rate_results(df_grb, "c_rate")
# DEPRECATED
# computed by the fast transient fink science module
# df_grb = df_grb.withColumn(
# "c_rate",
# compute_rate(
# df_grb["candidate.magpsf"],
# df_grb["candidate.jdstarthist"],
# df_grb["candidate.jd"],
# df_grb["candidate.fid"],
# df_grb["cmagpsf"],
# df_grb["cdiffmaglim"],
# df_grb["cjd"],
# df_grb["cfid"],
# ),
# )
# df_grb = format_rate_results(df_grb, "c_rate")

# TODO : do something better with satellites
# df_grb = add_tracklet_information(df_grb)
@@ -805,7 +806,7 @@
df_grb["raw_event"],
df_grb["ztf_ra"],
df_grb["ztf_dec"],
df_grb["start_vartime"],
df_grb["jd_first_real_det"],
F.lit(hdfs_adress),
df_grb["gcn_status"],
F.lit(root_path),
@@ -838,6 +839,8 @@ def join_post_process(df_grb: DataFrame, hdfs_adress: str, root_path: str) -> Da
"candidate.jdstarthist",
"candidate.rb",
"candidate.jd",
"candidate.magpsf",
"candidate.sigmapsf"
]
df_grb = df_grb.select(cols_fink + cols_extra).filter("p_assoc != -1.0")
df_grb = df_grb.withColumnRenamed("err_arcmin", "gcn_loc_error")
@@ -869,7 +872,7 @@ def read_and_build_spark_submit(config, logger):
>>> home_path = os.environ["HOME"]
>>> path_bash_profile = os.path.join(home_path, ".bash_profile")
>>> test_str = f"if test -f '{path_bash_profile}'; then source {path_bash_profile}; fi; `which spark-submit` --master local[2] --conf spark.mesos.principal= --conf spark.mesos.secret= --conf spark.mesos.role= --conf spark.executorEnv.HOME=/path/to/user/ --driver-memory 4G --executor-memory 8G --conf spark.cores.max=16 --conf spark.executor.cores=8"
>>> test_str = f"if test -f '{path_bash_profile}'; then source {path_bash_profile}; fi; `which spark-submit` --master local[8] --conf spark.mesos.principal= --conf spark.mesos.secret= --conf spark.mesos.role= --conf spark.executorEnv.HOME=/path/to/user/ --driver-memory 4G --executor-memory 8G --conf spark.cores.max=16 --conf spark.executor.cores=8"
>>> test_str == spark_str
True
"""
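
Downstream of these changes, the distributed alerts expose the PSF photometry directly, which is the headline of this release: `magpsf`, `sigmapsf` and `fid` ride along with the existing `mag_rate` / `sigma_rate` fields (see the doctests in `distribution.py` above). A sketch with a made-up record; only the field names are taken from the diff:

```python
# Field names follow the fink-mm 0.17.0 output columns; the values are made up.
alert = {
    "objectId": "ZTF24aaaaaaa",
    "fink_class": "Unknown",
    "mag_rate": -1.0,
    "sigma_rate": 0.05,
    "magpsf": 18.2,    # new in 0.17.0
    "sigmapsf": 0.1,   # new in 0.17.0
    "fid": 2,          # new in 0.17.0
}
print(alert["objectId"], alert["magpsf"], alert["sigmapsf"], alert["fid"])
```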