Add TNS column (#910)
* Add the possibility to pass a TNS catalog to raw2science. Currently not used; the catalog is downloaded instead.

* Add a new field in HBase

* Simplify the TNS index table ingestion

* Change default value for TNS to empty string

* Bump requirements

* PEP8

* Fix wrong spelling

* Fix test

* Add new filter

* Add new topic in the count

* Bump dependencies
JulienPeloton authored Nov 6, 2024
1 parent dddc418 commit f42f439
Showing 9 changed files with 37 additions and 91 deletions.
1 change: 1 addition & 0 deletions bin/distribute.py
@@ -54,6 +54,7 @@
"fink_filters.filter_simbad_grav_candidates.filter.simbad_grav_candidates",
"fink_filters.filter_blazar.filter.blazar",
"fink_filters.filter_yso_spicy_candidates.filter.yso_spicy_candidates",
"fink_filters.filter_tns_match.filter.tns_match",
]


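The new entry registers the TNS filter with the distribution service. As a hedged sketch of how such a dotted path can be resolved at runtime (the helper below is illustrative; fink-broker's actual loading logic lives elsewhere):

# Illustrative sketch only: resolve a dotted filter path like the ones above.
from importlib import import_module

def load_filter(dotted_path):
    """Return the function designated by a dotted module path."""
    module_path, func_name = dotted_path.rsplit(".", 1)
    return getattr(import_module(module_path), func_name)

tns_match = load_filter("fink_filters.filter_tns_match.filter.tns_match")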
8 changes: 7 additions & 1 deletion bin/fink
@@ -282,6 +282,12 @@ elif [[ $service == "raw2science" ]]; then
NOSCIENCE=""
fi

if [[ $TNS_RAW_OUTPUT ]]; then
TNS_OPTION="-tns_raw_output ${TNS_RAW_OUTPUT}"
else
TNS_OPTION=""
fi

# Store the stream of alerts
spark-submit --master ${SPARK_MASTER} \
--packages ${FINK_PACKAGES} \
@@ -294,7 +300,7 @@ elif [[ $service == "raw2science" ]]; then
-tinterval ${FINK_TRIGGER_UPDATE} \
-night ${NIGHT} \
-mmconfigpath ${FINK_MM_CONFIG} \
-log_level ${LOG_LEVEL} ${NOSCIENCE} ${EXIT_AFTER}
-log_level ${LOG_LEVEL} ${TNS_OPTION} ${NOSCIENCE} ${EXIT_AFTER}
elif [[ $service == "raw2science_elasticc_paper" ]]; then
# Store the stream of alerts
spark-submit --master ${SPARK_MASTER} \
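On the Python side, this flag is parsed by `getargs` from fink_broker.parser. A minimal sketch of a matching argparse definition, assuming the real parser's naming and help text (which may differ):

import argparse

parser = argparse.ArgumentParser()
# Assumption: mirrors the -tns_raw_output flag assembled by bin/fink above.
parser.add_argument(
    "-tns_raw_output",
    type=str,
    default="",
    help="Folder containing tns_raw.parquet; empty string means download the catalog.",
)
args = parser.parse_args(["-tns_raw_output", "/tmp/tns"])
assert args.tns_raw_output == "/tmp/tns"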
81 changes: 5 additions & 76 deletions bin/index_archival.py
@@ -22,14 +22,9 @@
4. Push data (single shot)
"""

import os
import argparse
import numpy as np
import pandas as pd

import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import pandas_udf, PandasUDFType

from fink_broker.parser import getargs
from fink_broker.science import ang2pix
@@ -45,11 +40,6 @@
from fink_filters.classification import extract_fink_classification
from fink_utils.spark.utils import check_status_last_prv_candidates

from fink_tns.utils import download_catalog

from astropy.coordinates import SkyCoord
from astropy import units as u


def main():
parser = argparse.ArgumentParser(description=__doc__)
@@ -277,75 +267,14 @@ def main():
# Remove unused columns
df_index = df_index.drop(*["rb", "nbad"])
elif columns[0] == "tns":
with open("{}/tns_marker.txt".format(args.tns_folder)) as f:
tns_marker = f.read().replace("\n", "")

pdf_tns = download_catalog(os.environ["TNS_API_KEY"], tns_marker)

# Filter TNS confirmed data
f1 = ~pdf_tns["type"].isna()
pdf_tns_filt = pdf_tns[f1]

pdf_tns_filt_b = spark.sparkContext.broadcast(pdf_tns_filt)

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def crossmatch_with_tns(objectid, ra, dec):
# TNS
pdf = pdf_tns_filt_b.value
ra2, dec2, type2 = pdf["ra"], pdf["declination"], pdf["type"]

# create catalogs
catalog_ztf = SkyCoord(
ra=np.array(ra, dtype=np.float) * u.degree,
dec=np.array(dec, dtype=np.float) * u.degree,
)
catalog_tns = SkyCoord(
ra=np.array(ra2, dtype=np.float) * u.degree,
dec=np.array(dec2, dtype=np.float) * u.degree,
)

# cross-match
_, _, _ = catalog_tns.match_to_catalog_sky(catalog_ztf)

sub_pdf = pd.DataFrame({
"objectId": objectid.to_numpy(),
"ra": ra.to_numpy(),
"dec": dec.to_numpy(),
})

# cross-match
idx2, d2d2, _ = catalog_ztf.match_to_catalog_sky(catalog_tns)

# set separation length
sep_constraint2 = d2d2.degree < 1.5 / 3600

sub_pdf["TNS"] = [""] * len(sub_pdf)
sub_pdf["TNS"][sep_constraint2] = type2.to_numpy()[idx2[sep_constraint2]]

to_return = objectid.apply(
lambda x: ""
if x not in sub_pdf["objectId"].to_numpy()
else sub_pdf["TNS"][sub_pdf["objectId"] == x].to_numpy()[0]
)

return to_return

df = df.withColumn(
"tns", crossmatch_with_tns(df["objectId"], df["ra"], df["dec"])
)
# Flag only objects with counterpart in TNS
df_index = df.filter(df["tns"] != "")

# Row key
df = add_row_key(df, row_key_name=index_row_key_name, cols=columns)

df = select_relevant_columns(
df, cols=common_cols + ["tns"], row_key_name=index_row_key_name
df_index = add_row_key(df_index, row_key_name=index_row_key_name, cols=columns)
df_index = select_relevant_columns(
df_index, cols=common_cols, row_key_name=index_row_key_name
)

df = df.cache()
df_index = df.filter(df["tns"] != "").drop("tns")
# trigger the cache - note the cache might be a killer for LSST...
n = df_index.count()
print("TNS objects: {}".format(n))
else:
# Row key
df = add_row_key(df, row_key_name=index_row_key_name, cols=columns)
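For reference, the deleted block performed this sky crossmatch at indexing time; it now happens once upstream in raw2science via fink-science's xmatch_tns. A condensed, hedged sketch of the removed matching logic (with the deprecated np.float spelling replaced by float):

import numpy as np
import pandas as pd
from astropy.coordinates import SkyCoord
from astropy import units as u

def tns_counterparts(ra, dec, tns_ra, tns_dec, tns_type, radius_arcsec=1.5):
    """Return the TNS type of the closest match within radius_arcsec, else ''."""
    catalog_ztf = SkyCoord(ra=np.asarray(ra, dtype=float) * u.degree,
                           dec=np.asarray(dec, dtype=float) * u.degree)
    catalog_tns = SkyCoord(ra=np.asarray(tns_ra, dtype=float) * u.degree,
                           dec=np.asarray(tns_dec, dtype=float) * u.degree)
    # nearest TNS neighbour for each ZTF alert
    idx, d2d, _ = catalog_ztf.match_to_catalog_sky(catalog_tns)
    out = np.full(len(catalog_ztf), "", dtype=object)
    close = d2d.degree < radius_arcsec / 3600.0
    out[close] = np.asarray(tns_type, dtype=object)[idx[close]]
    return pd.Series(out)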
2 changes: 1 addition & 1 deletion bin/raw2science.py
@@ -110,7 +110,7 @@ def main():
from fink_broker.science import apply_science_modules

logger.info("Apply science modules")
df = apply_science_modules(df)
df = apply_science_modules(df, args.tns_raw_output)

logger.debug("Add ingestion timestamp")
df = df.withColumn(
2 changes: 1 addition & 1 deletion deps/requirements-science.txt
@@ -1,7 +1,7 @@
# Fink-science dependencies
# WARNING: fink-science pip module is deprecated and fink-science is now installed from source

git+https://github.com/astrolabsoftware/[email protected].0
git+https://github.com/astrolabsoftware/[email protected].9

# xmatch_cds
line_profiler==4.1.3
4 changes: 2 additions & 2 deletions deps/requirements.txt
@@ -18,8 +18,8 @@ avro-python3
fastavro==1.6.0

# Fink core
fink_filters>=3.37
fink-utils>=0.29.3
fink_filters>=3.38
fink-utils>=0.29.10
fink-spins>=0.3.8
fink-tns>=0.9

6 changes: 3 additions & 3 deletions e2e/check-results.sh
@@ -28,9 +28,9 @@ DIR=$(cd "$(dirname "$0")"; pwd -P)
# for example in finkctl.yaml
if [ "$SUFFIX" = "noscience" ];
then
expected_topics="12"
expected_topics="13"
else
expected_topics="8"
expected_topics="9"
fi

count=0
@@ -60,4 +60,4 @@ do
exit 1
fi
done
finkctl get topics
finkctl get topics
7 changes: 4 additions & 3 deletions fink_broker/hbase_utils.py
@@ -45,7 +45,7 @@ def load_fink_cols():
--------
>>> fink_cols, fink_nested_cols = load_fink_cols()
>>> print(len(fink_cols))
29
30
>>> print(len(fink_nested_cols))
18
@@ -80,6 +80,7 @@ def load_fink_cols():
"from_upper": {"type": "boolean", "default": False},
"spicy_id": {"type": "int", "default": -1},
"spicy_class": {"type": "string", "default": "Unknown"},
"tns": {"type": "string", "default": ""},
}

fink_nested_cols = {}
@@ -107,7 +108,7 @@ def load_all_cols():
>>> root_level, candidates, images, fink_cols, fink_nested_cols = load_all_cols()
>>> out = {**root_level, **candidates, **images, **fink_cols, **fink_nested_cols}
>>> print(len(out))
158
159
"""
fink_cols, fink_nested_cols = load_fink_cols()

@@ -332,7 +333,7 @@ def load_ztf_index_cols():
--------
>>> out = load_ztf_index_cols()
>>> print(len(out))
83
84
"""
# From `root` or `candidates.`
common = [
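Each fink_cols entry pairs an HBase column with its type and default; the new tns column defaults to an empty string (no TNS counterpart). A hedged sketch of how such defaults could be applied before ingestion; the helper below is illustrative, not the pipeline's actual code:

from pyspark.sql import functions as F

def fill_fink_defaults(df, fink_cols):
    """Replace nulls with the declared default for every Fink column present."""
    for name, spec in fink_cols.items():
        if name in df.columns:
            df = df.withColumn(name, F.coalesce(F.col(name), F.lit(spec["default"])))
    return df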
17 changes: 13 additions & 4 deletions fink_broker/science.py
@@ -32,6 +32,7 @@
# Import of science modules
from fink_science.random_forest_snia.processor import rfscore_sigmoid_full
from fink_science.xmatch.processor import xmatch_cds
from fink_science.xmatch.processor import xmatch_tns
from fink_science.xmatch.processor import crossmatch_other_catalog
from fink_science.xmatch.processor import crossmatch_mangrove

@@ -175,7 +176,7 @@ def fake_t2(incol):
return pd.Series([out] * len(incol))


def apply_science_modules(df: DataFrame) -> DataFrame:
def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame:
"""Load and apply Fink science modules to enrich alert content
Focus on ZTF stream
@@ -184,13 +185,19 @@ def apply_science_modules(df: DataFrame) -> DataFrame:
----------
df: DataFrame
Spark (Streaming or SQL) DataFrame containing raw alert data
tns_raw_output: str, optional
Folder that contains the raw TNS catalog. Inside, it is expected
to find the file `tns_raw.parquet` downloaded using
`fink-broker/bin/download_tns.py`. Default is "", in
which case the catalog will be downloaded. Beware that
to download the catalog, you need to set environment variables:
- TNS_API_MARKER: path to the TNS API marker (tns_marker.txt)
- TNS_API_KEY: path to the TNS API key (tns_api.key)
Returns
-------
df: DataFrame
Spark (Streaming or SQL) DataFrame containing enriched alert data
noscience: bool
Return untouched input dataframe, useful for Fink-broker infrastructure validation
Examples
--------
@@ -223,10 +230,12 @@ def apply_science_modules(df: DataFrame) -> DataFrame:
df = concat_col(df, colname, prefix=prefix)
expanded = [prefix + i for i in to_expand]

# Apply level one processor: cdsxmatch
_LOG.info("New processor: cdsxmatch")
df = xmatch_cds(df)

_LOG.info("New processor: TNS")
df = xmatch_tns(df, tns_raw_output=tns_raw_output)

_LOG.info("New processor: Gaia xmatch (1.0 arcsec)")
df = xmatch_cds(
df,
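Putting it together, both call styles are now valid; the folder path below is a placeholder:

# With a pre-downloaded catalog (expects <folder>/tns_raw.parquet inside):
df = apply_science_modules(df, tns_raw_output="/path/to/tns_folder")

# Without a folder, the catalog is downloaded; this requires the
# TNS_API_MARKER and TNS_API_KEY environment variables to be set.
df = apply_science_modules(df)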
