-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpnn.py
300 lines (269 loc) · 11.9 KB
/
rpnn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""
Recursive model that predicts a Gaussian distribution.
Author: Ian Char
Date: 10/27/2022
"""
from typing import Dict, Callable, Tuple, Any, Sequence, Optional
import hydra.utils
import torch
import torch.nn.functional as F
from omegaconf import DictConfig
from dynamics_toolbox.constants import sampling_modes
from dynamics_toolbox.models.pl_models.sequential_models.abstract_sequential_model \
import AbstractSequentialModel
from dynamics_toolbox.utils.pytorch.metrics import SequentialExplainedVariance
class RPNN(AbstractSequentialModel):
    """Recurrent network that predicts a Gaussian over the output at each step.

    Every input is passed through an encoder (optionally followed by layer norm),
    the encoded sequence is fed through a GRU/LSTM memory unit, and a decoder maps
    the concatenation of the encoding and the memory output to a mean and a log
    variance.
    """

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        encode_dim: int,
        rnn_num_layers: int,
        rnn_hidden_size: int,
        encoder_cfg: DictConfig,
        pnn_decoder_cfg: DictConfig,
        rnn_type: str = 'gru',
        warm_up_period: int = 0,
        learning_rate: float = 1e-3,
        logvar_lower_bound: Optional[float] = None,
        logvar_upper_bound: Optional[float] = None,
        logvar_bound_loss_coef: float = 1e-3,
        sample_mode: str = sampling_modes.SAMPLE_FROM_DIST,
        weight_decay: Optional[float] = 0.0,
        use_layer_norm: bool = True,
        **kwargs,
    ):
        """Constructor.

        Args:
            input_dim: The input dimension.
            output_dim: The output dimension.
            encode_dim: The dimension of the encoder output.
            rnn_num_layers: Number of layers in the memory unit.
            rnn_hidden_size: The number hidden units in the memory unit.
            encoder_cfg: The configuration for the encoder network.
            pnn_decoder_cfg: The configuration for the decoder network. Should
                be a PNN.
            rnn_type: Name of the rnn type. Can accept GRU or LSTM
                (case insensitive).
            warm_up_period: The amount of data to take in before predictions begin
                to be made.
            learning_rate: The learning rate for the network.
            logvar_lower_bound: Lower bound on the log variance.
                If none there is no bound.
            logvar_upper_bound: Upper bound on the log variance.
                If none there is no bound.
            logvar_bound_loss_coef: Coefficient on bound loss to add to loss.
            sample_mode: The method to use for sampling.
            weight_decay: The weight decay for the optimizer.
            use_layer_norm: Whether to use layer norm.

        Raises:
            ValueError: If ``rnn_type`` is neither 'gru' nor 'lstm'.
        """
        super().__init__(input_dim, output_dim, **kwargs)
        self._encoder = hydra.utils.instantiate(
            encoder_cfg,
            input_dim=input_dim,
            output_dim=encode_dim,
            _recursive_=False,
        )
        # The decoder sees the encoding concatenated with the memory output.
        self._decoder = hydra.utils.instantiate(
            pnn_decoder_cfg,
            input_dim=encode_dim + rnn_hidden_size,
            output_dim=output_dim,
            _recursive_=False,
        )
        self.rnn_type = rnn_type.lower()
        if self.rnn_type == 'gru':
            rnn_class = torch.nn.GRU
        elif self.rnn_type == 'lstm':
            rnn_class = torch.nn.LSTM
        else:
            raise ValueError(f'Cannot recognize RNN type {rnn_type}')
        self._memory_unit = rnn_class(encode_dim, rnn_hidden_size,
                                      num_layers=rnn_num_layers,
                                      batch_first=True)
        self._memory_unit = self._memory_unit.to(self.device)
        # Always define the attribute so it exists even when layer norm is off.
        self._layer_norm = torch.nn.LayerNorm(encode_dim) if use_layer_norm else None
        self._input_dim = input_dim
        self._output_dim = output_dim
        self._encode_dim = encode_dim
        self._hidden_size = rnn_hidden_size
        self._num_layers = rnn_num_layers
        self._warm_up_period = warm_up_period
        self._learning_rate = learning_rate
        self._weight_decay = weight_decay
        self._sample_mode = sample_mode
        self._record_history = True
        self._hidden_state = None
        self._use_layer_norm = use_layer_norm
        # Set up variance pinning. Only active when BOTH bounds are provided.
        self._var_pinning = (logvar_lower_bound is not None
                             and logvar_upper_bound is not None)
        if self._var_pinning:
            # The bounds are learnable, one value per output dimension.
            self._min_logvar = torch.nn.Parameter(
                torch.full((1, output_dim), float(logvar_lower_bound),
                           dtype=torch.float32))
            self._max_logvar = torch.nn.Parameter(
                torch.full((1, output_dim), float(logvar_upper_bound),
                           dtype=torch.float32))
        else:
            self._min_logvar = None
            self._max_logvar = None
        self._logvar_bound_loss_coef = logvar_bound_loss_coef
        # TODO: In the future we may want to pass this in as an argument.
        self._metrics = {
            'EV': SequentialExplainedVariance(),
            'IndvEV': SequentialExplainedVariance('raw_values'),
        }

    def _bound_logvar(self, logvar: torch.Tensor) -> torch.Tensor:
        """Softly clamp the log variance between the learned bounds.

        Args:
            logvar: Log variance whose last dimension is the output dimension.

        Returns:
            The bounded log variance, or the input unchanged if variance pinning
            is disabled.
        """
        if not self._var_pinning:
            return logvar
        logvar = self._max_logvar - F.softplus(self._max_logvar - logvar)
        return self._min_logvar + F.softplus(logvar - self._min_logvar)

    def get_net_out(self, batch: Sequence[torch.Tensor]) -> Dict[str, torch.Tensor]:
        """Get the output of the network and organize into dictionary.

        Args:
            batch: The batch passed into the network. This is expected to be a tuple
                * x: (Batch_size, Sequence Length, dim)
                * y: (Batch_size, Sequence Length, dim)
                * mask: (Batch_size, Sequence Length, 1)
              Only x is read here.

        Returns:
            Dictionary with the 'mean' and (bounded, if pinning is on) 'logvar'.
        """
        encoded = self._encoder(batch[0])
        if self._use_layer_norm:
            encoded = self._layer_norm(encoded)
        # No initial hidden state is given, so the memory unit uses its default.
        mem_out = self._memory_unit(encoded)[0]
        mean, logvar = self._decoder(torch.cat([encoded, mem_out], dim=-1))
        return {'mean': mean, 'logvar': self._bound_logvar(logvar)}

    def loss(
        self,
        net_out: Dict[str, torch.Tensor],
        batch: Sequence[torch.Tensor],
    ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Compute the loss function.

        The loss is the masked Gaussian negative log likelihood (up to constant
        factors), plus a penalty on the gap between the log variance bounds when
        variance pinning is enabled.

        Args:
            net_out: The output of the network.
            batch: The batch passed into the network. This is expected to be a tuple
                * x: (Batch_size, Sequence Length, dim)
                * y: (Batch_size, Sequence Length, dim)
                * mask: (Batch_size, Sequence Length, 1)

        Returns:
            The loss and a dictionary of other statistics.
        """
        mean = net_out['mean']
        logvar = net_out['logvar']
        y, mask = batch[1:]
        # Do not penalize predictions made during the warm up period.
        # NOTE(review): this writes into the caller's mask tensor in place; code
        # that reuses the batch afterwards sees the warm up steps zeroed. Kept
        # as is because downstream code may depend on it -- confirm before changing.
        mask[:, :self._warm_up_period, :] = 0
        sq_diffs = (mean * mask - y * mask).pow(2)
        # Means are taken over every element, masked entries included.
        mse = torch.mean(sq_diffs)
        loss = torch.mean(torch.exp(-logvar) * sq_diffs + logvar * mask)
        stats = dict(
            nll=loss.item(),
            mse=mse.item(),
        )
        stats['logvar/mean'] = (logvar * mask).mean().item()
        if self._var_pinning:
            bound_gap = self._max_logvar - self._min_logvar
            bound_loss = self._logvar_bound_loss_coef * torch.abs(bound_gap).mean()
            stats['bound_loss'] = bound_loss.item()
            stats['logvar_lower_bound/mean'] = self._min_logvar.mean().item()
            stats['logvar_upper_bound/mean'] = self._max_logvar.mean().item()
            stats['logvar_bound_difference'] = bound_gap.mean().item()
            loss = loss + bound_loss
        stats['loss'] = loss.item()
        return loss, stats

    def single_sample_output_from_torch(
        self,
        net_in: torch.Tensor,
    ) -> Tuple[torch.Tensor, Dict[str, Any]]:
        """Get the output for a single sample in the model.

        The memory unit's hidden state is created on the first call and, while
        ``record_history`` is on, carried over to the next call.

        Args:
            net_in: The input for the network with expected shape (batch size, dim)

        Returns:
            The predictions for a single function sample and a dictionary with
            the predictions, their means, and their standard deviations.

        Raises:
            ValueError: If the batch size differs from the one of the stored
                hidden state.
        """
        batch_size = net_in.shape[0]
        if self._hidden_state is None:
            state_shape = (self._num_layers, batch_size, self._hidden_size)
            if self.rnn_type == 'gru':
                self._hidden_state = torch.zeros(*state_shape, device=self.device)
            else:
                # LSTMs carry a (hidden, cell) pair.
                self._hidden_state = tuple(
                    torch.zeros(*state_shape, device=self.device)
                    for _ in range(2))
        else:
            tocompare = (self._hidden_state if self.rnn_type == 'gru'
                         else self._hidden_state[0])
            if tocompare.shape[1] != batch_size:
                raise ValueError('Number of inputs does not match previously given '
                                 f'number. Expected {tocompare.shape[1]} but received'
                                 f' {net_in.shape[0]}.')
        with torch.no_grad():
            # Add a sequence dimension of length one for the batch-first RNN.
            encoded = self._encoder(net_in).unsqueeze(1)
            if self._use_layer_norm:
                encoded = self._layer_norm(encoded)
            mem_out, hidden_out = self._memory_unit(encoded, self._hidden_state)
            if self._record_history:
                self._hidden_state = hidden_out
            mean_predictions, logvar_predictions = (
                output.squeeze(1) for output in
                self._decoder(torch.cat([encoded, mem_out], dim=-1)))
            # Apply the same bounds used when computing the training loss so the
            # predicted std matches what the model was trained with.
            logvar_predictions = self._bound_logvar(logvar_predictions)
        std_predictions = (0.5 * logvar_predictions).exp()
        if self._sample_mode == sampling_modes.SAMPLE_FROM_DIST:
            predictions = (torch.randn_like(mean_predictions) * std_predictions
                           + mean_predictions)
        else:
            predictions = mean_predictions
        info = {'predictions': predictions,
                'mean_predictions': mean_predictions,
                'std_predictions': std_predictions}
        return predictions, info

    def multi_sample_output_from_torch(
        self,
        net_in: torch.Tensor,
    ) -> Tuple[torch.Tensor, Dict[str, Any]]:
        """Get the output where each input is assumed to be from a different sample.

        This simply delegates to ``single_sample_output_from_torch``.

        Args:
            net_in: The input for the network.

        Returns:
            The deltas for next states and dictionary of info.
        """
        return self.single_sample_output_from_torch(net_in)

    @property
    def metrics(self) -> Dict[str, Callable[[torch.Tensor], torch.Tensor]]:
        """Mapping from metric name to the metric object."""
        return self._metrics

    @property
    def learning_rate(self) -> float:
        """The learning rate given at construction."""
        return self._learning_rate

    @property
    def weight_decay(self) -> float:
        """The weight decay given at construction."""
        return self._weight_decay

    @property
    def sample_mode(self) -> str:
        """The sample mode is the method that in which we get next state."""
        return self._sample_mode

    @sample_mode.setter
    def sample_mode(self, mode: str) -> None:
        """Set the sample mode to the appropriate mode.

        Raises:
            ValueError: If ``mode`` is not one of the supported sampling modes.
        """
        # Validate the requested mode, not the one currently stored.
        if mode not in (sampling_modes.SAMPLE_FROM_DIST,
                        sampling_modes.RETURN_MEAN):
            raise ValueError(
                f'PNN sample mode must either be {sampling_modes.SAMPLE_FROM_DIST} '
                f'or {sampling_modes.RETURN_MEAN}, but received {mode}.')
        self._sample_mode = mode

    @property
    def input_dim(self) -> int:
        """The input dimension."""
        return self._input_dim

    @property
    def output_dim(self) -> int:
        """The output dimension."""
        return self._output_dim

    @property
    def record_history(self) -> bool:
        """Whether to keep track of the quantities being fed into the neural net."""
        return self._record_history

    @record_history.setter
    def record_history(self, mode: bool) -> None:
        """Set whether to keep track of quantities being fed into the neural net."""
        self._record_history = mode

    @property
    def warm_up_period(self) -> int:
        """Amount of data to take in before starting to predict."""
        return self._warm_up_period

    def clear_history(self) -> None:
        """Clear the history by dropping the stored hidden state."""
        self._hidden_state = None

    def reset(self) -> None:
        """Reset the dynamics model."""
        self.clear_history()