diff --git a/boefjes/boefjes/api.py b/boefjes/boefjes/api.py index 6ba56b76ab4..8209a14f991 100644 --- a/boefjes/boefjes/api.py +++ b/boefjes/boefjes/api.py @@ -2,6 +2,7 @@ import multiprocessing from datetime import datetime, timezone from enum import Enum +from multiprocessing.context import ForkContext, ForkProcess from uuid import UUID import structlog @@ -23,9 +24,10 @@ app = FastAPI(title="Boefje API") logger = structlog.get_logger(__name__) +ctx: ForkContext = multiprocessing.get_context("fork") -class UvicornServer(multiprocessing.Process): +class UvicornServer(ForkProcess): def __init__(self, config: Config): super().__init__() self.server = Server(config=config) diff --git a/boefjes/boefjes/app.py b/boefjes/boefjes/app.py index 2b671f0f642..9514c81b638 100644 --- a/boefjes/boefjes/app.py +++ b/boefjes/boefjes/app.py @@ -1,8 +1,10 @@ -import multiprocessing as mp +import multiprocessing import os import signal import sys import time +from multiprocessing.context import ForkContext +from multiprocessing.process import BaseProcess from queue import Queue import structlog @@ -22,6 +24,7 @@ from boefjes.sql.plugin_storage import create_plugin_storage logger = structlog.get_logger(__name__) +ctx: ForkContext = multiprocessing.get_context("fork") class SchedulerWorkerManager(WorkerManager): @@ -36,11 +39,11 @@ def __init__( self.scheduler_client = scheduler_client self.settings = settings - manager = mp.Manager() + manager = ctx.Manager() self.task_queue = manager.Queue() # multiprocessing.Queue() will not work on macOS, see mp.Queue.qsize() self.handling_tasks = manager.dict() - self.workers: list[mp.Process] = [] + self.workers: list[BaseProcess] = [] logger.setLevel(log_level) @@ -50,7 +53,7 @@ def run(self, queue_type: WorkerManager.Queue) -> None: logger.info("Created worker pool for queue '%s'", queue_type.value) self.workers = [ - mp.Process(target=_start_working, args=self._worker_args()) for _ in range(self.settings.pool_size) + ctx.Process(target=_start_working, args=self._worker_args()) for _ in range(self.settings.pool_size) ] for worker in self.workers: worker.start() @@ -158,13 +161,13 @@ def _check_workers(self) -> None: self._cleanup_pending_worker_task(worker) worker.close() - new_worker = mp.Process(target=_start_working, args=self._worker_args()) + new_worker = ctx.Process(target=_start_working, args=self._worker_args()) new_worker.start() new_workers.append(new_worker) self.workers = new_workers - def _cleanup_pending_worker_task(self, worker: mp.Process) -> None: + def _cleanup_pending_worker_task(self, worker: BaseProcess) -> None: if worker.pid not in self.handling_tasks: logger.debug("No pending task found for Worker[pid=%s, %s]", worker.pid, _format_exit_code(worker.exitcode)) return @@ -231,7 +234,7 @@ def _format_exit_code(exitcode: int | None) -> str: def _start_working( - task_queue: mp.Queue, + task_queue: multiprocessing.Queue, handler: Handler, scheduler_client: SchedulerClientInterface, handling_tasks: dict[int, str],