Skip to content

Commit

Permalink
Merge branch 'main' into logging_hierarchy
Browse files Browse the repository at this point in the history
  • Loading branch information
SYangster authored Dec 4, 2024
2 parents 7a187e1 + c6f2521 commit 2a01b97
Show file tree
Hide file tree
Showing 23 changed files with 388 additions and 177 deletions.
2 changes: 1 addition & 1 deletion nvflare/apis/event_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ class EventType(object):

AUTHORIZE_COMMAND_CHECK = "_authorize_command_check"
BEFORE_BUILD_COMPONENT = "_before_build_component"
GET_JOB_LAUNCHER = "_get_job_launcher"
BEFORE_JOB_LAUNCH = "_before_job_launch"
51 changes: 3 additions & 48 deletions nvflare/app_common/job_launcher/client_process_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,56 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys

from nvflare.apis.fl_constant import FLContextKey, JobConstants
from nvflare.apis.workspace import Workspace
from nvflare.app_common.job_launcher.process_launcher import ProcessJobLauncher
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path
from nvflare.utils.job_launcher_utils import generate_client_command


class ClientProcessJobLauncher(ProcessJobLauncher):
def get_command(self, launch_data, fl_ctx) -> (str, dict):
new_env = os.environ.copy()
workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
args = fl_ctx.get_prop(FLContextKey.ARGS)
client = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
job_id = launch_data.get(JobConstants.JOB_ID)
server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
if not server_config:
raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
service = server_config[0].get("service", {})
if not isinstance(service, dict):
raise RuntimeError(f"expect server config data to be dict but got {type(service)}")

app_custom_folder = workspace_obj.get_app_custom_dir(job_id)
if app_custom_folder != "":
add_custom_dir_to_path(app_custom_folder, new_env)

command_options = ""
for t in args.set:
command_options += " " + t
command = (
f"{sys.executable} -m nvflare.private.fed.app.client.worker_process -m "
+ args.workspace
+ " -w "
+ (workspace_obj.get_startup_kit_dir())
+ " -t "
+ client.token
+ " -d "
+ client.ssid
+ " -n "
+ job_id
+ " -c "
+ client.client_name
+ " -p "
+ str(client.cell.get_internal_listener_url())
+ " -g "
+ service.get("target")
+ " -scheme "
+ service.get("scheme", "grpc")
+ " -s fed_client.json "
" --set" + command_options + " print_conf=True"
)
return command, new_env
def get_command(self, job_meta, fl_ctx) -> str:
return generate_client_command(job_meta, fl_ctx)
22 changes: 15 additions & 7 deletions nvflare/app_common/job_launcher/process_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
from abc import abstractmethod

from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import FLContextKey
from nvflare.apis.fl_constant import FLContextKey, JobConstants
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_launcher_spec import JobHandleSpec, JobLauncherSpec, JobReturnCode, add_launcher
from nvflare.private.fed.utils.fed_utils import extract_job_image
from nvflare.apis.workspace import Workspace
from nvflare.utils.job_launcher_utils import add_custom_dir_to_path, extract_job_image

JOB_RETURN_CODE_MAPPING = {0: JobReturnCode.SUCCESS, 1: JobReturnCode.EXECUTION_ERROR, 9: JobReturnCode.ABORTED}

Expand Down Expand Up @@ -62,7 +63,14 @@ def __init__(self):

def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:

command, new_env = self.get_command(job_meta, fl_ctx)
new_env = os.environ.copy()
workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
job_id = job_meta.get(JobConstants.JOB_ID)
app_custom_folder = workspace_obj.get_app_custom_dir(job_id)
if app_custom_folder != "":
add_custom_dir_to_path(app_custom_folder, new_env)

command = self.get_command(job_meta, fl_ctx)
# use os.setsid to create new process group ID
process = subprocess.Popen(shlex.split(command, True), preexec_fn=os.setsid, env=new_env)

Expand All @@ -71,22 +79,22 @@ def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:
return ProcessHandle(process)

def handle_event(self, event_type: str, fl_ctx: FLContext):
if event_type == EventType.GET_JOB_LAUNCHER:
if event_type == EventType.BEFORE_JOB_LAUNCH:
job_meta = fl_ctx.get_prop(FLContextKey.JOB_META)
job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())
if not job_image:
add_launcher(self, fl_ctx)

