From eaf213c71101177ede01b549240c74acd953fdd3 Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Mon, 3 Jun 2024 13:14:57 +0200 Subject: [PATCH] Add possibility to dump schema for inspection --- fink_client/consumer.py | 10 +++++++++- fink_client/scripts/fink_consumer.py | 9 +++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/fink_client/consumer.py b/fink_client/consumer.py index d355e6f..87ae2a5 100644 --- a/fink_client/consumer.py +++ b/fink_client/consumer.py @@ -18,6 +18,7 @@ import time import fastavro import confluent_kafka +from datetime import datetime, timezone from fink_client.avroUtils import write_alert from fink_client.avroUtils import _get_alert_schema @@ -33,7 +34,7 @@ class AlertConsumer: High level Kafka consumer to receive alerts from Fink broker """ - def __init__(self, topics: list, config: dict, schema_path=None): + def __init__(self, topics: list, config: dict, schema_path=None, dump_schema=False): """Creates an instance of `AlertConsumer` Parameters @@ -56,6 +57,7 @@ def __init__(self, topics: list, config: dict, schema_path=None): self.schema_path = schema_path self._consumer = confluent_kafka.Consumer(self._kafka_config) self._consumer.subscribe(self._topics) + self.dump_schema = dump_schema def __enter__(self): return self @@ -101,6 +103,12 @@ def process_message(self, msg): try: _parsed_schema = fastavro.schema.parse_schema(json.loads(key)) + if self.dump_schema: + today = datetime.now(timezone.utc).isoformat() + filename = 'schema_{}.json'.format(today) + with open(filename, 'w') as json_file: + json.dump(_parsed_schema, json_file, sort_keys=True, indent=4) + print("Schema saved as {}".format(filename)) alert = self._decode_msg(_parsed_schema, msg) except json.JSONDecodeError: # Old way diff --git a/fink_client/scripts/fink_consumer.py b/fink_client/scripts/fink_consumer.py index c466c82..b46ff1d 100755 --- a/fink_client/scripts/fink_consumer.py +++ b/fink_client/scripts/fink_consumer.py @@ -21,6 +21,7 @@ import time from tabulate import tabulate +from astropy.time import Time from fink_client.consumer import AlertConsumer from fink_client.configuration import load_credentials @@ -48,6 +49,10 @@ def main(): '-schema', type=str, default=None, help="Avro schema to decode the incoming alerts. Default is None (version taken from each alert)" ) + parser.add_argument( + '--dump_schema', action='store_true', + help="If specified, save the schema on disk (json file)" + ) args = parser.parse_args(None) # load user configuration @@ -66,7 +71,7 @@ def main(): schema = None else: schema = args.schema - consumer = AlertConsumer(conf['mytopics'], myconfig, schema_path=schema) + consumer = AlertConsumer(conf['mytopics'], myconfig, schema_path=schema, dump_schema=args.dump_schema) if args.available_topics: print(consumer.available_topics().keys()) @@ -107,7 +112,7 @@ def main(): else: table = [ [ - alert['timestamp'], utc, topic, alert['objectId'], + Time(alert['candidate']['jd'], format='jd').iso, utc, topic, alert['objectId'], alert['cdsxmatch'], alert['candidate']['magpsf'] ],