diff --git a/.mypy.ini b/.mypy.ini index a42b15d26..e769179a9 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -15,3 +15,6 @@ ignore_missing_imports = True [mypy-pyparsing.*] ignore_missing_imports = True + +[mypy-pysmt.*] +ignore_missing_imports = True diff --git a/docs/code_snippets/multi_agent_and_contingent.py b/docs/code_snippets/multi_agent_and_contingent.py index f14e9cc76..a6ce21085 100644 --- a/docs/code_snippets/multi_agent_and_contingent.py +++ b/docs/code_snippets/multi_agent_and_contingent.py @@ -41,8 +41,7 @@ problem.add_goal(Equals(Dot(robot, robot_position), locations[-1])) -from unified_planning.model.contingent_problem import ContingentProblem -from unified_planning.model.action import SensingAction +from unified_planning.model import ContingentProblem, SensingAction problem = ContingentProblem("lost_packages") Package = UserType("Package") diff --git a/docs/problem_representation.rst b/docs/problem_representation.rst index 3c1d6897c..86926ef83 100644 --- a/docs/problem_representation.rst +++ b/docs/problem_representation.rst @@ -55,7 +55,7 @@ Contingent Example A contingent planning problem represents an action-based problem in which the exact initial state is not entirely known and some of the actions produce “observations” upon execution. More specifically, some actions can be SensingActions, which indicate which fluents they observe and after the successful execution of such actions, the observed fluents become known to the executor. The inherent non-determinism in the initial state can therefore be “shrinked” by performing suitable SensingActions and a plan is then a strategy that prescribes what to execute based on the past observations. .. literalinclude:: ./code_snippets/multi_agent_and_contingent.py - :lines: 44-94 + :lines: 44-93 Hierarchical Example diff --git a/requirements.txt b/requirements.txt index 62b2a154a..953c248c0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pyparsing networkx +pysmt diff --git a/setup.py b/setup.py index 4928a42e0..478106613 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ packages=find_packages(), include_package_data=True, python_requires=">=3.7", # supported Python ranges - install_requires=["pyparsing", "networkx"], + install_requires=["pyparsing", "networkx", "pysmt"], extras_require={ "dev": ["tarski[arithmetic]", "pytest", "pytest-cov", "mypy"], "grpc": ["grpcio", "grpcio-tools", "grpc-stubs"], diff --git a/unified_planning/engines/engine.py b/unified_planning/engines/engine.py index cdd0a43e4..ea27c277b 100644 --- a/unified_planning/engines/engine.py +++ b/unified_planning/engines/engine.py @@ -34,6 +34,7 @@ class OperationMode(Enum): SEQUENTIAL_SIMULATOR = "sequential_simulator" REPLANNER = "replanner" PLAN_REPAIRER = "plan_repairer" + ACTION_SELECTOR = "action_selector" class EngineMeta(ABCMeta): diff --git a/unified_planning/engines/factory.py b/unified_planning/engines/factory.py index 48f27cc93..a5866c981 100644 --- a/unified_planning/engines/factory.py +++ b/unified_planning/engines/factory.py @@ -36,6 +36,7 @@ from unified_planning.engines.mixins.sequential_simulator import ( SequentialSimulatorMixin, ) +from unified_planning.engines.mixins.action_selector import ActionSelectorMixin from unified_planning.engines.engine import OperationMode from typing import IO, Any, Dict, Tuple, Optional, List, Union, Type, Sequence, cast from pathlib import PurePath @@ -659,14 +660,19 @@ def _get_engine( msg = f"The problem has no quality metrics but the engine is required to be optimal!" raise up.exceptions.UPUsageError(msg) res = EngineClass(problem=problem, **params) - elif operation_mode == OperationMode.SEQUENTIAL_SIMULATOR: + elif ( + operation_mode == OperationMode.SEQUENTIAL_SIMULATOR + or operation_mode == OperationMode.ACTION_SELECTOR + ): assert problem is not None res = EngineClass( problem=problem, error_on_failed_checks=error_failed_checks, **params, ) - assert isinstance(res, SequentialSimulatorMixin) + assert isinstance(res, SequentialSimulatorMixin) or isinstance( + res, ActionSelectorMixin + ) elif operation_mode == OperationMode.COMPILER: res = EngineClass(**params) assert isinstance(res, CompilerMixin) @@ -961,6 +967,31 @@ def PlanRepairer( optimality_guarantee=optimality_guarantee, ) + def ActionSelector( + self, + problem: "up.model.AbstractProblem", + *, + name: Optional[str] = None, + params: Optional[Dict[str, str]] = None, + ) -> "up.engines.engine.Engine": + """ + Returns an ActionSelector. There are two ways to call this method: + + * | using ``problem_kind`` through the problem field. + | e.g. ``ActionSelector(problem)`` + * | using ``name`` (the name of a specific action selector) and eventually some ``params`` + | (engine dependent options). + | e.g. ``ActionSelector(problem, name='xxx')`` + """ + return self._get_engine( + OperationMode.ACTION_SELECTOR, + name, + None, + params, + problem.kind, + problem=problem, + ) + def PortfolioSelector( self, *, diff --git a/unified_planning/engines/mixins/action_selector.py b/unified_planning/engines/mixins/action_selector.py new file mode 100644 index 000000000..e4e29a99c --- /dev/null +++ b/unified_planning/engines/mixins/action_selector.py @@ -0,0 +1,85 @@ +# Copyright 2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unified_planning as up +from typing import Dict +from warnings import warn +from unified_planning.exceptions import UPUsageError + + +class ActionSelectorMixin: + """ + This class defines the interface that an :class:`~unified_planning.engines.Engine` + that is also an `ActionSelector` must implement. + + Important NOTE: The `AbstractProblem` instance is given at the constructor. + """ + + def __init__(self, problem: "up.model.AbstractProblem"): + self._problem = problem + self_class = type(self) + assert issubclass( + self_class, up.engines.engine.Engine + ), "ActionSelectorMixin does not implement the up.engines.Engine class" + assert isinstance(self, up.engines.engine.Engine) + if not self.skip_checks and not self_class.supports(problem.kind): + msg = f"We cannot establish whether {self.name} is able to handle this problem!" + if self.error_on_failed_checks: + raise UPUsageError(msg) + else: + warn(msg) + + @staticmethod + def is_action_selector() -> bool: + """ + Returns True if this engine is also an action selector, False otherwise. + + :return: True if this engine is also an action selector, False otherwise. + """ + return True + + def get_action(self) -> "up.plans.ActionInstance": + """ + Returns the next action to be taken in the current state of the problem. + + :return: An instance of `ActionInstance` representing the next action to be taken. + """ + return self._get_action() + + def update(self, observation: Dict["up.model.FNode", "up.model.FNode"]): + """ + Updates the internal state of the engine based on the given observation. + + :param observation: A dictionary from observed fluents to their observed values. + """ + self._update(observation) + + def _get_action(self) -> "up.plans.ActionInstance": + """ + Returns the next action to be taken in the current state of the problem. This method should be + implemented by subclasses. + + :return: An instance of `ActionInstance` representing the next action to be taken. + """ + raise NotImplementedError + + def _update(self, observation: Dict["up.model.FNode", "up.model.FNode"]): + """ + Updates the internal state of the engine based on the given observation. This method should be + implemented by subclasses. + + :param observation: A dictionary from observed fluents to their observed values. + """ + raise NotImplementedError diff --git a/unified_planning/model/__init__.py b/unified_planning/model/__init__.py index 6cd01394b..a83ee48b2 100644 --- a/unified_planning/model/__init__.py +++ b/unified_planning/model/__init__.py @@ -18,8 +18,8 @@ Action, InstantaneousAction, DurativeAction, - SensingAction, ) +from unified_planning.model.contingent.sensing_action import SensingAction from unified_planning.model.effect import Effect, SimulatedEffect, EffectKind from unified_planning.model.expression import ( BoolExpression, @@ -35,7 +35,7 @@ from unified_planning.model.parameter import Parameter from unified_planning.model.abstract_problem import AbstractProblem from unified_planning.model.problem import Problem -from unified_planning.model.contingent_problem import ContingentProblem +from unified_planning.model.contingent.contingent_problem import ContingentProblem from unified_planning.model.delta_stn import DeltaSimpleTemporalNetwork from unified_planning.model.problem_kind import ProblemKind from unified_planning.model.state import State, UPState diff --git a/unified_planning/model/action.py b/unified_planning/model/action.py index 88426485d..383ad4c74 100644 --- a/unified_planning/model/action.py +++ b/unified_planning/model/action.py @@ -663,76 +663,3 @@ def is_conditional(self) -> bool: """Returns `True` if the `action` has `conditional effects`, `False` otherwise.""" # re-implemenation needed for inheritance, delegate implementation. return TimedCondsEffs.is_conditional(self) - - -class SensingAction(InstantaneousAction): - """This class represents a sensing action.""" - - def __init__( - self, - _name: str, - _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, - _env: Optional[Environment] = None, - **kwargs: "up.model.types.Type", - ): - InstantaneousAction.__init__(self, _name, _parameters, _env, **kwargs) - self._observed_fluents: List["up.model.fnode.FNode"] = [] - - def __eq__(self, oth: object) -> bool: - if isinstance(oth, SensingAction): - return super().__eq__(oth) and set(self._observed_fluents) == set( - oth._observed_fluents - ) - else: - return False - - def __hash__(self) -> int: - res = super().__hash__() - for of in self._observed_fluents: - res += hash(of) - return res - - def clone(self): - new_params = OrderedDict() - for param_name, param in self._parameters.items(): - new_params[param_name] = param.type - new_sensing_action = SensingAction(self._name, new_params, self._environment) - new_sensing_action._preconditions = self._preconditions[:] - new_sensing_action._effects = [e.clone() for e in self._effects] - new_sensing_action._fluents_assigned = self._fluents_assigned.copy() - new_sensing_action._fluents_inc_dec = self._fluents_inc_dec.copy() - new_sensing_action._simulated_effect = self._simulated_effect - new_sensing_action._observed_fluents = self._observed_fluents.copy() - return new_sensing_action - - def add_observed_fluents(self, observed_fluents: Iterable["up.model.fnode.FNode"]): - """ - Adds the given list of observed fluents. - - :param observed_fluents: The list of observed fluents that must be added. - """ - for of in observed_fluents: - self.add_observed_fluent(of) - - def add_observed_fluent(self, observed_fluent: "up.model.fnode.FNode"): - """ - Adds the given observed fluent. - - :param observed_fluent: The observed fluent that must be added. - """ - self._observed_fluents.append(observed_fluent) - - @property - def observed_fluents(self) -> List["up.model.fnode.FNode"]: - """Returns the `list` observed fluents.""" - return self._observed_fluents - - def __repr__(self) -> str: - b = InstantaneousAction.__repr__(self)[0:-3] - s = ["sensing-", b] - s.append(" observations = [\n") - for e in self._observed_fluents: - s.append(f" {str(e)}\n") - s.append(" ]\n") - s.append(" }") - return "".join(s) diff --git a/unified_planning/model/contingent/__init__.py b/unified_planning/model/contingent/__init__.py new file mode 100644 index 000000000..81a8f1213 --- /dev/null +++ b/unified_planning/model/contingent/__init__.py @@ -0,0 +1,20 @@ +# Copyright 2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +from unified_planning.model.contingent.environment import ( + Environment, + SimulatedEnvironment, +) diff --git a/unified_planning/model/contingent_problem.py b/unified_planning/model/contingent/contingent_problem.py similarity index 98% rename from unified_planning/model/contingent_problem.py rename to unified_planning/model/contingent/contingent_problem.py index e7f9709ac..3fdb1e1c3 100644 --- a/unified_planning/model/contingent_problem.py +++ b/unified_planning/model/contingent/contingent_problem.py @@ -156,7 +156,8 @@ def initial_values(self) -> Dict["up.model.fnode.FNode", "up.model.fnode.FNode"] res = self._initial_value for f in self._fluents: for f_exp in get_all_fluent_exp(self, f): - res[f_exp] = self.initial_value(f_exp) + if f_exp not in self._hidden_fluents: + res[f_exp] = self.initial_value(f_exp) return res @property diff --git a/unified_planning/model/contingent/environment.py b/unified_planning/model/contingent/environment.py new file mode 100644 index 000000000..bc0454910 --- /dev/null +++ b/unified_planning/model/contingent/environment.py @@ -0,0 +1,152 @@ +# Copyright 2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import random +import unified_planning as up +from unified_planning.exceptions import UPUsageError +from pysmt.shortcuts import Solver, Not, And, Symbol, Or, ExactlyOne, EqualsOrIff +from pysmt.oracles import get_logic +from typing import Dict + + +class Environment: + """ + A base class that defines the interface for an environment in the planning domain. + """ + + def __init__( + self, problem: "up.model.contingent.contingent_problem.ContingentProblem" + ): + self._problem = problem + + def apply( + self, action: "up.plans.ActionInstance" + ) -> Dict["up.model.FNode", "up.model.FNode"]: + """ + Applies the given action to the current state of the environment and returns the resulting observation. + + :param action: A :class:`~unified_planning.plans.ActionInstance` object representing the action to apply. + :return: A dictionary mapping the fluent expressions observed by the sensing action to their corresponding values. + """ + raise NotImplementedError + + def is_goal_reached(self) -> bool: + """ + Determines whether the goal of the planning problem has been reached in the current state of the environment. + + :return: A boolean value indicating whether the goal has been reached. + """ + raise NotImplementedError + + +class SimulatedEnvironment(Environment): + """ + An implementation of an environment that simulates the effects of actions on a given contingent planning problem. + + :param problem: A :class:`~unified_planning.model.ContingentProblem` object representing the planning problem to simulate. + """ + + def __init__( + self, problem: "up.model.contingent.contingent_problem.ContingentProblem" + ): + super().__init__(problem) + self._deterministic_problem = problem.clone() + self._randomly_set_full_initial_state(self._deterministic_problem) + self._simulator = up.engines.UPSequentialSimulator( + self._deterministic_problem, False + ) + self._state = self._simulator.get_initial_state() + + def _randomly_set_full_initial_state( + self, problem: "up.model.contingent.contingent_problem.ContingentProblem" + ): + fnode_to_symbol = {} + symbol_to_fnode = {} + cnt = 0 + for hf in problem.hidden_fluents: + if not hf.is_not(): + s = Symbol(f"v_{cnt}") + fnode_to_symbol[hf] = s + symbol_to_fnode[s] = hf + cnt += 1 + + constraints = [] + for c in problem.oneof_constraints: + args = [] + for x in c: + if x.is_not(): + args.append(Not(fnode_to_symbol[x.arg(0)])) + else: + args.append(fnode_to_symbol[x]) + constraints.append(ExactlyOne(args)) + for c in problem.or_constraints: + args = [] + for x in c: + if x.is_not(): + args.append(Not(fnode_to_symbol[x.arg(0)])) + else: + args.append(fnode_to_symbol[x]) + constraints.append(Or(args)) + + res = random.choice(list(all_smt(And(constraints), symbol_to_fnode.keys()))) + + for k, v in res.items(): + f = symbol_to_fnode[k] + assert v.is_bool_constant() + problem.set_initial_value(f, v.is_true()) + + def apply( + self, action: "up.plans.ActionInstance" + ) -> Dict["up.model.FNode", "up.model.FNode"]: + """ + Applies the given action to the current state of the environment and returns the resulting observation. + + :param action: A :class:`~unified_planning.plans.ActionInstance` object representing the action to apply. + :return: A dictionary mapping the fluent expressions observed by the sensing action to their corresponding values. + """ + new_state = self._simulator.apply( + self._state, action.action, action.actual_parameters + ) + if new_state is None: + raise UPUsageError("The given action is not applicable!") + self._state = new_state + res = {} + subs: Dict["up.model.Expression", "up.model.Expression"] = dict( + zip(action.action.parameters, action.actual_parameters) + ) + if isinstance(action.action, up.model.contingent.sensing_action.SensingAction): + for f in action.action.observed_fluents: + f_exp = f.substitute(subs) + res[f_exp] = self._state.get_value(f_exp) + return res + + def is_goal_reached(self) -> bool: + """ + Determines whether the goal of the planning problem has been reached in the current state of the environment. + + :return: A boolean value indicating whether the goal has been reached. + """ + return self._simulator.is_goal(self._state) + + +def all_smt(formula, keys): + target_logic = get_logic(formula) + with Solver(logic=target_logic) as solver: + solver.add_assertion(formula) + while solver.solve(): + res = {k: solver.get_value(k) for k in keys} + yield res + partial_model = [EqualsOrIff(k, v) for k, v in res.items()] + solver.add_assertion(Not(And(partial_model))) diff --git a/unified_planning/model/contingent/sensing_action.py b/unified_planning/model/contingent/sensing_action.py new file mode 100644 index 000000000..05299d565 --- /dev/null +++ b/unified_planning/model/contingent/sensing_action.py @@ -0,0 +1,94 @@ +# Copyright 2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +import unified_planning as up +from unified_planning.environment import Environment +from typing import List, Optional, Iterable +from collections import OrderedDict +from unified_planning.model.action import InstantaneousAction + + +class SensingAction(InstantaneousAction): + """This class represents a sensing action.""" + + def __init__( + self, + _name: str, + _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, + _env: Optional[Environment] = None, + **kwargs: "up.model.types.Type", + ): + InstantaneousAction.__init__(self, _name, _parameters, _env, **kwargs) + self._observed_fluents: List["up.model.fnode.FNode"] = [] + + def __eq__(self, oth: object) -> bool: + if isinstance(oth, SensingAction): + return super().__eq__(oth) and set(self._observed_fluents) == set( + oth._observed_fluents + ) + else: + return False + + def __hash__(self) -> int: + res = super().__hash__() + for of in self._observed_fluents: + res += hash(of) + return res + + def clone(self): + new_params = OrderedDict() + for param_name, param in self._parameters.items(): + new_params[param_name] = param.type + new_sensing_action = SensingAction(self._name, new_params, self._environment) + new_sensing_action._preconditions = self._preconditions[:] + new_sensing_action._effects = [e.clone() for e in self._effects] + new_sensing_action._fluents_assigned = self._fluents_assigned.copy() + new_sensing_action._fluents_inc_dec = self._fluents_inc_dec.copy() + new_sensing_action._simulated_effect = self._simulated_effect + new_sensing_action._observed_fluents = self._observed_fluents.copy() + return new_sensing_action + + def add_observed_fluents(self, observed_fluents: Iterable["up.model.fnode.FNode"]): + """ + Adds the given list of observed fluents. + + :param observed_fluents: The list of observed fluents that must be added. + """ + for of in observed_fluents: + self.add_observed_fluent(of) + + def add_observed_fluent(self, observed_fluent: "up.model.fnode.FNode"): + """ + Adds the given observed fluent. + + :param observed_fluent: The observed fluent that must be added. + """ + self._observed_fluents.append(observed_fluent) + + @property + def observed_fluents(self) -> List["up.model.fnode.FNode"]: + """Returns the `list` observed fluents.""" + return self._observed_fluents + + def __repr__(self) -> str: + b = InstantaneousAction.__repr__(self)[0:-3] + s = ["sensing-", b] + s.append(" observations = [\n") + for e in self._observed_fluents: + s.append(f" {str(e)}\n") + s.append(" ]\n") + s.append(" }") + return "".join(s) diff --git a/unified_planning/model/mixins/actions_set.py b/unified_planning/model/mixins/actions_set.py index 68ecef9f8..f5f98ba95 100644 --- a/unified_planning/model/mixins/actions_set.py +++ b/unified_planning/model/mixins/actions_set.py @@ -61,13 +61,15 @@ def instantaneous_actions(self) -> Iterator["up.model.action.InstantaneousAction yield a @property - def sensing_actions(self) -> Iterator["up.model.action.SensingAction"]: + def sensing_actions( + self, + ) -> Iterator["up.model.contingent.sensing_action.SensingAction"]: """Returs all the sensing actions of the problem. IMPORTANT NOTE: this property does some computation, so it should be called as seldom as possible.""" for a in self._actions: - if isinstance(a, up.model.action.SensingAction): + if isinstance(a, up.model.contingent.sensing_action.SensingAction): yield a @property diff --git a/unified_planning/model/problem.py b/unified_planning/model/problem.py index 96bff4d87..6d80dc43a 100644 --- a/unified_planning/model/problem.py +++ b/unified_planning/model/problem.py @@ -844,7 +844,7 @@ def _update_problem_kind_action( ): for p in action.parameters: self._update_problem_kind_type(p.type) - if isinstance(action, up.model.action.SensingAction): + if isinstance(action, up.model.contingent.sensing_action.SensingAction): self._kind.set_problem_class("CONTINGENT") if isinstance(action, up.model.tamp.InstantaneousMotionAction): if len(action.motion_constraints) > 0: diff --git a/unified_planning/shortcuts.py b/unified_planning/shortcuts.py index 182d48cd7..f84918476 100644 --- a/unified_planning/shortcuts.py +++ b/unified_planning/shortcuts.py @@ -720,6 +720,26 @@ def PlanRepairer( ) +def ActionSelector( + problem: "up.model.AbstractProblem", + *, + name: Optional[str] = None, + params: Optional[Dict[str, str]] = None, +) -> "up.engines.engine.Engine": + """ + Returns an ActionSelector. There are two ways to call this method: + + * | using ``problem_kind`` through the problem field. + | e.g. ``ActionSelector(problem)`` + * | using ``name`` (the name of a specific action selector) and eventually some ``params`` + | (engine dependent options). + | e.g. ``ActionSelector(problem, name='xxx')`` + """ + return get_environment().factory.ActionSelector( + problem=problem, name=name, params=params + ) + + def PortfolioSelector( *, name: Optional[str] = None,