Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: making doing work better #34

Merged
merged 7 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 22 additions & 26 deletions beams/behavior_tree/ActionNode.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
import atexit
import os
from multiprocessing import Event, Lock
from typing import Callable, Any, Optional, Union

import py_trees
from epics.multiproc import CAProcess

from beams.behavior_tree.ActionWorker import ActionWorker
from beams.behavior_tree.VolatileStatus import VolatileStatus


class ActionNode(py_trees.behaviour.Behaviour):
def __init__(
self,
name,
work_func,
completion_condition,
name: str,
work_func: Callable[[Any], None],
completion_condition: Callable[[Any], bool],
work_gate=Event(),
work_lock=Lock(),
**kwargs,
): # TODO: can add failure condition argument...
super(ActionNode, self).__init__(name)
self.work_func = work_func
self.comp_condition = completion_condition
super().__init__(name)
# print(type(self.status))
self.__volatile_status__ = VolatileStatus(self.status)
self.additional_args = kwargs
self.logger.debug("%s.__init__()" % (self.__class__.__name__))

# TODO may want to instantiate these locally and then decorate the passed work function with them
self.work_gate = work_gate
self.lock = work_lock
self.worker = ActionWorker(proc_name=name,
volatile_status=self.__volatile_status__,
work_func=work_func,
comp_cond=completion_condition,
stop_func=None
) # TODO: some standard notion of stop function could be valuable
self.logger.debug("%s.__init__()" % (self.__class__.__name__))

def setup(self, **kwargs: int) -> None:
"""Kickstart the separate process this behaviour will work with.
Expand All @@ -39,14 +44,9 @@ def setup(self, **kwargs: int) -> None:
)

# Having this in setup means the workthread should always be running.
self.work_proc = CAProcess(
target=self.work_func,
args=(self.comp_condition, self.__volatile_status__),
kwargs=self.additional_args,
)
self.work_proc.start()
self.worker.start_work()
atexit.register(
self.work_proc.terminate
self.worker.stop_work
) # TODO(josh): make sure this cleans up resources when it dies

def initialise(self) -> None:
Expand Down Expand Up @@ -85,16 +85,12 @@ def update(self) -> py_trees.common.Status:
return new_status

def terminate(self, new_status: py_trees.common.Status) -> None:
"""Nothing to clean up in this example."""
print(f"TERMINATE CALLED ON {self.name}, pid: {os.getpid()}")
if self.work_proc.is_alive():
print(f"The process is still alive on {os.getpid()}")
self.work_proc.terminate()
self.logger.debug(
py_trees.console.red
+ "%s.terminate()[%s->%s]"
% (self.__class__.__name__, self.status, new_status)
)
"""Nothing to clean up."""
self.logger.debug(
py_trees.console.red
+ "%s.terminate()[%s->%s]"
% (self.__class__.__name__, self.status, new_status)
)


if __name__ == "__main__":
Expand Down
26 changes: 26 additions & 0 deletions beams/behavior_tree/ActionWorker.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new file (and many of the old files) has camelcase naming which is against the pep8 recommendations
https://peps.python.org/pep-0008/#package-and-module-names

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great call out, I might make this a separate trivial PR

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""
A worker specialized to execute ActionNode work functions
"""
from typing import Callable, Any, Optional

from epics.multiproc import CAProcess

from beams.sequencer.helpers.Worker import Worker
from beams.behavior_tree.VolatileStatus import VolatileStatus


class ActionWorker(Worker):
def __init__(self,
proc_name: str,
volatile_status: VolatileStatus,
work_func: Callable[[Any], None],
comp_cond: Callable[[Any], bool],
stop_func: Optional[Callable[[None], None]] = None):
super().__init__(proc_name=proc_name,
stop_func=stop_func,
work_func=work_func,
proc_type=CAProcess,
add_args=(comp_cond, volatile_status))

# Note: there may be a world where we define a common stop_func here, in which case
# the class may have to maintain a reference to volatile_status and/or comp_cond
2 changes: 1 addition & 1 deletion beams/behavior_tree/CheckAndDo.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

