Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dspash future #707

Merged
merged 2 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions compiler/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ def add_common_arguments(parser):
parser.add_argument("--version",
action='version',
version='%(prog)s {version}'.format(version=__version__))
parser.add_argument("--worker_timeout",
help="determines if we will mock a timeout for worker node.",
default="")
parser.add_argument("--worker_timeout_choice",
help="determines which worker node will be timed out.",
default="")
return


Expand Down
12 changes: 9 additions & 3 deletions compiler/dspash/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import argparse
import requests
import time

DISH_TOP = os.environ['DISH_TOP']
PASH_TOP = os.environ['PASH_TOP']
Expand Down Expand Up @@ -128,26 +129,31 @@ def manage_connection(conn, addr):
if not data:
break

print("got new request")
request = decode_request(data)

print("got new request")
request = decode_request(data)
if request['type'] == 'Exec-Graph':
graph, shell_vars, functions = parse_exec_graph(request)
debug = True if request['debug'] else False
save_configs(graph, dfs_configs_paths)
time.sleep(int(request['worker_timeout']))
rc = exec_graph(graph, shell_vars, functions, debug)
rcs.append((rc, request))
body = {}
elif request['type'] == 'Done':
print("Received 'Done' signal. Closing connection from the worker.")
break
else:
print(f"Unsupported request {request}")
send_success(conn, body)
print("connection ended")

# Ensure subprocesses have finished, and releasing corresponding resources
for rc, request in rcs:
if request['debug']:
send_log(rc, request)
else:
rc.wait()
print("connection ended")


def parse_args():
Expand Down
1 change: 0 additions & 1 deletion compiler/dspash/worker.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/bin/bash

# trap ctrl-c and call ctrl_c()
trap cleanup INT

Expand Down
74 changes: 55 additions & 19 deletions compiler/dspash/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,34 @@ def get_running_processes(self):
# answer = self.socket.recv(1024)
return self._running_processes

def send_graph_exec_request(self, graph, shell_vars, functions, debug=False) -> bool:
def send_graph_exec_request(self, graph, shell_vars, functions, debug=False, worker_timeout=0) -> bool:
request_dict = { 'type': 'Exec-Graph',
'graph': graph,
'functions': functions,
'shell_variables': None, # Doesn't seem needed for now
'debug': None
'debug': None,
'worker_timeout': worker_timeout
}
if debug:
request_dict['debug'] = {'name': self.name, 'url': f'{DEBUG_URL}/putlog'}

request = encode_request(request_dict)
#TODO: do I need to open and close connection?
log(self._socket, self._port)
send_msg(self._socket, request)
# TODO wait until the command exec finishes and run this in parallel?
response_data = recv_msg(self._socket)
retries = 0
MAX_RETRIES = 2
RETRY_DELAY = 1
self._socket.settimeout(5)
response_data = None
while retries < MAX_RETRIES and not response_data:
try:
response_data = recv_msg(self._socket)
except socket.timeout:
log(f"Timeout encountered. Retry {retries + 1} of {MAX_RETRIES}.")
retries += 1
time.sleep(RETRY_DELAY)
if not response_data or decode_request(response_data)['status'] != "OK":
raise Exception(f"didn't recieved ack on request {response_data}")
else:
Expand All @@ -68,8 +81,9 @@ def send_graph_exec_request(self, graph, shell_vars, functions, debug=False) ->
return True

def close(self):
self._socket.send("Done")
self._socket.send(encode_request({"type": "Done"}))
self._socket.close()
self._online = False

def _wait_ack(self):
confirmation = self._socket.recv(4096)
Expand Down Expand Up @@ -147,22 +161,44 @@ def run(self):
elif request.startswith("Exec-Graph"):
args = request.split(':', 1)[1].strip()
filename, declared_functions_file = args.split()
numExecutedSubgraphs = 0
numTotalSubgraphs = None
crashed_worker = workers_manager.args.worker_timeout_choice if workers_manager.args.worker_timeout_choice != '' else "worker1" # default to be worker1
try:
while not numTotalSubgraphs or numExecutedSubgraphs < numTotalSubgraphs:
# In the naive fault tolerance, we want all workers to receive its subgraph(s) without crashing
# if a crash happens, we'll re-split the IR and do it again until scheduling is done without any crash.
numExecutedSubgraphs = 0
worker_subgraph_pairs, shell_vars, main_graph = prepare_graph_for_remote_exec(filename, workers_manager.get_worker)
if numTotalSubgraphs == None:
numTotalSubgraphs = len(worker_subgraph_pairs)
script_fname = to_shell_file(main_graph, workers_manager.args)
log("Master node graph stored in ", script_fname)

# Read functions
log("Functions stored in ", declared_functions_file)
declared_functions = read_file(declared_functions_file)

# Execute subgraphs on workers
for worker, subgraph in worker_subgraph_pairs:
worker_timeout = workers_manager.args.worker_timeout if worker.name == crashed_worker and workers_manager.args.worker_timeout else 0

try:
worker.send_graph_exec_request(subgraph, shell_vars, declared_functions, workers_manager.args.debug, worker_timeout)
numExecutedSubgraphs += 1
except Exception as e:
# worker timeout
worker.close()
log(f"{worker} closed")
# Report to main shell a script to execute
# Delay this to the very end when every worker has received the subgraph
response_msg = f"OK {script_fname}"
dspash_socket.respond(response_msg, conn)
except Exception as e:
print(e)



worker_subgraph_pairs, shell_vars, main_graph = prepare_graph_for_remote_exec(filename, workers_manager.get_worker)
script_fname = to_shell_file(main_graph, workers_manager.args)
log("Master node graph stored in ", script_fname)

# Read functions
log("Functions stored in ", declared_functions_file)
declared_functions = read_file(declared_functions_file)

# Report to main shell a script to execute
response_msg = f"OK {script_fname}"
dspash_socket.respond(response_msg, conn)

# Execute subgraphs on workers
for worker, subgraph in worker_subgraph_pairs:
worker.send_graph_exec_request(subgraph, shell_vars, declared_functions, workers_manager.args.debug)
else:
raise Exception(f"Unknown request: {request}")

Expand Down
10 changes: 10 additions & 0 deletions compiler/pash_init_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export pash_parallel_pipelines=0
export pash_daemon_communicates_through_unix_pipes_flag=0
export show_version=0
export distributed_exec=0
export worker_timeout=0
export worker_timeout_choice=0

for item in "$@"
do
Expand Down Expand Up @@ -102,6 +104,14 @@ do
if [ "--distributed_exec" == "$item" ]; then
export distributed_exec=1
fi

if [ "--worker_timeout" == "$item" ]; then
export worker_timeout=1
fi

if [ "--worker_timeout_choice" == "$item" ]; then
export worker_timeout_choice=1
fi
done

## `pash_redir_output` and `pash_redir_all_output` are strictly for logging.
Expand Down
Loading