From 529c1a1673c4405c7e085a7191975eb92c5df6a7 Mon Sep 17 00:00:00 2001
From: "xiang song(charlie.song)"
Date: Tue, 28 Nov 2023 01:04:36 -0800
Subject: [PATCH] Support output data in csv format in remap_result.py (#645)

CSV is a very common data format that is accepted by many structured data
storage systems. When outputting results (prediction results or node
embeddings), we should support the CSV format.

By submitting this pull request, I confirm that you can use, modify, copy,
and redistribute this contribution, under the terms of your choice.

---------

Co-authored-by: Xiang Song
---
 python/graphstorm/gconstruct/remap_result.py  | 175 +++++++++++++++---
 .../data_process/check_edge_predict_remap.py  |  39 +++-
 .../data_process/check_emb_remap.py           |  34 +++-
 tests/end2end-tests/data_process/test.sh      |  26 +++
 .../gconstruct/test_remap_result.py           |  83 ++++++++-
 5 files changed, 315 insertions(+), 42 deletions(-)

diff --git a/python/graphstorm/gconstruct/remap_result.py b/python/graphstorm/gconstruct/remap_result.py
index 870bf01609..b679a3a57c 100644
--- a/python/graphstorm/gconstruct/remap_result.py
+++ b/python/graphstorm/gconstruct/remap_result.py
@@ -25,7 +25,9 @@
 import time
 import sys
 import math
+from functools import partial
+import pandas as pd
 import torch as th
 
 from ..model.utils import pad_file_index
 from .file_io import write_data_parquet
@@ -40,6 +42,21 @@
                         BUILTIN_TASK_NODE_CLASSIFICATION,
                         BUILTIN_TASK_NODE_REGRESSION)
 
+GS_OUTPUT_FORMAT_PARQUET = "parquet"
+GS_OUTPUT_FORMAT_CSV = "csv"
+
+GS_REMAP_NID_COL = "nid"
+GS_REMAP_PREDICTION_COL = "pred"
+GS_REMAP_SRC_NID_COL = "src_nid"
+GS_REMAP_DST_NID_COL = "dst_nid"
+GS_REMAP_EMBED_COL = "emb"
+
+GS_REMAP_BUILTIN_COLS = [GS_REMAP_NID_COL,
+                         GS_REMAP_PREDICTION_COL,
+                         GS_REMAP_SRC_NID_COL,
+                         GS_REMAP_DST_NID_COL,
+                         GS_REMAP_EMBED_COL]
+
 # Id_maps is a global variable.
 # When using multi-processing to do id remap,
 # we do not want to pass id_maps to each worker process
@@ -49,8 +66,76 @@
 # id_maps to each worker process.
 id_maps = {}
 
+def write_data_parquet_file(data, file_prefix, col_name_map=None):
+    """ Write data into disk using parquet format.
+
+    Parameters
+    ----------
+    data: dict of numpy Arrays
+        Data to be written into disk.
+    file_prefix: str
+        File prefix. The output will be .parquet.
+    col_name_map: dict
+        A mapping from builtin column name to user defined column name.
+    """
+    if col_name_map is not None:
+        data = {col_name_map[key]: val for key, val in data.items()}
+    output_fname = f"{file_prefix}.parquet"
+    write_data_parquet(data, output_fname)
+
+def write_data_csv_file(data, file_prefix, delimiter=",", col_name_map=None):
+    """ Write data into disk using csv format.
+
+    Multiple values for a field are specified with a semicolon (;) between values.
+
+    Example:
+
+    .. code::
+
+        nid, emb
+        0, 0.001;1.2000;0.736;...
+
+    Parameters
+    ----------
+    data: dict of numpy Arrays
+        Data to be written into disk.
+    file_prefix: str
+        File prefix. The output will be .csv.
+    delimiter: str
+        Delimiter used to separate columns.
+    col_name_map: dict
+        A mapping from builtin column name to user defined column name.
+    """
+    if col_name_map is not None:
+        data = {col_name_map[key]: val for key, val in data.items()}
+
+    output_fname = f"{file_prefix}.csv"
+    csv_data = {}
+    for key, vals in data.items():
+        # Each pair represents the column name and
+        # the column data of a column.
+        if len(vals.shape) == 1:
+            # vals is a 1D matrix.
+            # The data will be saved as
+            # key,
+            # 0.1,
+            # 0.2,
+            # ...
+            csv_data[key] = vals.tolist()
+        elif len(vals.shape) == 2:
+            # vals is a 2D matrix.
+            # The data will be saved as
+            # key,
+            # 0.001;1.2000;0.736;...,
+            # 0.002;1.1010;0.834;...,
+            # ...
+            csv_data[key] = [";".join([str(v) for v in val]) \
+                for val in vals.tolist()]
+    data_frame = pd.DataFrame(csv_data)
+    data_frame.to_csv(output_fname, index=False, sep=delimiter)
+
 def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key,
