Skip to content

Commit

Permalink
Put temporary files in subdirectory of specified location to ensure m…
Browse files Browse the repository at this point in the history
…ultiple PWGS instances do not collide.
  • Loading branch information
jwintersinger committed Nov 9, 2015
1 parent fcd9b45 commit 4bfd4b0
Showing 1 changed file with 42 additions and 22 deletions.
64 changes: 42 additions & 22 deletions evolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
from printo import *

import argparse
import threading
import signal
import tempfile
import threading
import traceback
from datetime import datetime

# num_samples: number of MCMC samples
# mh_itr: number of metropolis-hasting iterations
# rand_seed: random seed (initialization). Set to None to choose random seed automatically.
def start_new_run(state_manager, backup_manager, safe_to_exit, run_succeeded, ssm_file, cnv_file, top_k_trees_file, clonal_freqs_file, burnin_samples, num_samples, mh_itr, mh_std, write_state_every, write_backups_every, rand_seed, tmp_dir):
def start_new_run(state_manager, backup_manager, safe_to_exit, run_succeeded, config, ssm_file, cnv_file, top_k_trees_file, clonal_freqs_file, burnin_samples, num_samples, mh_itr, mh_std, write_state_every, write_backups_every, rand_seed, tmp_dir):
state = {}

with open('random_seed.txt', 'w') as seedf:
Expand Down Expand Up @@ -104,9 +105,9 @@ def start_new_run(state_manager, backup_manager, safe_to_exit, run_succeeded, ss
with open('mcmc_samples.txt', 'w') as mcmcf:
mcmcf.write('Iteration\tLLH\tTime\n')

do_mcmc(state_manager, backup_manager, safe_to_exit, run_succeeded, state, tree_writer, codes, n_ssms, n_cnvs, NTPS, tmp_dir)
do_mcmc(state_manager, backup_manager, safe_to_exit, run_succeeded, config, state, tree_writer, codes, n_ssms, n_cnvs, NTPS, tmp_dir)

def resume_existing_run(state_manager, backup_manager, safe_to_exit, run_succeeded):
def resume_existing_run(state_manager, backup_manager, safe_to_exit, run_succeeded, config):
# If error occurs, restore the backups and try again. Never try more than two
# times, however -- if the primary file and the backup file both fail, the
# error is unrecoverable.
Expand All @@ -127,14 +128,19 @@ def resume_existing_run(state_manager, backup_manager, safe_to_exit, run_succeed
codes, n_ssms, n_cnvs = load_data(state['ssm_file'], state['cnv_file'])
NTPS = len(codes[0].a) # number of samples / time point

do_mcmc(state_manager, backup_manager, safe_to_exit, run_succeeded, state, tree_writer, codes, n_ssms, n_cnvs, NTPS, state['tmp_dir'])
do_mcmc(state_manager, backup_manager, safe_to_exit, run_succeeded, config, state, tree_writer, codes, n_ssms, n_cnvs, NTPS, state['tmp_dir'])

def do_mcmc(state_manager, backup_manager, safe_to_exit, run_succeeded, state, tree_writer, codes, n_ssms, n_cnvs, NTPS, tmp_dir):
def do_mcmc(state_manager, backup_manager, safe_to_exit, run_succeeded, config, state, tree_writer, codes, n_ssms, n_cnvs, NTPS, tmp_dir_parent):
start_iter = state['last_iteration'] + 1
unwritten_trees = []
mcmc_sample_times = []
last_mcmc_sample_time = time.time()

# If --tmp-dir is not specified on the command line, it will by default be
# None, which will cause mkdtemp() to place this directory under the system's
# temporary directory. This is the desired behaviour.
config['tmp_dir'] = tempfile.mkdtemp(prefix='pwgsdataexchange.', dir=tmp_dir_parent)

for iteration in range(start_iter, state['num_samples']):
safe_to_exit.set()
if iteration < 0:
Expand Down Expand Up @@ -171,7 +177,7 @@ def do_mcmc(state_manager, backup_manager, safe_to_exit, run_succeeded, state, t
state['cnv_file'],
state['rand_seed'],
NTPS,
tmp_dir
config['tmp_dir']
)
if float(state['mh_acc']) < 0.08 and state['mh_std'] < 10000:
state['mh_std'] = state['mh_std']*2.0
Expand Down Expand Up @@ -268,7 +274,7 @@ def parse_args():
help='Number of Metropolis-Hastings iterations')
parser.add_argument('-r', '--random-seed', dest='random_seed', type=int,
help='Random seed for initializing MCMC sampler')
parser.add_argument('-t', '--tmp-dir', dest='tmp_dir', default='.',
parser.add_argument('-t', '--tmp-dir', dest='tmp_dir',
help='Path to directory for temporary files')
parser.add_argument('ssm_file',
help='File listing SSMs (simple somatic mutations, i.e., single nucleotide variants. For proper format, see README.md.')
Expand All @@ -277,13 +283,13 @@ def parse_args():
args = parser.parse_args()
return args

def run(safe_to_exit, run_succeeded):
def run(safe_to_exit, run_succeeded, config):
state_manager = StateManager()
backup_manager = BackupManager([StateManager.default_last_state_fn, TreeWriter.default_archive_fn])

if state_manager.state_exists():
logmsg('Resuming existing run. Ignoring command-line parameters.')
resume_existing_run(state_manager, backup_manager, safe_to_exit, run_succeeded)
resume_existing_run(state_manager, backup_manager, safe_to_exit, run_succeeded, config)
else:
args = parse_args()
# Ensure input files exist and can be read.
Expand All @@ -301,6 +307,7 @@ def run(safe_to_exit, run_succeeded):
backup_manager,
safe_to_exit,
run_succeeded,
config,
args.ssm_file,
args.cnv_file,
top_k_trees_file=args.top_k_trees,
Expand All @@ -315,20 +322,17 @@ def run(safe_to_exit, run_succeeded):
tmp_dir=args.tmp_dir
)

def remove_tmp_files():
try:
initial_state = StateManager().load_initial_state()
except IOError:
# If user runs with -h to get help on first run, then initial state file
# won't exist.
return
tmp_dir = initial_state['tmp_dir']
def remove_tmp_files(tmp_dir):
tmp_filenames = get_c_fnames(tmp_dir)
for tmpfn in tmp_filenames:
try:
os.remove(tmpfn)
except OSError:
pass
try:
os.rmdir(tmp_dir)
except OSError:
pass

def main():
# Introducing threading is necessary to allow write operations to complete
Expand All @@ -348,11 +352,26 @@ def main():
# Our strategy should be sufficient for the moment, though.
run_succeeded = threading.Event()

# We must know where temporary files are stored from within main() so that we
# can remove them when we exit. However, we don't know this location until
# the run thread starts, as when PWGS resumes an existing run, the parent
# directory for the temporary files is stored in the state pickle file. Thus,
# the run thread will set this value once it is established.
#
# So long as this dictionary is used only as a key-value store for primitive
# objects, it's thread safe and doesn't require the use of a mutex. See
# http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm.
# If more complex values are stored here, we must introduce a mutex.
config = {
'tmp_dir': None
}

def sigterm_handler(_signo, _stack_frame):
logmsg('Signal %s received.' % _signo, sys.stderr)
safe_to_exit.wait()
# Exit with non-zero to indicate run didn't finish.
remove_tmp_files(config['tmp_dir'])
logmsg('Exiting now.')
# Exit with non-zero to indicate run didn't finish.
sys.exit(3)

# SciNet will supposedly send SIGTERM 30 s before hard-killing the process.
Expand All @@ -363,7 +382,7 @@ def sigterm_handler(_signo, _stack_frame):
# data being written. Permit these operations to finish before exiting.
signal.signal(signal.SIGINT, sigterm_handler)

run_thread = threading.Thread(target=run, args=(safe_to_exit, run_succeeded))
run_thread = threading.Thread(target=run, args=(safe_to_exit, run_succeeded, config))
# Thread must be a daemon thread, or sys.exit() will wait until the thread
# finishes execution completely.
run_thread.daemon = True
Expand All @@ -380,11 +399,12 @@ def sigterm_handler(_signo, _stack_frame):
# has expired.
run_thread.join(10)

remove_tmp_files()

remove_tmp_files(config['tmp_dir'])
if run_succeeded.is_set():
logmsg('Run succeeded.')
sys.exit(0)
else:
logmsg('Run failed.')
sys.exit(1)

def logmsg(msg, fd=sys.stdout):
Expand Down

0 comments on commit 4bfd4b0

Please sign in to comment.