diff --git a/python/graphstorm/gconstruct/construct_graph.py b/python/graphstorm/gconstruct/construct_graph.py
index ae563312e0..0065da5403 100644
--- a/python/graphstorm/gconstruct/construct_graph.py
+++ b/python/graphstorm/gconstruct/construct_graph.py
@@ -199,7 +199,8 @@ def parse_edge_data(in_file, feat_ops, label_ops, node_id_map, read_file,
 
 def _process_data(user_pre_parser, user_parser,
                   two_phase_feat_ops,
-                  in_files, num_proc, task_info):
+                  in_files, num_proc, task_info,
+                  ext_mem_workspace):
     """ Process node and edge data.
 
     Parameter
@@ -216,10 +217,13 @@ def _process_data(user_pre_parser, user_parser,
         Number of processes to do processing.
     task_info: str
         Task meta info for debugging.
+    ext_mem_workspace : str
+        The path of the external-memory work space.
     """
     if len(two_phase_feat_ops) > 0:
         pre_parse_start = time.time()
-        phase_one_ret = multiprocessing_data_read(in_files, num_proc, user_pre_parser)
+        phase_one_ret = multiprocessing_data_read(in_files, num_proc, user_pre_parser,
+                                                  ext_mem_workspace)
         update_two_phase_feat_ops(phase_one_ret, two_phase_feat_ops)
 
         dur = time.time() - pre_parse_start
@@ -227,14 +231,16 @@ def _process_data(user_pre_parser, user_parser,
                       task_info, dur)
 
     start = time.time()
-    return_dict = multiprocessing_data_read(in_files, num_proc, user_parser)
+    return_dict = multiprocessing_data_read(in_files, num_proc, user_parser,
+                                            ext_mem_workspace)
     dur = time.time() - start
     logging.debug("Processing data files for %s takes %.3f seconds.",
                   task_info, dur)
 
     return return_dict
 
-def process_node_data(process_confs, arr_merger, remap_id, ext_mem=None, num_processes=1):
+def process_node_data(process_confs, arr_merger, remap_id,
+                      ext_mem_workspace=None, num_processes=1):
     """ Process node data
 
     We need to process all node data before we can process edge data.
@@ -272,8 +278,8 @@ def process_node_data(process_confs, arr_merger, remap_id, ext_mem=None, num_pro
         Whether or not to remap node IDs
     num_processes: int
         The number of processes to process the input files.
-    ext_mem: str or None
-        The address of external memory for multi-column feature
+    ext_mem_workspace: str or None
+        The path of external-memory work space for multi-column features
 
     Returns
     -------
@@ -313,14 +319,17 @@ def process_node_data(process_confs, arr_merger, remap_id, ext_mem=None, num_pro
                               label_ops=label_ops,
                               node_id_col=node_id_col,
                               read_file=read_file,
-                              ext_mem=ext_mem)
+                              ext_mem=ext_mem_workspace)
+        ext_mem_workspace_type = os.path.join(ext_mem_workspace, node_type) \
+                if ext_mem_workspace is not None else None
 
         return_dict = _process_data(user_pre_parser,
                                     user_parser,
                                     two_phase_feat_ops,
                                     in_files,
                                     num_proc,
-                                    f"node {node_type}")
+                                    f"node {node_type}",
+                                    ext_mem_workspace_type)
         type_node_id_map = [None] * len(return_dict)
         type_node_data = {}
         for i, (node_ids, data) in return_dict.items():
@@ -407,7 +416,7 @@ def process_node_data(process_confs, arr_merger, remap_id, ext_mem=None, num_pro
     return (node_id_map, node_data, label_stats)
 
 def process_edge_data(process_confs, node_id_map, arr_merger,
-                      ext_mem=None, num_processes=1,
+                      ext_mem_workspace=None, num_processes=1,
                       skip_nonexist_edges=False):
     """ Process edge data
 
