From 3e167e966c052b29d52780ebd06c48d519b4b59b Mon Sep 17 00:00:00 2001 From: Alessandro Valentini Date: Fri, 14 Apr 2023 15:47:55 +0200 Subject: [PATCH] Added the new ActionSelector OM and added a contingent environment --- .mypy.ini | 3 + .../multi_agent_and_contingent.py | 3 +- requirements.txt | 1 + setup.py | 2 +- unified_planning/engines/engine.py | 1 + unified_planning/engines/factory.py | 34 +++- .../engines/mixins/action_selector.py | 85 ++++++++++ unified_planning/model/__init__.py | 4 +- unified_planning/model/action.py | 73 --------- unified_planning/model/contingent/__init__.py | 20 +++ .../{ => contingent}/contingent_problem.py | 3 +- .../model/contingent/environment.py | 148 ++++++++++++++++++ .../model/contingent/sensing_action.py | 94 +++++++++++ unified_planning/model/mixins/actions_set.py | 6 +- unified_planning/model/problem.py | 2 +- unified_planning/shortcuts.py | 19 +++ 16 files changed, 414 insertions(+), 84 deletions(-) create mode 100644 unified_planning/engines/mixins/action_selector.py create mode 100644 unified_planning/model/contingent/__init__.py rename unified_planning/model/{ => contingent}/contingent_problem.py (98%) create mode 100644 unified_planning/model/contingent/environment.py create mode 100644 unified_planning/model/contingent/sensing_action.py diff --git a/.mypy.ini b/.mypy.ini index a42b15d26..e769179a9 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -15,3 +15,6 @@ ignore_missing_imports = True [mypy-pyparsing.*] ignore_missing_imports = True + +[mypy-pysmt.*] +ignore_missing_imports = True diff --git a/docs/code_snippets/multi_agent_and_contingent.py b/docs/code_snippets/multi_agent_and_contingent.py index f14e9cc76..a6ce21085 100644 --- a/docs/code_snippets/multi_agent_and_contingent.py +++ b/docs/code_snippets/multi_agent_and_contingent.py @@ -41,8 +41,7 @@ problem.add_goal(Equals(Dot(robot, robot_position), locations[-1])) -from unified_planning.model.contingent_problem import ContingentProblem -from unified_planning.model.action import SensingAction +from unified_planning.model import ContingentProblem, SensingAction problem = ContingentProblem("lost_packages") Package = UserType("Package") diff --git a/requirements.txt b/requirements.txt index 62b2a154a..953c248c0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pyparsing networkx +pysmt diff --git a/setup.py b/setup.py index 86bbb8f3b..cee269252 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ packages=find_packages(), include_package_data=True, python_requires=">=3.7", # supported Python ranges - install_requires=["pyparsing", "networkx"], + install_requires=["pyparsing", "networkx", "pysmt"], extras_require={ "dev": ["tarski[arithmetic]", "pytest", "pytest-cov", "mypy"], "grpc": ["grpcio", "grpcio-tools", "grpc-stubs"], diff --git a/unified_planning/engines/engine.py b/unified_planning/engines/engine.py index 59a35255f..9788741a5 100644 --- a/unified_planning/engines/engine.py +++ b/unified_planning/engines/engine.py @@ -33,6 +33,7 @@ class OperationMode(Enum): SEQUENTIAL_SIMULATOR = "sequential_simulator" REPLANNER = "replanner" PLAN_REPAIRER = "plan_repairer" + ACTION_SELECTOR = "action_selector" class EngineMeta(type): diff --git a/unified_planning/engines/factory.py b/unified_planning/engines/factory.py index ca5d80051..5945feb29 100644 --- a/unified_planning/engines/factory.py +++ b/unified_planning/engines/factory.py @@ -36,6 +36,7 @@ from unified_planning.engines.mixins.sequential_simulator import ( SequentialSimulatorMixin, ) +from unified_planning.engines.mixins.action_selector import ActionSelectorMixin from unified_planning.engines.engine import OperationMode from typing import IO, Any, Dict, Tuple, Optional, List, Union, Type, Sequence from pathlib import PurePath @@ -635,14 +636,19 @@ def _get_engine( msg = f"The problem has no quality metrics but the engine is required to be optimal!" raise up.exceptions.UPUsageError(msg) res = EngineClass(problem=problem, **params) - elif operation_mode == OperationMode.SEQUENTIAL_SIMULATOR: + elif ( + operation_mode == OperationMode.SEQUENTIAL_SIMULATOR + or operation_mode == OperationMode.ACTION_SELECTOR + ): assert problem is not None res = EngineClass( problem=problem, error_on_failed_checks=error_failed_checks, **params, ) - assert isinstance(res, SequentialSimulatorMixin) + assert isinstance(res, SequentialSimulatorMixin) or isinstance( + res, ActionSelectorMixin + ) elif operation_mode == OperationMode.COMPILER: res = EngineClass(**params) assert isinstance(res, CompilerMixin) @@ -926,6 +932,30 @@ def PlanRepairer( optimality_guarantee=optimality_guarantee, ) + def ActionSelector( + self, + problem: "up.model.AbstractProblem", + *, + name: Optional[str] = None, + params: Optional[Dict[str, str]] = None, + ) -> "up.engines.engine.Engine": + """ + Returns an ActionSelector. There are two ways to call this method: + - using 'problem_kind' through the problem field. + e.g. ActionSelector(problem) + - using 'name' (the name of a specific action selector) and eventually some 'params' + (engine dependent options). + e.g. ActionSelector(problem, name='xxx') + """ + return self._get_engine( + OperationMode.ACTION_SELECTOR, + name, + None, + params, + problem.kind, + problem=problem, + ) + def PortfolioSelector( self, *, diff --git a/unified_planning/engines/mixins/action_selector.py b/unified_planning/engines/mixins/action_selector.py new file mode 100644 index 000000000..e4e29a99c --- /dev/null +++ b/unified_planning/engines/mixins/action_selector.py @@ -0,0 +1,85 @@ +# Copyright 2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unified_planning as up +from typing import Dict +from warnings import warn +from unified_planning.exceptions import UPUsageError + + +class ActionSelectorMixin: + """ + This class defines the interface that an :class:`~unified_planning.engines.Engine` + that is also an `ActionSelector` must implement. + + Important NOTE: The `AbstractProblem` instance is given at the constructor. + """ + + def __init__(self, problem: "up.model.AbstractProblem"): + self._problem = problem + self_class = type(self) + assert issubclass( + self_class, up.engines.engine.Engine + ), "ActionSelectorMixin does not implement the up.engines.Engine class" + assert isinstance(self, up.engines.engine.Engine) + if not self.skip_checks and not self_class.supports(problem.kind): + msg = f"We cannot establish whether {self.name} is able to handle this problem!" + if self.error_on_failed_checks: + raise UPUsageError(msg) + else: + warn(msg) + + @staticmethod + def is_action_selector() -> bool: + """ + Returns True if this engine is also an action selector, False otherwise. + + :return: True if this engine is also an action selector, False otherwise. + """ + return True + + def get_action(self) -> "up.plans.ActionInstance": + """ + Returns the next action to be taken in the current state of the problem. + + :return: An instance of `ActionInstance` representing the next action to be taken. + """ + return self._get_action() + + def update(self, observation: Dict["up.model.FNode", "up.model.FNode"]): + """ + Updates the internal state of the engine based on the given observation. + + :param observation: A dictionary from observed fluents to their observed values. + """ + self._update(observation) + + def _get_action(self) -> "up.plans.ActionInstance": + """ + Returns the next action to be taken in the current state of the problem. This method should be + implemented by subclasses. + + :return: An instance of `ActionInstance` representing the next action to be taken. + """ + raise NotImplementedError + + def _update(self, observation: Dict["up.model.FNode", "up.model.FNode"]): + """ + Updates the internal state of the engine based on the given observation. This method should be + implemented by subclasses. + + :param observation: A dictionary from observed fluents to their observed values. + """ + raise NotImplementedError diff --git a/unified_planning/model/__init__.py b/unified_planning/model/__init__.py index 6cd01394b..a83ee48b2 100644 --- a/unified_planning/model/__init__.py +++ b/unified_planning/model/__init__.py @@ -18,8 +18,8 @@ Action, InstantaneousAction, DurativeAction, - SensingAction, ) +from unified_planning.model.contingent.sensing_action import SensingAction from unified_planning.model.effect import Effect, SimulatedEffect, EffectKind from unified_planning.model.expression import ( BoolExpression, @@ -35,7 +35,7 @@ from unified_planning.model.parameter import Parameter from unified_planning.model.abstract_problem import AbstractProblem from unified_planning.model.problem import Problem -from unified_planning.model.contingent_problem import ContingentProblem +from unified_planning.model.contingent.contingent_problem import ContingentProblem from unified_planning.model.delta_stn import DeltaSimpleTemporalNetwork from unified_planning.model.problem_kind import ProblemKind from unified_planning.model.state import State, UPState diff --git a/unified_planning/model/action.py b/unified_planning/model/action.py index b2688ed55..97baf89c9 100644 --- a/unified_planning/model/action.py +++ b/unified_planning/model/action.py @@ -660,76 +660,3 @@ def is_conditional(self) -> bool: """Returns `True` if the `action` has `conditional effects`, `False` otherwise.""" # re-implemenation needed for inheritance, delegate implementation. return TimedCondsEffs.is_conditional(self) - - -class SensingAction(InstantaneousAction): - """This class represents a sensing action.""" - - def __init__( - self, - _name: str, - _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, - _env: Optional[Environment] = None, - **kwargs: "up.model.types.Type", - ): - InstantaneousAction.__init__(self, _name, _parameters, _env, **kwargs) - self._observed_fluents: List["up.model.fnode.FNode"] = [] - - def __eq__(self, oth: object) -> bool: - if isinstance(oth, SensingAction): - return super().__eq__(oth) and set(self._observed_fluents) == set( - oth._observed_fluents - ) - else: - return False - - def __hash__(self) -> int: - res = super().__hash__() - for of in self._observed_fluents: - res += hash(of) - return res - - def clone(self): - new_params = OrderedDict() - for param_name, param in self._parameters.items(): - new_params[param_name] = param.type - new_sensing_action = SensingAction(self._name, new_params, self._environment) - new_sensing_action._preconditions = self._preconditions[:] - new_sensing_action._effects = [e.clone() for e in self._effects] - new_sensing_action._fluents_assigned = self._fluents_assigned.copy() - new_sensing_action._fluents_inc_dec = self._fluents_inc_dec.copy() - new_sensing_action._simulated_effect = self._simulated_effect - new_sensing_action._observed_fluents = self._observed_fluents.copy() - return new_sensing_action - - def add_observed_fluents(self, observed_fluents: Iterable["up.model.fnode.FNode"]): - """ - Adds the given list of observed fluents. - - :param observed_fluents: The list of observed fluents that must be added. - """ - for of in observed_fluents: - self.add_observed_fluent(of) - - def add_observed_fluent(self, observed_fluent: "up.model.fnode.FNode"): - """ - Adds the given observed fluent. - - :param observed_fluent: The observed fluent that must be added. - """ - self._observed_fluents.append(observed_fluent) - - @property - def observed_fluents(self) -> List["up.model.fnode.FNode"]: - """Returns the `list` observed fluents.""" - return self._observed_fluents - - def __repr__(self) -> str: - b = InstantaneousAction.__repr__(self)[0:-3] - s = ["sensing-", b] - s.append(" observations = [\n") - for e in self._observed_fluents: - s.append(f" {str(e)}\n") - s.append(" ]\n") - s.append(" }") - return "".join(s) diff --git a/unified_planning/model/contingent/__init__.py b/unified_planning/model/contingent/__init__.py new file mode 100644 index 000000000..81a8f1213 --- /dev/null +++ b/unified_planning/model/contingent/__init__.py @@ -0,0 +1,20 @@ +# Copyright 2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +from unified_planning.model.contingent.environment import ( + Environment, + SimulatedEnvironment, +) diff --git a/unified_planning/model/contingent_problem.py b/unified_planning/model/contingent/contingent_problem.py similarity index 98% rename from unified_planning/model/contingent_problem.py rename to unified_planning/model/contingent/contingent_problem.py index b1bd79a08..6a40ad026 100644 --- a/unified_planning/model/contingent_problem.py +++ b/unified_planning/model/contingent/contingent_problem.py @@ -155,7 +155,8 @@ def initial_values(self) -> Dict["up.model.fnode.FNode", "up.model.fnode.FNode"] res = self._initial_value for f in self._fluents: for f_exp in get_all_fluent_exp(self, f): - res[f_exp] = self.initial_value(f_exp) + if f_exp not in self._hidden_fluents: + res[f_exp] = self.initial_value(f_exp) return res @property diff --git a/unified_planning/model/contingent/environment.py b/unified_planning/model/contingent/environment.py new file mode 100644 index 000000000..69fdaf5dd --- /dev/null +++ b/unified_planning/model/contingent/environment.py @@ -0,0 +1,148 @@ +# Copyright 2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import random +import unified_planning as up +from unified_planning.exceptions import UPUsageError +from pysmt.shortcuts import Solver, Not, And, Symbol, Or, ExactlyOne, EqualsOrIff +from pysmt.oracles import get_logic +from typing import Dict + + +class Environment: + """ + A base class that defines the interface for an environment in the planning domain. + """ + + def __init__( + self, problem: "up.model.contingent.contingent_problem.ContingentProblem" + ): + self._problem = problem + + def apply( + self, action: "up.plans.ActionInstance" + ) -> Dict["up.model.FNode", "up.model.FNode"]: + """ + Applies the given action to the current state of the environment and returns the resulting observation. + + :param action: A :class:`~unified_planning.plans.ActionInstance` object representing the action to apply. + :return: A dictionary mapping the fluent expressions observed by the sensing action to their corresponding values. + """ + raise NotImplementedError + + def is_goal_reached(self) -> bool: + """ + Determines whether the goal of the planning problem has been reached in the current state of the environment. + + :return: A boolean value indicating whether the goal has been reached. + """ + raise NotImplementedError + + +class SimulatedEnvironment(Environment): + """ + An implementation of an environment that simulates the effects of actions on a given contingent planning problem. + + :param problem: A :class:`~unified_planning.model.ContingentProblem` object representing the planning problem to simulate. + """ + + def __init__( + self, problem: "up.model.contingent.contingent_problem.ContingentProblem" + ): + super().__init__(problem) + self._deterministic_problem = problem.clone() + self._randomly_set_full_initial_state(self._deterministic_problem) + self._simulator = up.engines.UPSequentialSimulator( + self._deterministic_problem, False + ) + self._state = self._simulator.get_initial_state() + + def _randomly_set_full_initial_state( + self, problem: "up.model.contingent.contingent_problem.ContingentProblem" + ): + fnode_to_symbol = {} + symbol_to_fnode = {} + cnt = 0 + for hf in problem.hidden_fluents: + if not hf.is_not(): + s = Symbol(f"v_{cnt}") + fnode_to_symbol[hf] = s + symbol_to_fnode[s] = hf + cnt += 1 + + constraints = [] + for c in problem.oneof_constraints: + args = [] + for x in c: + if x.is_not(): + args.append(Not(fnode_to_symbol[x.arg(0)])) + else: + args.append(fnode_to_symbol[x]) + constraints.append(ExactlyOne(args)) + for c in problem.or_constraints: + args = [] + for x in c: + if x.is_not(): + args.append(Not(fnode_to_symbol[x.arg(0)])) + else: + args.append(fnode_to_symbol[x]) + constraints.append(Or(args)) + + res = random.choice(list(all_smt(And(constraints), symbol_to_fnode.keys()))) + + for k, v in res.items(): + f = symbol_to_fnode[k] + assert v.is_bool_constant() + problem.set_initial_value(f, v.is_true()) + + def apply( + self, action: "up.plans.ActionInstance" + ) -> Dict["up.model.FNode", "up.model.FNode"]: + """ + Applies the given action to the current state of the environment and returns the resulting observation. + + :param action: A :class:`~unified_planning.plans.ActionInstance` object representing the action to apply. + :return: A dictionary mapping the fluent expressions observed by the sensing action to their corresponding values. + """ + new_state = self._simulator.apply( + self._state, action.action, action.actual_parameters + ) + if new_state is None: + raise UPUsageError("The given action is not applicable!") + self._state = new_state + res = {} + if isinstance(action, up.model.contingent.sensing_action.SensingAction): + for f in action.observed_fluents: + res[f] = self._state.get_value(f) + return res + + def is_goal_reached(self) -> bool: + """ + Determines whether the goal of the planning problem has been reached in the current state of the environment. + + :return: A boolean value indicating whether the goal has been reached. + """ + return self._simulator.is_goal(self._state) + + +def all_smt(formula, keys): + target_logic = get_logic(formula) + with Solver(logic=target_logic) as solver: + solver.add_assertion(formula) + while solver.solve(): + res = {k: solver.get_value(k) for k in keys} + yield res + partial_model = [EqualsOrIff(k, v) for k, v in res.items()] + solver.add_assertion(Not(And(partial_model))) diff --git a/unified_planning/model/contingent/sensing_action.py b/unified_planning/model/contingent/sensing_action.py new file mode 100644 index 000000000..05299d565 --- /dev/null +++ b/unified_planning/model/contingent/sensing_action.py @@ -0,0 +1,94 @@ +# Copyright 2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +import unified_planning as up +from unified_planning.environment import Environment +from typing import List, Optional, Iterable +from collections import OrderedDict +from unified_planning.model.action import InstantaneousAction + + +class SensingAction(InstantaneousAction): + """This class represents a sensing action.""" + + def __init__( + self, + _name: str, + _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, + _env: Optional[Environment] = None, + **kwargs: "up.model.types.Type", + ): + InstantaneousAction.__init__(self, _name, _parameters, _env, **kwargs) + self._observed_fluents: List["up.model.fnode.FNode"] = [] + + def __eq__(self, oth: object) -> bool: + if isinstance(oth, SensingAction): + return super().__eq__(oth) and set(self._observed_fluents) == set( + oth._observed_fluents + ) + else: + return False + + def __hash__(self) -> int: + res = super().__hash__() + for of in self._observed_fluents: + res += hash(of) + return res + + def clone(self): + new_params = OrderedDict() + for param_name, param in self._parameters.items(): + new_params[param_name] = param.type + new_sensing_action = SensingAction(self._name, new_params, self._environment) + new_sensing_action._preconditions = self._preconditions[:] + new_sensing_action._effects = [e.clone() for e in self._effects] + new_sensing_action._fluents_assigned = self._fluents_assigned.copy() + new_sensing_action._fluents_inc_dec = self._fluents_inc_dec.copy() + new_sensing_action._simulated_effect = self._simulated_effect + new_sensing_action._observed_fluents = self._observed_fluents.copy() + return new_sensing_action + + def add_observed_fluents(self, observed_fluents: Iterable["up.model.fnode.FNode"]): + """ + Adds the given list of observed fluents. + + :param observed_fluents: The list of observed fluents that must be added. + """ + for of in observed_fluents: + self.add_observed_fluent(of) + + def add_observed_fluent(self, observed_fluent: "up.model.fnode.FNode"): + """ + Adds the given observed fluent. + + :param observed_fluent: The observed fluent that must be added. + """ + self._observed_fluents.append(observed_fluent) + + @property + def observed_fluents(self) -> List["up.model.fnode.FNode"]: + """Returns the `list` observed fluents.""" + return self._observed_fluents + + def __repr__(self) -> str: + b = InstantaneousAction.__repr__(self)[0:-3] + s = ["sensing-", b] + s.append(" observations = [\n") + for e in self._observed_fluents: + s.append(f" {str(e)}\n") + s.append(" ]\n") + s.append(" }") + return "".join(s) diff --git a/unified_planning/model/mixins/actions_set.py b/unified_planning/model/mixins/actions_set.py index 68ecef9f8..f5f98ba95 100644 --- a/unified_planning/model/mixins/actions_set.py +++ b/unified_planning/model/mixins/actions_set.py @@ -61,13 +61,15 @@ def instantaneous_actions(self) -> Iterator["up.model.action.InstantaneousAction yield a @property - def sensing_actions(self) -> Iterator["up.model.action.SensingAction"]: + def sensing_actions( + self, + ) -> Iterator["up.model.contingent.sensing_action.SensingAction"]: """Returs all the sensing actions of the problem. IMPORTANT NOTE: this property does some computation, so it should be called as seldom as possible.""" for a in self._actions: - if isinstance(a, up.model.action.SensingAction): + if isinstance(a, up.model.contingent.sensing_action.SensingAction): yield a @property diff --git a/unified_planning/model/problem.py b/unified_planning/model/problem.py index 348ad8a2f..9424dce8c 100644 --- a/unified_planning/model/problem.py +++ b/unified_planning/model/problem.py @@ -739,7 +739,7 @@ def _update_problem_kind_action( ): for p in action.parameters: self._update_problem_kind_type(p.type) - if isinstance(action, up.model.action.SensingAction): + if isinstance(action, up.model.contingent.sensing_action.SensingAction): self._kind.set_problem_class("CONTINGENT") if isinstance(action, up.model.tamp.InstantaneousMotionAction): if len(action.motion_constraints) > 0: diff --git a/unified_planning/shortcuts.py b/unified_planning/shortcuts.py index 883d34a4b..f9f49d5f3 100644 --- a/unified_planning/shortcuts.py +++ b/unified_planning/shortcuts.py @@ -694,6 +694,25 @@ def PlanRepairer( ) +def ActionSelector( + problem: "up.model.AbstractProblem", + *, + name: Optional[str] = None, + params: Optional[Dict[str, str]] = None, +) -> "up.engines.engine.Engine": + """ + Returns an ActionSelector. There are two ways to call this method: + - using 'problem_kind' through the problem field. + e.g. ActionSelector(problem) + - using 'name' (the name of a specific action selector) and eventually some 'params' + (engine dependent options). + e.g. ActionSelector(problem, name='xxx') + """ + return get_environment().factory.ActionSelector( + problem=problem, name=name, params=params + ) + + def PortfolioSelector( *, name: Optional[str] = None,