diff --git a/pyRDDLGym/core/intervals.py b/pyRDDLGym/core/intervals.py index 16901cc8..413cfa0a 100644 --- a/pyRDDLGym/core/intervals.py +++ b/pyRDDLGym/core/intervals.py @@ -21,6 +21,8 @@ class RDDLIntervalAnalysis: def __init__(self, rddl: RDDLPlanningModel, logger: Optional[Logger]=None) -> None: '''Creates a new interval analysis object for the given RDDL domain. + Bounds on probability distributions are calculated using their exact + support, which can be unbounded intervals. :param rddl: the RDDL domain to analyze :param logger: to log compilation information during tracing to file @@ -39,18 +41,21 @@ def __init__(self, rddl: RDDLPlanningModel, logger: Optional[Logger]=None) -> No self.NUMPY_LITERAL_TO_INT = np.vectorize(self.rddl.object_to_index.__getitem__) def bound(self, action_bounds: Optional[Bounds]=None, - per_epoch: bool=False) -> Bounds: + per_epoch: bool=False, + state_bounds: Optional[Bounds]=None) -> Bounds: '''Computes intervals on all fluents and reward for the planning problem. :param action_bounds: optional bounds on action fluents (defaults to a "random" policy otherwise) :param per_epoch: if True, the returned bounds are tensors with leading dimension indicating the decision epoch; if False, the returned bounds - are valid across all decision epochs. + are valid across all decision epochs + :param state_bounds: optional bounds on state fluents (defaults to + the initial state values otherwise). ''' # get initial values as bounds - intervals = self._bound_initial_values() + intervals = self._bound_initial_values(state_bounds) if per_epoch: result = {} @@ -72,7 +77,7 @@ def bound(self, action_bounds: Optional[Bounds]=None, else: return intervals - def _bound_initial_values(self): + def _bound_initial_values(self, state_bounds=None): rddl = self.rddl # initially all bounds are calculated based on the initial values @@ -90,7 +95,10 @@ def _bound_initial_values(self): params = rddl.variable_params[name] shape = rddl.object_counts(params) values = np.reshape(values, newshape=shape) - intervals[name] = (values, values) + if state_bounds is not None and name in state_bounds: + intervals[name] = state_bounds[name] + else: + intervals[name] = (values, values) return intervals def _bound_next_epoch(self, intervals, action_bounds=None, per_epoch=False): @@ -158,10 +166,11 @@ def _bound(self, expr, intervals): result = self._bound_control(expr, intervals) elif etype == 'randomvar': result = self._bound_random(expr, intervals) - elif etype == 'randomvector': - result = self._bound_random_vector(expr, intervals) - elif etype == 'matrix': - result = self._bound_matrix(expr, intervals) + # TODO: complete randomvector and matrix + # elif etype == 'randomvector': + # result = self._bound_random_vector(expr, intervals) + # elif etype == 'matrix': + # result = self._bound_matrix(expr, intervals) else: raise RDDLNotImplementedError( f'Internal error: expression type {etype} is not supported.\n' + @@ -240,6 +249,13 @@ def _bound_pvar(self, expr, intervals): @staticmethod def _mask_assign(dest, mask, value, mask_value=False): + '''Assings a value to a destination array based on a mask. + + :param dest: the destination array to assign to + :param mask: the mask array to determine where to assign + :param value: the value to assign + :param mask_value: if True, the value is also masked + ''' assert (np.shape(dest) == np.shape(mask)) if np.shape(dest): if mask_value: @@ -971,8 +987,8 @@ def _bound_gamma(self, expr, intervals): (lsh, ush) = self._bound(shape, intervals) (lsc, usc) = self._bound(scale, intervals) - lower = np.zeros(shape=np.shape(ls), dtype=np.float64) - upper = np.full(shape=np.shape(us), fill_value=np.inf, dtype=np.float64) + lower = np.zeros(shape=np.shape(lsh), dtype=np.float64) + upper = np.full(shape=np.shape(ush), fill_value=np.inf, dtype=np.float64) return (lower, upper) def _bound_binomial(self, expr, intervals): @@ -1035,6 +1051,16 @@ def _bound_gumbel(self, expr, intervals): upper = np.full(shape=np.shape(um), fill_value=+np.inf, dtype=np.float64) return (lower, upper) + def _bound_laplace(self, expr, intervals): + args = expr.args + mean, scale = args + (lm, um) = self._bound(mean, intervals) + (ls, us) = self._bound(scale, intervals) + + lower = np.full(shape=np.shape(lm), fill_value=-np.inf, dtype=np.float64) + upper = np.full(shape=np.shape(um), fill_value=+np.inf, dtype=np.float64) + return (lower, upper) + def _bound_cauchy(self, expr, intervals): args = expr.args mean, scale = args @@ -1099,6 +1125,3 @@ def _bound_discrete_pvar(self, expr, intervals, unnorm): bounds = [(lower_prob[..., i], upper_prob[..., i]) for i in range(lower_prob.shape[-1])] return self._bound_discrete_helper(bounds) - - - \ No newline at end of file