From 745667b7e7faf637a4c84595a4d11c97ff0af473 Mon Sep 17 00:00:00 2001 From: Maarten Sijm <9739541+mpsijm@users.noreply.github.com> Date: Sun, 21 Jan 2024 23:11:13 +0100 Subject: [PATCH] [parallel] Explicitly split up SequentialQueue and ParallelQueue --- bin/fuzz.py | 2 +- bin/generate.py | 6 +- bin/parallel.py | 194 +++++++++++++++++++++++++++--------------------- bin/problem.py | 4 +- bin/run.py | 2 +- 5 files changed, 118 insertions(+), 90 deletions(-) diff --git a/bin/fuzz.py b/bin/fuzz.py index 62383714..2083cc39 100644 --- a/bin/fuzz.py +++ b/bin/fuzz.py @@ -184,7 +184,7 @@ def run(self): self.start_time = time.monotonic() self.iteration = 0 self.tasks = 0 - self.queue = parallel.Parallel(lambda task: task.run(bar), pin=True) + self.queue = parallel.create(lambda task: task.run(bar), pin=True) # pool of ids used for generators self.tmp_ids = 2 * max(1, self.queue.num_threads) + 1 diff --git a/bin/generate.py b/bin/generate.py index 1f64466e..eff3b39c 100644 --- a/bin/generate.py +++ b/bin/generate.py @@ -1520,7 +1520,7 @@ def build_program(p): p.build(localbar) localbar.done() - p = parallel.Parallel(build_program) + p = parallel.create(build_program) for pr in programs: p.put(pr) p.done() @@ -1636,7 +1636,7 @@ def count_dir(d): # after to deduplicate them against generated testcases. # 1 - p = parallel.Parallel(lambda t: t.listed and t.generate(self.problem, self, bar)) + p = parallel.create(lambda t: t.listed and t.generate(self.problem, self, bar)) def generate_dir(d): p.join() @@ -1646,7 +1646,7 @@ def generate_dir(d): p.done() # 2 - p = parallel.Parallel(lambda t: not t.listed and t.generate(self.problem, self, bar)) + p = parallel.create(lambda t: not t.listed and t.generate(self.problem, self, bar)) def generate_dir_unlisted(d): p.join() diff --git a/bin/parallel.py b/bin/parallel.py index d412752d..4705d899 100644 --- a/bin/parallel.py +++ b/bin/parallel.py @@ -1,15 +1,14 @@ #!/usr/bin/env python3 -import threading -import signal import heapq - import os +import signal +import threading import config import util -class ParallelItem: +class QueueItem: def __init__(self, task, priority, id): self.task = task self.priority = priority @@ -26,60 +25,105 @@ def __lt__(self, other): return self.id < other.id -class Parallel: - # f(task): the function to run on each queue item. - # num_threads: True: the configured default - # None/False/0: disable parallelization - def __init__(self, f, num_threads=True, pin=False): +class AbstractQueue: + def __init__(self, f, pin): self.f = f - self.num_threads = config.args.jobs if num_threads is True else num_threads - self.pin = pin and not util.is_windows() and not util.is_bsd() + self.pin = pin + self.num_threads = 1 + + # min heap + self.tasks: list[QueueItem] = [] + self.total_tasks = 0 + self.missing = 0 + + self.abort = False # mutex to lock parallel access self.mutex = threading.RLock() + + def __enter__(self): + self.mutex.__enter__() + + def __exit__(self, *args): + self.mutex.__exit__(*args) + + # Add one task. Higher priority => done first + def put(self, task, priority=0): + raise "Abstract method" + + # By default, do nothing on .join(). This is overridden in ParallelQueue. + def join(self): + return + + def done(self): + raise "Abstract method" + + def stop(self): + self.abort = True + + +class SequentialQueue(AbstractQueue): + def __init__(self, f, pin): + super().__init__(f, pin) + + # Add one task. Higher priority => done first + def put(self, task, priority=0): + # no task will be handled after self.abort so skip adding + if self.abort: + return + + self.total_tasks += 1 + heapq.heappush(self.tasks, QueueItem(task, priority, self.total_tasks)) + + # Execute all tasks. + def done(self): + if self.pin: + cores = list(os.sched_getaffinity(0)) + os.sched_setaffinity(0, {cores[0]}) + + # no task will be handled after self.abort + while self.tasks and not self.abort: + self.f(heapq.heappop(self.tasks).task) + + if self.pin: + os.sched_setaffinity(0, cores) + + +class ParallelQueue(AbstractQueue): + def __init__(self, f, pin, num_threads): + super().__init__(f, pin) + + assert num_threads and type(num_threads) is int + self.num_threads = num_threads + # condition used to notify worker if the queue has changed self.todo = threading.Condition(self.mutex) # condition used to notify join that the queue is empty self.all_done = threading.Condition(self.mutex) - # only used in parallel mode self.first_error = None - # min heap - self.tasks = [] - self.total_tasks = 0 - self.missing = 0 - - # also used if num_threads is false - self.abort = False self.finish = False - if self.num_threads: - if self.pin: - # only use available cores and reserve one - cores = list(os.sched_getaffinity(0)) - if self.num_threads > len(cores) - 1: - self.num_threads = len(cores) - 1 - - # sort cores by id. If num_threads << len(cores) this ensures that we - # use different physical cores instead of hyperthreads - cores.sort() + if self.pin: + # only use available cores and reserve one + cores = list(os.sched_getaffinity(0)) + if self.num_threads > len(cores) - 1: + self.num_threads = len(cores) - 1 - self.threads = [] - for i in range(self.num_threads): - args = [{cores[i]}] if self.pin else [] - t = threading.Thread(target=self._worker, args=args, daemon=True) - t.start() - self.threads.append(t) + # sort cores by id. If num_threads << len(cores) this ensures that we + # use different physical cores instead of hyperthreads + cores.sort() - signal.signal(signal.SIGINT, self._interrupt_handler) - - def __enter__(self): - self.mutex.__enter__() + self.threads = [] + for i in range(self.num_threads): + args = [{cores[i]}] if self.pin else [] + t = threading.Thread(target=self._worker, args=args, daemon=True) + t.start() + self.threads.append(t) - def __exit__(self, *args): - self.mutex.__exit__(*args) + signal.signal(signal.SIGINT, self._interrupt_handler) - def _worker(self, cores=False): + def _worker(self, cores: bool | list[int] = False): if cores is not False: os.sched_setaffinity(0, cores) while True: @@ -121,29 +165,18 @@ def _interrupt_handler(self, sig, frame): # Add one task. Higher priority => done first def put(self, task, priority=0): - # no task should be added after .done() was called - assert not self.finish - - # no task will be handled after self.abort so skip adding - if self.abort: - return - - if not self.num_threads: - self.total_tasks += 1 - heapq.heappush(self.tasks, ParallelItem(task, priority, self.total_tasks)) - return - with self.mutex: - # mark task as to be done and notify workers - self.missing += 1 - self.total_tasks += 1 - heapq.heappush(self.tasks, ParallelItem(task, priority, self.total_tasks)) - self.todo.notify() + # no task should be added after .done() was called + assert not self.finish + # no task will be handled after self.abort so skip adding + if not self.abort: + # mark task as to be done and notify workers + self.missing += 1 + self.total_tasks += 1 + heapq.heappush(self.tasks, QueueItem(task, priority, self.total_tasks)) + self.todo.notify() def join(self): - if not self.num_threads: - return - # wait for all current task to be completed with self.all_done: self.all_done.wait_for(lambda: self.missing == 0) @@ -152,24 +185,9 @@ def join(self): # Wait for all tasks to be done and stop all threads def done(self): - if not self.num_threads: - # no task will be handled after self.abort - while self.tasks and not self.abort: - task = heapq.heappop(self.tasks) - if self.pin: - cores = list(os.sched_getaffinity(0)) - os.sched_setaffinity(0, {cores[0]}) - self.f(task.task) - os.sched_setaffinity(0, cores) - else: - self.f(task.task) - self.finish = True - if not self.num_threads: - return - - # notify all workes with permission to leave main loop + # notify all workers with permission to leave main loop with self.todo: self.todo.notify_all() @@ -178,17 +196,14 @@ def done(self): t.join() # mutex is no longer needed - # report first error occured during execution + # report first error occurred during execution if self.first_error is not None: raise self.first_error # Discard all remaining work in the queue and stop all workers. # Call done() to join the threads. def stop(self): - self.abort = True - - if not self.num_threads: - return + super().stop() with self.mutex: # drop all items in the queue at once @@ -199,3 +214,16 @@ def stop(self): # notify .join() if queue runs empty if self.missing == 0: self.all_done.notify_all() + + +def create(f, pin=False, num_threads=True): + # f(task): the function to run on each queue item. + # num_threads: True: the configured default + # None/False/0: disable parallelization + num_threads = config.args.jobs if num_threads is True else num_threads + pin = pin and not util.is_windows() and not util.is_bsd() + + if num_threads: + return ParallelQueue(f, pin, num_threads) + else: + return SequentialQueue(f, pin) diff --git a/bin/problem.py b/bin/problem.py index 0e7937bd..7e1450fe 100644 --- a/bin/problem.py +++ b/bin/problem.py @@ -312,7 +312,7 @@ def build_program(p): p.build(localbar) localbar.done() - p = parallel.Parallel(build_program) + p = parallel.create(build_program) for pr in programs: p.put(pr) p.done() @@ -435,7 +435,7 @@ def build_program(p): build_ok &= p.build(localbar) localbar.done() - p = parallel.Parallel(build_program) + p = parallel.create(build_program) for pr in validators: p.put(pr) p.done() diff --git a/bin/run.py b/bin/run.py index bcac9889..dcea2788 100644 --- a/bin/run.py +++ b/bin/run.py @@ -533,7 +533,7 @@ def process_run(run, p): bar.count = None p.stop() - p = parallel.Parallel(lambda run: process_run(run, p), pin=True) + p = parallel.create(lambda run: process_run(run, p), pin=True) for run in runs: p.put(run)