-                           output_fname_prefix, chunk_size, preserve_input):
+                           output_fname_prefix, chunk_size, output_func, preserve_input):
     """ Do one node prediction remapping task
 
     Parameters
@@ -67,6 +152,8 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key,
         Output file name prefix.
     chunk_size: int
         Max number of raws per output file.
+    output_func: func
+        Function used to write data to disk.
     preserve_input: bool
        Whether the input data should be removed.
     """
@@ -76,16 +163,13 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key,
 
     num_chunks = math.ceil(len(node_data) / chunk_size)
     for i in range(num_chunks):
-        output_fname = f"{output_fname_prefix}_{pad_file_index(i)}.parquet"
-
         start = i * chunk_size
         end = (i + 1) * chunk_size if i + 1 < num_chunks else len(node_data)
         data = node_data[start:end]
         nid = nid_map.map_id(nids[start:end])
         data = {data_col_key: data,
-                "nid": nid}
-
-        write_data_parquet(data, output_fname)
+                GS_REMAP_NID_COL: nid}
+        output_func(data, f"{output_fname_prefix}_{pad_file_index(i)}")
 
     if preserve_input is False:
         os.remove(data_file_path)
@@ -93,7 +177,8 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key,
 
 def worker_remap_edge_pred(pred_file_path, src_nid_path,
                            dst_nid_path, src_type, dst_type,
-                           output_fname_prefix, chunk_size, preserve_input):
+                           output_fname_prefix, chunk_size,
+                           output_func, preserve_input):
     """ Do one edge remapping task
 
     Parameters
@@ -112,6 +197,8 @@ def worker_remap_edge_pred(pred_file_path, src_nid_path,
         Output file name prefix.
     chunk_size: int
         Max number of raws per output file.
+    output_func: func
+        Function used to write data to disk.
     preserve_input: bool
        Whether the input data should be removed.
     """
@@ -122,18 +209,16 @@ def worker_remap_edge_pred(pred_file_path, src_nid_path,
     dst_id_map = id_maps[dst_type]
     num_chunks = math.ceil(len(pred_result) / chunk_size)
     for i in range(num_chunks):
-        output_fname = f"{output_fname_prefix}_{pad_file_index(i)}.parquet"
-
         start = i * chunk_size
         end = (i + 1) * chunk_size if i + 1 < num_chunks else len(pred_result)
         pred = pred_result[start:end]
         src_nid = src_id_map.map_id(src_nids[start:end])
         dst_nid = dst_id_map.map_id(dst_nids[start:end])
-        data = {"pred": pred,
-                "src_nid": src_nid,
-                "dst_nid": dst_nid}
+        data = {GS_REMAP_PREDICTION_COL: pred,
+                GS_REMAP_SRC_NID_COL: src_nid,
+                GS_REMAP_DST_NID_COL: dst_nid}
 
-        write_data_parquet(data, output_fname)
+        output_func(data, f"{output_fname_prefix}_{pad_file_index(i)}")
 
     if preserve_input is False:
         os.remove(pred_file_path)
@@ -177,7 +262,8 @@ def _get_file_range(num_files, rank, world_size):
 def remap_node_emb(emb_ntypes, node_emb_dir,
                    output_dir, out_chunk_size,
                    num_proc, rank, world_size,
-                   with_shared_fs, preserve_input=False):
+                   with_shared_fs, output_func,
+                   preserve_input=False):
     """ Remap node embeddings.
 
     The function will iterate all the node types that
@@ -235,7 +321,9 @@ def remap_node_emb(emb_ntypes, node_emb_dir,
     world_size: int
         The total number of processes in the cluster.
     with_shared_fs: bool
-        Whether shared file system is avaliable
+        Whether a shared file system is available.
+    output_func: func
+        Function used to write data to disk.
     preserve_input: bool
         Whether the input data should be removed.
     """
