From 20394ee77bf3efcaaae4e84ab15056f1008e72ff Mon Sep 17 00:00:00 2001 From: "xiang song(charlie.song)" Date: Fri, 1 Dec 2023 13:20:28 -0800 Subject: [PATCH] [Bug fix] Fix the read/delete contention bug when running distributed remaping result/emb (#672) *Issue #, if available:* When doing remap_result in a distributed way. It is possible that some processes are still collecting remap tasks (scanning the embedding files and prediction files) while others have finished the tasks and start removing processed files. This will cause an read/delete contention. *Description of changes:* By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --------- Co-authored-by: Xiang Song --- python/graphstorm/gconstruct/remap_result.py | 171 +++++++++++------- tests/end2end-tests/data_process/test.sh | 117 ++++++++++-- .../gconstruct/test_remap_result.py | 11 +- 3 files changed, 215 insertions(+), 84 deletions(-) diff --git a/python/graphstorm/gconstruct/remap_result.py b/python/graphstorm/gconstruct/remap_result.py index 7ad43c61ad..ec3f97da48 100644 --- a/python/graphstorm/gconstruct/remap_result.py +++ b/python/graphstorm/gconstruct/remap_result.py @@ -135,7 +135,7 @@ def write_data_csv_file(data, file_prefix, delimiter=",", col_name_map=None): data_frame.to_csv(output_fname, index=False, sep=delimiter) def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key, - output_fname_prefix, chunk_size, output_func, preserve_input): + output_fname_prefix, chunk_size, output_func): """ Do one node prediction remapping task Parameters @@ -154,8 +154,6 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key, Max number of raws per output file. output_func: func Function used to write data to disk. - preserve_input: bool - Whether the input data should be removed. """ node_data = th.load(data_file_path).numpy() nids = th.load(nid_path).numpy() @@ -171,14 +169,10 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key, GS_REMAP_NID_COL: nid} output_func(data, f"{output_fname_prefix}_{pad_file_index(i)}") - if preserve_input is False: - os.remove(data_file_path) - os.remove(nid_path) - def worker_remap_edge_pred(pred_file_path, src_nid_path, dst_nid_path, src_type, dst_type, output_fname_prefix, chunk_size, - output_func, preserve_input): + output_func): """ Do one edge remapping task Parameters @@ -199,8 +193,6 @@ def worker_remap_edge_pred(pred_file_path, src_nid_path, Max number of raws per output file. output_func: func Function used to write data to disk. - preserve_input: bool - Whether the input data should be removed. """ pred_result = th.load(pred_file_path).numpy() src_nids = th.load(src_nid_path).numpy() @@ -220,11 +212,6 @@ def worker_remap_edge_pred(pred_file_path, src_nid_path, output_func(data, f"{output_fname_prefix}_{pad_file_index(i)}") - if preserve_input is False: - os.remove(pred_file_path) - os.remove(src_nid_path) - os.remove(dst_nid_path) - def _get_file_range(num_files, rank, world_size): """ Get the range of files to process by the current instance. @@ -259,11 +246,33 @@ def _get_file_range(num_files, rank, world_size): return start, end +def _remove_inputs(with_shared_fs, files_to_remove, + rank, world_size, work_dir): + if with_shared_fs is False: + # Not using shared file system. There is no contention. + # Each process will remove the files itself + for file in files_to_remove: + os.remove(file) + else: + # Shared file system is used. + # Only rank 0 is going to remove the files. + if rank == 0: + for i in range(1, world_size): + while not os.path.isfile(os.path.join(work_dir, f"SUCC_{i}")): + time.sleep(1) + os.remove(os.path.join(work_dir, f"SUCC_{i}")) + for file in files_to_remove: + os.remove(file) + else: + # Tell rank 0, rank n has finished its work. + with open(os.path.join(work_dir, f"SUCC_{rank}"), + 'w', encoding='utf-8') as f: # pylint: disable=unused-variable + pass + def remap_node_emb(emb_ntypes, node_emb_dir, output_dir, out_chunk_size, - num_proc, rank, world_size, - with_shared_fs, output_func, - preserve_input=False): + num_proc, rank, world_size, + with_shared_fs, output_func): """ Remap node embeddings. The function will iterate all the node types that @@ -324,10 +333,14 @@ def remap_node_emb(emb_ntypes, node_emb_dir, Whether shared file system is avaliable. output_func: func Function used to write data to disk. - preserve_input: bool - Whether the input data should be removed. + + Return + -------- + list of str + The list of files to be removed. """ task_list = [] + files_to_remove = [] for ntype in emb_ntypes: input_emb_dir = os.path.join(node_emb_dir, ntype) out_embdir = os.path.join(output_dir, ntype) @@ -345,6 +358,10 @@ def remap_node_emb(emb_ntypes, node_emb_dir, assert len(nid_files) == len(emb_files), \ "Number of nid files must match number of embedding files. " \ f"But get {len(nid_files)} and {len(emb_files)}." + files_to_remove += [os.path.join(input_emb_dir, nid_file) \ + for nid_file in nid_files] + files_to_remove += [os.path.join(input_emb_dir, emb_file) \ + for emb_file in emb_files] if with_shared_fs: # If the data are stored in a shared filesystem, @@ -371,15 +388,15 @@ def remap_node_emb(emb_ntypes, node_emb_dir, f"{emb_file[:emb_file.rindex('.')]}"), "chunk_size": out_chunk_size, "output_func": output_func, - "preserve_input": preserve_input }) multiprocessing_remap(task_list, num_proc, worker_remap_node_data) + return files_to_remove def remap_node_pred(pred_ntypes, pred_dir, output_dir, out_chunk_size, num_proc, rank, world_size, with_shared_fs, - output_func, preserve_input=False): + output_func): """ Remap node prediction result. The function wil iterate all the node types that @@ -419,11 +436,15 @@ def remap_node_pred(pred_ntypes, pred_dir, Whether shared file system is avaliable. output_func: func Function used to write data to disk. - preserve_input: bool - Whether the input data should be removed. + + Return + -------- + list of str + The list of files to be removed. """ start_time = time.time() task_list = [] + files_to_remove = [] for ntype in pred_ntypes: input_pred_dir = os.path.join(pred_dir, ntype) out_pred_dir = os.path.join(output_dir, ntype) @@ -435,6 +456,15 @@ def remap_node_pred(pred_ntypes, pred_dir, pred_files.sort() num_parts = len(pred_files) logging.debug("{%s} has {%d} prediction files", ntype, num_parts) + assert len(nid_files) == len(pred_files), \ + "Expect the number of nid files equal to " \ + "the number of prediction result files, but get " \ + f"{len(nid_files)} and {len(pred_files)}" + + files_to_remove += [os.path.join(input_pred_dir, nid_file) \ + for nid_file in nid_files] + files_to_remove += [os.path.join(input_pred_dir, pred_file) \ + for pred_file in pred_files] if with_shared_fs: # If the data are stored in a shared filesystem, @@ -461,19 +491,18 @@ def remap_node_pred(pred_ntypes, pred_dir, f"pred.{pred_file[:pred_file.rindex('.')]}"), "chunk_size": out_chunk_size, "output_func": output_func, - "preserve_input": preserve_input, }) multiprocessing_remap(task_list, num_proc, worker_remap_node_data) dur = time.time() - start_time logging.info("{%d} Remapping edge predictions takes {%f} secs", rank, dur) - + return files_to_remove def remap_edge_pred(pred_etypes, pred_dir, output_dir, out_chunk_size, num_proc, rank, world_size, with_shared_fs, - output_func, preserve_input=False): + output_func): """ Remap edge prediction result. The function will iterate all the edge types that @@ -517,11 +546,15 @@ def remap_edge_pred(pred_etypes, pred_dir, Whether shared file system is avaliable. output_func: func Function used to write data to disk. - preserve_input: bool - Whether the input data should be removed. + + Return + -------- + list of str + The list of files to be removed. """ start_time = time.time() task_list = [] + files_to_remove = [] for etype in pred_etypes: input_pred_dir = os.path.join(pred_dir, "_".join(etype)) out_pred_dir = os.path.join(output_dir, "_".join(etype)) @@ -543,6 +576,12 @@ def remap_edge_pred(pred_etypes, pred_dir, "Expect the number of destination nid files equal to " \ "the number of prediction result files, but get " \ f"{len(dst_nid_files)} and {len(pred_files)}" + files_to_remove += [os.path.join(input_pred_dir, src_nid_file) \ + for src_nid_file in src_nid_files] + files_to_remove += [os.path.join(input_pred_dir, dst_nid_file) \ + for dst_nid_file in dst_nid_files] + files_to_remove += [os.path.join(input_pred_dir, pred_file) \ + for pred_file in pred_files] if with_shared_fs: # If the data are stored in a shared filesystem, @@ -575,12 +614,13 @@ def remap_edge_pred(pred_etypes, pred_dir, f"pred.{pred_file[:pred_file.rindex('.')]}"), "chunk_size": out_chunk_size, "output_func": output_func, - "preserve_input": preserve_input }) multiprocessing_remap(task_list, num_proc, worker_remap_edge_pred) + dur = time.time() - start_time logging.debug("%d Finish edge rempaing in %f secs}", rank, dur) + return files_to_remove def _parse_gs_config(config): """ Get remapping related information from GSConfig @@ -810,48 +850,57 @@ def main(args, gs_config_args): else: raise TypeError(f"Output format not supported {args.output_format}") + files_to_remove = [] if len(emb_ntypes) > 0: emb_output = node_emb_dir # We need to do ID remapping for node embeddings - remap_node_emb(emb_ntypes, - node_emb_dir, - emb_output, - out_chunk_size, - num_proc, - rank, - world_size, - with_shared_fs, - output_func, - args.preserve_input) + emb_files_to_remove = \ + remap_node_emb(emb_ntypes, + node_emb_dir, + emb_output, + out_chunk_size, + num_proc, + rank, + world_size, + with_shared_fs, + output_func) + files_to_remove += emb_files_to_remove if len(pred_etypes) > 0: pred_output = predict_dir # We need to do ID remapping for edge prediction result - remap_edge_pred(pred_etypes, - predict_dir, - pred_output, - out_chunk_size, - num_proc, - rank, - world_size, - with_shared_fs, - output_func, - args.preserve_input) + pred_files_to_remove = \ + remap_edge_pred(pred_etypes, + predict_dir, + pred_output, + out_chunk_size, + num_proc, + rank, + world_size, + with_shared_fs, + output_func) + files_to_remove += pred_files_to_remove if len(pred_ntypes) > 0: pred_output = predict_dir # We need to do ID remapping for node prediction result - remap_node_pred(pred_ntypes, - predict_dir, - pred_output, - out_chunk_size, - num_proc, - rank, - world_size, - with_shared_fs, - output_func, - args.preserve_input) - + pred_files_to_remove = \ + remap_node_pred(pred_ntypes, + predict_dir, + pred_output, + out_chunk_size, + num_proc, + rank, + world_size, + with_shared_fs, + output_func) + files_to_remove += pred_files_to_remove + + if args.preserve_input is False and len(files_to_remove) > 0: + # If files_to_remove is not empty, at least node_emb_dir or + # predict_dir is not None. + _remove_inputs(with_shared_fs, files_to_remove, rank, world_size, + node_emb_dir if node_emb_dir is not None else predict_dir) def add_distributed_remap_args(parser): """ Distributed remapping only diff --git a/tests/end2end-tests/data_process/test.sh b/tests/end2end-tests/data_process/test.sh index 61265e747e..c235be9995 100644 --- a/tests/end2end-tests/data_process/test.sh +++ b/tests/end2end-tests/data_process/test.sh @@ -76,26 +76,68 @@ echo "********* Test the remap edge predictions *********" python3 $GS_HOME/tests/end2end-tests/data_process/gen_edge_predict_remap_test.py --output /tmp/ep_remap/ # Test remap edge prediction results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/ --rank 0 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/ --rank 1 --world-size 2 error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/ --rank 1 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/ --rank 0 --world-size 2 error_and_exit $? python3 $GS_HOME/tests/end2end-tests/data_process/check_edge_predict_remap.py --remap-output /tmp/ep_remap/pred/ error_and_exit $? +cnt=$(ls /tmp/ep_remap/pred/src_nids-*.pt | wc -l) +if test $cnt == 2 +then + echo "src_nids-xxx.pt must exist." + exit -1 +fi + +cnt=$(ls /tmp/ep_remap/pred/dst_nids-*.pt | wc -l) +if test $cnt == 2 +then + echo "dst_nids-xxx.pt must exist." + exit -1 +fi + +cnt=$(ls /tmp/ep_remap/pred/predict-*.pt | wc -l) +if test $cnt == 2 +then + echo "predict-xxx.pt must exist." + exit -1 +fi + cp -r /tmp/ep_remap/pred/ /tmp/ep_remap/rename-pred/ # Test remap edge prediction results and rename col names -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/rename-pred/ --rank 0 --world-size 2 --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT" +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/rename-pred/ --rank 1 --world-size 2 --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT" error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/rename-pred/ --rank 1 --world-size 2 --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT" +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/rename-pred/ --rank 0 --world-size 2 --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT" error_and_exit $? python3 $GS_HOME/tests/end2end-tests/data_process/check_edge_predict_remap.py --remap-output /tmp/ep_remap/rename-pred/ --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT" +cnt=$(ls /tmp/ep_remap/rename-pred/src_nids-*.pt | wc -l) +if test $cnt == 0 +then + echo "src_nids-xxx.pt should be removed." + exit -1 +fi + +cnt=$(ls /tmp/ep_remap/rename-pred/dst_nids-*.pt | wc -l) +if test $cnt == 0 +then + echo "dst_nids-xxx.pt should be removed." + exit -1 +fi + +cnt=$(ls /tmp/ep_remap/rename-pred/predict-*.pt | wc -l) +if test $cnt == 0 +then + echo "predict-xxx.pt should be removed." + exit -1 +fi + error_and_exit $? rm -fr /tmp/ep_remap/rename-pred/ @@ -112,10 +154,10 @@ cp -r /tmp/ep_remap/pred/n1_access_n0/*0.pt /tmp/ep_remap/pred/0/n1_access_n0/ cp -r /tmp/ep_remap/pred/n1_access_n0/*1.pt /tmp/ep_remap/pred/1/n1_access_n0/ # Test remap edge prediction results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/0/ --rank 0 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/1/ --rank 1 --world-size 2 --with-shared-fs False error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/1/ --rank 1 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/0/ --rank 0 --world-size 2 --with-shared-fs False error_and_exit $? mkdir /tmp/ep_remap/pred/no-share/ @@ -132,16 +174,30 @@ echo "********* Test the remap node predictions *********" python3 $GS_HOME/tests/end2end-tests/data_process/gen_node_predict_remap_test.py --output /tmp/np_remap/ # Test remap node prediction results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/ --rank 0 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/ --rank 1 --world-size 2 error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/ --rank 1 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/ --rank 0 --world-size 2 error_and_exit $? python3 $GS_HOME/tests/end2end-tests/data_process/check_node_predict_remap.py --remap-output /tmp/np_remap/pred/ error_and_exit $? +cnt=$(ls /tmp/np_remap/pred/predict_nids-*.pt | wc -l) +if test $cnt == 0 +then + echo "predict_nids-xxx.pt should be removed." + exit -1 +fi + +cnt=$(ls /tmp/np_remap/pred/predict-*.pt | wc -l) +if test $cnt == 0 +then + echo "predict-xxx.pt should be removed." + exit -1 +fi + # Test without shared filesystem echo "********* Test the remap node predictions without shared mem *********" mkdir /tmp/np_remap/pred/0/ @@ -158,10 +214,10 @@ cp -r /tmp/np_remap/pred/n1/*0.pt /tmp/np_remap/pred/0/n1/ cp -r /tmp/np_remap/pred/n1/*1.pt /tmp/np_remap/pred/1/n1/ # Test remap edge prediction results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/0/ --rank 0 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/1/ --rank 1 --world-size 2 --with-shared-fs False error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/1/ --rank 1 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/0/ --rank 0 --world-size 2 --with-shared-fs False error_and_exit $? mkdir /tmp/np_remap/pred/no-share/ @@ -179,29 +235,58 @@ echo "********* Test the remap node emb/partial emb *********" python3 $GS_HOME/tests/end2end-tests/data_process/gen_emb_predict_remap_test.py --output /tmp/em_remap/ # Test remap emb results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/ --preserve-input True --rank 0 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/ --preserve-input True --rank 1 --world-size 2 error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/ --preserve-input True --rank 1 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/ --preserve-input True --rank 0 --world-size 2 error_and_exit $? python3 $GS_HOME/tests/end2end-tests/data_process/check_emb_remap.py --remap-output /tmp/em_remap/partial-emb/ error_and_exit $? +cnt=$(ls /tmp/em_remap/partial-emb/embed_nids-*.pt | wc -l) +if test $cnt == 2 +then + echo "embed_nids-xxx.pt must exist." + exit -1 +fi + +cnt=$(ls /tmp/em_remap/partial-emb/embed-*.pt | wc -l) +if test $cnt == 2 +then + echo "embed-xxx.pt must exist." + exit -1 +fi + + cp -r /tmp/em_remap/partial-emb/ /tmp/em_remap/partial-rename-emb/ # Test remap emb results and rename col names -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-rename-emb/ --preserve-input True --rank 0 --world-size 2 --column-names "nid,~id:STRING" "emb,emb:FLOAT" +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-rename-emb/ --rank 1 --world-size 2 --column-names "nid,~id:STRING" "emb,emb:FLOAT" error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-rename-emb/ --preserve-input True --rank 1 --world-size 2 --column-names "nid,~id:STRING" "emb,emb:FLOAT" +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-rename-emb/ --rank 0 --world-size 2 --column-names "nid,~id:STRING" "emb,emb:FLOAT" error_and_exit $? python3 $GS_HOME/tests/end2end-tests/data_process/check_emb_remap.py --remap-output /tmp/em_remap/partial-rename-emb/ --column-names "nid,~id:STRING" "emb,emb:FLOAT" error_and_exit $? +cnt=$(ls /tmp/em_remap/partial-rename-emb/embed_nids-*.pt | wc -l) +if test $cnt == 0 +then + echo "embed_nids-xxx.pt should be removed." + exit -1 +fi + +cnt=$(ls /tmp/em_remap/partial-rename-emb/embed-*.pt | wc -l) +if test $cnt == 0 +then + echo "embed-xxx.pt should be removed." + exit -1 +fi + # Test without shared filesystem echo "********* Test the remap partial node embedding without shared mem *********" mkdir /tmp/em_remap/partial-emb/0/ @@ -217,10 +302,10 @@ cp -r /tmp/em_remap/partial-emb/n1/*0.pt /tmp/em_remap/partial-emb/0/n1/ cp -r /tmp/em_remap/partial-emb/n1/*1.pt /tmp/em_remap/partial-emb/1/n1/ # Test remap emb results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/0/ --preserve-input True --rank 0 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/1/ --preserve-input True --rank 1 --world-size 2 --with-shared-fs False error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/1/ --preserve-input True --rank 1 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/0/ --preserve-input True --rank 0 --world-size 2 --with-shared-fs False error_and_exit $? mkdir /tmp/em_remap/partial-emb/no-share/ diff --git a/tests/unit-tests/gconstruct/test_remap_result.py b/tests/unit-tests/gconstruct/test_remap_result.py index 17d43b20c6..1421c8f5b5 100644 --- a/tests/unit-tests/gconstruct/test_remap_result.py +++ b/tests/unit-tests/gconstruct/test_remap_result.py @@ -83,11 +83,10 @@ def test_worker_remap_node_data(data_col): worker_remap_node_data(data_path, nid_path, ntypes[0], data_col, output_path_prefix, chunk_size, - write_data_parquet_file, preserve_input=True) + write_data_parquet_file) worker_remap_node_data(data_path, nid_path, ntypes[0], data_col, output_path_prefix, chunk_size, - partial(write_data_csv_file, delimiter=","), - preserve_input=True) + partial(write_data_csv_file, delimiter=",")) def read_csv(file, delimiter=","): data = pd.read_csv(file, delimiter=delimiter) nid = data["nid"].to_numpy() @@ -171,12 +170,10 @@ def test_worker_remap_edge_pred(): worker_remap_edge_pred(pred_path, src_nid_path, dst_nid_path, ntypes[0], ntypes[1], output_path_prefix, - chunk_size, write_data_parquet_file, - preserve_input=True) + chunk_size, write_data_parquet_file) worker_remap_edge_pred(pred_path, src_nid_path, dst_nid_path, ntypes[0], ntypes[1], output_path_prefix, - chunk_size, partial(write_data_csv_file, delimiter=","), - preserve_input=True) + chunk_size, partial(write_data_csv_file, delimiter=",")) def read_csv(file, delimiter=","): data = pd.read_csv(file, delimiter=delimiter) src_nid = data["src_nid"].to_numpy()