Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eager fixes for distributed execution #558

Open
wants to merge 2 commits into
base: future
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions compiler/definitions/ir/nodes/remote_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ def __init__(self, inputs, outputs, com_name, com_category,
com_redirs=com_redirs,
com_assignments=com_assignments)

def is_remote_read(self):
com_name = self.com_name.opt_serialize()
read_com = config.config['runtime']['remote_read_binary']
return read_com in com_name

def make_remote_pipe(inputs, outputs, host_ip, port, is_remote_read, id):
com_category = "pure"
options = []
Expand Down
7 changes: 7 additions & 0 deletions compiler/dspash/ir_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ def to_shell_file(graph: IR, args) -> str:
os.makedirs(directory, exist_ok=True)

if not args.no_eager:
# Set DFGNode next id to not clash with already existing ids
# TODO: ideally we should get the next_id from the graph object
# to avoid conflicts across parallel processes
DFGNode.next_id = max(DFGNode.next_id , max(graph.nodes.keys()) + 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we do that in general? Outside of this if statement?

graph = pash_runtime.add_eager_nodes(graph, args.dgsh_tee)

script = to_shell(graph, args)
Expand Down Expand Up @@ -261,6 +265,9 @@ def assign_workers_to_subgraphs(subgraphs:List[IR], file_id_gen: FileIdGen, inpu
# sometimes a command can have both a file resource and an ephemeral resources (example: spell oneliner)
continue

# for worker, graph in worker_subgraph_pairs:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would delete this

# print(to_shell(graph, config.pash_args), file=sys.stderr)

return main_graph, worker_subgraph_pairs

def prepare_graph_for_remote_exec(filename:str, get_worker:Callable):
Expand Down
15 changes: 14 additions & 1 deletion compiler/pash_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import definitions.ir.nodes.r_split as r_split
import definitions.ir.nodes.r_unwrap as r_unwrap
import definitions.ir.nodes.dgsh_tee as dgsh_tee
import definitions.ir.nodes.remote_pipe as remote_pipe
import definitions.ir.nodes.dfs_split_reader as dfs_split_reader
# Distirbuted Exec
import dspash.hdfs_utils as hdfs_utils
Expand Down Expand Up @@ -721,14 +722,20 @@ def add_eager_nodes(graph, use_dgsh_tee):
intermediateFileIdGen = FileIdGen(0, runtime_config['eager_intermediate_prefix'])

## Get the next nodes
workset = [node for source_node_id in source_node_ids for node in graph.get_next_nodes(source_node_id)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be tempted to think that there was a reason why this was here. Maybe we don't want to start from all source_node_ids? Is there a particular reason you deleted this? The reason why I am asking is because regressions here cannot be caught by any test (the compiler won't fail most likely but maybe it won't optimize that much).

workset = source_node_ids
visited = set()
while (len(workset) > 0):
curr_id = workset.pop(0)
curr = graph.get_node(curr_id)

if (not curr_id in visited):
visited.add(curr_id)
next_node_ids = graph.get_next_nodes(curr_id)

# Skip if this is the last node
if not next_node_ids:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, is that necessary or an optimization?

continue

workset += next_node_ids

## TODO: Make sure that we don't add duplicate eager nodes
Expand Down Expand Up @@ -764,6 +771,12 @@ def add_eager_nodes(graph, use_dgsh_tee):
for edge_id in eager_input_ids:
add_eager(edge_id, graph, fileIdGen, intermediateFileIdGen, use_dgsh_tee)

## Add an eager after remote_pipe
if(isinstance(curr, remote_pipe.RemotePipe) and curr.is_remote_read()):
eager_input_ids = curr.outputs
for edge_id in eager_input_ids:
add_eager(edge_id, graph, fileIdGen, intermediateFileIdGen, use_dgsh_tee)

return graph


Expand Down