Skip to content

Commit

Permalink
Add possibility to dump schema for inspection
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienPeloton committed Jun 3, 2024
1 parent 7cc1534 commit eaf213c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
10 changes: 9 additions & 1 deletion fink_client/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import time
import fastavro
import confluent_kafka
from datetime import datetime, timezone

from fink_client.avroUtils import write_alert
from fink_client.avroUtils import _get_alert_schema
Expand All @@ -33,7 +34,7 @@ class AlertConsumer:
High level Kafka consumer to receive alerts from Fink broker
"""

def __init__(self, topics: list, config: dict, schema_path=None):
def __init__(self, topics: list, config: dict, schema_path=None, dump_schema=False):
"""Creates an instance of `AlertConsumer`
Parameters
Expand All @@ -56,6 +57,7 @@ def __init__(self, topics: list, config: dict, schema_path=None):
self.schema_path = schema_path
self._consumer = confluent_kafka.Consumer(self._kafka_config)
self._consumer.subscribe(self._topics)
self.dump_schema = dump_schema

def __enter__(self):
return self
Expand Down Expand Up @@ -101,6 +103,12 @@ def process_message(self, msg):

try:
_parsed_schema = fastavro.schema.parse_schema(json.loads(key))
if self.dump_schema:
today = datetime.now(timezone.utc).isoformat()
filename = 'schema_{}.json'.format(today)
with open(filename, 'w') as json_file:
json.dump(_parsed_schema, json_file, sort_keys=True, indent=4)
print("Schema saved as {}".format(filename))
alert = self._decode_msg(_parsed_schema, msg)
except json.JSONDecodeError:
# Old way
Expand Down
9 changes: 7 additions & 2 deletions fink_client/scripts/fink_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import time

from tabulate import tabulate
from astropy.time import Time

from fink_client.consumer import AlertConsumer
from fink_client.configuration import load_credentials
Expand Down Expand Up @@ -48,6 +49,10 @@ def main():
'-schema', type=str, default=None,
help="Avro schema to decode the incoming alerts. Default is None (version taken from each alert)"
)
parser.add_argument(
'--dump_schema', action='store_true',
help="If specified, save the schema on disk (json file)"
)
args = parser.parse_args(None)

# load user configuration
Expand All @@ -66,7 +71,7 @@ def main():
schema = None
else:
schema = args.schema
consumer = AlertConsumer(conf['mytopics'], myconfig, schema_path=schema)
consumer = AlertConsumer(conf['mytopics'], myconfig, schema_path=schema, dump_schema=args.dump_schema)

if args.available_topics:
print(consumer.available_topics().keys())
Expand Down Expand Up @@ -107,7 +112,7 @@ def main():
else:
table = [
[
alert['timestamp'], utc, topic, alert['objectId'],
Time(alert['candidate']['jd'], format='jd').iso, utc, topic, alert['objectId'],
alert['cdsxmatch'],
alert['candidate']['magpsf']
],
Expand Down

0 comments on commit eaf213c

Please sign in to comment.