ENH/REF: Enable logging through multiprocessing, refactor node work signatures #41
Conversation
…ing from multiprocessing.Process
…n.Status, normalize logging in ActionNode/ConditionNode, adjust affected tests
… more explicit and clear about what these work functions require as arguments
Asking for preliminary reviews as I vomit words into the PR description. (Please unfold the details sections; I spent a lot of time on them.)
Hmm, this looks great and I see tests are passing on CI. However, on my machine test_father_tree_execution fails only when the two other tests in the file run. If I recall, we had this bug before, which is strange. I will try to debug.
I seem to recall that being a caproto bug? (Or something related to how we're running the caproto IOCs in the suite?) Something about IOCs not being cleaned up? Both on CI here and on my local machine I can run the full test suite without problems, strange 🤔
beams/behavior_tree/ActionNode.py
Outdated
while not self.completion_condition():
    logger.debug(f"CALLING CAGET FROM from node ({self.name})")
    status = self.work_func(self.completion_condition)
    volatile_status.set_value(status)
    logger.debug(f"Setting node ({self.name}): {volatile_status.get_value()}")
This while loop has no failure escape condition. It only ends if we error out and bounce out of the function entirely, or if the completion condition succeeds. Perhaps we should be expecting to catch an exception? Perhaps we should have a built-in timeout at this level?
This also means that `volatile_status` can never be permanently set to a failure state as written, since we'll never leave the loop without `completion_condition` being True and resetting `volatile_status` to a success state below.
You're absolutely right, though this has been the reality since before this PR. Josh's implementation of multi-shot trees provides one escape hatch for this, but I think breaking on exception is a good stop-gap here. I have some difficulty thinking of a generally suitable timeout / iteration-count limit.
For an action node, I think it does make sense to keep trying until we succeed. Maybe @joshc-slac can chime in here as to what would be appropriate, and whether we address this here or let the multi-shot tree handle it.
The other thing to consider is whether or not we could rely on the work function itself to update the status and end the loop based on that status. I'm sure there are other ways to think about this, though.
@ZLLentz thanks for the super valuable diligence. To those points:
- I want to add the ability for all these nodes to time out. This PR is a great step toward enabling that in a single location. I think there may be other cases in which we want to interrupt the while loop. Robert is right that #40 (Determine if and move work functions from the serialization document tree_config.py) allows program termination to clean these up; I might table that part of the discussion for Wednesday.
- @tangkong thanks to Zach's careful glance I too am realizing this would be a great place to implement benchmarking around the work function. We can leave it as a TODO, but it would be neat to return how long the actual work of this action node took.
- To Zach's final point... I could be convinced both ways; I am erring toward agreeing with Robert's implementation here, mainly for the purpose of keeping IPC things at this layer. Your suggestion is elegant as well, though.
For now I'll just add an exception catch and we can expand on the behavior in another PR. I want to keep the contributions here as logging-focused as possible; I've submitted too many multi-faceted PRs, haha.
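Roughly something like this (a sketch using the names from the snippet above — `work_func`, `completion_condition`, `volatile_status` — with `Status` from `py_trees.common`; the helper name and exact failure handling are placeholders):

```python
import logging

from py_trees.common import Status

logger = logging.getLogger(__name__)


def run_work_loop(name, work_func, completion_condition, volatile_status):
    """Hypothetical loop body: keep working until the condition passes, bail out on error."""
    while not completion_condition():
        try:
            status = work_func(completion_condition)
        except Exception:
            # Any exception permanently marks the node as FAILURE and ends the loop
            logger.exception("Work function for node (%s) raised, marking FAILURE", name)
            volatile_status.set_value(Status.FAILURE)
            return
        volatile_status.set_value(status)
        logger.debug("Setting node (%s): %s", name, volatile_status.get_value())
```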
beams/behavior_tree/ActionNode.py
Outdated
    work_func=self.work_wrapper,
    comp_cond=completion_condition,
    stop_func=None
)  # TODO: some standard notion of stop function could be valuable
self.logger.debug("%s.__init__()" % (self.__class__.__name__))
There are some inconsistencies here where sometimes we use `self.logger` and other times we use `logger`. Is there some intuitive rule for when to use each one? Do some names need to change to make this clear?
Whoops, I forgot I also need to switch out `self.logger`. `self.logger` invokes py_trees' logging facilities, which I haven't addressed in this PR. That should definitely be a follow-up.
I advocate for using `logger = logging.getLogger(__name__)` (which will give you a child logger of the `beams` logger) throughout our library code.
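For illustration, the pattern I'm advocating looks like this (a sketch; the module path in the comment is just an example):

```python
# e.g. in beams/behavior_tree/ActionNode.py
import logging

# __name__ resolves to something like "beams.behavior_tree.ActionNode",
# so this is a child of the "beams" logger and inherits its handlers.
logger = logging.getLogger(__name__)

logger.debug("records propagate up to the handlers configured on 'beams'")
```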
""" | ||
log_configurer(log_queue) | ||
logger.debug(f"WAITING FOR INIT from node: {self.name}") | ||
self.work_gate.wait() |
Is `self` meant to be accessed from within the `work_wrapper`? Is this running in another process? How is the data shared?
We pass `work_wrapper` through the `multiprocessing.Process`, in which `self` will not be the same as the `ActionNode` in the main process. It's possible that because `work_gate` is a process-safe `Event`, accessing and sharing it here is ok. It is probably clearer for the `work_gate` to be passed through as an arg in the `ActionWorker`, as you note.
I'll spend some time next week (or this weekend if I get bored) getting a better understanding of this
I did understand this correctly: when we pass an instance method to the process, the instance gets pickled and passed along with it. We can make no guarantees about any non-process-safe member of that instance, but the `Value`/`Event`/`Gate` objects will be synchronized across processes.
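A small standalone example of that behavior (a sketch, not our actual code; `Node`, `counter`, and `work_gate` are illustrative names): the plain attribute diverges between processes, while the `multiprocessing.Event` stays synchronized.

```python
import multiprocessing as mp


class Node:
    def __init__(self):
        self.counter = 0             # plain attribute: copied, not shared
        self.work_gate = mp.Event()  # process-safe: shared with the child

    def work(self):
        self.counter += 1            # only mutates the child's copy
        self.work_gate.set()         # visible to the parent as well


if __name__ == "__main__":
    node = Node()
    proc = mp.Process(target=node.work)  # `node` travels to the child with the bound method
    proc.start()
    proc.join()
    print(node.counter)             # 0: the parent's copy never changed
    print(node.work_gate.is_set())  # True: the Event is synchronized
```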
Good to know!
import yaml

LOGGER_QUEUE = mp.Queue(-1)
I can't find a documentation reference to what `-1` as `maxsize` does. What does it do? The maximum finite max size, maybe? The docs I can find indicate that leaving this unset is how you get an unbounded queue.
It does give you an infinite-size queue (the closest docs are https://docs.python.org/3/library/queue.html#queue.Queue, from the implication that `mp.Queue` implements most methods of `queue.Queue`). Using `-1` here should be equivalent to setting `maxsize=0` or leaving it unset.
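For reference, a quick sketch of that equivalence (leaning on `queue.Queue`'s documented rule that a `maxsize` of zero or less means an infinite queue):

```python
import multiprocessing as mp

# All three should behave the same for our purposes: an unbounded queue.
q_default = mp.Queue()        # maxsize defaults to 0
q_zero = mp.Queue(maxsize=0)  # explicitly "infinite"
q_negative = mp.Queue(-1)     # anything <= 0 is treated the same way
```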
Queue that passes logging records between processes
"""
h = QueueHandler(queue)  # Just the one handler needed
root = logging.getLogger("beams")  # root logger for beams
Does this mean that workers will only log messages that originate from beams itself?
This is correct (as in, this is the intention of the code written here).
The logging config actually currently assigns both handlers to the root logger as well, which I did not initially intend. If we want to, we can make this the root logger and just pass through all logging messages we receive.
I think general loggers can be noisy enough that this makes sense as-is. Maybe we can consider a more verbose mode in the future if needed.
LOGGER_THREAD = threading.Thread(
    target=logger_thread, args=(LOGGER_QUEUE,), daemon=True
)
LOGGER_THREAD.start()
Does the main thread also send log messages to the queue, or does the main thread handle these differently?
The main thread logs messages normally, without the `QueueHandler`. The `QueueHandler` is only necessary to liberate worker logs from their multiprocessing prison.
Here, "normally" just means there are no additional handlers intercepting the log message before it gets processed by the handlers we define in `logging.yml` for file and stream logging.
I got brain-crossed between threads and multiprocessing again; this is a good way to do it.
I asked a lot of questions, but I like the approach you've taken to logging here.
I'm going over this with a fine-tooth comb to understand why my tests are failing locally (again only locally, in the old caproto failure mode where they pass when only one caproto test is run). This may indicate some weirdness with caproto or, more concerningly, some weirdness with the library such that it is hardware dependent... Pedantic other findings from that here:
I'm gonna leave this here before I forget, because I like the logging.
In response to @joshc-slac's more specific addition requests, I think there are a lot of places where we can flesh out logging messages. I might rather tackle those in a separate PR or in PRs that touch that code specifically, mostly because I think it's a slippery slope toward making the diff in this PR cover the entire codebase. I do agree with all of the suggestions; I just want to keep this PR focused. 🙏
This is great. I'm going to stop driving myself insane about this caproto pytest issue on my end.
Update: I'm silly, I had a rogue caproto process from a test I'd ctrl-C'ed out of. This also would have been resolved if I rebooted my machine more frequently. Warning for future silly folks.
More importantly, great logging PR, thanks!
Update update: this does seem real, but we can move forward and track it in #42.
# TODO: we may want to decorate work func so it prints proc id...
if (work_func is None):
    self.work_proc = proc_type(target=self.work_func, name=self.proc_name)
else:
    self.work_func = work_func
    self.work_proc = proc_type(target=self.work_func, name=self.proc_name, args=(self, *add_args,))
Okay, not passing `self` here anymore somewhat fundamentally breaks #38.
The point was to get access to the `self.do_work` Value. This is fine for this PR, and #38 is now rebased on top of this PR. It may come back, or we may pass that one value more intentionally...
Okay, I fixed this on a new branch based on this PR. This now closes #38.
This is in a similar vein to a thread from earlier in this PR. I did expect that this PR and #38 would conflict a bit, particularly in this refactor. I'm leaning toward not accessing any `self` member in the functions we pass through to `Process`es, and instead passing the process-safe events/values/etc. in explicitly. Naming an argument "self" and passing a different object into that argument is particularly confusing from a Python-convention point of view.
But what I really liked from this refactor was the cleaner `work_func` signature, so I could be convinced either way w.r.t. this.
Thanks for the reviews everyone! Merging!
Description
- Adds a logging framework to allow workers in `multiprocessing.Process` to log to a central logger
- Refactors `ActionNode` work function signatures to expect a `py_trees.common.Status` to be returned. This allows us to do the common nitty-gritty work without exposing it to the user.
- Adjusts tests that are affected by these changes. Also makes the tests more pytest-y.
- Removes the ability for `Worker` to "bind" methods to itself (this never worked properly in the first place)

Example Output
`pytest test_check_and_do.py -s`
An external script
The Script
The Output
Example file outputs
Motivation and Context
More info to come here... as well as how-tos on actually enabling logging.
Logging through multiprocessing
Prepare for a bit of a diatribe, as I don't want my struggles to be confined to my own notes forever.
Logging naturally works well within a single Python process and normal `threading.Thread`s. Once we start wanting to log events in spawned `multiprocessing.Process`es, we start to worry about serializing access to a single file/resource/stream across multiple Python processes. Here in BEAMS we'll likely want to log to multiple locations, not just to the console but to files, GUI outputs, etc.

The simplest way to do this is to pass logging records through a `multiprocessing.Queue` and have a central logging process handle those records as they're received. This logging process could also be in a separate `multiprocessing.Process`, but placing it in a daemon thread lets us start it and forget about it.

Sources:
https://docs.python.org/3/howto/logging-cookbook.html#logging-from-multiple-threads
https://github.com/pcdshub/hutch-python/blob/master/hutch_python/log_setup.py
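To make the pattern concrete, here is a minimal self-contained sketch (not the BEAMS code itself; `worker`, `logger_thread`, and the messages are illustrative) of the cookbook approach: each worker process attaches a `QueueHandler`, and a daemon thread in the main process drains the queue into the normally configured handlers.

```python
import logging
import multiprocessing as mp
import threading
from logging.handlers import QueueHandler


def worker(log_queue):
    """Runs in a child process: push every log record onto the shared queue."""
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(QueueHandler(log_queue))
    logging.getLogger("beams.worker").info("hello from pid %s", mp.current_process().pid)


def logger_thread(log_queue):
    """Runs as a daemon thread in the main process: drain records into the real handlers."""
    while True:
        record = log_queue.get()
        if record is None:  # sentinel for a clean shutdown
            break
        logging.getLogger(record.name).handle(record)


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)  # stand-in for the logging.yml config
    log_queue = mp.Queue(-1)
    threading.Thread(target=logger_thread, args=(log_queue,), daemon=True).start()

    procs = [mp.Process(target=worker, args=(log_queue,)) for _ in range(2)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    log_queue.put(None)  # stop the draining thread
```

Because the records are only formatted and emitted in the main process, a single logging configuration governs the output of every worker.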
ActionNode Work function fragmentation
This necessitated some changes to how we structure the work functions used by `ActionNode`. We were doing a lot of dirty-work boilerplate in the "work_func", and I thought it sensible to separate that dirty work from the "business logic". This resulted in:

- a `work_func` that returns a `py_trees.common.Status`: the "business logic"
- a `work_wrapper` that sets up the surrounding dirty work

Naming here could be improved, but that's an issue for a different effort.
I'd argue that if and when the exact form of these `work_wrapper` functions varies, those differences should be crystallized in subclasses of `ActionNode`. This way the signatures are clear and documented (in code, for now).
. This way the signatures are clear and documented (in code for now)I'd also argue that the
work_func
signature should be codified somewhere. The work wrapper currently expects it to take the Callablecompletion_condition
, but that's really a Check-and-Do specific formulation. We could potentially scope this so work func is always aCallable[[], Status]
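As a sketch of what that codification could look like (the alias names are hypothetical; only the Check-and-Do form matches what the wrapper currently expects):

```python
from typing import Callable

from py_trees.common import Status

# What the work wrapper expects today: the work function receives the
# completion condition so it can decide how to keep trying (Check-and-Do).
CheckAndDoWorkFunc = Callable[[Callable[[], bool]], Status]

# The more general scoping floated above: do one unit of work, report status.
GeneralWorkFunc = Callable[[], Status]
```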
The removal of `Worker.set_work_func`
`Worker.set_work_func` used to assign a function to `Worker.work_func`. There has historically been much confusion over "self", when it's needed, and when we have to provide it in additional arguments.

Dynamically binding methods to class instances is bad practice to begin with. A class exists to capture a structure and a consistent interface through which other parts of the program can interact. By dynamically changing this, we confuse ourselves and other developers.
We already use `work_func` in two ways. I argue this third one should be removed, as the other ways of assigning work to the `Worker` are more than flexible enough.
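For illustration, the two remaining ways look roughly like this (a minimal stand-in sketch, not the actual `Worker` API; the class and function names here are made up):

```python
import multiprocessing as mp


class Worker:
    """Minimal stand-in for the repo's Worker, just to show the two patterns."""

    def __init__(self, work_func=None):
        if work_func is not None:
            # 2) Work passed in at construction time as a standalone function.
            self.work_func = work_func
        # 1) Otherwise, work_func is whatever the subclass defined.

    def start(self):
        proc = mp.Process(target=self.work_func)
        proc.start()
        return proc

    def work_func(self):
        raise NotImplementedError("subclass me or pass work_func in")


class PrintingWorker(Worker):
    def work_func(self):
        print("work defined on the subclass")


def standalone_work():
    print("work passed in at construction time")


if __name__ == "__main__":
    for worker in (PrintingWorker(), Worker(work_func=standalone_work)):
        worker.start().join()
```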
How Has This Been Tested?
Interactively, and through tests.
Tests have been adjusted where needed.
Where Has This Been Documented?
This PR; more docstrings to come.
Pre-merge checklist
- Ran `docs/pre-release-notes.sh` and created a pre-release documentation page