Skip to content

Commit

Permalink
Added the new ActionSelector OM and added a contingent environment
Browse files Browse the repository at this point in the history
  • Loading branch information
alvalentini committed Apr 14, 2023
1 parent 3155b60 commit 3e167e9
Show file tree
Hide file tree
Showing 16 changed files with 414 additions and 84 deletions.
3 changes: 3 additions & 0 deletions .mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ ignore_missing_imports = True

[mypy-pyparsing.*]
ignore_missing_imports = True

[mypy-pysmt.*]
ignore_missing_imports = True
3 changes: 1 addition & 2 deletions docs/code_snippets/multi_agent_and_contingent.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@
problem.add_goal(Equals(Dot(robot, robot_position), locations[-1]))


from unified_planning.model.contingent_problem import ContingentProblem
from unified_planning.model.action import SensingAction
from unified_planning.model import ContingentProblem, SensingAction

problem = ContingentProblem("lost_packages")
Package = UserType("Package")
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pyparsing
networkx
pysmt
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
packages=find_packages(),
include_package_data=True,
python_requires=">=3.7", # supported Python ranges
install_requires=["pyparsing", "networkx"],
install_requires=["pyparsing", "networkx", "pysmt"],
extras_require={
"dev": ["tarski[arithmetic]", "pytest", "pytest-cov", "mypy"],
"grpc": ["grpcio", "grpcio-tools", "grpc-stubs"],
Expand Down
1 change: 1 addition & 0 deletions unified_planning/engines/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class OperationMode(Enum):
SEQUENTIAL_SIMULATOR = "sequential_simulator"
REPLANNER = "replanner"
PLAN_REPAIRER = "plan_repairer"
ACTION_SELECTOR = "action_selector"


class EngineMeta(type):
Expand Down
34 changes: 32 additions & 2 deletions unified_planning/engines/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from unified_planning.engines.mixins.sequential_simulator import (
SequentialSimulatorMixin,
)
from unified_planning.engines.mixins.action_selector import ActionSelectorMixin
from unified_planning.engines.engine import OperationMode
from typing import IO, Any, Dict, Tuple, Optional, List, Union, Type, Sequence
from pathlib import PurePath
Expand Down Expand Up @@ -635,14 +636,19 @@ def _get_engine(
msg = f"The problem has no quality metrics but the engine is required to be optimal!"
raise up.exceptions.UPUsageError(msg)
res = EngineClass(problem=problem, **params)
elif operation_mode == OperationMode.SEQUENTIAL_SIMULATOR:
elif (
operation_mode == OperationMode.SEQUENTIAL_SIMULATOR
or operation_mode == OperationMode.ACTION_SELECTOR
):
assert problem is not None
res = EngineClass(
problem=problem,
error_on_failed_checks=error_failed_checks,
**params,
)
assert isinstance(res, SequentialSimulatorMixin)
assert isinstance(res, SequentialSimulatorMixin) or isinstance(
res, ActionSelectorMixin
)
elif operation_mode == OperationMode.COMPILER:
res = EngineClass(**params)
assert isinstance(res, CompilerMixin)
Expand Down Expand Up @@ -926,6 +932,30 @@ def PlanRepairer(
optimality_guarantee=optimality_guarantee,
)

def ActionSelector(
self,
problem: "up.model.AbstractProblem",
*,
name: Optional[str] = None,
params: Optional[Dict[str, str]] = None,
) -> "up.engines.engine.Engine":
"""
Returns an ActionSelector. There are two ways to call this method:
- using 'problem_kind' through the problem field.
e.g. ActionSelector(problem)
- using 'name' (the name of a specific action selector) and eventually some 'params'
(engine dependent options).
e.g. ActionSelector(problem, name='xxx')
"""
return self._get_engine(
OperationMode.ACTION_SELECTOR,
name,
None,
params,
problem.kind,
problem=problem,
)

def PortfolioSelector(
self,
*,
Expand Down
85 changes: 85 additions & 0 deletions unified_planning/engines/mixins/action_selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright 2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unified_planning as up
from typing import Dict
from warnings import warn
from unified_planning.exceptions import UPUsageError


class ActionSelectorMixin:
"""
This class defines the interface that an :class:`~unified_planning.engines.Engine`
that is also an `ActionSelector` must implement.
Important NOTE: The `AbstractProblem` instance is given at the constructor.
"""

def __init__(self, problem: "up.model.AbstractProblem"):
self._problem = problem
self_class = type(self)
assert issubclass(
self_class, up.engines.engine.Engine
), "ActionSelectorMixin does not implement the up.engines.Engine class"
assert isinstance(self, up.engines.engine.Engine)
if not self.skip_checks and not self_class.supports(problem.kind):
msg = f"We cannot establish whether {self.name} is able to handle this problem!"
if self.error_on_failed_checks:
raise UPUsageError(msg)
else:
warn(msg)

@staticmethod
def is_action_selector() -> bool:
"""
Returns True if this engine is also an action selector, False otherwise.
:return: True if this engine is also an action selector, False otherwise.
"""
return True

def get_action(self) -> "up.plans.ActionInstance":
"""
Returns the next action to be taken in the current state of the problem.
:return: An instance of `ActionInstance` representing the next action to be taken.
"""
return self._get_action()

def update(self, observation: Dict["up.model.FNode", "up.model.FNode"]):
"""
Updates the internal state of the engine based on the given observation.
:param observation: A dictionary from observed fluents to their observed values.
"""
self._update(observation)

def _get_action(self) -> "up.plans.ActionInstance":
"""
Returns the next action to be taken in the current state of the problem. This method should be
implemented by subclasses.
:return: An instance of `ActionInstance` representing the next action to be taken.
"""
raise NotImplementedError

def _update(self, observation: Dict["up.model.FNode", "up.model.FNode"]):
"""
Updates the internal state of the engine based on the given observation. This method should be
implemented by subclasses.
:param observation: A dictionary from observed fluents to their observed values.
"""
raise NotImplementedError
4 changes: 2 additions & 2 deletions unified_planning/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
Action,
InstantaneousAction,
DurativeAction,
SensingAction,
)
from unified_planning.model.contingent.sensing_action import SensingAction
from unified_planning.model.effect import Effect, SimulatedEffect, EffectKind
from unified_planning.model.expression import (
BoolExpression,
Expand All @@ -35,7 +35,7 @@
from unified_planning.model.parameter import Parameter
from unified_planning.model.abstract_problem import AbstractProblem
from unified_planning.model.problem import Problem
from unified_planning.model.contingent_problem import ContingentProblem
from unified_planning.model.contingent.contingent_problem import ContingentProblem
from unified_planning.model.delta_stn import DeltaSimpleTemporalNetwork
from unified_planning.model.problem_kind import ProblemKind
from unified_planning.model.state import State, UPState
Expand Down
73 changes: 0 additions & 73 deletions unified_planning/model/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,76 +660,3 @@ def is_conditional(self) -> bool:
"""Returns `True` if the `action` has `conditional effects`, `False` otherwise."""
# re-implemenation needed for inheritance, delegate implementation.
return TimedCondsEffs.is_conditional(self)


class SensingAction(InstantaneousAction):
"""This class represents a sensing action."""

def __init__(
self,
_name: str,
_parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None,
_env: Optional[Environment] = None,
**kwargs: "up.model.types.Type",
):
InstantaneousAction.__init__(self, _name, _parameters, _env, **kwargs)
self._observed_fluents: List["up.model.fnode.FNode"] = []

def __eq__(self, oth: object) -> bool:
if isinstance(oth, SensingAction):
return super().__eq__(oth) and set(self._observed_fluents) == set(
oth._observed_fluents
)
else:
return False

def __hash__(self) -> int:
res = super().__hash__()
for of in self._observed_fluents:
res += hash(of)
return res

def clone(self):
new_params = OrderedDict()
for param_name, param in self._parameters.items():
new_params[param_name] = param.type
new_sensing_action = SensingAction(self._name, new_params, self._environment)
new_sensing_action._preconditions = self._preconditions[:]
new_sensing_action._effects = [e.clone() for e in self._effects]
new_sensing_action._fluents_assigned = self._fluents_assigned.copy()
new_sensing_action._fluents_inc_dec = self._fluents_inc_dec.copy()
new_sensing_action._simulated_effect = self._simulated_effect
new_sensing_action._observed_fluents = self._observed_fluents.copy()
return new_sensing_action

def add_observed_fluents(self, observed_fluents: Iterable["up.model.fnode.FNode"]):
"""
Adds the given list of observed fluents.
:param observed_fluents: The list of observed fluents that must be added.
"""
for of in observed_fluents:
self.add_observed_fluent(of)

def add_observed_fluent(self, observed_fluent: "up.model.fnode.FNode"):
"""
Adds the given observed fluent.
:param observed_fluent: The observed fluent that must be added.
"""
self._observed_fluents.append(observed_fluent)

@property
def observed_fluents(self) -> List["up.model.fnode.FNode"]:
"""Returns the `list` observed fluents."""
return self._observed_fluents

def __repr__(self) -> str:
b = InstantaneousAction.__repr__(self)[0:-3]
s = ["sensing-", b]
s.append(" observations = [\n")
for e in self._observed_fluents:
s.append(f" {str(e)}\n")
s.append(" ]\n")
s.append(" }")
return "".join(s)
20 changes: 20 additions & 0 deletions unified_planning/model/contingent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


from unified_planning.model.contingent.environment import (
Environment,
SimulatedEnvironment,
)
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ def initial_values(self) -> Dict["up.model.fnode.FNode", "up.model.fnode.FNode"]
res = self._initial_value
for f in self._fluents:
for f_exp in get_all_fluent_exp(self, f):
res[f_exp] = self.initial_value(f_exp)
if f_exp not in self._hidden_fluents:
res[f_exp] = self.initial_value(f_exp)
return res

@property
Expand Down
Loading

0 comments on commit 3e167e9

Please sign in to comment.