ENH: making doing work better #34
Conversation
force-pushed from e932fc4 to b289496
Fixed up the merge conflicts. Waiting to push for a moment. test_check_and_do.py had a trivial fix.
force-pushed from b289496 to 5f62106
bound class instance functions and passed member instance functions
force-pushed from 5f62106 to c18ee2a
Closes #28
force-pushed from 519649d to 31de5a9
beams/behavior_tree/ActionNode.py
Outdated
+ "%s.terminate()[%s->%s]" | ||
% (self.__class__.__name__, self.status, new_status) | ||
) | ||
# TODO: serious introspection about that we want to do here. |
Normally I'm anti commented-out code being persisted in the code base, but I think this serves as an adequate TODO. Termination is a py_trees step triggered by a composite node; we likely do want to take advantage of this function, not for clean-up as is commented here (though maybe if we go to multi-shot Processes), but for logging.
I think this is a good change, I remember most of my comments being nitpicks or questions
beams/behavior_tree/ActionNode.py
Outdated
completion_condition,
name: str,
work_func: Callable[[Any], None],
completion_condition: Callable[[Any], bool],
work_gate=Event(),
work_lock=Lock(),
**kwargs,
):  # TODO: can add failure condition argument...
super(ActionNode, self).__init__(name)
Nitpick: this is one of the old py2 syntaxes for super; it still works, but it's wordier than the new way. I realize this is not part of the diff and that it matches the docs example, but it caught my eye.
- super(ActionNode, self).__init__(name)
+ super().__init__(name)
beams/behavior_tree/ActionNode.py
Outdated
# def terminate(self, new_status: py_trees.common.Status) -> None:
# """Nothing to clean up in this example."""
# print(f"TERMINATE CALLED ON {self.name}, pid: {os.getpid()}")
# if self.work_proc.is_alive():
# print(f"The process is still alive on {os.getpid()}")
# self.work_proc.terminate()
# self.logger.debug(
# py_trees.console.red
# + "%s.terminate()[%s->%s]"
# % (self.__class__.__name__, self.status, new_status)
# )
Given your above comment, maybe we can put in a basic log message instead of wiping the function entirely? I'm not sure precisely which info is useful here.
- # def terminate(self, new_status: py_trees.common.Status) -> None:
- # """Nothing to clean up in this example."""
- # print(f"TERMINATE CALLED ON {self.name}, pid: {os.getpid()}")
- # if self.work_proc.is_alive():
- # print(f"The process is still alive on {os.getpid()}")
- # self.work_proc.terminate()
- # self.logger.debug(
- # py_trees.console.red
- # + "%s.terminate()[%s->%s]"
- # % (self.__class__.__name__, self.status, new_status)
- # )
+ def terminate(self, new_status: py_trees.common.Status) -> None:
+     """Nothing to clean up."""
+     logger.debug("%s terminated", self)
This new file (and many of the old files) has camelcase naming, which is against the PEP 8 recommendations:
https://peps.python.org/pep-0008/#package-and-module-names
great call out, I might make this a separate trivial PR
beams/behavior_tree/ActionWorker.py
Outdated
proc_type=CAProcess,
add_args=(comp_cond, volatile_status))
self.comp_condition = comp_cond
self.__volatile_status__ = volatile_status
What is __volatile_status__ used for in workers? Does it do something already that I'm missing or is it planned for the future?
Why are these using the python double-underscore formatting wherever they appear?
Agreed that the double-underscore is largely unnecessary here; in python it implies a special method or attribute that python provides. If you intended it to be private, the best way to hint that is with a single leading underscore. (Though nothing is truly private in python.)
Correct me if I'm wrong Josh, but the VolatileStatus is just a way to communicate the status of the worker process to the main process. It's a multiprocessing-safe enum.
I've been pondering whether or not this is really necessary. Could we not just set up a multiprocessing.Pipe and pass primitive values that we cast into enums? They may still be enums on either side of the pipe, but just passing the value prevents us from having to jump through all these hoops.
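For illustration only, a minimal sketch of that Pipe idea, using a stand-in IntEnum rather than the project's actual status class: the child sends just the primitive value and the parent casts it back.

import multiprocessing
from enum import IntEnum


class WorkStatus(IntEnum):  # stand-in enum, not the project's actual status class
    RUNNING = 0
    SUCCESS = 1
    FAILURE = 2


def work(conn) -> None:
    # the child sends only the primitive value; the parent re-casts it into the enum
    conn.send(WorkStatus.SUCCESS.value)
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    proc = multiprocessing.Process(target=work, args=(child_conn,))
    proc.start()
    print(WorkStatus(parent_conn.recv()))  # -> WorkStatus.SUCCESS
    proc.join()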
It does the functionality-critical work of inter-process communication in BTs. Since Worker doesn't necessarily require IPC, this ActionWorker class extends the Worker class for the subset of things that will require IPC because they are part of trees.
We had this conversation once before. I totally agree other mechanisms can be used to facilitate IPC. Pythonically, Queues and Pipes are multiprocessing safe, but so too is Value. Because we are strictly passing one of three values completely encapsulated by an enum, I found Value much more elegant than needing to call send and receive on a pipe or pop from a queue. Value just wraps a portion of shared memory, which seemed more elegant at this level of dev space.
On our long call on Friday, Robert and I sketched an idea similar to the one py_trees reaches for, with a source-of-truth "database" to which all processes have access; however, to prevent race conditions you would still have to share a process-safe lock between these workers, which seems like doing the same thing as sharing a Value.
I wanted to indicate that future developers should only very intentionally mess with volatile_status, hence the dunders, but I agree now that that is the incorrect way to signal that.
Totally open to Pipes and Queues, but in this specific case (of just passing a wrapped int / enum) it seems more cumbersome than it's worth. Again, somewhat potato/potahto.
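A rough sketch of the Value approach described above, again with a stand-in enum rather than the real BT status type: the worker makes a single write into shared memory and the parent casts the int back, with no send/recv or queue management involved.

import multiprocessing
from enum import IntEnum


class WorkStatus(IntEnum):  # stand-in for the real status enum
    RUNNING = 0
    SUCCESS = 1
    FAILURE = 2


def work(shared_status) -> None:
    # a single write into shared memory; Value carries its own lock by default
    shared_status.value = WorkStatus.SUCCESS


if __name__ == "__main__":
    status = multiprocessing.Value("i", WorkStatus.RUNNING)
    proc = multiprocessing.Process(target=work, args=(status,))
    proc.start()
    proc.join()
    print(WorkStatus(status.value))  # the parent casts the raw int back to the enum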
It is important to communicate that as long as these are separate processes, we need IPC-safe mechanisms for passing information, so it is strictly necessary to have something of this sort. There may be an async implementation in which these processes (or, then, python threads) all happen on the same stack and can share information more easily via the GIL. I have reasons I can expound upon for why I don't love that implementation, but it would work.
My original question is less conceptual and more that I don't understand (could not find) precisely where in the code this volatile status is used. Therefore, I either don't actually understand the code flow or there's some issue in the implementation.
Don't be sorry, it's a good question. Worker needs to pass it to the constructor of Process. Worker owns the process; ActionNode, which instantiates Worker by way of ActionWorker, owns the IPC-related variables, namely here the volatile status.
I'm going to hit approve but I maintain that everything still works if you remove this specific line of code since it will never be accessed this way again during execution
Ah.... Sorry it took me till now to see your exact point. It's specifically that the ActionWorker does not need to maintain a reference to this "piece of shared memory".
From one perspective that is totally, totally correct. Let's not leave a loaded gun laying around (sorry, the metaphor was too fun to avoid). However, there is also a world where ActionWorker defines stop_work functions for all work that gets packaged and sent off to Worker, and it may be useful to have a reference to it here... Though an argument could be made that that should be grabbed by invoking super in some way I am not pythonically familiar with yet...
TLDR: you're right, I can remove it.
There's a guideline I read somewhere that PRs should never include pieces that "will be used later on".
I catch myself doing that all too often. It's definitely good for keeping diffs understandable.
yeah agreed, I've had peers willing to die on the hill that commented-out code shouldn't live in commits either
@@ -6,7 +6,7 @@
 class CheckAndDo(py_trees.composites.Selector):
     def __init__(self, name: str, check: ConditionNode, do: ActionNode) -> None:
-        super().__init__(name, memory=True)
+        super().__init__(name, memory=False)
Knowledge check for me: does this make it so we always re-check the "check" side of the "check and do" on every tick even if the "do" is already running?
For my own reference: https://py-trees.readthedocs.io/en/devel/composites.html
if RUNNING on the previous tick, resume with the RUNNING child
I believe what Zach has concluded here is correct.
100%, I realized that error in this PR. We want the trees to err on the side of being reactive; i.e. if a condition is no longer true (or becomes true), we want to catch that even while executing another action.
We should call this out more explicitly / write unit tests to show the difference. This is a great call out
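As a small sketch of the difference, assuming a recent py_trees 2.x API (the Failure/Running behaviours stand in for the real check and do nodes):

import py_trees

check = py_trees.behaviours.Failure(name="check")  # condition not currently met
do = py_trees.behaviours.Running(name="do")        # stands in for a long-running action

root = py_trees.composites.Selector(name="CheckAndDo", memory=False, children=[check, do])
root.tick_once()
# With memory=False the Selector re-ticks "check" on every tick, so a condition that
# flips to SUCCESS can pre-empt the running "do"; with memory=True it would resume
# directly with the previously RUNNING child and skip the re-check.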
beams/sequencer/helpers/Worker.py
Outdated
print(self.work_func)
# breakpoint()
self.work_proc.start()
- logging.info(f"Starting work on: {self.work_proc.pid}")
+ logging.debug(f"Starting work on: {self.work_proc.pid}")
You've got some mixing of logging and printing here, and this isn't in the multiprocess context yet so it kind of seems more messy than intentional
This is a widespread thing we'll have to clean up (#19)
yes good call out. when debugging I kinda spam both as they tend to print differently, not good
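Not part of this diff, but one possible shape for that cleanup (#19): a module-level logger plus a single basicConfig call at start-up gives the same quick visibility as print without the mixing. The format string and level here are just placeholders.

import logging

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    # configure once, at application start-up
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s %(processName)s %(levelname)s %(name)s: %(message)s",
    )
    logger.debug("Starting work on: %s", 12345)  # lazy %-style args instead of f-strings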
beams/sequencer/helpers/Worker.py
Outdated
proc_name: str,
stop_func: Optional[Callable[[None], None]] = None,
work_func: Optional[Callable[[Any], None]] = None,
proc_type: Union[Process, CAProcess] = Process,
Tiny tiny type hinting nitpicks:
- CAProcess is a subclass of Process, so the Union is unnecessary for the type hint
- We pass in a type here rather than an object, so the type keyword is needed
- proc_type: Union[Process, CAProcess] = Process,
+ proc_type: type[Process] = Process,
I like this, since only ActionWorker really expects to work with CAProcess; the base Worker does not.
Sweet, ya, I was slightly abusing the notation to show users what types of things to pass here, but from an OOP standpoint (and generally) you are totally correct.
Some thoughts, I think this is largely good as well.
I do think there is more to discuss w.r.t. how work is done, particularly when it comes to encapsulating the work functions. Josh and I sat on a call and spent a good amount of time struggling to debug things, which might suggest there are better ways about it.
The summary of my shower thought is that we could try to make this more event-driven, passing messages between sub-processes via multiprocessing.Pipe. This aligns with the general guidance from python to avoid sharing state when possible. If we can avoid having to manage Locks entirely, perhaps our lives will be made easier in the future. I envisioned an alternate formulation for work functions looking something like:
def work_func(conn: multiprocessing.connection.Connection):
    value = 0
    do_work = True
    status = "RUNNING"  # placeholder; could be the BT status enum
    while do_work or value < 100:
        if conn.poll():  # returns true if there is data to receive
            # receive message and process it maybe, here I'm assuming it's just a bool
            do_work = conn.recv()
            # could also respond to requests for status here maybe?
        if value < 100:  # the rest of the work
            value += 10
        conn.send((value, status))
maybe I'm just too event-pilled for my own good
@@ -55,6 +69,4 @@ def work_func(self):

    def set_work_func(self, work_func):
        self.work_func = work_func
This is a bit of bad / confusing overloading. We define Worker.work_func as a method of Worker, but then overwrite it with a function that is passed in at init. Furthermore this function can have any arbitrary signature, depending on the purpose of the worker.
We need to choose our approach here. We're either:
- subclassing Worker for new behaviors, and making sure that Worker.work_func adheres to the signature / interface defined above
- accepting any arbitrary function, storing it, and sending it off to a multiprocessing.Process etc.
We appear to be doing a mix of both in this codebase. (Sequencer overloads Worker.work_func, and ActionWorker overwrites it with a function.)
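For illustration, a minimal sketch of the two approaches with hypothetical class names (not the actual beams classes):

from multiprocessing import Process
from typing import Callable


class SubclassWorker:
    """Approach 1: subclasses override work_func, which keeps a fixed interface."""

    def work_func(self) -> None:
        raise NotImplementedError

    def start_work(self) -> None:
        # note: a bound method as the target requires self to be picklable
        # when the "spawn" start method is used
        proc = Process(target=self.work_func)
        proc.start()
        self.proc = proc


class CountingWorker(SubclassWorker):
    def work_func(self) -> None:
        for i in range(3):
            print("working", i)


class InjectedWorker:
    """Approach 2: accept any callable at init and hand it straight to Process."""

    def __init__(self, work_func: Callable[[], None]) -> None:
        self.work_func = work_func

    def start_work(self) -> None:
        proc = Process(target=self.work_func)
        proc.start()
        self.proc = proc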
Yes, especially with what you taught me about bound vs passed (?) class methods. Removing this functionality may be best; I wanted to leave it flexible till I found what seemed preferable.
It seems we have agreed upon not using bound class members (which means we have to pass self as an argument for spawned Processes), as that pairs well with utilizing the base Worker class to spawn our work threads (and therefore concretize feature adds / debugging in spawned processes).
beams/sequencer/helpers/Worker.py
Outdated
logging.error("Already working, not starting work")
return
self.do_work.value = True
print(self.work_func)
# breakpoint()
- # breakpoint()
an orphaned breakpoint 😢
beams/sequencer/helpers/Worker.py
Outdated
        self.stop_func = stop_func

    def start_work(self):
-       if self.do_work.value:
+       if (self.do_work.value):
Did you find you needed the parens here? They shouldn't be necessary.
C habits die hard
    def work_func(self):
        while self.do_work.value or self.val.value < 100:
            if self.val.value < 100:  # Note: value captured via closure
                time.sleep(0.01)
Note from a call Josh and I sat on for a while, this sleep is necessary. Without this, the loop runs fast enough to terminate and have trouble acquiring the lock (I think)
Yes, I was somewhat surprised to find this to be the case in python. It is strictly necessary in a language like C, in which, if you were to go full bore in a loop acquiring a Lock, there would not be CPU cycles for another, lower or equivalent priority thread to acquire the same lock.
On one hand you're right, it took both of us a few hours to iron out. To be fair, the largest difficulty I had was understanding the notion of bound class members vs passed class members. The locking stuff became fairly trivial after that, especially after learning python doesn't like full-bore acquisition of locks.
I am not sure I agree with what this work function achieves, but I see the general intent. We would have to send the "volatile status" on the socket, and the update method of the action node would have to recv it. There are some nuances there where I would use an Event() variable to signal that data is available on the socket, to make sure we ack every write (i.e. are not checking stale data on either end) and, more critically, do not block in the update method. I do not think this maps to my notion of event driven, in which we bind functions to signals handled by an event queue.
I hope I am not coming across as overly pedantic or portraying this as overly complicated; these are bog-standard trade-offs in systems design. The hard thing is getting it right for the right reasons; then it becomes a game of abstraction. Maybe pythonically a Queue of size 1 would be the best method to do IPC, I am not sure. My goal was simplicity, because these things can and will be abused, especially in a language as permissive as python. If infrastructurally we limit what we can pass between these Work Processes to the strictly necessary set (RUNNING, SUCCEED, FAILURE), I think we set ourselves up for long-term success.
Sounds like a great discussion for Wednesday. It's fairly critical we align on this; the good news is this schema works. I would argue we should run with it until we find a breaking case.
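If we did go the Queue route, a size-one queue sketch might look something like this (the status strings are placeholders for the real enum values):

import multiprocessing
from queue import Empty, Full


def work(status_q) -> None:
    try:
        status_q.put_nowait("SUCCESS")  # placeholder for the real status value
    except Full:
        pass  # the parent has not consumed the previous status yet


if __name__ == "__main__":
    status_q = multiprocessing.Queue(maxsize=1)
    proc = multiprocessing.Process(target=work, args=(status_q,))
    proc.start()
    proc.join()
    try:
        print(status_q.get_nowait())
    except Empty:
        pass  # no new status since the last check; never block the tree's tick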
A lot of good discussion. At the last beams meeting we determined that the implementation details decided in this PR are sufficient to carry us on the order of many months; should this application see more success and development hours, we can revisit some of the decisions made here. Attached are slides generated to reflect the various trade-offs and calculations made to settle on these implementation details.
On to the next
The base Worker class was previously implemented to wrap multiprocessing.Process so that we could standardize process spawning and management mechanisms and propagate those changes efficiently through the codebase. With this in mind, it made sense for our ActionNodes, which inherently utilize Processes, to build off of this functionality.
Description
We made a subclass of Worker called ActionNodeWorker which extends the Worker base class to hold the interprocess communication elements needed by any ActionNode in relation to its spawned processes, namely:
- a multiprocessing.Value which wraps an enum to communicate BT status up to the parent node
- a Callable[[None], bool] [1] which determines work function completion
This has a two-fold effect:
Motivation and Context
How Has This Been Tested?
Unit tests (run better now even)
Where Has This Been Documented?
Slides
Todo:
In a PR based off of this one we will take advantage of the "more than one shot" trees, which will take full advantage of the reactivity of BTs. Two things are needed:
I am leaving these as future TODOs to keep this PR readable and make the pace of dev apparent.
Pre-merge checklist
- docs/pre-release-notes.sh and created a pre-release documentation page

Footnotes
1. For full transparency, this is captured by closure in the ConditionItem class, which is partly why it's valuable to pass an argument to the spawned process by the CAProcess command.