-
Notifications
You must be signed in to change notification settings - Fork 1
/
smooth_trajectories.py
206 lines (145 loc) · 6.3 KB
/
smooth_trajectories.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Sat Apr 28 21:47:25 2018
@author: ron
implementation of 2 methods for smoothing trajectories.
1) with a polynomial fitting, after Luti et al 2005
2) with savizky golay
"""
import numpy as np
from flowtracks.io import Trajectory
from scipy.signal import savgol_filter as sgf
def smooth_traj_poly(traj, window, polyorder, FPS, cut_edges = True):
'''
will smooth a flowtracks trajectory.
smooth the position using a mooving polynomial, where
the velocities and accelerations are calculated by
differentiating the polynomial coeficcients analytically.
the new trajectory is shorter by one window size
the smoothed trajectory has also a new property - residuals.
this is the original position minus the smoothed position of the
particle and can be used for error assessments.
returns-
new_traj - a smoothed trajectory object
'''
if type(window) == int:
window = (window, window, window)
if type(polyorder) == int:
polyorder = (polyorder, polyorder, polyorder)
if type(window) != tuple:
raise TypeError('window must be either integer or tuple')
if type(polyorder) != tuple:
raise TypeError('polyorder must be either a number or tuple')
test = [w%2 != 1 for w in window]
if sum(test) != 0:
raise ValueError('each window must be a positive odd integer.')
test = [polyorder[i] > window[i] for i in [0,1,2]]
if sum(test) != 0:
raise ValueError('polyorder cannot be larger than window.')
N = len(traj)
test = [w > N for w in window]
if sum(test) != 0:
raise ValueError('window cannot be larger than the trajectory length.')
W = max(window)
p = traj.pos()
if cut_edges: new_N = N - W + 1
elif cut_edges==False : new_N = N
new_pos, new_vel, new_acc = np.empty((new_N ,3)), np.empty((new_N,3)), np.empty((new_N,3))
sequence = zip(range(3), window, polyorder)
# smoothing the trajectory with cutting the edges of the trajectory
if cut_edges:
for e,win,po in sequence:
time_ = np.arange(win)
ev_point = float(win/2)**np.arange(po + 1)[::-1]
skip = (W-win)/2
Deriv_mat = []
for i in range(po+1):
a = [0.0 for j in range(po+1)]
if i!=0:
a[i-1] = po - (i-1)
Deriv_mat.append(a)
Deriv_mat = np.array(Deriv_mat)
for i in range( new_N ):
p_ = p[ i + skip : i + win + skip, e]
C = np.polyfit(time_, p_, po)
C1 = np.dot(Deriv_mat, C)
C2 = np.dot(Deriv_mat, C1)
new_pos[i,e] = np.dot(ev_point, C)
new_vel[i,e] = np.dot(ev_point, C1)*FPS
new_acc[i,e] = np.dot(ev_point, C2)*FPS**2
# smoothing the entire trajectory, not cutting the edges
elif cut_edges==False:
for e,win,po in sequence:
time_ = np.arange(win)
ev_point = float(win/2)**np.arange(po + 1)[::-1]
Deriv_mat = []
for i in range(po+1):
a = [0.0 for j in range(po+1)]
if i!=0:
a[i-1] = po - (i-1)
Deriv_mat.append(a)
Deriv_mat = np.array(Deriv_mat)
for i in range( new_N ):
if i < win/2: # get the portion of size widon for
p_ = p[:win, e] # fitting the polynomial
elif N - (1+i) < win/2:
p_ = p[-1*win:, e]
else:
p_ = p[i-win/2 : i+win/2+1, e]
C = np.polyfit(time_, p_, po)
C1 = np.dot(Deriv_mat, C)
C2 = np.dot(Deriv_mat, C1)
if i < win/2:
ev_point = float(i%(win/2))**np.arange(po + 1)[::-1]
elif N - (1+i) < win/2:
ev_point = float(win + i - N)**np.arange(po + 1)[::-1]
else:
ev_point = float(win/2)**np.arange(po + 1)[::-1]
new_pos[i,e] = np.dot(ev_point, C)
new_vel[i,e] = np.dot(ev_point, C1)*FPS
new_acc[i,e] = np.dot(ev_point, C2)*FPS**2
if cut_edges:
residuals = new_pos - p[W/2 : N-W/2,:]
new_tr = Trajectory(new_pos, new_vel, traj.time()[W/2 : N-W/2],
traj.trajid(), accel = new_acc, residuals = residuals)
elif cut_edges==False :
residuals = new_pos - p
new_tr = Trajectory(new_pos, new_vel, traj.time(),
traj.trajid(), accel = new_acc, residuals = residuals)
return new_tr
def smooth_traj(traj, FPS, window = 11, polyorder = 2, n = 2, cut_edges=True):
'''
this function's input is a flowtracks trajectory object. the output
is a smoothed trajectory. for the smoothing a savizky golay filter
is applied n times the position of the trajectory. the velocity
and acceleration are calculated by a simple derivative.
if cut_edges==Ture - the returned trajectory will be shorter than the original
by 2*window since the begining and the end of the trajectory
are trimed each by window.
returns-
new_traj - a smoothed trajectory object
delta - the mean squared translation done by the smoothing
'''
if cut_edges:
if len(traj) < window*2 + 1:
raise ValueError('trajectory too short for window size %d with cutting'%window)
else:
if len(traj) < window + 1:
raise ValueError('trajectory too short for window size %d'%window)
p = traj.pos()
p_ = p
for i in range(n):
p_ = sgf(p_, window, polyorder, axis = 0)
v_ = np.gradient(p_)[0]*FPS
a_ = np.gradient(p_)[0]*FPS
delta = np.mean((p - p_)**2, axis = 0)
if cut_edges:
p_ = p_[window:-window, :]
v_ = v_[window:-window, :]
a_ = a_[window:-window, :]
t_ = traj.time()[window:-window]
else:
t_ = traj.time()
new_traj = Trajectory(p_, v_, t_, traj.trajid(), accel = a_)
return new_traj, delta