@@ -278,10 +366,11 @@ def remap_node_emb(emb_ntypes, node_emb_dir,
                 "data_file_path": os.path.join(input_emb_dir, emb_file),
                 "nid_path": os.path.join(input_emb_dir, nid_file),
                 "ntype": ntype,
-                "data_col_key": "emb",
+                "data_col_key": GS_REMAP_EMBED_COL,
                 "output_fname_prefix": os.path.join(out_embdir, \
                     f"{emb_file[:emb_file.rindex('.')]}"),
                 "chunk_size": out_chunk_size,
+                "output_func": output_func,
                 "preserve_input": preserve_input
             })
 
@@ -290,7 +379,7 @@ def remap_node_emb(emb_ntypes, node_emb_dir,
 def remap_node_pred(pred_ntypes, pred_dir,
                     output_dir, out_chunk_size,
                     num_proc, rank, world_size, with_shared_fs,
-                    preserve_input=False):
+                    output_func, preserve_input=False):
     """ Remap node prediction result.
 
     The function wil iterate all the node types that
@@ -327,7 +416,9 @@ def remap_node_pred(pred_ntypes, pred_dir,
     world_size: int
         The total number of processes in the cluster.
     with_shared_fs: bool
-        Whether shared file system is avaliable
+        Whether a shared file system is available.
+    output_func: func
+        Function used to write data to disk.
     preserve_input: bool
         Whether the input data should be removed.
     """
@@ -365,11 +456,12 @@ def remap_node_pred(pred_ntypes, pred_dir,
                 "data_file_path": os.path.join(input_pred_dir, pred_file),
                 "nid_path": os.path.join(input_pred_dir, nid_file),
                 "ntype": ntype,
-                "data_col_key": "pred",
+                "data_col_key": GS_REMAP_PREDICTION_COL,
                 "output_fname_prefix": os.path.join(out_pred_dir, \
                     f"pred.{pred_file[:pred_file.rindex('.')]}"),
                 "chunk_size": out_chunk_size,
-                "preserve_input": preserve_input
+                "output_func": output_func,
+                "preserve_input": preserve_input,
             })
 
     multiprocessing_remap(task_list, num_proc, worker_remap_node_data)
@@ -381,7 +473,7 @@ def remap_node_pred(pred_ntypes, pred_dir,
 def remap_edge_pred(pred_etypes, pred_dir,
                     output_dir, out_chunk_size,
                     num_proc, rank, world_size, with_shared_fs,
-                    preserve_input=False):
+                    output_func, preserve_input=False):
     """ Remap edge prediction result.
 
     The function will iterate all the edge types that
@@ -422,7 +514,9 @@ def remap_edge_pred(pred_etypes, pred_dir,
     world_size: int
         The total number of processes in the cluster.
     with_shared_fs: bool
-        Whether shared file system is avaliable
+        Whether a shared file system is available.
+    output_func: func
+        Function used to write data to disk.
     preserve_input: bool
         Whether the input data should be removed.
     """
@@ -480,6 +574,7 @@ def remap_edge_pred(pred_etypes, pred_dir,
                 "output_fname_prefix": os.path.join(out_pred_dir, \
                     f"pred.{pred_file[:pred_file.rindex('.')]}"),
                 "chunk_size": out_chunk_size,
+                "output_func": output_func,
                 "preserve_input": preserve_input
             })
 
@@ -693,6 +788,27 @@ def main(args, gs_config_args):
         return
 
     num_proc = args.num_processes if args.num_processes > 0 else 1
+    col_name_map = None
+    if args.column_names is not None:
+        col_name_map = {}
+        # Load customized column names
+        for col_rename_pair in args.column_names:
+            # ':' has a special meaning in graph databases like Neptune.
+            # Here, we use ',' as the delimiter.
+            orig_name, new_name = col_rename_pair.split(",")
+            assert orig_name in GS_REMAP_BUILTIN_COLS, \
+                f"Expect the original column name to be in {GS_REMAP_BUILTIN_COLS}, " \
+                f"but got {orig_name}"
+            col_name_map[orig_name] = new_name
+    if args.output_format == GS_OUTPUT_FORMAT_PARQUET:
+        output_func = partial(write_data_parquet_file,
+                              col_name_map=col_name_map)
+    elif args.output_format == GS_OUTPUT_FORMAT_CSV:
+        output_func = partial(write_data_csv_file,
+                              delimiter=args.output_delimiter,
+                              col_name_map=col_name_map)
+    else:
+        raise TypeError(f"Output format {args.output_format} is not supported")
 
     if len(emb_ntypes) > 0:
         emb_output = node_emb_dir
