[GConstruct] Allow parquet file as input for customized edge split (#798)

*Issue #, if available:*

*Description of changes:*


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: runjie <[email protected]>
Co-authored-by: xiang song(charlie.song) <[email protected]>
3 people authored Apr 19, 2024
1 parent 5b96dd8 commit b0a8c77
Showing 4 changed files with 190 additions and 6 deletions.
39 changes: 37 additions & 2 deletions docs/source/tutorials/own-data.rst
@@ -152,7 +152,10 @@ The examplary ACM graph also predifines two sets of labels. One set of labels ar

Customized label split
`````````````````````````
If users want to split labels with your own logics, e.g., time sequence, you can split labels first, and then provide the split information in the configuration JSON file like the below example.
If users want to split labels with their own logic, e.g., by time sequence, they can split the labels first, and then provide the split information in the configuration JSON file, where the split index files can be either JSON or Parquet files, like the examples below.
When using Parquet files as input, please specify the column that holds the IDs through the ``column`` entry; for node labels it can be either a string or a list containing a single string. The ``train``/``valid``/``test`` values accept a single file path, a list of paths, or a wildcard pattern.

JSON:

.. code-block:: json
@@ -166,6 +169,21 @@ If users want to split labels with your own logics, e.g., time sequence, you can
}
]
Parquet:

.. code-block:: json
"labels": [
{
"label_col": "label",
"task_type": "classification",
"custom_split_filenames": {"train": "/tmp/acm_raw/nodes/train_idx.parquet",
"valid": ["/tmp/acm_raw/nodes/val_idx_1.parquet", "/tmp/acm_raw/nodes/val_idx_2.parquet"],
"test": "/tmp/acm_raw/nodes/test_idx_*.parquet"
"column": "ID"}
}
]
Instead of using ``split_pct``, users can specify the ``custom_split_filenames`` configuration, whose value is a dictionary, to use a custom data split. The dictionary's keys can include ``train``, ``valid``, and ``test``, and its values are the JSON (or Parquet) files that contain the node IDs of each set.

These JSON files only need to list the IDs of their own set. For example, in a node classification task, suppose there are 100 nodes, node IDs start from 0, and the last 50 nodes (IDs from 50 to 99) have labels associated. For some business logic, users want the first 10 of the 50 labeled nodes as the training set, the last 30 as the test set, and the middle 10 as the validation set. Then the `train_idx.json` file should contain the integers from 50 to 59, one integer per line. Similarly, the `val_idx.json` file should contain the integers from 60 to 69, and the `test_idx.json` file should contain the integers from 70 to 99. The contents of the `train_idx.json` file look like the following (a sketch for producing equivalent Parquet index files appears after the listing):
@@ -178,7 +196,9 @@ These JSON files only need to list the IDs on its own set. For example, in a nod
...
59
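
If the Parquet format is preferred, equivalent index files can be produced with a few lines of pandas. This is a minimal sketch, assuming pandas with a Parquet engine (e.g., pyarrow) is installed; the paths, file names, and the ``ID`` column name mirror the Parquet example above and are illustrative only.

.. code-block:: python

    import os
    import pandas as pd

    os.makedirs("/tmp/acm_raw/nodes", exist_ok=True)

    # Node IDs for each custom split, following the example in this section:
    # 50-59 for training, 60-69 for validation, 70-99 for test.
    splits = {
        "train_idx.parquet": list(range(50, 60)),
        "val_idx_1.parquet": list(range(60, 70)),
        "test_idx_0.parquet": list(range(70, 100)),  # matched by test_idx_*.parquet
    }
    for file_name, ids in splits.items():
        # Each file holds a single column whose name matches the "column"
        # entry of custom_split_filenames (here "ID").
        pd.DataFrame({"ID": ids}).to_parquet(f"/tmp/acm_raw/nodes/{file_name}")
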
For edge data, users can do the similar thing as defining customized node labels to define the customized edge labels. The configuration looks the same:
For edge data, users can define customized edge labels in the same way as customized node labels. The configuration looks the same for JSON files; for Parquet files, users need to specify both the source ID column and the destination ID column as a list of two strings (a pandas sketch for creating such a file follows the Parquet example below):

JSON:

.. code-block:: json
@@ -192,6 +212,21 @@ For edge data, users can do the similar thing as defining customized node labels
}
]
Parquet:

.. code-block:: json
"labels": [
{
"label_col": "label",
"task_type": "classification",
"custom_split_filenames": {"train": "/tmp/acm_raw/edges/train_idx.parquet",
"valid": "/tmp/acm_raw/edges/val_idx.parquet",
"test": "/tmp/acm_raw/edges/test_idx.parquet",
"column": ["src", "dst"]}
}
]
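
A Parquet edge index file can be created in a similar way. This is a minimal sketch, assuming pandas with a Parquet engine is installed; the ``src``/``dst`` column names and the path mirror the example configuration above, and the edge endpoints are made-up IDs.

.. code-block:: python

    import os
    import pandas as pd

    os.makedirs("/tmp/acm_raw/edges", exist_ok=True)

    # Each row is one training edge: the source node ID and the destination
    # node ID, stored in the two columns named by the "column" entry.
    train_edges = pd.DataFrame({
        "src": ["p1", "p2", "p3"],
        "dst": ["a4", "a5", "a6"],
    })
    train_edges.to_parquet("/tmp/acm_raw/edges/train_idx.parquet")
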
For the JSON variant, the values of the dictionary are JSON files as well; each line of such a file should be an array containing the source node ID and the destination node ID. For example, the contents of `train_idx.json` should look like the following:

.. code-block:: yaml
95 changes: 95 additions & 0 deletions python/graphstorm/gconstruct/file_io.py
@@ -30,6 +30,101 @@

from .utils import HDF5Handle, HDF5Array


def read_index(split_info):
    """ Read the index from a JSON/parquet file.

    Parameters
    ----------
    split_info : dict
        Customized split information

    Returns
    -------
    tuple of numpy.ndarray
        Returns a tuple containing three numpy arrays:
        - First element: Data from the training split, if not available, [].
        - Second element: Data from the validation split, if not available, [].
        - Third element: Data from the test split, if not available, [].
        If the file extension is not '.json' or '.parquet', a ValueError is raised.
    """
    res = []
    for idx in ['train', 'valid', 'test']:
        if idx not in split_info:
            res.append([])
            continue
        if isinstance(split_info[idx], str):
            _, extension = os.path.splitext(split_info[idx])
        else:
            extensions = [os.path.splitext(path)[1] for path in split_info[idx]]
            assert len(set(extensions)) == 1, f"each file should be in the same format, " \
                                              f"but get {extensions}"
            extension = extensions[0]

        # Normalize the extension to ensure case insensitivity
        extension = extension.lower()

        # json files should be ended with .json and parquet files should be ended with parquet
        if extension == '.json':
            res.append(read_index_json(split_info[idx]))
        elif extension == '.parquet':
            # We should make sure there are multiple parquet files instead of one
            res.append(read_index_parquet(split_info[idx], split_info['column']))
        else:
            raise ValueError(f"Expect mask data format be one of parquet "
                             f"and json, but get {extension}")
    return res[0], res[1], res[2]
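
# A minimal usage sketch (illustrative, not part of this module): read_index
# consumes the custom_split_filenames dictionary from the label configuration
# and returns one index array per split. The paths and the "ID" column name
# below are hypothetical.
#
#     split_info = {
#         "train": "/tmp/acm_raw/nodes/train_idx.parquet",
#         "valid": ["/tmp/acm_raw/nodes/val_idx_1.parquet"],
#         "test": "/tmp/acm_raw/nodes/test_idx_*.parquet",
#         "column": ["ID"],
#     }
#     train_idx, val_idx, test_idx = read_index(split_info)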


def expand_wildcard(data_files):
    """
    Expand the wildcard to the actual file lists.

    Parameters
    ----------
    data_files : list[str]
        The parquet files that contain the index.
    """
    expanded_files = []
    for item in data_files:
        if '*' in item:
            matched_files = glob.glob(item)
            assert len(matched_files) > 0, \
                f"There is no file matching {item} pattern"
            expanded_files.extend(matched_files)
        else:
            expanded_files.append(item)
    return expanded_files

def read_index_parquet(data_file, column):
    """
    Read the index from a parquet file.

    Parameters
    ----------
    data_file : str or list[str]
        The parquet file that contains the index.
    column: list[str]
        Column names on parquet which contain the index
    """
    if isinstance(data_file, str):
        data_file = [data_file]
    data_file = expand_wildcard(data_file)
    df_list = [pd.read_parquet(file) for file in data_file]
    df = pd.concat(df_list, ignore_index=True)

    if len(column) == 1:
        res_array = df[column[0]].to_numpy()
    elif len(df.columns) == 2:
        res_array = list(zip(df[column[0]].to_numpy(), df[column[1]].to_numpy()))
    else:
        raise ValueError("The Parquet file on node mask must contain exactly one column, "
                         "and on edge mask must contain exactly two columns.")

    return res_array

def read_index_json(data_file):
    """ Read the index from a JSON file.
11 changes: 7 additions & 4 deletions python/graphstorm/gconstruct/transform.py
@@ -31,7 +31,7 @@
from transformers import AutoTokenizer
from transformers import AutoModel, AutoConfig

from .file_io import read_index_json
from .file_io import read_index
from .utils import ExtMemArrayWrapper, ExtFeatureWrapper, generate_hash

LABEL_STATS_FIELD = "training_label_stats"
@@ -1674,9 +1674,12 @@ def parse_label_ops(confs, is_node):
        custom_split = label_conf['custom_split_filenames']
        assert isinstance(custom_split, dict), \
            "Custom data split needs to provide train/val/test index."
        train_idx = read_index_json(custom_split['train']) if 'train' in custom_split else None
        val_idx = read_index_json(custom_split['valid']) if 'valid' in custom_split else None
        test_idx = read_index_json(custom_split['test']) if 'test' in custom_split else None
        if "column" not in custom_split:
            custom_split["column"] = []
        # Treat all input as an input of list[str]
        if isinstance(custom_split['column'], str):
            custom_split["column"] = [custom_split["column"]]
        train_idx, val_idx, test_idx = read_index(custom_split)
        label_col = label_conf['label_col'] if 'label_col' in label_conf else None
        if "node_id_col" in confs:
            return [CustomLabelProcessor(col_name=label_col, label_name=label_col,
Expand Down
51 changes: 51 additions & 0 deletions tests/unit-tests/gconstruct/test_gconstruct_utils.py
@@ -38,6 +38,7 @@
                                           read_data_hdf5,
                                           get_in_files,
                                           write_data_parquet)
from graphstorm.gconstruct.file_io import read_index, write_index_json
from graphstorm.gconstruct.file_io import (read_data_csv,
                                           read_data_json,
                                           read_data_parquet)
@@ -450,6 +451,55 @@ def test_shuffle_hard_nids():
p1_etype1_neg0_shuffled)


def test_read_index():
    write_index_json([(3, 3)], "/tmp/test_idx.json")
    split_info = {"valid": "/tmp/test_idx.json"}
    _, json_content, _ = read_index(split_info)
    assert json_content == [(3, 3)]

    write_index_json(np.arange(3), "/tmp/test_idx.json")
    split_info = {"train": "/tmp/test_idx.json"}
    json_content, _, _ = read_index(split_info)
    assert json_content == [0, 1, 2]

    data = ["p70", "p71", "p72", "p73", "p74", "p75"]
    df = pd.DataFrame(data, columns=['ID'])
    df.to_parquet('/tmp/test_idx.parquet')
    split_info = {"train": "/tmp/test_idx.parquet", "column": ["ID"]}
    parquet_content, _, _ = read_index(split_info)
    assert np.array_equal(parquet_content, data)

    data_multi = ["p702", "p712", "p722", "p732", "p742", "p752"]
    df = pd.DataFrame(data_multi, columns=['ID'])
    # test with wildcard
    df.to_parquet('/tmp/test_idx_multi.parquet')
    split_info = {"train": "/tmp/test_idx*.parquet", "column": ["ID"]}
    parquet_content, _, _ = read_index(split_info)
    assert np.array_equal(parquet_content, data + data_multi)
    # test with list input
    split_info = {"train": ["/tmp/test_idx.parquet", "/tmp/test_idx_multi.parquet"],
                  "column": ["ID"]}
    parquet_content, _, _ = read_index(split_info)
    assert np.array_equal(parquet_content, data + data_multi)

    data, data2 = ["p1", "p2"], ["p3", "p4"]
    df = pd.DataFrame({'src': data, 'dst': data2})
    df.to_parquet('/tmp/train_idx.parquet')
    split_info = {"train": "/tmp/train_idx.parquet", "column": ["src", "dst"]}
    parquet_content, _, _ = read_index(split_info)
    assert parquet_content == [("p1", "p3"), ("p2", "p4")]

    data3, data4 = ["p5", "p6"], ["p7", "p8"]
    df = pd.DataFrame({'src': data3, 'dst': data4})
    df.to_parquet('/tmp/test_idx.parquet')
    split_info = {"train": "/tmp/train_idx.parquet",
                  "test": "/tmp/test_idx.parquet", "column": ["src", "dst"]}
    train_content, _, test_content = read_index(split_info)
    assert train_content == [("p1", "p3"), ("p2", "p4")]
    assert test_content == [("p5", "p7"), ("p6", "p8")]



if __name__ == '__main__':
    test_shuffle_hard_nids()
    test_save_load_maps()
@@ -462,3 +512,4 @@ def test_shuffle_hard_nids():
    test_object_conversion()
    test_ext_mem_array()
    test_multiprocessing_read()
    test_read_index()
