Use external-memory arrays in the worker processes of gconstruct #639

Merged
14 commits merged on Nov 28, 2023
38 changes: 25 additions & 13 deletions python/graphstorm/gconstruct/construct_graph.py
@@ -199,7 +199,8 @@ def parse_edge_data(in_file, feat_ops, label_ops, node_id_map, read_file,

def _process_data(user_pre_parser, user_parser,
two_phase_feat_ops,
in_files, num_proc, task_info):
in_files, num_proc, task_info,
ext_mem_workspace):
""" Process node and edge data.

Parameter
@@ -216,25 +217,30 @@ def _process_data(user_pre_parser, user_parser,
Number of processes to do processing.
task_info: str
Task meta info for debugging.
ext_mem_workspace : str
The path of the external-memory workspace.
"""
if len(two_phase_feat_ops) > 0:
pre_parse_start = time.time()
phase_one_ret = multiprocessing_data_read(in_files, num_proc, user_pre_parser)
phase_one_ret = multiprocessing_data_read(in_files, num_proc, user_pre_parser,
ext_mem_workspace)
update_two_phase_feat_ops(phase_one_ret, two_phase_feat_ops)

dur = time.time() - pre_parse_start
logging.debug("Preprocessing data files for %s takes %.3f seconds.",
task_info, dur)

start = time.time()
return_dict = multiprocessing_data_read(in_files, num_proc, user_parser)
return_dict = multiprocessing_data_read(in_files, num_proc, user_parser,
ext_mem_workspace)
dur = time.time() - start
logging.debug("Processing data files for %s takes %.3f seconds.",
task_info, dur)
return return_dict


def process_node_data(process_confs, arr_merger, remap_id, ext_mem=None, num_processes=1):
def process_node_data(process_confs, arr_merger, remap_id,
ext_mem_workspace=None, num_processes=1):
""" Process node data

We need to process all node data before we can process edge data.
@@ -272,8 +278,8 @@ def process_node_data(process_confs, arr_merger, remap_id, ext_mem=None, num_pro
Whether or not to remap node IDs
num_processes: int
The number of processes to process the input files.
ext_mem: str or None
The address of external memory for multi-column feature
ext_mem_workspace: str or None
The path of the external-memory workspace for multi-column features.

Returns
-------
@@ -313,14 +319,17 @@ def process_node_data(process_confs, arr_merger, remap_id, ext_mem=None, num_pro
label_ops=label_ops,
node_id_col=node_id_col,
read_file=read_file,
ext_mem=ext_mem)
ext_mem=ext_mem_workspace)

ext_mem_workspace_type = os.path.join(ext_mem_workspace, node_type) \
if ext_mem_workspace is not None else None
return_dict = _process_data(user_pre_parser,
user_parser,
two_phase_feat_ops,
in_files,
num_proc,
f"node {node_type}")
f"node {node_type}",
ext_mem_workspace_type)
type_node_id_map = [None] * len(return_dict)
type_node_data = {}
for i, (node_ids, data) in return_dict.items():
Expand Down Expand Up @@ -407,7 +416,7 @@ def process_node_data(process_confs, arr_merger, remap_id, ext_mem=None, num_pro
return (node_id_map, node_data, label_stats)

def process_edge_data(process_confs, node_id_map, arr_merger,
ext_mem=None, num_processes=1,
ext_mem_workspace=None, num_processes=1,
skip_nonexist_edges=False):
""" Process edge data

@@ -446,8 +455,8 @@ def process_edge_data(process_confs, node_id_map, arr_merger,
The number of processes to process the input files.
skip_nonexist_edges : bool
Whether or not to skip edges that don't exist.
ext_mem: str or None
The address of external memory for multi-column feature
ext_mem_workspace: str or None
The path of the external-memory workspace for multi-column features.

Returns
-------
@@ -493,14 +502,17 @@ def process_edge_data(process_confs, node_id_map, arr_merger,
read_file=read_file,
conf=process_conf,
skip_nonexist_edges=skip_nonexist_edges,
ext_mem=ext_mem)
ext_mem=ext_mem_workspace)

ext_mem_workspace_type = os.path.join(ext_mem_workspace, "_".join(edge_type)) \
if ext_mem_workspace is not None else None
return_dict = _process_data(user_pre_parser,
user_parser,
two_phase_feat_ops,
in_files,
num_proc,
f"edge {edge_type}")
f"edge {edge_type}",
ext_mem_workspace_type)
type_src_ids = [None] * len(return_dict)
type_dst_ids = [None] * len(return_dict)
type_edge_data = {}
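Illustration (not part of the diff): a minimal sketch of the per-type workspace layout produced by the two _process_data calls above; the workspace path and type names are hypothetical.

import os

ext_mem_workspace = "/tmp/gconstruct_ext_mem"        # hypothetical workspace path
node_type = "paper"                                   # hypothetical node type
edge_type = ("paper", "cites", "paper")               # hypothetical canonical edge type

# Each node type gets its own sub-directory; an edge type is flattened with "_",
# so per-file arrays written for different types never collide on disk.
node_ws = os.path.join(ext_mem_workspace, node_type) \
    if ext_mem_workspace is not None else None
edge_ws = os.path.join(ext_mem_workspace, "_".join(edge_type)) \
    if ext_mem_workspace is not None else None

print(node_ws)   # /tmp/gconstruct_ext_mem/paper
print(edge_ws)   # /tmp/gconstruct_ext_mem/paper_cites_paper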
89 changes: 72 additions & 17 deletions python/graphstorm/gconstruct/utils.py
@@ -36,6 +36,39 @@
SHARED_MEM_OBJECT_THRESHOLD = 1.9 * 1024 * 1024 * 1024 # must < 2GB
SHARED_MEMORY_CROSS_PROCESS_STORAGE = "shared_memory"
PICKLE_CROSS_PROCESS_STORAGE = "pickle"
EXT_MEMORY_STORAGE = "ext_memory"

def _to_ext_memory(name, data, path):
if isinstance(data, np.ndarray):
assert name is not None
path = os.path.join(path, f"{name}.npy")
if len(data) > 0:
logging.debug("save data %s in %s.", name, path)
data = convert_to_ext_mem_numpy(path, data)
# The wrapper will be passed to another process, so we don't want it to keep
# an open reference to the memory-mapped data in the file.
data.cleanup()
return data
elif isinstance(data, dict):
new_data = {}
for key, val in data.items():
new_data[key] = _to_ext_memory(key, val,
os.path.join(path, name) if name is not None else path)
return new_data
elif isinstance(data, list):
new_data = []
for i, val in enumerate(data):
new_data.append(_to_ext_memory(f"item-{i}", val,
os.path.join(path, name) if name is not None else path))
return new_data
elif isinstance(data, tuple):
new_data = []
for i, val in enumerate(list(data)):
new_data.append(_to_ext_memory(f"item-{i}", val,
os.path.join(path, name) if name is not None else path))
return tuple(new_data)
else:
return data
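
Illustration (not part of the diff): a minimal sketch of what _to_ext_memory does to a nested container of numpy arrays, mirroring the unit test added at the bottom of this PR; the workspace path is hypothetical.

import numpy as np
from graphstorm.gconstruct.utils import ExtNumpyWrapper, _to_ext_memory

data = {
    "feat": np.random.uniform(size=(100, 4)).astype(np.float32),
    "label": [np.arange(100), np.ones(100, dtype=np.float32)],
}
# Every array is replaced by an ExtNumpyWrapper whose backing .npy file lives under
# the workspace (e.g. feat.npy, label/item-0.npy); to_numpy() reads the data back.
converted = _to_ext_memory(None, data, "/tmp/ext_mem_workspace")
assert isinstance(converted["feat"], ExtNumpyWrapper)
assert np.all(converted["feat"].to_numpy() == data["feat"])
assert np.all(converted["label"][0].to_numpy() == data["label"][0])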

def _to_shared_memory(data):
""" Move all tensor objects into torch shared memory
@@ -149,7 +182,7 @@ def generate_hash():
random_uuid = uuid.uuid4()
return str(random_uuid)

def worker_fn(worker_id, task_queue, res_queue, user_parser):
def worker_fn(worker_id, task_queue, res_queue, user_parser, ext_mem_workspace):
""" The worker function in the worker pool

Parameters
@@ -163,6 +196,8 @@ def worker_fn(worker_id, task_queue, res_queue, user_parser):
communication between the worker processes and the master process.
user_parser : callable
The user-defined function to read and process the data files.
ext_mem_workspace : str
The path of the external-memory workspace.
"""
# We need to set a GPU device for each worker process in case that
# some transformations (e.g., computing BERT embeddings) require GPU computation.
@@ -180,8 +215,10 @@ def worker_fn(worker_id, task_queue, res_queue, user_parser):
logging.debug("%d Processing %s", worker_id, in_file)
data = user_parser(in_file)
size = _estimate_sizeof(data)
if ext_mem_workspace is not None:
data = (EXT_MEMORY_STORAGE, _to_ext_memory(f"file-{i}", data, ext_mem_workspace))
# Max pickle obj size is 2 GByte
if size > SHARED_MEM_OBJECT_THRESHOLD:
elif size > SHARED_MEM_OBJECT_THRESHOLD:
# Use torch shared memory as a workaround
# This will consume shared memory and cause an additional
# data copy, i.e., general memory to torch shared memory.
@@ -218,7 +255,7 @@ def update_two_phase_feat_ops(phase_one_info, ops):
if op.feat_name in feat_info:
op.update_info(feat_info[op.feat_name])

def multiprocessing_data_read(in_files, num_processes, user_parser):
def multiprocessing_data_read(in_files, num_processes, user_parser, ext_mem_workspace=None):
""" Read data from multiple files with multiprocessing.

It creates a set of worker processes, each of which runs a worker function.
@@ -238,6 +275,8 @@ def multiprocessing_data_read(in_files, num_processes, user_parser):
The number of processes that run in parallel.
user_parser : callable
The user-defined function to read and process the data files.
ext_mem_workspace : str
The path of the external-memory workspace.

Returns
-------
@@ -252,7 +291,8 @@
for i, in_file in enumerate(in_files):
task_queue.put((i, in_file))
for i in range(num_processes):
proc = Process(target=worker_fn, args=(i, task_queue, res_queue, user_parser))
proc = Process(target=worker_fn, args=(i, task_queue, res_queue, user_parser,
ext_mem_workspace))
proc.start()
processes.append(proc)

@@ -282,6 +322,8 @@ def multiprocessing_data_read(in_files, num_processes, user_parser):
return_dict = {}
for i, in_file in enumerate(in_files):
return_dict[i] = user_parser(in_file)
if ext_mem_workspace is not None:
return_dict[i] = _to_ext_memory(f"file-{i}", return_dict[i], ext_mem_workspace)
return return_dict
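
Illustration (not part of the diff): a hypothetical caller of multiprocessing_data_read with an external-memory workspace. Assuming the master process unwraps the EXT_MEMORY_STORAGE tag when draining the result queue (that code is outside this hunk), the returned dictionary holds ExtNumpyWrapper-backed arrays instead of in-memory numpy arrays.

import numpy as np
from graphstorm.gconstruct.utils import multiprocessing_data_read

def parse_file(in_file):                  # hypothetical user_parser
    return {"feat": np.load(in_file)}

# The input files and the workspace path are placeholders.
return_dict = multiprocessing_data_read(["f0.npy", "f1.npy"], 2, parse_file,
                                        ext_mem_workspace="/tmp/ext_mem_workspace")
feat0 = return_dict[0]["feat"].to_numpy()   # read the array back from disk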

def worker_fn_no_return(worker_id, task_queue, func):
@@ -579,7 +621,8 @@ def __getitem__(self, idx):
def cleanup(self):
""" Clean up the array.
"""
self._arr.flush()
if self._arr is not None:
self._arr.flush()
self._arr = None

def to_numpy(self):
@@ -636,7 +679,8 @@ def cleanup(self):
# Expected file structure:
# merged_file file_feature1 file_feature2
# rmtree will clean up all single feature files as ExtNumpyWrapper does not clean them up
self._arr.flush()
if self._arr is not None:
self._arr.flush()
self._arr = None
shutil.rmtree(self._directory_path)

@@ -673,10 +717,7 @@ def append(self, feature):
if isinstance(feature, np.ndarray):
hash_hex = generate_hash()
path = self._directory_path + '/{}.npy'.format(hash_hex)
ext_val = np.memmap(path, feature.dtype, mode="w+", shape=feature.shape)
ext_val[:] = feature[:]
ext_val.flush()
feature = ExtNumpyWrapper(path, feature.shape, feature.dtype)
feature = convert_to_ext_mem_numpy(path, feature)
self._wrapper.append(feature)

def merge(self):
@@ -788,13 +829,27 @@ def __call__(self, arrs, name):
if len(arrs) > 1:
return _merge_arrs(arrs, tensor_path)
else:
# To get the output dtype or arrs
dtype = _get_arrs_out_dtype(arrs)
arr = arrs[0]
em_arr = np.memmap(tensor_path, dtype, mode="w+", shape=shape)
em_arr[:] = arr[:]
em_arr.flush()
return ExtNumpyWrapper(tensor_path, em_arr.shape, em_arr.dtype)
return convert_to_ext_mem_numpy(tensor_path, arrs[0])

def convert_to_ext_mem_numpy(tensor_path, arr):
""" Convert a numpy array to memory mapped array.

Parameters
----------
tensor_path : str
The path of the file to store the Numpy array.
arr : Numpy array
The Numpy array

Returns
-------
ExtNumpyWrapper : The wrapper of the memory-mapped array.
"""
os.makedirs(os.path.dirname(tensor_path), exist_ok=True)
em_arr = np.memmap(tensor_path, arr.dtype, mode="w+", shape=arr.shape)
em_arr[:] = arr[:]
em_arr.flush()
return ExtNumpyWrapper(tensor_path, em_arr.shape, em_arr.dtype)
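
Illustration (not part of the diff): a round trip through convert_to_ext_mem_numpy, mirroring the updated unit test below; the file path is hypothetical and parent directories are created by the helper itself.

import numpy as np
from graphstorm.gconstruct.utils import convert_to_ext_mem_numpy

arr = np.random.uniform(size=(1000, 10)).astype(np.float32)
wrapper = convert_to_ext_mem_numpy("/tmp/ext_mem_workspace/tmp1.npy", arr)
assert np.all(wrapper.to_numpy() == arr)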

def save_maps(output_dir, fname, map_data):
""" Save node id mapping or edge id mapping
30 changes: 26 additions & 4 deletions tests/unit-tests/gconstruct/test_gconstruct_utils.py
@@ -25,6 +25,7 @@

from graphstorm.gconstruct.utils import _estimate_sizeof, _to_numpy_array, _to_shared_memory
from graphstorm.gconstruct.utils import HDF5Array, ExtNumpyWrapper
from graphstorm.gconstruct.utils import convert_to_ext_mem_numpy, _to_ext_memory
from graphstorm.gconstruct.utils import multiprocessing_data_read
from graphstorm.gconstruct.file_io import (write_data_hdf5,
read_data_hdf5,
@@ -175,9 +176,30 @@ def test_ext_mem_array():
with tempfile.TemporaryDirectory() as tmpdirname:
data = np.random.uniform(size=(1000, 10)).astype(np.float32)
tensor_path = os.path.join(tmpdirname, "tmp1.npy")
out_arr = np.memmap(tensor_path, np.float32, mode="w+", shape=(1000, 10))
out_arr[:] = data
check_ext_mem_array(ExtNumpyWrapper(tensor_path, out_arr.shape, out_arr.dtype), data)
check_ext_mem_array(convert_to_ext_mem_numpy(tensor_path, data), data)

data1 = np.random.uniform(size=(1000, 10)).astype(np.float32)
data2 = np.random.uniform(size=(1000,)).astype(np.float32)
data3 = np.random.uniform(size=(1000, 10)).astype(np.float32)
data4 = np.random.uniform(size=(1000,)).astype(np.float32)
arr_dict = {
"test1": (data1, data2),
"test2": [data3, data4],
}
arr_dict1 = _to_ext_memory(None, arr_dict, tmpdirname)
assert isinstance(arr_dict1, dict)
assert "test1" in arr_dict1
assert "test2" in arr_dict1
assert isinstance(arr_dict1["test1"], tuple)
assert isinstance(arr_dict1["test2"], list)
assert isinstance(arr_dict1["test1"][0], ExtNumpyWrapper)
assert isinstance(arr_dict1["test1"][1], ExtNumpyWrapper)
assert isinstance(arr_dict1["test2"][0], ExtNumpyWrapper)
assert isinstance(arr_dict1["test2"][1], ExtNumpyWrapper)
assert np.all(arr_dict1["test1"][0].to_numpy() == data1)
assert np.all(arr_dict1["test1"][1].to_numpy() == data2)
assert np.all(arr_dict1["test2"][0].to_numpy() == data3)
assert np.all(arr_dict1["test2"][1].to_numpy() == data4)

tensor_path = os.path.join(tmpdirname, "tmp2.hdf5")
write_data_hdf5({"test": data}, tensor_path)
@@ -281,7 +303,7 @@ def test_get_in_files():
test_read_empty_parquet()
test_read_empty_json()
test_read_empty_csv()
test_multiprocessing_read()
test_estimate_sizeof()
test_object_conversion()
test_ext_mem_array()
test_multiprocessing_read()