From b192a17b79102fe5cf2775c3870e779eca15d4e4 Mon Sep 17 00:00:00 2001 From: FusRoman <46221629+FusRoman@users.noreply.github.com> Date: Thu, 15 Jun 2023 14:59:43 +0200 Subject: [PATCH] Issue/74/restart topics (#75) * update readme documentation * add options to restart the kafka queue * invert gcn and ztf loading in the offline mode * update observatory schema version in the setup.py * unified observatory schema version * update readme * bump to 0.9 --- README.md | 4 ++ fink_grb/__init__.py | 3 +- fink_grb/fink_grb_cli.py | 9 +--- fink_grb/gcn_stream/gcn_stream.py | 6 ++- fink_grb/observatory/__init__.py | 2 +- fink_grb/offline/spark_offline.py | 78 +++++++++++++++++-------------- setup.py | 4 +- 7 files changed, 61 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index 46d2d904c..c51cc5410 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ toto@linux:~$ pip install git+https://github.com/FusRoman/Fink_GRB@v0.6-beta * Download the default config file from GitHub:fink_grb/conf/fink_grb.conf and move it in a custom location Update your configuration file with your custom parameters. Please note that the configuration file's paths must not end with a '/'. +* Once Fink_GRB has been installed via pip, the command `fink_grb` become available. Type `fink_grb --help` to see the help message and show what you can do with Fink_GRB. ### Setup the Fink_GRB daemons * Start listening to GCN @@ -28,6 +29,9 @@ toto@linux:~$ fink_grb gcn_stream start --config /config_path ``` The above command will start a daemon that will store the GCN issued from the instruments registered in the system. The GNC will be stored at the location specified in the configuration file by the entry named 'online_gcn_data_prefix'. The path can be a local path or a hdfs path. In the latter case, the path must start with hdfs://IP:PORT///your_path where IP and PORT refer to the hdfs driver. +> :warning: The GCN stream need to be restarted after each update of Fink_GRB. Use the `ps aux | grep fink_grb` command to identify the process number of the gcn stream and kill it then restart the gcn stream with the same command as above. + +### Schedulers Fink_GRB has multiples script in the scheduler folder to launch the different services. * science2grb.sh will launch the online services that will cross-match in real-time the alerts issued from the ZTF/LSST with incoming GCN. (latency: ZTF/LSST latency + 30 seconds max) * science2grb_offline.sh launch the offline services that will cross-match all the latest alerts from ZTF/LSST with the GCN within the time window (in days) specified in your config file. (latency: 1 day) diff --git a/fink_grb/__init__.py b/fink_grb/__init__.py index d14fbd95b..824f26e61 100644 --- a/fink_grb/__init__.py +++ b/fink_grb/__init__.py @@ -12,5 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.8" +__version__ = "0.9" __distribution_schema_version__ = "1.2" +__observatory_schema_version__ = "1.1" diff --git a/fink_grb/fink_grb_cli.py b/fink_grb/fink_grb_cli.py index ecb98bf7c..fd8fc4847 100644 --- a/fink_grb/fink_grb_cli.py +++ b/fink_grb/fink_grb_cli.py @@ -1,6 +1,6 @@ """ Usage: - fink_grb gcn_stream (start|monitor) [options] + fink_grb gcn_stream (start|monitor) [--restart] [options] fink_grb join_stream (offline|online) --night= [--exit_after=] [options] fink_grb distribute --night= [--exit_after=] [options] fink_grb -h | --help @@ -11,6 +11,7 @@ start start to listening the gcn stream monitor print informations about the status of the gcn stream process and the collected data. + --restart restarts the gcn topics to the beginning. join_stream launch the script that join the ztf stream and the gcn stream offline launch the offline mode online launch the online mode @@ -27,14 +28,12 @@ def main(): - # parse the command line and return options provided by the user. arguments = docopt(__doc__, version=__version__) # The import are in the if statements to speed-up the cli execution. if arguments["gcn_stream"]: - if arguments["start"]: from fink_grb.gcn_stream.gcn_stream import start_gcn_stream @@ -45,21 +44,17 @@ def main(): gcn_stream_monitoring(arguments) elif arguments["join_stream"]: - if arguments["online"]: - from fink_grb.online.ztf_join_gcn import launch_joining_stream launch_joining_stream(arguments) elif arguments["offline"]: - from fink_grb.offline.spark_offline import launch_offline_mode launch_offline_mode(arguments) elif arguments["distribute"]: - from fink_grb.distribution.distribution import launch_distribution launch_distribution(arguments) diff --git a/fink_grb/gcn_stream/gcn_stream.py b/fink_grb/gcn_stream/gcn_stream.py index b0c288094..93879a3ee 100644 --- a/fink_grb/gcn_stream/gcn_stream.py +++ b/fink_grb/gcn_stream/gcn_stream.py @@ -12,6 +12,7 @@ from fink_grb.init import get_config, init_logging, return_verbose_level from fink_grb.utils.fun_utils import get_hdfs_connector from fink_grb.observatory import TOPICS, TOPICS_FORMAT +from fink_client.scripts.fink_datatransfer import my_assign def signal_handler(signal, frame): # pragma: no cover @@ -206,7 +207,10 @@ def start_gcn_stream(arguments): gcn_fs = None # Subscribe to topics and receive alerts - consumer.subscribe(TOPICS) + if arguments["--restart"]: + consumer.subscribe(TOPICS, on_assign=my_assign) + else: + consumer.subscribe(TOPICS) signal.signal(signal.SIGINT, signal_handler) diff --git a/fink_grb/observatory/__init__.py b/fink_grb/observatory/__init__.py index 8bb6aea6f..2085b9490 100644 --- a/fink_grb/observatory/__init__.py +++ b/fink_grb/observatory/__init__.py @@ -77,7 +77,7 @@ def __get_topics(): OBSERVATORY_PATH = "observatory" -OBSERVATORY_SCHEMA_VERSION = 1.1 +OBSERVATORY_SCHEMA_VERSION = fink_grb.__observatory_schema_version__ OBSERVATORY_JSON_SCHEMA_PATH = files("fink_grb").joinpath( "observatory/observatory_schema_version_{}.json".format(OBSERVATORY_SCHEMA_VERSION) ) diff --git a/fink_grb/offline/spark_offline.py b/fink_grb/offline/spark_offline.py index d2ba139b2..545ceaf0a 100644 --- a/fink_grb/offline/spark_offline.py +++ b/fink_grb/offline/spark_offline.py @@ -1,5 +1,5 @@ import json -from astropy.time import TimeDelta +from astropy.time import TimeDelta, Time from fink_utils.science.utils import ang2pix from fink_utils.broker.sparkUtils import init_sparksession @@ -100,18 +100,18 @@ def ztf_grb_filter(spark_ztf, ast_dist, pansstar_dist, pansstar_star_score, gaia def spark_offline( - hbase_catalog, - gcn_read_path, - grbxztf_write_path, - night, - NSIDE, - start_window, - time_window, - ast_dist, - pansstar_dist, - pansstar_star_score, - gaia_dist, - with_columns_filter=True, + hbase_catalog: str, + gcn_read_path: str, + grbxztf_write_path: str, + night: str, + NSIDE: int, + start_window: float, + time_window: int, + ast_dist: float, + pansstar_dist: float, + pansstar_star_score: float, + gaia_dist: float, + with_columns_filter: bool = True, ): """ Cross-match Fink and the GNC in order to find the optical alerts falling in the error box of a GCN. @@ -127,7 +127,7 @@ def spark_offline( path to store the cross match ZTF/GCN results night : string launching night of the script - NSIDE: String + NSIDE: int Healpix map resolution, better if a power of 2 start_window : float start date of the time window (in jd / julian date) @@ -146,6 +146,9 @@ def spark_offline( gaia_dist: float Distance to closest source from Gaia DR1 catalog irrespective of magnitude; if exists within 90 arcsec [arcsec] neargaia field + with_columns_filter : boolean + Hbase options to optimize loading, work only in distributed mode + Set this option at False if in local mode. default = True Returns ------- @@ -175,12 +178,36 @@ def spark_offline( >>> assert_frame_equal(datatest, datajoin, check_dtype=False, check_column_type=False, check_categorical=False) """ - with open(hbase_catalog) as f: - catalog = json.load(f) - spark = init_sparksession( "science2grb_offline_{}{}{}".format(night[0:4], night[4:6], night[6:8]) ) + logger = init_logging() + low_bound = start_window - TimeDelta(time_window * 24 * 3600, format="sec").jd + + if low_bound < 0 or low_bound > start_window: + raise ValueError( + "The time window is higher than the start_window : \nstart_window = {}\ntime_window = {}\nlow_bound={}".format( + start_window, time_window, low_bound + ) + ) + + grb_alert = spark.read.format("parquet").load(gcn_read_path) + + grb_alert = grb_alert.filter(grb_alert.triggerTimejd >= low_bound).filter( + grb_alert.triggerTimejd <= start_window + ) + + nb_gcn_alert = grb_alert.cache().count() + if nb_gcn_alert == 0: + logger.info( + "No gcn between {} and {}, exit the offline mode.".format( + Time(low_bound, format="jd").iso, Time(start_window, format="jd").iso + ) + ) + return + + with open(hbase_catalog) as f: + catalog = json.load(f) ztf_alert = ( spark.read.option("catalog", catalog) @@ -219,15 +246,6 @@ def spark_offline( "tracklet", ) - low_bound = start_window - TimeDelta(time_window * 24 * 3600, format="sec").jd - - if low_bound < 0 or low_bound > start_window: - raise ValueError( - "The time window is higher than the start_window : \nstart_window = {}\ntime_window = {}\nlow_bound={}".format( - start_window, time_window, low_bound - ) - ) - ztf_alert = ztf_alert.filter( ztf_alert["jd_objectId"] >= "{}".format(low_bound) ).filter(ztf_alert["jd_objectId"] < "{}".format(start_window)) @@ -238,14 +256,6 @@ def spark_offline( ztf_alert.cache().count() - grb_alert = spark.read.format("parquet").load(gcn_read_path) - - grb_alert = grb_alert.filter(grb_alert.triggerTimejd >= low_bound).filter( - grb_alert.triggerTimejd <= start_window - ) - - grb_alert.cache().count() - ztf_alert = ztf_alert.withColumn( "hpix", ang2pix(ztf_alert.ra, ztf_alert.dec, F.lit(NSIDE)), diff --git a/setup.py b/setup.py index 2a82ecc77..cdaf698e2 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,9 @@ "conf/fink_grb_schema_version_{}.avsc".format( fink_grb.__distribution_schema_version__ ), - "observatory/observatory_schema_version_1.0.json", + "observatory/observatory_schema_version_{}.json".format( + fink_grb.__observatory_schema_version__ + ), ] + [ path.relpath(el, start="fink_grb")