@@ -705,6 +821,7 @@ def main(args, gs_config_args):
                        rank,
                        world_size,
                        with_shared_fs,
+                       output_func,
                        args.preserve_input)
 
     if len(pred_etypes) > 0:
@@ -718,6 +835,7 @@ def main(args, gs_config_args):
                        rank,
                        world_size,
                        with_shared_fs,
+                       output_func,
                        args.preserve_input)
 
     if len(pred_ntypes) > 0:
@@ -731,6 +849,7 @@ def main(args, gs_config_args):
                        rank,
                        world_size,
                        with_shared_fs,
+                       output_func,
                        args.preserve_input)
 
@@ -786,9 +905,17 @@ def generate_parser():
                        default=None,
                        help="The directory storing the node embeddings.")
     group.add_argument("--output-format", type=str,
-                       default="parquet",
-                       choices=['parquet'],
+                       default=GS_OUTPUT_FORMAT_PARQUET,
+                       choices=[GS_OUTPUT_FORMAT_PARQUET, GS_OUTPUT_FORMAT_CSV],
                        help="The format of the output.")
+    group.add_argument("--output-delimiter", type=str, default=",",
+                       help="The delimiter used when saving data in CSV format.")
+    group.add_argument("--column-names", type=str, nargs="+", default=None,
+                       help="Defines how to rename default column names to new names. "
+                            f"For example, given --column-names {GS_REMAP_NID_COL},~id "
+                            f"{GS_REMAP_EMBED_COL},embedding, the column "
+                            f"{GS_REMAP_NID_COL} will be renamed to ~id and the column "
+                            f"{GS_REMAP_EMBED_COL} will be renamed to embedding.")
     group.add_argument("--logging-level", type=str, default="info",
                        help="The logging level. The possible values: debug, info, warning, \
                            error. The default value is info.")

diff --git a/tests/end2end-tests/data_process/check_edge_predict_remap.py b/tests/end2end-tests/data_process/check_edge_predict_remap.py
index b1179ca899..43721f9ee2 100644
--- a/tests/end2end-tests/data_process/check_edge_predict_remap.py
+++ b/tests/end2end-tests/data_process/check_edge_predict_remap.py
@@ -16,8 +16,6 @@
 """
 import os
 import argparse
-import json
-import torch as th
 
 from graphstorm.gconstruct.file_io import read_data_parquet
 from numpy.testing import assert_equal
@@ -25,31 +23,56 @@ def main(args):
     predict_path = args.remap_output
+    column_names = args.column_names
     etype0 = ("n0", "access", "n1")
     etype1 = ("n1", "access", "n0")
 
+    column_name_map = {}
+    if column_names is not None:
+        for col_rename_pair in column_names:
+            orig_name, new_name = col_rename_pair.split(",")
+            column_name_map[orig_name] = new_name
+
+    data_fields = ["pred", "src_nid", "dst_nid"] if column_names is None \
+        else [column_name_map["pred"], column_name_map["src_nid"], column_name_map["dst_nid"]]
     etype0_pred_path = os.path.join(predict_path, "_".join(etype0))
     data = read_data_parquet(
         os.path.join(etype0_pred_path, "pred.predict-00000_00000.parquet"),
-        data_fields=["pred", "src_nid", "dst_nid"])
-
+        data_fields=data_fields)
+    if column_names is not None:
+        data["pred"] = data[column_name_map["pred"]]
+        data["src_nid"] = data[column_name_map["src_nid"]]
+        data["dst_nid"] = data[column_name_map["dst_nid"]]
     assert_equal(data["pred"][:,0].astype("str"), data["src_nid"])
     assert_equal(data["pred"][:,1].astype("str"), data["dst_nid"])
+
     data = read_data_parquet(
         os.path.join(etype0_pred_path, "pred.predict-00001_00000.parquet"),
-        data_fields=["pred", "src_nid", "dst_nid"])
+        data_fields=data_fields)
+    if column_names is not None:
+        data["pred"] = data[column_name_map["pred"]]
+        data["src_nid"] = data[column_name_map["src_nid"]]
+        data["dst_nid"] = data[column_name_map["dst_nid"]]
     assert_equal(data["pred"][:,0].astype("str"), data["src_nid"])
     assert_equal(data["pred"][:,1].astype("str"), data["dst_nid"])
 
     etype1_pred_path = os.path.join(predict_path, "_".join(etype1))
     data = read_data_parquet(
         os.path.join(etype1_pred_path, "pred.predict-00000_00000.parquet"),
-        data_fields=["pred", "src_nid", "dst_nid"])
+        data_fields=data_fields)
+    if column_names is not None:
+        data["pred"] = data[column_name_map["pred"]]
+        data["src_nid"] = data[column_name_map["src_nid"]]
+        data["dst_nid"] = data[column_name_map["dst_nid"]]
     assert_equal(data["pred"][:,0].astype("str"), data["src_nid"])
     assert_equal(data["pred"][:,1].astype("str"), data["dst_nid"])
 
     data = read_data_parquet(
         os.path.join(etype0_pred_path, "pred.predict-00001_00000.parquet"),
-        data_fields=["pred", "src_nid", "dst_nid"])
+        data_fields=data_fields)
+    if column_names is not None:
+        data["pred"] = data[column_name_map["pred"]]
+        data["src_nid"] = data[column_name_map["src_nid"]]
+        data["dst_nid"] = data[column_name_map["dst_nid"]]
     assert_equal(data["pred"][:,0].astype("str"), data["src_nid"])
     assert_equal(data["pred"][:,1].astype("str"), data["dst_nid"])
 
@@ -57,6 +80,8 @@ def main(args):
     argparser = argparse.ArgumentParser("Check edge prediction remapping")
     argparser.add_argument("--remap-output", type=str, required=True,
                            help="Path to save the generated data")
+    argparser.add_argument("--column-names", type=str, nargs="+", default=None,
+                           help="Defines how to rename default column names to new names.")
 
     args = argparser.parse_args()
 
diff --git a/tests/end2end-tests/data_process/check_emb_remap.py b/tests/end2end-tests/data_process/check_emb_remap.py
index 545f1a30eb..16ecd63e74 100644
--- a/tests/end2end-tests/data_process/check_emb_remap.py
+++ b/tests/end2end-tests/data_process/check_emb_remap.py
@@ -16,8 +16,6 @@
 """
 import os
 import argparse
