-
Notifications
You must be signed in to change notification settings - Fork 1
/
mdp_parser.py
155 lines (131 loc) · 5.87 KB
/
mdp_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""Implements a parser that converts an XADD RDDL model into an MDP."""
import itertools
from typing import Dict, List, Set, Tuple, Union
import symengine.lib.symengine_wrapper as core
from pyRDDLGym_symbolic.mdp.action import SingleAction, CAction, BActions
from pyRDDLGym_symbolic.mdp.mdp import MDP
from pyRDDLGym_symbolic.utils.xadd_utils import BoundAnalysis
from pyRDDLGym_symbolic.core.model import RDDLModelXADD
def _truncated_powerset(iterable, max_size: int, include_noop: bool):
"""Returns the powerset of an iterable."""
s = list(iterable)
return itertools.chain.from_iterable(
itertools.combinations(s, r) for r in range(1 - int(include_noop), max_size + 1)
)
class MDPParser:
    """Builds an `MDP` object out of an RDDL model compiled in XADD."""

    def parse(
            self,
            model: RDDLModelXADD,
            discount: float = 1.0,
            concurrency: Union[str, int] = 1,
            is_linear: bool = False,
            include_noop: bool = True,
            is_vi: bool = True,
    ) -> MDP:
        """Parses the RDDL model into an MDP object.

        Args:
            model: The RDDL model compiled in XADD.
            discount: The discount factor.
            concurrency: The number of concurrent boolean actions. The string
                'pos-inf' is replaced by `int(1e9)`.
            is_linear: Whether the MDP is linear or not.
            include_noop: Whether to include the no-op action or not. Only
                consulted when `is_vi` is True. Defaults to True.
            is_vi: Whether solving Value Iteration (VI). When True, one
                `BActions` is registered per combination of Boolean actions;
                otherwise one `BActions` is registered per Boolean action.

        Returns:
            The MDP object.
        """
        # 'pos-inf' means no limit; a very large integer stands in for it.
        max_concurrent = int(1e9) if concurrency == 'pos-inf' else concurrency
        mdp = MDP(model, is_linear, discount, max_concurrent)

        # Bounds of the real-valued state fluents come from the invariants.
        real_state_symbols = {
            model.ns[fluent]
            for fluent in model.state_fluents
            if model.variable_ranges[fluent] == 'real'
        }
        mdp.cont_state_bounds = self.configure_bounds(
            mdp, model.invariants, real_state_symbols)

        # Sort the action fluents into Boolean and continuous ones. A fluent
        # counts as Boolean when the value stored for it is a `bool`.
        bool_acts, cont_acts = [], []
        for name, value in model.action_fluents.items():
            is_boolean = isinstance(value, bool)
            symbol = model.ns.get(name)
            if symbol is None:
                # The model has no symbol for this fluent yet: register one.
                print(f'Warning: action {name} not found in RDDLModelXADD.actions')
                symbol, _ = model.add_sym_var(
                    name, 'bool' if is_boolean else 'real')
            if is_boolean:
                bool_acts.append(SingleAction(name, symbol, model, 'bool'))
            else:
                cont_acts.append(CAction(name, symbol, model))

        # Decide which groups of Boolean actions become one `BActions` each.
        if is_vi:
            # Every combination up to the allowed size is needed. The empty
            # combination (present only if `include_noop`) is the no-op, in
            # which none of the Boolean actions is set to True.
            groups = tuple(
                _truncated_powerset(
                    bool_acts,
                    mdp.max_allowed_actions,
                    include_noop=include_noop,
                ))
        else:
            # Otherwise each Boolean action forms a group of its own.
            groups = tuple((single,) for single in bool_acts)
        for group in groups:
            group_names = tuple(member.name for member in group)
            group_symbols = tuple(member.symbol for member in group)
            mdp.add_action(BActions(group_names, group_symbols, model))

        # Continuous actions are registered after the Boolean ones.
        for cont_act in cont_acts:
            mdp.add_action(cont_act)

        # Bounds of the continuous actions come from the preconditions.
        if len(mdp.cont_a_vars) > 0:
            mdp.cont_action_bounds = self.configure_bounds(
                mdp, model.preconditions, mdp.cont_a_vars)

        # Refresh the state, next-state and interm variable sets, and then
        # update the CPF XADDs of the registered actions.
        mdp.update_state_var_sets()
        mdp.update_i_and_ns_var_sets()
        mdp.update(is_vi=is_vi)
        return mdp

    def configure_bounds(
            self, mdp: MDP, conditions: List[int], var_set: Set[core.Symbol],
    ) -> Dict[CAction, Tuple[float, float]]:
        """Configures the bounds over continuous variables.

        Each condition is processed with a fresh `BoundAnalysis` leaf
        operation. Across all conditions, the largest lower bound and the
        smallest upper bound found for a variable are kept. The resulting
        bounds are also handed to `mdp.context.update_bounds`.

        NOTE(review): the return annotation names `CAction` keys, but the
        dictionary built here is keyed by the members of `var_set`.

        Args:
            mdp: The MDP whose XADD context is used.
            conditions: The conditions to analyze (state invariants or action
                preconditions).
            var_set: The variables to compute bounds for.

        Returns:
            A dictionary that maps each variable in `var_set` to a
            `(lower, upper)` tuple of floats. A side on which no bound was
            found is `-inf` or `inf`, respectively.
        """
        context = mdp.context

        # Tightest bounds seen so far, per variable.
        tightest_lb = {}
        tightest_ub = {}

        for condition in conditions:
            # A new leaf operation per condition collects that condition's
            # bounds while the XADD is traversed.
            analysis = BoundAnalysis(context=mdp.context, var_set=var_set)
            context.reduce_process_xadd_leaf(condition, analysis, [], [])

            found_lb = analysis.lb_dict
            found_ub = analysis.ub_dict

            # Keep the largest lower bound.
            for var in found_lb:
                candidate = found_lb[var]
                tightest_lb[var] = max(tightest_lb.get(var, candidate), candidate)

            # Keep the smallest upper bound.
            for var in found_ub:
                candidate = found_ub[var]
                tightest_ub[var] = min(tightest_ub.get(var, candidate), candidate)

        # Pair up the two sides; variables without a bound are unbounded.
        bounds = {
            var: (
                float(tightest_lb.get(var, -float('inf'))),
                float(tightest_ub.get(var, float('inf'))),
            )
            for var in var_set
        }
        context.update_bounds(bounds)
        return bounds