GSK-1895 New worker pool #1478

Merged: 26 commits from new-worker-pool into main, Oct 25, 2023.

Commits
18565be - Creating new worker pool (Hartorn, Oct 13, 2023)
9af63c7 - Trying new worker pool (Hartorn, Oct 13, 2023)
f5a35fd - Improve worker clean up (Hartorn, Oct 13, 2023)
788f9f4 - Try to fix tests for CI (Hartorn, Oct 13, 2023)
8dc5601 - Removing git dependency (Hartorn, Oct 16, 2023)
d9a853e - Repair tests with multiprocess (issue with coverage) (Hartorn, Oct 16, 2023)
4e126eb - Adding logs capture in worker pool (Hartorn, Oct 16, 2023)
9c116e2 - Fix lock file (Hartorn, Oct 20, 2023)
dfd6d1b - Improve threads + taking in account review (Hartorn, Oct 20, 2023)
65af850 - Making process manipulation safer (Hartorn, Oct 20, 2023)
28d8b47 - Adding some more logs and warm up at start for the worker (Hartorn, Oct 20, 2023)
82057d7 - Improve code quality (Hartorn, Oct 22, 2023)
62c72c4 - Marking analytics thread as daemon (Hartorn, Oct 23, 2023)
a873dae - Minor cleaning (Hartorn, Oct 23, 2023)
b93067b - Improving forced shutdown (Hartorn, Oct 23, 2023)
c53635d - Re-working process worker (Hartorn, Oct 23, 2023)
331b11e - Making safe_get safer (Hartorn, Oct 23, 2023)
91cb65d - Merge branch 'main' into new-worker-pool (Inokinoki, Oct 24, 2023)
c50be07 - Comment code (Hartorn, Oct 24, 2023)
626f69e - Skipping tests for windows (Hartorn, Oct 24, 2023)
25eb2b9 - Merge branch 'main' into new-worker-pool (Hartorn, Oct 24, 2023)
8c04bfd - Merge branch 'main' into new-worker-pool (Hartorn, Oct 25, 2023)
cfebc73 - Merge branch 'main' into new-worker-pool (andreybavt, Oct 25, 2023)
cc630bf - update lockfile (andreybavt, Oct 25, 2023)
f4eb8bf - update lockfile (andreybavt, Oct 25, 2023)
1e6cc6e - fix build (andreybavt, Oct 25, 2023)
5 changes: 4 additions & 1 deletion .github/workflows/build-python.yml
@@ -48,7 +48,7 @@ defaults:
     shell: bash
 jobs:
   build-python:
-    name: "Python ${{ matrix.python-version }}${{ matrix.pandas_v1 && ' (Pandas V1)' || ''}}${{ matrix.pydantic_v1 && ' (Pydantic V1)' || ''}} on ${{ matrix.os }}${{matrix.experimental && ' (Non failing)' || '' }}"
+    name: "Python ${{ matrix.python-version }}${{ matrix.pandas_v1 && ' (Pandas V1)' || ''}}${{ matrix.pydantic_v1 && ' (Pydantic V1)' || ''}} on ${{ matrix.os }}"
     runs-on: ${{ matrix.os }}
     strategy:
       fail-fast: false # Do not stop when any job fails
@@ -131,6 +131,9 @@ jobs:
           pdm run pip freeze | grep '^pandas'
           pdm run pip freeze | grep -q '^pandas==${{ matrix.pandas_v1 && '1' || '2' }}\.'

+      - name: Test code (concurrency)
+        run: pdm run test-worker
+
       - name: Test code
         env:
           PYTEST_XDIST_AUTO_NUM_WORKERS: ${{ startsWith(matrix.os,'windows-') && 1 || 2 }}
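The new CI step runs the concurrency suite on its own, before the parallelized main test run. As a rough idea of what such a suite exercises, here is a hypothetical round-trip check against the pool API this PR introduces (illustrative only; the actual tests behind `pdm run test-worker` live in the repository, not in this diff):

```python
# Hypothetical smoke test of the new pool API; not the repo's actual suite.
from giskard.utils import start_pool, call_in_pool, shutdown_pool


def square(x):
    return x * x


def test_pool_round_trip():
    start_pool(max_workers=2)
    try:
        future = call_in_pool(square, args=[4], timeout=30)
        assert future.result(timeout=30) == 16
    finally:
        shutdown_pool()
```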
11 changes: 4 additions & 7 deletions giskard/ml_worker/testing/registry/registry.py
@@ -1,3 +1,5 @@
+from typing import Dict, Optional
+
 import hashlib
 import importlib.util
 import inspect
@@ -6,20 +8,15 @@
 import sys
 import uuid
 from pathlib import Path
-from typing import Optional, Dict

 import cloudpickle

 from giskard.core.core import SavableMeta
-from giskard.ml_worker.testing.registry.udf_repository import udf_repo_available, udf_root
-from giskard.settings import expand_env_var, settings
+from giskard.settings import settings


 def find_plugin_location():
-    if udf_repo_available:
-        return udf_root
-    else:
-        return Path(expand_env_var(settings.home)) / "plugins"
+    return settings.home_dir / "plugins"


 logger = logging.getLogger(__name__)
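The call site now relies on a `settings.home_dir` helper instead of expanding `settings.home` by hand. A minimal sketch of what such a property could look like, assuming it simply centralizes the `expand_env_var` logic the old code inlined (its real definition is not part of this diff):

```python
import os
from pathlib import Path


def expand_env_var(env_var: str) -> str:
    # Resolve "~" and $VARIABLES in a user-supplied path.
    return os.path.expandvars(os.path.expanduser(env_var))


class Settings:
    home: str = "~/giskard-home"  # illustrative default, not the real one

    @property
    def home_dir(self) -> Path:
        # Hypothetical: does the expansion previously done by each caller.
        return Path(expand_env_var(self.home))


settings = Settings()
print(settings.home_dir / "plugins")  # e.g. /home/user/giskard-home/plugins
```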
78 changes: 0 additions & 78 deletions giskard/ml_worker/testing/registry/udf_repository.py

This file was deleted.

18 changes: 9 additions & 9 deletions giskard/ml_worker/websocket/listener.py
@@ -50,7 +50,7 @@
 from giskard.models.base import BaseModel
 from giskard.models.model_explanation import explain, explain_text
 from giskard.push import Push
-from giskard.utils import call_in_pool, log_pool_stats, shutdown_pool
+from giskard.utils import call_in_pool, shutdown_pool
 from giskard.utils.analytics_collector import analytics

 logger = logging.getLogger(__name__)
@@ -82,8 +82,6 @@ def wrapped_handle_result(
     action: MLWorkerAction, ml_worker: MLWorker, start: float, rep_id: Optional[str], ignore_timeout: bool
 ):
     def handle_result(future: Union[Future, Callable[..., websocket.WorkerReply]]):
-        log_pool_stats()
-
         info = None  # Needs to be defined in case of cancellation

         try:
@@ -211,17 +209,19 @@ def dispatch_action(callback, ml_worker, action, req, execute_in_pool, timeout=N
     # If execution should be done in a pool
     if execute_in_pool:
         logger.debug("Submitting for action %s '%s' into the pool", action.name, callback.__name__)
+        kwargs = {
+            "callback": callback,
+            "action": action,
+            "params": params,
+            "ml_worker": MLWorkerInfo(ml_worker),
+            "client_params": client_params,
+        }
         future = call_in_pool(
             parse_and_execute,
-            callback=callback,
-            action=action,
-            params=params,
-            ml_worker=MLWorkerInfo(ml_worker),
-            client_params=client_params,
+            kwargs=kwargs,
             timeout=timeout,
         )
         future.add_done_callback(result_handler)
-        log_pool_stats()
         return

     result_handler(
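With the new pool, `dispatch_action` packs the callback's arguments into a single dict and hands it over as an explicit `kwargs=` parameter, so one `(fn, args, kwargs)` task can be shipped to a worker process. A self-contained illustration of the convention, using stand-in objects rather than the real Giskard types:

```python
from concurrent.futures import Future, ThreadPoolExecutor

# Stand-in for the worker pool; the real one runs tasks in separate processes.
_pool = ThreadPoolExecutor(max_workers=1)


def call_in_pool(fn, args=None, kwargs=None, timeout=None) -> Future:
    # Mirrors the new API: explicit args/kwargs instead of forwarded **kwargs.
    return _pool.submit(fn, *(args or ()), **(kwargs or {}))


def parse_and_execute(callback, action, params, ml_worker, client_params):
    return f"{action}:{callback.__name__}"


def my_callback():
    pass


kwargs = {
    "callback": my_callback,
    "action": "RUN_TEST",  # illustrative action name
    "params": {},
    "ml_worker": None,
    "client_params": None,
}
future = call_in_pool(parse_and_execute, kwargs=kwargs, timeout=30)
print(future.result())  # -> "RUN_TEST:my_callback"
```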
123 changes: 31 additions & 92 deletions giskard/utils/__init__.py
@@ -1,76 +1,63 @@
 import logging
 import os
-import signal
-from concurrent.futures import CancelledError, Future, InvalidStateError, ProcessPoolExecutor
+from concurrent.futures import Future
 from functools import wraps
 from threading import Lock, Thread
-from time import sleep, time
+from time import sleep

 from giskard.settings import settings
+from giskard.utils.worker_pool import WorkerPoolExecutor

 LOGGER = logging.getLogger(__name__)


 def threaded(fn):
     def wrapper(*args, **kwargs):
-        thread = Thread(target=fn, args=args, kwargs=kwargs)
+        thread = Thread(target=fn, daemon=True, args=args, kwargs=kwargs)
         thread.start()
         return thread

     return wrapper


-class WorkerPool:
+NOT_STARTED = "Pool is not started"
+
+
+class SingletonWorkerPool:
     "Utility class to wrap a Process pool"

     def __init__(self):
         self.pool = None
-        self.nb_cancellable = 0
         self.max_workers = 0

     def start(self, max_workers: int = None):
         if self.pool is not None:
             return
         self.max_workers = max(max_workers, settings.min_workers) if max_workers is not None else os.cpu_count()
         LOGGER.info("Starting worker pool with %s workers...", self.max_workers)
-        self.pool = ProcessPoolExecutor(max_workers=self.max_workers)
-        LOGGER.info("Pool is started")
+        self.pool = WorkerPoolExecutor(nb_workers=self.max_workers)

-    def shutdown(self, wait=True, cancel_futures=False):
+    def shutdown(self, wait=True):
         if self.pool is None:
             return
-        self.pool.shutdown(wait=wait, cancel_futures=cancel_futures)
-        with NB_CANCELLABLE_WORKER_LOCK:
-            self.nb_cancellable = 0
+        self.pool.shutdown(wait=wait)

+    def schedule(self, fn, args=None, kwargs=None, timeout=None) -> Future:
+        if self.pool is None:
+            raise RuntimeError(NOT_STARTED)
+        return self.pool.schedule(fn, args=args, kwargs=kwargs, timeout=timeout)
+
     def submit(self, *args, **kwargs) -> Future:
         if self.pool is None:
-            raise ValueError("Pool is not started")
+            raise RuntimeError(NOT_STARTED)
         return self.pool.submit(*args, **kwargs)

     def map(self, *args, **kwargs):
         if self.pool is None:
-            raise ValueError("Pool is not started")
+            raise RuntimeError(NOT_STARTED)
         return self.pool.map(*args, **kwargs)

-    def log_stats(self):
-        if self.pool is None:
-            LOGGER.debug("Pool is not yet started")
-            return
-        LOGGER.debug(
-            "Pool is currently having :\n - %s pending items\n - %s workers\n - %s cancellable tasks",
-            len(self.pool._pending_work_items),
-            len(self.pool._processes),
-            self.nb_cancellable,
-        )
-
-
-POOL = WorkerPool()
-
-
-def log_pool_stats():
-    """Log pools stats to have some debug info"""
-    POOL.log_stats()
+POOL = SingletonWorkerPool()


 def start_pool(max_workers: int = None):
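A note on the `daemon=True` change in `threaded` above: daemon threads do not block interpreter shutdown, so a stuck background task (analytics, log capture) can no longer hang the worker process on exit. A quick self-contained demonstration of the behavior:

```python
import threading
import time


def threaded(fn):
    # Same decorator as in the diff above: fire-and-forget daemon thread.
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=fn, daemon=True, args=args, kwargs=kwargs)
        thread.start()
        return thread

    return wrapper


@threaded
def heartbeat():
    while True:  # never returns, but cannot keep the process alive
        time.sleep(1)


heartbeat()
print("main thread exits immediately; the daemon dies with the process")
```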
@@ -80,16 +67,14 @@ def start_pool(max_workers: int = None):
         max_workers (int, optional): _description_. Defaults to None.
     """
     POOL.start(max_workers=max_workers)
-    # Doing warmup to spin up all workers
-    for _ in POOL.map(int, range(100)):
-        # Consume the results
-        pass
-    log_pool_stats()
+    # Warmup the pool
+    for _ in range(100):
+        call_in_pool(sleep, [0])


 def shutdown_pool():
     """Stop the pool"""
-    POOL.shutdown(wait=True, cancel_futures=True)
+    POOL.shutdown(wait=True)


 def pooled(fn):
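The warmup in `start_pool` above replaces the old `POOL.map(int, range(100))` trick: scheduling a batch of no-op `sleep(0)` calls at startup forces every worker process to spawn and finish its imports before the first real task arrives. The same idea expressed with the standard library, as a minimal sketch:

```python
from concurrent.futures import ProcessPoolExecutor
from time import sleep

if __name__ == "__main__":  # required for process pools under spawn start method
    with ProcessPoolExecutor(max_workers=4) as pool:
        # Enough no-op tasks that every worker picks at least one up.
        futures = [pool.submit(sleep, 0) for _ in range(100)]
        for f in futures:
            f.result()
        # All worker processes are now started and warm.
```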
@@ -101,67 +86,21 @@ def pooled(fn):

     @wraps(fn)
     def wrapper(*args, **kwargs):
-        return call_in_pool(fn, *args, **kwargs)
+        return call_in_pool(fn, args=args, kwargs=kwargs)

     return wrapper


-NB_CANCELLABLE_WORKER_LOCK = Lock()
-
-
-@threaded
-def start_killer(timeout: float, future: Future, pid: int, executor: ProcessPoolExecutor):
-    start = time()
-    # Try to get the result in proper time
-    while (time() - start) < timeout:
-        # future.result(timeout=timeout) => Not working with WSL and python 3.10, switchin to something safer
-        LOGGER.debug("Sleeping for pid %s", pid)
-        sleep(1)
-        if future.done():
-            executor.shutdown(wait=True, cancel_futures=False)
-            with NB_CANCELLABLE_WORKER_LOCK:
-                POOL.nb_cancellable -= 1
-            return
-    LOGGER.warning("Thread gonna kill pid %s", pid)
-    # Manually setting exception, to allow customisation
-    # TODO(Bazire): See if we need a custom error to handle that properly
-    try:
-        future.set_exception(CancelledError("Background task was taking too much time and was cancelled"))
-    except InvalidStateError:
-        pass
-    # Shutting down an executor is actually not stopping the running processes
-    executor.shutdown(wait=False, cancel_futures=False)
-    # Kill the process running by targeting its pid
-    os.kill(pid, signal.SIGINT)
-    # Let's clean up the executor also
-    # Also, does not matter to call shutdown several times
-    executor.shutdown(wait=True, cancel_futures=False)
-    with NB_CANCELLABLE_WORKER_LOCK:
-        POOL.nb_cancellable -= 1
-    LOGGER.debug("Executor has been successfully shutdown")
-    log_pool_stats()
-
-
-def call_in_pool(fn, *args, timeout=None, **kwargs):
-    """Submit the function call with args and kwargs inside the process pool
-
-    Args:
-        fn (function): the function to call
-
-    Returns:
-        Future: the promise of the results
-    """
-    if timeout is None:
-        return POOL.submit(fn, *args, **kwargs)
-    # Create independant process pool
-    # If we kill a running process, it breaking the Process pool, making it unusable
-    one_shot_executor = ProcessPoolExecutor(max_workers=1)
-    pid = one_shot_executor.submit(os.getpid).result(timeout=5)
-    future = one_shot_executor.submit(fn, *args, **kwargs)
-    start_killer(timeout, future, pid, one_shot_executor)
-    with NB_CANCELLABLE_WORKER_LOCK:
-        POOL.nb_cancellable += 1
-    return future
+def call_in_pool(
+    fn,
+    args=None,
+    kwargs=None,
+    timeout=None,
+):
+    return POOL.schedule(fn, args=args, kwargs=kwargs, timeout=timeout)


 def fullname(o):
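The per-call killer thread and one-shot executor are gone: `call_in_pool` now just forwards the timeout to `POOL.schedule`, making the new `WorkerPoolExecutor` responsible for cancelling overdue tasks without corrupting the pool (killing a process inside a stock `ProcessPoolExecutor` breaks it, which is what the old workaround papered over). The internals of `WorkerPoolExecutor` are not in this diff; one plausible shape for a timeout-aware `schedule`, as an illustration only:

```python
import threading
from concurrent.futures import Future, ProcessPoolExecutor, TimeoutError


class TimeoutAwarePool:
    """Illustrative stand-in, not the actual WorkerPoolExecutor."""

    def __init__(self, nb_workers=2):
        self._pool = ProcessPoolExecutor(max_workers=nb_workers)

    def schedule(self, fn, args=None, kwargs=None, timeout=None) -> Future:
        future = self._pool.submit(fn, *(args or ()), **(kwargs or {}))
        if timeout is not None:
            def reaper():
                try:
                    future.result(timeout=timeout)
                except TimeoutError:
                    # The real pool would also kill and respawn the stuck
                    # worker process; cancel() cannot stop a running task.
                    future.cancel()
                except Exception:
                    pass  # task failed on its own; nothing to reap

            threading.Thread(target=reaper, daemon=True).start()
        return future
```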