diff --git a/beams/behavior_tree/ActionNode.py b/beams/behavior_tree/ActionNode.py index 23a9769..4ae5807 100644 --- a/beams/behavior_tree/ActionNode.py +++ b/beams/behavior_tree/ActionNode.py @@ -1,33 +1,38 @@ import atexit import os from multiprocessing import Event, Lock +from typing import Callable, Any, Optional, Union import py_trees from epics.multiproc import CAProcess +from beams.behavior_tree.ActionWorker import ActionWorker from beams.behavior_tree.VolatileStatus import VolatileStatus class ActionNode(py_trees.behaviour.Behaviour): def __init__( self, - name, - work_func, - completion_condition, + name: str, + work_func: Callable[[Any], None], + completion_condition: Callable[[Any], bool], work_gate=Event(), work_lock=Lock(), **kwargs, ): # TODO: can add failure condition argument... - super(ActionNode, self).__init__(name) - self.work_func = work_func - self.comp_condition = completion_condition + super().__init__(name) # print(type(self.status)) self.__volatile_status__ = VolatileStatus(self.status) - self.additional_args = kwargs - self.logger.debug("%s.__init__()" % (self.__class__.__name__)) - + # TODO may want to instantiate these locally and then decorate the passed work function with them self.work_gate = work_gate self.lock = work_lock + self.worker = ActionWorker(proc_name=name, + volatile_status=self.__volatile_status__, + work_func=work_func, + comp_cond=completion_condition, + stop_func=None + ) # TODO: some standard notion of stop function could be valuable + self.logger.debug("%s.__init__()" % (self.__class__.__name__)) def setup(self, **kwargs: int) -> None: """Kickstart the separate process this behaviour will work with. @@ -39,14 +44,9 @@ def setup(self, **kwargs: int) -> None: ) # Having this in setup means the workthread should always be running. - self.work_proc = CAProcess( - target=self.work_func, - args=(self.comp_condition, self.__volatile_status__), - kwargs=self.additional_args, - ) - self.work_proc.start() + self.worker.start_work() atexit.register( - self.work_proc.terminate + self.worker.stop_work ) # TODO(josh): make sure this cleans up resources when it dies def initialise(self) -> None: @@ -85,16 +85,12 @@ def update(self) -> py_trees.common.Status: return new_status def terminate(self, new_status: py_trees.common.Status) -> None: - """Nothing to clean up in this example.""" - print(f"TERMINATE CALLED ON {self.name}, pid: {os.getpid()}") - if self.work_proc.is_alive(): - print(f"The process is still alive on {os.getpid()}") - self.work_proc.terminate() - self.logger.debug( - py_trees.console.red - + "%s.terminate()[%s->%s]" - % (self.__class__.__name__, self.status, new_status) - ) + """Nothing to clean up.""" + self.logger.debug( + py_trees.console.red + + "%s.terminate()[%s->%s]" + % (self.__class__.__name__, self.status, new_status) + ) if __name__ == "__main__": diff --git a/beams/behavior_tree/ActionWorker.py b/beams/behavior_tree/ActionWorker.py new file mode 100644 index 0000000..5736576 --- /dev/null +++ b/beams/behavior_tree/ActionWorker.py @@ -0,0 +1,26 @@ +""" +A worker specialized to execute ActionNode work functions +""" +from typing import Callable, Any, Optional + +from epics.multiproc import CAProcess + +from beams.sequencer.helpers.Worker import Worker +from beams.behavior_tree.VolatileStatus import VolatileStatus + + +class ActionWorker(Worker): + def __init__(self, + proc_name: str, + volatile_status: VolatileStatus, + work_func: Callable[[Any], None], + comp_cond: Callable[[Any], bool], + stop_func: Optional[Callable[[None], None]] = None): + super().__init__(proc_name=proc_name, + stop_func=stop_func, + work_func=work_func, + proc_type=CAProcess, + add_args=(comp_cond, volatile_status)) + + # Note: there may be a world where we define a common stop_func here in which case + # the class may have maintain a reference to voltaile_status and or comp_cond \ No newline at end of file diff --git a/beams/behavior_tree/CheckAndDo.py b/beams/behavior_tree/CheckAndDo.py index 54b3433..aac3827 100644 --- a/beams/behavior_tree/CheckAndDo.py +++ b/beams/behavior_tree/CheckAndDo.py @@ -6,7 +6,7 @@ class CheckAndDo(py_trees.composites.Selector): def __init__(self, name: str, check: ConditionNode, do: ActionNode) -> None: - super().__init__(name, memory=True) + super().__init__(name, memory=False) self.name = name self.check = check self.do = do diff --git a/beams/sequencer/helpers/Worker.py b/beams/sequencer/helpers/Worker.py index ef37c17..fbfc68d 100644 --- a/beams/sequencer/helpers/Worker.py +++ b/beams/sequencer/helpers/Worker.py @@ -6,20 +6,26 @@ """ import logging from multiprocessing import Process, Value +from typing import Callable, Any, Optional, List -class Worker: - def __init__(self, proc_name, stop_func=None, work_func=None): - self.do_work = Value("i", False) +class Worker(): + def __init__(self, + proc_name: str, + stop_func: Optional[Callable[[None], None]] = None, + work_func: Optional[Callable[[Any], None]] = None, + proc_type: type[Process] = Process, + add_args: List[Any] = [] + ): + self.do_work = Value('i', False) self.proc_name = proc_name + self.proc_type = proc_type # TODO: we may want to decorate work func so it prints proc id... - if work_func is None: - self.work_proc = Process(target=self.work_func, name=self.proc_name) + if (work_func is None): + self.work_proc = proc_type(target=self.work_func, name=self.proc_name) else: - self.work_func = work_func - self.work_proc = Process( - target=self.work_func, name=self.proc_name, args=(self,) - ) + self.work_func = work_func + self.work_proc = proc_type(target=self.work_func, name=self.proc_name, args=(self, *add_args,)) self.stop_func = stop_func def start_work(self): @@ -28,7 +34,7 @@ def start_work(self): return self.do_work.value = True self.work_proc.start() - logging.info(f"Starting work on: {self.work_proc.pid}") + logging.debug(f"Starting work on: {self.work_proc.pid}") def stop_work(self): logging.info(f"Calling stop work on: {self.work_proc.pid}") @@ -36,12 +42,16 @@ def stop_work(self): logging.error("Not working, not stopping work") return self.do_work.value = False - if self.stop_func is not None: + logging.info(f"Sending terminate signal to{self.work_proc.pid}") + # Send kill signal to work process. # TODO: the exact loc ation of this is important. Reflect + # with self.do_work.get_lock(): + self.work_proc.terminate() + if (self.stop_func is not None): self.stop_func() - print("calling join") + logging.info("calling join") self.work_proc.join() - print("joined") + logging.info("joined") def work_func(self): """ @@ -55,6 +65,4 @@ def work_func(self): def set_work_func(self, work_func): self.work_func = work_func - self.work_proc = Process( - target=self.work_func, name=self.proc_name, args=(self,) - ) + self.work_proc = self.proc_type(target=self.work_func, name=self.proc_name, args=(self,)) diff --git a/beams/tests/artifacts/eggs.json b/beams/tests/artifacts/eggs.json index 573d1c8..03d0c2c 100644 --- a/beams/tests/artifacts/eggs.json +++ b/beams/tests/artifacts/eggs.json @@ -4,7 +4,7 @@ "name": "self_test", "description": "", "check": { - "name": "", + "name": "self_test_check", "description": "", "pv": "PERC:COMP", "value": 100, @@ -12,9 +12,9 @@ }, "do": { "IncPVActionItem": { - "name": "", + "name": "self_test_do", "description": "", - "loop_period_sec": 1.0, + "loop_period_sec": 0.01, "pv": "PERC:COMP", "increment": 10, "termination_check": { diff --git a/beams/tests/artifacts/eggs2.json b/beams/tests/artifacts/eggs2.json index 1ec0fd7..2bcba87 100644 --- a/beams/tests/artifacts/eggs2.json +++ b/beams/tests/artifacts/eggs2.json @@ -10,7 +10,7 @@ "name": "ret_find", "description": "", "check": { - "name": "", + "name": "ret_find_check", "description": "", "pv": "RET:FOUND", "value": 1, @@ -18,9 +18,9 @@ }, "do": { "SetPVActionItem": { - "name": "", + "name": "ret_find_do", "description": "", - "loop_period_sec": 1.0, + "loop_period_sec": 0.01, "pv": "RET:FOUND", "value": 1, "termination_check": { diff --git a/beams/tests/test_check_and_do.py b/beams/tests/test_check_and_do.py index 86430c4..6f2731b 100644 --- a/beams/tests/test_check_and_do.py +++ b/beams/tests/test_check_and_do.py @@ -10,13 +10,12 @@ class TestTask: def test_check_and_do(self, capsys): percentage_complete = Value("i", 0) - def thisjob(comp_condition, volatile_status, **kwargs) -> None: + def thisjob(myself, comp_condition, volatile_status) -> None: # TODO: grabbing intended keyword argument. Josh's less than pythonic mechanism for closures volatile_status.set_value(py_trees.common.Status.RUNNING) - percentage_complete = kwargs["percentage_complete"] while not comp_condition(percentage_complete.value): py_trees.console.logdebug( - f"yuh {percentage_complete.value}, {volatile_status.get_value()}" + f"PERC COMP {percentage_complete.value}, {volatile_status.get_value()}" ) percentage_complete.value += 10 if percentage_complete.value == 100: @@ -26,7 +25,9 @@ def thisjob(comp_condition, volatile_status, **kwargs) -> None: py_trees.logging.level = py_trees.logging.Level.DEBUG comp_cond = lambda x: x == 100 action = ActionNode.ActionNode( - "action", thisjob, comp_cond, percentage_complete=percentage_complete + name="action", + work_func=thisjob, + completion_condition=comp_cond ) checky = lambda x: x.value == 100 diff --git a/beams/tests/test_leaf_node.py b/beams/tests/test_leaf_node.py index 1ee87e3..953c139 100644 --- a/beams/tests/test_leaf_node.py +++ b/beams/tests/test_leaf_node.py @@ -13,31 +13,24 @@ def test_action_node(self, capsys): # For test percentage_complete = Value("i", 0) - def thisjob(comp_condition, volatile_status, **kwargs) -> None: - try: - # grabbing intended keyword argument. Josh's less than pythonic mechanism for closures - volatile_status.set_value(py_trees.common.Status.RUNNING) - percentage_complete = kwargs["percentage_complete"] - while not comp_condition(percentage_complete.value): - py_trees.console.logdebug( - f"yuh {percentage_complete.value}, {volatile_status.get_value()}" - ) - percentage_complete.value += 10 - if percentage_complete.value == 100: - volatile_status.set_value(py_trees.common.Status.SUCCESS) - time.sleep(0.001) - except KeyboardInterrupt: - pass + def thisjob(myself, comp_condition, volatile_status) -> None: + volatile_status.set_value(py_trees.common.Status.RUNNING) + while not comp_condition(percentage_complete.value): + py_trees.console.logdebug(f"yuh {percentage_complete.value}, {volatile_status.get_value()}") + percentage_complete.value += 10 + if percentage_complete.value == 100: + volatile_status.set_value(py_trees.common.Status.SUCCESS) + time.sleep(0.001) py_trees.logging.level = py_trees.logging.Level.DEBUG comp_cond = lambda x: x == 100 - action = ActionNode( - "action", thisjob, comp_cond, percentage_complete=percentage_complete - ) + action = ActionNode(name="action", + work_func=thisjob, + completion_condition=comp_cond) action.setup() for i in range(20): - time.sleep(0.01) - action.tick_once() + time.sleep(0.01) + action.tick_once() assert percentage_complete.value == 100 diff --git a/beams/tests/test_sequencer_state.py b/beams/tests/test_sequencer_state.py index 5393a0e..35ee610 100644 --- a/beams/tests/test_sequencer_state.py +++ b/beams/tests/test_sequencer_state.py @@ -28,24 +28,3 @@ def test_get_command_reply(self): assert y[SequencerStateVariable.STATUS.value] == TickStatus.UNKNOWN assert y[SequencerStateVariable.RUN_STATE.value] == RunStateType.STATE_UNKNOWN assert y["mess_t"] == MessageType.MESSAGE_TYPE_COMMAND_REPLY - - def test_multi_access(self): - x = SequencerState() - - def proc1work(x): - print("proc1 a go") - x.set_value(SequencerStateVariable.RUN_STATE, RunStateType.TICKING) - - def proc2work(x): - print("proc2 a go") - while x.get_value(SequencerStateVariable.RUN_STATE) != RunStateType.TICKING: - print("waiting for get value to return true") - time.sleep(0.1) - - assert x.get_value(SequencerStateVariable.RUN_STATE) == RunStateType.TICKING - - proc1 = Process(target=proc1work, args=(x,)) - proc2 = Process(target=proc2work, args=(x,)) - proc2.start() - time.sleep(0.4) - proc1.start() diff --git a/beams/tests/test_tree_generator.py b/beams/tests/test_tree_generator.py index b940d2c..eccabb4 100644 --- a/beams/tests/test_tree_generator.py +++ b/beams/tests/test_tree_generator.py @@ -33,8 +33,9 @@ def test_tree_obj_execution(request): py_trees.common.Status.SUCCESS, py_trees.common.Status.FAILURE, ): - tree.tick() - time.sleep(0.05) + for n in tree.root.tick(): + print(n) + time.sleep(0.05) rel_val = caget("PERC:COMP") assert rel_val >= 100 @@ -56,10 +57,11 @@ def test_father_tree_execution(request): not in (py_trees.common.Status.SUCCESS, py_trees.common.Status.FAILURE) and ct < 50 ): - ct += 1 - print((tree.root.status, tree.root.status, ct)) - tree.tick() - time.sleep(0.05) + for n in tree.root.tick(): + ct += 1 + print(n) + print((tree.root.status, tree.root.status, ct)) + time.sleep(0.05) check_insert = caget("RET:INSERT") diff --git a/beams/tests/test_worker.py b/beams/tests/test_worker.py index 87445bb..dc5af49 100644 --- a/beams/tests/test_worker.py +++ b/beams/tests/test_worker.py @@ -1,26 +1,36 @@ import time from multiprocessing import Value - from beams.sequencer.helpers.Worker import Worker class TestTask: def test_obj_instantiation(self): - class WorkChild(Worker): - def __init__(self): - super().__init__("test_worker") - self.value = Value("d", 0) # Note: here value is a member object - - def work_func(self): - while self.do_work.value or self.value.value < 100: - if self.value.value < 100: # Note: here we reference member object - self.value.value += 10 + class CoolWorker(Worker): + def __init__(self, + proc_name: str, + work_func): + super().__init__(proc_name=proc_name, + work_func=work_func, + add_args=()) + self.val = Value('i', 0) - w = WorkChild() - w.start_work() - w.stop_work() # blocking calls join + def work_func(self): + while self.do_work.value or self.val.value < 100: + if self.val.value < 100: # Note: value captured via closure + # Note: even in python if a full bore loop captures a lock other Processes may not have time + # to acquire this shared resource; therefore the sleep is very strongly suggested if not needed + # when using a lock + time.sleep(0.01) + with self.val.get_lock(): + self.val.value += 10 # Note: value captured via closure - assert w.value.value == 100 + c = CoolWorker("cool_guy", + work_func=work_func) + c.start_work() + time.sleep(1) # Note: need actual time for the thing to happen + c.stop_work() + with c.val.get_lock(): + assert c.val.value == 100 def test_class_member_instantiation(self): a = Worker("test_worker") @@ -43,10 +53,13 @@ def test_inline_instantation(self): def work_func(self): while self.do_work.value or val.value < 100: - if val.value < 100: # Note: value captured via closure - val.value += 10 + time.sleep(0.01) + with val.get_lock(): + if val.value < 100: # Note: value captured via closure + val.value += 10 a = Worker("test_worker", work_func=work_func) a.start_work() + time.sleep(1) a.stop_work() assert val.value == 100 diff --git a/beams/tree_config.py b/beams/tree_config.py index 4af74b3..11bbc74 100644 --- a/beams/tree_config.py +++ b/beams/tree_config.py @@ -167,13 +167,13 @@ class SetPVActionItem(ActionItem): termination_check: ConditionItem = field(default_factory=ConditionItem) def get_tree(self) -> ActionNode: + # TODO: can I put these two lines in a decorator which action node uses on the function? wait_for_tick = Event() wait_for_tick_lock = Lock() - def work_func(comp_condition, volatile_status): - py_trees.console.logdebug( - f"WAITING FOR INIT {os.getpid()} " f"from node: {self.name}" - ) + def work_func(myself, comp_condition, volatile_status): + py_trees.console.logdebug(f"WAITING FOR INIT {os.getpid()} " + f"from node: {self.name}") wait_for_tick.wait() # Set to running @@ -228,10 +228,9 @@ def get_tree(self) -> ActionNode: wait_for_tick = Event() wait_for_tick_lock = Lock() - def work_func(comp_condition, volatile_status): - py_trees.console.logdebug( - f"WAITING FOR INIT {os.getpid()} " f"from node: {self.name}" - ) + def work_func(myself, comp_condition, volatile_status): + py_trees.console.logdebug(f"WAITING FOR INIT {os.getpid()} " + f"from node: {self.name}") wait_for_tick.wait() # Set to running