
Naive Fault Tolerance Support
1. Added a naive mock of worker crashes through a controlled sleep, enabled via the worker_timeout and worker_timeout_choice flags.
2. Added naive detection of worker crashes in the scheduler, based on retry-and-backoff logic.
3. Added naive fault-tolerance logic in the scheduler: re-split the original IR and distribute it to the healthy workers for re-execution (see the sketch below).
4. Updated benchmarks based on the data branch (these still need some modifications to fit the distributed context; that will land in a separate PR in the dish repo).
huangworld committed Nov 19, 2023
1 parent 5dee7dd commit 8544ab8
Showing 19 changed files with 106 additions and 47 deletions.
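Taken together, the changes implement a simple loop: the scheduler injects a sleep into one worker's request to mock a crash, detects the missing ack via a socket timeout plus retries, and then re-splits the IR across the remaining healthy workers. Below is a minimal, runnable simulation of that loop; MockWorker, send_with_retries, and schedule are illustrative stand-ins, not the actual DiSh APIs (those appear in the diffs that follow).

import queue
import threading
import time

MAX_RETRIES = 2    # how many times the scheduler re-reads before giving up
RETRY_DELAY = 1    # seconds between retries (the commit uses a fixed delay)
ACK_TIMEOUT = 5    # per-read timeout, mirroring self._socket.settimeout(5)

class MockWorker:
    """Stand-in for a DiSh worker; `delay` mocks a crash via sleep."""
    def __init__(self, name, delay=0):
        self.name, self.delay, self.online = name, delay, True
        self._acks = queue.Queue()

    def exec_subgraph(self, subgraph):
        # The real worker sleeps before exec_graph(); here the sleep runs in
        # a thread so the scheduler side can time out waiting for the ack.
        def run():
            time.sleep(self.delay)
            self._acks.put("OK")
        threading.Thread(target=run, daemon=True).start()

    def recv_ack(self, timeout):
        return self._acks.get(timeout=timeout)  # raises queue.Empty on timeout

def send_with_retries(worker, subgraph):
    worker.exec_subgraph(subgraph)
    for _ in range(MAX_RETRIES):
        try:
            return worker.recv_ack(ACK_TIMEOUT)
        except queue.Empty:
            time.sleep(RETRY_DELAY)
    raise RuntimeError(f"{worker.name} presumed crashed")

def schedule(subgraphs, workers):
    """Naive fault tolerance: re-split over the healthy workers and restart
    the round until every subgraph executes (assumes one worker survives)."""
    executed = 0
    while executed < len(subgraphs):
        executed = 0
        healthy = [w for w in workers if w.online]
        pairs = [(healthy[i % len(healthy)], sg) for i, sg in enumerate(subgraphs)]
        for worker, subgraph in pairs:
            try:
                send_with_retries(worker, subgraph)
                executed += 1
            except RuntimeError:
                worker.online = False   # analogous to worker.close() in the diff

if __name__ == "__main__":
    # worker1 sleeps past every retry window, so its subgraphs move to worker2.
    schedule(["sg1", "sg2", "sg3"],
             [MockWorker("worker1", delay=60), MockWorker("worker2")])
    print("all subgraphs executed")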
6 changes: 6 additions & 0 deletions compiler/config.py
@@ -174,6 +174,12 @@ def add_common_arguments(parser):
parser.add_argument("--version",
action='version',
version='%(prog)s {version}'.format(version=__version__))
parser.add_argument("--worker_timeout",
help="determines if we will mock a timeout for worker node.",
default="")
parser.add_argument("--worker_timeout_choice",
help="determines which worker node will be timed out.",
default="")
return


12 changes: 9 additions & 3 deletions compiler/dspash/worker.py
@@ -6,6 +6,7 @@
import os
import argparse
import requests
import time

DISH_TOP = os.environ['DISH_TOP']
PASH_TOP = os.environ['PASH_TOP']
@@ -128,26 +129,31 @@ def manage_connection(conn, addr):
if not data:
break

print("got new request")
request = decode_request(data)

print("got new request")
request = decode_request(data)
if request['type'] == 'Exec-Graph':
graph, shell_vars, functions = parse_exec_graph(request)
debug = True if request['debug'] else False
save_configs(graph, dfs_configs_paths)
time.sleep(int(request['worker_timeout']))
rc = exec_graph(graph, shell_vars, functions, debug)
rcs.append((rc, request))
body = {}
elif request['type'] == 'Done':
print("Received 'Done' signal. Closing connection from the worker.")
break
else:
print(f"Unsupported request {request}")
send_success(conn, body)
print("connection ended")

# Ensure subprocesses have finished, and release the corresponding resources
for rc, request in rcs:
if request['debug']:
send_log(rc, request)
else:
rc.wait()
print("connection ended")


def parse_args():
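On the worker side, the mocked crash is just the added time.sleep(int(request['worker_timeout'])) before exec_graph. Note that int('') would raise, so the scheduler must always put a numeric value in the request (the worker_manager.py change below defaults the field to 0). A slightly more defensive variant of the mock (a sketch, not part of this commit) would coerce the field itself:

def mock_crash_delay(request) -> int:
    """Seconds to sleep before executing the graph, mocking an unresponsive
    worker; treats a missing or empty field as no delay instead of raising."""
    return max(0, int(request.get('worker_timeout') or 0))

with the handler then calling time.sleep(mock_crash_delay(request)).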
1 change: 0 additions & 1 deletion compiler/dspash/worker.sh
@@ -1,5 +1,4 @@
#!/bin/bash

# trap ctrl-c and call ctrl_c()
trap cleanup INT

74 changes: 55 additions & 19 deletions compiler/dspash/worker_manager.py
@@ -45,21 +45,34 @@ def get_running_processes(self):
# answer = self.socket.recv(1024)
return self._running_processes

def send_graph_exec_request(self, graph, shell_vars, functions, debug=False) -> bool:
def send_graph_exec_request(self, graph, shell_vars, functions, debug=False, worker_timeout=0) -> bool:
request_dict = { 'type': 'Exec-Graph',
'graph': graph,
'functions': functions,
'shell_variables': None, # Doesn't seem needed for now
'debug': None
'debug': None,
'worker_timeout': worker_timeout
}
if debug:
request_dict['debug'] = {'name': self.name, 'url': f'{DEBUG_URL}/putlog'}

request = encode_request(request_dict)
#TODO: do I need to open and close connection?
log(self._socket, self._port)
send_msg(self._socket, request)
# TODO wait until the command exec finishes and run this in parallel?
response_data = recv_msg(self._socket)
retries = 0
MAX_RETRIES = 2
RETRY_DELAY = 1
self._socket.settimeout(5)
response_data = None
while retries < MAX_RETRIES and not response_data:
try:
response_data = recv_msg(self._socket)
except socket.timeout:
log(f"Timeout encountered. Retry {retries + 1} of {MAX_RETRIES}.")
retries += 1
time.sleep(RETRY_DELAY)
if not response_data or decode_request(response_data)['status'] != "OK":
raise Exception(f"didn't recieved ack on request {response_data}")
else:
@@ -68,8 +81,9 @@ def send_graph_exec_request(self, graph, shell_vars, functions, debug=False) ->
return True

def close(self):
self._socket.send("Done")
self._socket.send(encode_request({"type": "Done"}))
self._socket.close()
self._online = False

def _wait_ack(self):
confirmation = self._socket.recv(4096)
@@ -147,22 +161,44 @@ def run(self):
elif request.startswith("Exec-Graph"):
args = request.split(':', 1)[1].strip()
filename, declared_functions_file = args.split()
numExecutedSubgraphs = 0
numTotalSubgraphs = None
crashed_worker = workers_manager.args.worker_timeout_choice if workers_manager.args.worker_timeout_choice != '' else "worker1" # defaults to worker1
try:
while not numTotalSubgraphs or numExecutedSubgraphs < numTotalSubgraphs:
# In the naive fault tolerance, we want all workers to receive their subgraph(s) without crashing.
# If a crash happens, we re-split the IR and try again until scheduling completes without any crash.
numExecutedSubgraphs = 0
worker_subgraph_pairs, shell_vars, main_graph = prepare_graph_for_remote_exec(filename, workers_manager.get_worker)
if numTotalSubgraphs is None:
numTotalSubgraphs = len(worker_subgraph_pairs)
script_fname = to_shell_file(main_graph, workers_manager.args)
log("Master node graph stored in ", script_fname)

# Read functions
log("Functions stored in ", declared_functions_file)
declared_functions = read_file(declared_functions_file)

# Execute subgraphs on workers
for worker, subgraph in worker_subgraph_pairs:
worker_timeout = workers_manager.args.worker_timeout if worker.name == crashed_worker and workers_manager.args.worker_timeout else 0

try:
worker.send_graph_exec_request(subgraph, shell_vars, declared_functions, workers_manager.args.debug, worker_timeout)
numExecutedSubgraphs += 1
except Exception as e:
# worker timed out; treat it as crashed
worker.close()
log(f"{worker} closed")
# Report to main shell a script to execute
# Delay this to the very end when every worker has received the subgraph
response_msg = f"OK {script_fname}"
dspash_socket.respond(response_msg, conn)
except Exception as e:
print(e)



worker_subgraph_pairs, shell_vars, main_graph = prepare_graph_for_remote_exec(filename, workers_manager.get_worker)
script_fname = to_shell_file(main_graph, workers_manager.args)
log("Master node graph stored in ", script_fname)

# Read functions
log("Functions stored in ", declared_functions_file)
declared_functions = read_file(declared_functions_file)

# Report to main shell a script to execute
response_msg = f"OK {script_fname}"
dspash_socket.respond(response_msg, conn)

# Execute subgraphs on workers
for worker, subgraph in worker_subgraph_pairs:
worker.send_graph_exec_request(subgraph, shell_vars, declared_functions, workers_manager.args.debug)
else:
raise Exception(f"Unknown request: {request}")

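The commit message mentions retry-and-backoff, while the loop above retries with a fixed RETRY_DELAY. For comparison, an exponential-backoff variant (a sketch under the same recv_msg/socket.timeout assumptions; not part of this commit) would look like:

import socket
import time

def recv_with_backoff(sock, recv_msg, max_retries=2, base_delay=1.0):
    """Retry recv_msg with exponentially growing delays (1s, 2s, 4s, ...).
    Returns the response, or None if the worker never answered."""
    for attempt in range(max_retries):
        try:
            return recv_msg(sock)
        except socket.timeout:
            time.sleep(base_delay * (2 ** attempt))
    return None

The caller would then treat a None result the same as the failed-ack path above.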
10 changes: 10 additions & 0 deletions compiler/pash_init_setup.sh
@@ -23,6 +23,8 @@ export pash_parallel_pipelines=0
export pash_daemon_communicates_through_unix_pipes_flag=0
export show_version=0
export distributed_exec=0
export worker_timeout=0
export worker_timeout_choice=0

for item in "$@"
do
@@ -102,6 +104,14 @@
if [ "--distributed_exec" == "$item" ]; then
export distributed_exec=1
fi

if [ "--worker_timeout" == "$item" ]; then
export worker_timeout=1
fi

if [ "--worker_timeout_choice" == "$item" ]; then
export worker_timeout_choice=1
fi
done

## `pash_redir_output` and `pash_redir_all_output` are strictly for logging.
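Note that, unlike compiler/config.py where the two flags carry values, these shell exports only record whether each flag was passed on the command line; the actual values are parsed and forwarded on the Python side.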
2 changes: 1 addition & 1 deletion evaluation/benchmarks/bio/bio-align/genome-diff.sh
@@ -11,7 +11,7 @@
# bacteria), and any regions with less than 10 supporting reads.

# Requires: samtools, minimap2, bcftools
# Data: http://ndr.md/data/bio/R1.fastq.gz http://ndr.md/data/bio/R2.fastq.gz http://ndr.md/data/bio/ref.fa
# Data: atlas-group.cs.brown.edu/data/bio/R1.fastq.gz atlas-group.cs.brown.edu/data/bio/R2.fastq.gz atlas-group.cs.brown.edu/data/bio/ref.fa

# https://github.com/samtools/samtools/releases/latest
# https://github.com/lh3/minimap2
2 changes: 1 addition & 1 deletion evaluation/benchmarks/bio/bio-align/genquality.sh
@@ -6,7 +6,7 @@
# http://thegenomefactory.blogspot.com/2019/09/25-reasons-assemblies-dont-make-it-into.html

# Require: csvkit
# Data: http://ndr.md/data/bio/genbank.txt
# Data: atlas-group.cs.brown.edu/data/bio/genbank.txt

IN=./input/genbank.txt
OUT=./output/out.txt
2 changes: 1 addition & 1 deletion evaluation/benchmarks/bio/bio1/setup.sh
@@ -8,7 +8,7 @@ mkdir -p input
mkdir -p output
cd input
if [[ ! -f R1.fastq ]]; then
wget ndr.md/data/bio/{R1.fastq.gz,R2.fastq.gz,ref.fa}
wget atlas-group.cs.brown.edu/data/bio/{R1.fastq.gz,R2.fastq.gz,ref.fa}

gunzip R1.fastq.gz
gunzip R2.fastq.gz
4 changes: 2 additions & 2 deletions evaluation/benchmarks/max-temp/max-temp-preprocess.sh
@@ -1,12 +1,12 @@
#!/bin/bash

sed 's;^;http://ndr.md/data/noaa/;' |
sed 's;^;atlas-group.cs.brown.edu/data/noaa/;' |
sed 's;$;/;' |
xargs -r -n 1 curl -s |
grep gz |
tr -s ' \n' |
cut -d ' ' -f9 |
sed 's;^\(.*\)\(20[0-9][0-9]\).gz;\2/\1\2\.gz;' |
sed 's;^;http://ndr.md/data/noaa/;' |
sed 's;^;atlas-group.cs.brown.edu/data/noaa/;' |
xargs -n1 curl -s |
gunzip
2 changes: 1 addition & 1 deletion evaluation/benchmarks/max-temp/max-temp.sh
@@ -2,7 +2,7 @@

FROM=${FROM:-2015}
TO=${TO:-2015}
IN=${IN:-'http://ndr.md/data/noaa/'}
IN=${IN:-'atlas-group.cs.brown.edu/data/noaa/'}
fetch=${fetch:-"curl -s"}

seq $FROM $TO |
2 changes: 1 addition & 1 deletion evaluation/benchmarks/max-temp/temp-analytics.sh
@@ -2,7 +2,7 @@

FROM=${FROM:-2015}
TO=${TO:-2015}
IN=${IN:-'http://ndr.md/data/noaa/'}
IN=${IN:-'atlas-group.cs.brown.edu/data/noaa/'}
fetch=${fetch:-"curl -s"}

data_file=temperatures.txt
2 changes: 1 addition & 1 deletion evaluation/benchmarks/nlp/input/setup.sh
@@ -20,7 +20,7 @@ setup_dataset() {
cd pg
if [[ "$1" == "--full" ]]; then
echo 'N.b.: download/extraction will take about 10min'
wget ndr.md/data/pg.tar.xz
wget atlas-group.cs.brown.edu/data/pg.tar.xz # FIXME: moving to PG soon
if [ $? -ne 0 ]; then
cat <<-'EOF' | sed 's/^ *//'
Downloading input dataset failed, thus need to manually rsync all books from project gutenberg:
8 changes: 4 additions & 4 deletions evaluation/benchmarks/oneliners/input/setup.sh
@@ -26,7 +26,7 @@ setup_dataset() {
fi

if [ ! -f ./1M.txt ]; then
curl -sf 'http://ndr.md/data/dummy/1M.txt' > 1M.txt
curl -sf 'atlas-group.cs.brown.edu/data/dummy/1M.txt' > 1M.txt
if [ $? -ne 0 ]; then
echo 'cannot find 1M.txt -- please contact the developers of pash'
exit 1
@@ -51,7 +51,7 @@ setup_dataset() {
fi

if [ ! -f ./1G.txt ]; then
curl -sf 'http://ndr.md/data/dummy/1G.txt' > 1G.txt
curl -sf 'atlas-group.cs.brown.edu/data/dummy/1G.txt' > 1G.txt
if [ $? -ne 0 ]; then
echo 'cannot find 1G.txt -- please contact the developers of pash'
exit 1
@@ -61,7 +61,7 @@

# download wamerican-insane dictionary and sort according to machine
if [ ! -f ./dict.txt ]; then
curl -sf 'http://ndr.md/data/dummy/dict.txt' | sort > dict.txt
curl -sf 'atlas-group.cs.brown.edu/data/dummy/dict.txt' | sort > dict.txt
if [ $? -ne 0 ]; then
echo 'cannot find dict.txt -- please contact the developers of pash'
exit 1
@@ -70,7 +70,7 @@
fi

if [ ! -f ./all_cmds.txt ]; then
curl -sf 'http://ndr.md/data/dummy/all_cmds.txt' > all_cmds.txt
curl -sf 'atlas-group.cs.brown.edu/data/dummy/all_cmds.txt' > all_cmds.txt
if [ $? -ne 0 ]; then
# This should be OK for tests, no need for abort
ls /usr/bin/* > all_cmds.txt
3 changes: 1 addition & 2 deletions evaluation/benchmarks/web-index/input/setup.sh
@@ -17,8 +17,7 @@ setup_dataset() {
wget $wiki_archive || eexit "cannot fetch wikipedia"
7za x wikipedia-en-html.tar.7z
tar -xvf wikipedia-en-html.tar
wget http://ndr.md/data/wikipedia/index.txt # || eexit "cannot fetch wiki indices"
# It is actually OK if we don't have this index since we download the 500/1000 below
wget atlas-group.cs.brown.edu/data/wikipedia/index.txt # FIXME: we download index below?
fi

if [ "$1" = "--small" ]; then
Expand Down
4 changes: 2 additions & 2 deletions evaluation/intro/input/setup.sh
@@ -7,7 +7,7 @@ cd $(dirname $0)
[ "$1" = "-c" ] && rm-files 100M.txt words sorted_words

if [ ! -f ./100M.txt ]; then
curl -f 'ndr.md/data/dummy/100M.txt' > 100M.txt
curl -sf --connect-timeout 10 'atlas-group.cs.brown.edu/data/dummy/100M.txt' > 100M.txt
if [ $? -ne 0 ]; then
curl -f 'http://www.gutenberg.org/files/2600/2600-0.txt' | head -c 1M > 1M.txt
[ $? -ne 0 ] && eexit 'cannot find 1M.txt'
@@ -20,7 +20,7 @@ if [ ! -f ./100M.txt ]; then
fi

if [ ! -f ./words ]; then
curl -f 'http://ndr.md/data/dummy/words' > words
curl -sf --connect-timeout 10 'atlas-group.cs.brown.edu/data/dummy/words' > words
if [ $? -ne 0 ]; then
if [ $(uname) = 'Darwin' ]; then
cp /usr/share/dict/web2 words || eexit "cannot find dict file"
2 changes: 1 addition & 1 deletion evaluation/other/more-scripts/page-count.sh
@@ -5,7 +5,7 @@

# Require: libimage-exiftool-perl, bc
# Data:
# http://ndr.md/data/dummy/large.pdf
# atlas-group.cs.brown.edu/data/large.pdf
# More data:
# https://arxiv.org/help/bulk_data

2 changes: 1 addition & 1 deletion evaluation/other/more-scripts/spell.sh
@@ -6,7 +6,7 @@
# TODO: `groff is an interesting "pure", whose wrapper only needs split input
# TODO: files carefully.

# Data: http://ndr.md/data/dummy/ronn.1
# Data: atlas-group.cs.brown.edu/data/dummy/ronn.1
# dict depends on the system (and has to be sorted), so we assume it exists
dict=./input/dict.txt

9 changes: 6 additions & 3 deletions evaluation/tests/input/setup.sh
@@ -16,7 +16,7 @@ esac
[ "$1" = "-c" ] && rm-files 1M.txt all_cmds.txt words sorted_words 10M.txt

if [ ! -f ./1M.txt ]; then
curl -sf 'http://ndr.md/data/dummy/1M.txt' > 1M.txt
curl -sf --connect-timeout 10 'atlas-group.cs.brown.edu/data/dummy/1M.txt' > 1M.txt
if [ $? -ne 0 ]; then
curl -sf 'http://www.gutenberg.org/files/2600/2600-0.txt' | head -c 1${head_sz} > 1M.txt
[ $? -ne 0 ] && eexit 'cannot find 1M.txt'
@@ -26,15 +26,18 @@ fi

if [ ! -f ./all_cmds.txt ]; then
if [ "$(hostname)" = "deathstar" ]; then
curl -sf 'http://ndr.md/data/dummy/all_cmds.txt' > all_cmds.txt || eexit "all_cmds not found"
curl -sf --connect-timeout 10 'atlas-group.cs.brown.edu/data/dummy/all_cmds.txt' > all_cmds.txt
if [ $? -ne 0 ]; then
curl -f 'https://zenodo.org/record/7650885/files/all_cmds.txt' > all_cmds.txt || eexit "all_cmds not found"
fi
else
ls /usr/bin/* > all_cmds.txt
fi
append_nl_if_not ./all_cmds.txt
fi

if [ ! -f ./words ]; then
curl -sf 'http://ndr.md/data/dummy/words' > words
curl -sf --connect-timeout 10 'atlas-group.cs.brown.edu/data/dummy/words' > words
if [ $? -ne 0 ]; then
if [ $(uname) = 'Darwin' ]; then
cp /usr/share/dict/web2 words || eexit "cannot find dict file"
6 changes: 3 additions & 3 deletions evaluation/tests/sed-test.sh
@@ -1,11 +1,11 @@
cat $PASH_TOP/evaluation/tests/input/1M.txt |
sed 's;^d;da;' |
sed 's;^;http://ndr.md/data/noaa/;' |
sed 's;^;atlas-group.cs.brown.edu/data/noaa/;' |
sed 's;$;/;' |
sed 's;^\(.*\)\(20[0-9][0-9]\).gz;\2/\1\2\.gz;' |
sed 's;^;http://ndr.md/data/noaa/;' |
sed 's;^;atlas-group.cs.brown.edu/data/noaa/;' |
sed "s#^#$WIKI#" |
sed s/\$/'0s'/ |
sed 1d |
sed 4d |
sed "\$d"
sed "\$d"
