analyze_shanoir_db.py
import time
from datetime import datetime
from pathlib import Path
import logging
import shutil
import requests
import pydicom
import pandas
import numpy as np
import shanoir_downloader
from py7zr import pack_7zarchive, unpack_7zarchive
# register 7zip file format
shutil.register_archive_format('7zip', pack_7zarchive, description='7zip archive')
shutil.register_unpack_format('7zip', ['.7z'], unpack_7zarchive)
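# Once the format is registered, 7z archives can be created and extracted with the standard
# shutil helpers, for example (paths are only illustrative):
#   shutil.make_archive('dicom_archive', '7zip', 'dicom_folder')   # creates dicom_archive.7z
#   shutil.unpack_archive('dicom_archive.7z', 'dicom_folder')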
Path.ls = lambda x: sorted(list(x.iterdir()))  # convenience helper: some_path.ls() returns the sorted contents of a directory
parser = shanoir_downloader.create_arg_parser()
parser.add_argument('-u', '--username', required=True, help='Your shanoir username.')
parser.add_argument('-d', '--domain', default='shanoir.irisa.fr', help='The shanoir domain to query.')
parser.add_argument('-ids', '--dataset_ids', required=True, help='Path to a csv or tsv file containing the dataset ids to download (with the columns "sequence_id", "shanoir_name" and "series_description").')
parser.add_argument('-of', '--output_folder', required=True, help='The destination folder where files will be downloaded.')
parser.add_argument('-mt', '--max_tries', type=int, default=10, help='The number of times to try a download before giving up.')
parser.add_argument('-ue', '--unrecoverable_errors', default=['status_code_404', 'anonymization_error', 'zip_compression_error', 'encryption_error'], nargs='*', help='The errors which should not trigger a new download.')
parser.add_argument('-dids', '--downloaded_datasets', default=None, help='Path to a tsv file containing the already downloaded datasets (generated by this script). Creates the file "downloaded_datasets.tsv" in the given output_folder by default. If the file already exists, it will be taken into account and updated with the new downloads.')
parser.add_argument('-mids', '--missing_datasets', default=None, help='Path to a tsv file containing the missing datasets (generated by this script). Creates the file "missing_datasets.tsv" in the given output_folder by default. If the file already exists, it will be taken into account and updated with the new errors.')
shanoir_downloader.add_configuration_arguments(parser)
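# Example invocation (the username, domain and paths below are only illustrative):
#   python analyze_shanoir_db.py -u jdoe -d shanoir.irisa.fr -ids datasets.tsv -of /data/shanoir_downloads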
args = parser.parse_args()
config = shanoir_downloader.initialize(args)
all_datasets = None
datasets_dtype = {'sequence_id': str, 'shanoir_name': str, 'series_description': str}
missing_datasets_dtype = {'sequence_id': str, 'n_tries': np.int64}
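# The dataset_ids file is expected to contain one row per dataset, for example
# (tab- or comma-separated, values are illustrative):
#   sequence_id    shanoir_name    series_description
#   123456         SUBJECT_01      t1_mprage_sag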
if args.dataset_ids.endswith('.csv') or args.dataset_ids.endswith('.tsv') or args.dataset_ids.endswith('.txt'):
    all_datasets = pandas.read_csv(args.dataset_ids, sep=',' if args.dataset_ids.endswith('.csv') else '\t', dtype=datasets_dtype)
else:
    all_datasets = pandas.read_excel(args.dataset_ids, dtype=datasets_dtype)
all_datasets.set_index('sequence_id', inplace=True)
output_folder = Path(config['output_folder'])
# Create missing_datasets and downloaded_datasets tsv files
missing_datasets_path = output_folder / 'missing_datasets.tsv' if args.missing_datasets is None else Path(args.missing_datasets)
downloaded_datasets_path = output_folder / 'downloaded_datasets.tsv' if args.downloaded_datasets is None else Path(args.downloaded_datasets)
missing_datasets = pandas.DataFrame(columns=['sequence_id', 'reason', 'message', 'n_tries']) if not missing_datasets_path.exists() else pandas.read_csv(str(missing_datasets_path), sep='\t', dtype=missing_datasets_dtype)
missing_datasets.set_index('sequence_id', inplace=True)
downloaded_datasets = pandas.DataFrame(columns=['sequence_id']) if not downloaded_datasets_path.exists() else pandas.read_csv(str(downloaded_datasets_path), sep='\t', dtype=datasets_dtype)
downloaded_datasets.set_index('sequence_id', inplace=True)
def add_missing_dataset(missing_datasets, sequence_id, reason, message, raw_folder):
    logging.error(f'For dataset {sequence_id}: {message}')
    if sequence_id in missing_datasets.index:
        missing_datasets.loc[sequence_id, 'n_tries'] += 1
        missing_datasets.loc[sequence_id, 'reason'] = reason
        missing_datasets.loc[sequence_id, 'message'] = message
    else:
        # Add a new row for this sequence_id
        missing_datasets.loc[sequence_id] = pandas.Series({'reason': str(reason), 'message': str(message), 'n_tries': 1})
    missing_datasets.to_csv(str(missing_datasets_path), sep='\t')
    # Remove the partially downloaded folder when the error is recoverable, so the download can be retried from scratch
    if (raw_folder / sequence_id).exists() and reason not in args.unrecoverable_errors:
        shutil.rmtree(raw_folder / sequence_id)
    return missing_datasets
def add_downloaded_dataset(downloaded_datasets, missing_datasets, sequence_id):
    if sequence_id in downloaded_datasets.index:
        return downloaded_datasets
    downloaded_datasets = pandas.concat([downloaded_datasets, all_datasets.loc[[sequence_id]]])
    downloaded_datasets.to_csv(str(downloaded_datasets_path), sep='\t')
    missing_datasets.drop(sequence_id, inplace=True, errors='ignore')
    missing_datasets.to_csv(str(missing_datasets_path), sep='\t')
    return downloaded_datasets
def rename_path(old_path, new_path):
    new_path.parent.mkdir(exist_ok=True, parents=True)
    old_path.rename(new_path)
    return new_path
datasets_to_download = all_datasets
raw_folder = output_folder / 'raw'
def replace_with_sequence_id(sequence_id, dataset, tag):
    dataset.get(tag).value = sequence_id
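# For example, replace_with_sequence_id(sequence_id, ds, (0x0010, 0x0010)) would overwrite the
# PatientName element of a pydicom dataset ds with the sequence id (the tag used here is only illustrative).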
# Download and process datasets until there are no more datasets to process
# (all the missing datasets are unrecoverable or have been tried at least args.max_tries times)
while len(datasets_to_download) > 0:
    # datasets_to_download is all_datasets minus the already downloaded datasets
    # and the missing datasets which are unrecoverable or have reached max_tries
    datasets_to_download = all_datasets[~all_datasets.index.isin(downloaded_datasets.index)]
    datasets_max_tries = missing_datasets[missing_datasets['n_tries'] >= args.max_tries].index
    datasets_unrecoverable = missing_datasets[missing_datasets['reason'].isin(args.unrecoverable_errors)].index
    datasets_to_download = datasets_to_download.drop(datasets_max_tries.union(datasets_unrecoverable))
    logging.info(f'There are {len(datasets_to_download)} remaining datasets to download.')
    now = datetime.now()
    # Between 4am and 5am, pause until 5am (the server may be unavailable during this period)
    if now.hour > 3 and now.hour < 5:
        future = datetime(now.year, now.month, now.day, 5, 0)
        time.sleep((future - now).total_seconds())
    if len(downloaded_datasets) > 0:
        logging.info(f'{len(downloaded_datasets)} of the {len(all_datasets)} datasets have already been downloaded.')
    for index, row in datasets_to_download.iterrows():
        sequence_id = index
        shanoir_name = row['shanoir_name']
        series_description = row['sequence_name'] if 'sequence_name' in row else row['series_description']
        logging.info(f'Downloading dataset {sequence_id}, shanoir name: {shanoir_name}, series description: {series_description}')
        # Create the destination folder for this dataset
        destination_folder = raw_folder / sequence_id / 'downloaded_archive'
        destination_folder.mkdir(exist_ok=True, parents=True)
        config['output_folder'] = destination_folder
        # Download the dataset
        try:
            shanoir_downloader.download_dataset(config, sequence_id, 'dicom')
        except requests.HTTPError as e:
            message = f'Response status code: {e.response.status_code}, reason: {e.response.reason}'
            if hasattr(e.response, 'error') and e.response.error:
                message += f', response error: {e.response.error}'
            message += str(e)
            missing_datasets = add_missing_dataset(missing_datasets, sequence_id, 'status_code_' + str(e.response.status_code), message, raw_folder)
            continue
        except Exception as e:
            missing_datasets = add_missing_dataset(missing_datasets, sequence_id, 'unknown_http_error', str(e), raw_folder)
            continue
        # List the downloaded zip files
        zip_files = list(destination_folder.glob('*.zip'))
        if len(zip_files) != 1:
            message = 'No zip file was found' if len(zip_files) == 0 else f'{len(zip_files)} zip files were found'
            message += f' in the output directory {destination_folder}.'
            message += f' Downloaded files: {destination_folder.ls()}'
            missing_datasets = add_missing_dataset(missing_datasets, sequence_id, 'zip', message, raw_folder)
            continue
        # Extract the zip file
        dicom_zip = zip_files[0]
        logging.info(f' Extracting {dicom_zip}...')
        dicom_folder = destination_folder.parent / dicom_zip.stem
        dicom_folder.mkdir(exist_ok=True)
        shutil.unpack_archive(str(dicom_zip), str(dicom_folder))
        found_dcm = False
        dicom_files = list(dicom_folder.glob('*.dcm'))
        # Error if no DICOM file is found
        if len(dicom_files) == 0:
            missing_datasets = add_missing_dataset(missing_datasets, sequence_id, 'nodicom', f'No DICOM file was found in the dicom directory {dicom_folder}.', raw_folder)
            continue
        # Read the first file and make sure its PatientName and SeriesDescription match the expected values
        dicom_file = dicom_files[0]
        logging.info(f' Verifying file {dicom_file}...')
        ds = None
        try:
            ds = pydicom.dcmread(str(dicom_file))
            if ds.PatientName != shanoir_name:
                missing_datasets = add_missing_dataset(missing_datasets, sequence_id, 'content_shanoir_name', f'Shanoir name {shanoir_name} differs in dicom: {ds.PatientName}', raw_folder)
                continue
            if ds.SeriesDescription != series_description:  # or if ds[0x0008, 0x103E].value != series_description:
                missing_datasets = add_missing_dataset(missing_datasets, sequence_id, 'content_series_description', f'Series description {series_description} differs in dicom: {ds.SeriesDescription}', raw_folder)
                continue
        except Exception as e:
            missing_datasets = add_missing_dataset(missing_datasets, sequence_id, 'content_read', f'Error while reading DICOM: {e}', raw_folder)
            continue
        # Clean up: remove the zip archive, the extracted DICOM folder and the download folder
        dicom_zip.unlink()
        shutil.rmtree(dicom_folder)
        shutil.rmtree(destination_folder)
        # Add to downloaded datasets
        downloaded_datasets = add_downloaded_dataset(downloaded_datasets, missing_datasets, sequence_id)