From 50c2d1d7999ba6c54d0dfe021486e6cbb9c1dc59 Mon Sep 17 00:00:00 2001 From: Maarten Sijm <9739541+mpsijm@users.noreply.github.com> Date: Sun, 21 Jan 2024 23:20:46 +0100 Subject: [PATCH] [parallel] Improve documentation --- bin/parallel.py | 45 +++++++++++++++++++++++++-------------------- bin/run.py | 2 +- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/bin/parallel.py b/bin/parallel.py index 4705d899..c149c675 100644 --- a/bin/parallel.py +++ b/bin/parallel.py @@ -36,7 +36,7 @@ def __init__(self, f, pin): self.total_tasks = 0 self.missing = 0 - self.abort = False + self.aborted = False # mutex to lock parallel access self.mutex = threading.RLock() @@ -58,8 +58,8 @@ def join(self): def done(self): raise "Abstract method" - def stop(self): - self.abort = True + def abort(self): + self.aborted = True class SequentialQueue(AbstractQueue): @@ -68,8 +68,8 @@ def __init__(self, f, pin): # Add one task. Higher priority => done first def put(self, task, priority=0): - # no task will be handled after self.abort so skip adding - if self.abort: + # no task will be handled after self.abort() so skip adding + if self.aborted: return self.total_tasks += 1 @@ -81,8 +81,8 @@ def done(self): cores = list(os.sched_getaffinity(0)) os.sched_setaffinity(0, {cores[0]}) - # no task will be handled after self.abort - while self.tasks and not self.abort: + # no task will be handled after self.abort() + while self.tasks and not self.aborted: self.f(heapq.heappop(self.tasks).task) if self.pin: @@ -128,16 +128,16 @@ def _worker(self, cores: bool | list[int] = False): os.sched_setaffinity(0, cores) while True: with self.mutex: - # if self.abort we need no item in the queue and can stop + # if self.aborted we need no item in the queue and can stop # if self.finish we may need to wake up if all tasks were completed earlier # else we need an item to handle - self.todo.wait_for(lambda: len(self.tasks) > 0 or self.abort or self.finish) + self.todo.wait_for(lambda: len(self.tasks) > 0 or self.aborted or self.finish) - if self.abort: - # we dont handle the queue on abort + if self.aborted: + # we don't handle the queue if self.aborted break elif self.finish and len(self.tasks) == 0: - # on finish we can only stop after the queue runs empty + # if self.finish, we can only stop after the queue runs empty break else: # get item from queue (update self.missing after the task is done) @@ -149,7 +149,7 @@ def _worker(self, cores: bool | list[int] = False): current_error = None self.f(task) except Exception as e: - self.stop() + self.abort() current_error = e with self.mutex: @@ -168,8 +168,8 @@ def put(self, task, priority=0): with self.mutex: # no task should be added after .done() was called assert not self.finish - # no task will be handled after self.abort so skip adding - if not self.abort: + # no task will be handled after self.aborted so skip adding + if not self.aborted: # mark task as to be done and notify workers self.missing += 1 self.total_tasks += 1 @@ -202,8 +202,8 @@ def done(self): # Discard all remaining work in the queue and stop all workers. # Call done() to join the threads. - def stop(self): - super().stop() + def abort(self): + super().abort() with self.mutex: # drop all items in the queue at once @@ -217,9 +217,14 @@ def stop(self): def create(f, pin=False, num_threads=True): - # f(task): the function to run on each queue item. - # num_threads: True: the configured default - # None/False/0: disable parallelization + """ + f(task): the function to run on each queue item. + + num_threads: True: the configured default + None/False/0: disable parallelization + + pin: whether to pin the threads to (physical) CPU cores. + """ num_threads = config.args.jobs if num_threads is True else num_threads pin = pin and not util.is_windows() and not util.is_bsd() diff --git a/bin/run.py b/bin/run.py index dcea2788..54aba5fa 100644 --- a/bin/run.py +++ b/bin/run.py @@ -531,7 +531,7 @@ def process_run(run, p): return bar.count = None - p.stop() + p.abort() p = parallel.create(lambda run: process_run(run, p), pin=True)