@@ -446,8 +455,8 @@ def process_edge_data(process_confs, node_id_map, arr_merger,
         The number of processes to process the input files.
     skip_nonexist_edges : bool
         Whether or not to skip edges that don't exist.
-    ext_mem: str or None
-        The address of external memory for multi-column feature
+    ext_mem_workspace: str or None
+        The path of external-memory work space for multi-column features
 
     Returns
     -------
@@ -493,14 +502,17 @@ def process_edge_data(process_confs, node_id_map, arr_merger,
                              read_file=read_file,
                              conf=process_conf,
                              skip_nonexist_edges=skip_nonexist_edges,
-                             ext_mem=ext_mem)
+                             ext_mem=ext_mem_workspace)
+        ext_mem_workspace_type = os.path.join(ext_mem_workspace, "_".join(edge_type)) \
+                if ext_mem_workspace is not None else None
 
         return_dict = _process_data(user_pre_parser,
                                     user_parser,
                                     two_phase_feat_ops,
                                     in_files,
                                     num_proc,
-                                    f"edge {edge_type}")
+                                    f"edge {edge_type}",
+                                    ext_mem_workspace_type)
         type_src_ids = [None] * len(return_dict)
         type_dst_ids = [None] * len(return_dict)
         type_edge_data = {}
diff --git a/python/graphstorm/gconstruct/utils.py b/python/graphstorm/gconstruct/utils.py
index 7e32aa8b54..7ed99dcc03 100644
--- a/python/graphstorm/gconstruct/utils.py
+++ b/python/graphstorm/gconstruct/utils.py
@@ -36,6 +36,39 @@
 SHARED_MEM_OBJECT_THRESHOLD = 1.9 * 1024 * 1024 * 1024 # must < 2GB
 SHARED_MEMORY_CROSS_PROCESS_STORAGE = "shared_memory"
 PICKLE_CROSS_PROCESS_STORAGE = "pickle"
+EXT_MEMORY_STORAGE = "ext_memory"
+
+def _to_ext_memory(name, data, path):
+    if isinstance(data, np.ndarray):
+        assert name is not None
+        path = os.path.join(path, f"{name}.npy")
+        if len(data) > 0:
+            logging.debug("save data %s in %s.", name, path)
+            data = convert_to_ext_mem_numpy(path, data)
+            # We need to pass the array to another process. We don't want it
+            # to reference to data in the file.
+            data.cleanup()
+        return data
+    elif isinstance(data, dict):
+        new_data = {}
+        for key, val in data.items():
+            new_data[key] = _to_ext_memory(key, val,
+                                           os.path.join(path, name) if name is not None else path)
+        return new_data
+    elif isinstance(data, list):
+        new_data = []
+        for i, val in enumerate(data):
+            new_data.append(_to_ext_memory(f"item-{i}", val,
+                                           os.path.join(path, name) if name is not None else path))
+        return new_data
+    elif isinstance(data, tuple):
+        new_data = []
+        for i, val in enumerate(list(data)):
+            new_data.append(_to_ext_memory(f"item-{i}", val,
+                                           os.path.join(path, name) if name is not None else path))
+        return tuple(new_data)
+    else:
+        return data
 
 def _to_shared_memory(data):
     """ Move all tensor objects into torch shared memory
@@ -149,7 +182,7 @@ def generate_hash():
     random_uuid = uuid.uuid4()
     return str(random_uuid)
 
-def worker_fn(worker_id, task_queue, res_queue, user_parser):
+def worker_fn(worker_id, task_queue, res_queue, user_parser, ext_mem_workspace):
     """ The worker function in the worker pool
 
     Parameters
@@ -163,6 +196,8 @@ def worker_fn(worker_id, task_queue, res_queue, user_parser):
         communication between the worker processes and the master process.
     user_parser : callable
         The user-defined function to read and process the data files.