@abstractmethod
def get_command(self, launch_data, fl_ctx) -> (str, dict):
def get_command(self, job_meta, fl_ctx) -> str:
"""Generate the command to launch the job in a sub-process
Args:
fl_ctx: FLContext
launch_data: job launcher data
job_meta: job meta data
Returns:
launch command, environment dict
launch command
"""
pass
56 changes: 3 additions & 53 deletions nvflare/app_common/job_launcher/server_process_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,61 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys

from nvflare.apis.fl_constant import FLContextKey, JobConstants
from nvflare.apis.workspace import Workspace
from nvflare.app_common.job_launcher.process_launcher import ProcessJobLauncher
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path
from nvflare.utils.job_launcher_utils import generate_server_command


class ServerProcessJobLauncher(ProcessJobLauncher):
def get_command(self, launch_data, fl_ctx) -> (str, dict):
new_env = os.environ.copy()

workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
args = fl_ctx.get_prop(FLContextKey.ARGS)
server = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
job_id = launch_data.get(JobConstants.JOB_ID)
restore_snapshot = fl_ctx.get_prop(FLContextKey.SNAPSHOT, False)

app_root = workspace_obj.get_app_dir(job_id)
cell = server.cell
server_state = server.server_state

app_custom_folder = workspace_obj.get_app_custom_dir(job_id)
if app_custom_folder != "":
add_custom_dir_to_path(app_custom_folder, new_env)

command_options = ""
for t in args.set:
command_options += " " + t

command = (
sys.executable
+ " -m nvflare.private.fed.app.server.runner_process -m "
+ args.workspace
+ " -s fed_server.json -r "
+ app_root
+ " -n "
+ str(job_id)
+ " -p "
+ str(cell.get_internal_listener_url())
+ " -u "
+ str(cell.get_root_url_for_child())
+ " --host "
+ str(server_state.host)
+ " --port "
+ str(server_state.service_port)
+ " --ssid "
+ str(server_state.ssid)
+ " --ha_mode "
+ str(server.ha_mode)
+ " --set"
+ command_options
+ " print_conf=True restore_snapshot="
+ str(restore_snapshot)
)

return command, new_env
def get_command(self, job_meta, fl_ctx) -> str:
return generate_server_command(job_meta, fl_ctx)
2 changes: 1 addition & 1 deletion nvflare/app_common/launchers/subprocess_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from nvflare.apis.signal import Signal
from nvflare.app_common.abstract.launcher import Launcher, LauncherRunStatus
from nvflare.fuel.utils.log_utils import get_obj_logger
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path
from nvflare.utils.job_launcher_utils import add_custom_dir_to_path


def log_subprocess_output(process, logger):
Expand Down
195 changes: 195 additions & 0 deletions nvflare/app_opt/job_launcher/docker_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import time
from abc import abstractmethod

import docker
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import FLContextKey, JobConstants
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_launcher_spec import JobHandleSpec, JobLauncherSpec, JobReturnCode, add_launcher
from nvflare.apis.workspace import Workspace
from nvflare.utils.job_launcher_utils import extract_job_image, generate_client_command, generate_server_command


class DOCKER_STATE:
    """Names of the container status values this module compares ``container.status`` against."""

    # states in which the container has not finished
    CREATED = "created"
    RESTARTING = "restarting"
    RUNNING = "running"
    PAUSED = "paused"

    # states treated as "finished" by DockerJobHandle (poll / wait)
    EXITED = "exited"
    DEAD = "dead"


# Translates a container status (see DOCKER_STATE) into the JobReturnCode that
# DockerJobHandle.poll() reports. Every status in which the container has not
# finished maps to UNKNOWN.
# NOTE(review): "exited" is mapped to SUCCESS without consulting the container's
# exit code, so a job that exits non-zero would presumably still be reported as
# SUCCESS - confirm whether that is intended.
JOB_RETURN_CODE_MAPPING = {
    DOCKER_STATE.CREATED: JobReturnCode.UNKNOWN,
    DOCKER_STATE.RESTARTING: JobReturnCode.UNKNOWN,
    DOCKER_STATE.RUNNING: JobReturnCode.UNKNOWN,
    DOCKER_STATE.PAUSED: JobReturnCode.UNKNOWN,
    DOCKER_STATE.EXITED: JobReturnCode.SUCCESS,
    DOCKER_STATE.DEAD: JobReturnCode.ABORTED,
}


