Skip to content

Commit

Permalink
Re-working process worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Hartorn committed Oct 23, 2023
1 parent 7bbeb0e commit 3b2db1c
Showing 1 changed file with 5 additions and 7 deletions.
12 changes: 5 additions & 7 deletions giskard/utils/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dataclasses import dataclass, field
from enum import Enum
from io import StringIO
from multiprocessing import Process, Queue, SimpleQueue, cpu_count, get_context
from multiprocessing import Process, Queue, cpu_count, get_context
from multiprocessing.context import SpawnContext, SpawnProcess
from multiprocessing.managers import SyncManager
from queue import Empty, Full
Expand Down Expand Up @@ -92,9 +92,7 @@ class GiskardResult:
exception: Any = None


def _process_worker(
tasks_queue: Queue[Optional[GiskardTask]], tasks_results: Queue[GiskardResult], running_process: Dict[str, str]
):
def _process_worker(tasks_queue: Queue, tasks_results: Queue, running_process: Dict[str, str]):
pid = os.getpid()
LOGGER.info("Process %s started", pid)

Expand Down Expand Up @@ -159,12 +157,12 @@ def __init__(self, nb_workers: Optional[int] = None, name: Optional[str] = None)
# Mapping of the running tasks and worker pids
self.with_timeout_tasks: List[TimeoutData] = []
# Queue with tasks to run
self.pending_tasks_queue: Queue[GiskardTask] = self._mp_context.Queue()
self.pending_tasks_queue: Queue = self._mp_context.Queue()
# Queue with tasks to be consumed asap
# As in ProcessPool, add one more to avoid idling process
self.running_tasks_queue: Queue[Optional[GiskardTask]] = self._mp_context.Queue(maxsize=self._nb_workers + 1)
self.running_tasks_queue: Queue = self._mp_context.Queue(maxsize=self._nb_workers + 1)
# Queue with results to notify
self.tasks_results: Queue[GiskardResult] = self._mp_context.Queue()
self.tasks_results: Queue = self._mp_context.Queue()
# Mapping task_id with future
self.futures_mapping: Dict[str, Future] = dict()
LOGGER.debug("Starting threads for the WorkerPoolExecutor")
Expand Down

0 comments on commit 3b2db1c

Please sign in to comment.