
ENH: making doing work better #34

Merged · 7 commits into master · Sep 3, 2024

Conversation


@joshc-slac joshc-slac commented Aug 13, 2024

The base Worker class was previously implemented to wrap multiprocessing.Process such that we could standardize process spawning and management mechanisms and propagate those changes efficiently through the codebase.

With this in mind, it made sense for our ActionNodes, which inherently utilize Processes, to build off of this functionality.

Description

We made a subclass of Worker called ActionNodeWorker which extends the Worker base class to hold the interprocess communication elements needed by any ActionNode in relation to its spawned processes, namely:

  • volatile_status: a multiprocessing.Value which wraps an enum to communicate BT status up to the parent node
  • completion_condition: the Callable[[None], bool]¹ which determines work function completion

This has a twofold effect:

  • Ownership of the IPC variables is logically concretized in the ActionNode class
  • The IPC nitty-gritty is abstracted away from the user space of defining the work function and completion condition (currently in tree_config.py); see the sketch below
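
For illustration, a minimal hedged sketch of the volatile_status mechanism (hypothetical names, not the PR's actual code): a multiprocessing.Value holds the enum's integer value in shared memory, so the spawned work process can report BT status back to its parent.

import multiprocessing as mp
from enum import IntEnum

class BTStatus(IntEnum):  # hypothetical stand-in for the py_trees status enum
    RUNNING = 0
    SUCCESS = 1
    FAILURE = 2

def work(status):
    # child process: do the actual work, then report completion via shared memory
    with status.get_lock():
        status.value = BTStatus.SUCCESS

if __name__ == "__main__":
    # a ctypes int living in shared memory; parent and child both see writes
    volatile_status = mp.Value("i", int(BTStatus.RUNNING))
    proc = mp.Process(target=work, args=(volatile_status,))
    proc.start()
    proc.join()
    print(BTStatus(volatile_status.value))  # parent casts the raw int back to the enum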

Motivation and Context

  • With the goal of having more than "one shot" trees, we need a mechanism for a while loop that lasts the lifecycle of the program but still terminates. The Worker class gives us such a mechanism, so we may as well use it
  • It is critical we standardize how work is spawned and managed, lest we end up doing it differently all over the codebase, which would make debugging and feature addition difficult.

How Has This Been Tested?

Unit tests (which even run better now)

Where Has This Been Documented?

Slides


Todo:

In a PR based on this one we will take advantage of the "more than one shot" trees, which will take full advantage of the reactivity of BTs. Two things are needed:

  • make the current tests more-than-one-shot capable
  • write tests that will only pass if they work in a more-than-one-shot fashion, i.e. unset a PV after it's been set

I am leaving these as future TODOs to keep this PR readable and make the pace of development apparent.

Pre-merge checklist

  • Code works interactively
  • Code follows the style guide
  • Code contains descriptive docstrings, including context and API
  • New/changed functions and methods are covered in the test suite where possible
  • Test suite passes locally
  • Test suite passes on GitHub Actions
  • Ran docs/pre-release-notes.sh and created a pre-release documentation page
  • Pre-release docs include context, functional descriptions, and contributors as appropriate

Footnotes

  1. For full transparency, this is captured by closure in the ConditionItem class, which is partly why it's valuable to pass an argument to the spawned process via the CAProcess command

@joshc-slac joshc-slac force-pushed the joshc-slac/enh-worker-util branch from e932fc4 to b289496 on August 13, 2024 22:55
@joshc-slac (Collaborator, Author):

Fixed up the merge conflicts. Waiting to push for a moment.

test_check_and_do.py had a trivial fix.
still working on test_worker.py, which has a less-than-trivial fix; should be resolved soon

@joshc-slac joshc-slac force-pushed the joshc-slac/enh-worker-util branch from b289496 to 5f62106 on August 16, 2024 21:21
@joshc-slac joshc-slac force-pushed the joshc-slac/enh-worker-util branch from 5f62106 to c18ee2a on August 17, 2024 00:31
@joshc-slac joshc-slac marked this pull request as ready for review August 17, 2024 00:32
@joshc-slac joshc-slac requested review from tangkong and ZLLentz August 17, 2024 00:32
@joshc-slac (Collaborator, Author):

Closes #28

@joshc-slac joshc-slac force-pushed the joshc-slac/enh-worker-util branch from 519649d to 31de5a9 on August 17, 2024 00:45
+ "%s.terminate()[%s->%s]"
% (self.__class__.__name__, self.status, new_status)
)
# TODO: serious introspection about what we want to do here.
joshc-slac (Collaborator, Author):

Normally I'm against commented-out code being persisted in the codebase, but I think this serves as an adequate TODO. Termination is a py_trees step triggered by a composite node; we likely do want to take advantage of this function, not for cleanup as is commented here (though maybe if we go to multi-shot Processes), but for logging.

@ZLLentz (Member) left a comment:

I think this is a good change, I remember most of my comments being nitpicks or questions

completion_condition,
name: str,
work_func: Callable[[Any], None],
completion_condition: Callable[[Any], bool],
work_gate=Event(),
work_lock=Lock(),
**kwargs,
): # TODO: can add failure condition argument...
super(ActionNode, self).__init__(name)
ZLLentz (Member):

Nitpick: this is one of the old py2 syntaxes for super; it still works, but it's wordier than the new way. I realize this is not part of the diff and that it matches the docs example, but it caught my eye.

Suggested change
super(ActionNode, self).__init__(name)
super().__init__(name)

Comment on lines 88 to 98
# def terminate(self, new_status: py_trees.common.Status) -> None:
# """Nothing to clean up in this example."""
# print(f"TERMINATE CALLED ON {self.name}, pid: {os.getpid()}")
# if self.work_proc.is_alive():
# print(f"The process is still alive on {os.getpid()}")
# self.work_proc.terminate()
# self.logger.debug(
# py_trees.console.red
# + "%s.terminate()[%s->%s]"
# % (self.__class__.__name__, self.status, new_status)
# )
ZLLentz (Member):

Given your above comment, maybe we can put in a basic log message instead of wiping the function entirely? I'm not sure precisely which info is useful here.

Suggested change
# def terminate(self, new_status: py_trees.common.Status) -> None:
# """Nothing to clean up in this example."""
# print(f"TERMINATE CALLED ON {self.name}, pid: {os.getpid()}")
# if self.work_proc.is_alive():
# print(f"The process is still alive on {os.getpid()}")
# self.work_proc.terminate()
# self.logger.debug(
# py_trees.console.red
# + "%s.terminate()[%s->%s]"
# % (self.__class__.__name__, self.status, new_status)
# )
def terminate(self, new_status: py_trees.common.Status) -> None:
"""Nothing to clean up.""
logger.debug("%s terminated", self)

ZLLentz (Member):

This new file (and many of the old files) has camelcase naming which is against the pep8 recommendations
https://peps.python.org/pep-0008/#package-and-module-names

joshc-slac (Collaborator, Author):

great call out, I might make this a separate trivial PR

proc_type=CAProcess,
add_args=(comp_cond, volatile_status))
self.comp_condition = comp_cond
self.__volatile_status__ = volatile_status
ZLLentz (Member):

What is __volatile_status__ used for in workers? Does it do something already that I'm missing or is it planned for the future?

Why are these using the python double-underscore formatting wherever they appear?

tangkong (Contributor):

Agreed that the double-underscore is largely unnecessary here, in python it implies a special method or attribute that python provides. If you intended it to be private, the best way to hint that is with a single leading underscore. (Though nothing is truly private in python)

Correct me if I'm wrong Josh, but the VolatileStatus is just a way to communicate the status of the worker process to the main process. It's a multiprocessing-safe enum.

I've been pondering whether or not this is really necessary. Could we not just set up a multiprocessing.Pipe and pass primitive values that we cast into enums? They may still be enums on either side of the pipe, but just passing the value prevents us from having to jump through all these hoops

joshc-slac (Collaborator, Author):

It does the functionality-critical work of interprocess communication in BTs. As Worker doesn't necessarily require IPC, this ActionWorker class extends the Worker class for the subclass of things that do require IPC, as they are part of trees.

We had this conversation once before. I totally agree other mechanisms can be used to facilitate IPC. Pythonically, Queues and Pipes are multiprocessing safe, but so too is Value. Because we are strictly passing one of three values completely encapsulated by an enum, I found Value much more elegant than needing to call send and receive on a pipe or pop from a queue. Value just wraps a portion of shared memory, which seemed more elegant at this level of dev space.

On our long call on Friday, Robert and I sketched an idea similar to what py_trees reaches for: a source-of-truth "database" to which all processes have access; however, to prevent race conditions you would still have to share a process-safe lock between these workers, which seems like doing the same thing as sharing a Value.

I wanted to indicate with the dunders that future developers should only very intentionally mess with volatile_status, but I agree now that that is the incorrect way to signal it.

Totally open to Pipes and Queues, but in this specific case (of just passing a wrapped int / enum) it seems more cumbersome than it's worth. Again, somewhat potato potato.

joshc-slac (Collaborator, Author):

It is important to communicate that as long as these are separate processes, we need IPC-safe mechanisms of passing information; therefore it is strictly necessary to have something of this sort. There may be an async implementation in which these processes (or, then, python threads) all happen on the same stack and can share information more easily under the GIL. I have reasons, which I can expound upon, why I don't love that implementation, but it would work.

@ZLLentz (Member) commented Aug 20, 2024:

My original question is less conceptual and more that I don't understand (could not find) precisely where in the code this volatile status is used. Therefore, I either don't actually understand the code flow or there's some issue in the implementation.

joshc-slac (Collaborator, Author):

Don't be sorry, it's a good question. Worker needs to pass it to the constructor of Process. Worker owns the process; ActionNode, which instantiates Worker by way of ActionWorker, owns the IPC-related variables, namely here volatile_status.

ZLLentz (Member):

I'm going to hit approve but I maintain that everything still works if you remove this specific line of code since it will never be accessed this way again during execution

joshc-slac (Collaborator, Author):

Ah... sorry it took me till now to see your exact point. It's specifically that the ActionWorker does not need to maintain a reference to this "piece of shared memory".

From one perspective that is totally, totally correct. Let's not leave a loaded gun lying around (sorry, the metaphor was too fun to avoid). However, there is also a world where ActionWorker defines stop_work functions for all work that gets packaged and sent off to Worker, and it may be useful to have a reference to it here... Though an argument could be made that it should be grabbed by invoking super in some way I am not pythonically familiar with yet...

TL;DR: you're right, I can remove it.

tangkong (Contributor):

There's a guideline I read somewhere that PRs should never include pieces that "will be used later on".

I catch myself doing that all too often. It's definitely good to help keep diffs understandable

joshc-slac (Collaborator, Author):

yeah agreed, I've had peers willing to die on the hill that commented-out code shouldn't live in commits either

@@ -6,7 +6,7 @@

class CheckAndDo(py_trees.composites.Selector):
def __init__(self, name: str, check: ConditionNode, do: ActionNode) -> None:
super().__init__(name, memory=True)
super().__init__(name, memory=False)
ZLLentz (Member):

Knowledge check for me: does this make it so we always re-check the "check" side of the "check and do" on every tick even if the "do" is already running?

tangkong (Contributor):

For my own reference: https://py-trees.readthedocs.io/en/devel/composites.html

if RUNNING on the previous tick, resume with the RUNNING child

I believe what Zach has concluded here is correct.

joshc-slac (Collaborator, Author):

100%, I realized that error in this PR. We want the trees to err on the side of reactive, i.e. if a condition is no longer true (or has become true) we want to catch that even while executing another action.

We should call this out more explicitly / write unit tests to show the difference. This is a great call out.
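
For reference, a minimal hypothetical py_trees 2.x snippet (not part of this diff) showing what the memory flag changes:

import py_trees

# With memory=False the Selector re-evaluates "check" on every tick, even
# while "do" is RUNNING; with memory=True it would resume at the RUNNING
# child and skip the re-check.
check = py_trees.behaviours.Failure(name="check")  # condition not yet met
do = py_trees.behaviours.Running(name="do")        # long-running action
root = py_trees.composites.Selector(
    name="check_and_do", memory=False, children=[check, do]
)

for _ in range(3):
    root.tick_once()  # "check" is ticked each time before "do" resumes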

Comment on lines 38 to 41
print(self.work_func)
# breakpoint()
self.work_proc.start()
logging.info(f"Starting work on: {self.work_proc.pid}")
logging.debug(f"Starting work on: {self.work_proc.pid}")
ZLLentz (Member):

You've got some mixing of logging and printing here, and this isn't in the multiprocess context yet so it kind of seems more messy than intentional

tangkong (Contributor):

This is a widespread thing we'll have to clean up (#19)

joshc-slac (Collaborator, Author):

yes, good call out. When debugging I kinda spam both since they tend to print differently; not good.

proc_name: str,
stop_func: Optional[Callable[[None], None]] = None,
work_func: Optional[Callable[[Any], None]] = None,
proc_type: Union[Process, CAProcess] = Process,
ZLLentz (Member):

Tiny tiny type hinting nitpicks:

  • CAProcess is a subclass of Process, so the Union is unnecessary for the type hint
  • We pass in a type here rather than an object, so the type keyword is needed
Suggested change
proc_type: Union[Process, CAProcess] = Process,
proc_type: type[Process] = Process,

tangkong (Contributor):

I like this, since only ActionWorker really expects to work with CAProcess, the base Worker does not

joshc-slac (Collaborator, Author):

Sweet, yeah, I was slightly abusing the notation to show users what types of things to pass here, but from an OOP standpoint (and generally) you are totally correct.

@tangkong (Contributor) left a comment:

Some thoughts, I think this is largely good as well.

I do think there is more to discuss w.r.t. how work is done, particularly when it comes to encapsulating the work functions. Josh and I sat on a call and spent a good amount of time struggling to debug things, which might suggest there are better ways about it.

The summary of my shower thought is that we could try to make this more event-driven, passing messages between sub-processes via multiprocessing.Pipe. This aligns with the general guidance from python to avoid sharing state when possible. If we can avoid having to manage Locks entirely, perhaps our lives will be made easier in the future. I envisioned an alternate formulation for work functions looking something like

import multiprocessing.connection

def work_func(conn: multiprocessing.connection.Connection):
    value = 0
    do_work = True
    status = None  # placeholder: would be a RUNNING/SUCCESS/FAILURE enum value
    while do_work or value < 100:
        if conn.poll():   # returns True if there is data to receive
            # receive message and process it maybe, here I'm assuming it's just a bool
            do_work = conn.recv()
            # could also respond to requests for status here maybe?
        if value < 100:  # the rest of the work
            value += 10

    conn.send((value, status))

maybe I'm just too event-pilled for my own good



@@ -55,6 +69,4 @@ def work_func(self):

def set_work_func(self, work_func):
self.work_func = work_func
tangkong (Contributor):

This is a bit of bad / confusing overloading. We define Worker.work_func as a method of Worker, but then overwrite it with a function that is passed in at init. Furthermore this function can have any arbitrary signature, depending on the purpose of the worker.

We need to choose our approach here. We're either:

  • subclassing Worker for new behaviors, and making sure that Worker.work_func adheres to the signature / interface defined above
  • accepting any arbitrary function, storing it, and sending it off to a multiprocessing.Process etc.

We appear to be doing a mix of both in this codebase. (Sequencer overloads Worker.work_func, and ActionWorker overwrites it with a function)

joshc-slac (Collaborator, Author):

Yes, especially with what you taught me about bound vs passed (?) class methods. Removing this functionality may be best; I wanted to leave it flexible until I found what seemed preferable.

It seems we have agreed upon not using bound class members (which means we have to pass self as an argument for spawned Processes), as it pairs well with utilizing the base Worker class to spawn our work processes (and therefore concretizes feature adds / debugging in spawned processes).
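
A minimal sketch of that convention, with hypothetical names: the work function is a plain module-level function injected at init, so no bound self has to cross the process boundary when the target is pickled.

from multiprocessing import Process

def work_func(value):
    # plain function: pickles cleanly for spawn and carries no implicit self
    print(f"working on {value}")

class Worker:
    def __init__(self, work_func):
        self.work_func = work_func  # stored callable, injected at init

    def start_work(self):
        # the target is the passed-in function, not a method bound to Worker
        self.work_proc = Process(target=self.work_func, args=(42,))
        self.work_proc.start()

if __name__ == "__main__":
    w = Worker(work_func)
    w.start_work()
    w.work_proc.join()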

logging.error("Already working, not starting work")
return
self.do_work.value = True
print(self.work_func)
# breakpoint()
tangkong (Contributor):

Suggested change
# breakpoint()

an orphaned breakpoint 😢



self.stop_func = stop_func

def start_work(self):
if self.do_work.value:
if (self.do_work.value):
tangkong (Contributor):

Did you find you needed the parens here? They shouldn't be necessary.

joshc-slac (Collaborator, Author):

C habits die hard

def work_func(self):
while self.do_work.value or self.val.value < 100:
if self.val.value < 100: # Note: value captured via closure
time.sleep(0.01)
tangkong (Contributor):

Note from a call Josh and I sat on for a while: this sleep is necessary. Without it, the loop runs fast enough to terminate and have trouble acquiring the lock (I think)

joshc-slac (Collaborator, Author):

Yes, I was somewhat surprised to find this to be the case in python. It is strictly necessary in a language like C, in which, if you were to go full bore in a loop acquiring a Lock, there would not be CPU cycles for another lower- or equivalent-priority thread to acquire the same lock.
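
A hypothetical distillation of the loop in question (names assumed, not the repo's actual test code), showing where the sleep gives other processes a window to win the lock:

import time
from multiprocessing import Lock, Process, Value

def work_func(do_work, val, lock):
    while do_work.value or val.value < 100:
        with lock:
            if val.value < 100:
                val.value += 10
        # without this sleep the loop re-acquires the lock almost immediately,
        # starving any other process waiting on the same lock
        time.sleep(0.01)

if __name__ == "__main__":
    lock = Lock()
    do_work = Value("b", False)
    val = Value("i", 0)
    p = Process(target=work_func, args=(do_work, val, lock))
    p.start()
    p.join()
    print(val.value)  # 100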

@joshc-slac (Collaborator, Author) commented Aug 20, 2024:

> [quoting @tangkong's review comment above: the event-driven multiprocessing.Pipe suggestion and work_func sketch]

On one hand you're right, it took both of us a few hours to iron out. To be fair, the largest difficulty I had was understanding the notion of bound class members vs passed class members. The locking stuff became fairly trivial after that, especially after learning python doesn't like full-bore acquisition of locks. I am not sure I agree with what this work function achieves, but I see the general intent. We would have to send the "volatile status" on the socket, and the update method of the action node would have to recv it. There are some nuances there where I would use an Event() variable to signal that data is available on the socket, to make sure we ack every write (i.e. are not checking stale data on either end) and, more critically, do not block in the update method.

I do not think this maps to my notion of event driven, in which we bind functions to signals handled by an event queue. I hope I am not coming across as overly pedantic or portraying this as overly complicated; these are bog-standard trade-offs in systems design. The hard thing is getting it right for the right reasons; then it becomes a game of abstraction.

Maybe pythonically a Queue of size 1 would be the best method to do IPC, I am not sure. My goal was simplicity, because these things can and will be abused, especially in a language as permissive as python. If infrastructurally we limit what we can pass between these Work Processes to the strictly necessary set (RUNNING, SUCCESS, FAILURE), I think we set ourselves up for long-term success.

Sounds like a great discussion for Wednesday. It's fairly critical we align on this; the good news is this schema works. I would argue we should run with it until we find a breaking case.
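
For concreteness, a hedged sketch (hypothetical names) of the "Queue of size 1" alternative: the child keeps only the latest status in the queue, and the parent polls without blocking.

from multiprocessing import Process, Queue

def work(status_q):
    # ... do the actual work here ...
    if status_q.full():        # drop the stale status so put() cannot block
        status_q.get_nowait()
    status_q.put("SUCCESS")    # stand-in for the RUNNING/SUCCESS/FAILURE enum

if __name__ == "__main__":
    status_q = Queue(maxsize=1)
    p = Process(target=work, args=(status_q,))
    p.start()
    p.join()
    if not status_q.empty():
        print(status_q.get_nowait())  # parent reads the most recent status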

@joshc-slac (Collaborator, Author):

A lot of good discussion. At the last beams meeting we determined that the implementation details decided in this PR are sufficient to carry us on the order of many months; should this application see more success and development hours, we can revisit some of the decisions made here.

Attached are slides generated to reflect the various trade-offs and calculations made to settle on these implementation details.

@joshc-slac joshc-slac requested review from ZLLentz and tangkong August 26, 2024 19:16
ZLLentz previously approved these changes Aug 26, 2024
@tangkong (Contributor) left a comment:

On to the next

@joshc-slac joshc-slac merged commit a50c0dd into master Sep 3, 2024
12 of 22 checks passed
@joshc-slac joshc-slac deleted the joshc-slac/enh-worker-util branch September 3, 2024 23:37