class DockerJobHandle(JobHandleSpec):
    """Job handle for a job that runs inside a Docker container.

    The handle keeps the container object it was created with and re-fetches the
    container by id whenever the current status is needed.
    """

    def __init__(self, container, timeout=None):
        """Create the handle.

        Args:
            container: the container object of the launched job.
            timeout: max number of seconds wait() waits for the container to reach
                a finished state; None means no time limit.
        """
        super().__init__()

        self.container = container
        self.timeout = timeout
        self.logger = logging.getLogger(self.__class__.__name__)

    def terminate(self):
        """Stop the container, if the handle has one."""
        if self.container:
            self.container.stop()

    def poll(self):
        """Report the job's return code based on the current container status.

        A container found in the "exited" or "dead" state is force-removed as a
        side effect.

        Returns:
            the JobReturnCode mapped from the container status (UNKNOWN for an
            unmapped status), or None if the container cannot be looked up.
        """
        container = self._get_container()
        if not container:
            # keep the original contract: no container -> no return code
            return None

        # take the status once so that logging/mapping do not depend on the removed container
        status = container.status
        if status in (DOCKER_STATE.EXITED, DOCKER_STATE.DEAD):
            container.remove(force=True)
        self.logger.debug(f"docker completes state: {status}")
        return JOB_RETURN_CODE_MAPPING.get(status, JobReturnCode.UNKNOWN)

    def wait(self):
        """Block until the container is "exited" or "dead", or self.timeout elapses."""
        if self.container:
            self.enter_states([DOCKER_STATE.EXITED, DOCKER_STATE.DEAD], self.timeout)

    def _get_container(self):
        """Fetch a fresh container object by id.

        Returns:
            the container object, or None if it could not be fetched.
        """
        try:
            docker_client = docker.from_env()
            # Get the container object
            return docker_client.containers.get(self.container.id)
        except Exception as e:
            # Best effort: any lookup failure means "no container". Unlike a bare
            # "except:", this does not swallow KeyboardInterrupt / SystemExit.
            self.logger.debug(f"unable to get container: {e}")
            return None

    def enter_states(self, job_states_to_enter: list, timeout=None):
        """Wait until the container status becomes one of the requested states.

        The status is checked once per second.

        Args:
            job_states_to_enter: a state, or list/tuple of states, to wait for.
            timeout: max number of seconds to wait; None means no time limit.

        Returns:
            True if the container entered one of the requested states. False if the
            container cannot be looked up, the timeout elapsed, or the container
            finished ("exited"/"dead") without entering a requested state.
        """
        starting_time = time.time()
        if not isinstance(job_states_to_enter, (list, tuple)):
            job_states_to_enter = [job_states_to_enter]

        finished_states = (DOCKER_STATE.EXITED, DOCKER_STATE.DEAD)
        while True:
            container = self._get_container()
            if not container:
                return False

            status = container.status
            self.logger.debug(f"container state: {status}, job states to enter: {job_states_to_enter}")
            if status in job_states_to_enter:
                return True
            if status in finished_states:
                # A finished container will not enter the requested state any more;
                # without this check the loop never ends when timeout is None.
                return False
            if timeout is not None and time.time() - starting_time > timeout:
                return False
            time.sleep(1)


