forked from cumbof/pyRNAcentral
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrnacentral.py
190 lines (179 loc) · 9.21 KB
/
rnacentral.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#!/usr/bin/env python
# http://rnacentral.org/api
import sys, os, optparse, requests, json
### app ###
# version of this tool, printed by the --version option
__version__ = '1.0.0'
#### RNAcentral ####
# base URL of the RNAcentral endpoint; the RNAcentral ID is appended to it to build the query URL
__rnacentral_api_url__ = 'http://rnacentral.org/api/v1/rna/'
# URL template of the cross-references endpoint; the '__ID__' placeholder is replaced with the RNAcentral ID
__rnacentral_api_xrefs_url__ = 'http://rnacentral.org/api/v1/rna/__ID__/xrefs'
# API version label, printed by the --version option
__rnacentral_api_version__ = 'v1'
### exit codes ###
# code used when an error occurs
ERR_EXIT_CODE = 2
# code used when everything went fine
OK_EXIT_CODE = 0
def raiseException( exitcode, message, output_dir_path, errorfilename=None ):
    """Optionally record an error message on disk, then terminate the process.

    Parameters:
        exitcode: status passed to ``sys.exit``.
        message: text written to the error file.
        output_dir_path: directory in which the error file is created.
        errorfilename: base name of the error file; the suffix ``'_txt'`` is
            appended to it. When ``None`` no file is written.

    Raises:
        SystemExit: always, carrying ``exitcode``.
    """
    # identity test instead of '!= None': an empty file name is still a request to write
    if errorfilename is not None:
        errorfilepath = os.path.join(output_dir_path, errorfilename + '_txt')
        with open(errorfilepath, 'w') as out:
            out.write(message)
    sys.exit(exitcode)
def format_metadata( xrefs_json_content ):
    """Flatten the ``results`` of a decoded xrefs response into aligned columns.

    Every entry of ``xrefs_json_content['results']`` becomes one row:
      * plain values are stored as ``str(value)`` under their attribute name;
      * dictionaries are collapsed one level deep, each inner key being stored
        under ``<attribute>__<inner key>``;
      * lists are skipped.
    A column that has no value for a given row holds the string ``'None'``,
    so every column ends up with exactly one cell per row.

    Parameters:
        xrefs_json_content: decoded JSON document (any container supporting
            the ``in`` operator; the rows are read from its ``'results'`` key).

    Returns:
        A tuple ``(metadata, row_count)`` where ``metadata`` maps each column
        name to its list of string cells and ``row_count`` is the number of
        processed results. ``row_count`` is 0 (no longer -1) when there is no
        ``'results'`` key.
    """
    metadata = {}
    results = []
    if 'results' in xrefs_json_content:
        # a null 'results' value is handled like an empty list
        results = xrefs_json_content['results'] or []
    row_count = 0
    for row_index, result in enumerate(results):
        for attribute, value in result.items():
            if isinstance(value, list):  # skip arrays
                continue
            if isinstance(value, dict):  # collapse dictionaries
                cells = [(attribute + '__' + key, inner) for key, inner in value.items()]
            else:
                cells = [(attribute, value)]
            for column_name, cell in cells:
                if column_name not in metadata:
                    # column first seen on this row: back-fill the previous rows
                    metadata[column_name] = ['None'] * row_index
                metadata[column_name].append(str(cell))
        # fix the arrays size: columns missing from this row get a placeholder
        for column in metadata.values():
            if len(column) <= row_index:
                column.append('None')
        row_count = row_index + 1
    return metadata, row_count
