diff --git a/src/koza/io/writer/jsonl_writer.py b/src/koza/io/writer/jsonl_writer.py index 9d1aa8b..00b64bc 100644 --- a/src/koza/io/writer/jsonl_writer.py +++ b/src/koza/io/writer/jsonl_writer.py @@ -1,27 +1,22 @@ import json import os -from typing import List, Optional +from typing import List, Optional, TextIO from koza.io.writer.writer import KozaWriter -from koza.model.config.sssom_config import SSSOMConfig class JSONLWriter(KozaWriter): - def __init__( - self, - output_dir: str, - source_name: str, - node_properties: List[str], - edge_properties: Optional[List[str]] = None, - sssom_config: SSSOMConfig = None, - ): - super().__init__(output_dir, source_name, node_properties, edge_properties, sssom_config) + node_properties: List[str] + edge_properties: List[str] + nodeFH: Optional[TextIO] + edgeFH: Optional[TextIO] - os.makedirs(output_dir, exist_ok=True) - if node_properties: - self.nodeFH = open(f"{output_dir}/{source_name}_nodes.jsonl", "w") - if edge_properties: - self.edgeFH = open(f"{output_dir}/{source_name}_edges.jsonl", "w") + def init(self): + os.makedirs(self.output_dir, exist_ok=True) + if self.node_properties: + self.nodeFH = open(f"{self.output_dir}/{self.source_name}_nodes.jsonl", "w") + if self.edge_properties: + self.edgeFH = open(f"{self.output_dir}/{self.source_name}_edges.jsonl", "w") def write_edge(self, edge: dict): edge = json.dumps(edge, ensure_ascii=False) diff --git a/src/koza/io/writer/tsv_writer.py b/src/koza/io/writer/tsv_writer.py index 45019d3..b6a94e9 100644 --- a/src/koza/io/writer/tsv_writer.py +++ b/src/koza/io/writer/tsv_writer.py @@ -1,79 +1,71 @@ #### TSV Writer #### -# NOTE - May want to rename to KGXWriter at some point, if we develop writers for other models non biolink/kgx specific from pathlib import Path -from typing import Dict, List, Literal, Set, Tuple, Union +from typing import Dict, List, Literal, Set, TextIO -from numpy.f2py.auxfuncs import throw_error from ordered_set import OrderedSet -# from koza.converter.kgx_converter import KGXConverter from koza.io.utils import build_export_row from koza.io.writer.writer import KozaWriter -from koza.model.config.sssom_config import SSSOMConfig class TSVWriter(KozaWriter): - def __init__( - self, - output_dir: Union[str, Path], - source_name: str, - node_properties: List[str] = None, - edge_properties: List[str] = None, - sssom_config: SSSOMConfig = None, - ): - super().__init__(output_dir, source_name, node_properties, edge_properties, sssom_config) - self.delimiter = "\t" - self.list_delimiter = "|" + delimiter: str = "\t" + list_delimiter: str = "|" + nodes_file_name: Path + edges_file_name: Path + + nodeFH: TextIO + edgeFH: TextIO + + def init(self): Path(self.output_dir).mkdir(parents=True, exist_ok=True) - if node_properties: # Make node file - self.node_columns = TSVWriter._order_columns(set(node_properties), "node") + if self.node_properties: # Make node file + self.node_properties = TSVWriter._order_columns(set(self.node_properties), "node") self.nodes_file_name = Path(self.output_dir if self.output_dir else "", f"{self.source_name}_nodes.tsv") self.nodeFH = open(self.nodes_file_name, "w") - self.nodeFH.write(self.delimiter.join(self.node_columns) + "\n") + self.nodeFH.write(self.delimiter.join(self.node_properties) + "\n") - if edge_properties: # Make edge file - if sssom_config: - edge_properties = self.add_sssom_columns(edge_properties) - self.edge_columns = TSVWriter._order_columns(set(edge_properties), "edge") + if self.edge_properties: # Make edge file + if self.sssom_config: + self.edge_properties = self.add_sssom_columns(self.edge_properties) + self.edge_properties = TSVWriter._order_columns(set(self.edge_properties), "edge") self.edges_file_name = Path(self.output_dir if self.output_dir else "", f"{self.source_name}_edges.tsv") self.edgeFH = open(self.edges_file_name, "w") - self.edgeFH.write(self.delimiter.join(self.edge_columns) + "\n") + self.edgeFH.write(self.delimiter.join(self.edge_properties) + "\n") def write_edge(self, edge: dict): - self.write_row(edge, record_type="edge") + """Write an edge to the underlying store. - def write_node(self, node: dict): - self.write_row(node, record_type="node") + Args: + edge: dict - An edge record + """ + row = build_export_row(edge, list_delimiter=self.list_delimiter) + values = self.get_columns(row, self.edge_properties) + self.edgeFH.write(self.delimiter.join(values) + "\n") - def write_row(self, record: Dict, record_type: Literal["node", "edge"]) -> None: - """Write a row to the underlying store. + def write_node(self, node: dict): + """Write a node to the underlying store. Args: - record: Dict - A node or edge record - record_type: Literal["node", "edge"] - The record_type of record + node: dict - A node record """ - fh = self.nodeFH if record_type == "node" else self.edgeFH - columns = self.node_columns if record_type == "node" else self.edge_columns - row = build_export_row(record, list_delimiter=self.list_delimiter) - - # Throw error if the record has extra columns - columns_tuple = tuple(columns) - row_keys_tuple = tuple(row.keys()) - if self.has_extra_columns(row_keys_tuple, columns_tuple): - throw_error(f"Record has extra columns: {set(row.keys()) - set(columns)} not defined in {record_type}") + row = build_export_row(node, list_delimiter=self.list_delimiter) + row["id"] = node["id"] + values = self.get_columns(row, self.node_properties) + self.nodeFH.write(self.delimiter.join(values) + "\n") + @staticmethod + def get_columns(row: Dict, columns) -> List[str]: values = [] - if record_type == "node": - row["id"] = record["id"] for c in columns: if c in row: values.append(str(row[c])) else: values.append("") - fh.write(self.delimiter.join(values) + "\n") + return values def finalize(self): """Close file handles.""" @@ -83,19 +75,6 @@ def finalize(self): if hasattr(self, "edgeFH"): self.edgeFH.close() - @staticmethod - def has_extra_columns(row_keys: Tuple[str, ...], columns_tuple: Tuple[str, ...]) -> bool: - """Check if a row has extra columns. - - Args: - row_keys: Tuple[str, ...] - A tuple of row keys - columns_tuple: Tuple[str, ...] - A tuple of columns - - Returns: - bool - True if row has extra columns, False otherwise - """ - return not set(row_keys).issubset(set(columns_tuple)) - @staticmethod def _order_columns(cols: Set, record_type: Literal["node", "edge"]) -> OrderedSet: """Arrange node or edge columns in a defined order. diff --git a/src/koza/io/writer/writer.py b/src/koza/io/writer/writer.py index 7ef5063..c8b4f82 100644 --- a/src/koza/io/writer/writer.py +++ b/src/koza/io/writer/writer.py @@ -19,28 +19,34 @@ def __init__( node_properties: List[str] = None, edge_properties: List[str] = None, sssom_config: SSSOMConfig = None, + skip_checks: bool = False, ): + """Do not override this method; implement `init` instead.""" self.output_dir = output_dir self.source_name = source_name - self.node_columns = node_properties - self.edge_columns = edge_properties + self.node_properties = node_properties + self.edge_properties = edge_properties self.sssom_config = sssom_config - + self.skip_checks = skip_checks self.converter = KGXConverter() - def write(self, entities: Iterable, skip_checks: bool = False): + self.init() + + def write(self, entities: Iterable): nodes, edges = self.converter.convert(entities) if nodes: for node in nodes: - self.check_extra_fields(tuple(node.keys()), tuple(self.node_columns)) + if not self.skip_checks: + self.check_extra_fields(tuple(node.keys()), tuple(self.node_properties)) self.write_node(node) if edges: for edge in edges: if self.sssom_config: edge = self.sssom_config.apply_mapping(edge) - self.check_extra_fields(tuple(edge.keys()), tuple(self.edge_columns)) + if not self.skip_checks: + self.check_extra_fields(tuple(edge.keys()), tuple(self.edge_properties)) self.write_edge(edge) @staticmethod @@ -54,6 +60,10 @@ def check_extra_fields(row_keys: Tuple, columns: Tuple) -> None: if extra_fields: raise ValueError(f"Extra fields found in row: {sorted(set(row_keys) - set(columns))}") + @abstractmethod + def init(self): + pass + @abstractmethod def write_edge(self, edge: dict): pass