Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] cleanup zmq ipc sockets on exit #11115

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions vllm/entrypoints/openai/api_server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import atexit
import importlib
import inspect
import multiprocessing
Expand Down Expand Up @@ -196,6 +197,14 @@ async def build_async_engine_client_from_engine_args(
assert engine_pid is not None, "Engine process failed to start."
logger.info("Started engine process with PID %d", engine_pid)

def _cleanup_ipc_path():
    """Delete the socket file behind ``ipc_path`` if it is still on disk.

    ``ipc_path`` is read from the enclosing scope; the ``ipc://`` scheme
    is stripped to obtain the filesystem path. A file that is already
    gone is not treated as an error.
    """
    socket_path = ipc_path.replace("ipc://", "")
    try:
        os.remove(socket_path)
    except FileNotFoundError:
        # Already removed by someone else. Checking os.path.exists()
        # first would race with that removal and let the error escape.
        pass

# Ensure we clean up the local IPC socket file on exit.
atexit.register(_cleanup_ipc_path)

# Build RPCClient, which conforms to EngineClient Protocol.
engine_config = engine_args.create_engine_config()
build_client = partial(MQLLMEngineClient, ipc_path, engine_config,
Expand Down
16 changes: 14 additions & 2 deletions vllm/v1/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import signal
import threading
import time
from dataclasses import dataclass
from multiprocessing.process import BaseProcess
from typing import List, Tuple, Type, Union

Expand Down Expand Up @@ -119,6 +120,14 @@ def profile(self, is_start=True):
self.model_executor.profile(is_start)


@dataclass
class EngineCoreProcHandle:
    """Handle for an engine-core background process and its socket paths.

    Returned by ``make_engine_core_process``, which fills the fields from
    the process it spawned and the path arguments it was given.
    """
    # The spawned background process.
    proc: BaseProcess
    # Path strings exactly as passed to make_engine_core_process. The
    # client's shutdown strips an "ipc://" prefix from each of them to
    # locate the zmq ipc socket files it deletes.
    ready_path: str
    input_path: str
    output_path: str


class EngineCoreProc(EngineCore):
"""ZMQ-wrapper for running EngineCore in background process."""

Expand Down Expand Up @@ -190,7 +199,7 @@ def make_engine_core_process(
input_path: str,
output_path: str,
ready_path: str,
) -> BaseProcess:
) -> EngineCoreProcHandle:
# The current process might have CUDA context,
# so we need to spawn a new process.
# NOTE(rob): this is a problem for using EngineCoreProc w/
Expand All @@ -212,7 +221,10 @@ def make_engine_core_process(

# Wait for startup
EngineCoreProc.wait_for_startup(proc, ready_path)
return proc
return EngineCoreProcHandle(proc=proc,
ready_path=ready_path,
input_path=input_path,
output_path=output_path)

@staticmethod
def run_engine_core(*args, **kwargs):
Expand Down
28 changes: 20 additions & 8 deletions vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import atexit
import os
from typing import List, Union

import msgspec
Expand Down Expand Up @@ -148,7 +149,7 @@ def __init__(
self.input_socket.bind(input_path)

# Start EngineCore in background process.
self.proc = EngineCoreProc.make_engine_core_process(
self.proc_handle = EngineCoreProc.make_engine_core_process(
*args,
input_path=input_path,
output_path=output_path,
Expand All @@ -161,13 +162,24 @@ def shutdown(self):
# Shut down the zmq context.
self.ctx.destroy(linger=0)

# Shutdown the process if needed.
if hasattr(self, "proc") and self.proc.is_alive():
self.proc.terminate()
self.proc.join(5)

if self.proc.is_alive():
kill_process_tree(self.proc.pid)
if hasattr(self, "proc_handle"):
# Shutdown the process if needed.
if self.proc_handle.proc.is_alive():
self.proc_handle.proc.terminate()
self.proc_handle.proc.join(5)

if self.proc_handle.proc.is_alive():
kill_process_tree(self.proc_handle.proc.pid)

# Remove zmq ipc socket files
ipc_sockets = [
self.proc_handle.ready_path, self.proc_handle.output_path,
self.proc_handle.input_path
]
for ipc_socket in ipc_sockets:
socket_file = ipc_socket.replace("ipc://", "")
if os.path.exists(socket_file):
os.remove(socket_file)

def __del__(self):
self.shutdown()
Expand Down
21 changes: 14 additions & 7 deletions vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,23 @@ def wait_for_termination(procs, timeout):

# Send SIGTERM if still running
active_procs = [w.proc for w in self.workers if w.proc.is_alive()]
self.workers = None
for p in active_procs:
p.terminate()
if wait_for_termination(active_procs, 4):
return
if not wait_for_termination(active_procs, 4):
# Send SIGKILL if still running
active_procs = [p for p in active_procs if p.is_alive()]
for p in active_procs:
p.kill()

# Send SIGKILL if still running
active_procs = [p for p in active_procs if p.is_alive()]
for p in active_procs:
p.kill()
self._cleanup_sockets()
Copy link
Collaborator

@tlrmchlsmth tlrmchlsmth Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we clean up the sockets before we terminate the workers, in case the client process gets impatient and sends a SIGKILL to this one while this process waits for the workers to gracefully terminate?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaning up before terminating workers is more likely to fail because the workers may still have the socket open.

I think anywhere we use SIGKILL is giving up the illusion of proper cleanup. Hopefully that's rare?

self.workers = None

def _cleanup_sockets(self):
    """Remove the zmq ipc socket file of every worker.

    Each worker's ``ready_path`` is turned into a filesystem path by
    stripping the ``ipc://`` scheme. Files that are already gone are
    skipped, and the call is a no-op when ``self.workers`` is ``None``
    or empty.
    """
    for w in self.workers or ():
        socket_path = w.ready_path.replace("ipc://", "")
        try:
            os.remove(socket_path)
        except FileNotFoundError:
            # Nothing left to delete. An os.path.exists() pre-check
            # would race with a concurrent removal of the same file.
            pass

def shutdown(self):
"""Properly shut down the executor and its workers"""
Expand Down
Loading