+    ext_mem_workspace : str
+        The path of the external-memory work space.
     """
     # We need to set a GPU device for each worker process in case that
     # some transformations (e.g., computing BERT embeddings) require GPU computation.
@@ -180,8 +215,10 @@ def worker_fn(worker_id, task_queue, res_queue, user_parser):
             logging.debug("%d Processing %s", worker_id, in_file)
             data = user_parser(in_file)
             size = _estimate_sizeof(data)
+            if ext_mem_workspace is not None:
+                data = (EXT_MEMORY_STORAGE, _to_ext_memory(f"file-{i}", data, ext_mem_workspace))
             # Max pickle obj size is 2 GByte
-            if size > SHARED_MEM_OBJECT_THRESHOLD:
+            elif size > SHARED_MEM_OBJECT_THRESHOLD:
                 # Use torch shared memory as a workaround
                 # This will consume shared memory and cause an additional
                 # data copy, i.e., general memory to torch shared memory.
@@ -218,7 +255,7 @@ def update_two_phase_feat_ops(phase_one_info, ops):
         if op.feat_name in feat_info:
             op.update_info(feat_info[op.feat_name])
 
-def multiprocessing_data_read(in_files, num_processes, user_parser):
+def multiprocessing_data_read(in_files, num_processes, user_parser, ext_mem_workspace=None):
     """ Read data from multiple files with multiprocessing.
 
     It creates a set of worker processes, each of which runs a worker function.
@@ -238,6 +275,8 @@ def multiprocessing_data_read(in_files, num_processes, user_parser):
         The number of processes that run in parallel.
     user_parser : callable
         The user-defined function to read and process the data files.
+    ext_mem_workspace : str
+        The path of the external-memory work space.
 
     Returns
     -------
@@ -252,7 +291,8 @@ def multiprocessing_data_read(in_files, num_processes, user_parser):
         for i, in_file in enumerate(in_files):
             task_queue.put((i, in_file))
         for i in range(num_processes):
-            proc = Process(target=worker_fn, args=(i, task_queue, res_queue, user_parser))
+            proc = Process(target=worker_fn, args=(i, task_queue, res_queue, user_parser,
+                                                   ext_mem_workspace))
             proc.start()
             processes.append(proc)
@@ -282,6 +322,8 @@ def multiprocessing_data_read(in_files, num_processes, user_parser):
         return_dict = {}
         for i, in_file in enumerate(in_files):
             return_dict[i] = user_parser(in_file)
+            if ext_mem_workspace is not None:
+                return_dict[i] = _to_ext_memory(f"file-{i}", return_dict[i], ext_mem_workspace)
         return return_dict
 
 def worker_fn_no_return(worker_id, task_queue, func):
@@ -675,10 +717,7 @@ def append(self, feature):
         if isinstance(feature, np.ndarray):
             hash_hex = generate_hash()
             path = self._directory_path + '/{}.npy'.format(hash_hex)
-            ext_val = np.memmap(path, feature.dtype, mode="w+", shape=feature.shape)
-            ext_val[:] = feature[:]
-            ext_val.flush()
-            feature = ExtNumpyWrapper(path, feature.shape, feature.dtype)
+            feature = convert_to_ext_mem_numpy(path, feature)
         self._wrapper.append(feature)
 
     def merge(self):
@@ -790,13 +829,27 @@ def __call__(self, arrs, name):
         if len(arrs) > 1:
             return _merge_arrs(arrs, tensor_path)
         else:
