Skip to content

Commit

Permalink
Update display and write for MMA schema (#177)
Browse files Browse the repository at this point in the history
* Update display and write for MMA schema. Viewer is not yet updated

* Bump to version 5.1

* Fix tests

* PEP8

* Handle MMA schema internally

* Bump to 6.

* Bump fastarrow

* Freeze fastavro and numpy versions

* Removing unused args

* no-build-isolation

* typo in the option name

* Cython<3

* fastavro/fastavro#701 (comment)

* Create output dir if it does not exist

* Missing import
  • Loading branch information
JulienPeloton authored Jul 19, 2023
1 parent 51f5dfc commit a47f718
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 23 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/run_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ jobs:
run: |
pip install --upgrade pip setuptools wheel
pip install -r requirements.txt
pip install 'Cython<3'
pip install --no-build-isolation fastavro==1.6.0
pip install -e .
echo "PYTHONPATH="${PYTHONPATH}:${SPARKLIB}:${FINK_CLIENT_HOME}"" >> $GITHUB_ENV
- name: Check env
Expand Down
6 changes: 3 additions & 3 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ channels:
- conda-forge
dependencies:
- python>=3.9
- Cython<3
- coverage>=4.2
- coveralls
- python-confluent-kafka>=1.9.2
- fastavro=1.6.0
- astropy
- numpy
- numpy<1.25
- pyarrow>=10.0.1
#- fastavro==1.6.0
- pandas
- pyyaml
- tabulate
- matplotlib
- tqdm

2 changes: 1 addition & 1 deletion fink_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "5.0"
__version__ = "6.0"
__schema_version__ = "distribution_schema_fink_ztf_{}.avsc"
8 changes: 4 additions & 4 deletions fink_client/avroUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def to_iterator(self) -> Iterable[dict]:
for alert in self._read_single_alert(fn):
yield alert

def write_alert(alert: dict, schema: str, path: str, overwrite: bool = False):
def write_alert(alert: dict, schema: str, path: str, overwrite: bool = False, id1: str = '', id2: str = ''):
""" Write avro alert on disk
Parameters
Expand All @@ -242,21 +242,21 @@ def write_alert(alert: dict, schema: str, path: str, overwrite: bool = False):
>>> alert = r.to_list(size=1)[0]
Write the alert on disk
>>> write_alert(alert, schema_path, ".", overwrite=True)
>>> write_alert(alert, schema_path, ".", overwrite=True, id1="objectId", id2="candid")
For test purposes, you can overwrite alert data on disk, but that should
not happen in production as alert ID must be unique! Hence the writer will
raise an exception if overwrite is not specified (default).
>>> write_alert(
... alert, schema_path, ".", overwrite=False)
... alert, schema_path, ".", overwrite=False, id1="objectId", id2="candid")
... # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
OSError: ./ZTF19acihgng_1060135832015015002.avro already exists!
"""
alert_filename = os.path.join(
path,
"{}_{}.avro".format(alert["objectId"], alert["candidate"]["candid"])
"{}_{}.avro".format(alert[id1], alert[id2])
)

if type(schema) == str:
Expand Down
11 changes: 11 additions & 0 deletions fink_client/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ def load_credentials(tmp: bool = False) -> dict:

return creds

def mm_topic_names():
    """ Return the list of Fink livestream topic names whose alerts
    follow the multi-messenger (MMA) schema.
    """
    # Keep in sync with the MMA topics served by the Fink broker.
    return [
        'fink_grb_bronze',
        'fink_grb_silver',
        'fink_grb_gold',
        'fink_gw_bronze',
    ]


if __name__ == "__main__":
""" Run the test suite """
Expand Down
18 changes: 17 additions & 1 deletion fink_client/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from fink_client.avroUtils import write_alert
from fink_client.avroUtils import _get_alert_schema
from fink_client.avroUtils import _decode_avro_alert
from fink_client.configuration import mm_topic_names

class AlertError(Exception):
pass
Expand Down Expand Up @@ -232,9 +233,24 @@ def poll_and_write(
"""
topic, alert, key = self.poll(timeout)
is_mma = topic in mm_topic_names()

if is_mma:
id1 = 'objectId'
id2 = 'triggerId'
else:
id1 = 'objectId'
id2 = 'candid'

if topic is not None:
write_alert(alert, self._parsed_schema, outdir, overwrite=overwrite)
write_alert(
alert,
self._parsed_schema,
outdir,
overwrite=overwrite,
id1=id1,
id2=id2
)

return topic, alert, key

Expand Down
35 changes: 23 additions & 12 deletions fink_client/scripts/fink_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.
""" Kafka consumer to listen and archive Fink streams from the Livestream service """
import sys
import os

import argparse
import time
Expand All @@ -23,6 +24,7 @@

from fink_client.consumer import AlertConsumer
from fink_client.configuration import load_credentials
from fink_client.configuration import mm_topic_names

def main():
""" """
Expand All @@ -44,7 +46,8 @@ def main():
help="Folder to store incoming alerts if --save is set. It must exist.")
parser.add_argument(
'-schema', type=str, default=None,
help="Avro schema to decode the incoming alerts. Default is None (version taken from each alert)")
help="Avro schema to decode the incoming alerts. Default is None (version taken from each alert)"
)
args = parser.parse_args(None)

# load user configuration
Expand Down Expand Up @@ -72,6 +75,9 @@ def main():
# Time to wait before polling again if no alerts
maxtimeout = conf['maxtimeout']

if not os.path.isdir(args.outdir):
os.makedirs(args.outdir, exist_ok=True)

# infinite loop
maxpoll = args.limit if args.limit else 1e10
try:
Expand All @@ -91,20 +97,25 @@ def main():

if topic is not None:
poll_number += 1
is_mma = topic in mm_topic_names()

if args.display and topic is not None:
utc = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
table = [
[
alert['timestamp'], utc, topic, alert['objectId'],
alert['cdsxmatch'],
alert['candidate']['magpsf']
],
]
headers = [
'Emitted at (UTC)', 'Received at (UTC)',
'Topic', 'objectId', 'Simbad', 'Magnitude'
]
if is_mma:
table = [[alert['objectId'], alert['fink_class'], topic, alert['rate'], alert['observatory'], alert['triggerId']]]
headers = ['ObjectId', 'Classification', 'Topic', 'Rate (mag/day)', 'Observatory', 'Trigger ID']
else:
table = [
[
alert['timestamp'], utc, topic, alert['objectId'],
alert['cdsxmatch'],
alert['candidate']['magpsf']
],
]
headers = [
'Emitted at (UTC)', 'Received at (UTC)',
'Topic', 'objectId', 'Simbad', 'Magnitude'
]
print(tabulate(table, headers, tablefmt="pretty"))
elif args.display:
print('No alerts the last {} seconds'.format(maxtimeout))
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
coverage>=4.2
coveralls
confluent-kafka==2.0.2
fastavro==1.6.0
#fastavro==1.6.0
astropy
numpy
numpy<1.25
pyarrow>=10.0.1
pandas
pyyaml
Expand Down

0 comments on commit a47f718

Please sign in to comment.