Skip to content

Commit

Permalink
Allow to read gzipped avro (#139)
Browse files Browse the repository at this point in the history
* Allow to read gzipped avro

* Ignore E731 (lambda rocks)
  • Loading branch information
JulienPeloton authored Apr 21, 2022
1 parent 8933d97 commit 378b084
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
pip install flake8
- name: utilities
run: |
flake8 fink_client/*.py --count --show-source --statistics --ignore=E302,E501,E711
flake8 fink_client/*.py --count --show-source --statistics --ignore=E302,E501,E711,E731
- name: Science modules
run: |
flake8 fink_client/scripts/*.py --count --show-source --statistics --ignore=E302,E501,E711
flake8 fink_client/scripts/*.py --count --show-source --statistics --ignore=E302,E501,E711,E731
Binary file not shown.
40 changes: 35 additions & 5 deletions fink_client/avroUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import glob
import io
import json
import gzip
import requests
from requests.exceptions import RequestException

Expand All @@ -31,7 +32,10 @@
from fink_client import __schema_version__

class AlertReader():
""" Class to load alert Avro files
""" Class to load alert Avro files.
It accepts a single avro file (.avro), a gzipped avro file (.avro.gz), or a
folder containing several of these.
Parameters
----------
Expand Down Expand Up @@ -89,17 +93,19 @@ def _load_avro_files(self, ext_path: str = None):
if isinstance(path, list):
self.filenames = path
elif os.path.isdir(path):
self.filenames = glob.glob(os.path.join(path, '*.avro'))
self.filenames = glob.glob(os.path.join(path, '*.avro*'))
elif path == '':
print('WARNING: path to avro files is empty')
self.filenames = []
elif fastavro.is_avro(path):
self.filenames = [path]
elif path.endswith('avro.gz'):
self.filenames = [path]
else:
msg = """
Data path not understood: {}
You must give an avro file with
its extension (.avro), or a folder with avro files.
its extension (.avro or .avro.gz), or a folder with avro files.
""".format(path)
raise IOError(msg)

Expand Down Expand Up @@ -129,7 +135,18 @@ def _read_single_alert(self, name: str = None) -> dict:

data = []

with open(name, 'rb') as fo:
if name.endswith('avro'):
copen = lambda x: open(x, mode='rb')
elif name.endswith('avro.gz'):
copen = lambda x: gzip.open(x, mode='rb')
else:
msg = """
Alert filename should end with `avro` or `avro.gz`.
Currently trying to read: {}
""".format(name)
raise NotImplementedError(msg)

with copen(name) as fo:
avro_reader = reader(fo)
for record in avro_reader:
data.append(record)
Expand All @@ -149,6 +166,10 @@ def to_pandas(self) -> pd.DataFrame:
>>> df = r.to_pandas()
>>> assert('objectId' in r.to_pandas().columns)
>>> r = AlertReader(avro_gzipped)
>>> df = r.to_pandas()
>>> assert('diaSource' in r.to_pandas().columns)
"""
return pd.DataFrame(self.to_iterator())

Expand All @@ -174,6 +195,11 @@ def to_list(self, size: int = None) -> list:
>>> mylist = r.to_list(size=2)
>>> print(len(mylist))
2
>>> r = AlertReader(avro_gzipped)
>>> mylist = r.to_list()
>>> print(len(mylist))
1
"""
nest = [self._read_single_alert(fn) for fn in self.filenames[:size]]
return [item for sublist in nest for item in sublist][:size]
Expand Down Expand Up @@ -228,7 +254,10 @@ def write_alert(alert: dict, schema: str, path: str, overwrite: bool = False):
...
OSError: ./ZTF19acihgng_1060135832015015002.avro already exists!
"""
alert_filename = os.path.join(path, "{}_{}.avro".format(alert["objectId"], alert["candidate"]["candid"]))
alert_filename = os.path.join(
path,
"{}_{}.avro".format(alert["objectId"], alert["candidate"]["candid"])
)

if type(schema) == str:
schema = _get_alert_schema(schema)
Expand Down Expand Up @@ -405,6 +434,7 @@ def _decode_avro_alert(avro_alert: io.IOBase, schema: dict) -> Any:
args['avro_multi_file2'] = 'datatest/avro_multi_alerts_other.avro'
args['avro_list'] = ['datatest/ZTF19acihgng.avro', 'datatest/ZTF19acyfkzd.avro']
args['avro_folder'] = 'datatest'
args['avro_gzipped'] = 'datatest/alert_mjd60674.0512_obj747_src1494043.avro.gz'
args['schema_path'] = 'schemas/distribution_schema_0p2.avsc'

regular_unit_tests(global_args=args)

0 comments on commit 378b084

Please sign in to comment.