Skip to content

Commit

Permalink
Simplify init and remove unnecessary code
Browse files Browse the repository at this point in the history
  • Loading branch information
amc-corey-cox committed Nov 8, 2024
1 parent 979e25b commit 5a76b66
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 78 deletions.
27 changes: 11 additions & 16 deletions src/koza/io/writer/jsonl_writer.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
import json
import os
from typing import List, Optional
from typing import List, Optional, TextIO

from koza.io.writer.writer import KozaWriter
from koza.model.config.sssom_config import SSSOMConfig


class JSONLWriter(KozaWriter):
def __init__(
self,
output_dir: str,
source_name: str,
node_properties: List[str],
edge_properties: Optional[List[str]] = None,
sssom_config: SSSOMConfig = None,
):
super().__init__(output_dir, source_name, node_properties, edge_properties, sssom_config)
node_properties: List[str]
edge_properties: List[str]
nodeFH: Optional[TextIO]
edgeFH: Optional[TextIO]

os.makedirs(output_dir, exist_ok=True)
if node_properties:
self.nodeFH = open(f"{output_dir}/{source_name}_nodes.jsonl", "w")
if edge_properties:
self.edgeFH = open(f"{output_dir}/{source_name}_edges.jsonl", "w")
def init(self):
os.makedirs(self.output_dir, exist_ok=True)
if self.node_properties:
self.nodeFH = open(f"{self.output_dir}/{self.source_name}_nodes.jsonl", "w")
if self.edge_properties:
self.edgeFH = open(f"{self.output_dir}/{self.source_name}_edges.jsonl", "w")

def write_edge(self, edge: dict):
edge = json.dumps(edge, ensure_ascii=False)
Expand Down
91 changes: 35 additions & 56 deletions src/koza/io/writer/tsv_writer.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,71 @@
#### TSV Writer ####
# NOTE - May want to rename to KGXWriter at some point, if we develop writers for other models that are not biolink/kgx specific

from pathlib import Path
from typing import Dict, List, Literal, Set, Tuple, Union
from typing import Dict, List, Literal, Set, TextIO

from numpy.f2py.auxfuncs import throw_error
from ordered_set import OrderedSet

# from koza.converter.kgx_converter import KGXConverter
from koza.io.utils import build_export_row
from koza.io.writer.writer import KozaWriter
from koza.model.config.sssom_config import SSSOMConfig


class TSVWriter(KozaWriter):
def __init__(
self,
output_dir: Union[str, Path],
source_name: str,
node_properties: List[str] = None,
edge_properties: List[str] = None,
sssom_config: SSSOMConfig = None,
):
super().__init__(output_dir, source_name, node_properties, edge_properties, sssom_config)
self.delimiter = "\t"
self.list_delimiter = "|"
delimiter: str = "\t"
list_delimiter: str = "|"

nodes_file_name: Path
edges_file_name: Path

nodeFH: TextIO
edgeFH: TextIO

def init(self):
Path(self.output_dir).mkdir(parents=True, exist_ok=True)

if node_properties: # Make node file
self.node_columns = TSVWriter._order_columns(set(node_properties), "node")
if self.node_properties: # Make node file
self.node_properties = TSVWriter._order_columns(set(self.node_properties), "node")
self.nodes_file_name = Path(self.output_dir if self.output_dir else "", f"{self.source_name}_nodes.tsv")
self.nodeFH = open(self.nodes_file_name, "w")
self.nodeFH.write(self.delimiter.join(self.node_columns) + "\n")
self.nodeFH.write(self.delimiter.join(self.node_properties) + "\n")

if edge_properties: # Make edge file
if sssom_config:
edge_properties = self.add_sssom_columns(edge_properties)
self.edge_columns = TSVWriter._order_columns(set(edge_properties), "edge")
if self.edge_properties: # Make edge file
if self.sssom_config:
self.edge_properties = self.add_sssom_columns(self.edge_properties)
self.edge_properties = TSVWriter._order_columns(set(self.edge_properties), "edge")
self.edges_file_name = Path(self.output_dir if self.output_dir else "", f"{self.source_name}_edges.tsv")
self.edgeFH = open(self.edges_file_name, "w")
self.edgeFH.write(self.delimiter.join(self.edge_columns) + "\n")
self.edgeFH.write(self.delimiter.join(self.edge_properties) + "\n")

