-
Notifications
You must be signed in to change notification settings - Fork 4
/
dmp_position.py
119 lines (85 loc) · 3.58 KB
/
dmp_position.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from __future__ import division, print_function
import numpy as np
from canonical_system import CanonicalSystem
class PositionDMP():
def __init__(self, n_bfs=10, alpha=48.0, beta=None, cs_alpha=None, cs=None):
self.n_bfs = n_bfs
self.alpha = alpha
self.beta = beta if beta is not None else self.alpha / 4
self.cs = cs if cs is not None else CanonicalSystem(alpha=cs_alpha if cs_alpha is not None else self.alpha/2)
# Centres of the Gaussian basis functions
self.c = np.exp(-self.cs.alpha * np.linspace(0, 1, self.n_bfs))
# Variance of the Gaussian basis functions
self.h = 1.0 / np.gradient(self.c)**2
# Scaling factor
self.Dp = np.identity(3)
# Initially weights are zero (no forcing term)
self.w = np.zeros((3, self.n_bfs))
# Initial- and goal positions
self.p0 = np.zeros(3)
self.gp = np.zeros(3)
self.reset()
def step(self, x, dt, tau):
def fp(xj):
psi = np.exp(-self.h * (xj - self.c)**2)
return self.Dp.dot(self.w.dot(psi) / psi.sum() * xj)
# DMP system acceleration
# TODO: Implement the transformation system differential equation for the acceleration, given that you know the
# values of the following variables:
# self.alpha, self.beta, self.gp, self.p, self.dp, tau, x
self.ddp = (self.alpha*(self.beta*(self.gp-self.p)-tau*self.dp)+fp(x))/tau**2
# Integrate acceleration to obtain velocity
self.dp += self.ddp * dt
# Integrate velocity to obtain position
self.p += self.dp * dt
return self.p, self.dp, self.ddp
def rollout(self, ts, tau):
self.reset()
if np.isscalar(tau):
tau = np.full_like(ts, tau)
x = self.cs.rollout(ts, tau) # Integrate canonical system
dt = np.gradient(ts) # Differential time vector
n_steps = len(ts)
p = np.empty((n_steps, 3))
dp = np.empty((n_steps, 3))
ddp = np.empty((n_steps, 3))
for i in range(n_steps):
p[i], dp[i], ddp[i] = self.step(x[i], dt[i], tau[i])
return p, dp, ddp
def reset(self):
self.p = self.p0.copy()
self.dp = np.zeros(3)
self.ddp = np.zeros(3)
def train(self, positions, ts, tau):
p = positions
# Sanity-check input
if len(p) != len(ts):
raise RuntimeError("len(p) != len(ts)")
# Initial- and goal positions
self.p0 = p[0]
self.gp = p[-1]
# Differential time vector
dt = np.gradient(ts)[:,np.newaxis]
# Scaling factor
self.Dp = np.diag(self.gp - self.p0)
Dp_inv = np.linalg.inv(self.Dp)
# Desired velocities and accelerations
d_p = np.gradient(p, axis=0) / dt
dd_p = np.gradient(d_p, axis=0) / dt
# Integrate canonical system
x = self.cs.rollout(ts, tau)
# Set up system of equations to solve for weights
def features(xj):
psi = np.exp(-self.h * (xj - self.c)**2)
return xj * psi / psi.sum()
def forcing(j):
return Dp_inv.dot(tau**2 * dd_p[j]
- self.alpha * (self.beta * (self.gp - p[j]) - tau * d_p[j]))
A = np.stack(features(xj) for xj in x)
f = np.stack(forcing(j) for j in range(len(ts)))
# Least squares solution for Aw = f (for each column of f)
self.w = np.linalg.lstsq(A, f, rcond=None)[0].T
# Cache variables for later inspection
self.train_p = p
self.train_d_p = d_p
self.train_dd_p = dd_p