-import json
-import torch as th
 
 from graphstorm.gconstruct.file_io import read_data_parquet
 from numpy.testing import assert_equal
@@ -27,29 +25,49 @@ def main(args):
     ntype1 = "n1"
     emb_path = args.remap_output
+    column_names = args.column_names
+    column_name_map = {}
+    if column_names is not None:
+        for col_rename_pair in column_names:
+            orig_name, new_name = col_rename_pair.split(",")
+            column_name_map[orig_name] = new_name
+    data_fields = ["emb", "nid"] if column_names is None \
+        else [column_name_map["emb"], column_name_map["nid"]]
     ntype0_emb_path = os.path.join(emb_path, ntype0)
+
     data = read_data_parquet(
         os.path.join(ntype0_emb_path, "embed-00000_00000.parquet"),
-        data_fields=["emb", "nid"])
-
+        data_fields=data_fields)
+    if column_names is not None:
+        data["emb"] = data[column_name_map["emb"]]
+        data["nid"] = data[column_name_map["nid"]]
     assert_equal(data["emb"][:,0].astype("str"), data["nid"])
     assert_equal(data["emb"][:,1].astype("str"), data["nid"])
 
     data = read_data_parquet(
         os.path.join(ntype0_emb_path, "embed-00001_00000.parquet"),
-        data_fields=["emb", "nid"])
+        data_fields=data_fields)
+    if column_names is not None:
+        data["emb"] = data[column_name_map["emb"]]
+        data["nid"] = data[column_name_map["nid"]]
     assert_equal(data["emb"][:,0].astype("str"), data["nid"])
     assert_equal(data["emb"][:,1].astype("str"), data["nid"])
 
     ntype1_emb_path = os.path.join(emb_path, ntype1)
     data = read_data_parquet(
         os.path.join(ntype1_emb_path, "embed-00000_00000.parquet"),
-        data_fields=["emb", "nid"])
+        data_fields=data_fields)
+    if column_names is not None:
+        data["emb"] = data[column_name_map["emb"]]
+        data["nid"] = data[column_name_map["nid"]]
     assert_equal(data["emb"][:,0].astype("str"), data["nid"])
     assert_equal(data["emb"][:,1].astype("str"), data["nid"])
 
     data = read_data_parquet(
         os.path.join(ntype1_emb_path, "embed-00001_00000.parquet"),
-        data_fields=["emb", "nid"])
+        data_fields=data_fields)
+    if column_names is not None:
+        data["emb"] = data[column_name_map["emb"]]
+        data["nid"] = data[column_name_map["nid"]]
     assert_equal(data["emb"][:,0].astype("str"), data["nid"])
     assert_equal(data["emb"][:,1].astype("str"), data["nid"])
 