-            # To get the output dtype or arrs
-            dtype = _get_arrs_out_dtype(arrs)
-            arr = arrs[0]
-            em_arr = np.memmap(tensor_path, dtype, mode="w+", shape=shape)
-            em_arr[:] = arr[:]
-            em_arr.flush()
-            return ExtNumpyWrapper(tensor_path, em_arr.shape, em_arr.dtype)
+            return convert_to_ext_mem_numpy(tensor_path, arrs[0])
+
+def convert_to_ext_mem_numpy(tensor_path, arr):
+    """ Convert a numpy array to memory mapped array.
+
+    Parameters
+    ----------
+    tensor_path : str
+        The path of the file to store the Numpy array.
+    arr : Numpy array
+        The Numpy array
+
+    Returns
+    -------
+    ExtNumpyWrapper : the wrapper of the memory mapped array.
+ """ + os.makedirs(os.path.dirname(tensor_path), exist_ok=True) + em_arr = np.memmap(tensor_path, arr.dtype, mode="w+", shape=arr.shape) + em_arr[:] = arr[:] + em_arr.flush() + return ExtNumpyWrapper(tensor_path, em_arr.shape, em_arr.dtype) def save_maps(output_dir, fname, map_data): """ Save node id mapping or edge id mapping diff --git a/tests/unit-tests/gconstruct/test_gconstruct_utils.py b/tests/unit-tests/gconstruct/test_gconstruct_utils.py index c09daa5b87..b2472058b8 100644 --- a/tests/unit-tests/gconstruct/test_gconstruct_utils.py +++ b/tests/unit-tests/gconstruct/test_gconstruct_utils.py @@ -25,6 +25,7 @@ from graphstorm.gconstruct.utils import _estimate_sizeof, _to_numpy_array, _to_shared_memory from graphstorm.gconstruct.utils import HDF5Array, ExtNumpyWrapper +from graphstorm.gconstruct.utils import convert_to_ext_mem_numpy, _to_ext_memory from graphstorm.gconstruct.utils import multiprocessing_data_read from graphstorm.gconstruct.file_io import (write_data_hdf5, read_data_hdf5, @@ -175,9 +176,30 @@ def test_ext_mem_array(): with tempfile.TemporaryDirectory() as tmpdirname: data = np.random.uniform(size=(1000, 10)).astype(np.float32) tensor_path = os.path.join(tmpdirname, "tmp1.npy") - out_arr = np.memmap(tensor_path, np.float32, mode="w+", shape=(1000, 10)) - out_arr[:] = data - check_ext_mem_array(ExtNumpyWrapper(tensor_path, out_arr.shape, out_arr.dtype), data) + check_ext_mem_array(convert_to_ext_mem_numpy(tensor_path, data), data) + + data1 = np.random.uniform(size=(1000, 10)).astype(np.float32) + data2 = np.random.uniform(size=(1000,)).astype(np.float32) + data3 = np.random.uniform(size=(1000, 10)).astype(np.float32) + data4 = np.random.uniform(size=(1000,)).astype(np.float32) + arr_dict = { + "test1": (data1, data2), + "test2": [data3, data4], + } + arr_dict1 = _to_ext_memory(None, arr_dict, tmpdirname) + assert isinstance(arr_dict1, dict) + assert "test1" in arr_dict1 + assert "test2" in arr_dict1 + assert isinstance(arr_dict1["test1"], tuple) + assert isinstance(arr_dict1["test2"], list) + assert isinstance(arr_dict1["test1"][0], ExtNumpyWrapper) + assert isinstance(arr_dict1["test1"][1], ExtNumpyWrapper) + assert isinstance(arr_dict1["test2"][0], ExtNumpyWrapper) + assert isinstance(arr_dict1["test2"][1], ExtNumpyWrapper) + assert np.all(arr_dict1["test1"][0].to_numpy() == data1) + assert np.all(arr_dict1["test1"][1].to_numpy() == data2) + assert np.all(arr_dict1["test2"][0].to_numpy() == data3) + assert np.all(arr_dict1["test2"][1].to_numpy() == data4) tensor_path = os.path.join(tmpdirname, "tmp2.hdf5") write_data_hdf5({"test": data}, tensor_path) @@ -281,7 +303,7 @@ def test_get_in_files(): test_read_empty_parquet() test_read_empty_json() test_read_empty_csv() - test_multiprocessing_read() test_estimate_sizeof() test_object_conversion() test_ext_mem_array() + test_multiprocessing_read()