From 378b0848aa4e00b8cd22e20024c9198d3df50ca8 Mon Sep 17 00:00:00 2001 From: Julien Date: Thu, 21 Apr 2022 08:02:24 +0200 Subject: [PATCH] Allow to read gzipped avro (#139) * Allow to read gzipped avro * Ignore E731 (lambda rocks) --- .github/workflows/linter.yml | 4 +- ...rt_mjd60674.0512_obj747_src1494043.avro.gz | Bin 0 -> 1462 bytes fink_client/avroUtils.py | 40 +++++++++++++++--- 3 files changed, 37 insertions(+), 7 deletions(-) create mode 100644 datatest/alert_mjd60674.0512_obj747_src1494043.avro.gz diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 1b5b068..bd1b03d 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -26,7 +26,7 @@ jobs: pip install flake8 - name: utilities run: | - flake8 fink_client/*.py --count --show-source --statistics --ignore=E302,E501,E711 + flake8 fink_client/*.py --count --show-source --statistics --ignore=E302,E501,E711,E731 - name: Science modules run: | - flake8 fink_client/scripts/*.py --count --show-source --statistics --ignore=E302,E501,E711 + flake8 fink_client/scripts/*.py --count --show-source --statistics --ignore=E302,E501,E711,E731 diff --git a/datatest/alert_mjd60674.0512_obj747_src1494043.avro.gz b/datatest/alert_mjd60674.0512_obj747_src1494043.avro.gz new file mode 100644 index 0000000000000000000000000000000000000000..a3cea5142b5c498e53ba000468234a17569d5d17 GIT binary patch literal 1462 zcmV;n1xflJiwFpkY7Jrn|6y!pa&%v9YGgJrHa9dbFf}nUUvFY+H#9e2b8=%bG&wXd zG&3$?c5-h3w3&Ns6jc<)w^vFeJOl+qP0YHMDuHa;Da5+xcX;>-fQyUXl@ z?%|(FcIN!PbLQN0?|#{>ZfMLKHeP7fRIjK?M7&WkWqD%kvo0fCjn~Um8ys=RG zaflLG(yf36%dSY83T-mUWi=`h#h@Xg#cRbLJsd0JN@239ggtH3+{*@oBq}_qnwm)B z!kk{jxxfTEzR0WOGu0uA($iF@DeIL;NHAr?x@k8)t{^KzbweW&6UhZ0JIuZjlL)5Q zhzgq3Ki;ucN2M!Zgi3Q&su(o_jW1hL%?6nyzF>&T2GOeQPjQjDq0!`4XhJ3}YyRX+ zF;TrjHkZang=AGQoPuU+T821XNfnW@r;6I3JGSu>9Bd1=Wmpugd2mG>mDrdHnkKZ^ zA=DrdJ>8hL1+z(7p<1!h&{AL1=7~>~W(&oK<+_p5?;D4}>oVh@(`l$i7HXHHoaL-x z%xW^KR7E0&)itUrkJ1P=uIqSQl`YkbWV`6sWi4ZcQk@K`t%GBjzK>Zni^-(eMcIve zZl`a`_lztd4b5rK!_0U+NtoShlxVFFQALn*7RarN#j0+EZ4p_fQEUCE(UQBE-lPc< zHTB$;S^*?r8q^wHtQz%A#YJFJ7*6UwIH~DyQuo72&481708VNqoKy**R7vhq5g}Y} z!pMYSWHcBV3M11BBU6{VOv3l$5H+lCP=G^U(Mc4fE!KGrG<>#F7)(h6Q=(u>tzb&I z&msnO5g5)!V1PDZAR~bRodgE75*W}+U_dj00o?=!v=bQ6PhdbpfdL%_2DB6y$Wve- zQ-Oh81qQMe7|2&(AY*|coCOB478uA|U^sJu;oJp=vlkf7Utllrof(0lt{a?I7c ze=gq_+{LIj7*)Gq*GF3%b8%=s|Nh}-Ms+f(=81c{xOEm4PNwu@UvoPbe&!#fx)@b5 zdydv^Q)V(HcHYLlylsHxdXrH{PWd+}SDZY{@k=+cZtP&xjBUQ!l^dLTxS>GhI=A{5)x#*G zVC_Y;(TS?97|w0!xQS7{aosrhYVqVwr*2G{^gZ{`-jnRT_An}Z?AJzNlT$Y)uKa-e z?v^o(a=Ba dict: data = [] - with open(name, 'rb') as fo: + if name.endswith('avro'): + copen = lambda x: open(x, mode='rb') + elif name.endswith('avro.gz'): + copen = lambda x: gzip.open(x, mode='rb') + else: + msg = """ + Alert filename should end with `avro` or `avro.gz`. + Currently trying to read: {} + """.format(name) + raise NotImplementedError(msg) + + with copen(name) as fo: avro_reader = reader(fo) for record in avro_reader: data.append(record) @@ -149,6 +166,10 @@ def to_pandas(self) -> pd.DataFrame: >>> df = r.to_pandas() >>> assert('objectId' in r.to_pandas().columns) + >>> r = AlertReader(avro_gzipped) + >>> df = r.to_pandas() + >>> assert('diaSource' in r.to_pandas().columns) + """ return pd.DataFrame(self.to_iterator()) @@ -174,6 +195,11 @@ def to_list(self, size: int = None) -> list: >>> mylist = r.to_list(size=2) >>> print(len(mylist)) 2 + + >>> r = AlertReader(avro_gzipped) + >>> mylist = r.to_list() + >>> print(len(mylist)) + 1 """ nest = [self._read_single_alert(fn) for fn in self.filenames[:size]] return [item for sublist in nest for item in sublist][:size] @@ -228,7 +254,10 @@ def write_alert(alert: dict, schema: str, path: str, overwrite: bool = False): ... OSError: ./ZTF19acihgng_1060135832015015002.avro already exists! """ - alert_filename = os.path.join(path, "{}_{}.avro".format(alert["objectId"], alert["candidate"]["candid"])) + alert_filename = os.path.join( + path, + "{}_{}.avro".format(alert["objectId"], alert["candidate"]["candid"]) + ) if type(schema) == str: schema = _get_alert_schema(schema) @@ -405,6 +434,7 @@ def _decode_avro_alert(avro_alert: io.IOBase, schema: dict) -> Any: args['avro_multi_file2'] = 'datatest/avro_multi_alerts_other.avro' args['avro_list'] = ['datatest/ZTF19acihgng.avro', 'datatest/ZTF19acyfkzd.avro'] args['avro_folder'] = 'datatest' + args['avro_gzipped'] = 'datatest/alert_mjd60674.0512_obj747_src1494043.avro.gz' args['schema_path'] = 'schemas/distribution_schema_0p2.avsc' regular_unit_tests(global_args=args)