[Bug fix] Fix the read/delete contention bug when running distributed remapping result/emb (#672)

*Issue #, if available:*
When running remap_result in a distributed way, it is possible that some
processes are still collecting remap tasks (scanning the embedding and
prediction files) while others have already finished their tasks and
started removing the processed files. This causes a read/delete
contention.
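A minimal sketch of the failure mode (the directory name and file pattern below are hypothetical, chosen only to illustrate the timeline):

```python
import os

# Hypothetical two-rank timeline of the race this PR fixes.
pred_dir = "predict-output/n0"

# Rank 1 is still collecting remap tasks: it lists the prediction files...
files = sorted(f for f in os.listdir(pred_dir) if f.startswith("predict-"))

# ...while rank 0, already done with its own tasks, starts cleanup:
# os.remove(os.path.join(pred_dir, files[0]))   # runs concurrently on rank 0

# When rank 1 now opens the files it just listed, the read can fail.
for fname in files:
    with open(os.path.join(pred_dir, fname), "rb") as f:  # may raise FileNotFoundError
        _ = f.read()
```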

*Description of changes:*

Instead of each worker removing its input files as soon as its own remap
task finishes, `remap_node_emb`, `remap_node_pred`, and `remap_edge_pred`
now collect the input files and return them as a list. After all
remapping work is done, `main` passes the combined list to a new
`_remove_inputs` helper: without a shared file system each process
removes its own files; with a shared file system every non-zero rank
writes a `SUCC_{rank}` marker file and rank 0 deletes the inputs only
after all markers have appeared. The `preserve_input` flag still
controls whether inputs are removed at all.

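The core of the fix is the deletion barrier in the new `_remove_inputs` helper (shown in the diff below). A standalone, minimal sketch of the same pattern (the function name here is illustrative, not the committed API):

```python
import os
import time

def remove_inputs_sketch(with_shared_fs, files_to_remove,
                         rank, world_size, work_dir):
    """Sketch of the rank-0 deletion barrier; mirrors _remove_inputs below."""
    if not with_shared_fs:
        # No shared file system: each process owns its files, so there is
        # no contention and it can delete them directly.
        for file in files_to_remove:
            os.remove(file)
    elif rank == 0:
        # Shared file system: wait until every other rank has dropped a
        # SUCC_{rank} marker, i.e. has finished reading the inputs.
        for i in range(1, world_size):
            marker = os.path.join(work_dir, f"SUCC_{i}")
            while not os.path.isfile(marker):
                time.sleep(1)
            os.remove(marker)
        # Only now is it safe for rank 0 to delete the shared inputs.
        for file in files_to_remove:
            os.remove(file)
    else:
        # Non-zero ranks signal completion and let rank 0 do the cleanup.
        with open(os.path.join(work_dir, f"SUCC_{rank}"), "w",
                  encoding="utf-8"):
            pass
```

Deleting each marker after it is observed keeps the work directory clean for subsequent runs.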
By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: Xiang Song <[email protected]>
classicsong and Xiang Song authored Dec 1, 2023
1 parent 9c96751 commit 20394ee
Showing 3 changed files with 215 additions and 84 deletions.
python/graphstorm/gconstruct/remap_result.py — 171 changes: 110 additions & 61 deletions
@@ -135,7 +135,7 @@ def write_data_csv_file(data, file_prefix, delimiter=",", col_name_map=None):
     data_frame.to_csv(output_fname, index=False, sep=delimiter)
 
 def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key,
-                           output_fname_prefix, chunk_size, output_func, preserve_input):
+                           output_fname_prefix, chunk_size, output_func):
     """ Do one node prediction remapping task
 
     Parameters
@@ -154,8 +154,6 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key,
         Max number of raws per output file.
     output_func: func
         Function used to write data to disk.
-    preserve_input: bool
-        Whether the input data should be removed.
     """
     node_data = th.load(data_file_path).numpy()
     nids = th.load(nid_path).numpy()
@@ -171,14 +169,10 @@ def worker_remap_node_data(data_file_path, nid_path, ntype, data_col_key,
                 GS_REMAP_NID_COL: nid}
         output_func(data, f"{output_fname_prefix}_{pad_file_index(i)}")
 
-    if preserve_input is False:
-        os.remove(data_file_path)
-        os.remove(nid_path)
-
 def worker_remap_edge_pred(pred_file_path, src_nid_path,
                            dst_nid_path, src_type, dst_type,
                            output_fname_prefix, chunk_size,
-                           output_func, preserve_input):
+                           output_func):
     """ Do one edge remapping task
 
     Parameters
@@ -199,8 +193,6 @@ def worker_remap_edge_pred(pred_file_path, src_nid_path,
         Max number of raws per output file.
     output_func: func
         Function used to write data to disk.
-    preserve_input: bool
-        Whether the input data should be removed.
     """
     pred_result = th.load(pred_file_path).numpy()
     src_nids = th.load(src_nid_path).numpy()
@@ -220,11 +212,6 @@ def worker_remap_edge_pred(pred_file_path, src_nid_path,
 
         output_func(data, f"{output_fname_prefix}_{pad_file_index(i)}")
 
-    if preserve_input is False:
-        os.remove(pred_file_path)
-        os.remove(src_nid_path)
-        os.remove(dst_nid_path)
-
 def _get_file_range(num_files, rank, world_size):
     """ Get the range of files to process by the current instance.
@@ -259,11 +246,33 @@ def _get_file_range(num_files, rank, world_size):
 
     return start, end
 
+def _remove_inputs(with_shared_fs, files_to_remove,
+                   rank, world_size, work_dir):
+    if with_shared_fs is False:
+        # Not using shared file system. There is no contention.
+        # Each process will remove the files itself
+        for file in files_to_remove:
+            os.remove(file)
+    else:
+        # Shared file system is used.
+        # Only rank 0 is going to remove the files.
+        if rank == 0:
+            for i in range(1, world_size):
+                while not os.path.isfile(os.path.join(work_dir, f"SUCC_{i}")):
+                    time.sleep(1)
+                os.remove(os.path.join(work_dir, f"SUCC_{i}"))
+            for file in files_to_remove:
+                os.remove(file)
+        else:
+            # Tell rank 0, rank n has finished its work.
+            with open(os.path.join(work_dir, f"SUCC_{rank}"),
+                      'w', encoding='utf-8') as f: # pylint: disable=unused-variable
+                pass
+
 def remap_node_emb(emb_ntypes, node_emb_dir,
                    output_dir, out_chunk_size,
-                   num_proc, rank, world_size,
-                   with_shared_fs, output_func,
-                   preserve_input=False):
+                   num_proc, rank, world_size,
+                   with_shared_fs, output_func):
     """ Remap node embeddings.
 
     The function will iterate all the node types that
@@ -324,10 +333,14 @@ def remap_node_emb(emb_ntypes, node_emb_dir,
         Whether shared file system is avaliable.
     output_func: func
         Function used to write data to disk.
-    preserve_input: bool
-        Whether the input data should be removed.
+
+    Return
+    --------
+    list of str
+        The list of files to be removed.
     """
     task_list = []
+    files_to_remove = []
     for ntype in emb_ntypes:
         input_emb_dir = os.path.join(node_emb_dir, ntype)
         out_embdir = os.path.join(output_dir, ntype)
@@ -345,6 +358,10 @@ def remap_node_emb(emb_ntypes, node_emb_dir,
             assert len(nid_files) == len(emb_files), \
                 "Number of nid files must match number of embedding files. " \
                 f"But get {len(nid_files)} and {len(emb_files)}."
+            files_to_remove += [os.path.join(input_emb_dir, nid_file) \
+                for nid_file in nid_files]
+            files_to_remove += [os.path.join(input_emb_dir, emb_file) \
+                for emb_file in emb_files]
 
             if with_shared_fs:
                 # If the data are stored in a shared filesystem,
@@ -371,15 +388,15 @@ def remap_node_emb(emb_ntypes, node_emb_dir,
                                     f"{emb_file[:emb_file.rindex('.')]}"),
                     "chunk_size": out_chunk_size,
                     "output_func": output_func,
-                    "preserve_input": preserve_input
                 })
 
     multiprocessing_remap(task_list, num_proc, worker_remap_node_data)
+    return files_to_remove
 
 def remap_node_pred(pred_ntypes, pred_dir,
                     output_dir, out_chunk_size,
                     num_proc, rank, world_size, with_shared_fs,
-                    output_func, preserve_input=False):
+                    output_func):
     """ Remap node prediction result.
 
     The function wil iterate all the node types that
@@ -419,11 +436,15 @@ def remap_node_pred(pred_ntypes, pred_dir,
         Whether shared file system is avaliable.
     output_func: func
         Function used to write data to disk.
-    preserve_input: bool
-        Whether the input data should be removed.
+
+    Return
+    --------
+    list of str
+        The list of files to be removed.
     """
     start_time = time.time()
     task_list = []
+    files_to_remove = []
     for ntype in pred_ntypes:
         input_pred_dir = os.path.join(pred_dir, ntype)
         out_pred_dir = os.path.join(output_dir, ntype)
@@ -435,6 +456,15 @@ def remap_node_pred(pred_ntypes, pred_dir,
         pred_files.sort()
         num_parts = len(pred_files)
         logging.debug("{%s} has {%d} prediction files", ntype, num_parts)
+        assert len(nid_files) == len(pred_files), \
+            "Expect the number of nid files equal to " \
+            "the number of prediction result files, but get " \
+            f"{len(nid_files)} and {len(pred_files)}"
+
+        files_to_remove += [os.path.join(input_pred_dir, nid_file) \
+            for nid_file in nid_files]
+        files_to_remove += [os.path.join(input_pred_dir, pred_file) \
+            for pred_file in pred_files]
 
         if with_shared_fs:
             # If the data are stored in a shared filesystem,
@@ -461,19 +491,18 @@ def remap_node_pred(pred_ntypes, pred_dir,
                                 f"pred.{pred_file[:pred_file.rindex('.')]}"),
                 "chunk_size": out_chunk_size,
                 "output_func": output_func,
-                "preserve_input": preserve_input,
             })
 
     multiprocessing_remap(task_list, num_proc, worker_remap_node_data)
 
     dur = time.time() - start_time
     logging.info("{%d} Remapping edge predictions takes {%f} secs", rank, dur)
+    return files_to_remove
 
 def remap_edge_pred(pred_etypes, pred_dir,
                     output_dir, out_chunk_size,
                     num_proc, rank, world_size, with_shared_fs,
-                    output_func, preserve_input=False):
+                    output_func):
     """ Remap edge prediction result.
 
     The function will iterate all the edge types that
@@ -517,11 +546,15 @@ def remap_edge_pred(pred_etypes, pred_dir,
         Whether shared file system is avaliable.
     output_func: func
         Function used to write data to disk.
-    preserve_input: bool
-        Whether the input data should be removed.
+
+    Return
+    --------
+    list of str
+        The list of files to be removed.
     """
     start_time = time.time()
     task_list = []
+    files_to_remove = []
     for etype in pred_etypes:
         input_pred_dir = os.path.join(pred_dir, "_".join(etype))
         out_pred_dir = os.path.join(output_dir, "_".join(etype))
@@ -543,6 +576,12 @@ def remap_edge_pred(pred_etypes, pred_dir,
             "Expect the number of destination nid files equal to " \
             "the number of prediction result files, but get " \
             f"{len(dst_nid_files)} and {len(pred_files)}"
+        files_to_remove += [os.path.join(input_pred_dir, src_nid_file) \
+            for src_nid_file in src_nid_files]
+        files_to_remove += [os.path.join(input_pred_dir, dst_nid_file) \
+            for dst_nid_file in dst_nid_files]
+        files_to_remove += [os.path.join(input_pred_dir, pred_file) \
+            for pred_file in pred_files]
 
         if with_shared_fs:
             # If the data are stored in a shared filesystem,
@@ -575,12 +614,13 @@ def remap_edge_pred(pred_etypes, pred_dir,
                                 f"pred.{pred_file[:pred_file.rindex('.')]}"),
                 "chunk_size": out_chunk_size,
                 "output_func": output_func,
-                "preserve_input": preserve_input
             })
 
     multiprocessing_remap(task_list, num_proc, worker_remap_edge_pred)
 
     dur = time.time() - start_time
     logging.debug("%d Finish edge rempaing in %f secs}", rank, dur)
+    return files_to_remove
 
 def _parse_gs_config(config):
     """ Get remapping related information from GSConfig
@@ -810,48 +850,57 @@ def main(args, gs_config_args):
     else:
         raise TypeError(f"Output format not supported {args.output_format}")
 
+    files_to_remove = []
     if len(emb_ntypes) > 0:
         emb_output = node_emb_dir
         # We need to do ID remapping for node embeddings
-        remap_node_emb(emb_ntypes,
-                       node_emb_dir,
-                       emb_output,
-                       out_chunk_size,
-                       num_proc,
-                       rank,
-                       world_size,
-                       with_shared_fs,
-                       output_func,
-                       args.preserve_input)
+        emb_files_to_remove = \
+            remap_node_emb(emb_ntypes,
+                           node_emb_dir,
+                           emb_output,
+                           out_chunk_size,
+                           num_proc,
+                           rank,
+                           world_size,
+                           with_shared_fs,
+                           output_func)
+        files_to_remove += emb_files_to_remove
 
     if len(pred_etypes) > 0:
         pred_output = predict_dir
         # We need to do ID remapping for edge prediction result
-        remap_edge_pred(pred_etypes,
-                        predict_dir,
-                        pred_output,
-                        out_chunk_size,
-                        num_proc,
-                        rank,
-                        world_size,
-                        with_shared_fs,
-                        output_func,
-                        args.preserve_input)
+        pred_files_to_remove = \
+            remap_edge_pred(pred_etypes,
+                            predict_dir,
+                            pred_output,
+                            out_chunk_size,
+                            num_proc,
+                            rank,
+                            world_size,
+                            with_shared_fs,
+                            output_func)
+        files_to_remove += pred_files_to_remove
 
     if len(pred_ntypes) > 0:
         pred_output = predict_dir
         # We need to do ID remapping for node prediction result
-        remap_node_pred(pred_ntypes,
-                        predict_dir,
-                        pred_output,
-                        out_chunk_size,
-                        num_proc,
-                        rank,
-                        world_size,
-                        with_shared_fs,
-                        output_func,
-                        args.preserve_input)
+        pred_files_to_remove = \
+            remap_node_pred(pred_ntypes,
+                            predict_dir,
+                            pred_output,
+                            out_chunk_size,
+                            num_proc,
+                            rank,
+                            world_size,
+                            with_shared_fs,
+                            output_func)
+        files_to_remove += pred_files_to_remove
+
+    if args.preserve_input is False and len(files_to_remove) > 0:
+        # If files_to_remove is not empty, at least node_emb_dir or
+        # predict_dir is not None.
+        _remove_inputs(with_shared_fs, files_to_remove, rank, world_size,
+                       node_emb_dir if node_emb_dir is not None else predict_dir)
 
 def add_distributed_remap_args(parser):
     """ Distributed remapping only