Support output data in csv format in remap_result.py (#645)
CSV is a very common data format accepted by many structured data
storage systems. When outputting results (prediction results or node
embeddings), we should support the CSV format.
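
A minimal sketch of the encoding (values and the output file name are illustrative; the actual writer is write_data_csv_file in the diff below). A 2D embedding matrix is serialized row by row, with ";" joining the values of each row so that one embedding fits in a single CSV field:

import numpy as np
import pandas as pd

data = {"nid": np.array([0, 1]),
        "emb": np.array([[0.001, 1.2000, 0.736],
                         [0.002, 1.1010, 0.834]])}
csv_data = {
    "nid": data["nid"].tolist(),
    # 2D values are joined with ';' so one embedding fits in one CSV field
    "emb": [";".join(str(v) for v in row) for row in data["emb"].tolist()],
}
pd.DataFrame(csv_data).to_csv("emb_part00000.csv", index=False, sep=",")
# emb_part00000.csv:
# nid,emb
# 0,0.001;1.2;0.736
# 1,0.002;1.101;0.834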



By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: Xiang Song <[email protected]>
classicsong and Xiang Song authored Nov 28, 2023
1 parent cc1074f commit 529c1a1
Showing 5 changed files with 315 additions and 42 deletions.
175 changes: 151 additions & 24 deletions python/graphstorm/gconstruct/remap_result.py
@@ -25,7 +25,9 @@
import time
import sys
import math
from functools import partial

import pandas as pd
import torch as th
from ..model.utils import pad_file_index
from .file_io import write_data_parquet
@@ -40,6 +42,21 @@
BUILTIN_TASK_NODE_CLASSIFICATION,
BUILTIN_TASK_NODE_REGRESSION)

GS_OUTPUT_FORMAT_PARQUET = "parquet"
GS_OUTPUT_FORMAT_CSV = "csv"

GS_REMAP_NID_COL = "nid"
GS_REMAP_PREDICTION_COL = "pred"
GS_REMAP_SRC_NID_COL = "src_nid"
GS_REMAP_DST_NID_COL = "dst_nid"
GS_REMAP_EMBED_COL = "emb"

GS_REMAP_BUILTIN_COLS = [GS_REMAP_NID_COL,
GS_REMAP_PREDICTION_COL,
GS_REMAP_SRC_NID_COL,
GS_REMAP_DST_NID_COL,
GS_REMAP_EMBED_COL]

# Id_maps is a global variable.
# When using multi-processing to do id remap,
# we do not want to pass id_maps to each worker process
@@ -49,8 +66,76 @@
# id_maps to each worker process.
id_maps = {}

def write_data_parquet_file(data, file_prefix, col_name_map=None):
""" Write data into disk using parquet format.
Parameters
----------
data: dict of numpy arrays
Data to be written to disk.
file_prefix: str
File prefix. The output will be <file_prefix>.parquet.
col_name_map: dict
A mapping from builtin column name to user defined column name.
"""
if col_name_map is not None:
data = {col_name_map[key]: val for key, val in data.items()}
output_fname = f"{file_prefix}.parquet"
write_data_parquet(data, output_fname)

def write_data_csv_file(data, file_prefix, delimiter=",", col_name_map=None):
""" Write data into disk using csv format.
Multiple values for a field are specified with a semicolon (;) between values.
Example:
.. code::
nid, emb
0, 0.001;1.2000;0.736;...
Parameters
----------
data: dict of numpy arrays
Data to be written to disk.
file_prefix: str
File prefix. The output will be <file_prefix>.csv.
delimiter: str
Delimiter used to separate columns.
col_name_map: dict
A mapping from builtin column name to user defined column name.
"""
if col_name_map is not None:
data = {col_name_map[key]: val for key, val in data.items()}

output_fname = f"{file_prefix}.csv"
csv_data = {}
for key, vals in data.items():
# Each <key, val> pair represents the column name and
# the column data of a column.
if len(vals.shape) == 1:
# vals is a 1D array.
# The data will be saved as
# key,
# 0.1,
# 0.2,
# ...
csv_data[key] = vals.tolist()
elif len(vals.shape) == 2:
# vals is a 2D matrix.
# The data will be saved as
# key,
# 0.001;1.2000;0.736;...,
# 0.002;1.1010;0.834;...,
# ...
csv_data[key] = [";".join([str(v) for v in val]) \
for val in vals.tolist()]
data_frame = pd.DataFrame(csv_data)
data_frame.to_csv(output_fname, index=False, sep=delimiter)

def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key,
output_fname_prefix, chunk_size, preserve_input):
output_fname_prefix, chunk_size, output_func, preserve_input):
""" Do one node prediction remapping task
Parameters
@@ -67,6 +152,8 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key,
Output file name prefix.
chunk_size: int
Max number of rows per output file.
output_func: func
Function used to write data to disk.
preserve_input: bool
Whether the input data should be removed.
"""
@@ -76,24 +163,22 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key,
num_chunks = math.ceil(len(node_data) / chunk_size)

for i in range(num_chunks):
output_fname = f"{output_fname_prefix}_{pad_file_index(i)}.parquet"

start = i * chunk_size
end = (i + 1) * chunk_size if i + 1 < num_chunks else len(node_data)
data = node_data[start:end]
nid = nid_map.map_id(nids[start:end])
data = {data_col_key: data,
"nid": nid}

write_data_parquet(data, output_fname)
GS_REMAP_NID_COL: nid}
output_func(data, f"{output_fname_prefix}_{pad_file_index(i)}")

if preserve_input is False:
os.remove(data_file_path)
os.remove(nid_path)

def worker_remap_edge_pred(pred_file_path, src_nid_path,
dst_nid_path, src_type, dst_type,
output_fname_prefix, chunk_size, preserve_input):
output_fname_prefix, chunk_size,
output_func, preserve_input):
""" Do one edge remapping task
Parameters
@@ -112,6 +197,8 @@ def worker_remap_edge_pred(pred_file_path, src_nid_path,
Output file name prefix.
chunk_size: int
Max number of rows per output file.
output_func: func
Function used to write data to disk.
preserve_input: bool
Whether the input data should be removed.
"""
@@ -122,18 +209,16 @@ def worker_remap_edge_pred(pred_file_path, src_nid_path,
dst_id_map = id_maps[dst_type]
num_chunks = math.ceil(len(pred_result) / chunk_size)
for i in range(num_chunks):
output_fname = f"{output_fname_prefix}_{pad_file_index(i)}.parquet"

start = i * chunk_size
end = (i + 1) * chunk_size if i + 1 < num_chunks else len(pred_result)
pred = pred_result[start:end]
src_nid = src_id_map.map_id(src_nids[start:end])
dst_nid = dst_id_map.map_id(dst_nids[start:end])
data = {"pred": pred,
"src_nid": src_nid,
"dst_nid": dst_nid}
data = {GS_REMAP_PREDICTION_COL: pred,
GS_REMAP_SRC_NID_COL: src_nid,
GS_REMAP_DST_NID_COL: dst_nid}

write_data_parquet(data, output_fname)
output_func(data, f"{output_fname_prefix}_{pad_file_index(i)}")

if preserve_input is False:
os.remove(pred_file_path)
@@ -177,7 +262,8 @@ def _get_file_range(num_files, rank, world_size):
def remap_node_emb(emb_ntypes, node_emb_dir,
output_dir, out_chunk_size,
num_proc, rank, world_size,
with_shared_fs, preserve_input=False):
with_shared_fs, output_func,
preserve_input=False):
""" Remap node embeddings.
The function will iterate all the node types that
@@ -235,7 +321,9 @@ def remap_node_emb(emb_ntypes, node_emb_dir,
world_size: int
The total number of processes in the cluster.
with_shared_fs: bool
Whether shared file system is available
Whether shared file system is available.
output_func: func
Function used to write data to disk.
preserve_input: bool
Whether the input data should be removed.
"""
@@ -278,10 +366,11 @@ def remap_node_emb(emb_ntypes, node_emb_dir,
"data_file_path": os.path.join(input_emb_dir, emb_file),
"nid_path": os.path.join(input_emb_dir, nid_file),
"ntype": ntype,
"data_col_key": "emb",
"data_col_key": GS_REMAP_EMBED_COL,
"output_fname_prefix": os.path.join(out_embdir, \
f"{emb_file[:emb_file.rindex('.')]}"),
"chunk_size": out_chunk_size,
"output_func": output_func,
"preserve_input": preserve_input
})

Expand All @@ -290,7 +379,7 @@ def remap_node_emb(emb_ntypes, node_emb_dir,
def remap_node_pred(pred_ntypes, pred_dir,
output_dir, out_chunk_size,
num_proc, rank, world_size, with_shared_fs,
preserve_input=False):
output_func, preserve_input=False):
""" Remap node prediction result.
The function will iterate all the node types that
@@ -327,7 +416,9 @@ def remap_node_pred(pred_ntypes, pred_dir,
world_size: int
The total number of processes in the cluster.
with_shared_fs: bool
Whether shared file system is available
Whether shared file system is available.
output_func: func
Function used to write data to disk.
preserve_input: bool
Whether the input data should be removed.
"""
@@ -365,11 +456,12 @@ def remap_node_pred(pred_ntypes, pred_dir,
"data_file_path": os.path.join(input_pred_dir, pred_file),
"nid_path": os.path.join(input_pred_dir, nid_file),
"ntype": ntype,
"data_col_key": "pred",
"data_col_key": GS_REMAP_PREDICTION_COL,
"output_fname_prefix": os.path.join(out_pred_dir, \
f"pred.{pred_file[:pred_file.rindex('.')]}"),
"chunk_size": out_chunk_size,
"preserve_input": preserve_input
"output_func": output_func,
"preserve_input": preserve_input,
})

multiprocessing_remap(task_list, num_proc, worker_remap_node_data)
@@ -381,7 +473,7 @@ def remap_node_pred(pred_ntypes, pred_dir,
def remap_edge_pred(pred_etypes, pred_dir,
output_dir, out_chunk_size,
num_proc, rank, world_size, with_shared_fs,
preserve_input=False):
output_func, preserve_input=False):
""" Remap edge prediction result.
The function will iterate all the edge types that
@@ -422,7 +514,9 @@ def remap_edge_pred(pred_etypes, pred_dir,
world_size: int
The total number of processes in the cluster.
with_shared_fs: bool
Whether shared file system is available
Whether shared file system is available.
output_func: func
Function used to write data to disk.
preserve_input: bool
Whether the input data should be removed.
"""
@@ -480,6 +574,7 @@ def remap_edge_pred(pred_etypes, pred_dir,
"output_fname_prefix": os.path.join(out_pred_dir, \
f"pred.{pred_file[:pred_file.rindex('.')]}"),
"chunk_size": out_chunk_size,
"output_func": output_func,
"preserve_input": preserve_input
})

@@ -693,6 +788,27 @@ def main(args, gs_config_args):
return

num_proc = args.num_processes if args.num_processes > 0 else 1
col_name_map = None
if args.column_names is not None:
col_name_map = {}
# Load customized column names
for col_rename_pair in args.column_names:
# ':' has a special meaning in graph databases like Neptune.
# Here, we use ',' as the delimiter.
orig_name, new_name = col_rename_pair.split(",")
assert orig_name in GS_REMAP_BUILTIN_COLS, \
f"Expect the original column name to be one of {GS_REMAP_BUILTIN_COLS}, " \
f"but got {orig_name}"
col_name_map[orig_name] = new_name
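# For example, --column-names "nid,~id" "emb,embedding" yields
# col_name_map == {"nid": "~id", "emb": "embedding"} (illustrative values)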
if args.output_format == GS_OUTPUT_FORMAT_PARQUET:
output_func = partial(write_data_parquet_file,
col_name_map=col_name_map)
elif args.output_format == GS_OUTPUT_FORMAT_CSV:
output_func = partial(write_data_csv_file,
delimiter=args.output_delimiter,
col_name_map=col_name_map)
else:
raise TypeError(f"Output format {args.output_format} is not supported")

if len(emb_ntypes) > 0:
emb_output = node_emb_dir
@@ -705,6 +821,7 @@ def main(args, gs_config_args):
rank,
world_size,
with_shared_fs,
output_func,
args.preserve_input)

if len(pred_etypes) > 0:
@@ -718,6 +835,7 @@ def main(args, gs_config_args):
rank,
world_size,
with_shared_fs,
output_func,
args.preserve_input)

if len(pred_ntypes) > 0:
@@ -731,6 +849,7 @@ def main(args, gs_config_args):
rank,
world_size,
with_shared_fs,
output_func,
args.preserve_input)


@@ -786,9 +905,17 @@ def generate_parser():
default=None,
help="The directory storing the node embeddings.")
group.add_argument("--output-format", type=str,
default="parquet",
choices=['parquet'],
default=GS_OUTPUT_FORMAT_PARQUET,
choices=[GS_OUTPUT_FORMAT_PARQUET, GS_OUTPUT_FORMAT_CSV],
help="The format of the output.")
group.add_argument("--output-delimiter", type=str, default=",",
help="The delimiter used when saving data in CSV format.")
group.add_argument("--column-names", type=str, nargs="+", default=None,
help="Defines how to rename default column names to new names."
f"For example, given --column-names {GS_REMAP_NID_COL},~id "
f"{GS_REMAP_EMBED_COL},embedding. The column "
f"{GS_REMAP_NID_COL} will be renamed to ~id. "
f"The column {GS_REMAP_EMBED_COL} will be renamed to embedding.")
group.add_argument("--logging-level", type=str, default="info",
help="The logging level. The possible values: debug, info, warning, \
error. The default value is info.")
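
Taken together, the new options compose as in the sketch below (illustrative flag values and file prefix; this snippet is not part of the diff):

from functools import partial
from graphstorm.gconstruct.remap_result import write_data_csv_file

# Roughly what main() builds for:
#   --output-format csv --output-delimiter "," --column-names nid,~id emb,embedding
output_func = partial(write_data_csv_file,
                      delimiter=",",
                      col_name_map={"nid": "~id", "emb": "embedding"})

# Each worker then calls output_func(data, file_prefix), writing
# <file_prefix>.csv with the header "~id,embedding".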
