From cdf8ba51d972273dac051af352e4e5b464d472d7 Mon Sep 17 00:00:00 2001
From: Filippo Airaldi
Date: Thu, 26 Oct 2023 10:24:56 +0200
Subject: [PATCH] reworked sensitivities in Q learning and DPG

---
 src/mpcrl/agents/lstd_dpg.py          |  2 +-
 src/mpcrl/agents/lstd_q_learning.py   | 59 ++++++++++++++-------------
 src/mpcrl/agents/rl_learning_agent.py | 14 +------
 3 files changed, 33 insertions(+), 42 deletions(-)

diff --git a/src/mpcrl/agents/lstd_dpg.py b/src/mpcrl/agents/lstd_dpg.py
index dbe2123..33565e9 100644
--- a/src/mpcrl/agents/lstd_dpg.py
+++ b/src/mpcrl/agents/lstd_dpg.py
@@ -217,7 +217,7 @@ def update(self) -> Optional[str]:
         )
         if self.policy_gradients is not None:
             self.policy_gradients.append(dJdtheta)
-        return self._do_gradient_update(dJdtheta)
+        return self.optimizer.update(dJdtheta)
 
     def train_one_episode(
         self,
diff --git a/src/mpcrl/agents/lstd_q_learning.py b/src/mpcrl/agents/lstd_q_learning.py
index 2d15842..59565fa 100644
--- a/src/mpcrl/agents/lstd_q_learning.py
+++ b/src/mpcrl/agents/lstd_q_learning.py
@@ -161,7 +161,7 @@ def update(self) -> Optional[str]:
             hessians.append(H)
         gradient = np.mean(gradients, 0)
         hessian = np.mean(hessians, 0) if self.hessian_type != "none" else None
-        return self._do_gradient_update(gradient, hessian)
+        return self.optimizer.update(gradient, hessian)
 
     def train_one_episode(
         self,
@@ -205,36 +205,39 @@ def train_one_episode(
 
     def _init_sensitivity(
         self, hessian_type: Literal["none", "approx", "full"]
-    ) -> Callable[[cs.DM], tuple[np.ndarray, np.ndarray]]:
+    ) -> Union[
+        Callable[[cs.DM], np.ndarray], Callable[[cs.DM], tuple[np.ndarray, np.ndarray]]
+    ]:
         """Internal utility to compute the derivative of Q(s,a) w.r.t. the learnable
         parameters, a.k.a., theta."""
+        assert hessian_type in ("none", "approx", "full"), "Invalid hessian type."
+        order = self.optimizer._order
         theta = cs.vvcat(self._learnable_pars.sym.values())
         nlp = self._Q.nlp
         nlp_ = NlpSensitivity(nlp, theta)
-        Lt = nlp_.jacobians["L-p"]  # a.k.a., dQdtheta
-        Ltt = nlp_.hessians["L-pp"]  # a.k.a., approximated d2Qdtheta2
+        x_lam_p = cs.vertcat(nlp.primal_dual, nlp.p)
+        dQ = nlp_.jacobians["L-p"]  # a.k.a., dQdtheta
+
         if hessian_type == "none":
-            d2Qdtheta2 = cs.DM.nan()
-        elif hessian_type == "approx":
-            d2Qdtheta2 = Ltt
-        elif hessian_type == "full":
-            dydtheta, _ = nlp_.parametric_sensitivity(second_order=False)
-            d2Qdtheta2 = dydtheta.T @ nlp_.jacobians["K-p"] + Ltt
+            assert order == 1, "Expected 1st-order optimizer with `hessian_type=none`."
+            sensitivity = cs.Function(
+                "S", (x_lam_p,), (dQ,), ("x_lam_p",), ("dQ",), {"cse": True}
+            )
+            return lambda v: np.asarray(sensitivity(v).elements())
+
+        assert (
+            order == 2
+        ), "Expected 2nd-order optimizer with `hessian_type=approx` or `full`."
+        if hessian_type == "approx":
+            ddQ = nlp_.hessians["L-pp"]
         else:
-            raise ValueError(f"Invalid type of hessian; got {hessian_type}.")
+            dydtheta, _ = nlp_.parametric_sensitivity(second_order=False)
+            ddQ = dydtheta.T @ nlp_.jacobians["K-p"] + nlp_.hessians["L-pp"]
 
-        # convert to function (much faster runtime)
-        x_lam_p = cs.vertcat(nlp.primal_dual, nlp.p)
         sensitivity = cs.Function(
-            "Q_sensitivity",
-            (x_lam_p,),
-            (Lt, d2Qdtheta2),
-            ("x_lam_p",),
-            ("dQ", "d2Q"),
-            {"cse": True},
+            "S", (x_lam_p,), (dQ, ddQ), ("x_lam_p",), ("dQ", "ddQ"), {"cse": True}
         )
 
-        # wrap to conveniently return numpy arrays
         def func(sol_values: cs.DM) -> tuple[np.ndarray, np.ndarray]:
             dQ, ddQ = sensitivity(sol_values)
             return np.asarray(dQ.elements()), ddQ.toarray()
@@ -249,15 +252,15 @@ def _try_store_experience(
         it. Returns whether it was successful or not."""
        if solQ.success and solV.success:
             sol_values = solQ.all_vals
-            dQ, ddQ = self._sensitivity(sol_values)
             td_error = cost + self.discount_factor * solV.f - solQ.f
-            g = -td_error * dQ
-            H = (
-                (np.multiply.outer(dQ, dQ) - td_error * ddQ)
-                if self.hessian_type != "none"
-                else np.nan
-            )
-            self.store_experience((g, H))
+            if self.hessian_type == "none":
+                dQ = self._sensitivity(sol_values)
+                hessian = np.nan
+            else:
+                dQ, ddQ = self._sensitivity(sol_values)
+                hessian = np.multiply.outer(dQ, dQ) - td_error * ddQ
+            gradient = -td_error * dQ
+            self.store_experience((gradient, hessian))
             success = True
         else:
             td_error = np.nan
diff --git a/src/mpcrl/agents/rl_learning_agent.py b/src/mpcrl/agents/rl_learning_agent.py
index 5ca1282..8a16989 100644
--- a/src/mpcrl/agents/rl_learning_agent.py
+++ b/src/mpcrl/agents/rl_learning_agent.py
@@ -1,7 +1,5 @@
 from abc import ABC
-from typing import Any, Generic, Optional, TypeVar
-
-import numpy as np
+from typing import Any, Generic, TypeVar
 
 from mpcrl.agents.agent import SymType
 from mpcrl.agents.learning_agent import LearningAgent
@@ -45,13 +43,3 @@ def establish_callback_hooks(self) -> None:
             lr_hook = lr.hook
             if lr_hook is not None:
                 self.hook_callback(repr(lr), lr_hook, lr.step)
-
-    def _do_gradient_update(
-        self, gradient: np.ndarray, hessian: Optional[np.ndarray] = None
-    ) -> Optional[str]:
-        """Internal utility to call the optimizer and perform the gradient update."""
-        return (
-            self.optimizer.update(gradient)
-            if hessian is None
-            else self.optimizer.update(gradient, hessian)
-        )
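
Note (illustration only, not part of the commit): the short NumPy sketch below mimics the dispatch this patch introduces. With hessian_type="none", _init_sensitivity now returns a gradient-only callable and asserts a 1st-order optimizer; otherwise it returns a (dQ, ddQ) callable and asserts a 2nd-order optimizer. _try_store_experience then combines the result with the TD error into the stored gradient -td_error * dQ and, in the 2nd-order case, the hessian outer(dQ, dQ) - td_error * ddQ. All names in the sketch are hypothetical stand-ins; the real code builds dQ and ddQ symbolically with CasADi via NlpSensitivity.

# Illustrative sketch only; NOT part of the patch. All names are hypothetical
# stand-ins: the real agent builds dQ/ddQ symbolically with CasADi through
# NlpSensitivity, whereas here they are dummy NumPy arrays.
from typing import Callable, Union

import numpy as np


def init_sensitivity(
    hessian_type: str, optimizer_order: int, n_theta: int
) -> Union[
    Callable[[np.ndarray], np.ndarray],
    Callable[[np.ndarray], tuple[np.ndarray, np.ndarray]],
]:
    """Returns a callable giving dQ only (1st order) or (dQ, ddQ) (2nd order)."""
    assert hessian_type in ("none", "approx", "full"), "Invalid hessian type."
    if hessian_type == "none":
        assert optimizer_order == 1, "Expected 1st-order optimizer."
        return lambda sol_values: np.ones(n_theta)  # stand-in for dQ/dtheta
    assert optimizer_order == 2, "Expected 2nd-order optimizer."
    return lambda sol_values: (np.ones(n_theta), np.eye(n_theta))  # (dQ, ddQ)


def td_update_terms(sensitivity, hessian_type: str, sol_values, td_error: float):
    """Builds gradient -td_error * dQ and, for 2nd order, outer(dQ, dQ) - td_error * ddQ."""
    if hessian_type == "none":
        dQ = sensitivity(sol_values)
        hessian = np.nan  # no hessian is stored for 1st-order updates
    else:
        dQ, ddQ = sensitivity(sol_values)
        hessian = np.multiply.outer(dQ, dQ) - td_error * ddQ
    gradient = -td_error * dQ
    return gradient, hessian


# tiny usage example
sens = init_sensitivity("approx", optimizer_order=2, n_theta=3)
g, H = td_update_terms(sens, "approx", sol_values=np.zeros(5), td_error=0.5)
print(g.shape, H.shape)  # -> (3,) (3, 3)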