From 2cf31fd5e4ef5ffd36d46f622dc1a7a12cd96a59 Mon Sep 17 00:00:00 2001
From: Hartorn
Date: Fri, 20 Oct 2023 14:31:53 +0200
Subject: [PATCH] Making process manipulation safer

---
 giskard/utils/worker_pool.py | 50 ++++++++++++++++++++++++++----------
 1 file changed, 37 insertions(+), 13 deletions(-)

diff --git a/giskard/utils/worker_pool.py b/giskard/utils/worker_pool.py
index c97019a484..216bb2e050 100644
--- a/giskard/utils/worker_pool.py
+++ b/giskard/utils/worker_pool.py
@@ -24,26 +24,40 @@ def _generate_task_id():
     return str(uuid4())
 
 
+def _safe_is_alive(p: Process) -> bool:
+    try:
+        return p.is_alive()
+    except ValueError:
+        return False
+
+
+def _safe_exit_code(p: Process) -> int:
+    try:
+        return p.exitcode
+    except ValueError:
+        return -1
+
+
 def _wait_process_stop(p_list: List[Process], timeout: float = 1):
     end_time = time.monotonic() + timeout
-    while any([p.is_alive() for p in p_list]) and time.monotonic() < end_time:
+    while any([_safe_is_alive(p) for p in p_list]) and time.monotonic() < end_time:
         sleep(0.1)
 
 
 def _stop_processes(p_list: List[Process], timeout: float = 1) -> List[Optional[int]]:
     # Check if process is alive.
     for p in p_list:
-        if p.is_alive():
+        if _safe_is_alive(p):
             # Try to terminate with SIGTERM first
             p.terminate()
     _wait_process_stop(p_list, timeout=timeout)
 
     for p in p_list:
-        if p.is_alive():
+        if _safe_is_alive(p):
             # If still alive, kill the processes
             p.kill()
     _wait_process_stop(p_list, timeout=2)
-    exit_codes = [p.exitcode for p in p_list]
+    exit_codes = [_safe_exit_code(p) for p in p_list]
     # Free all resources
     for p in p_list:
         p.close()
@@ -163,9 +177,12 @@ def __init__(self, nb_workers: Optional[int] = None, name: Optional[str] = None)
         LOGGER.info("WorkerPoolExecutor is started")
 
     def health_check(self):
-        if any([not p.is_alive() for p in self._processes.values()]):
+        if self._state in FINAL_STATES:
+            return
+        if any([not _safe_is_alive(p) for p in self._processes.values()]):
             LOGGER.warning("At least one process died for an unknown reason, marking pool as broken")
             self._state = PoolState.BROKEN
+            self.shutdown(wait=False, timeout=1)
 
     def _spawn_worker(self):
         # Daemon means process are linked to main one, and will be stopped if current process is stopped
@@ -203,12 +220,13 @@ def schedule(
             self._with_timeout_tasks.append(TimeoutData(task.id, time.monotonic() + timeout))
         return res
 
-    def shutdown(self, wait=True, timeout: float = 5):
-        if self._state in FINAL_STATES:
+    def shutdown(self, wait=True, timeout: float = 5, force=False):
+        if self._state in FINAL_STATES and not force:
             return
         # Changing state, so that thread will stop
         # Killer thread will also do cleanup
-        self._state = PoolState.STOPPING
+        if not force:
+            self._state = PoolState.STOPPING
         # Cancelling all futures we have
         for future in self._futures_mapping.values():
             if future.cancel() and not future.done():
@@ -226,13 +244,17 @@
                 LOGGER.warning("Queue was empty, skipping")
                 LOGGER.exception(e)
         # Try to nicely stop the worker, by adding None into the running tasks
-        for _ in range(self._nb_workers):
-            self._running_tasks_queue.put(None, timeout=1)
+        try:
+            for _ in range(self._nb_workers):
+                self._running_tasks_queue.put(None, timeout=1)
+        except OSError as e:
+            # This happens if queues is closed
+            LOGGER.warning("Running task queue is already closed")
+            LOGGER.exception(e)
         # Wait for process to stop by themselves
         p_list = list(self._processes.values())
         if wait:
             _wait_process_stop(p_list, timeout=timeout)
-
         # Clean all the queues
         for queue in [self._pending_tasks_queue, self._tasks_results, self._running_tasks_queue]:
             # In python 3.8, Simple queue seems to not have close method
@@ -318,7 +340,8 @@ def _killer_thread(
         while len(executor._with_timeout_tasks) == 0 and executor._state not in FINAL_STATES:
             # No need to be too active
             sleep(1)
-        executor.health_check()
+        if executor._state not in FINAL_STATES:
+            executor.health_check()
 
         if executor._state in FINAL_STATES:
             return
@@ -339,7 +362,8 @@ def _killer_thread(
                 if pid is not None:
                     p = executor._processes.pop(pid)
                     _stop_processes([p])
-                    executor._spawn_worker()
+                    if executor._state not in FINAL_STATES:
+                        executor._spawn_worker()
             except BaseException as e:
                 # This is probably an OSError, but we want to be extra safe
                 LOGGER.warning("Unexpected error when killing a timed out process, pool is broken")
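
Note on the `_safe_*` helpers introduced above: `multiprocessing.Process.close()` releases the process handle, after which most methods and attributes of the `Process` object, including `is_alive()` and `exitcode`, raise `ValueError`. The short standalone sketch below (not part of the patch; the `_noop` target and the `__main__` driver are illustrative assumptions) reproduces the two helpers and shows how they behave once a handle has been closed.

from multiprocessing import Process


def _safe_is_alive(p: Process) -> bool:
    # Same guard as in the patch: a closed Process handle raises ValueError,
    # which is reported here as "not alive".
    try:
        return p.is_alive()
    except ValueError:
        return False


def _safe_exit_code(p: Process) -> int:
    # Same guard as in the patch: report -1 when the handle is already closed.
    try:
        return p.exitcode
    except ValueError:
        return -1


def _noop():
    # Trivial worker target, used only for this demonstration.
    pass


if __name__ == "__main__":
    p = Process(target=_noop)
    p.start()
    p.join()
    p.close()  # Frees the handle; is_alive() and exitcode now raise ValueError.
    print(_safe_is_alive(p))   # False, instead of an unhandled ValueError
    print(_safe_exit_code(p))  # -1, instead of an unhandled ValueError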