GSK-1895 New worker pool #1478
Conversation
Force-pushed from bd58989 to a3b8bde
A couple of general comments:
- The queue names are prefixed with _ but are referenced from outside the executor, for example executor._pending_tasks_queue. I think it's OK if they become public; we're using them that way anyway.
- There are still untested corner cases, like "Future has been cancelled already". It would be great to increase code coverage even more, considering that this is a core part of the ML Worker.
giskard/utils/worker_pool.py (Outdated)

while not self._running_tasks_queue.empty():
    try:
        self._running_tasks_queue.get_nowait()
    except BaseException:
should we log them at least?
Yeah, probably. Plus I should reduce the exception scope.
Done !
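The narrowed drain loop discussed above might look something like the sketch below. It uses a plain `queue.Queue` as a stand-in for the pool's multiprocessing queue, and `drain_queue` is a hypothetical helper name; the point is catching only `Empty` rather than `BaseException`:

```python
from queue import Empty, Queue

def drain_queue(q: Queue) -> None:
    """Discard everything left in the queue, catching only Empty."""
    while not q.empty():
        try:
            q.get_nowait()
        except Empty:
            # empty() is racy: another consumer may drain the queue
            # between the check and the get. Empty is the only failure
            # we expect here, unlike a bare BaseException which would
            # also swallow KeyboardInterrupt and SystemExit.
            break

q = Queue()
q.put("task-1")
q.put("task-2")
drain_queue(q)
print(q.empty())  # True
```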
    return exit_codes

def _results_thread(
From what I understand, you're waking up frequently to check whether the whole pool is in one of the FINAL_STATES. Wouldn't it be better to call executor._tasks_results.get(block=True, timeout=2) with a larger timeout and, after catching the Empty exception, check whether the pool is in one of the FINAL_STATES? That way there would be less busy waiting.
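The suggested pattern could be sketched as follows. `results_loop`, `get_state`, and `handle` are hypothetical names (not from the PR); the idea is that the thread sleeps inside the blocking `get()` and only re-checks the pool state when the queue stays empty:

```python
import queue

FINAL_STATES = {"STOPPED", "BROKEN"}  # hypothetical pool states

def results_loop(results_queue, get_state, handle, timeout=0.1):
    """Consume results with a blocking get instead of busy-waiting."""
    while get_state() not in FINAL_STATES:
        try:
            result = results_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            continue  # timed out: re-check the pool state, then wait again
        handle(result)

# Minimal demo: pre-load two results, then flip the pool to a final state.
q = queue.Queue()
for r in ("r1", "r2"):
    q.put(r)
state = {"value": "RUNNING"}
seen = []

def handle(result):
    seen.append(result)
    if len(seen) == 2:
        state["value"] = "STOPPED"  # simulate pool shutdown

results_loop(q, lambda: state["value"], handle)
print(seen)  # ['r1', 'r2']
```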
Yeah, I guess we could. I'm not a big fan of using try/except as an if, but it may be better in this case.
Done !
    future.set_result(result.result)
else:
    # TODO(Bazire): improve to get Traceback
    future.set_exception(RuntimeError(result.exception))
Why can't it be directly future.set_exception(result.exception)?
Exception is a string in this case (the exception plus the traceback), because it needs to be picklable, and exceptions are linked to many objects and have circular references.
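The picklability constraint described above can be illustrated with a small sketch (the `run_task` helper and its dict shape are assumptions, not the PR's actual code): the worker formats the traceback into a string before the result crosses the process boundary, and the parent wraps it back into a RuntimeError:

```python
import traceback

def run_task(fn, *args):
    """Run a task in a worker process, returning a picklable outcome.

    Exception objects can reference frames, locals and form reference
    cycles, so only the formatted traceback string is sent back.
    """
    try:
        return {"result": fn(*args), "exception": None}
    except Exception:
        return {"result": None, "exception": traceback.format_exc()}

outcome = run_task(lambda: 1 / 0)
# The parent side would do: future.set_exception(RuntimeError(outcome["exception"]))
print(outcome["exception"].splitlines()[-1])  # ZeroDivisionError: division by zero
```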
giskard/utils/worker_pool.py (Outdated)

try:
    del executor._futures_mapping[result.id]
except BaseException:
    pass
Same as above: we need to know if something does not go as planned.
Yup, I will add some warning logs
Done
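One way to replace the silent try/del/except with a warning log, as agreed above, is to use `dict.pop` with a default (the `forget_future` name is hypothetical):

```python
import logging

logger = logging.getLogger(__name__)

def forget_future(futures_mapping: dict, task_id: str) -> None:
    """Remove a finished task's future, warning instead of silently passing."""
    if futures_mapping.pop(task_id, None) is None:
        logger.warning("No future found for task %s; it may already have been cleaned up", task_id)

futures = {"task-1": object()}
forget_future(futures, "task-1")  # normal removal
forget_future(futures, "task-1")  # already gone: logs a warning instead of pass
print(futures)  # {}
```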
giskard/utils/worker_pool.py (Outdated)

future = executor._futures_mapping.get(task.id)
if future is not None and future.set_running_or_notify_cancel():
    executor._running_tasks_queue.put(task)
elif future is not None:
You're checking for future is not None twice. I think it would be simpler to have this check first, and then a second if underneath it to check whether the future is cancelled or not.
Will do
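The restructuring suggested above could look like this sketch, using `concurrent.futures.Future` directly (the `Task` dataclass and `schedule` function are stand-ins, not the PR's actual names):

```python
from concurrent.futures import Future
from dataclasses import dataclass
from queue import Queue

@dataclass
class Task:  # hypothetical stand-in for GiskardTask
    id: str

def schedule(task, futures_mapping, running_tasks_queue):
    """One `is not None` check, then branch on the future's state."""
    future = futures_mapping.get(task.id)
    if future is None:
        return  # future already cleaned up, nothing to do
    if future.set_running_or_notify_cancel():
        running_tasks_queue.put(task)  # future transitioned to RUNNING
    else:
        # Future was cancelled before it started; drop it.
        del futures_mapping[task.id]

q = Queue()
futures = {"a": Future(), "b": Future()}
futures["b"].cancel()
schedule(Task("a"), futures, q)  # scheduled
schedule(Task("b"), futures, q)  # cancelled, dropped
print(q.qsize(), sorted(futures))  # 1 ['a']
```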
giskard/utils/worker_pool.py (Outdated)

# Mapping of the running tasks and worker pids
self._running_process: Dict[str, str] = self._manager.dict()
# Mapping of the running tasks and worker pids
self._with_timeout_tasks: List[Tuple[str, float]] = []
nitpick: can we use a dataclass here instead of a Tuple? It'll be more maintainable since we won't have to remember what the parameters were.
Actually, I created the dataclass and forgot to use it 😆 😿
Done !
Also, let's get the Sonar code smells fixed too (at least the ones not related to BaseException).
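A dataclass along the lines requested above might look like this (the `TimeoutTask` name and its fields are assumptions; the dataclass actually added in the PR may differ):

```python
import time
from dataclasses import dataclass
from typing import List

@dataclass(frozen=True)
class TimeoutTask:  # hypothetical name for the PR's dataclass
    task_id: str
    deadline: float  # absolute time after which the task should be killed

# Named fields instead of remembering what tuple[0] and tuple[1] meant.
with_timeout_tasks: List[TimeoutTask] = [
    TimeoutTask(task_id="task-1", deadline=time.monotonic() + 30.0),
]
print(with_timeout_tasks[0].task_id)  # task-1
```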
giskard/utils/worker_pool.py (Outdated)

self._pending_tasks_queue: SimpleQueue[GiskardTask] = self._mp_context.SimpleQueue()
# Queue with tasks to run
# As in ProcessPool, add one more to avoid idling process
self._running_tasks_queue: Queue[GiskardTask] = self._mp_context.Queue(maxsize=self._nb_workers + 1)
The way you use it, it should rather be Queue[Optional[GiskardTask]].
Done
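The `Optional` here reflects the common worker-pool pattern of putting `None` on the queue as a shutdown sentinel. A minimal sketch, using `str` in place of `GiskardTask` and a hypothetical `worker_loop` name:

```python
from queue import Queue
from typing import List, Optional

# None acts as a shutdown sentinel, hence Queue[Optional[...]].
running_tasks_queue: "Queue[Optional[str]]" = Queue()
running_tasks_queue.put("task-1")
running_tasks_queue.put(None)  # ask the worker loop to exit

def worker_loop(q: Queue) -> List[str]:
    processed = []
    while True:
        task = q.get()
        if task is None:  # sentinel reached: stop consuming
            break
        processed.append(task)
    return processed

processed = worker_loop(running_tasks_queue)
print(processed)  # ['task-1']
```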
Force-pushed from df6558d to 9e4784a
I took all comments into account.
Also, I improved the threads, especially the killer one.
I added a quick health_check too.
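The PR doesn't show the health_check itself, but a minimal sketch of the general idea, checking worker liveness via `Process.is_alive()`, could look like this (the `health_check` signature and return value here are pure assumptions):

```python
import multiprocessing as mp
from typing import List

def _noop():
    pass

def health_check(processes) -> List[int]:
    """Hypothetical sketch: return the PIDs of workers no longer alive."""
    return [p.pid for p in processes if not p.is_alive()]

# fork context: no __main__ guard needed for the demo (POSIX only)
ctx = mp.get_context("fork")
workers = [ctx.Process(target=_noop) for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(len(health_check(workers)))  # both workers exited: 2
```

A real pool would presumably react to dead workers (respawn them or mark the pool broken) rather than just report them.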
Force-pushed from 2cf31fd to 0976146
LGTM
Force-pushed from 227cd39 to ed598a6
Force-pushed from ed598a6 to 62c72c4
Force-pushed from 3b2db1c to c53635d
Force-pushed from f7b8730 to 331b11e
# Conflicts:
#   .github/workflows/build-python.yml
#   pdm.lock
#   pyproject.toml
Kudos, SonarCloud Quality Gate passed!
Description
Re-implement a worker pool, with the possibility of a timeout.
Also added the capacity to capture all logging for executed tasks.
I removed the Python deps and related code, because it was a bother for me when testing under Windows.
Related Issue
Type of Change
Checklist
- CODE_OF_CONDUCT.md document
- CONTRIBUTING.md guide
- make codestyle