Skip to content

Commit

Permalink
Merge branch 'main' into fix/do-not-add-produces-mime-types
Browse files Browse the repository at this point in the history
  • Loading branch information
underdarknl authored Sep 26, 2024
2 parents b586c21 + 579ae18 commit ae994f9
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
4 changes: 3 additions & 1 deletion boefjes/boefjes/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import multiprocessing
from datetime import datetime, timezone
from enum import Enum
from multiprocessing.context import ForkContext, ForkProcess
from uuid import UUID

import structlog
Expand All @@ -23,9 +24,10 @@

app = FastAPI(title="Boefje API")
logger = structlog.get_logger(__name__)
ctx: ForkContext = multiprocessing.get_context("fork")


class UvicornServer(multiprocessing.Process):
class UvicornServer(ForkProcess):
def __init__(self, config: Config):
super().__init__()
self.server = Server(config=config)
Expand Down
17 changes: 10 additions & 7 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import multiprocessing as mp
import multiprocessing
import os
import signal
import sys
import time
from multiprocessing.context import ForkContext
from multiprocessing.process import BaseProcess
from queue import Queue

import structlog
Expand All @@ -22,6 +24,7 @@
from boefjes.sql.plugin_storage import create_plugin_storage

logger = structlog.get_logger(__name__)
ctx: ForkContext = multiprocessing.get_context("fork")


class SchedulerWorkerManager(WorkerManager):
Expand All @@ -36,11 +39,11 @@ def __init__(
self.scheduler_client = scheduler_client
self.settings = settings

manager = mp.Manager()
manager = ctx.Manager()

self.task_queue = manager.Queue() # multiprocessing.Queue() will not work on macOS, see mp.Queue.qsize()
self.handling_tasks = manager.dict()
self.workers: list[mp.Process] = []
self.workers: list[BaseProcess] = []

logger.setLevel(log_level)

Expand All @@ -50,7 +53,7 @@ def run(self, queue_type: WorkerManager.Queue) -> None:
logger.info("Created worker pool for queue '%s'", queue_type.value)

self.workers = [
mp.Process(target=_start_working, args=self._worker_args()) for _ in range(self.settings.pool_size)
ctx.Process(target=_start_working, args=self._worker_args()) for _ in range(self.settings.pool_size)
]
for worker in self.workers:
worker.start()
Expand Down Expand Up @@ -158,13 +161,13 @@ def _check_workers(self) -> None:
self._cleanup_pending_worker_task(worker)
worker.close()

new_worker = mp.Process(target=_start_working, args=self._worker_args())
new_worker = ctx.Process(target=_start_working, args=self._worker_args())
new_worker.start()
new_workers.append(new_worker)

self.workers = new_workers

def _cleanup_pending_worker_task(self, worker: mp.Process) -> None:
def _cleanup_pending_worker_task(self, worker: BaseProcess) -> None:
if worker.pid not in self.handling_tasks:
logger.debug("No pending task found for Worker[pid=%s, %s]", worker.pid, _format_exit_code(worker.exitcode))
return
Expand Down Expand Up @@ -231,7 +234,7 @@ def _format_exit_code(exitcode: int | None) -> str:


def _start_working(
task_queue: mp.Queue,
task_queue: multiprocessing.Queue,
handler: Handler,
scheduler_client: SchedulerClientInterface,
handling_tasks: dict[int, str],
Expand Down

0 comments on commit ae994f9

Please sign in to comment.