forked from kvfrans/parallel-trpo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
value_function.py
75 lines (61 loc) · 2.75 KB
/
value_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import tensorflow as tf
import numpy as np
from utils import *
class VF(object):
    """Feed-forward regressor from per-step path features to returns.

    The TensorFlow graph is built lazily on the first call to ``fit``,
    because the input width is only known once features have been computed.
    Until then ``predict`` returns zeros.
    """

    # Not read or written by any method of this class; kept so existing
    # attribute access on ``VF.coeffs`` keeps working.
    coeffs = None

    def __init__(self, session):
        """Store the TensorFlow session; the network itself is built later.

        Args:
            session: session used to run initialisation, training and
                inference ops.
        """
        self.net = None
        self.session = session

    def create_net(self, shape):
        """Build the regression graph and initialise the variables it creates.

        Two ReLU hidden layers of 64 units feed a single linear output, which
        is trained with Adam on ``tf.nn.l2_loss`` of the prediction error.

        Only variables created by this call (layer weights plus the
        optimizer's own variables) are initialised. Running the global
        initialiser here would also reset every other variable in the graph,
        e.g. a model that was initialised or trained before the first ``fit``.

        Args:
            shape: number of feature columns, i.e. the input layer width.
        """
        hidden_size = 64

        # Snapshot existing variable names so that only the new ones are
        # initialised at the end of this method.
        preexisting = set(v.name for v in tf.global_variables())

        self.x = tf.placeholder(tf.float32, shape=[None, shape], name="x")
        self.y = tf.placeholder(tf.float32, shape=[None], name="y")

        weight_init = tf.random_uniform_initializer(-0.05, 0.05)
        bias_init = tf.constant_initializer(0)

        # NOTE(review): `fully_connected` is not defined in this file; it is
        # presumably provided by `from utils import *` -- confirm.
        with tf.variable_scope("VF"):
            h1 = tf.nn.relu(fully_connected(
                self.x, shape, hidden_size, weight_init, bias_init, "h1"))
            h2 = tf.nn.relu(fully_connected(
                h1, hidden_size, hidden_size, weight_init, bias_init, "h2"))
            h3 = fully_connected(
                h2, hidden_size, 1, weight_init, bias_init, "h3")

        # Flatten the output to rank 1 so it lines up with the `y` placeholder.
        self.net = tf.reshape(h3, (-1,))
        l2 = tf.nn.l2_loss(self.net - self.y)
        self.train = tf.train.AdamOptimizer().minimize(l2)

        created = [v for v in tf.global_variables()
                   if v.name not in preexisting]
        self.session.run(tf.variables_initializer(created))

    def _features(self, path):
        """Return the feature matrix for one path, one row per time step.

        Columns, in order: the observation flattened per step, the entries of
        ``path["action_dists"]``, the step index divided by 10, and a constant
        1 column.

        Args:
            path: mapping with array entries ``"obs"`` and ``"action_dists"``
                and a sized ``"rewards"`` entry (only its length is used).

        Returns:
            2-D ``numpy`` array with ``len(path["rewards"])`` rows.
        """
        o = path["obs"].astype('float32')
        o = o.reshape(o.shape[0], -1)
        act = path["action_dists"].astype('float32')
        l = len(path["rewards"])
        al = np.arange(l).reshape(-1, 1) / 10.0
        return np.concatenate([o, act, al, np.ones((l, 1))], axis=1)

    def fit(self, paths):
        """Train the network on the features and returns of ``paths``.

        Builds the network on first use, then runs 50 optimizer steps, each
        on the full concatenated batch.

        Args:
            paths: sequence of path mappings; each needs the keys used by
                ``_features`` plus ``"returns"``.
        """
        featmat = np.concatenate([self._features(path) for path in paths])
        if self.net is None:
            self.create_net(featmat.shape[1])
        returns = np.concatenate([path["returns"] for path in paths])
        for _ in range(50):
            self.session.run(self.train, {self.x: featmat, self.y: returns})

    def predict(self, path):
        """Predict one value per time step of ``path``.

        Returns:
            1-D array of length ``len(path["rewards"])``; all zeros if the
            network has not been built yet (``fit`` was never called).
        """
        if self.net is None:
            return np.zeros(len(path["rewards"]))
        ret = self.session.run(self.net, {self.x: self._features(path)})
        return np.reshape(ret, (ret.shape[0], ))
class LinearVF(object):
    """Ridge-regularised linear regressor from path features to returns.

    Features are the flattened observation, its element-wise square, a scaled
    step index, its square, and a constant 1 column.
    """

    # Fitted weight vector; None until `fit` has been called.
    coeffs = None

    def _features(self, path):
        """Return the feature matrix for one path, one row per time step.

        Args:
            path: mapping with an array entry ``"obs"`` and a sized
                ``"rewards"`` entry (only its length is used).

        Returns:
            2-D ``numpy`` array with ``len(path["rewards"])`` rows and columns
            ``[obs, obs**2, t, t**2, 1]`` where ``t`` is the step index
            divided by 100.
        """
        o = path["obs"].astype('float32')
        o = o.reshape(o.shape[0], -1)
        l = len(path["rewards"])
        al = np.arange(l).reshape(-1, 1) / 100.0
        return np.concatenate([o, o**2, al, al**2, np.ones((l, 1))], axis=1)

    def fit(self, paths):
        """Fit ``coeffs`` by solving the regularised normal equations.

        Solves ``(F^T F + lamb * I) w = F^T r`` in the least-squares sense,
        where ``F`` stacks the features of all paths and ``r`` their returns.

        Args:
            paths: sequence of path mappings; each needs the keys used by
                ``_features`` plus ``"returns"``.
        """
        featmat = np.concatenate([self._features(path) for path in paths])
        returns = np.concatenate([path["returns"] for path in paths])
        n_col = featmat.shape[1]
        lamb = 2.0
        gram = featmat.T.dot(featmat) + lamb * np.identity(n_col)
        moment = featmat.T.dot(returns)
        # Pin rcond explicitly: omitting it raises a FutureWarning on NumPy
        # 1.14-1.x and the default cutoff differs between NumPy versions.
        # -1 (machine precision) is the legacy default and is accepted by
        # every NumPy release.
        self.coeffs = np.linalg.lstsq(gram, moment, rcond=-1)[0]

    def predict(self, path):
        """Predict one value per time step of ``path``.

        Returns:
            1-D array of length ``len(path["rewards"])``; all zeros if
            ``fit`` has not been called yet.
        """
        if self.coeffs is None:
            return np.zeros(len(path["rewards"]))
        return self._features(path).dot(self.coeffs)