# rnacentral_id is case sensitive
def query_rnacentral( options, args, rnacentral_ids ):
    """Download sequence and cross-reference metadata for each RNAcentral ID.

    For every ID a ``<id>.fasta`` file is written in ``options.fastadir`` and a
    ``<id>.tsv`` file is created in ``options.metadir`` (it stays empty when the
    xrefs response carries no usable metadata).

    Parameters:
        options: parsed command line options; ``fastadir`` and ``metadir`` are read.
        args: positional command line arguments (unused here).
        rnacentral_ids: iterable of ID strings; only the part before the first
            underscore of each ID is used for the query.

    Yields:
        ``(rnacentral_id, exit_code)`` for every ID, where ``exit_code`` is
        ``OK_EXIT_CODE`` on success and ``ERR_EXIT_CODE`` when the main request
        fails, its body is not valid JSON or lacks a ``sequence``, or the
        xrefs request fails or cannot be decoded.
    """
    fasta_dir_path = options.fastadir
    metadata_dir_path = options.metadir
    # set the content type to application/json
    headers = {'Content-type': 'application/json'}
    # create a session and make sure it is closed once the generator is done
    with requests.Session() as session:
        for rnacentral_id in rnacentral_ids:
            rnacentral_id = rnacentral_id.split('_')[0]
            # make a get request to the rnacentral apis
            response = session.get(__rnacentral_api_url__ + rnacentral_id, headers=headers)
            if response.status_code != requests.codes.ok:
                yield rnacentral_id, ERR_EXIT_CODE
                continue
            # decode through response.text: str() of the raw bytes body yields
            # "b'...'" on Python 3, which json.loads cannot parse
            try:
                sequence = json.loads(response.text)['sequence']
            except (ValueError, KeyError, TypeError):
                yield rnacentral_id, ERR_EXIT_CODE
                continue
            something_wrong = False
            # create a metadata file for the current rnacentral_id
            # (append mode: an already existing file is not truncated here)
            metadata_file_path = os.path.join(metadata_dir_path, rnacentral_id + '.tsv')
            with open(metadata_file_path, 'a'):
                pass
            xrefs_url = __rnacentral_api_xrefs_url__.replace('__ID__', rnacentral_id)
            xrefs_response = session.get(xrefs_url, headers=headers)
            # NOTE(review): only this single xrefs response is read; if the
            # endpoint paginates, later pages are ignored - confirm.
            if xrefs_response.status_code == requests.codes.ok:
                try:
                    metadata, levels = format_metadata(json.loads(xrefs_response.text))
                except ValueError:
                    something_wrong = True
                else:
                    if metadata:
                        _write_metadata_tsv(metadata_file_path, metadata, levels)
            else:
                # NOTE(review): the error flag is tied to a failed xrefs request;
                # confirm this matches the intended nesting of the original code.
                something_wrong = True
            # create a fasta file for the current rnacentral_id
            fasta_file_path = os.path.join(fasta_dir_path, rnacentral_id + '.fasta')
            _write_fasta(fasta_file_path, rnacentral_id, sequence)
            yield rnacentral_id, (ERR_EXIT_CODE if something_wrong else OK_EXIT_CODE)


def _write_metadata_tsv( metadata_file_path, metadata, levels ):
    """Write the metadata columns as a tab-separated table with a header line."""
    with open(metadata_file_path, 'w') as metadata_file:
        metadata_file.write('%s\n' % '\t'.join(metadata))
        for level in range(levels):
            row = '\t'.join(metadata[attribute][level] for attribute in metadata)
            metadata_file.write('%s\n' % row)


def _write_fasta( fasta_file_path, rnacentral_id, sequence, line_width=60 ):
    """Write a single-record FASTA file, wrapping the sequence at line_width."""
    with open(fasta_file_path, 'w') as fasta_file:
        fasta_file.write('> %s\n' % rnacentral_id)
        # each line of a sequence should have fewer than 80 characters
        for start in range(0, len(sequence), line_width):
            fasta_file.write('%s\n' % sequence[start:start + line_width])
