-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathmakeflow_blast
executable file
·312 lines (266 loc) · 9.95 KB
/
makeflow_blast
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
#!/usr/bin/python
# CCTOOLS_PYTHON_VERSION 2.7 2.6
# Copyright (C) 2011- The University of Notre Dame
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.
#
# This program generates makeflows to parallelize the
# popular blastall program.
#
import sys, re, string, optparse, subprocess, os, logging as log, stat
class PassThroughParser(optparse.OptionParser):
def _process_args(self, largs, rargs, values):
while rargs:
try:
optparse.OptionParser._process_args(self,largs,rargs,values)
except (optparse.BadOptionError,optparse.AmbiguousOptionError), e:
largs.append(e.opt_str)
#Parser initialization
def initialize_blast_parser(parser):
blast = optparse.OptionGroup(parser, "BLAST Options")
blast.add_option('-p', dest='process', help='Program Name [String]')
blast.add_option('-d', dest='database', help='Database [String] default = nr')
blast.add_option('-i', dest='query', help='Query File [File In]')
blast.add_option('-o', dest='output', help='BLAST report Output File [File Out]')
parser.add_option_group(blast)
def initialize_makeflow_parser(parser):
makeflow = optparse.OptionGroup(parser, "Makeflow Preparation Options")
makeflow.add_option('--makeflow', dest='makeflow',help='Makeflow destination file [stdout]')
makeflow.add_option('--makeflow-config', dest='config',help='Makeflow Configuration File')
makeflow.add_option('--num_seq', dest='num_seq',help='Determines number of sequences per partition [1000]',default=1000,type=int)
makeflow.add_option('--verbose', dest='verbose',help='Show verbose level output',action='store_true',default="False")
parser.add_option_group(makeflow)
#Helper function to find executables in path
def search_file(filename, search_path, pathsep=os.pathsep):
""" Given a search path, find file with requested name """
for path in string.split(search_path, pathsep):
candidate = os.path.join(path, filename)
if os.path.exists(candidate): return os.path.abspath(candidate)
return None
#Counts number of splits that will be made for makeflow creation
def count_splits(num_seq, query):
log.info("Counting Number of Partitions")
FILE = open(query, "r")
num_chars = 0
num_queries = 0
num_jobs = 1
for line in FILE:
if(re.search('^>', line)):
if(num_queries > (num_seq - 1)):
num_jobs = num_jobs + 1
num_queries = 1
if ( num_jobs % 10 == 0 ):
log.info("\tCurrent Number of Partitions : {0}".format(num_jobs))
else:
num_queries = num_queries + 1
FILE.close()
log.info("\tTotal Number of Partitions : {0}".format(num_jobs))
return num_jobs
def write_split_fasta(destination):
split_fasta = open(destination, "w")
try:
log.info("\tWriting Partitioning Script to : {0}".format(destination))
split_fasta.write('''#! /usr/bin/env python
# Copyright (C) 2011- The University of Notre Dame
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.
#
# This simple script splits a fasta file into pieces
#
import sys, re
if(len(sys.argv) < 3) :
sys.stderr.write("Usage: split_fasta.py query_granularity fasta_file")
sys.exit(1)
num_seq = sys.argv[1]
query = sys.argv[2]
num_chars = 0
num_queries = 0
num_jobs = 0
FILE = open(query, "r")
OF = open(query + "." + str(num_jobs), "w")
for line in FILE:
if(re.search('^>', line)):
if( num_queries > (int(num_seq) - 1) ):
OF.close()
num_jobs = num_jobs + 1
num_queries = 0
OF = open(query + "." + str(num_jobs), "w")
OF.write(line)
else :
OF.write(line)
num_queries += 1
else :
OF.write(line)
FILE.close()
OF.close()
sys.exit(0)
''')
finally:
split_fasta.close()
def write_cat_blast(destination):
cat_blast = open(destination, "w")
try:
log.info("\tWriting Joingin Script to : {0}".format(destination))
cat_blast.write('''#!/usr/bin/perl
# Copyright (C) 2013- The University of Notre Dame
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.
my $output = shift;
if (-e $output) {
unlink($output) || die \"could not delete $output\";
}
open $OUT,'>>',$output or die(\"Could not open \" + $output + \" file.\");;
$file = 0;
$max = scalar @ARGV;
while ($in = shift) {
++$file;
open(IN,$in) or die(\"Could not open \" + $in + \" file.\");
@size = <IN>;
$lines = @size;
close(IN);
open(IN,$in) or die(\"Could not open \" + $in + \" file.\");
$print = 1;
while(my $line = <IN>) {
if (($line =~ /BLAST/) and ($file ne 1)) { $print = 0;}
if (($line =~ /Query/) and ($print eq 0)) { $print = 1; }
last if (($line =~ /^\s+Database:/) and ($file lt $max));
if ($print gt 0){
print { $OUT } $line;
}
}
close (IN);
}
close (OUT);
''')
finally:
cat_blast.close()
def write_makeflow(destination):
if destination:
makeflow = open(destination, 'w')
log.info("Writing Makeflow to : {0}".format(destination))
else:
makeflow = sys.stdout
log.info("Writing Makeflow to : STDOUT")
try:
if options.config:
log.info("Reading Configuration from : {0}".format(configuration))
config = open(options.config, 'r')
makeflow.write(config.read())
inputs = []
outputs = []
errors = []
num_splits = count_splits(options.num_seq, options.query)
for i in range(num_splits):
inputs.append("{0}.{1}".format(options.query,i))
outputs.append("{0}.{1}.out".format(options.query,i))
errors.append("{0}.{1}.err".format(options.query,i))
log.info("Writing Partitioning Rules")
#Here we actually start generating the Makeflow
#How to get inputs
makeflow.write("\n\n{0} : {1} {2}".format(' '.join(inputs), options.query, split_name))
makeflow.write("\n\tLOCAL ./{0} {1} {2}".format(split_name, options.num_seq, options.query))
log.info("Writing BLAST Rules")
#How to get outputs
db_split = re.split("/",options.database)
database = db_split[0]
for i in range(num_splits):
makeflow.write("\n\n{0} {1} : {2} {3} {4}".format(
outputs[i], errors[i], "blastall", inputs[i],
options.database))
makeflow.write("\n\t./blastall -p {0} -d {1}/{1} -i {2} -o {3} 2> {4}".format(
options.process, options.database, inputs[i],
outputs[i], errors[i]))
log.info("Writing Joining Rules")
#How to concatenate and cleanup outputs (very naive)
makeflow.write("\n\n{0} : {1} {2}".format(options.output, cat_name, ' '.join(outputs)))
makeflow.write("\n\tLOCAL ./{0} {1} {2}".format(cat_name, options.output, ' '.join(outputs)))
#How to concatenate and cleanup errors (very naive)
makeflow.write("\n\n{0}.err : {1}".format(options.output, ' '.join(errors)))
makeflow.write("\n\tLOCAL cat {0} > {1}.err\n".format(' '.join(errors), options.output))
finally:
if options.makeflow is not None:
makeflow.close()
#Start of Main function
parser = optparse.OptionParser(usage="usage: %prog -p process -i input -o output -d ref_db [options]")
initialize_blast_parser(parser)
initialize_makeflow_parser(parser)
(options, args) = parser.parse_args()
if options.verbose == True:
log.basicConfig(format="%(levelname)s: %(message)s", level=log.DEBUG)
log.info("Verbose output.")
else:
log.basicConfig(format="%(levelname)s: %(message)s")
if not options.query:
log.error("Query Fasta required : -i")
sys.exit(4)
else:
log.info("Query Fasta : {0}".format(options.query))
if not options.database:
log.error("Reference Database required : -d")
sys.exit(4)
else:
log.info("Reference Database : {0}".format(options.database))
if not options.process:
log.error("Process required : -p")
sys.exit(4)
else:
log.info("Selected Process : {0}".format(options.process))
log.info("Partition Size : {0}".format(options.num_seq))
#find blast process and legacy_blast.py
path = os.getenv("PATH")
#log.info("Searching PATH for {0}".format(options.process))
#process = search_file(options.process, path)
#if os.path.exists("./{0}".format(options.process)):
# log.info("{0} found locally")
#elif process and not os.path.exists("./{0}".format(options.process)):
# os.symlink(process, options.process)
# log.info("Process linked from : {0}".format(process))
#elif not os.path.exists("./{0}".format(options.process)):
# log.error("Unable to find process : {0}".format(options.process))
# sys.exit(5)
#log.info("Searching PATH for {0}".format("legacy_blast.pl"))
#legacy = search_file("legacy_blast.pl", path)
#if os.path.exists("./legacy_blast.pl"):
# log.info("legacy_blast.pl found locally")
#if legacy and not os.path.exists("./legacy_blast.pl"):
# os.symlink(legacy, "legacy_blast.pl")
# log.info("legacy_blast.pl linked from : " + legacy)
#elif not os.path.exists("./legacy_blast.pl"):
# log.error("Unable to find legacy_blast.pl")
# sys.exit(5)
#Find blast db
dbpath = path
if os.getenv("BLASTDB"):
dbpath += os.pathsep + os.getenv("BLASTDB")
log.info("Searching PATH and BLASTDB for database {0}".format(options.database))
if os.path.exists("./{0}".format(options.database)):
log.info("Database {0} found locally".format(options.database))
else:
db = search_file("{0}.nal".format(options.database), dbpath)
os.mkdir(options.database)
if db and not os.path.exists("./{0}.nal".format(options.database)):
dbpath = os.path.dirname(db)
for fn in os.listdir(dbpath):
oloc = "{0}/{1}\n".format(dbpath, fn)
print "{0}\n".format(oloc)
if options.database in fn:
os.symlink(oloc, "{0}/{1}".format(options.database, fn))
log.info("Databse linked from : {0}/{1}".format(dbpath, options.database))
elif not os.path.exists("./{0}".format(options.database)):
log.error("Unable to find database : {0}".format(options.database))
sys.exit(5)
split_name = "split_fasta"
if os.path.exists(split_name):
os.remove(split_name)
write_split_fasta(split_name)
st = os.stat(split_name)
os.chmod(split_name, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
log.info("{0} made executable".format(split_name))
cat_name = "cat_blast"
if os.path.exists(cat_name):
os.remove(cat_name)
write_cat_blast(cat_name)
st = os.stat(cat_name)
os.chmod(cat_name, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
log.info("{0} made executable".format(cat_name))
write_makeflow(options.makeflow)