@@ -57,6 +75,8 @@ def main(args):
     argparser = argparse.ArgumentParser("Check edge prediction remapping")
     argparser.add_argument("--remap-output", type=str, required=True,
                            help="Path to save the generated data")
+    argparser.add_argument("--column-names", type=str, nargs="+", default=None,
+                           help="Defines how to rename default column names to new names.")
 
     args = argparser.parse_args()
 
diff --git a/tests/end2end-tests/data_process/test.sh b/tests/end2end-tests/data_process/test.sh
index 5544878236..61265e747e 100644
--- a/tests/end2end-tests/data_process/test.sh
+++ b/tests/end2end-tests/data_process/test.sh
@@ -86,6 +86,19 @@ python3 $GS_HOME/tests/end2end-tests/data_process/check_edge_predict_remap.py --
 
 error_and_exit $?
 
+cp -r /tmp/ep_remap/pred/ /tmp/ep_remap/rename-pred/
+# Test remap edge prediction results and rename col names
+python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/rename-pred/ --rank 0 --world-size 2 --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT"
+error_and_exit $?
+
+python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/rename-pred/ --rank 1 --world-size 2 --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT"
+error_and_exit $?
+
+python3 $GS_HOME/tests/end2end-tests/data_process/check_edge_predict_remap.py --remap-output /tmp/ep_remap/rename-pred/ --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT"
+
+error_and_exit $?
+rm -fr /tmp/ep_remap/rename-pred/
+
 # Test without shared filesystem
 mkdir /tmp/ep_remap/pred/0/
 mkdir /tmp/ep_remap/pred/1/
@@ -176,6 +189,19 @@ python3 $GS_HOME/tests/end2end-tests/data_process/check_emb_remap.py --remap-out
 
 error_and_exit $?
 
+cp -r /tmp/em_remap/partial-emb/ /tmp/em_remap/partial-rename-emb/
+
+# Test remap emb results and rename col names
+python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-rename-emb/ --preserve-input True --rank 0 --world-size 2 --column-names "nid,~id:STRING" "emb,emb:FLOAT"
+error_and_exit $?
+
+python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-rename-emb/ --preserve-input True --rank 1 --world-size 2 --column-names "nid,~id:STRING" "emb,emb:FLOAT"
+error_and_exit $?
+
+python3 $GS_HOME/tests/end2end-tests/data_process/check_emb_remap.py --remap-output /tmp/em_remap/partial-rename-emb/ --column-names "nid,~id:STRING" "emb,emb:FLOAT"
+
+error_and_exit $?
+
 # Test without shared filesystem
 echo "********* Test the remap partial node embedding without shared mem *********"
 mkdir /tmp/em_remap/partial-emb/0/
diff --git a/tests/unit-tests/gconstruct/test_remap_result.py b/tests/unit-tests/gconstruct/test_remap_result.py
index ea26b9b7d2..17d43b20c6 100644
--- a/tests/unit-tests/gconstruct/test_remap_result.py
+++ b/tests/unit-tests/gconstruct/test_remap_result.py
@@ -17,10 +17,12 @@
 import os
 import tempfile
 import pytest
+from functools import partial
 
+import pandas as pd
 import torch as th
 import numpy as np
-from numpy.testing import assert_equal
+from numpy.testing import assert_equal, assert_almost_equal
 
 from graphstorm.gconstruct import remap_result
 from graphstorm.gconstruct.file_io import read_data_parquet
@@ -28,6 +30,8 @@
 from graphstorm.gconstruct.remap_result import _get_file_range
 from graphstorm.gconstruct.remap_result import (worker_remap_edge_pred,
                                                 worker_remap_node_data)
+from graphstorm.gconstruct.remap_result import (write_data_parquet_file,
+                                                write_data_csv_file)
 
 def gen_id_maps(num_ids=1000):
     nid0 = np.random.permutation(num_ids).tolist()
