diff --git a/importers/djornl/parser.py b/importers/djornl/parser.py
index b1d0de27..cf4ad241 100644
--- a/importers/djornl/parser.py
+++ b/importers/djornl/parser.py
@@ -15,10 +15,11 @@
 RES_ROOT_DATA_PATH=/path/to/data/dir python -m importers.djornl.parser
 
 """
+import argparse
+import csv
 import json
-import requests
 import os
-import csv
+import requests
 import yaml
 
 import importers.utils.config as config
@@ -132,11 +133,9 @@ def _get_file_reader(self, fd, file):
         """Given a dict containing file information, instantiate the correct type of parser"""
         delimiter = "\t"
 
-        if (
-            "file_format" in file
-            and file["file_format"].lower() == "csv"
-            or file["path"].lower().endswith(".csv")
-        ):
+        if file.get("file_format", "").lower() == "csv" or file[
+            "path"
+        ].lower().endswith(".csv"):
             delimiter = ","
 
         return csv.reader(fd, delimiter=delimiter)
@@ -181,8 +180,8 @@ def check_headers(self, headers, validator=None):
 
         :return header_errs: (dict) dict of header errors:
             'missing': required headers that are missing from the input
-            'invalid': additional headers that should not be in the input
-            'duplicate': duplicated headers (content would be overwritten)
+            'invalid': headers that should not be in the input
+            'duplicate': duplicated headers (data would be overwritten)
 
         If the list of headers supplied is valid--i.e. it contains all the fields
         marked as required in the validator schema--or no validator has been supplied, the method
@@ -212,10 +211,7 @@
         if missing_headers:
             header_errs["missing"] = missing_headers
 
-        if (
-            "additionalProperties" in validator.schema
-            and validator.schema["additionalProperties"] is False
-        ):
+        if not validator.schema.get("additionalProperties", True):
             all_props = validator.schema["properties"].keys()
             extra_headers = [i for i in headers if i not in all_props]
             if extra_headers:
@@ -276,11 +272,16 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None):
         """
         print("Parsing " + file["data_type"] + " file " + file["file_path"])
         file_parser = self.parser_gen(file)
+
+        def add_error(error):
+            print(error)
+            err_list.append(error)
+
         try:
             (line_no, cols, err_str) = next(file_parser)
         except StopIteration:
             # no valid lines found in the file
-            err_list.append(f"{file['path']}: no header line found")
+            add_error(f"{file['path']}: no header line found")
             return
 
         header_errors = self.check_headers(cols, validator)
@@ -292,7 +293,7 @@
         }
         for err_type in ["missing", "invalid", "duplicate"]:
             if err_type in header_errors:
-                err_list.append(
+                add_error(
                     f"{file['path']}: {err_str[err_type]} headers: "
                     + ", ".join(sorted(header_errors[err_type]))
                 )
@@ -303,7 +304,7 @@
         for (line_no, cols, err_str) in file_parser:
             # mismatch in number of cols
             if cols is None:
-                err_list.append(err_str)
+                add_error(err_str)
                 continue
 
             # merge headers with cols to create an object
@@ -312,8 +313,11 @@
             if validator is not None:
                 # validate the object
                 if not validator.is_valid(row_object):
-                    for e in sorted(validator.iter_errors(row_object), key=str):
-                        err_list.append(f"{file['path']} line {line_no}: " + e.message)
+                    err_msg = "\n".join(
+                        f"{file['path']} line {line_no}: " + e.message
+                        for e in sorted(validator.iter_errors(row_object), key=str)
+                    )
+                    add_error(err_msg)
                     continue
 
             try:
@@ -321,7 +325,7 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None):
                 datum = self.remap_object(row_object, remap_fn)
             except Exception as err:
                 err_type = type(err)
-                err_list.append(
+                add_error(
                     f"{file['path']} line {line_no}: error remapping data: {err_type} {err}"
                 )
                 continue
@@ -331,16 +335,16 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None):
             if storage_error is None:
                 n_stored += 1
             else:
-                err_list.append(f"{file['path']} line {line_no}: " + storage_error)
+                add_error(f"{file['path']} line {line_no}: " + storage_error)
 
         if not n_stored:
-            err_list.append(f"{file['path']}: no valid data found")
+            add_error(f"{file['path']}: no valid data found")
 
     def store_parsed_edge_data(self, datum):
         """
         store node and edge data in the node (node_ix) and edge (edge_ix) indexes respectively
-
-        Nodes are indexed by the '_key' attribute. Parsed edge data only contains node '_key' values.
+        Nodes are indexed by the '_key' attribute.
+        Parsed edge data only contains node '_key' values.
 
         Edges are indexed by the unique combination of the two node IDs, the edge type, and whether
         or not it is a directed edge. It is assumed that if there is more than one score for a given
@@ -682,22 +686,18 @@ def load_data(self, dry_run=False):
             if output["err_list"]:
                 all_errs = all_errs + output["err_list"]
 
-        if all_errs:
-            raise RuntimeError("\n".join(all_errs))
+        # if there are no errors then save the dataset unless this is a dry run
+        if len(all_errs) == 0 and not dry_run:
+            self.save_dataset()
 
-        if dry_run:
-            # report stats on the data that has been gathered
-            return self.summarise_dataset()
+        # report stats on the data that has been gathered
+        return self.summarise_dataset(all_errs)
 
-        # otherwise, save the dataset
-        self.save_dataset()
-        return True
-
-    def summarise_dataset(self):
+    def summarise_dataset(self, errs):
         """summarise the data that has been loaded"""
 
         # go through the node index, checking for nodes that only have one attribute ('_key') or
-        # were loaded from the clusters files, with their only attributes being '_key' and 'clusters'
+        # were loaded from the clusters files, with only '_key' and 'clusters' attributes
 
         node_type_ix = {"__NO_TYPE__": 0}
         node_data = {"key_only": [], "cluster": [], "full": []}
@@ -739,13 +739,103 @@ def summarise_dataset(self):
                 "cluster": len(node_data["cluster"]),
                 "full": len(node_data["full"]),
             },
+            "errors_total": len(errs),
+            "errors": errs,
         }
 
 
-if __name__ == "__main__":
+def format_summary(summary, output):
+    if output == "json":
+        return json.dumps(summary)
+    node_type_counts = [count for count in summary["node_type_count"].values()]
+    edge_type_counts = [count for count in summary["edge_type_count"].values()]
+    values = (
+        [
+            summary["nodes_total"],
+            summary["edges_total"],
+            summary["nodes_in_edge"],
+            summary["node_data_available"]["key_only"],
+            summary["node_data_available"]["cluster"],
+            summary["node_data_available"]["full"],
+            summary.get("errors_total"),
+        ]
+        + node_type_counts
+        + edge_type_counts
+    )
+    value_width = max([len(str(value)) for value in values])
+    node_type_names = dict(__NO_TYPE__="No type")
+    node_types = "\n".join(
+        [
+            (
+                f"{count:{value_width}} {node_type_names.get(ntype, ntype)}".format(
+                    value_width
+                )
+            )
+            for ntype, count in summary["node_type_count"].items()
+        ]
+    )
+    edge_type_names = dict()
+    edge_types = "\n".join(
+        [
+            (
+                f"{count:{value_width}} {edge_type_names.get(etype, etype)}".format(
+                    value_width
+                )
+            )
+            for etype, count in summary["edge_type_count"].items()
+        ]
+    )
+    text_summary = f"""
+{summary["nodes_total"]:{value_width}} Total nodes
+{summary["edges_total"]:{value_width}} Total edges
+{summary["nodes_in_edge"]:{value_width}} Nodes in edge
+---
+Node Types
+{node_types:{value_width}}
+---
+Edge Types
+{edge_types:{value_width}}
+---
+Node data available
+{summary["node_data_available"]["key_only"]:{value_width}} Key only
+{summary["node_data_available"]["cluster"]:{value_width}} Cluster
+{summary["node_data_available"]["full"]:{value_width}} Full
+---
+{summary.get("errors_total"):{value_width}} Errors
+""".format(
+        value_width
+    )
+    return text_summary
+
+
+def main():
+    argparser = argparse.ArgumentParser(description="Load DJORNL data")
+    argparser.add_argument(
+        "--dry-run",
+        dest="dry",
+        action="store_true",
+        help="Perform all actions of the parser, except loading the data.",
+    )
+    argparser.add_argument(
+        "--output",
+        default="text",
+        help="Specify the format of any output generated. (text or json)",
+    )
+    args = argparser.parse_args()
     parser = DJORNL_Parser()
+    summary = dict()
     try:
-        parser.load_data()
+        summary = parser.load_data(dry_run=args.dry)
     except Exception as err:
-        print(err)
+        print("Unhandled exception", err)
         exit(1)
+    errors = summary.get("errors")
+    if summary:
+        print(format_summary(summary, args.output))
+    if errors:
+        error_output = f"Aborted with {len(errors)} errors.\n"
+        raise RuntimeError(error_output)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/importers/test/test_djornl_parser.py b/importers/test/test_djornl_parser.py
index e2c7f665..79ec783d 100644
--- a/importers/test/test_djornl_parser.py
+++ b/importers/test/test_djornl_parser.py
@@ -52,11 +52,9 @@ def test_errors(self, parser=None, errs={}):
         with self.subTest(data_type="all types"):
             # test all errors
-            with self.assertRaisesRegex(RuntimeError, all_errs[0]) as cm:
-                parser.load_data()
-            exception = cm.exception
-            err_list = exception.split("\n")
-            self.assertEqual(err_list, all_errs)
+            summary = parser.load_data(dry_run=True)
+            err_list = summary["errors"]
+            self.assertEqual(err_list, all_errs)
 
     def test_missing_required_env_var(self):
         """test that the parser exits with code 1 if the RES_ROOT_DATA_PATH env var is not set"""
@@ -331,6 +329,8 @@ def test_dry_run(self):
                 "node_type_count": {"__NO_TYPE__": 0, "gene": 10, "pheno": 4},
                 "nodes_in_edge": 12,
                 "nodes_total": 14,
+                "errors_total": 0,
+                "errors": [],
             },
             output,
         )
diff --git a/importers/test/test_djornl_parser_integration.py b/importers/test/test_djornl_parser_integration.py
index fb184684..d98ee1d9 100644
--- a/importers/test/test_djornl_parser_integration.py
+++ b/importers/test/test_djornl_parser_integration.py
@@ -25,4 +25,4 @@ def test_the_full_shebang(self):
         ):
             parser = DJORNL_Parser()
             parser.load_data()
-            self.assertEqual(True, parser.load_data())
+            self.assertTrue(bool(parser.load_data()))
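
A minimal usage sketch of the dry-run summary flow introduced by this patch (not part of the patch itself). It assumes RES_ROOT_DATA_PATH points at a valid data directory, as described in the module docstring; the CLI equivalent would be `RES_ROOT_DATA_PATH=/path/to/data/dir python -m importers.djornl.parser --dry-run --output json`.

# Sketch only: exercise load_data(dry_run=True) and format_summary() added above.
from importers.djornl.parser import DJORNL_Parser, format_summary

parser = DJORNL_Parser()
summary = parser.load_data(dry_run=True)   # parse and validate only; save_dataset() is skipped
print(format_summary(summary, "text"))     # "json" returns json.dumps(summary) instead
if summary["errors"]:
    print(f"Found {summary['errors_total']} errors")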