def retrieve_data( options, args ):
    """Collect the requested RNAcentral IDs and process them one by one.

    The IDs come either from ``options.id`` (a single ID) or from
    ``options.file`` (one ID per line, blank lines ignored). An ID containing
    a space or a tab is rejected.

    Parameters:
        options: parsed command line options; ``id``, ``file`` and
            ``errorfile`` are read here and the object is forwarded to
            ``query_rnacentral``.
        args: positional command line arguments, forwarded unchanged.

    Returns:
        ``OK_EXIT_CODE`` once every ID has been processed (per-ID failures are
        only reported on standard output).

    Raises:
        SystemExit: through ``raiseException`` when an ID is malformed or no
            ID is available.
    """
    errorfile = str(options.errorfile) if options.errorfile else None
    rnacentral_ids = []
    if options.id:
        if ' ' in options.id or '\t' in options.id:
            message = 'Error: the RNAcentral ID is not well formatted'
            print(message)
            return raiseException(ERR_EXIT_CODE, message, './', errorfile)
        rnacentral_ids.append(options.id)
    elif options.file:
        with open(options.file) as ids_file:
            for line in ids_file:
                line = line.strip()
                if not line:
                    continue
                if ' ' in line or '\t' in line:
                    message = 'Error: the input file is not well formatted'
                    print(message)
                    return raiseException(ERR_EXIT_CODE, message, './', errorfile)
                rnacentral_ids.append(line)
    if not rnacentral_ids:
        message = 'Error: at least one RNAcentral ID should be specified'
        print(message)
        return raiseException(ERR_EXIT_CODE, message, './', errorfile)
    for rnacentral_id, exit_code in query_rnacentral(options, args, rnacentral_ids):
        if exit_code == OK_EXIT_CODE:
            print('> %s processed' % str(rnacentral_id))
        else:
            # the previous message wrongly ended with "has been correctly processed"
            print('> an error has occurred while processing %s' % str(rnacentral_id))
    return OK_EXIT_CODE
def __main__():
    """Parse the command line and run the requested action.

    A fasta file and a metadata file are created for each of the input
    RNAcentral IDs. The output directories are created when missing.

    Returns:
        ``OK_EXIT_CODE`` after printing the version or the usage text,
        ``ERR_EXIT_CODE`` when ``--file`` and ``--id`` are both given or both
        missing, otherwise whatever ``retrieve_data`` returns.
    """
    # Parse the command line options
    usage = 'Usage: \n\t1. rnacentral.py --file file_path --fastadir fasta_dir_path --metadir metadata_dir_path\n\t2. rnacentral.py --id rnacentral_id --fastadir fasta_dir_path --metadir metadata_dir_path'
    parser = optparse.OptionParser(usage = usage)
    parser.add_option('-v', '--version', action='store_true', dest='version',
                      default=False, help='display version and exit')
    parser.add_option('-u', '--usage', action='store_true', dest='usage',
                      default=False, help='display usage')
    parser.add_option('-f', '--file', type='string',
                      action='store', dest='file', help='list of RNAcentral IDs, one for each row')
    parser.add_option('-i', '--id', type='string',
                      action='store', dest='id', help='RNAcentral id')
    parser.add_option('-o', '--fastadir', type='string', default='./',
                      action='store', dest='fastadir', help='output directory (collection) path for fasta files')
    parser.add_option('-m', '--metadir', type='string', default='./',
                      action='store', dest='metadir', help='output directory (collection) path for metadata files')
    parser.add_option('-r', '--errorfile', type='string', default='error_txt',
                      action='store', dest='errorfile', help='error file name containing error messages')
    (options, args) = parser.parse_args()
    if options.version:
        print( 'Tool: %s \nAPI: %s' % ( __version__, __rnacentral_api_version__ ) )
        return OK_EXIT_CODE
    if options.usage:
        print( usage )
        return OK_EXIT_CODE
    # exactly one of --file / --id must be provided
    if options.file and options.id:
        print( '--file and --id parameters can\'t be used at the same time' )
        return ERR_EXIT_CODE
    if not options.file and not options.id:
        print( 'specify at least one parameter between --file and --id' )
        return ERR_EXIT_CODE
    # if the fasta or the metadata directory does not exist -> create it
    for dir_path in (options.fastadir, options.metadir):
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
    return retrieve_data( options, args )
if __name__ == "__main__":
    # use the value returned by __main__() as the process exit status
    # (None is treated as success by sys.exit)
    sys.exit(__main__())