diff --git a/python/graphstorm/gconstruct/remap_result.py b/python/graphstorm/gconstruct/remap_result.py index 7ad43c61ad..ec3f97da48 100644 --- a/python/graphstorm/gconstruct/remap_result.py +++ b/python/graphstorm/gconstruct/remap_result.py @@ -135,7 +135,7 @@ def write_data_csv_file(data, file_prefix, delimiter=",", col_name_map=None): data_frame.to_csv(output_fname, index=False, sep=delimiter) def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key, - output_fname_prefix, chunk_size, output_func, preserve_input): + output_fname_prefix, chunk_size, output_func): """ Do one node prediction remapping task Parameters @@ -154,8 +154,6 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key, Max number of raws per output file. output_func: func Function used to write data to disk. - preserve_input: bool - Whether the input data should be removed. """ node_data = th.load(data_file_path).numpy() nids = th.load(nid_path).numpy() @@ -171,14 +169,10 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key, GS_REMAP_NID_COL: nid} output_func(data, f"{output_fname_prefix}_{pad_file_index(i)}") - if preserve_input is False: - os.remove(data_file_path) - os.remove(nid_path) - def worker_remap_edge_pred(pred_file_path, src_nid_path, dst_nid_path, src_type, dst_type, output_fname_prefix, chunk_size, - output_func, preserve_input): + output_func): """ Do one edge remapping task Parameters @@ -199,8 +193,6 @@ def worker_remap_edge_pred(pred_file_path, src_nid_path, Max number of raws per output file. output_func: func Function used to write data to disk. - preserve_input: bool - Whether the input data should be removed. """ pred_result = th.load(pred_file_path).numpy() src_nids = th.load(src_nid_path).numpy() @@ -220,11 +212,6 @@ def worker_remap_edge_pred(pred_file_path, src_nid_path, output_func(data, f"{output_fname_prefix}_{pad_file_index(i)}") - if preserve_input is False: - os.remove(pred_file_path) - os.remove(src_nid_path) - os.remove(dst_nid_path) - def _get_file_range(num_files, rank, world_size): """ Get the range of files to process by the current instance. @@ -259,11 +246,33 @@ def _get_file_range(num_files, rank, world_size): return start, end +def _remove_inputs(with_shared_fs, files_to_remove, + rank, world_size, work_dir): + if with_shared_fs is False: + # Not using shared file system. There is no contention. + # Each process will remove the files itself + for file in files_to_remove: + os.remove(file) + else: + # Shared file system is used. + # Only rank 0 is going to remove the files. + if rank == 0: + for i in range(1, world_size): + while not os.path.isfile(os.path.join(work_dir, f"SUCC_{i}")): + time.sleep(1) + os.remove(os.path.join(work_dir, f"SUCC_{i}")) + for file in files_to_remove: + os.remove(file) + else: + # Tell rank 0, rank n has finished its work. + with open(os.path.join(work_dir, f"SUCC_{rank}"), + 'w', encoding='utf-8') as f: # pylint: disable=unused-variable + pass + def remap_node_emb(emb_ntypes, node_emb_dir, output_dir, out_chunk_size, - num_proc, rank, world_size, - with_shared_fs, output_func, - preserve_input=False): + num_proc, rank, world_size, + with_shared_fs, output_func): """ Remap node embeddings. The function will iterate all the node types that @@ -324,10 +333,14 @@ def remap_node_emb(emb_ntypes, node_emb_dir, Whether shared file system is avaliable. output_func: func Function used to write data to disk. - preserve_input: bool - Whether the input data should be removed. + + Return + -------- + list of str + The list of files to be removed. """ task_list = [] + files_to_remove = [] for ntype in emb_ntypes: input_emb_dir = os.path.join(node_emb_dir, ntype) out_embdir = os.path.join(output_dir, ntype) @@ -345,6 +358,10 @@ def remap_node_emb(emb_ntypes, node_emb_dir, assert len(nid_files) == len(emb_files), \ "Number of nid files must match number of embedding files. " \ f"But get {len(nid_files)} and {len(emb_files)}." + files_to_remove += [os.path.join(input_emb_dir, nid_file) \ + for nid_file in nid_files] + files_to_remove += [os.path.join(input_emb_dir, emb_file) \ + for emb_file in emb_files] if with_shared_fs: # If the data are stored in a shared filesystem, @@ -371,15 +388,15 @@ def remap_node_emb(emb_ntypes, node_emb_dir, f"{emb_file[:emb_file.rindex('.')]}"), "chunk_size": out_chunk_size, "output_func": output_func, - "preserve_input": preserve_input }) multiprocessing_remap(task_list, num_proc, worker_remap_node_data) + return files_to_remove def remap_node_pred(pred_ntypes, pred_dir, output_dir, out_chunk_size, num_proc, rank, world_size, with_shared_fs, - output_func, preserve_input=False): + output_func): """ Remap node prediction result. The function wil iterate all the node types that @@ -419,11 +436,15 @@ def remap_node_pred(pred_ntypes, pred_dir, Whether shared file system is avaliable. output_func: func Function used to write data to disk. - preserve_input: bool - Whether the input data should be removed. + + Return + -------- + list of str + The list of files to be removed. """ start_time = time.time() task_list = [] + files_to_remove = [] for ntype in pred_ntypes: input_pred_dir = os.path.join(pred_dir, ntype) out_pred_dir = os.path.join(output_dir, ntype) @@ -435,6 +456,15 @@ def remap_node_pred(pred_ntypes, pred_dir, pred_files.sort() num_parts = len(pred_files) logging.debug("{%s} has {%d} prediction files", ntype, num_parts) + assert len(nid_files) == len(pred_files), \ + "Expect the number of nid files equal to " \ + "the number of prediction result files, but get " \ + f"{len(nid_files)} and {len(pred_files)}" + + files_to_remove += [os.path.join(input_pred_dir, nid_file) \ + for nid_file in nid_files] + files_to_remove += [os.path.join(input_pred_dir, pred_file) \ + for pred_file in pred_files] if with_shared_fs: # If the data are stored in a shared filesystem, @@ -461,19 +491,18 @@ def remap_node_pred(pred_ntypes, pred_dir, f"pred.{pred_file[:pred_file.rindex('.')]}"), "chunk_size": out_chunk_size, "output_func": output_func, - "preserve_input": preserve_input, }) multiprocessing_remap(task_list, num_proc, worker_remap_node_data) dur = time.time() - start_time logging.info("{%d} Remapping edge predictions takes {%f} secs", rank, dur) - + return files_to_remove def remap_edge_pred(pred_etypes, pred_dir, output_dir, out_chunk_size, num_proc, rank, world_size, with_shared_fs, - output_func, preserve_input=False): + output_func): """ Remap edge prediction result. The function will iterate all the edge types that @@ -517,11 +546,15 @@ def remap_edge_pred(pred_etypes, pred_dir, Whether shared file system is avaliable. output_func: func Function used to write data to disk. - preserve_input: bool - Whether the input data should be removed. + + Return + -------- + list of str + The list of files to be removed. """ start_time = time.time() task_list = [] + files_to_remove = [] for etype in pred_etypes: input_pred_dir = os.path.join(pred_dir, "_".join(etype)) out_pred_dir = os.path.join(output_dir, "_".join(etype)) @@ -543,6 +576,12 @@ def remap_edge_pred(pred_etypes, pred_dir, "Expect the number of destination nid files equal to " \ "the number of prediction result files, but get " \ f"{len(dst_nid_files)} and {len(pred_files)}" + files_to_remove += [os.path.join(input_pred_dir, src_nid_file) \ + for src_nid_file in src_nid_files] + files_to_remove += [os.path.join(input_pred_dir, dst_nid_file) \ + for dst_nid_file in dst_nid_files] + files_to_remove += [os.path.join(input_pred_dir, pred_file) \ + for pred_file in pred_files] if with_shared_fs: # If the data are stored in a shared filesystem, @@ -575,12 +614,13 @@ def remap_edge_pred(pred_etypes, pred_dir, f"pred.{pred_file[:pred_file.rindex('.')]}"), "chunk_size": out_chunk_size, "output_func": output_func, - "preserve_input": preserve_input }) multiprocessing_remap(task_list, num_proc, worker_remap_edge_pred) + dur = time.time() - start_time logging.debug("%d Finish edge rempaing in %f secs}", rank, dur) + return files_to_remove def _parse_gs_config(config): """ Get remapping related information from GSConfig @@ -810,48 +850,57 @@ def main(args, gs_config_args): else: raise TypeError(f"Output format not supported {args.output_format}") + files_to_remove = [] if len(emb_ntypes) > 0: emb_output = node_emb_dir # We need to do ID remapping for node embeddings - remap_node_emb(emb_ntypes, - node_emb_dir, - emb_output, - out_chunk_size, - num_proc, - rank, - world_size, - with_shared_fs, - output_func, - args.preserve_input) + emb_files_to_remove = \ + remap_node_emb(emb_ntypes, + node_emb_dir, + emb_output, + out_chunk_size, + num_proc, + rank, + world_size, + with_shared_fs, + output_func) + files_to_remove += emb_files_to_remove if len(pred_etypes) > 0: pred_output = predict_dir # We need to do ID remapping for edge prediction result - remap_edge_pred(pred_etypes, - predict_dir, - pred_output, - out_chunk_size, - num_proc, - rank, - world_size, - with_shared_fs, - output_func, - args.preserve_input) + pred_files_to_remove = \ + remap_edge_pred(pred_etypes, + predict_dir, + pred_output, + out_chunk_size, + num_proc, + rank, + world_size, + with_shared_fs, + output_func) + files_to_remove += pred_files_to_remove if len(pred_ntypes) > 0: pred_output = predict_dir # We need to do ID remapping for node prediction result - remap_node_pred(pred_ntypes, - predict_dir, - pred_output, - out_chunk_size, - num_proc, - rank, - world_size, - with_shared_fs, - output_func, - args.preserve_input) - + pred_files_to_remove = \ + remap_node_pred(pred_ntypes, + predict_dir, + pred_output, + out_chunk_size, + num_proc, + rank, + world_size, + with_shared_fs, + output_func) + files_to_remove += pred_files_to_remove + + if args.preserve_input is False and len(files_to_remove) > 0: + # If files_to_remove is not empty, at least node_emb_dir or + # predict_dir is not None. + _remove_inputs(with_shared_fs, files_to_remove, rank, world_size, + node_emb_dir if node_emb_dir is not None else predict_dir) def add_distributed_remap_args(parser): """ Distributed remapping only diff --git a/tests/end2end-tests/data_process/test.sh b/tests/end2end-tests/data_process/test.sh index 61265e747e..c235be9995 100644 --- a/tests/end2end-tests/data_process/test.sh +++ b/tests/end2end-tests/data_process/test.sh @@ -76,26 +76,68 @@ echo "********* Test the remap edge predictions *********" python3 $GS_HOME/tests/end2end-tests/data_process/gen_edge_predict_remap_test.py --output /tmp/ep_remap/ # Test remap edge prediction results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/ --rank 0 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/ --rank 1 --world-size 2 error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/ --rank 1 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/ --rank 0 --world-size 2 error_and_exit $? python3 $GS_HOME/tests/end2end-tests/data_process/check_edge_predict_remap.py --remap-output /tmp/ep_remap/pred/ error_and_exit $? +cnt=$(ls /tmp/ep_remap/pred/src_nids-*.pt | wc -l) +if test $cnt == 2 +then + echo "src_nids-xxx.pt must exist." + exit -1 +fi + +cnt=$(ls /tmp/ep_remap/pred/dst_nids-*.pt | wc -l) +if test $cnt == 2 +then + echo "dst_nids-xxx.pt must exist." + exit -1 +fi + +cnt=$(ls /tmp/ep_remap/pred/predict-*.pt | wc -l) +if test $cnt == 2 +then + echo "predict-xxx.pt must exist." + exit -1 +fi + cp -r /tmp/ep_remap/pred/ /tmp/ep_remap/rename-pred/ # Test remap edge prediction results and rename col names -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/rename-pred/ --rank 0 --world-size 2 --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT" +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/rename-pred/ --rank 1 --world-size 2 --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT" error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/rename-pred/ --rank 1 --world-size 2 --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT" +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/rename-pred/ --rank 0 --world-size 2 --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT" error_and_exit $? python3 $GS_HOME/tests/end2end-tests/data_process/check_edge_predict_remap.py --remap-output /tmp/ep_remap/rename-pred/ --column-names "src_nid,~from:STRING" "dst_nid,~to:STRING" "pred,pred:FLOAT" +cnt=$(ls /tmp/ep_remap/rename-pred/src_nids-*.pt | wc -l) +if test $cnt == 0 +then + echo "src_nids-xxx.pt should be removed." + exit -1 +fi + +cnt=$(ls /tmp/ep_remap/rename-pred/dst_nids-*.pt | wc -l) +if test $cnt == 0 +then + echo "dst_nids-xxx.pt should be removed." + exit -1 +fi + +cnt=$(ls /tmp/ep_remap/rename-pred/predict-*.pt | wc -l) +if test $cnt == 0 +then + echo "predict-xxx.pt should be removed." + exit -1 +fi + error_and_exit $? rm -fr /tmp/ep_remap/rename-pred/ @@ -112,10 +154,10 @@ cp -r /tmp/ep_remap/pred/n1_access_n0/*0.pt /tmp/ep_remap/pred/0/n1_access_n0/ cp -r /tmp/ep_remap/pred/n1_access_n0/*1.pt /tmp/ep_remap/pred/1/n1_access_n0/ # Test remap edge prediction results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/0/ --rank 0 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/1/ --rank 1 --world-size 2 --with-shared-fs False error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/1/ --rank 1 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/ep_remap/id_mapping/ --logging-level debug --pred-etypes "n0,access,n1" "n1,access,n0" --preserve-input True --prediction-dir /tmp/ep_remap/pred/0/ --rank 0 --world-size 2 --with-shared-fs False error_and_exit $? mkdir /tmp/ep_remap/pred/no-share/ @@ -132,16 +174,30 @@ echo "********* Test the remap node predictions *********" python3 $GS_HOME/tests/end2end-tests/data_process/gen_node_predict_remap_test.py --output /tmp/np_remap/ # Test remap node prediction results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/ --rank 0 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/ --rank 1 --world-size 2 error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/ --rank 1 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/ --rank 0 --world-size 2 error_and_exit $? python3 $GS_HOME/tests/end2end-tests/data_process/check_node_predict_remap.py --remap-output /tmp/np_remap/pred/ error_and_exit $? +cnt=$(ls /tmp/np_remap/pred/predict_nids-*.pt | wc -l) +if test $cnt == 0 +then + echo "predict_nids-xxx.pt should be removed." + exit -1 +fi + +cnt=$(ls /tmp/np_remap/pred/predict-*.pt | wc -l) +if test $cnt == 0 +then + echo "predict-xxx.pt should be removed." + exit -1 +fi + # Test without shared filesystem echo "********* Test the remap node predictions without shared mem *********" mkdir /tmp/np_remap/pred/0/ @@ -158,10 +214,10 @@ cp -r /tmp/np_remap/pred/n1/*0.pt /tmp/np_remap/pred/0/n1/ cp -r /tmp/np_remap/pred/n1/*1.pt /tmp/np_remap/pred/1/n1/ # Test remap edge prediction results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/0/ --rank 0 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/1/ --rank 1 --world-size 2 --with-shared-fs False error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/1/ --rank 1 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/np_remap/id_mapping/ --logging-level debug --pred-ntypes "n0" "n1" --preserve-input True --prediction-dir /tmp/np_remap/pred/0/ --rank 0 --world-size 2 --with-shared-fs False error_and_exit $? mkdir /tmp/np_remap/pred/no-share/ @@ -179,29 +235,58 @@ echo "********* Test the remap node emb/partial emb *********" python3 $GS_HOME/tests/end2end-tests/data_process/gen_emb_predict_remap_test.py --output /tmp/em_remap/ # Test remap emb results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/ --preserve-input True --rank 0 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/ --preserve-input True --rank 1 --world-size 2 error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/ --preserve-input True --rank 1 --world-size 2 +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/ --preserve-input True --rank 0 --world-size 2 error_and_exit $? python3 $GS_HOME/tests/end2end-tests/data_process/check_emb_remap.py --remap-output /tmp/em_remap/partial-emb/ error_and_exit $? +cnt=$(ls /tmp/em_remap/partial-emb/embed_nids-*.pt | wc -l) +if test $cnt == 2 +then + echo "embed_nids-xxx.pt must exist." + exit -1 +fi + +cnt=$(ls /tmp/em_remap/partial-emb/embed-*.pt | wc -l) +if test $cnt == 2 +then + echo "embed-xxx.pt must exist." + exit -1 +fi + + cp -r /tmp/em_remap/partial-emb/ /tmp/em_remap/partial-rename-emb/ # Test remap emb results and rename col names -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-rename-emb/ --preserve-input True --rank 0 --world-size 2 --column-names "nid,~id:STRING" "emb,emb:FLOAT" +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-rename-emb/ --rank 1 --world-size 2 --column-names "nid,~id:STRING" "emb,emb:FLOAT" error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-rename-emb/ --preserve-input True --rank 1 --world-size 2 --column-names "nid,~id:STRING" "emb,emb:FLOAT" +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-rename-emb/ --rank 0 --world-size 2 --column-names "nid,~id:STRING" "emb,emb:FLOAT" error_and_exit $? python3 $GS_HOME/tests/end2end-tests/data_process/check_emb_remap.py --remap-output /tmp/em_remap/partial-rename-emb/ --column-names "nid,~id:STRING" "emb,emb:FLOAT" error_and_exit $? +cnt=$(ls /tmp/em_remap/partial-rename-emb/embed_nids-*.pt | wc -l) +if test $cnt == 0 +then + echo "embed_nids-xxx.pt should be removed." + exit -1 +fi + +cnt=$(ls /tmp/em_remap/partial-rename-emb/embed-*.pt | wc -l) +if test $cnt == 0 +then + echo "embed-xxx.pt should be removed." + exit -1 +fi + # Test without shared filesystem echo "********* Test the remap partial node embedding without shared mem *********" mkdir /tmp/em_remap/partial-emb/0/ @@ -217,10 +302,10 @@ cp -r /tmp/em_remap/partial-emb/n1/*0.pt /tmp/em_remap/partial-emb/0/n1/ cp -r /tmp/em_remap/partial-emb/n1/*1.pt /tmp/em_remap/partial-emb/1/n1/ # Test remap emb results -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/0/ --preserve-input True --rank 0 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/1/ --preserve-input True --rank 1 --world-size 2 --with-shared-fs False error_and_exit $? -python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/1/ --preserve-input True --rank 1 --world-size 2 --with-shared-fs False +python3 -m graphstorm.gconstruct.remap_result --num-processes 16 --node-id-mapping /tmp/em_remap/id_mapping/ --logging-level debug --node-emb-dir /tmp/em_remap/partial-emb/0/ --preserve-input True --rank 0 --world-size 2 --with-shared-fs False error_and_exit $? mkdir /tmp/em_remap/partial-emb/no-share/ diff --git a/tests/unit-tests/gconstruct/test_remap_result.py b/tests/unit-tests/gconstruct/test_remap_result.py index 17d43b20c6..1421c8f5b5 100644 --- a/tests/unit-tests/gconstruct/test_remap_result.py +++ b/tests/unit-tests/gconstruct/test_remap_result.py @@ -83,11 +83,10 @@ def test_worker_remap_node_data(data_col): worker_remap_node_data(data_path, nid_path, ntypes[0], data_col, output_path_prefix, chunk_size, - write_data_parquet_file, preserve_input=True) + write_data_parquet_file) worker_remap_node_data(data_path, nid_path, ntypes[0], data_col, output_path_prefix, chunk_size, - partial(write_data_csv_file, delimiter=","), - preserve_input=True) + partial(write_data_csv_file, delimiter=",")) def read_csv(file, delimiter=","): data = pd.read_csv(file, delimiter=delimiter) nid = data["nid"].to_numpy() @@ -171,12 +170,10 @@ def test_worker_remap_edge_pred(): worker_remap_edge_pred(pred_path, src_nid_path, dst_nid_path, ntypes[0], ntypes[1], output_path_prefix, - chunk_size, write_data_parquet_file, - preserve_input=True) + chunk_size, write_data_parquet_file) worker_remap_edge_pred(pred_path, src_nid_path, dst_nid_path, ntypes[0], ntypes[1], output_path_prefix, - chunk_size, partial(write_data_csv_file, delimiter=","), - preserve_input=True) + chunk_size, partial(write_data_csv_file, delimiter=",")) def read_csv(file, delimiter=","): data = pd.read_csv(file, delimiter=delimiter) src_nid = data["src_nid"].to_numpy()