Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GSK-1895 New worker pool #1478

Merged
merged 26 commits into from
Oct 25, 2023
Merged

GSK-1895 New worker pool #1478

merged 26 commits into from
Oct 25, 2023

Conversation

Hartorn
Copy link
Member

@Hartorn Hartorn commented Oct 13, 2023

Description

Re-implement a worker pool, with possibility for timeout

Also, had capacity to capture all loging for executed task

I removed python deps and related code, because it's was a bother for me under windows for testing.

Related Issue

Type of Change

  • 📚 Examples / docs / tutorials / dependencies update
  • 🔧 Bug fix (non-breaking change which fixes an issue)
  • 🥂 Improvement (non-breaking change which improves an existing feature)
  • 🚀 New feature (non-breaking change which adds functionality)
  • 💥 Breaking change (fix or feature that would cause existing functionality to change)
  • 🔐 Security fix

Checklist

  • I've read the CODE_OF_CONDUCT.md document.
  • I've read the CONTRIBUTING.md guide.
  • I've updated the code style using make codestyle.
  • I've written tests for all new methods and classes that I created.
  • I've written the docstring in Google format for all the methods and classes that I used.

@Hartorn Hartorn self-assigned this Oct 13, 2023
@Hartorn Hartorn force-pushed the new-worker-pool branch 7 times, most recently from bd58989 to a3b8bde Compare October 16, 2023 14:01
@Hartorn Hartorn changed the title WIP: New worker pool New worker pool Oct 16, 2023
@Hartorn Hartorn marked this pull request as ready for review October 16, 2023 15:32
@Hartorn Hartorn requested a review from a team as a code owner October 16, 2023 15:32
giskard/utils/worker_pool.py Outdated Show resolved Hide resolved
giskard/ml_worker/testing/registry/udf_repository.py Outdated Show resolved Hide resolved
Copy link
Contributor

@andreybavt andreybavt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of general comments:

  • the queue names are prefixed with _ but being referenced from outside of executor, for example: executor._pending_tasks_queue. I think it's ok if they become public, at least we're using them this way anyway
  • There are still untested corner cases, like "Future has been cancelled already". It'll be great to increase code coverage even more considering that this is a core part of the ML Worker

giskard/ml_worker/testing/registry/udf_repository.py Outdated Show resolved Hide resolved
giskard/utils/worker_pool.py Show resolved Hide resolved
giskard/utils/worker_pool.py Show resolved Hide resolved
while not self._running_tasks_queue.empty():
try:
self._running_tasks_queue.get_nowait()
except BaseException:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we log them at least?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, probably. Plus I should reduce the exeception scope

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done !

return exit_codes


def _results_thread(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand you're waking up frequently to check if the whole pool is in one of the FINAL_STATES or not. Wouldn't it be better to call executor._tasks_results.get(block=True, timeout=2) with a larger timeout and after catching Empty exception check the the pool is in one of the FINAL_STATES?
In this case there'll be less busy waiting.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I guess we could. I'm not a big fan of using try except as an if, but it may be better in this case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done !

future.set_result(result.result)
else:
# TODO(Bazire): improve to get Traceback
future.set_exception(RuntimeError(result.exception))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't it by directly future.set_exception(result.exception) ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception is a string in this case (the exception + traceback), because it needs to be pickable, and exception are linked to many objects + have circular references

try:
del executor._futures_mapping[result.id]
except BaseException:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, we need to know if something goes not as planned

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I will add some warning logs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

future = executor._futures_mapping.get(task.id)
if future is not None and future.set_running_or_notify_cancel():
executor._running_tasks_queue.put(task)
elif future is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're checking for future is not None twice, I think it'll be simple to have this check first and then a second if to check if the future is canceled or not under it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

# Mapping of the running tasks and worker pids
self._running_process: Dict[str, str] = self._manager.dict()
# Mapping of the running tasks and worker pids
self._with_timeout_tasks: List[Tuple[str, float]] = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: can we use a dataclass here instead of a Tuple? It'll be more maintainable sice we won't have to remember what the parameters were

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I created the dataclass and forgot to use it 😆 😿
Done !

giskard/utils/worker_pool.py Outdated Show resolved Hide resolved
@andreybavt
Copy link
Contributor

andreybavt commented Oct 19, 2023

also let's get Sonar code smells fixed too (at least the ones not related to BaseException)

self._pending_tasks_queue: SimpleQueue[GiskardTask] = self._mp_context.SimpleQueue()
# Queue with tasks to run
# As in ProcessPool, add one more to avoid idling process
self._running_tasks_queue: Queue[GiskardTask] = self._mp_context.Queue(maxsize=self._nb_workers + 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the way you use it, it should be rather Queue[Optional[GiskardTask]]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@andreybavt andreybavt changed the title New worker pool GSK-1895 New worker pool Oct 19, 2023
@linear
Copy link

linear bot commented Oct 19, 2023

Copy link
Member Author

@Hartorn Hartorn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took in account all comment.
Also, I improved the threads, especially the killer one.

I added a quich health_check too

giskard/utils/worker_pool.py Show resolved Hide resolved
giskard/utils/worker_pool.py Show resolved Hide resolved
# Mapping of the running tasks and worker pids
self._running_process: Dict[str, str] = self._manager.dict()
# Mapping of the running tasks and worker pids
self._with_timeout_tasks: List[Tuple[str, float]] = []
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I created the dataclass and forgot to use it 😆 😿
Done !

giskard/utils/worker_pool.py Show resolved Hide resolved
self._pending_tasks_queue: SimpleQueue[GiskardTask] = self._mp_context.SimpleQueue()
# Queue with tasks to run
# As in ProcessPool, add one more to avoid idling process
self._running_tasks_queue: Queue[GiskardTask] = self._mp_context.Queue(maxsize=self._nb_workers + 1)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

while not self._running_tasks_queue.empty():
try:
self._running_tasks_queue.get_nowait()
except BaseException:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done !

return exit_codes


def _results_thread(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done !

try:
del executor._futures_mapping[result.id]
except BaseException:
pass
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done !

try:
del executor._futures_mapping[result.id]
except BaseException:
pass
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

@Inokinoki Inokinoki left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Hartorn Hartorn force-pushed the new-worker-pool branch 5 times, most recently from 227cd39 to ed598a6 Compare October 23, 2023 15:52
# Conflicts:
#	.github/workflows/build-python.yml
#	pdm.lock
#	pyproject.toml
@sonarqubecloud
Copy link

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 4 Code Smells

0.0% 0.0% Coverage
0.0% 0.0% Duplication

@andreybavt andreybavt merged commit e759a68 into main Oct 25, 2023
13 checks passed
@andreybavt andreybavt deleted the new-worker-pool branch October 25, 2023 15:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants