Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New OM: ActionSelector #353

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ ignore_missing_imports = True

[mypy-pyparsing.*]
ignore_missing_imports = True

[mypy-pysmt.*]
ignore_missing_imports = True
3 changes: 1 addition & 2 deletions docs/code_snippets/multi_agent_and_contingent.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@
problem.add_goal(Equals(Dot(robot, robot_position), locations[-1]))


from unified_planning.model.contingent_problem import ContingentProblem
from unified_planning.model.action import SensingAction
from unified_planning.model import ContingentProblem, SensingAction

problem = ContingentProblem("lost_packages")
Package = UserType("Package")
Expand Down
2 changes: 1 addition & 1 deletion docs/problem_representation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Contingent Example
A contingent planning problem represents an action-based problem in which the exact initial state is not entirely known and some of the actions produce “observations” upon execution. More specifically, some actions can be SensingActions, which indicate which fluents they observe and after the successful execution of such actions, the observed fluents become known to the executor. The inherent non-determinism in the initial state can therefore be “shrinked” by performing suitable SensingActions and a plan is then a strategy that prescribes what to execute based on the past observations.

.. literalinclude:: ./code_snippets/multi_agent_and_contingent.py
:lines: 44-94
:lines: 44-93


Hierarchical Example
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pyparsing
networkx
pysmt
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
packages=find_packages(),
include_package_data=True,
python_requires=">=3.7", # supported Python ranges
install_requires=["pyparsing", "networkx"],
install_requires=["pyparsing", "networkx", "pysmt"],
extras_require={
"dev": ["tarski[arithmetic]", "pytest", "pytest-cov", "mypy"],
"grpc": ["grpcio", "grpcio-tools", "grpc-stubs"],
Expand Down
1 change: 1 addition & 0 deletions unified_planning/engines/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class OperationMode(Enum):
SEQUENTIAL_SIMULATOR = "sequential_simulator"
REPLANNER = "replanner"
PLAN_REPAIRER = "plan_repairer"
ACTION_SELECTOR = "action_selector"


class EngineMeta(ABCMeta):
Expand Down
35 changes: 33 additions & 2 deletions unified_planning/engines/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from unified_planning.engines.mixins.sequential_simulator import (
SequentialSimulatorMixin,
)
from unified_planning.engines.mixins.action_selector import ActionSelectorMixin
from unified_planning.engines.engine import OperationMode
from typing import IO, Any, Dict, Tuple, Optional, List, Union, Type, Sequence, cast
from pathlib import PurePath
Expand Down Expand Up @@ -659,14 +660,19 @@
msg = f"The problem has no quality metrics but the engine is required to be optimal!"
raise up.exceptions.UPUsageError(msg)
res = EngineClass(problem=problem, **params)
elif operation_mode == OperationMode.SEQUENTIAL_SIMULATOR:
elif (
operation_mode == OperationMode.SEQUENTIAL_SIMULATOR
or operation_mode == OperationMode.ACTION_SELECTOR
):
assert problem is not None
res = EngineClass(
problem=problem,
error_on_failed_checks=error_failed_checks,
**params,
)
assert isinstance(res, SequentialSimulatorMixin)
assert isinstance(res, SequentialSimulatorMixin) or isinstance(
res, ActionSelectorMixin
)
elif operation_mode == OperationMode.COMPILER:
res = EngineClass(**params)
assert isinstance(res, CompilerMixin)
Expand Down Expand Up @@ -961,6 +967,31 @@
optimality_guarantee=optimality_guarantee,
)

def ActionSelector(
self,
problem: "up.model.AbstractProblem",
*,
name: Optional[str] = None,
params: Optional[Dict[str, str]] = None,
) -> "up.engines.engine.Engine":
"""
Returns an ActionSelector. There are two ways to call this method:

* | using ``problem_kind`` through the problem field.
| e.g. ``ActionSelector(problem)``
* | using ``name`` (the name of a specific action selector) and eventually some ``params``
| (engine dependent options).
| e.g. ``ActionSelector(problem, name='xxx')``
"""
return self._get_engine(

Check warning on line 986 in unified_planning/engines/factory.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/engines/factory.py#L986

Added line #L986 was not covered by tests
OperationMode.ACTION_SELECTOR,
name,
None,
params,
problem.kind,
problem=problem,
)