class DockerJobLauncher(JobLauncherSpec):
    """Base job launcher that runs a job in a Docker container.

    Subclasses supply the container name and the command to run via get_command().
    """

    def __init__(self, mount_path: str = "/workspace", network: str = "nvflare-network", timeout=None):
        """Create the launcher.

        Args:
            mount_path: path inside the container where the workspace is mounted.
            network: name of the docker network the container is attached to.
            timeout: max number of seconds to wait for the container to reach the
                "running" state; None means no time limit.
        """
        super().__init__()

        self.mount_path = mount_path
        self.network = network
        self.timeout = timeout

    def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:
        """Start the job's container and wait for it to be running.

        The host directory named by the NVFL_DOCKER_WORKSPACE environment variable
        is mounted read-write at self.mount_path.

        Args:
            job_meta: job meta data
            fl_ctx: FLContext

        Returns:
            a DockerJobHandle for the running container, or None if the job could
            not be launched.
        """
        # self.logger is not set in this class - presumably provided by the base class
        self.logger.debug("DockerJobLauncher start to launch job")
        job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())

        workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
        job_id = job_meta.get(JobConstants.JOB_ID)
        app_custom_folder = workspace_obj.get_app_custom_dir(job_id)

        # make the job's custom code importable inside the container
        python_path = f"{app_custom_folder}:$PYTHONPATH" if app_custom_folder != "" else "$PYTHONPATH"
        job_name, cmd = self.get_command(job_meta, fl_ctx)
        command = f' /bin/bash -c "export PYTHONPATH={python_path};{cmd}"'
        self.logger.info(f"Launch image:{job_image}, run command: {command}")

        docker_workspace = os.environ.get("NVFL_DOCKER_WORKSPACE")
        self.logger.info(f"launch_job {job_id} in docker_workspace: {docker_workspace}")
        if not docker_workspace:
            # Without the host path there is nothing valid to mount: a None key in
            # "volumes" would not refer to the workspace directory.
            self.logger.error(
                f"Failed to launch job: {job_id} in DockerJobLauncher. "
                "Environment variable NVFL_DOCKER_WORKSPACE is not set."
            )
            return None

        docker_client = docker.from_env()
        try:
            container = docker_client.containers.run(
                job_image,
                command=command,
                name=job_name,
                network=self.network,
                detach=True,
                # remove=True,
                volumes={
                    docker_workspace: {
                        "bind": self.mount_path,
                        "mode": "rw",
                    },
                },
                # ports=ports,  # Map container ports to host ports (optional)
            )
            self.logger.info(f"Launch the job in DockerJobLauncher using image: {job_image}")

            handle = DockerJobHandle(container)
            try:
                if handle.enter_states([DOCKER_STATE.RUNNING], timeout=self.timeout):
                    return handle
                else:
                    handle.terminate()
                    return None
            except Exception as e:
                # report the reason instead of silently dropping it (was a bare "except:")
                self.logger.error(f"Error waiting for container of job {job_id} to run: {e}")
                handle.terminate()
                return None

        except docker.errors.ImageNotFound:
            self.logger.error(f"Failed to launcher job: {job_id} in DockerJobLauncher. Image '{job_image}' not found.")
            return None
        except docker.errors.APIError as e:
            self.logger.error(f"Error starting container: {e}")
            return None

    def handle_event(self, event_type: str, fl_ctx: FLContext):
        """Register this launcher for the job when the job specifies an image.

        Args:
            event_type: type of the fired event
            fl_ctx: FLContext
        """
        if event_type == EventType.BEFORE_JOB_LAUNCH:
            job_meta = fl_ctx.get_prop(FLContextKey.JOB_META)
            job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())
            if job_image:
                add_launcher(self, fl_ctx)

    @abstractmethod
    def get_command(self, job_meta, fl_ctx) -> (str, str):
        """To generate the container name and the command to launch the job in the container

        Args:
            fl_ctx: FLContext
            job_meta: job meta data

        Returns:
            (container name, launch command)

        """
        pass


class ClientDockerJobLauncher(DockerJobLauncher):
    """Docker job launcher for the client side."""

    def get_command(self, job_meta, fl_ctx) -> (str, str):
        """Build the container name and the client launch command.

        Args:
            job_meta: job meta data
            fl_ctx: FLContext

        Returns:
            ("client-<job id>", command produced by generate_client_command)
        """
        job_id = job_meta.get(JobConstants.JOB_ID)
        launch_cmd = generate_client_command(job_meta, fl_ctx)
        container_name = f"client-{job_id}"
        return container_name, launch_cmd


class ServerDockerJobLauncher(DockerJobLauncher):
    """Docker job launcher for the server side."""

    def get_command(self, job_meta, fl_ctx) -> (str, str):
        """Build the container name and the server launch command.

        Args:
            job_meta: job meta data
            fl_ctx: FLContext

        Returns:
            ("server-<job id>", command produced by generate_server_command)
        """
        job_id = job_meta.get(JobConstants.JOB_ID)
        launch_cmd = generate_server_command(job_meta, fl_ctx)
        container_name = f"server-{job_id}"
        return container_name, launch_cmd
Loading

0 comments on commit 2a01b97

Please sign in to comment.