Skip to content

Commit

Permalink
[parallel] Improve documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
mpsijm committed Feb 3, 2024
1 parent 745667b commit 50c2d1d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
45 changes: 25 additions & 20 deletions bin/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, f, pin):
self.total_tasks = 0
self.missing = 0

self.abort = False
self.aborted = False

# mutex to lock parallel access
self.mutex = threading.RLock()
Expand All @@ -58,8 +58,8 @@ def join(self):
def done(self):
raise "Abstract method"

def stop(self):
self.abort = True
def abort(self):
self.aborted = True


class SequentialQueue(AbstractQueue):
Expand All @@ -68,8 +68,8 @@ def __init__(self, f, pin):

# Add one task. Higher priority => done first
def put(self, task, priority=0):
# no task will be handled after self.abort so skip adding
if self.abort:
# no task will be handled after self.abort() so skip adding
if self.aborted:
return

self.total_tasks += 1
Expand All @@ -81,8 +81,8 @@ def done(self):
cores = list(os.sched_getaffinity(0))
os.sched_setaffinity(0, {cores[0]})

# no task will be handled after self.abort
while self.tasks and not self.abort:
# no task will be handled after self.abort()
while self.tasks and not self.aborted:
self.f(heapq.heappop(self.tasks).task)

if self.pin:
Expand Down Expand Up @@ -128,16 +128,16 @@ def _worker(self, cores: bool | list[int] = False):
os.sched_setaffinity(0, cores)
while True:
with self.mutex:
# if self.abort we need no item in the queue and can stop
# if self.aborted we need no item in the queue and can stop
# if self.finish we may need to wake up if all tasks were completed earlier
# else we need an item to handle
self.todo.wait_for(lambda: len(self.tasks) > 0 or self.abort or self.finish)
self.todo.wait_for(lambda: len(self.tasks) > 0 or self.aborted or self.finish)

if self.abort:
# we dont handle the queue on abort
if self.aborted:
# we don't handle the queue if self.aborted
break
elif self.finish and len(self.tasks) == 0:
# on finish we can only stop after the queue runs empty
# if self.finish, we can only stop after the queue runs empty
break
else:
# get item from queue (update self.missing after the task is done)
Expand All @@ -149,7 +149,7 @@ def _worker(self, cores: bool | list[int] = False):
current_error = None
self.f(task)
except Exception as e:
self.stop()
self.abort()
current_error = e

with self.mutex:
Expand All @@ -168,8 +168,8 @@ def put(self, task, priority=0):
with self.mutex:
# no task should be added after .done() was called
assert not self.finish
# no task will be handled after self.abort so skip adding
if not self.abort:
# no task will be handled after self.aborted so skip adding
if not self.aborted:
# mark task as to be done and notify workers
self.missing += 1
self.total_tasks += 1
Expand Down Expand Up @@ -202,8 +202,8 @@ def done(self):

# Discard all remaining work in the queue and stop all workers.
# Call done() to join the threads.
def stop(self):
super().stop()
def abort(self):
super().abort()

with self.mutex:
# drop all items in the queue at once
Expand All @@ -217,9 +217,14 @@ def stop(self):


def create(f, pin=False, num_threads=True):
# f(task): the function to run on each queue item.
# num_threads: True: the configured default
# None/False/0: disable parallelization
"""
f(task): the function to run on each queue item.
num_threads: True: the configured default
None/False/0: disable parallelization
pin: whether to pin the threads to (physical) CPU cores.
"""
num_threads = config.args.jobs if num_threads is True else num_threads
pin = pin and not util.is_windows() and not util.is_bsd()

Expand Down
2 changes: 1 addition & 1 deletion bin/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ def process_run(run, p):
return

bar.count = None
p.stop()
p.abort()

p = parallel.create(lambda run: process_run(run, p), pin=True)

Expand Down

0 comments on commit 50c2d1d

Please sign in to comment.