-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Eager fixes for distributed execution #558
base: future
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,6 +68,10 @@ def to_shell_file(graph: IR, args) -> str: | |
os.makedirs(directory, exist_ok=True) | ||
|
||
if not args.no_eager: | ||
# Set DFGNode next id to not clash with already existing ids | ||
# TODO: ideally we should get the next_id from the graph object | ||
# to avoid conflicts across parallel processes | ||
DFGNode.next_id = max(DFGNode.next_id , max(graph.nodes.keys()) + 1) | ||
graph = pash_runtime.add_eager_nodes(graph, args.dgsh_tee) | ||
|
||
script = to_shell(graph, args) | ||
|
@@ -261,6 +265,9 @@ def assign_workers_to_subgraphs(subgraphs:List[IR], file_id_gen: FileIdGen, inpu | |
# sometimes a command can have both a file resource and an ephemeral resources (example: spell oneliner) | ||
continue | ||
|
||
# for worker, graph in worker_subgraph_pairs: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would delete this |
||
# print(to_shell(graph, config.pash_args), file=sys.stderr) | ||
|
||
return main_graph, worker_subgraph_pairs | ||
|
||
def prepare_graph_for_remote_exec(filename:str, get_worker:Callable): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ | |
import definitions.ir.nodes.r_split as r_split | ||
import definitions.ir.nodes.r_unwrap as r_unwrap | ||
import definitions.ir.nodes.dgsh_tee as dgsh_tee | ||
import definitions.ir.nodes.remote_pipe as remote_pipe | ||
import definitions.ir.nodes.dfs_split_reader as dfs_split_reader | ||
# Distirbuted Exec | ||
import dspash.hdfs_utils as hdfs_utils | ||
|
@@ -721,14 +722,20 @@ def add_eager_nodes(graph, use_dgsh_tee): | |
intermediateFileIdGen = FileIdGen(0, runtime_config['eager_intermediate_prefix']) | ||
|
||
## Get the next nodes | ||
workset = [node for source_node_id in source_node_ids for node in graph.get_next_nodes(source_node_id)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would be tempted to think that there was a reason why this was here. Maybe we don't want to start from all |
||
workset = source_node_ids | ||
visited = set() | ||
while (len(workset) > 0): | ||
curr_id = workset.pop(0) | ||
curr = graph.get_node(curr_id) | ||
|
||
if (not curr_id in visited): | ||
visited.add(curr_id) | ||
next_node_ids = graph.get_next_nodes(curr_id) | ||
|
||
# Skip if this is the last node | ||
if not next_node_ids: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly, is that necessary or an optimization? |
||
continue | ||
|
||
workset += next_node_ids | ||
|
||
## TODO: Make sure that we don't add duplicate eager nodes | ||
|
@@ -764,6 +771,12 @@ def add_eager_nodes(graph, use_dgsh_tee): | |
for edge_id in eager_input_ids: | ||
add_eager(edge_id, graph, fileIdGen, intermediateFileIdGen, use_dgsh_tee) | ||
|
||
## Add an eager after remote_pipe | ||
if(isinstance(curr, remote_pipe.RemotePipe) and curr.is_remote_read()): | ||
eager_input_ids = curr.outputs | ||
for edge_id in eager_input_ids: | ||
add_eager(edge_id, graph, fileIdGen, intermediateFileIdGen, use_dgsh_tee) | ||
|
||
return graph | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we do that in general? Outside of this
if
statement?