Making process manipulation safer
Hartorn committed Oct 20, 2023
1 parent 9e4784a commit 2cf31fd
Showing 1 changed file with 37 additions and 13 deletions.
giskard/utils/worker_pool.py: 50 changes (37 additions, 13 deletions)
@@ -24,26 +24,40 @@ def _generate_task_id():
     return str(uuid4())
 
 
+def _safe_is_alive(p: Process) -> bool:
+    try:
+        return p.is_alive()
+    except ValueError:
+        return False
+
+
+def _safe_exit_code(p: Process) -> int:
+    try:
+        return p.exitcode
+    except ValueError:
+        return -1
+
+
 def _wait_process_stop(p_list: List[Process], timeout: float = 1):
     end_time = time.monotonic() + timeout
-    while any([p.is_alive() for p in p_list]) and time.monotonic() < end_time:
+    while any([_safe_is_alive(p) for p in p_list]) and time.monotonic() < end_time:
         sleep(0.1)
 
 
 def _stop_processes(p_list: List[Process], timeout: float = 1) -> List[Optional[int]]:
     # Check if process is alive.
     for p in p_list:
-        if p.is_alive():
+        if _safe_is_alive(p):
             # Try to terminate with SIGTERM first
             p.terminate()
     _wait_process_stop(p_list, timeout=timeout)
 
     for p in p_list:
-        if p.is_alive():
+        if _safe_is_alive(p):
             # If still alive, kill the processes
             p.kill()
     _wait_process_stop(p_list, timeout=2)
-    exit_codes = [p.exitcode for p in p_list]
+    exit_codes = [_safe_exit_code(p) for p in p_list]
     # Free all resources
     for p in p_list:
         p.close()
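
Background for the new helpers (not part of the diff): once multiprocessing.Process.close() has been called, most methods and attributes of the process object, including is_alive() and exitcode, raise ValueError. A minimal, self-contained sketch of the failure mode the two wrappers absorb:

# Illustration only (not taken from the repo): after close(), querying a Process
# raises ValueError, which is exactly what _safe_is_alive/_safe_exit_code swallow.
from multiprocessing import Process
import time


def _noop():
    time.sleep(0.1)


def _safe_is_alive(p: Process) -> bool:
    try:
        return p.is_alive()
    except ValueError:
        return False


def _safe_exit_code(p: Process) -> int:
    try:
        return p.exitcode
    except ValueError:
        return -1


if __name__ == "__main__":
    p = Process(target=_noop)
    p.start()
    p.join()
    p.close()                  # releases the handle; the object can no longer be queried
    print(_safe_is_alive(p))   # False instead of ValueError
    print(_safe_exit_code(p))  # -1 instead of ValueError
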
@@ -163,9 +177,12 @@ def __init__(self, nb_workers: Optional[int] = None, name: Optional[str] = None)
         LOGGER.info("WorkerPoolExecutor is started")
 
     def health_check(self):
-        if any([not p.is_alive() for p in self._processes.values()]):
+        if self._state in FINAL_STATES:
+            return
+        if any([not _safe_is_alive(p) for p in self._processes.values()]):
             LOGGER.warning("At least one process died for an unknown reason, marking pool as broken")
             self._state = PoolState.BROKEN
+            self.shutdown(wait=False, timeout=1)
 
     def _spawn_worker(self):
         # Daemon means process are linked to main one, and will be stopped if current process is stopped
@@ -203,12 +220,13 @@ def schedule(
             self._with_timeout_tasks.append(TimeoutData(task.id, time.monotonic() + timeout))
         return res
 
-    def shutdown(self, wait=True, timeout: float = 5):
-        if self._state in FINAL_STATES:
+    def shutdown(self, wait=True, timeout: float = 5, force=False):
+        if self._state in FINAL_STATES and not force:
             return
         # Changing state, so that thread will stop
         # Killer thread will also do cleanup
-        self._state = PoolState.STOPPING
+        if not force:
+            self._state = PoolState.STOPPING
         # Cancelling all futures we have
         for future in self._futures_mapping.values():
             if future.cancel() and not future.done():
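
How the new force flag changes shutdown, as a self-contained sketch (the PoolState members and FINAL_STATES below are simplified stand-ins, not the module's actual definitions): a forced shutdown re-runs the cleanup even when the pool is already in a final state, and leaves that state untouched instead of switching it to STOPPING.

# Simplified stand-ins for illustration; the real PoolState/FINAL_STATES live
# elsewhere in worker_pool.py and may differ.
from enum import Enum


class PoolState(Enum):
    RUNNING = "running"
    STOPPING = "stopping"
    STOPPED = "stopped"
    BROKEN = "broken"


FINAL_STATES = (PoolState.STOPPED, PoolState.BROKEN)


class MiniPool:
    def __init__(self, state: PoolState):
        self._state = state

    def shutdown(self, wait=True, timeout: float = 5, force=False):
        if self._state in FINAL_STATES and not force:
            return "skipped: already in a final state"
        if not force:
            self._state = PoolState.STOPPING
        # ... cancel futures, drain queues, stop the workers ...
        return f"cleaned up, state is still {self._state.value}"


pool = MiniPool(PoolState.BROKEN)
print(pool.shutdown())            # skipped: already in a final state
print(pool.shutdown(force=True))  # cleaned up, state is still broken
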
@@ -226,13 +244,17 @@ def shutdown(self, wait=True, timeout: float = 5):
             LOGGER.warning("Queue was empty, skipping")
             LOGGER.exception(e)
         # Try to nicely stop the worker, by adding None into the running tasks
-        for _ in range(self._nb_workers):
-            self._running_tasks_queue.put(None, timeout=1)
+        try:
+            for _ in range(self._nb_workers):
+                self._running_tasks_queue.put(None, timeout=1)
+        except OSError as e:
+            # This happens if queues is closed
+            LOGGER.warning("Running task queue is already closed")
+            LOGGER.exception(e)
         # Wait for process to stop by themselves
         p_list = list(self._processes.values())
         if wait:
             _wait_process_stop(p_list, timeout=timeout)
 
         # Clean all the queues
         for queue in [self._pending_tasks_queue, self._tasks_results, self._running_tasks_queue]:
             # In python 3.8, Simple queue seems to not have close method
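
The hunk does not show which queue classes the pool uses, so the following is an assumption rather than code from the repo: one concrete way a put() on an already-closed multiprocessing queue surfaces as OSError is a SimpleQueue whose underlying Connection has been closed (Python 3.9+, where SimpleQueue.close() exists).

# Illustration only: put() on a closed SimpleQueue fails with OSError because
# the underlying Connection handle is gone.
from multiprocessing import SimpleQueue

q = SimpleQueue()
q.close()          # closes the underlying reader/writer Connections
try:
    q.put(None)
except OSError as e:
    print(f"put() after close(): {e}")  # e.g. "handle is closed"
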
@@ -318,7 +340,8 @@ def _killer_thread(
         while len(executor._with_timeout_tasks) == 0 and executor._state not in FINAL_STATES:
             # No need to be too active
             sleep(1)
-            executor.health_check()
+            if executor._state not in FINAL_STATES:
+                executor.health_check()
         if executor._state in FINAL_STATES:
             return
@@ -339,7 +362,8 @@ def _killer_thread(
                 if pid is not None:
                     p = executor._processes.pop(pid)
                     _stop_processes([p])
-                    executor._spawn_worker()
+                    if executor._state not in FINAL_STATES:
+                        executor._spawn_worker()
             except BaseException as e:
                 # This is probably an OSError, but we want to be extra safe
                 LOGGER.warning("Unexpected error when killing a timed out process, pool is broken")