diff --git a/.github/workflows/run_test.yml b/.github/workflows/run_test.yml index 2fda3a7..e63bd48 100644 --- a/.github/workflows/run_test.yml +++ b/.github/workflows/run_test.yml @@ -28,6 +28,8 @@ jobs: run: | pip install --upgrade pip setuptools wheel pip install -r requirements.txt + pip install 'Cython<3' + pip install --no-build-isolation fastavro==1.6.0 pip install -e . echo "PYTHONPATH="${PYTHONPATH}:${SPARKLIB}:${FINK_CLIENT_HOME}"" >> $GITHUB_ENV - name: Check env diff --git a/environment.yml b/environment.yml index ef7b0d9..eae6b45 100644 --- a/environment.yml +++ b/environment.yml @@ -4,16 +4,16 @@ channels: - conda-forge dependencies: - python>=3.9 + - Cython<3 - coverage>=4.2 - coveralls - python-confluent-kafka>=1.9.2 - - fastavro=1.6.0 - astropy - - numpy + - numpy<1.25 - pyarrow>=10.0.1 + #- fastavro==1.6.0 - pandas - pyyaml - tabulate - matplotlib - tqdm - diff --git a/fink_client/__init__.py b/fink_client/__init__.py index 7d4a771..6e29138 100644 --- a/fink_client/__init__.py +++ b/fink_client/__init__.py @@ -12,5 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "5.0" +__version__ = "6.0" __schema_version__ = "distribution_schema_fink_ztf_{}.avsc" diff --git a/fink_client/avroUtils.py b/fink_client/avroUtils.py index eb7eb08..e078519 100644 --- a/fink_client/avroUtils.py +++ b/fink_client/avroUtils.py @@ -223,7 +223,7 @@ def to_iterator(self) -> Iterable[dict]: for alert in self._read_single_alert(fn): yield alert -def write_alert(alert: dict, schema: str, path: str, overwrite: bool = False): +def write_alert(alert: dict, schema: str, path: str, overwrite: bool = False, id1: str = '', id2: str = ''): """ Write avro alert on disk Parameters @@ -242,13 +242,13 @@ def write_alert(alert: dict, schema: str, path: str, overwrite: bool = False): >>> alert = r.to_list(size=1)[0] Write the alert on disk - >>> write_alert(alert, schema_path, ".", overwrite=True) + >>> write_alert(alert, schema_path, ".", overwrite=True, id1="objectId", id2="candid") For test purposes, you can overwrite alert data on disk, but that should not happen in production as alert ID must be unique! Hence the writer will raise an exception if overwrite is not specified (default). >>> write_alert( - ... alert, schema_path, ".", overwrite=False) + ... alert, schema_path, ".", overwrite=False, id1="objectId", id2="candid") ... # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE Traceback (most recent call last): ... @@ -256,7 +256,7 @@ def write_alert(alert: dict, schema: str, path: str, overwrite: bool = False): """ alert_filename = os.path.join( path, - "{}_{}.avro".format(alert["objectId"], alert["candidate"]["candid"]) + "{}_{}.avro".format(alert[id1], alert[id2]) ) if type(schema) == str: diff --git a/fink_client/configuration.py b/fink_client/configuration.py index 82ec0b0..d128566 100644 --- a/fink_client/configuration.py +++ b/fink_client/configuration.py @@ -134,6 +134,17 @@ def load_credentials(tmp: bool = False) -> dict: return creds +def mm_topic_names(): + """ Return list of topics with MMA schema + """ + out = [ + 'fink_grb_bronze', + 'fink_grb_silver', + 'fink_grb_gold', + 'fink_gw_bronze', + ] + return out + if __name__ == "__main__": """ Run the test suite """ diff --git a/fink_client/consumer.py b/fink_client/consumer.py index abb2816..8601cca 100644 --- a/fink_client/consumer.py +++ b/fink_client/consumer.py @@ -22,6 +22,7 @@ from fink_client.avroUtils import write_alert from fink_client.avroUtils import _get_alert_schema from fink_client.avroUtils import _decode_avro_alert +from fink_client.configuration import mm_topic_names class AlertError(Exception): pass @@ -232,9 +233,24 @@ def poll_and_write( """ topic, alert, key = self.poll(timeout) + is_mma = topic in mm_topic_names() + + if is_mma: + id1 = 'objectId' + id2 = 'triggerId' + else: + id1 = 'objectId' + id2 = 'candid' if topic is not None: - write_alert(alert, self._parsed_schema, outdir, overwrite=overwrite) + write_alert( + alert, + self._parsed_schema, + outdir, + overwrite=overwrite, + id1=id1, + id2=id2 + ) return topic, alert, key diff --git a/fink_client/scripts/fink_consumer.py b/fink_client/scripts/fink_consumer.py index 7c8735e..c466c82 100755 --- a/fink_client/scripts/fink_consumer.py +++ b/fink_client/scripts/fink_consumer.py @@ -15,6 +15,7 @@ # limitations under the License. """ Kafka consumer to listen and archive Fink streams from the Livestream service """ import sys +import os import argparse import time @@ -23,6 +24,7 @@ from fink_client.consumer import AlertConsumer from fink_client.configuration import load_credentials +from fink_client.configuration import mm_topic_names def main(): """ """ @@ -44,7 +46,8 @@ def main(): help="Folder to store incoming alerts if --save is set. It must exist.") parser.add_argument( '-schema', type=str, default=None, - help="Avro schema to decode the incoming alerts. Default is None (version taken from each alert)") + help="Avro schema to decode the incoming alerts. Default is None (version taken from each alert)" + ) args = parser.parse_args(None) # load user configuration @@ -72,6 +75,9 @@ def main(): # Time to wait before polling again if no alerts maxtimeout = conf['maxtimeout'] + if not os.path.isdir(args.outdir): + os.makedirs(args.outdir, exist_ok=True) + # infinite loop maxpoll = args.limit if args.limit else 1e10 try: @@ -91,20 +97,25 @@ def main(): if topic is not None: poll_number += 1 + is_mma = topic in mm_topic_names() if args.display and topic is not None: utc = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()) - table = [ - [ - alert['timestamp'], utc, topic, alert['objectId'], - alert['cdsxmatch'], - alert['candidate']['magpsf'] - ], - ] - headers = [ - 'Emitted at (UTC)', 'Received at (UTC)', - 'Topic', 'objectId', 'Simbad', 'Magnitude' - ] + if is_mma: + table = [[alert['objectId'], alert['fink_class'], topic, alert['rate'], alert['observatory'], alert['triggerId']]] + headers = ['ObjectId', 'Classification', 'Topic', 'Rate (mag/day)', 'Observatory', 'Trigger ID'] + else: + table = [ + [ + alert['timestamp'], utc, topic, alert['objectId'], + alert['cdsxmatch'], + alert['candidate']['magpsf'] + ], + ] + headers = [ + 'Emitted at (UTC)', 'Received at (UTC)', + 'Topic', 'objectId', 'Simbad', 'Magnitude' + ] print(tabulate(table, headers, tablefmt="pretty")) elif args.display: print('No alerts the last {} seconds'.format(maxtimeout)) diff --git a/requirements.txt b/requirements.txt index 805053a..292da26 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,9 @@ coverage>=4.2 coveralls confluent-kafka==2.0.2 -fastavro==1.6.0 +#fastavro==1.6.0 astropy -numpy +numpy<1.25 pyarrow>=10.0.1 pandas pyyaml