def write_edge(self, edge: dict):
self.write_row(edge, record_type="edge")
"""Write an edge to the underlying store.
def write_node(self, node: dict):
self.write_row(node, record_type="node")
Args:
edge: dict - An edge record
"""
row = build_export_row(edge, list_delimiter=self.list_delimiter)
values = self.get_columns(row, self.edge_properties)
self.edgeFH.write(self.delimiter.join(values) + "\n")

def write_row(self, record: Dict, record_type: Literal["node", "edge"]) -> None:
"""Write a row to the underlying store.
def write_node(self, node: dict):
"""Write a node to the underlying store.
Args:
record: Dict - A node or edge record
record_type: Literal["node", "edge"] - The record_type of record
node: dict - A node record
"""
fh = self.nodeFH if record_type == "node" else self.edgeFH
columns = self.node_columns if record_type == "node" else self.edge_columns
row = build_export_row(record, list_delimiter=self.list_delimiter)

# Throw error if the record has extra columns
columns_tuple = tuple(columns)
row_keys_tuple = tuple(row.keys())
if self.has_extra_columns(row_keys_tuple, columns_tuple):
throw_error(f"Record has extra columns: {set(row.keys()) - set(columns)} not defined in {record_type}")
row = build_export_row(node, list_delimiter=self.list_delimiter)
row["id"] = node["id"]
values = self.get_columns(row, self.node_properties)
self.nodeFH.write(self.delimiter.join(values) + "\n")

@staticmethod
def get_columns(row: Dict, columns) -> List[str]:
values = []
if record_type == "node":
row["id"] = record["id"]
for c in columns:
if c in row:
values.append(str(row[c]))
else:
values.append("")
fh.write(self.delimiter.join(values) + "\n")
return values

def finalize(self):
"""Close file handles."""
Expand All @@ -83,19 +75,6 @@ def finalize(self):
if hasattr(self, "edgeFH"):
self.edgeFH.close()

@staticmethod
def has_extra_columns(row_keys: Tuple[str, ...], columns_tuple: Tuple[str, ...]) -> bool:
"""Check if a row has extra columns.
Args:
row_keys: Tuple[str, ...] - A tuple of row keys
columns_tuple: Tuple[str, ...] - A tuple of columns
Returns:
bool - True if row has extra columns, False otherwise
"""
return not set(row_keys).issubset(set(columns_tuple))

@staticmethod
def _order_columns(cols: Set, record_type: Literal["node", "edge"]) -> OrderedSet:
"""Arrange node or edge columns in a defined order.
Expand Down
22 changes: 16 additions & 6 deletions src/koza/io/writer/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,34 @@ def __init__(
node_properties: List[str] = None,
edge_properties: List[str] = None,
sssom_config: SSSOMConfig = None,
skip_checks: bool = False,
):
"""Do not override this method; implement `init` instead."""
self.output_dir = output_dir
self.source_name = source_name
self.node_columns = node_properties
self.edge_columns = edge_properties
self.node_properties = node_properties
self.edge_properties = edge_properties
self.sssom_config = sssom_config

self.skip_checks = skip_checks
self.converter = KGXConverter()

def write(self, entities: Iterable, skip_checks: bool = False):
self.init()

def write(self, entities: Iterable):
nodes, edges = self.converter.convert(entities)

if nodes:
for node in nodes:
self.check_extra_fields(tuple(node.keys()), tuple(self.node_columns))
if not self.skip_checks:
self.check_extra_fields(tuple(node.keys()), tuple(self.node_properties))
self.write_node(node)

if edges:
for edge in edges:
if self.sssom_config:
edge = self.sssom_config.apply_mapping(edge)
self.check_extra_fields(tuple(edge.keys()), tuple(self.edge_columns))
if not self.skip_checks:
self.check_extra_fields(tuple(edge.keys()), tuple(self.edge_properties))
self.write_edge(edge)

@staticmethod
Expand All @@ -54,6 +60,10 @@ def check_extra_fields(row_keys: Tuple, columns: Tuple) -> None:
if extra_fields:
raise ValueError(f"Extra fields found in row: {sorted(set(row_keys) - set(columns))}")

@abstractmethod
def init(self):
pass

@abstractmethod
def write_edge(self, edge: dict):
pass
Expand Down

0 comments on commit 5a76b66

Please sign in to comment.