-
Notifications
You must be signed in to change notification settings - Fork 0
/
waypoints_distance.py
94 lines (74 loc) · 3.09 KB
/
waypoints_distance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""ROI-based distances.
"""
from __future__ import division, print_function, absolute_import
import numpy as np
import nibabel as nib
from nibabel.affines import apply_affine
from scipy.spatial.distance import cdist
from sklearn.neighbors import KDTree
try:
from joblib import Parallel, delayed, cpu_count
joblib_available = True
except ImportError:
joblib_available = False
def bundle2roi_distance(bundle, roi_mask, distance='euclidean'):
"""Compute the minimum euclidean distance between a
set of streamlines and a ROI nifti mask.
"""
data = roi_mask.get_data()
affine = roi_mask.affine
roi_coords = np.array(np.where(data)).T
x_roi_coords = apply_affine(affine, roi_coords)
result=[]
for sl in bundle:
d = cdist(sl, x_roi_coords, distance)
result.append(np.min(d))
return result
def bundle2roi_distance_kdt(bundle, roi_mask, distance='euclidean'):
"""Compute the minimum euclidean distance between a
set of streamlines and a ROI nifti mask.
"""
data = roi_mask.get_data()
affine = roi_mask.affine
roi_coords = np.array(np.where(data)).T
x_roi_coords = apply_affine(affine, roi_coords)
kdt = KDTree(x_roi_coords)
result=[]
for sl in bundle:
d,i = kdt.query(sl, k=1)
result.append(np.min(d))
return result
def bundles_distances_roi(bundle, superset, roi1, roi2):
roi1_dist = bundle2roi_distance(superset, roi1)
roi2_dist = bundle2roi_distance(superset, roi2)
roi_vector = np.add(roi1_dist, roi2_dist)
roi_matrix = np.zeros((len(bundle), len(superset)))
roi1_ex_dist = bundle2roi_distance(bundle, roi1)
roi2_ex_dist = bundle2roi_distance(bundle, roi2)
roi_ex_vector = np.add(roi1_ex_dist, roi2_ex_dist)
#subtraction
for i in range(len(bundle)):
for j in range(len(superset)):
roi_matrix[i,j] = np.abs(np.subtract(roi_ex_vector[i], roi_vector[j]))
return roi_matrix
def wrapper_bundle2roi_distance(bundle, roi, n_jobs=-1):
if joblib_available and n_jobs != 1:
if n_jobs is None or n_jobs == -1:
n_jobs = cpu_count()
tmp = np.linspace(0, len(bundle), n_jobs + 1).astype(np.int)
chunks = zip(tmp[:-1], tmp[1:])
roi_dist = np.hstack(Parallel(n_jobs=n_jobs)(delayed(bundle2roi_distance_kdt)(bundle[start:stop], roi) for start, stop in chunks))
return roi_dist.flatten()
def bundles_distances_roi_fastest(bundle, superset, roi1, roi2):
roi1_dist = wrapper_bundle2roi_distance(superset, roi1)
roi2_dist = wrapper_bundle2roi_distance(superset, roi2)
roi_vector = np.add(roi1_dist, roi2_dist)
roi_matrix = np.zeros((len(bundle), len(superset)))
roi1_ex_dist = wrapper_bundle2roi_distance(bundle, roi1)
roi2_ex_dist = wrapper_bundle2roi_distance(bundle, roi2)
roi_ex_vector = np.add(roi1_ex_dist, roi2_ex_dist)
#subtraction
for i in range(len(bundle)):
for j in range(len(superset)):
roi_matrix[i,j] = np.abs(np.subtract(roi_ex_vector[i], roi_vector[j]))
return roi_matrix