Issue/74/restart topics (#75)
* update readme documentation

* add options to restart the kafka queue

* invert gcn and ztf loading in the offline mode

* update observatory schema version in the setup.py

* unified observatory schema version

* update readme

* bump to 0.9
FusRoman authored Jun 15, 2023
1 parent dc4012b commit b192a17
Showing 7 changed files with 61 additions and 45 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -20,6 +20,7 @@ toto@linux:~$ pip install git+https://github.com/FusRoman/[email protected]
* Download the default config file from GitHub: fink_grb/conf/fink_grb.conf and move it to a custom location.
Update your configuration file with your custom parameters.
Please note that the configuration file's paths must not end with a '/'.
* Once Fink_GRB has been installed via pip, the command `fink_grb` becomes available. Type `fink_grb --help` to see the help message and what you can do with Fink_GRB.

### Setup the Fink_GRB daemons
* Start listening to GCN
@@ -28,6 +29,9 @@ toto@linux:~$ fink_grb gcn_stream start --config /config_path
```
The above command will start a daemon that stores the GCN issued from the instruments registered in the system. The GCN will be stored at the location specified in the configuration file by the entry named 'online_gcn_data_prefix'. The path can be a local path or an HDFS path. In the latter case, the path must start with hdfs://IP:PORT///your_path, where IP and PORT refer to the HDFS driver.
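A quick way to sanity-check those two constraints (no trailing '/' in configuration paths, and the hdfs://IP:PORT///your_path form). This sketch is illustrative only and is not part of Fink_GRB:

```python
# Illustrative only, not part of Fink_GRB: checks the two path rules above.
def is_valid_prefix(path: str) -> bool:
    if path.endswith("/"):
        return False  # configuration paths must not end with '/'
    if path.startswith("hdfs://"):
        # HDFS paths must look like hdfs://IP:PORT///your_path
        return "///" in path.split("hdfs://", 1)[1]
    return True

assert is_valid_prefix("/data/gcn_storage")
assert is_valid_prefix("hdfs://127.0.0.1:8020///user/fink/gcn_storage")
assert not is_valid_prefix("/data/gcn_storage/")
```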

> :warning: The GCN stream needs to be restarted after each update of Fink_GRB. Use the `ps aux | grep fink_grb` command to identify the process ID of the GCN stream, kill it, then restart the GCN stream with the same command as above.

### Schedulers
Fink_GRB has multiple scripts in the scheduler folder to launch the different services.
* science2grb.sh launches the online services that cross-match, in real time, the alerts issued from ZTF/LSST with incoming GCNs. (latency: ZTF/LSST latency + 30 seconds max)
* science2grb_offline.sh launches the offline services that cross-match all the latest ZTF/LSST alerts with the GCNs within the time window (in days) specified in your config file. (latency: 1 day)
3 changes: 2 additions & 1 deletion fink_grb/__init__.py
@@ -12,5 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "0.8"
__version__ = "0.9"
__distribution_schema_version__ = "1.2"
__observatory_schema_version__ = "1.1"
9 changes: 2 additions & 7 deletions fink_grb/fink_grb_cli.py
@@ -1,6 +1,6 @@
"""
Usage:
-    fink_grb gcn_stream (start|monitor) [options]
+    fink_grb gcn_stream (start|monitor) [--restart] [options]
fink_grb join_stream (offline|online) --night=<date> [--exit_after=<second>] [options]
fink_grb distribute --night=<date> [--exit_after=<second>] [options]
fink_grb -h | --help
@@ -11,6 +11,7 @@
     start          start listening to the gcn stream
     monitor        print information about the status of the gcn stream process
                    and the collected data.
+    --restart      restart reading the gcn topics from the beginning.
     join_stream    launch the script that joins the ztf stream and the gcn stream
     offline        launch the offline mode
     online         launch the online mode
@@ -27,14 +28,12 @@


def main():

# parse the command line and return options provided by the user.
arguments = docopt(__doc__, version=__version__)

    # The imports are in the if statements to speed up the cli execution.

if arguments["gcn_stream"]:

if arguments["start"]:
from fink_grb.gcn_stream.gcn_stream import start_gcn_stream

@@ -45,21 +44,17 @@ def main():
gcn_stream_monitoring(arguments)

elif arguments["join_stream"]:

if arguments["online"]:

from fink_grb.online.ztf_join_gcn import launch_joining_stream

launch_joining_stream(arguments)

elif arguments["offline"]:

from fink_grb.offline.spark_offline import launch_offline_mode

launch_offline_mode(arguments)

elif arguments["distribute"]:

from fink_grb.distribution.distribution import launch_distribution

launch_distribution(arguments)
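Side note on the dispatch pattern above: docopt builds the argument dictionary straight from the module's usage string, so `main()` only has to test boolean keys. A minimal, self-contained sketch; the trimmed usage string below is illustrative, not the module's full one:

```python
from docopt import docopt

USAGE = """
Usage:
    fink_grb gcn_stream (start|monitor) [--restart]
"""

# docopt turns commands and flags into boolean entries of a dict.
args = docopt(USAGE, argv=["gcn_stream", "start", "--restart"])
assert args["gcn_stream"] and args["start"] and args["--restart"]
```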
6 changes: 5 additions & 1 deletion fink_grb/gcn_stream/gcn_stream.py
@@ -12,6 +12,7 @@
from fink_grb.init import get_config, init_logging, return_verbose_level
from fink_grb.utils.fun_utils import get_hdfs_connector
from fink_grb.observatory import TOPICS, TOPICS_FORMAT
+from fink_client.scripts.fink_datatransfer import my_assign


def signal_handler(signal, frame): # pragma: no cover
@@ -206,7 +207,10 @@ def start_gcn_stream(arguments):
gcn_fs = None

# Subscribe to topics and receive alerts
-    consumer.subscribe(TOPICS)
+    if arguments["--restart"]:
+        consumer.subscribe(TOPICS, on_assign=my_assign)
+    else:
+        consumer.subscribe(TOPICS)

signal.signal(signal.SIGINT, signal_handler)

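The `--restart` path above hands fink_client's `my_assign` to `subscribe` as the partition-assignment callback. Its exact implementation lives in fink_client; as an assumption of what such a callback does, here is a rewind-on-assign sketch for confluent-kafka (the name `rewind_on_assign` is hypothetical):

```python
# Sketch of what an on_assign callback such as my_assign plausibly does
# (assumption: check fink_client's source for the real implementation):
# rewind every assigned partition so the topics are re-read from the start.
from confluent_kafka import OFFSET_BEGINNING

def rewind_on_assign(consumer, partitions):
    for partition in partitions:
        partition.offset = OFFSET_BEGINNING  # seek back to the earliest offset
    consumer.assign(partitions)
```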
2 changes: 1 addition & 1 deletion fink_grb/observatory/__init__.py
@@ -77,7 +77,7 @@ def __get_topics():


OBSERVATORY_PATH = "observatory"
-OBSERVATORY_SCHEMA_VERSION = 1.1
+OBSERVATORY_SCHEMA_VERSION = fink_grb.__observatory_schema_version__
OBSERVATORY_JSON_SCHEMA_PATH = files("fink_grb").joinpath(
"observatory/observatory_schema_version_{}.json".format(OBSERVATORY_SCHEMA_VERSION)
)
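The point of reading the version from `fink_grb.__observatory_schema_version__` is that `setup.py` ships, and this module loads, the same versioned schema file. A hypothetical check of how the path resolves:

```python
# Hypothetical illustration of the schema-path resolution above.
from importlib.resources import files

OBSERVATORY_SCHEMA_VERSION = "1.1"  # i.e. fink_grb.__observatory_schema_version__
schema_path = files("fink_grb").joinpath(
    "observatory/observatory_schema_version_{}.json".format(OBSERVATORY_SCHEMA_VERSION)
)
# -> .../fink_grb/observatory/observatory_schema_version_1.1.json
```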
78 changes: 44 additions & 34 deletions fink_grb/offline/spark_offline.py
@@ -1,5 +1,5 @@
import json
-from astropy.time import TimeDelta
+from astropy.time import TimeDelta, Time

from fink_utils.science.utils import ang2pix
from fink_utils.broker.sparkUtils import init_sparksession
@@ -100,18 +100,18 @@ def ztf_grb_filter(spark_ztf, ast_dist, pansstar_dist, pansstar_star_score, gaia


def spark_offline(
-    hbase_catalog,
-    gcn_read_path,
-    grbxztf_write_path,
-    night,
-    NSIDE,
-    start_window,
-    time_window,
-    ast_dist,
-    pansstar_dist,
-    pansstar_star_score,
-    gaia_dist,
-    with_columns_filter=True,
+    hbase_catalog: str,
+    gcn_read_path: str,
+    grbxztf_write_path: str,
+    night: str,
+    NSIDE: int,
+    start_window: float,
+    time_window: int,
+    ast_dist: float,
+    pansstar_dist: float,
+    pansstar_star_score: float,
+    gaia_dist: float,
+    with_columns_filter: bool = True,
):
"""
    Cross-match Fink and the GCN in order to find the optical alerts falling in the error box of a GCN.
@@ -127,7 +127,7 @@
path to store the cross match ZTF/GCN results
night : string
launching night of the script
-    NSIDE: String
+    NSIDE: int
Healpix map resolution, better if a power of 2
start_window : float
start date of the time window (in jd / julian date)
@@ -146,6 +146,9 @@
gaia_dist: float
Distance to closest source from Gaia DR1 catalog irrespective of magnitude; if exists within 90 arcsec [arcsec]
neargaia field
+    with_columns_filter : boolean
+        HBase option to optimize loading; works only in distributed mode.
+        Set this option to False in local mode. default = True
Returns
-------
Expand Down Expand Up @@ -175,12 +178,36 @@ def spark_offline(
>>> assert_frame_equal(datatest, datajoin, check_dtype=False, check_column_type=False, check_categorical=False)
"""
-    with open(hbase_catalog) as f:
-        catalog = json.load(f)

    spark = init_sparksession(
        "science2grb_offline_{}{}{}".format(night[0:4], night[4:6], night[6:8])
    )
    logger = init_logging()
+    low_bound = start_window - TimeDelta(time_window * 24 * 3600, format="sec").jd
+
+    if low_bound < 0 or low_bound > start_window:
+        raise ValueError(
+            "The time window is higher than the start_window : \nstart_window = {}\ntime_window = {}\nlow_bound={}".format(
+                start_window, time_window, low_bound
+            )
+        )
+
+    grb_alert = spark.read.format("parquet").load(gcn_read_path)
+
+    grb_alert = grb_alert.filter(grb_alert.triggerTimejd >= low_bound).filter(
+        grb_alert.triggerTimejd <= start_window
+    )
+
+    nb_gcn_alert = grb_alert.cache().count()
+    if nb_gcn_alert == 0:
+        logger.info(
+            "No gcn between {} and {}, exit the offline mode.".format(
+                Time(low_bound, format="jd").iso, Time(start_window, format="jd").iso
+            )
+        )
+        return
+
+    with open(hbase_catalog) as f:
+        catalog = json.load(f)

ztf_alert = (
spark.read.option("catalog", catalog)
@@ -219,15 +246,6 @@
"tracklet",
)

-    low_bound = start_window - TimeDelta(time_window * 24 * 3600, format="sec").jd
-
-    if low_bound < 0 or low_bound > start_window:
-        raise ValueError(
-            "The time window is higher than the start_window : \nstart_window = {}\ntime_window = {}\nlow_bound={}".format(
-                start_window, time_window, low_bound
-            )
-        )

ztf_alert = ztf_alert.filter(
ztf_alert["jd_objectId"] >= "{}".format(low_bound)
).filter(ztf_alert["jd_objectId"] < "{}".format(start_window))
@@ -238,14 +256,6 @@

ztf_alert.cache().count()

-    grb_alert = spark.read.format("parquet").load(gcn_read_path)
-
-    grb_alert = grb_alert.filter(grb_alert.triggerTimejd >= low_bound).filter(
-        grb_alert.triggerTimejd <= start_window
-    )
-
-    grb_alert.cache().count()

ztf_alert = ztf_alert.withColumn(
"hpix",
ang2pix(ztf_alert.ra, ztf_alert.dec, F.lit(NSIDE)),
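For reference, the `low_bound` computed above is simply `start_window` minus `time_window` days expressed in julian days. A self-contained sanity check of that arithmetic, with illustrative values:

```python
# Illustrative values only: TimeDelta(...).jd converts the configured
# number of days (given in seconds) back into julian days.
from astropy.time import Time, TimeDelta

start_window = Time("2023-06-15").jd  # 2460110.5
time_window = 7  # days, as read from the config file
low_bound = start_window - TimeDelta(time_window * 24 * 3600, format="sec").jd
assert abs((start_window - low_bound) - 7.0) < 1e-9
print(Time(low_bound, format="jd").iso)  # 2023-06-08 00:00:00.000
```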
4 changes: 3 additions & 1 deletion setup.py
@@ -25,7 +25,9 @@
"conf/fink_grb_schema_version_{}.avsc".format(
fink_grb.__distribution_schema_version__
),
"observatory/observatory_schema_version_1.0.json",
"observatory/observatory_schema_version_{}.json".format(
fink_grb.__observatory_schema_version__
),
]
+ [
path.relpath(el, start="fink_grb")