class CheckAndDo(py_trees.composites.Selector):
def __init__(self, name: str, check: ConditionNode, do: ActionNode) -> None:
super().__init__(name, memory=True)
super().__init__(name, memory=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Knowledge check for me: does this make it so we always re-check the "check" side of the "check and do" on every tick even if the "do" is already running?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own reference: https://py-trees.readthedocs.io/en/devel/composites.html

if RUNNING on the previous tick, resume with the RUNNING child

I believe what Zach has concluded here is correct.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100%, I realized that error in this PR. We want the trees to err on the side of being reactive, i.e. if a condition is no longer true (or becomes true) we want to catch that even while executing another action.

We should call this out more explicitly / write unit tests to show the difference. This is a great call out

self.name = name
self.check = check
self.do = do
Expand Down
40 changes: 24 additions & 16 deletions beams/sequencer/helpers/Worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,26 @@
"""
import logging
from multiprocessing import Process, Value
from typing import Callable, Any, Optional, List


class Worker:
def __init__(self, proc_name, stop_func=None, work_func=None):
self.do_work = Value("i", False)
class Worker():
def __init__(self,
proc_name: str,
stop_func: Optional[Callable[[None], None]] = None,
work_func: Optional[Callable[[Any], None]] = None,
proc_type: type[Process] = Process,
add_args: List[Any] = []
):
self.do_work = Value('i', False)
self.proc_name = proc_name
self.proc_type = proc_type
# TODO: we may want to decorate work func so it prints proc id...
if work_func is None:
self.work_proc = Process(target=self.work_func, name=self.proc_name)
if (work_func is None):
self.work_proc = proc_type(target=self.work_func, name=self.proc_name)
else:
self.work_func = work_func
self.work_proc = Process(
target=self.work_func, name=self.proc_name, args=(self,)
)
self.work_func = work_func
self.work_proc = proc_type(target=self.work_func, name=self.proc_name, args=(self, *add_args,))
self.stop_func = stop_func

def start_work(self):
Expand All @@ -28,20 +34,24 @@ def start_work(self):
return
self.do_work.value = True
self.work_proc.start()
logging.info(f"Starting work on: {self.work_proc.pid}")
logging.debug(f"Starting work on: {self.work_proc.pid}")

def stop_work(self):
logging.info(f"Calling stop work on: {self.work_proc.pid}")
if not self.do_work.value:
logging.error("Not working, not stopping work")
return
self.do_work.value = False
if self.stop_func is not None:
logging.info(f"Sending terminate signal to{self.work_proc.pid}")
# Send kill signal to work process. # TODO: the exact location of this is important. Reflect
# with self.do_work.get_lock():
self.work_proc.terminate()
if (self.stop_func is not None):
self.stop_func()

print("calling join")
logging.info("calling join")
self.work_proc.join()
print("joined")
logging.info("joined")

def work_func(self):
"""
Expand All @@ -55,6 +65,4 @@ def work_func(self):

def set_work_func(self, work_func):
self.work_func = work_func
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of bad / confusing overloading. We define Worker.work_func as a method of Worker, but then overwrite it with a function that is passed in at init. Furthermore this function can have any arbitrary signature, depending on the purpose of the worker.

We need to choose our approach here. We're either:

  • subclassing Worker for new behaviors, and making sure that Worker.work_func adheres to the signature / interface defined above
  • accepting any arbitrary function, storing it, and sending it off to a multiprocessing.Process etc.

We appear to be doing a mix of both in this codebase. (Sequencer overloads Worker.work_func, and ActionWorker overwrites it with a function)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, especially with what you taught me about bound vs passed (?) class methods. Removing this functionality may be best, I wanted to leave it flexible till I found what seemed preferable.

It seems we have agreed upon not using bound class members (means we have to pass self as an argument for spawned Processes) as it patters well to utilizing the base Worker class to spawn our work threads (and therefore concretize feature adds / debugging in spawned processes)

self.work_proc = Process(
target=self.work_func, name=self.proc_name, args=(self,)
)
self.work_proc = self.proc_type(target=self.work_func, name=self.proc_name, args=(self,))
6 changes: 3 additions & 3 deletions beams/tests/artifacts/eggs.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
"name": "self_test",
"description": "",
"check": {
"name": "",
"name": "self_test_check",
"description": "",
"pv": "PERC:COMP",
"value": 100,
"operator": "ge"
},
"do": {
"IncPVActionItem": {
"name": "",
"name": "self_test_do",
"description": "",
"loop_period_sec": 1.0,
"loop_period_sec": 0.01,
"pv": "PERC:COMP",
"increment": 10,
"termination_check": {
Expand Down
6 changes: 3 additions & 3 deletions beams/tests/artifacts/eggs2.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
"name": "ret_find",
"description": "",
"check": {
"name": "",
"name": "ret_find_check",
"description": "",
"pv": "RET:FOUND",
"value": 1,
"operator": "ge"
},
"do": {
"SetPVActionItem": {
"name": "",
"name": "ret_find_do",
"description": "",
"loop_period_sec": 1.0,
"loop_period_sec": 0.01,
"pv": "RET:FOUND",
"value": 1,
"termination_check": {
Expand Down
9 changes: 5 additions & 4 deletions beams/tests/test_check_and_do.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ class TestTask:
def test_check_and_do(self, capsys):
percentage_complete = Value("i", 0)

def thisjob(comp_condition, volatile_status, **kwargs) -> None:
def thisjob(myself, comp_condition, volatile_status) -> None:
# TODO: grabbing intended keyword argument. Josh's less than pythonic mechanism for closures
volatile_status.set_value(py_trees.common.Status.RUNNING)
percentage_complete = kwargs["percentage_complete"]
while not comp_condition(percentage_complete.value):
py_trees.console.logdebug(
f"yuh {percentage_complete.value}, {volatile_status.get_value()}"
f"PERC COMP {percentage_complete.value}, {volatile_status.get_value()}"
)
percentage_complete.value += 10
if percentage_complete.value == 100:
Expand All @@ -26,7 +25,9 @@ def thisjob(comp_condition, volatile_status, **kwargs) -> None:
py_trees.logging.level = py_trees.logging.Level.DEBUG
comp_cond = lambda x: x == 100
action = ActionNode.ActionNode(
"action", thisjob, comp_cond, percentage_complete=percentage_complete
name="action",
work_func=thisjob,
completion_condition=comp_cond
)

checky = lambda x: x.value == 100
Expand Down
33 changes: 13 additions & 20 deletions beams/tests/test_leaf_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,24 @@ def test_action_node(self, capsys):
# For test
percentage_complete = Value("i", 0)

def thisjob(comp_condition, volatile_status, **kwargs) -> None:
try:
# grabbing intended keyword argument. Josh's less than pythonic mechanism for closures
volatile_status.set_value(py_trees.common.Status.RUNNING)
percentage_complete = kwargs["percentage_complete"]
while not comp_condition(percentage_complete.value):
py_trees.console.logdebug(
f"yuh {percentage_complete.value}, {volatile_status.get_value()}"
)
percentage_complete.value += 10
if percentage_complete.value == 100:
volatile_status.set_value(py_trees.common.Status.SUCCESS)
time.sleep(0.001)
except KeyboardInterrupt:
pass
def thisjob(myself, comp_condition, volatile_status) -> None:
volatile_status.set_value(py_trees.common.Status.RUNNING)
while not comp_condition(percentage_complete.value):
py_trees.console.logdebug(f"yuh {percentage_complete.value}, {volatile_status.get_value()}")
percentage_complete.value += 10
if percentage_complete.value == 100:
volatile_status.set_value(py_trees.common.Status.SUCCESS)
time.sleep(0.001)

py_trees.logging.level = py_trees.logging.Level.DEBUG
comp_cond = lambda x: x == 100
action = ActionNode(
"action", thisjob, comp_cond, percentage_complete=percentage_complete
)
action = ActionNode(name="action",
work_func=thisjob,
completion_condition=comp_cond)
action.setup()
for i in range(20):
time.sleep(0.01)
action.tick_once()
time.sleep(0.01)
action.tick_once()

assert percentage_complete.value == 100

Expand Down
21 changes: 0 additions & 21 deletions beams/tests/test_sequencer_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,3 @@ def test_get_command_reply(self):
assert y[SequencerStateVariable.STATUS.value] == TickStatus.UNKNOWN
assert y[SequencerStateVariable.RUN_STATE.value] == RunStateType.STATE_UNKNOWN
assert y["mess_t"] == MessageType.MESSAGE_TYPE_COMMAND_REPLY

def test_multi_access(self):
x = SequencerState()

def proc1work(x):
print("proc1 a go")
x.set_value(SequencerStateVariable.RUN_STATE, RunStateType.TICKING)

def proc2work(x):
print("proc2 a go")
while x.get_value(SequencerStateVariable.RUN_STATE) != RunStateType.TICKING:
print("waiting for get value to return true")
time.sleep(0.1)

assert x.get_value(SequencerStateVariable.RUN_STATE) == RunStateType.TICKING

proc1 = Process(target=proc1work, args=(x,))
proc2 = Process(target=proc2work, args=(x,))
proc2.start()
time.sleep(0.4)
proc1.start()
14 changes: 8 additions & 6 deletions beams/tests/test_tree_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ def test_tree_obj_execution(request):
py_trees.common.Status.SUCCESS,
py_trees.common.Status.FAILURE,
):
tree.tick()
time.sleep(0.05)
for n in tree.root.tick():
print(n)
time.sleep(0.05)

rel_val = caget("PERC:COMP")
assert rel_val >= 100
Expand All @@ -56,10 +57,11 @@ def test_father_tree_execution(request):
not in (py_trees.common.Status.SUCCESS, py_trees.common.Status.FAILURE)
and ct < 50
):
ct += 1
print((tree.root.status, tree.root.status, ct))
tree.tick()
time.sleep(0.05)
for n in tree.root.tick():
ct += 1
print(n)
print((tree.root.status, tree.root.status, ct))
time.sleep(0.05)

check_insert = caget("RET:INSERT")

Expand Down
Loading
Loading