# duke_dbt_data.py
import os
from typing import AnyStr, BinaryIO, Dict, List, NamedTuple, Optional, Union

import matplotlib
import numpy as np
import pandas as pd
import pydicom as dicom
from skimage.exposure import rescale_intensity

def dcmread_image(
fp: Union[str, "os.PathLike[AnyStr]", BinaryIO],
view: str,
index: Optional[np.uint] = None,
) -> np.ndarray:
"""Read pixel array from DBT DICOM file"""
ds = dicom.dcmread(fp)
ds.decompress(handler_name="pylibjpeg")
pixel_array = ds.pixel_array
    view_laterality = view[0].upper()
    image_laterality = _get_image_laterality(pixel_array[index or 0])
    if index is not None:
        pixel_array = pixel_array[index]
    if image_laterality != view_laterality:
        # rotate the image 180 degrees in-plane so that the pixel data matches
        # the laterality implied by the requested view
        pixel_array = np.flip(pixel_array, axis=(-1, -2))
    window_center = _get_window_center(ds)
    window_width = _get_window_width(ds)
    # window boundaries following the DICOM window center/width convention
    low = (2 * window_center - window_width) / 2
    high = (2 * window_center + window_width) / 2
pixel_array = rescale_intensity(
pixel_array, in_range=(low, high), out_range="dtype"
)
return pixel_array
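

# Usage sketch for dcmread_image (a minimal example; the DICOM path and view
# name below are placeholders, not files shipped with this repository):
def _example_dcmread_image() -> None:
    volume = dcmread_image("path/to/dbt_volume.dcm", view="lcc")  # hypothetical path
    print(volume.shape, volume.dtype)  # (n_slices, rows, cols) for a full volume
    # a single slice can be read by passing the optional index argument
    central = dcmread_image(
        "path/to/dbt_volume.dcm", view="lcc", index=volume.shape[0] // 2
    )
    print(central.shape)
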
def read_boxes(
boxes_fp: pd._typing.FilePathOrBuffer, filepaths_fp: pd._typing.FilePathOrBuffer
) -> pd.DataFrame:
"""Read pandas DataFrame with bounding boxes joined with file paths"""
df_boxes = pd.read_csv(boxes_fp)
df_filepaths = pd.read_csv(filepaths_fp)
primary_key = ("PatientID", "StudyUID", "View")
if not all([key in df_boxes.columns for key in primary_key]):
raise AssertionError(
f"Not all primary key columns {primary_key} are present in bounding boxes columns {df_boxes.columns}"
)
    if not all([key in df_filepaths.columns for key in primary_key]):
raise AssertionError(
f"Not all primary key columns {primary_key} are present in file paths columns {df_filepaths.columns}"
)
return pd.merge(df_boxes, df_filepaths, on=primary_key)
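

# Usage sketch for read_boxes (the CSV file names below are placeholders for
# the dataset's bounding-box and file-path tables):
def _example_read_boxes() -> None:
    df = read_boxes("boxes.csv", "file_paths.csv")  # hypothetical local paths
    # each row now carries the box coordinates plus the file path for its view
    print(df.columns.tolist())
    print(len(df), "boxes")
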
def draw_box(
image: np.ndarray,
x: int,
y: int,
width: int,
height: int,
color: Optional[Union[int, tuple]] = None,
lw=4,
):
"""Draw bounding box on the image"""
x = min(max(x, 0), image.shape[1] - 1)
y = min(max(y, 0), image.shape[0] - 1)
if color is None:
color = np.max(image)
if len(image.shape) > 2 and not hasattr(color, "__len__"):
color = (color,) + (0,) * (image.shape[-1] - 1)
image[y : y + lw, x : x + width] = color
image[y + height - lw : y + height, x : x + width] = color
image[y : y + height, x : x + lw] = color
image[y : y + height, x + width - lw : x + width] = color
return image
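

# Usage sketch for draw_box on a synthetic slice (all values below are
# arbitrary illustration, not dataset coordinates):
def _example_draw_box() -> None:
    slice_2d = np.zeros((512, 512), dtype=np.uint16)
    slice_2d[200:300, 150:250] = 1000  # a bright square standing in for a lesion
    boxed = draw_box(slice_2d, x=150, y=200, width=100, height=100, lw=2)
    print(boxed.max(), boxed[200, 150])  # the box is drawn with the image maximum
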
def evaluate(
labels_fp: pd._typing.FilePathOrBuffer,
boxes_fp: pd._typing.FilePathOrBuffer,
predictions_fp: pd._typing.FilePathOrBuffer,
) -> Dict[str, float]:
"""Evaluate predictions"""
df_labels = pd.read_csv(labels_fp)
df_boxes = pd.read_csv(boxes_fp, dtype={"VolumeSlices": float})
df_pred = pd.read_csv(predictions_fp, dtype={"Score": float})
df_labels = df_labels.reset_index().set_index(["StudyUID", "View"]).sort_index()
df_boxes = df_boxes.reset_index().set_index(["StudyUID", "View"]).sort_index()
df_pred = df_pred.reset_index().set_index(["StudyUID", "View"]).sort_index()
df_pred["TP"] = 0
df_pred["GTID"] = -1
thresholds = [df_pred["Score"].max() + 1.0]
# find true positive predictions and assign detected ground truth box ID
for box_pred in df_pred.itertuples():
if box_pred.Index not in df_boxes.index:
continue
df_boxes_view = df_boxes.loc[[box_pred.Index]]
view_slice_offset = df_boxes.loc[[box_pred.Index], "VolumeSlices"].iloc[0] / 4
tp_boxes = [
b
for b in df_boxes_view.itertuples()
if _is_tp(box_pred, b, slice_offset=view_slice_offset)
]
if len(tp_boxes) > 1:
# find the nearest GT box
tp_distances = [_distance(box_pred, b) for b in tp_boxes]
tp_boxes = [tp_boxes[np.argmin(tp_distances)]]
if len(tp_boxes) > 0:
tp_i = tp_boxes[0].index
df_pred.loc[df_pred["index"] == box_pred.index, ("TP", "GTID")] = (1, tp_i)
thresholds.append(box_pred.Score)
thresholds.append(df_pred["Score"].min() - 1.0)
# compute sensitivity at 2 FPs/volume on all cases
evaluation_fps_all = (2.0,)
tpr_all = _froc(
df_pred=df_pred,
thresholds=thresholds,
n_volumes=len(df_labels),
n_boxes=len(df_boxes),
evaluation_fps=evaluation_fps_all,
)
result = {f"sensitivity_at_2_fps_all": tpr_all[0]}
# compute mean sensitivity at 1, 2, 3, 4 FPs/volume on positive cases
df_pred = df_pred[df_pred.index.isin(df_boxes.index)]
df_labels = df_labels[df_labels.index.isin(df_boxes.index)]
evaluation_fps_positive = (1.0, 2.0, 3.0, 4.0)
tpr_positive = _froc(
df_pred=df_pred,
thresholds=thresholds,
n_volumes=len(df_labels),
n_boxes=len(df_boxes),
evaluation_fps=evaluation_fps_positive,
)
result.update(
dict(
(f"sensitivity_at_{int(x)}_fps_positive", y)
for x, y in zip(evaluation_fps_positive, tpr_positive)
)
)
result.update({"mean_sensitivity_positive": np.mean(tpr_positive)})
return result
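

# Usage sketch for evaluate (file names are placeholders; the labels, boxes,
# and predictions CSVs are assumed to follow the column layout used above):
def _example_evaluate() -> None:
    metrics = evaluate("labels.csv", "boxes.csv", "predictions.csv")  # hypothetical paths
    for name, value in metrics.items():
        print(f"{name}: {value:.3f}")
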
def _froc(
df_pred: pd.DataFrame,
thresholds: List[float],
n_volumes: int,
n_boxes: int,
evaluation_fps: tuple,
) -> List[float]:
    tpr = []
    fps = []
    # sweep thresholds from the highest score down, recording one
    # (FPs per volume, sensitivity) operating point per threshold
    for th in sorted(thresholds, reverse=True):
        df_th = df_pred.loc[df_pred["Score"] >= th]
        # count each detected ground-truth box only once per view
        df_th_unique_tp = df_th.reset_index().drop_duplicates(
            subset=["StudyUID", "View", "TP", "GTID"]
        )
        n_tps_th = float(sum(df_th_unique_tp["TP"]))
        tpr_th = n_tps_th / n_boxes
        n_fps_th = float(len(df_th[df_th["TP"] == 0]))
        fps_th = n_fps_th / n_volumes
        tpr.append(tpr_th)
        fps.append(fps_th)
        if fps_th > max(evaluation_fps):
            break
    # sensitivity at the requested FP rates, linearly interpolated along the curve
    return [np.interp(x, fps, tpr) for x in evaluation_fps]
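

# Illustration of the FROC computation on toy numbers (synthetic values, not
# dataset results): one volume with one true positive and one false positive.
def _example_froc() -> None:
    df = pd.DataFrame(
        {
            "StudyUID": ["s1", "s1"],
            "View": ["lcc", "lcc"],
            "Score": [0.9, 0.4],
            "TP": [1, 0],
            "GTID": [0, -1],
        }
    ).set_index(["StudyUID", "View"])
    sens = _froc(
        df_pred=df,
        thresholds=[1.9, 0.9, 0.4, -0.6],
        n_volumes=1,
        n_boxes=1,
        evaluation_fps=(1.0, 2.0),
    )
    print(sens)  # sensitivity reaches 1.0 once the false positive is admitted
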
def _is_tp(
    box_pred: NamedTuple, box_true: NamedTuple, slice_offset: float, min_dist: int = 100
) -> bool:
pred_y = box_pred.Y + box_pred.Height / 2
pred_x = box_pred.X + box_pred.Width / 2
pred_z = box_pred.Z + box_pred.Depth / 2
true_y = box_true.Y + box_true.Height / 2
true_x = box_true.X + box_true.Width / 2
true_z = box_true.Slice
# 2D distance between true and predicted center points
dist = np.linalg.norm((pred_x - true_x, pred_y - true_y))
# compute radius based on true box size
dist_threshold = np.sqrt(box_true.Width ** 2 + box_true.Height ** 2) / 2.0
dist_threshold = max(dist_threshold, min_dist)
slice_diff = np.abs(pred_z - true_z)
# TP if predicted center within radius and slice within slice offset
return dist <= dist_threshold and slice_diff <= slice_offset
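

# Toy check of the TP criterion (synthetic boxes, not dataset values): the
# predicted centre lies within the distance and slice tolerances of the
# ground-truth box, so the prediction counts as a true positive.
def _example_is_tp() -> None:
    PredBox = NamedTuple(
        "PredBox",
        [("X", float), ("Y", float), ("Z", float), ("Width", float), ("Height", float), ("Depth", float)],
    )
    TrueBox = NamedTuple(
        "TrueBox",
        [("X", float), ("Y", float), ("Slice", float), ("Width", float), ("Height", float)],
    )
    pred = PredBox(X=100, Y=100, Z=20, Width=50, Height=50, Depth=10)
    gt = TrueBox(X=110, Y=105, Slice=24, Width=60, Height=60)
    # centres are ~18 px apart in-plane and one slice apart, within both tolerances
    print(_is_tp(pred, gt, slice_offset=10))  # True
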
def _distance(box_pred: NamedTuple, box_true: NamedTuple) -> float:
pred_y = box_pred.Y + box_pred.Height / 2
pred_x = box_pred.X + box_pred.Width / 2
pred_z = box_pred.Z + box_pred.Depth / 2
true_y = box_true.Y + box_true.Height / 2
true_x = box_true.X + box_true.Width / 2
true_z = box_true.Slice
return np.linalg.norm((pred_x - true_x, pred_y - true_y, pred_z - true_z))

def _get_dicom_laterality(ds: dicom.dataset.FileDataset) -> str:
    """Unreliable - DICOM laterality is incorrect for some cases"""
    return ds[0x5200, 0x9229][0][0x0020, 0x9071][0][0x0020, 0x9072].value


def _get_image_laterality(pixel_array: np.ndarray) -> str:
    left_edge = np.sum(pixel_array[:, 0])  # sum of left edge pixels
    right_edge = np.sum(pixel_array[:, -1])  # sum of right edge pixels
    return "R" if left_edge < right_edge else "L"


def _get_window_center(ds: dicom.dataset.FileDataset) -> np.float32:
    return np.float32(ds[0x5200, 0x9229][0][0x0028, 0x9132][0][0x0028, 0x1050].value)


def _get_window_width(ds: dicom.dataset.FileDataset) -> np.float32:
    return np.float32(ds[0x5200, 0x9229][0][0x0028, 0x9132][0][0x0028, 0x1051].value)
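

# Illustration of the windowing used in dcmread_image (toy numbers, not taken
# from any particular DICOM file): values outside the window are clipped and
# the window [low, high] is stretched to the full range of the input dtype.
def _example_windowing() -> None:
    center, width = 2048.0, 4096.0
    low = (2 * center - width) / 2   # lower window boundary (0.0 here)
    high = (2 * center + width) / 2  # upper window boundary (4096.0 here)
    raw = np.array([0, 1024, 2048, 4096], dtype=np.uint16)
    scaled = rescale_intensity(raw, in_range=(low, high), out_range="dtype")
    print(scaled)  # stretched across the uint16 range [0, 65535]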