diff --git a/.vscode/settings.json b/.vscode/settings.json index 122bead..bf6870d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,9 +1,12 @@ { - "editor.defaultFormatter": "ms-python.black-formatter", "editor.formatOnSave": true, + "black-formatter.importStrategy": "fromEnvironment", "python.testing.pytestArgs": [ "." ], "python.testing.unittestEnabled": false, - "python.testing.pytestEnabled": true + "python.testing.pytestEnabled": true, + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + } } diff --git a/examples/entering-a-room.json b/examples/entering-a-room.json index 0b9ee06..2c96146 100644 --- a/examples/entering-a-room.json +++ b/examples/entering-a-room.json @@ -21,61 +21,61 @@ "behavior_library": [ { "id": "anonymous-sequence", - "display_name": "", + "name": "", "type": "Sequence", "show": false }, { "id": "approach-door", - "display_name": "Approach the door", + "name": "Approach the door", "type": "Behavior", "show": true }, { "id": "open-door", - "display_name": "Open the door", + "name": "Open the door", "type": "Behavior", "show": true }, { "id": "go-through-doorway", - "display_name": "Go through the doorway", + "name": "Go through the doorway", "type": "Behavior", "show": true }, { "id": "knock-on-the-door", - "display_name": "Knock on the door", + "name": "Knock on the door", "type": "Behavior", "show": true }, { "id": "sing-a-song", - "display_name": "Sing a song", + "name": "Sing a song", "type": "Behavior", "show": true }, { "id": "announce-your-presence", - "display_name": "Announce your presence", + "name": "Announce your presence", "type": "Behavior", "show": true }, { "id": "contact-supervisor-guidance", - "display_name": "Contact supervisor for guidance", + "name": "Contact supervisor for guidance", "type": "Behavior", "show": true }, { "id": "stop", - "display_name": "Stop", + "name": "Stop", "type": "Behavior", "show": true }, { "id": "return-to-charging-station", - "display_name": "Return to charging station", + "name": "Return to charging station", "type": "Behavior", "show": false } diff --git a/pyproject.toml b/pyproject.toml index efc7ca5..3ff2675 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -123,7 +123,3 @@ version.source = "vcs" [tool.pytest.ini_options] addopts = "--doctest-modules" - -[tool.black] -target-version = ['py311'] - diff --git a/src/social_norms_trees/atomic_mutations.py b/src/social_norms_trees/atomic_mutations.py new file mode 100644 index 0000000..850c319 --- /dev/null +++ b/src/social_norms_trees/atomic_mutations.py @@ -0,0 +1,745 @@ +from collections import namedtuple +import inspect +from functools import partial, wraps +import logging +from types import GenericAlias +from typing import Callable, List, Mapping, NamedTuple, Tuple, TypeVar, Union, Dict + +import click +import py_trees +import typer + +_logger = logging.getLogger(__name__) + +# ============================================================================= +# Argument types +# ============================================================================= + +ExistingNode = TypeVar("ExistingNode", bound=py_trees.behaviour.Behaviour) +NewNode = TypeVar("NewNode", bound=py_trees.behaviour.Behaviour) +CompositeIndex = TypeVar( + "CompositeIndex", bound=Tuple[py_trees.composites.Composite, int] +) +BehaviorIdentifier = TypeVar( + "BehaviorIdentifier", bound=Union[ExistingNode, NewNode, CompositeIndex] +) +BehaviorTreeNode = TypeVar("BehaviorTreeNode", bound=py_trees.behaviour.Behaviour) +BehaviorTree = TypeVar("BehaviorTree", bound=BehaviorTreeNode) +BehaviorLibrary = TypeVar("BehaviorLibrary", bound=List[BehaviorTreeNode]) +TreeOrLibrary = TypeVar("TreeOrLibrary", bound=Union[BehaviorTree, BehaviorLibrary]) + + +# ============================================================================= +# Atomic operations +# ============================================================================= + +# The very top line of each operation's docstring is used as the +# description of the operation in the UI, so it's required. +# The argument annotations are vital, because they tell the UI which prompt +# to use. + + +def remove(node: ExistingNode) -> ExistingNode: + """Remove a node. + Examples: + >>> tree = py_trees.composites.Sequence("", False, children=[ + ... py_trees.behaviours.Success(), + ... failure := py_trees.behaviours.Failure()]) + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + --> Success + --> Failure + >>> removed = remove(failure) + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + --> Success + """ + if node.parent is None: + msg = ( + f"%s's parent is None, so we can't remove it. We can't remove the root node." + % (node) + ) + raise ValueError(msg) + elif isinstance(node.parent, py_trees.composites.Composite): + node.parent.remove_child(node) + else: + raise NotImplementedError() + return node + + +def insert(node: NewNode, where: CompositeIndex) -> None: + """Insert a new node. + Examples: + >>> tree = py_trees.composites.Sequence("", False, children=[ + ... py_trees.behaviours.Success() + ... ]) + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + --> Success + + >>> insert(py_trees.behaviours.Failure(), (tree, 1)) + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + --> Success + --> Failure + + >>> insert(py_trees.behaviours.Dummy(), (tree, 0)) + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + --> Dummy + --> Success + --> Failure + """ + parent, index = where + parent.insert_child(node, index) + return + + +def move( + node: ExistingNode, + where: CompositeIndex, +) -> None: + """Move a node. + Examples: + >>> tree = py_trees.composites.Sequence("", False, children=[ + ... failure_node := py_trees.behaviours.Failure(), + ... success_node := py_trees.behaviours.Success(), + ... ]) + + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + --> Failure + --> Success + + >>> move(failure_node, (tree, 1)) + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + --> Success + --> Failure + >>> move(failure_node, (tree, 1)) + + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + --> Success + --> Failure + """ + parent, index = where + insert(remove(node), (parent, index)) + return + + +def exchange( + node0: ExistingNode, + node1: ExistingNode, +) -> None: + """Exchange two nodes. + Examples: + >>> tree = py_trees.composites.Sequence("", False, children=[ + ... s := py_trees.behaviours.Success(), + ... f := py_trees.behaviours.Failure(), + ... ]) + + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + --> Success + --> Failure + + >>> exchange(s, f) + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + --> Failure + --> Success + + >>> tree = py_trees.composites.Sequence("", False, children=[ + ... a:= py_trees.composites.Sequence("A", False, children=[ + ... py_trees.behaviours.Dummy() + ... ]), + ... py_trees.composites.Sequence("B", False, children=[ + ... py_trees.behaviours.Success(), + ... c := py_trees.composites.Sequence("C", False, children=[]) + ... ]) + ... ]) + + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + [-] A + --> Dummy + [-] B + --> Success + [-] C + + >>> exchange(a, c) + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + [-] C + [-] B + --> Success + [-] A + --> Dummy + + >>> tree = py_trees.composites.Sequence("", False, children=[ + ... py_trees.composites.Sequence("1", False, children=[ + ... a := py_trees.behaviours.Dummy("A") + ... ]), + ... py_trees.composites.Sequence("2", False, children=[ + ... b := py_trees.behaviours.Dummy("B") + ... ]) + ... ]) + + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + [-] 1 + --> A + [-] 2 + --> B + + >>> exchange(a, b) + >>> print(py_trees.display.ascii_tree(tree)) + ... # doctest: +NORMALIZE_WHITESPACE + [-] + [-] 1 + --> B + [-] 2 + --> A + """ + + node0_parent = node0.parent + node0_index = node0.parent.children.index(node0) + node1_parent = node1.parent + node1_index = node1.parent.children.index(node1) + + move(node0, (node1_parent, node1_index)) + move(node1, (node0_parent, node0_index)) + + return + + +# ============================================================================= +# Node and Position Selectors +# ============================================================================= + + +def iterate_nodes(tree: py_trees.behaviour.Behaviour): + """ + Examples: + >>> list(iterate_nodes(py_trees.behaviours.Dummy())) + ... # doctest: +ELLIPSIS + [] + >>> list(iterate_nodes( + ... py_trees.composites.Sequence("", False, children=[ + ... py_trees.behaviours.Dummy(), + ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + [, + ] + >>> list(iterate_nodes( + ... py_trees.composites.Sequence("", False, children=[ + ... py_trees.behaviours.Dummy(), + ... py_trees.behaviours.Dummy(), + ... py_trees.composites.Sequence("", False, children=[ + ... py_trees.behaviours.Dummy(), + ... ]), + ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + [, + , + , + , + ] + """ + yield tree + for child in tree.children: + yield from iterate_nodes(child) + + +def enumerate_nodes(tree: py_trees.behaviour.Behaviour): + """ + Examples: + + >>> list(enumerate_nodes(py_trees.behaviours.Dummy())) + ... # doctest: +ELLIPSIS + [(0, )] + >>> list(enumerate_nodes( + ... py_trees.composites.Sequence("", False, children=[ + ... py_trees.behaviours.Dummy(), + ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + [(0, ), + (1, )] + >>> list(enumerate_nodes( + ... py_trees.composites.Sequence("s1", False, children=[ + ... py_trees.behaviours.Dummy(), + ... py_trees.behaviours.Success(), + ... py_trees.composites.Sequence("s2", False, children=[ + ... py_trees.behaviours.Dummy(), + ... ]), + ... py_trees.composites.Sequence("", False, children=[ + ... py_trees.behaviours.Failure(), + ... py_trees.behaviours.Periodic("p", n=1), + ... ]), + ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + [(0, ), + (1, ), + (2, ), + (3, ), + (4, ), + (5, ), + (6, ), + (7, )] + """ + return enumerate(iterate_nodes(tree)) + + +def label_tree_lines( + tree: py_trees.behaviour.Behaviour, + labels: List[str], + representation=py_trees.display.unicode_tree, +) -> str: + """Label the lines of a tree. + Examples: + >>> print(label_tree_lines(py_trees.behaviours.Dummy(), labels=["0"])) + 0: --> Dummy + >>> tree = py_trees.composites.Sequence( + ... "S1", + ... False, + ... children=[ + ... py_trees.behaviours.Dummy(), + ... py_trees.behaviours.Dummy()] + ... ) + >>> print(label_tree_lines(tree, labels=["A", "B", "C"])) + A: [-] S1 + B: --> Dummy + C: --> Dummy + >>> print(label_tree_lines(tree, labels=["AAA", "BB", "C"])) + AAA: [-] S1 + BB: --> Dummy + C: --> Dummy + + If there are more labels than lines, then those are shown too: + >>> print(label_tree_lines(tree, labels=["AAA", "BB", "C", "O"])) + AAA: [-] S1 + BB: --> Dummy + C: --> Dummy + O: + """ + max_len = max([len(s) for s in labels]) + padded_labels = [s.rjust(max_len) for s in labels] + + tree_representation_lines = representation(tree).split("\n") + + enumerated_tree_representation_lines = [ + # Make the line. If `t` is missing, + # then we don't want a trailing space + # so we strip that away + f"{i}: {t}".rstrip() + for i, t in zip(padded_labels, tree_representation_lines) + ] + + output = "\n".join(enumerated_tree_representation_lines) + return output + + +# TODO: Split each of these functions into one which +# returns a labeled representation of the tree, +# a mapping of allowed values to nodes and +# a separate function which does the prompting. +# This should help testing. + +# Edge cases: what happens if the name of a node is really long - does the ascii representation wrap around? + + +class NodeMappingRepresentation(NamedTuple): + mapping: Dict[str, py_trees.behaviour.Behaviour] + labels: List[str] + representation: str + + +def prompt_identify( + tree: TreeOrLibrary, + function: Callable[ + [TreeOrLibrary], Tuple[Mapping[str, BehaviorIdentifier], List[str], str] + ], + message: str = "Which?", + display_nodes: bool = True, +) -> BehaviorIdentifier: + + mapping, labels, representation = function(tree) + + if display_nodes: + text = f"{representation}\n{message}" + else: + text = f"{message}" + + key = click.prompt(text=text, type=click.Choice(labels)) + node = mapping[key] + return node + + +def get_node_mapping(tree: BehaviorTree) -> NodeMappingRepresentation: + """ + Examples: + >>> a = get_node_mapping(py_trees.behaviours.Dummy()) + >>> a.mapping + {'0': } + + >>> a.labels + ['0'] + + >>> print(a.representation) + 0: --> Dummy + + >>> b = get_node_mapping(py_trees.composites.Sequence("", False, children=[py_trees.behaviours.Dummy()])) + >>> b.mapping # doctest: +NORMALIZE_WHITESPACE + {'0': , + '1': } + + >>> b.labels + ['0', '1'] + + >>> print(b.representation) + 0: [-] + 1: --> Dummy + + """ + mapping = {str(i): n for i, n in enumerate_nodes(tree)} + labels = list(mapping.keys()) + representation = label_tree_lines(tree=tree, labels=labels) + return NodeMappingRepresentation(mapping, labels, representation) + + +prompt_identify_node = partial(prompt_identify, function=get_node_mapping) + + +def get_library_mapping(library: BehaviorLibrary) -> NodeMappingRepresentation: + """ + Examples: + >>> from py_trees.behaviours import Success, Failure + >>> n = get_library_mapping([]) + >>> n.mapping + {} + + >>> n.labels + [] + + >>> print(n.representation) + + + >>> a = get_library_mapping([Success(), Failure()]) + >>> a.mapping # doctest: +NORMALIZE_WHITESPACE + {'0': , + '1': } + + >>> a.labels + ['0', '1'] + + >>> print(a.representation) + 0: Success + 1: Failure + """ + + mapping = {str(i): n for i, n in enumerate(library)} + labels = list(mapping.keys()) + representation = "\n".join([f"{i}: {n.name}" for i, n in enumerate(library)]) + return NodeMappingRepresentation(mapping, labels, representation) + + +prompt_identify_library_node = partial(prompt_identify, function=get_library_mapping) + + +def get_composite_mapping(tree: BehaviorTree, skip_label="_"): + """ + Examples: + >>> a = get_composite_mapping(py_trees.behaviours.Dummy()) + >>> a.mapping + {} + + >>> a.labels + [] + + >>> print(a.representation) + _: --> Dummy + + >>> b = get_composite_mapping(py_trees.composites.Sequence("", False, children=[py_trees.behaviours.Dummy()])) + >>> b.mapping # doctest: +NORMALIZE_WHITESPACE + {'0': } + + >>> b.labels + ['0'] + + >>> print(b.representation) + 0: [-] + _: --> Dummy + """ + mapping = {} + display_labels, allowed_labels = [], [] + + for i, node in enumerate_nodes(tree): + label = str(i) + if isinstance(node, py_trees.composites.Composite): + mapping[label] = node + display_labels.append(label) + allowed_labels.append(label) + else: + display_labels.append(skip_label) + representation = label_tree_lines(tree=tree, labels=display_labels) + + return NodeMappingRepresentation(mapping, allowed_labels, representation) + + +prompt_identify_composite = partial(prompt_identify, function=get_composite_mapping) + + +def get_child_index_mapping(tree: BehaviorTree, skip_label="_"): + """ + Examples: + >>> a = get_child_index_mapping(py_trees.behaviours.Dummy()) + >>> a.mapping + {'0': 0} + + >>> a.labels + ['0'] + + >>> print(a.representation) + _: --> Dummy + 0: + + >>> b = get_child_index_mapping(py_trees.composites.Sequence("", False, children=[py_trees.behaviours.Dummy()])) + >>> b.mapping # doctest: +NORMALIZE_WHITESPACE + {'0': 0, '1': 1} + + >>> b.labels + ['0', '1'] + + >>> print(b.representation) + _: [-] + 0: --> Dummy + 1: + """ + mapping = {} + display_labels, allowed_labels = [], [] + + for node in iterate_nodes(tree): + if node in tree.children: + index = tree.children.index(node) + label = str(index) + mapping[label] = index + allowed_labels.append(label) + display_labels.append(label) + else: + display_labels.append(skip_label) + + # Add the "after all the elements" label + post_list_index = len(tree.children) + post_list_label = str(post_list_index) + allowed_labels.append(post_list_label) + display_labels.append(post_list_label) + mapping[post_list_label] = post_list_index + + representation = label_tree_lines(tree=tree, labels=display_labels) + + return NodeMappingRepresentation(mapping, allowed_labels, representation) + + +prompt_identify_child_index = partial(prompt_identify, function=get_child_index_mapping) + + +def get_position_mapping(tree): + """ + + + + [-] S0 + --> {1} + [-] S1 + --> {2} + --> Dummy + --> {3} + --> {4} + [-] S2 + --> {5} + --> Failure + --> {6} + --> {7} + + + + """ + pass + + +# ============================================================================= +# User Interface +# ============================================================================= + + +# Wrapper functions for the atomic operations which give them a UI. +MutationResult = namedtuple("MutationResult", ["result", "tree", "function", "kwargs"]) + + +def mutate_chooser(*fs: Union[Callable], message="Which action?"): + """Prompt the user to choose one of the functions f. + Returns the wrapped version of the function. + """ + n_fs = len(fs) + docstring_summaries = [f_.__doc__.split("\n")[0] for f_ in fs] + text = ( + "\n".join( + [ + f"{i}: {docstring_summary}" + for i, docstring_summary in enumerate(docstring_summaries) + ] + ) + + f"\n{message}" + ) + i = click.prompt(text=text, type=click.IntRange(0, n_fs - 1)) + f = mutate_ui(fs[i]) + + return f + + +def mutate_ui( + f: Callable, +) -> Callable[ + [py_trees.behaviour.Behaviour, List[py_trees.behaviour.Behaviour]], MutationResult +]: + """Factory function for a tree mutator UI. + This creates a version of the atomic function `f` + which prompts the user for the appropriate arguments + based on `f`'s type annotations. + """ + + signature = inspect.signature(f) + + @wraps(f) + def f_inner(tree, library): + kwargs = {} + for parameter_name in signature.parameters.keys(): + annotation = signature.parameters[parameter_name].annotation + _logger.debug(f"getting arguments for {annotation=}") + value = prompt_get_mutate_arguments(annotation, tree, library) + kwargs[parameter_name] = value + inner_result = f(**kwargs) + return_value = MutationResult( + result=inner_result, tree=tree, function=f, kwargs=kwargs + ) + _logger.debug(return_value) + return return_value + + return f_inner + + +def prompt_get_mutate_arguments(annotation: GenericAlias, tree, library): + """Prompt the user to specify nodes and positions in the tree.""" + annotation_ = str(annotation) + assert isinstance(annotation_, str) + + if annotation_ == str(inspect.Parameter.empty): + _logger.debug("No argument annotation, returning None") + return None + elif annotation_ == str(ExistingNode): + _logger.debug("in ExistingNode") + node = prompt_identify_node(tree) + return node + elif annotation_ == str(CompositeIndex): + _logger.debug("in CompositeIndex") + composite_node = prompt_identify_composite(tree, message="Which parent?") + index = prompt_identify_child_index(composite_node) + return composite_node, index + elif annotation_ == str(NewNode): + _logger.debug("in NewNode") + new_node = prompt_identify_library_node( + library, message="Which node from the library?" + ) + return new_node + else: + _logger.debug("in 'else'") + msg = "Can't work out what to do with %s" % annotation + raise NotImplementedError(msg) + + +# ============================================================================= +# Utility functions +# ============================================================================= + + +class QuitException(Exception): + pass + + +def end_experiment(): + """I'm done, end the experiment.""" + raise QuitException("User ended the experiment.") + + +# ============================================================================= +# Main Loop +# ============================================================================= + + +def load_experiment(): + """Placeholder function for loading a tree and library (should come from a file).""" + tree = py_trees.composites.Sequence( + "S0", + False, + children=[ + py_trees.behaviours.Dummy("A"), + py_trees.composites.Sequence( + "S1", + memory=False, + children=[ + py_trees.behaviours.Dummy("B"), + py_trees.behaviours.Dummy("C"), + py_trees.behaviours.Dummy("D"), + ], + ), + py_trees.composites.Selector( + "S2", + memory=False, + children=[ + py_trees.behaviours.Dummy("E"), + py_trees.behaviours.Dummy("F"), + py_trees.behaviours.Failure(), + ], + ), + py_trees.behaviours.Success(), + ], + ) + library = [py_trees.behaviours.Success(), py_trees.behaviours.Failure()] + return tree, library + + +def save_results(tree, protocol): + _logger.info("saving results") + print(f"protocol: {protocol}") + print(f"tree:\n{py_trees.display.unicode_tree(tree)}") + + +app = typer.Typer() + + +@app.command() +def main(): + logging.basicConfig(level=logging.DEBUG) + tree, library = load_experiment() + protocol = [] + + # The main loop of the experiment + while f := mutate_chooser(insert, move, exchange, remove, end_experiment): + results = f(tree, library) + _logger.debug(results) + protocol.append(results) + print(py_trees.display.ascii_tree(tree)) + + +if __name__ == "__main__": + app() diff --git a/src/social_norms_trees/behavior_library.py b/src/social_norms_trees/behavior_library.py index 58c784f..4cbd710 100644 --- a/src/social_norms_trees/behavior_library.py +++ b/src/social_norms_trees/behavior_library.py @@ -2,6 +2,10 @@ class BehaviorLibrary: def __init__(self, behavior_list): self.behaviors = behavior_list self.behavior_from_display_name = { - behavior["display_name"]: behavior for behavior in behavior_list + behavior["name"]: behavior for behavior in behavior_list } self.behavior_from_id = {behavior["id"]: behavior for behavior in behavior_list} + + def __iter__(self): + for i in self.behaviors: + yield i diff --git a/src/social_norms_trees/mutate_tree.py b/src/social_norms_trees/mutate_tree.py deleted file mode 100644 index 3146f15..0000000 --- a/src/social_norms_trees/mutate_tree.py +++ /dev/null @@ -1,563 +0,0 @@ -"""Example of using worlds with just an integer for the state of the world""" - -import warnings -from functools import partial, wraps -from itertools import islice -from typing import TypeVar, Optional, List - -import click -import py_trees - -from datetime import datetime - -from social_norms_trees.custom_node_library import CustomBehavior, CustomSequence - - -T = TypeVar("T", bound=py_trees.behaviour.Behaviour) - - -def print_tree(tree: py_trees.behaviour.Behaviour): - tree_display = py_trees.display.unicode_tree(tree) - print(tree_display) - - -def iterate_nodes(tree: py_trees.behaviour.Behaviour): - """ - - Examples: - >>> list(iterate_nodes(py_trees.behaviours.Dummy())) # doctest: +ELLIPSIS - [] - - >>> list(iterate_nodes( - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Dummy(), - ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE - [, - ] - - >>> list(iterate_nodes( - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Dummy(), - ... py_trees.behaviours.Dummy(), - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Dummy(), - ... ]), - ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE - [, - , - , - , - ] - - """ - yield tree - for child in tree.children: - yield from iterate_nodes(child) - - -def enumerate_nodes(tree: py_trees.behaviour.Behaviour): - """ - - Examples: - >>> list(enumerate_nodes(py_trees.behaviours.Dummy())) # doctest: +ELLIPSIS - [(0, )] - - >>> list(enumerate_nodes( - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Dummy(), - ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE - [(0, ), - (1, )] - - >>> list(enumerate_nodes( - ... py_trees.composites.Sequence("s1", False, children=[ - ... py_trees.behaviours.Dummy(), - ... py_trees.behaviours.Success(), - ... py_trees.composites.Sequence("s2", False, children=[ - ... py_trees.behaviours.Dummy(), - ... ]), - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Failure(), - ... py_trees.behaviours.Periodic("p", n=1), - ... ]), - ... ]))) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE - [(0, ), - (1, ), - (2, ), - (3, ), - (4, ), - (5, ), - (6, ), - (7, )] - - """ - return enumerate(iterate_nodes(tree)) - - -def label_tree_lines( - tree: py_trees.behaviour.Behaviour, - labels: List[str], - representation=py_trees.display.unicode_tree, -) -> str: - max_len = max([len(s) for s in labels]) - padded_labels = [s.rjust(max_len) for s in labels] - - tree_representation_lines = representation(tree).split("\n") - enumerated_tree_representation_lines = [ - f"{i}: {t}" for i, t in zip(padded_labels, tree_representation_lines) - ] - - output = "\n".join(enumerated_tree_representation_lines) - return output - - -def format_children_with_indices(composite: py_trees.composites.Composite) -> str: - """ - Examples: - >>> tree = py_trees.composites.Sequence("s1", False, children=[ - ... py_trees.behaviours.Dummy(), - ... py_trees.behaviours.Success(), - ... py_trees.composites.Sequence("s2", False, children=[ - ... py_trees.behaviours.Dummy(), - ... ]), - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Failure(), - ... py_trees.behaviours.Periodic("p", n=1), - ... ]), - ... ]) - >>> print(format_children_with_indices(tree)) # doctest: +NORMALIZE_WHITESPACE - _: [-] s1 - 0: --> Dummy - 1: --> Success - 2: [-] s2 - _: --> Dummy - 3: [-] - _: --> Failure - _: --> p - """ - index_strings = [] - i = 0 - for b in iterate_nodes(composite): - if b in composite.children: - index_strings.append(str(i)) - i += 1 - else: - index_strings.append("_") - - index_strings.append(str(i)) - - output = label_tree_lines(composite, index_strings) - return output - - -def format_parents_with_indices(composite: py_trees.composites.Composite) -> str: - index_strings = [] - i = 0 - for b in iterate_nodes(composite): - if ( - b.__class__.__name__ == "CustomSequence" - or b.__class__.__name__ == "CustomSelector" - ): - index_strings.append(str(i)) - else: - index_strings.append("_") - i += 1 - - output = label_tree_lines(composite, index_strings) - return output - - -def format_tree_with_indices( - tree: py_trees.behaviour.Behaviour, - show_root: bool = False, -) -> tuple[str, List[str]]: - """ - Examples: - >>> print(format_tree_with_indices(py_trees.behaviours.Dummy())) - 0: --> Dummy - - >>> tree = py_trees.composites.Sequence("s1", False, children=[ - ... py_trees.behaviours.Dummy(), - ... py_trees.behaviours.Success(), - ... py_trees.composites.Sequence("s2", False, children=[ - ... py_trees.behaviours.Dummy(), - ... ]), - ... py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Failure(), - ... py_trees.behaviours.Periodic("p", n=1), - ... ]), - ... ]) - >>> print(format_tree_with_indices(tree)) # doctest: +NORMALIZE_WHITESPACE - 0: [-] s1 - 1: --> Dummy - 2: --> Success - 3: [-] s2 - 4: --> Dummy - 5: [-] - 6: --> Failure - 7: --> p - - """ - - index_strings = [] - index = 0 - for i, node in enumerate_nodes(tree): - if i == 0 and not show_root: - index_strings.append("_") - else: - index_strings.append(str(index)) - index += 1 - output = label_tree_lines(tree, index_strings) - - return output, index_strings[1:] - - -def say(message): - print(message) - - -def prompt_identify_node( - tree: py_trees.behaviour.Behaviour, - message: str = "Which node?", - display_nodes: bool = True, - show_root: bool = False, -) -> py_trees.behaviour.Behaviour: - node_index = prompt_identify_tree_iterator_index( - tree=tree, message=message, display_nodes=display_nodes, show_root=show_root - ) - node = next(islice(iterate_nodes(tree), node_index, node_index + 1)) - return node - - -def prompt_identify_parent_node( - tree: py_trees.behaviour.Behaviour, - message: str = "Which position?", - display_nodes: bool = True, -) -> int: - if display_nodes: - text = f"{format_parents_with_indices(tree)}\n{message}" - else: - text = f"{message}" - node_index = click.prompt( - text=text, - type=int, - ) - - node = next(islice(iterate_nodes(tree), node_index, node_index + 1)) - return node - - -def prompt_identify_tree_iterator_index( - tree: py_trees.behaviour.Behaviour, - message: str = "Which position?", - display_nodes: bool = True, - show_root: bool = False, -) -> int: - if display_nodes: - format_tree_text, index_options = format_tree_with_indices(tree, show_root) - text = f"{format_tree_text}\n{message}" - else: - _, index_options = format_tree_with_indices(tree, show_root) - text = f"{message}" - node_index = click.prompt( - text=text, - type=click.Choice(index_options, case_sensitive=False), - show_choices=False, - ) - return int(node_index) - - -def prompt_identify_child_index( - tree: py_trees.behaviour.Behaviour, - message: str = "Which position?", - display_nodes: bool = True, -) -> int: - if display_nodes: - text = f"{format_children_with_indices(tree)}\n{message}" - else: - text = f"{message}" - node_index = click.prompt( - text=text, - type=int, - ) - return node_index - - -def add_child( - tree: T, - parent: Optional[py_trees.composites.Composite] = None, - child: Optional[py_trees.behaviour.Behaviour] = None, -) -> T: - """Add a behaviour to the tree - - Examples: - >>> tree = py_trees.composites.Sequence("", False, children=[]) - >>> print(py_trees.display.ascii_tree(tree)) # doctest: +NORMALIZE_WHITESPACE - [-] - - >>> print(py_trees.display.ascii_tree(add_child(tree, py_trees.behaviours.Success()))) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - - """ - if parent is None: - parent = prompt_identify_node( - tree, f"Which parent node do you want to add the child to?" - ) - if child is None: - child_key = click.prompt( - text="What should the child do?", type=click.Choice(["say"]) - ) - match child_key: - case "say": - message = click.prompt(text="What should it say?", type=str) - - child_function = wraps(say)(partial(say, message)) - child_type = py_trees.meta.create_behaviour_from_function( - child_function - ) - child = child_type() - case _: - raise NotImplementedError() - parent.add_child(child) - return tree - - -def remove_node(tree: T, node: Optional[py_trees.behaviour.Behaviour] = None) -> T: - """Remove a behaviour from the tree - - Examples: - >>> tree = py_trees.composites.Sequence("", False, children=[ - ... py_trees.behaviours.Success(), - ... failure_node := py_trees.behaviours.Failure()]) - >>> print(py_trees.display.ascii_tree(tree)) # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - --> Failure - - >>> print(py_trees.display.ascii_tree(remove_node(tree, failure_node))) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - - """ - if node is None: - node = prompt_identify_node(tree, f"Which node do you want to remove?") - parent_node = node.parent - if parent_node is None: - warnings.warn( - f"{node}'s parent is None, so we can't remove it. You can't remove the root node." - ) - action_log = {} - return (tree,) - elif isinstance(parent_node, py_trees.composites.Composite): - parent_node.remove_child(node) - action_log = { - "type": "remove_node", - "nodes": [ - {"id_": node.id_, "display_name": node.display_name}, - ], - "timestamp": datetime.now().isoformat(), - } - else: - raise NotImplementedError() - - return tree, action_log - - -def move_node( - tree: T, - node: Optional[py_trees.behaviour.Behaviour] = None, - new_parent: Optional[py_trees.behaviour.Behaviour] = None, - index: int = None, - internal_call: bool = False, -) -> T: - """Exchange two behaviours in the tree - - Examples: - >>> tree = py_trees.composites.Sequence("", False, children=[]) - - """ - - if node is None: - node = prompt_identify_node(tree, f"Which node do you want to move?") - if new_parent is None: - new_parent = prompt_identify_parent_node( - tree, f"What should its parent be?", display_nodes=True - ) - if index is None: - index = prompt_identify_child_index(new_parent) - - assert isinstance(new_parent, py_trees.composites.Composite) - assert isinstance(node.parent, py_trees.composites.Composite) - - # old_parent = node.parent.name - node.parent.remove_child(node) - new_parent.insert_child(node, index) - - if not internal_call: - action_log = { - "type": "move_node", - "nodes": [ - { - "id": node.id_, - "display_name": node.display_name, - }, - ], - "timestamp": datetime.now().isoformat(), - } - return tree, action_log - - return tree - - -def exchange_nodes( - tree: T, - node0: Optional[py_trees.behaviour.Behaviour] = None, - node1: Optional[py_trees.behaviour.Behaviour] = None, -) -> T: - """Exchange two behaviours in the tree - - Examples: - >>> tree = py_trees.composites.Sequence("", False, children=[ - ... s:=py_trees.behaviours.Success(), - ... f:=py_trees.behaviours.Failure(), - ... ]) - >>> print(py_trees.display.ascii_tree(tree)) # doctest: +NORMALIZE_WHITESPACE - [-] - --> Success - --> Failure - - >>> print(py_trees.display.ascii_tree(exchange_nodes(tree, s, f))) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - --> Failure - --> Success - - >>> tree = py_trees.composites.Sequence("", False, children=[ - ... a:= py_trees.composites.Sequence("A", False, children=[ - ... py_trees.behaviours.Dummy() - ... ]), - ... py_trees.composites.Sequence("B", False, children=[ - ... py_trees.behaviours.Success(), - ... c := py_trees.composites.Sequence("C", False, children=[]) - ... ]) - ... ]) - >>> print(py_trees.display.ascii_tree(tree)) # doctest: +NORMALIZE_WHITESPACE - [-] - [-] A - --> Dummy - [-] B - --> Success - [-] C - >>> print(py_trees.display.ascii_tree(exchange_nodes(tree, a, c))) - ... # doctest: +NORMALIZE_WHITESPACE - [-] - [-] C - [-] B - --> Success - [-] A - --> Dummy - """ - - if node0 is None: - node0 = prompt_identify_node(tree, f"Which node do you want to switch?") - if node1 is None: - node1 = prompt_identify_node( - tree, f"Which node do you want to switch?", display_nodes=False - ) - - node0_parent, node0_index = node0.parent, node0.parent.children.index(node0) - node1_parent, node1_index = node1.parent, node1.parent.children.index(node1) - - tree = move_node(tree, node0, node1_parent, node1_index, True) - tree = move_node(tree, node1, node0_parent, node0_index, True) - - nodes = [] - if node0.__class__.__name__ != "CustomBehavior": - nodes.append( - { - "display_name": node0.name, - } - ) - else: - nodes.append({"id": node0.id_, "display_name": node0.display_name}) - - if node1.__class__.__name__ != "CustomBehavior": - nodes.append( - { - "display_name": node1.display_name, - } - ) - else: - nodes.append({"id": node1.id_, "display_name": node1.display_name}) - - action_log = { - "type": "exchange_nodes", - "nodes": nodes, - "timestamp": datetime.now().isoformat(), - } - return tree, action_log - - -def prompt_select_node(behavior_library, text): - for idx, tree_name in enumerate( - behavior_library.behavior_from_display_name.keys(), 1 - ): - print(f"{idx}. {tree_name}") - - choices = [str(i + 1) for i in range(len(behavior_library.behaviors))] - node_index = click.prompt(text=text, type=click.Choice(choices), show_choices=False) - - node_key = list(behavior_library.behavior_from_display_name.keys())[node_index - 1] - - return behavior_library.behavior_from_display_name[node_key] - - -def add_node( - tree: T, - behavior_library: object, -) -> T: - """Exchange two behaviours in the tree - - Examples: - >>> tree = py_trees.composites.Sequence("", False, children=[]) - - """ - - behavior = prompt_select_node( - behavior_library, f"Which behavior do you want to add?" - ) - - if behavior["type"] == "Behavior": - new_node = CustomBehavior( - name=behavior["display_name"], - id_=behavior["id"], - display_name=behavior["display_name"], - ) - - elif behavior["type"] == "Sequence": - new_node = CustomSequence( - name=behavior["display_name"], - id_=behavior["id"], - display_name=behavior["display_name"], - ) - - new_parent = prompt_identify_parent_node( - tree, f"What should its parent be?", display_nodes=True - ) - - index = prompt_identify_child_index(new_parent) - - assert isinstance(new_parent, py_trees.composites.Composite) - - new_parent.insert_child(new_node, index) - - action_log = { - "type": "add_node", - "node": {"id": new_node.id_, "display_name": new_node.display_name}, - "timestamp": datetime.now().isoformat(), - } - - return tree, action_log diff --git a/src/social_norms_trees/serialize_tree.py b/src/social_norms_trees/serialize_tree.py index a3424f9..930b107 100644 --- a/src/social_norms_trees/serialize_tree.py +++ b/src/social_norms_trees/serialize_tree.py @@ -1,17 +1,125 @@ -import py_trees from social_norms_trees.custom_node_library import CustomBehavior, CustomSequence -def serialize_tree(tree): - def serialize_node(node): - data = { - "type": node.__class__.__name__, - "name": node.name, - "children": [serialize_node(child) for child in node.children], - } - return data +def serialize_tree(tree, include_children=True): + """ + Examples: + >>> from py_trees.behaviours import Dummy, Success, Failure + >>> from py_trees.composites import Sequence, Selector - return serialize_node(tree) + >>> serialize_tree(Dummy()) + {'type': 'Dummy', 'name': 'Dummy'} + + >>> serialize_tree(Success()) + {'type': 'Success', 'name': 'Success'} + + >>> serialize_tree(Failure()) + {'type': 'Failure', 'name': 'Failure'} + + >>> serialize_tree(Sequence("root", True, children=[Dummy()])) + {'type': 'Sequence', 'name': 'root', 'children': [{'type': 'Dummy', 'name': 'Dummy'}]} + + >>> serialize_tree(CustomBehavior("behavior", "theid", "display behavior")) + {'type': 'CustomBehavior', 'name': 'behavior', 'display_name': 'display behavior', 'id_': 'theid'} + + >>> serialize_tree(CustomSequence("root", "theid", "display root", children=[Dummy()])) + {'type': 'CustomSequence', 'name': 'root', 'display_name': 'display root', 'id_': 'theid', 'children': [{'type': 'Dummy', 'name': 'Dummy'}]} + """ + + data = { + "type": tree.__class__.__name__, + "name": tree.name, + } + if hasattr(tree, "display_name"): + data["display_name"] = tree.display_name + if hasattr(tree, "id_"): + data["id_"] = tree.id_ + if include_children and tree.children: + data["children"] = [serialize_tree(child) for child in tree.children] + + return data + + +def deserialize_library_element(description: dict): + """ + Examples: + >>> s = deserialize_library_element({"type": "Sequence", "name": "Sequence 0", "id": "s0"}) + >>> s + + + >>> s.id_ + 's0' + + >>> s.name + 'Sequence 0' + + >>> s.children + [] + + TODO: Implement selectors + >>> deserialize_library_element({"type": "Selector", "name": "Selector 0", "id": "s0"}) + Traceback (most recent call last): + ... + NotImplementedError: node_type=Selector is not implemented + + >>> b = deserialize_library_element({"type": "Behavior", "name": "Behavior 0", "id": "b0"}) + >>> b + + + >>> b.id_ + 'b0' + + >>> b.name + 'Behavior 0' + + + """ + assert isinstance(description["type"], str), ( + f"\nThere was an invalid configuration detected in the inputted behavior tree: " + f"Invalid type for node attribute 'type' found for node '{description['name']}'. " + f"Please ensure that the 'name' attribute is a string." + ) + + node_type = description["type"] + assert node_type in {"Sequence", "Selector", "Behavior"}, ( + f"\nThere was an invalid configuration detected in the inputted behavior tree: " + f"Invalid node type '{node_type}' found for node '{description['name']}'. " + f"Please ensure that all node types are correct and supported." + ) + + if node_type == "Sequence": + if "children" in description.keys(): + children = [ + deserialize_library_element(child) for child in description["children"] + ] + else: + children = [] + + node = CustomSequence( + name=description["name"], + id_=description["id"], + display_name=description["name"], + children=children, + ) + + elif node_type == "Behavior": + assert "children" not in description or len(description["children"]) == 0, ( + f"\nThere was an invalid configuration detected in the inputted behavior tree: " + f"Children were detected for Behavior type node '{description['name']}': " + f"Behavior nodes should not have any children. Please check the structure of your behavior tree." + ) + + node = CustomBehavior( + name=description["name"], + id_=description["id"], + display_name=description["name"], + ) + + else: + msg = "node_type=%s is not implemented" % node_type + raise NotImplementedError(msg) + + return node def deserialize_tree(tree, behavior_library): @@ -41,9 +149,9 @@ def deserialize_node(node): if behavior: return CustomSequence( - name=behavior["display_name"], + name=behavior["name"], id_=behavior["id"], - display_name=behavior["display_name"], + display_name=behavior["name"], children=children, ) else: @@ -62,9 +170,9 @@ def deserialize_node(node): if behavior: return CustomBehavior( - name=behavior["display_name"], + name=behavior["name"], id_=behavior["id"], - display_name=behavior["display_name"], + display_name=behavior["name"], ) else: raise ValueError( diff --git a/src/social_norms_trees/ui_wrapper.py b/src/social_norms_trees/ui_wrapper.py index e6188b3..a81fe6e 100644 --- a/src/social_norms_trees/ui_wrapper.py +++ b/src/social_norms_trees/ui_wrapper.py @@ -1,6 +1,6 @@ +import logging import pathlib -import time -from typing import Annotated +from typing import Annotated, List import click from datetime import datetime import json @@ -11,16 +11,25 @@ import typer -from social_norms_trees.mutate_tree import ( - move_node, - exchange_nodes, - remove_node, - add_node, +from social_norms_trees.atomic_mutations import ( + QuitException, + exchange, + insert, + move, + mutate_chooser, + remove, + end_experiment, +) +from social_norms_trees.serialize_tree import ( + deserialize_library_element, + serialize_tree, + deserialize_tree, ) -from social_norms_trees.serialize_tree import serialize_tree, deserialize_tree from social_norms_trees.behavior_library import BehaviorLibrary +_logger = logging.getLogger(__name__) + def load_db(db_file): if os.path.exists(db_file): @@ -35,8 +44,10 @@ def save_db(db, db_file): print(f"\nWriting results of simulation to {db_file}...") + json_representation = json.dumps(db, indent=4) + with open(db_file, "w") as f: - json.dump(db, f, indent=4) + f.write(json_representation) def experiment_setup(db, origin_tree): @@ -71,9 +82,9 @@ def load_resources(file_path): behavior_list = resources.get("behavior_library") context_paragraph = resources.get("context") - behavior_library = BehaviorLibrary(behavior_list) + behavior_tree = deserialize_tree(behavior_tree, BehaviorLibrary(behavior_list)) - behavior_tree = deserialize_tree(behavior_tree, behavior_library) + behavior_library = [deserialize_library_element(e) for e in behavior_list] print("Loading success.") return behavior_tree, behavior_library, context_paragraph @@ -97,64 +108,69 @@ def initialize_experiment_record(db, participant_id, origin_tree): return experiment_id -def run_experiment(db, origin_tree, experiment_id, behavior_library): +def display_tree(tree): + print(py_trees.display.ascii_tree(tree)) + return + + +def serialize_function_arguments(args): + results = {} + if isinstance(args, dict): + for key, value in args.items(): + results[key] = serialize_function_arguments(value) + return results + elif isinstance(args, py_trees.behaviour.Behaviour): + value = serialize_tree(args, include_children=False) + return value + elif isinstance(args, tuple): + value = tuple(serialize_function_arguments(i) for i in args) + return value + elif isinstance(args, list): + value = [serialize_function_arguments(i) for i in args] + return value + else: + return args + + +def run_experiment(tree, library): # Loop for the actual experiment part, which takes user input to decide which action to take print("\nExperiment beginning...\n") + results_dict = { + "start_time": datetime.now().isoformat(), + "initial_behavior_tree": serialize_tree(tree), + "action_log": [], + } + try: while True: - print(py_trees.display.ascii_tree(origin_tree)) - user_choice = click.prompt( - "Would you like to perform an action on the behavior tree?", - show_choices=True, - type=click.Choice(["y", "n"], case_sensitive=False), + display_tree(tree) + f = mutate_chooser(insert, move, exchange, remove, end_experiment) + if f is end_experiment: + break + results = f(tree, library) + results_dict["action_log"].append( + { + "type": results.function.__name__, + "kwargs": serialize_function_arguments(results.kwargs), + "time": datetime.now().isoformat(), + } ) - if user_choice == "y": - action = click.prompt( - "1. move node\n" - + "2. exchange node\n" - + "3. remove node\n" - + "4. add node\n" - + "Please select an action to perform on the behavior tree", - type=click.IntRange(min=1, max=4), - show_choices=True, - ) - - if action == 1: - origin_tree, action_log = move_node(origin_tree) - db[experiment_id]["action_history"].append(action_log) - elif action == 2: - origin_tree, action_log = exchange_nodes(origin_tree) - db[experiment_id]["action_history"].append(action_log) - - elif action == 3: - origin_tree, action_log = remove_node(origin_tree) - db[experiment_id]["action_history"].append(action_log) - - elif action == 4: - origin_tree, action_log = add_node(origin_tree, behavior_library) - db[experiment_id]["action_history"].append(action_log) - - else: - print( - "Invalid choice, please select a valid number (1, 2, 3, or 4).\n" - ) - - # user_choice == "n", end simulation run - else: - break + except QuitException: + pass except Exception: print( "\nAn error has occured during the experiment, the experiment will now end." ) - db[experiment_id]["error_log"] = traceback.format_exc() + results_dict["error_log"] = traceback.format_exc() - finally: - db[experiment_id]["final_behavior_tree"] = serialize_tree(origin_tree) - db[experiment_id]["end_date"] = datetime.now().isoformat() - return db + # finally: + results_dict["final_behavior_tree"] = serialize_tree(tree) + results_dict["start_time"] = datetime.now().isoformat() + + return results_dict app = typer.Typer() @@ -172,7 +188,18 @@ def main( pathlib.Path, typer.Option(help="file where the experimental results will be written"), ] = "db.json", + verbose: Annotated[bool, typer.Option("--verbose")] = False, + debug: Annotated[bool, typer.Option("--debug")] = False, ): + if debug: + logging.basicConfig(level=logging.DEBUG) + _logger.debug("debug logging") + elif verbose: + logging.basicConfig(level=logging.INFO) + _logger.debug("verbose logging") + else: + logging.basicConfig() + print("AIT Prototype #1 Simulator") # TODO: write up some context, assumptions made in the README @@ -185,7 +212,9 @@ def main( print(f"\nContext of this experiment: {context_paragraph}") participant_id, experiment_id = experiment_setup(db, original_tree) - db = run_experiment(db, original_tree, experiment_id, behavior_library) + results = run_experiment(original_tree, behavior_library) + db[experiment_id] = results + _logger.debug(db) save_db(db, db_file) # TODO: define export file, that will be where we export the results to