-
Notifications
You must be signed in to change notification settings - Fork 1
/
submitter.py
154 lines (119 loc) · 4.79 KB
/
submitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import argparse as ap
from copy import deepcopy
from itertools import product
from os import makedirs, mkdir, path
from subprocess import check_output
from time import sleep
def submit(batch, program_args, global_options, task_options, dry_run=None):
    '''
    Submit one SLURM array job per entry of ``task_options``.

    For every task the global options are copied, overridden by the task's
    own options, and substituted into ``batch``. The merged options are then
    expanded with ``expand_vals`` (list-valued options give one array element
    per combination) and the program arguments for each combination are
    written next to the batch script.

    Parameters
    ----------
    batch : str
        A string containing the framework of the batch script, with
        substitutable fields in curly braces. It is formatted twice, so an
        option value may itself contain fields.

        Fields that should be defined:
         - "name", which is prefixed with the task number
         - "output_dir", the directory the batch script and option files are
           written to (only read when not doing a dry run)

        Extra fields which can be specified are:
         - "task_idx", incremented for each task submitted

    program_args : string or list
        A string representing the command line options, with substitutable
        fields in braces ("array_idx" is also available here). If a list of
        option names is passed, "name,value" lines are produced instead.

    global_options : dict
        A dict containing keys corresponding to the fields in curly braces,
        with the default options for all tasks

    task_options : iterable of dict
        One dict per batch task, with keys corresponding to the fields in
        curly braces; these override ``global_options`` for that task

    dry_run : bool, optional
        If True, print the batch script, program arguments and submit command
        instead of writing files and calling ``sbatch``. If None (default),
        the value is taken from a ``--dry-run`` command line flag.

    Returns
    -------
    None
    '''

    if dry_run is None:
        parser = ap.ArgumentParser()
        parser.add_argument('--dry-run', help='output batch script but do not submit',
                            action="store_true")
        dry_run = parser.parse_args().dry_run

    # the text searched for "{key}" fields does not depend on the key, so
    # build it once
    if isinstance(program_args, str):
        program_arg_str = program_args
    else:
        program_arg_str = ''.join('{%s}' % x for x in program_args)
    searchable = batch + program_arg_str

    for k in global_options:
        if '{' + str(k) + '}' not in searchable:
            print('Warning: key %s from global_options not used.' % k)

    for n, d in enumerate(task_options):

        # be nice to the scheduler
        if n > 0 and not dry_run:
            sleep(1)

        tmp_vals = deepcopy(global_options)
        tmp_vals.update(d)
        tmp_vals['task_idx'] = n

        # prepend job index to the job name
        tmp_vals['name'] = '{:d}_'.format(n) + tmp_vals['name']

        # map the SLURM placeholder to itself so it survives both format
        # passes and is still present in the final script
        tmp_vals['SLURM_ARRAY_TASK_ID'] = '{SLURM_ARRAY_TASK_ID}'

        batch_out = batch.format(**tmp_vals).format(**tmp_vals)

        # NOTE: every branch below uses the resolved ``dry_run``; the parsed
        # ``args`` namespace only exists when dry_run was not passed in.
        if dry_run:
            print('\n\n##############################################')
            print('TASK %d\n' % n)
            print('================ BATCH SCRIPT ================\n')
            print(batch_out)
        else:
            # create output directory (and parents) if it doesn't exist
            out_dir = tmp_vals['output_dir'].rstrip('/')
            if out_dir:
                makedirs(out_dir, exist_ok=True)

            # mode 'x' refuses to overwrite an existing batch script
            batch_path = path.join(tmp_vals['output_dir'], tmp_vals['name']) + '.batch'
            with open(batch_path, 'x') as f:
                f.write(batch_out)

        if dry_run:
            print('================ PROGRAM ARGUMENTS ================\n')

        expanded = expand_vals([tmp_vals])
        for nv, v in enumerate(expanded):
            v['array_idx'] = nv
            if isinstance(program_args, str):
                o = program_args.format(**v)
            else:
                o = ''.join('%s,%s\n' % (option_name, v[option_name])
                            for option_name in program_args)

            if dry_run:
                print(nv, ':\n', o, sep='', end='\n\n')
            else:
                opts_path = path.join(tmp_vals['output_dir'],
                                      str(n) + '_' + str(nv) + '.opts')
                with open(opts_path, 'w') as f:
                    f.write(o)

        n_array = len(expanded)
        command = ['sbatch', '--array=0-%d' % (n_array - 1)]
        if dry_run:
            print('\n================ SUBMIT COMMAND ================\n')
            print('Command:', ' '.join(command))
        else:
            # the batch script is passed to sbatch on stdin
            print(check_output(command, input=batch_out, universal_newlines=True), end='')
            print('(%d tasks)' % n_array)
def expand_vals(vals):
    '''
    Expand option dictionaries whose values may be iterables.

    Accepts a single dictionary or an iterable of dictionaries. For each
    dictionary, every value is treated as a set of choices (a string or a
    non-iterable value counts as a single choice), and one output dictionary
    is produced for each combination in the "tensor product" of those
    choices. All output values are converted to strings.

    Returns a flat list of the resulting dictionaries, in input order.
    '''
    dict_list = [vals] if isinstance(vals, dict) else vals

    expanded = []
    for entry in dict_list:
        axes = []
        for key, value in entry.items():
            if isinstance(value, str):
                # a string is one choice, not a sequence of characters
                choices = [value]
            else:
                try:
                    choices = iter(value)
                except TypeError:
                    # not iterable: a single scalar choice
                    choices = [value]
            axes.append([(key, str(choice)) for choice in choices])

        expanded.extend(dict(combo) for combo in product(*axes))

    return expanded