Add --dry-run command line parameter.
This commit fixes issue kbase#56 by adding a --dry-run parameter that validates the
input data and prints an output summary even in the presence of errors. To this
end, the --output parameter is also introduced, giving the option to display the
summary either as JSON or in a more user-friendly text format. Data is not
loaded if there are errors or if the command is invoked with --dry-run.
dakotablair committed Nov 20, 2020
1 parent c6ddf99 commit f78ddea
Showing 3 changed files with 129 additions and 42 deletions.
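For orientation, here is a minimal sketch of how the new flags might be exercised once this commit is applied. The data-directory path is a placeholder, and the import of format_summary and the programmatic calls simply mirror the main() added in the diff below rather than a documented API.

```python
# Hypothetical command-line invocation (path is a placeholder):
#   RES_ROOT_DATA_PATH=/path/to/data/dir python -m importers.djornl.parser --dry-run --output json

# Roughly equivalent programmatic flow, mirroring the main() added below:
from importers.djornl.parser import DJORNL_Parser, format_summary

parser = DJORNL_Parser()
summary = parser.load_data(dry_run=True)  # validate and summarise without saving
print(format_summary(summary, "json"))    # "text" gives the aligned, human-readable form
if summary.get("errors"):
    raise RuntimeError(f"Aborted with {len(summary['errors'])} errors.\n")
```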
159 changes: 123 additions & 36 deletions importers/djornl/parser.py
@@ -15,10 +15,11 @@
RES_ROOT_DATA_PATH=/path/to/data/dir python -m importers.djornl.parser
"""
import argparse
import csv
import json
import requests
import os
import csv
import requests
import yaml

import importers.utils.config as config
@@ -132,11 +133,9 @@ def _get_file_reader(self, fd, file):
"""Given a dict containing file information, instantiate the correct type of parser"""

delimiter = "\t"
if (
"file_format" in file
and file["file_format"].lower() == "csv"
or file["path"].lower().endswith(".csv")
):
if file.get("file_format", "").lower() == "csv" or file[
"path"
].lower().endswith(".csv"):
delimiter = ","
return csv.reader(fd, delimiter=delimiter)

@@ -181,8 +180,8 @@ def check_headers(self, headers, validator=None):
:return header_errs: (dict) dict of header errors:
'missing': required headers that are missing from the input
'invalid': additional headers that should not be in the input
'duplicate': duplicated headers (content would be overwritten)
'invalid': headers that should not be in the input
'duplicate': duplicated headers (data would be overwritten)
If the list of headers supplied is valid--i.e. it
contains all the fields marked as required in the validator
schema--or no validator has been supplied, the method
@@ -212,10 +211,7 @@ def check_headers(self, headers, validator=None):
if missing_headers:
header_errs["missing"] = missing_headers

if (
"additionalProperties" in validator.schema
and validator.schema["additionalProperties"] is False
):
if not validator.schema.get("additionalProperties", True):
all_props = validator.schema["properties"].keys()
extra_headers = [i for i in headers if i not in all_props]
if extra_headers:
@@ -276,11 +272,16 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None):
"""
print("Parsing " + file["data_type"] + " file " + file["file_path"])
file_parser = self.parser_gen(file)

def add_error(error):
print(error)
err_list.append(error)

try:
(line_no, cols, err_str) = next(file_parser)
except StopIteration:
# no valid lines found in the file
err_list.append(f"{file['path']}: no header line found")
add_error(f"{file['path']}: no header line found")
return

header_errors = self.check_headers(cols, validator)
@@ -292,7 +293,7 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None):
}
for err_type in ["missing", "invalid", "duplicate"]:
if err_type in header_errors:
err_list.append(
add_error(
f"{file['path']}: {err_str[err_type]} headers: "
+ ", ".join(sorted(header_errors[err_type]))
)
@@ -303,7 +304,7 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None):
for (line_no, cols, err_str) in file_parser:
# mismatch in number of cols
if cols is None:
err_list.append(err_str)
add_error(err_str)
continue

# merge headers with cols to create an object
@@ -313,15 +314,15 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None):
# validate the object
if not validator.is_valid(row_object):
for e in sorted(validator.iter_errors(row_object), key=str):
err_list.append(f"{file['path']} line {line_no}: " + e.message)
add_error(f"{file['path']} line {line_no}: " + e.message)
continue

try:
# transform it using the remap_functions
datum = self.remap_object(row_object, remap_fn)
except Exception as err:
err_type = type(err)
err_list.append(
add_error(
f"{file['path']} line {line_no}: error remapping data: {err_type} {err}"
)
continue
@@ -331,16 +332,16 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None):
if storage_error is None:
n_stored += 1
else:
err_list.append(f"{file['path']} line {line_no}: " + storage_error)
add_error(f"{file['path']} line {line_no}: " + storage_error)

if not n_stored:
err_list.append(f"{file['path']}: no valid data found")
add_error(f"{file['path']}: no valid data found")

def store_parsed_edge_data(self, datum):
"""
store node and edge data in the node (node_ix) and edge (edge_ix) indexes respectively
Nodes are indexed by the '_key' attribute. Parsed edge data only contains node '_key' values.
Nodes are indexed by the '_key' attribute.
Parsed edge data only contains node '_key' values.
Edges are indexed by the unique combination of the two node IDs, the edge type, and whether
or not it is a directed edge. It is assumed that if there is more than one score for a given
@@ -682,22 +683,18 @@ def load_data(self, dry_run=False):
if output["err_list"]:
all_errs = all_errs + output["err_list"]

if all_errs:
raise RuntimeError("\n".join(all_errs))

if dry_run:
# report stats on the data that has been gathered
return self.summarise_dataset()
# if there are no errors then save the dataset unless this is a dry run
if len(all_errs) == 0 and not dry_run:
self.save_dataset()

# otherwise, save the dataset
self.save_dataset()
return True
# report stats on the data that has been gathered
return self.summarise_dataset(all_errs)

def summarise_dataset(self):
def summarise_dataset(self, errs):
"""summarise the data that has been loaded"""

# go through the node index, checking for nodes that only have one attribute ('_key') or
# were loaded from the clusters files, with their only attributes being '_key' and 'clusters'
# were loaded from the clusters files, with only '_key' and 'clusters' attributes

node_type_ix = {"__NO_TYPE__": 0}
node_data = {"key_only": [], "cluster": [], "full": []}
@@ -739,13 +736,103 @@ def summarise_dataset(self):
"cluster": len(node_data["cluster"]),
"full": len(node_data["full"]),
},
"errors_total": len(errs),
"errors": errs,
}


if __name__ == "__main__":
def format_summary(summary, output):
if output == "json":
return json.dumps(summary)
node_type_counts = list(summary["node_type_count"].values())
edge_type_counts = list(summary["edge_type_count"].values())
values = (
[
summary["nodes_total"],
summary["edges_total"],
summary["nodes_in_edge"],
summary["node_data_available"]["key_only"],
summary["node_data_available"]["cluster"],
summary["node_data_available"]["full"],
summary.get("errors_total"),
]
+ node_type_counts
+ edge_type_counts
)
value_width = max([len(str(value)) for value in values])
node_type_names = dict(__NO_TYPE__="No type")
node_types = "\n".join(
[
f"{count:{value_width}} {node_type_names.get(ntype, ntype)}"
for ntype, count in summary["node_type_count"].items()
]
)
edge_type_names = dict()
edge_types = "\n".join(
[
f"{count:{value_width}} {edge_type_names.get(etype, etype)}"
for etype, count in summary["edge_type_count"].items()
]
)
text_summary = f"""
{summary["nodes_total"]:{value_width}} Total nodes
{summary["edges_total"]:{value_width}} Total edges
{summary["nodes_in_edge"]:{value_width}} Nodes in edge
---
Node Types
{node_types:{value_width}}
---
Edge Types
{edge_types:{value_width}}
---
Node data available
{summary["node_data_available"]["key_only"]:{value_width}} Key only
{summary["node_data_available"]["cluster"]:{value_width}} Cluster
{summary["node_data_available"]["full"]:{value_width}} Full
---
{summary.get("errors_total"):{value_width}} Errors
""".format(
value_width
)
return text_summary


def main():
argparser = argparse.ArgumentParser(description="Load DJORNL data")
argparser.add_argument(
"--dry-run",
dest="dry",
action="store_true",
help="Perform all actions of the parser, except loading the data.",
)
argparser.add_argument(
"--output",
default="text",
help="Specify the format of any output generated. (text or json)",
)
args = argparser.parse_args()
parser = DJORNL_Parser()
summary = dict()
try:
parser.load_data()
summary = parser.load_data(dry_run=args.dry)
except Exception as err:
print(err)
print("Unhandled exception", err)
exit(1)
errors = summary.get("errors")
if summary:
print(format_summary(summary, args.output))
if errors:
error_output = f"Aborted with {len(errors)} errors.\n"
raise RuntimeError(error_output)


if __name__ == "__main__":
main()
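As a further illustration, here is a sketch of driving the new format_summary helper directly. The counts below are invented, and the edge-type labels are hypothetical placeholders rather than names taken from the real dataset.

```python
from importers.djornl.parser import format_summary

# Invented values, shaped like the dict returned by summarise_dataset()
summary = {
    "nodes_total": 14,
    "edges_total": 12,
    "nodes_in_edge": 12,
    "node_type_count": {"__NO_TYPE__": 0, "gene": 10, "pheno": 4},
    "edge_type_count": {"edge-type-a": 3, "edge-type-b": 9},  # hypothetical labels
    "node_data_available": {"key_only": 0, "cluster": 0, "full": 14},
    "errors_total": 0,
    "errors": [],
}

print(format_summary(summary, "json"))  # single-line JSON
print(format_summary(summary, "text"))  # aligned plain-text block
```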
10 changes: 5 additions & 5 deletions importers/test/test_djornl_parser.py
@@ -52,11 +52,9 @@ def test_errors(self, parser=None, errs={}):

with self.subTest(data_type="all types"):
# test all errors
with self.assertRaisesRegex(RuntimeError, all_errs[0]) as cm:
parser.load_data()
exception = cm.exception
err_list = exception.split("\n")
self.assertEqual(err_list, all_errs)
summary = parser.load_data(dry_run=True)
err_list = summary["errors"]
self.assertEqual(err_list, all_errs)

def test_missing_required_env_var(self):
"""test that the parser exits with code 1 if the RES_ROOT_DATA_PATH env var is not set"""
@@ -331,6 +329,8 @@ def test_dry_run(self):
"node_type_count": {"__NO_TYPE__": 0, "gene": 10, "pheno": 4},
"nodes_in_edge": 12,
"nodes_total": 14,
"errors_total": 0,
"errors": [],
},
output,
)
2 changes: 1 addition & 1 deletion importers/test/test_djornl_parser_integration.py
@@ -25,4 +25,4 @@ def test_the_full_shebang(self):
):
parser = DJORNL_Parser()
parser.load_data()
self.assertEqual(True, parser.load_data())
self.assertTrue(bool(parser.load_data()))
