diff --git a/python/graphstorm/gconstruct/construct_graph.py b/python/graphstorm/gconstruct/construct_graph.py index a28582e59f..0bfdc66acc 100644 --- a/python/graphstorm/gconstruct/construct_graph.py +++ b/python/graphstorm/gconstruct/construct_graph.py @@ -71,7 +71,7 @@ def prepare_node_data(in_file, feat_ops, read_file): return feat_info -def parse_node_data(in_file, feat_ops, label_ops, node_id_col, read_file): +def parse_node_data(in_file, feat_ops, label_ops, node_id_col, read_file, ext_mem): """ Parse node data. The function parses a node file that contains node IDs, features and labels @@ -90,13 +90,15 @@ def parse_node_data(in_file, feat_ops, label_ops, node_id_col, read_file): The column name that contains the node ID. read_file : callable The function to read the node file + ext_mem: str + The path of the external-memory workspace used for multi-column features Returns ------- tuple : node ID array and a dict of node feature tensors. """ data = read_file(in_file) - feat_data = process_features(data, feat_ops) if feat_ops is not None else {} + feat_data = process_features(data, feat_ops, ext_mem) if feat_ops is not None else {} if label_ops is not None: label_data = process_labels(data, label_ops) for key, val in label_data.items(): @@ -131,7 +133,7 @@ def prepare_edge_data(in_file, feat_ops, read_file): return feat_info def parse_edge_data(in_file, feat_ops, label_ops, node_id_map, read_file, - conf, skip_nonexist_edges): + conf, skip_nonexist_edges, ext_mem): """ Parse edge data. 
The function parses an edge file that contains the source and destination node @@ -167,7 +169,7 @@ def parse_edge_data(in_file, feat_ops, label_ops, node_id_map, read_file, edge_type = conf['relation'] data = read_file(in_file) - feat_data = process_features(data, feat_ops) if feat_ops is not None else {} + feat_data = process_features(data, feat_ops, ext_mem) if feat_ops is not None else {} if label_ops is not None: label_data = process_labels(data, label_ops) for key, val in label_data.items(): @@ -230,7 +232,7 @@ def _process_data(user_pre_parser, user_parser, return return_dict -def process_node_data(process_confs, arr_merger, remap_id, num_processes=1): +def process_node_data(process_confs, arr_merger, remap_id, ext_mem, num_processes=1): """ Process node data We need to process all node data before we can process edge data. @@ -306,7 +308,8 @@ def process_node_data(process_confs, arr_merger, remap_id, num_processes=1): user_parser = partial(parse_node_data, feat_ops=feat_ops, label_ops=label_ops, node_id_col=node_id_col, - read_file=read_file) + read_file=read_file, + ext_mem=ext_mem) return_dict = _process_data(user_pre_parser, user_parser, @@ -400,7 +403,7 @@ def process_node_data(process_confs, arr_merger, remap_id, num_processes=1): return (node_id_map, node_data, label_stats) def process_edge_data(process_confs, node_id_map, arr_merger, - num_processes=1, + ext_mem, num_processes=1, skip_nonexist_edges=False): """ Process edge data @@ -483,7 +486,8 @@ def process_edge_data(process_confs, node_id_map, arr_merger, node_id_map=id_map, read_file=read_file, conf=process_conf, - skip_nonexist_edges=skip_nonexist_edges) + skip_nonexist_edges=skip_nonexist_edges, + ext_mem=ext_mem) return_dict = _process_data(user_pre_parser, user_parser, @@ -659,12 +663,13 @@ def process_graph(args): node_id_map, node_data, node_label_stats = \ process_node_data(process_confs['nodes'], convert2ext_mem, - args.remap_node_id, - num_processes=num_processes_for_nodes) + 
args.remap_node_id, ext_mem_workspace, + num_processes=num_processes_for_nodes + ) sys_tracker.check('Process the node data') edges, edge_data, edge_label_stats = \ process_edge_data(process_confs['edges'], node_id_map, - convert2ext_mem, + convert2ext_mem, ext_mem_workspace, num_processes=num_processes_for_edges, skip_nonexist_edges=args.skip_nonexist_edges) sys_tracker.check('Process the edge data') diff --git a/python/graphstorm/gconstruct/transform.py b/python/graphstorm/gconstruct/transform.py index 8ab6bb5c9b..4356714a8e 100644 --- a/python/graphstorm/gconstruct/transform.py +++ b/python/graphstorm/gconstruct/transform.py @@ -1069,7 +1069,7 @@ def preprocess_features(data, ops): return pre_data -def process_features(data, ops): +def process_features(data, ops, ext_mem=None): """ Process the data with the specified operations. This function runs the input operations on the corresponding data @@ -1081,6 +1081,8 @@ def process_features(data, ops): The data stored as a dict. ops : list of FeatTransform The operations that transform features. 
+ ext_mem: str + The path of the external-memory workspace Returns ------- @@ -1088,7 +1090,8 @@ """ new_data = {} for op in ops: - feature_path = 'feature_{}'.format(op.feat_name) + feature_path = ext_mem + 'feature_intermediate/feature_{}'\ + .format(op.feat_name) if os.path.exists(feature_path): shutil.rmtree(feature_path) if isinstance(op.col_name, str): @@ -1110,7 +1113,8 @@ new_data[key] = val else: tmp_key = key - feature_path = 'feature_{}'.format(op.feat_name) + assert ext_mem is not None, \ + "external memory is necessary for multiple column" if not os.path.exists(feature_path): os.makedirs(feature_path) wrapper = ExtFeatureWrapper(feature_path, val.shape, val.dtype) diff --git a/python/graphstorm/gconstruct/utils.py b/python/graphstorm/gconstruct/utils.py index 6090d36688..eb0ec6573f 100644 --- a/python/graphstorm/gconstruct/utils.py +++ b/python/graphstorm/gconstruct/utils.py @@ -613,11 +613,9 @@ def merge(self): out_arr[:, col_start:col_end] = arr col_start = col_end - print("out_arr:", out_arr) out_arr.flush() del out_arr - - + def _merge_arrs(arrs, tensor_path): """ Merge the arrays.