Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance launcher executor #2744

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions nvflare/app_common/executors/launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def _init_converter(self, fl_ctx: FLContext):
def _initialize_external_execution(
self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal
) -> bool:
+ self.reset_peer_is_up_or_dead()
with self._lock:
self._abort_signal = abort_signal
self._current_task = task_name
Expand All @@ -242,13 +243,15 @@ def _initialize_external_execution(
abort_signal.trigger("launch task failed")
return False

- self.log_info(fl_ctx, f"External execution for task ({task_name}) is launched.")
+ self.log_info(fl_ctx, f"Launcher successfully launched task ({task_name}).")
# wait for external execution to set up their pipe_handler
setup_success = self._wait_external_setup(task_name, fl_ctx, abort_signal)
if not setup_success:
- self.log_error(fl_ctx, "External execution set up failed.")
- abort_signal.trigger("External execution set up failed.")
+ error = f"Failed external setup for task ({task_name})."
+ self.log_error(fl_ctx, error)
+ abort_signal.trigger(error)
return False
+ self.log_info(fl_ctx, f"External setup for task ({task_name}) succeeded.")
return True

def _execute_launcher_method_in_thread_executor(self, method_name: str, **kwargs) -> Any:
Expand Down
3 changes: 3 additions & 0 deletions nvflare/app_common/executors/task_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ def ask_peer_to_end(self, fl_ctx: FLContext) -> bool:
def peer_is_up_or_dead(self) -> bool:
return self.pipe_handler.peer_is_up_or_dead.is_set()

+ def reset_peer_is_up_or_dead(self):
+ self.pipe_handler.peer_is_up_or_dead.clear()
+
def pause_pipe_handler(self):
"""Stops pipe_handler heartbeat."""
self.pipe_handler.pause()
Expand Down
Loading