def PortfolioSelector(
self,
*,
Expand Down
85 changes: 85 additions & 0 deletions unified_planning/engines/mixins/action_selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright 2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unified_planning as up
from typing import Dict
from warnings import warn
from unified_planning.exceptions import UPUsageError


class ActionSelectorMixin:
"""
This class defines the interface that an :class:`~unified_planning.engines.Engine`
that is also an `ActionSelector` must implement.

Important NOTE: The `AbstractProblem` instance is given at the constructor.
"""

def __init__(self, problem: "up.model.AbstractProblem"):
self._problem = problem
self_class = type(self)
assert issubclass(

Check warning on line 33 in unified_planning/engines/mixins/action_selector.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/engines/mixins/action_selector.py#L31-L33

Added lines #L31 - L33 were not covered by tests
self_class, up.engines.engine.Engine
), "ActionSelectorMixin does not implement the up.engines.Engine class"
assert isinstance(self, up.engines.engine.Engine)
if not self.skip_checks and not self_class.supports(problem.kind):
msg = f"We cannot establish whether {self.name} is able to handle this problem!"
if self.error_on_failed_checks:
raise UPUsageError(msg)

Check warning on line 40 in unified_planning/engines/mixins/action_selector.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/engines/mixins/action_selector.py#L36-L40

Added lines #L36 - L40 were not covered by tests
else:
warn(msg)

Check warning on line 42 in unified_planning/engines/mixins/action_selector.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/engines/mixins/action_selector.py#L42

Added line #L42 was not covered by tests

@staticmethod
def is_action_selector() -> bool:
"""
Returns True if this engine is also an action selector, False otherwise.

:return: True if this engine is also an action selector, False otherwise.
"""
return True

Check warning on line 51 in unified_planning/engines/mixins/action_selector.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/engines/mixins/action_selector.py#L51

Added line #L51 was not covered by tests

def get_action(self) -> "up.plans.ActionInstance":
"""
Returns the next action to be taken in the current state of the problem.
alvalentini marked this conversation as resolved.
Show resolved Hide resolved

:return: An instance of `ActionInstance` representing the next action to be taken.
"""
return self._get_action()

Check warning on line 59 in unified_planning/engines/mixins/action_selector.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/engines/mixins/action_selector.py#L59

Added line #L59 was not covered by tests

def update(self, observation: Dict["up.model.FNode", "up.model.FNode"]):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why two methods (update+get_action) instead of a single one get_next_action(observation)?

"""
Updates the internal state of the engine based on the given observation.

:param observation: A dictionary from observed fluents to their observed values.
"""
self._update(observation)

Check warning on line 67 in unified_planning/engines/mixins/action_selector.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/engines/mixins/action_selector.py#L67

Added line #L67 was not covered by tests

def _get_action(self) -> "up.plans.ActionInstance":
"""
Returns the next action to be taken in the current state of the problem. This method should be
alvalentini marked this conversation as resolved.
Show resolved Hide resolved
implemented by subclasses.

:return: An instance of `ActionInstance` representing the next action to be taken.
"""
raise NotImplementedError

Check warning on line 76 in unified_planning/engines/mixins/action_selector.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/engines/mixins/action_selector.py#L76

Added line #L76 was not covered by tests

def _update(self, observation: Dict["up.model.FNode", "up.model.FNode"]):
"""
Updates the internal state of the engine based on the given observation. This method should be
implemented by subclasses.

:param observation: A dictionary from observed fluents to their observed values.
"""
raise NotImplementedError

Check warning on line 85 in unified_planning/engines/mixins/action_selector.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/engines/mixins/action_selector.py#L85

Added line #L85 was not covered by tests
4 changes: 2 additions & 2 deletions unified_planning/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
Action,
InstantaneousAction,
DurativeAction,
SensingAction,
)
from unified_planning.model.contingent.sensing_action import SensingAction
from unified_planning.model.effect import Effect, SimulatedEffect, EffectKind
from unified_planning.model.expression import (
BoolExpression,
Expand All @@ -35,7 +35,7 @@
from unified_planning.model.parameter import Parameter
from unified_planning.model.abstract_problem import AbstractProblem
from unified_planning.model.problem import Problem
from unified_planning.model.contingent_problem import ContingentProblem
from unified_planning.model.contingent.contingent_problem import ContingentProblem
from unified_planning.model.delta_stn import DeltaSimpleTemporalNetwork
from unified_planning.model.problem_kind import ProblemKind
from unified_planning.model.state import State, UPState
Expand Down
73 changes: 0 additions & 73 deletions unified_planning/model/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,76 +663,3 @@ def is_conditional(self) -> bool:
"""Returns `True` if the `action` has `conditional effects`, `False` otherwise."""
# re-implemenation needed for inheritance, delegate implementation.
return TimedCondsEffs.is_conditional(self)


class SensingAction(InstantaneousAction):
"""This class represents a sensing action."""

def __init__(
self,
_name: str,
_parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None,
_env: Optional[Environment] = None,
**kwargs: "up.model.types.Type",
):
InstantaneousAction.__init__(self, _name, _parameters, _env, **kwargs)
self._observed_fluents: List["up.model.fnode.FNode"] = []

def __eq__(self, oth: object) -> bool:
if isinstance(oth, SensingAction):
return super().__eq__(oth) and set(self._observed_fluents) == set(
oth._observed_fluents
)
else:
return False

def __hash__(self) -> int:
res = super().__hash__()
for of in self._observed_fluents:
res += hash(of)
return res

def clone(self):
new_params = OrderedDict()
for param_name, param in self._parameters.items():
new_params[param_name] = param.type
new_sensing_action = SensingAction(self._name, new_params, self._environment)
new_sensing_action._preconditions = self._preconditions[:]
new_sensing_action._effects = [e.clone() for e in self._effects]
new_sensing_action._fluents_assigned = self._fluents_assigned.copy()
new_sensing_action._fluents_inc_dec = self._fluents_inc_dec.copy()
new_sensing_action._simulated_effect = self._simulated_effect
new_sensing_action._observed_fluents = self._observed_fluents.copy()
return new_sensing_action

def add_observed_fluents(self, observed_fluents: Iterable["up.model.fnode.FNode"]):
"""
Adds the given list of observed fluents.

:param observed_fluents: The list of observed fluents that must be added.
"""
for of in observed_fluents:
self.add_observed_fluent(of)

def add_observed_fluent(self, observed_fluent: "up.model.fnode.FNode"):
"""
Adds the given observed fluent.

:param observed_fluent: The observed fluent that must be added.
"""
self._observed_fluents.append(observed_fluent)

@property
def observed_fluents(self) -> List["up.model.fnode.FNode"]:
"""Returns the `list` observed fluents."""
return self._observed_fluents

def __repr__(self) -> str:
b = InstantaneousAction.__repr__(self)[0:-3]
s = ["sensing-", b]
s.append(" observations = [\n")
for e in self._observed_fluents:
s.append(f" {str(e)}\n")
s.append(" ]\n")
s.append(" }")
return "".join(s)
20 changes: 20 additions & 0 deletions unified_planning/model/contingent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


from unified_planning.model.contingent.environment import (
Environment,
SimulatedEnvironment,
)
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@
res = self._initial_value
for f in self._fluents:
for f_exp in get_all_fluent_exp(self, f):
res[f_exp] = self.initial_value(f_exp)
if f_exp not in self._hidden_fluents:
res[f_exp] = self.initial_value(f_exp)

Check warning on line 160 in unified_planning/model/contingent/contingent_problem.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/model/contingent/contingent_problem.py#L159-L160

Added lines #L159 - L160 were not covered by tests
return res

@property
Expand Down
Loading