@@ -78,11 +82,28 @@ def test_worker_remap_node_data(data_col):
         remap_result.id_maps[ntype] = IdReverseMap(os.path.join(tmpdirname, ntype + "_id_remap.parquet"))
 
     worker_remap_node_data(data_path, nid_path, ntypes[0], data_col,
-                           output_path_prefix, chunk_size, preserve_input=True)
+                           output_path_prefix, chunk_size,
+                           write_data_parquet_file, preserve_input=True)
+    worker_remap_node_data(data_path, nid_path, ntypes[0], data_col,
+                           output_path_prefix, chunk_size,
+                           partial(write_data_csv_file, delimiter=","),
+                           preserve_input=True)
+    def read_csv(file, delimiter=","):
+        data = pd.read_csv(file, delimiter=delimiter)
+        nid = data["nid"].to_numpy()
+        data_ = [np.array(d.split(";")).astype(np.float32) for d in data[data_col]]
+
+        return {"nid": nid,
+                data_col: data_}
+
     assert os.path.exists(f"{output_path_prefix}_00000.parquet")
     assert os.path.exists(f"{output_path_prefix}_00001.parquet")
     assert os.path.exists(f"{output_path_prefix}_00002.parquet")
     assert os.path.exists(f"{output_path_prefix}_00003.parquet")
+    assert os.path.exists(f"{output_path_prefix}_00000.csv")
+    assert os.path.exists(f"{output_path_prefix}_00001.csv")
+    assert os.path.exists(f"{output_path_prefix}_00002.csv")
+    assert os.path.exists(f"{output_path_prefix}_00003.csv")
 
     data0 = read_data_parquet(f"{output_path_prefix}_00000.parquet",
                               [data_col, "nid"])
@@ -92,21 +113,37 @@ def test_worker_remap_node_data(data_col):
                              [data_col, "nid"])
     data3 = read_data_parquet(f"{output_path_prefix}_00003.parquet",
                              [data_col, "nid"])
+    data0_csv = read_csv(f"{output_path_prefix}_00000.csv")
+    data1_csv = read_csv(f"{output_path_prefix}_00001.csv")
+    data2_csv = read_csv(f"{output_path_prefix}_00002.csv")
+    data3_csv = read_csv(f"{output_path_prefix}_00003.csv")
     assert len(data0[data_col]) == 256
     assert len(data1[data_col]) == 256
     assert len(data2[data_col]) == 256
     assert len(data3[data_col]) == 232
+    assert len(data0_csv[data_col]) == 256
+    assert len(data1_csv[data_col]) == 256
+    assert len(data2_csv[data_col]) == 256
+    assert len(data3_csv[data_col]) == 232
 
     data_ = [data0[data_col], data1[data_col],
              data2[data_col], data3[data_col]]
     nids_ = [data0["nid"], data1["nid"],
             data2["nid"], data3["nid"]]
+    csv_data_ = [data0_csv[data_col], data1_csv[data_col],
+                 data2_csv[data_col], data3_csv[data_col]]
+    csv_nids_ = [data0_csv["nid"], data1_csv["nid"],
+                 data2_csv["nid"], data3_csv["nid"]]
     data_ = np.concatenate(data_, axis=0)
     nids_ = np.concatenate(nids_, axis=0)
+    csv_data_ = np.concatenate(csv_data_, axis=0)
+    csv_nids_ = np.concatenate(csv_nids_, axis=0)
+    assert_almost_equal(data_, csv_data_, decimal=5)
+    assert_equal(nids_, csv_nids_)
 
     revserse_mapping = {}
     revserse_mapping[ntypes[0]] = {val: key for key, val in mappings[ntypes[0]]._ids.items()}
     for i in range(num_data):
-        assert_equal(data_[i], data[i].numpy())
+        assert_almost_equal(data_[i], data[i].numpy(), decimal=4)
         assert_equal(nids_[i], revserse_mapping[ntypes[0]][int(nids[i])])
 
 def test_worker_remap_edge_pred():
@@ -134,12 +171,30 @@ def test_worker_remap_edge_pred():
     worker_remap_edge_pred(pred_path, src_nid_path, dst_nid_path,
                            ntypes[0], ntypes[1], output_path_prefix,
-                           chunk_size, preserve_input=True)
+                           chunk_size, write_data_parquet_file,
+                           preserve_input=True)
+    worker_remap_edge_pred(pred_path, src_nid_path, dst_nid_path,
+                           ntypes[0], ntypes[1], output_path_prefix,
+                           chunk_size, partial(write_data_csv_file, delimiter=","),
+                           preserve_input=True)
+    def read_csv(file, delimiter=","):
+        data = pd.read_csv(file, delimiter=delimiter)
+        src_nid = data["src_nid"].to_numpy()
+        dst_nid = data["dst_nid"].to_numpy()
+        pred = [np.array(p.split(";")).astype(np.float32) for p in data["pred"]]
+
+        return {"src_nid": src_nid,
+                "dst_nid": dst_nid,
+                "pred": pred}
 
     assert os.path.exists(f"{output_path_prefix}_00000.parquet")
     assert os.path.exists(f"{output_path_prefix}_00001.parquet")
     assert os.path.exists(f"{output_path_prefix}_00002.parquet")
     assert os.path.exists(f"{output_path_prefix}_00003.parquet")
+    assert os.path.exists(f"{output_path_prefix}_00000.csv")
+    assert os.path.exists(f"{output_path_prefix}_00001.csv")
+    assert os.path.exists(f"{output_path_prefix}_00002.csv")
+    assert os.path.exists(f"{output_path_prefix}_00003.csv")
     data0 = read_data_parquet(f"{output_path_prefix}_00000.parquet",
                               ["pred", "src_nid", "dst_nid"])
     data1 = read_data_parquet(f"{output_path_prefix}_00001.parquet",
                               ["pred", "src_nid", "dst_nid"])
@@ -148,16 +203,36 @@ def test_worker_remap_edge_pred():
                              ["pred", "src_nid", "dst_nid"])
     data3 = read_data_parquet(f"{output_path_prefix}_00003.parquet",
                              ["pred", "src_nid", "dst_nid"])
+    data0_csv = read_csv(f"{output_path_prefix}_00000.csv")
+    data1_csv = read_csv(f"{output_path_prefix}_00001.csv")
+    data2_csv = read_csv(f"{output_path_prefix}_00002.csv")
+    data3_csv = read_csv(f"{output_path_prefix}_00003.csv")
     assert len(data0["pred"]) == 256
     assert len(data1["pred"]) == 256
     assert len(data2["pred"]) == 256
     assert len(data3["pred"]) == 232
+    assert len(data0_csv["pred"]) == 256
+    assert len(data1_csv["pred"]) == 256
+    assert len(data2_csv["pred"]) == 256
+    assert len(data3_csv["pred"]) == 232
 
     preds_ = [data0["pred"], data1["pred"],
              data2["pred"], data3["pred"]]
     src_nids_ = [data0["src_nid"], data1["src_nid"],
                  data2["src_nid"], data3["src_nid"]]
     dst_nids_ = [data0["dst_nid"], data1["dst_nid"],
                  data2["dst_nid"], data3["dst_nid"]]
+    csv_preds_ = [data0_csv["pred"], data1_csv["pred"],
+                  data2_csv["pred"], data3_csv["pred"]]
+    csv_src_nids_ = [data0_csv["src_nid"], data1_csv["src_nid"],
+                     data2_csv["src_nid"], data3_csv["src_nid"]]
+    csv_dst_nids_ = [data0_csv["dst_nid"], data1_csv["dst_nid"],
+                     data2_csv["dst_nid"], data3_csv["dst_nid"]]
     preds_ = np.concatenate(preds_, axis=0)
     src_nids_ = np.concatenate(src_nids_, axis=0)
     dst_nids_ = np.concatenate(dst_nids_, axis=0)
+    csv_preds_ = np.concatenate(csv_preds_, axis=0)
+    csv_src_nids_ = np.concatenate(csv_src_nids_, axis=0)
+    csv_dst_nids_ = np.concatenate(csv_dst_nids_, axis=0)
+    assert_almost_equal(preds_, csv_preds_, decimal=5)
+    assert_equal(src_nids_, csv_src_nids_)
+    assert_equal(dst_nids_, csv_dst_nids_)
 
     revserse_mapping = {}
     revserse_mapping[ntypes[0]] = {val: key for key, val in mappings[ntypes[0]]._ids.items()}
     revserse_mapping[ntypes[1]] = {val: key for key, val in mappings[ntypes[1]]._ids.items()}