Skip to content

Commit

Permalink
[parallel] Explicitly split up SequentialQueue and ParallelQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
mpsijm committed Feb 3, 2024
1 parent 76ffe54 commit 745667b
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 90 deletions.
2 changes: 1 addition & 1 deletion bin/fuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def run(self):
self.start_time = time.monotonic()
self.iteration = 0
self.tasks = 0
self.queue = parallel.Parallel(lambda task: task.run(bar), pin=True)
self.queue = parallel.create(lambda task: task.run(bar), pin=True)

# pool of ids used for generators
self.tmp_ids = 2 * max(1, self.queue.num_threads) + 1
Expand Down
6 changes: 3 additions & 3 deletions bin/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,7 @@ def build_program(p):
p.build(localbar)
localbar.done()

p = parallel.Parallel(build_program)
p = parallel.create(build_program)
for pr in programs:
p.put(pr)
p.done()
Expand Down Expand Up @@ -1636,7 +1636,7 @@ def count_dir(d):
# after to deduplicate them against generated testcases.

# 1
p = parallel.Parallel(lambda t: t.listed and t.generate(self.problem, self, bar))
p = parallel.create(lambda t: t.listed and t.generate(self.problem, self, bar))

def generate_dir(d):
p.join()
Expand All @@ -1646,7 +1646,7 @@ def generate_dir(d):
p.done()

# 2
p = parallel.Parallel(lambda t: not t.listed and t.generate(self.problem, self, bar))
p = parallel.create(lambda t: not t.listed and t.generate(self.problem, self, bar))

def generate_dir_unlisted(d):
p.join()
Expand Down
194 changes: 111 additions & 83 deletions bin/parallel.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
#!/usr/bin/env python3
import threading
import signal
import heapq

import os
import signal
import threading

import config
import util


class ParallelItem:
class QueueItem:
def __init__(self, task, priority, id):
self.task = task
self.priority = priority
Expand All @@ -26,60 +25,105 @@ def __lt__(self, other):
return self.id < other.id


class Parallel:
# f(task): the function to run on each queue item.
# num_threads: True: the configured default
# None/False/0: disable parallelization
def __init__(self, f, num_threads=True, pin=False):
class AbstractQueue:
    """Shared state and interface for SequentialQueue and ParallelQueue.

    f(task) is the function to run on each queue item. Subclasses must
    implement put() and done(); join() is a no-op unless overridden.
    """

    def __init__(self, f, pin):
        self.f = f
        self.pin = pin
        # Subclasses with real parallelism override this.
        self.num_threads = 1

        # min heap of pending work (ordered by priority, then insertion id)
        self.tasks: list["QueueItem"] = []
        self.total_tasks = 0
        self.missing = 0

        # Once set, no further tasks are accepted or handled.
        self.abort = False

        # mutex to lock parallel access
        self.mutex = threading.RLock()

    def __enter__(self):
        self.mutex.__enter__()

    def __exit__(self, *args):
        self.mutex.__exit__(*args)

    # Add one task. Higher priority => done first
    def put(self, task, priority=0):
        # BUG FIX: `raise "Abstract method"` raises a TypeError in Python 3
        # (exceptions must derive from BaseException); use the proper
        # exception type for an unimplemented abstract method.
        raise NotImplementedError("Abstract method")

    # By default, do nothing on .join(). This is overridden in ParallelQueue.
    def join(self):
        return

    def done(self):
        raise NotImplementedError("Abstract method")

    def stop(self):
        # Signal that any remaining queued tasks should be discarded.
        self.abort = True


class SequentialQueue(AbstractQueue):
    """Queue that runs every task on the calling thread when done() is called."""

    def __init__(self, f, pin):
        super().__init__(f, pin)

    # Add one task. Higher priority => done first
    def put(self, task, priority=0):
        # no task will be handled after self.abort so skip adding
        if self.abort:
            return

        self.total_tasks += 1
        heapq.heappush(self.tasks, QueueItem(task, priority, self.total_tasks))

    # Execute all tasks on the current thread, highest priority first.
    def done(self):
        if self.pin:
            # Temporarily pin the process to a single core while running tasks.
            cores = list(os.sched_getaffinity(0))
            os.sched_setaffinity(0, {cores[0]})

        try:
            # no task will be handled after self.abort
            while self.tasks and not self.abort:
                self.f(heapq.heappop(self.tasks).task)
        finally:
            # BUG FIX: restore the original affinity even if a task raises,
            # so the process is not left pinned to one core afterwards.
            if self.pin:
                os.sched_setaffinity(0, cores)


class ParallelQueue(AbstractQueue):
def __init__(self, f, pin, num_threads):
super().__init__(f, pin)

assert num_threads and type(num_threads) is int
self.num_threads = num_threads

# condition used to notify worker if the queue has changed
self.todo = threading.Condition(self.mutex)
# condition used to notify join that the queue is empty
self.all_done = threading.Condition(self.mutex)

# only used in parallel mode
self.first_error = None
# min heap
self.tasks = []
self.total_tasks = 0
self.missing = 0

# also used if num_threads is false
self.abort = False
self.finish = False

if self.num_threads:
if self.pin:
# only use available cores and reserve one
cores = list(os.sched_getaffinity(0))
if self.num_threads > len(cores) - 1:
self.num_threads = len(cores) - 1

# sort cores by id. If num_threads << len(cores) this ensures that we
# use different physical cores instead of hyperthreads
cores.sort()
if self.pin:
# only use available cores and reserve one
cores = list(os.sched_getaffinity(0))
if self.num_threads > len(cores) - 1:
self.num_threads = len(cores) - 1

self.threads = []
for i in range(self.num_threads):
args = [{cores[i]}] if self.pin else []
t = threading.Thread(target=self._worker, args=args, daemon=True)
t.start()
self.threads.append(t)
# sort cores by id. If num_threads << len(cores) this ensures that we
# use different physical cores instead of hyperthreads
cores.sort()

signal.signal(signal.SIGINT, self._interrupt_handler)

def __enter__(self):
self.mutex.__enter__()
self.threads = []
for i in range(self.num_threads):
args = [{cores[i]}] if self.pin else []
t = threading.Thread(target=self._worker, args=args, daemon=True)
t.start()
self.threads.append(t)

def __exit__(self, *args):
self.mutex.__exit__(*args)
signal.signal(signal.SIGINT, self._interrupt_handler)

def _worker(self, cores=False):
def _worker(self, cores: bool | list[int] = False):
if cores is not False:
os.sched_setaffinity(0, cores)
while True:
Expand Down Expand Up @@ -121,29 +165,18 @@ def _interrupt_handler(self, sig, frame):

# Add one task. Higher priority => done first
def put(self, task, priority=0):
# no task should be added after .done() was called
assert not self.finish

# no task will be handled after self.abort so skip adding
if self.abort:
return

if not self.num_threads:
self.total_tasks += 1
heapq.heappush(self.tasks, ParallelItem(task, priority, self.total_tasks))
return

with self.mutex:
# mark task as to be done and notify workers
self.missing += 1
self.total_tasks += 1
heapq.heappush(self.tasks, ParallelItem(task, priority, self.total_tasks))
self.todo.notify()
# no task should be added after .done() was called
assert not self.finish
# no task will be handled after self.abort so skip adding
if not self.abort:
# mark task as to be done and notify workers
self.missing += 1
self.total_tasks += 1
heapq.heappush(self.tasks, QueueItem(task, priority, self.total_tasks))
self.todo.notify()

def join(self):
if not self.num_threads:
return

# wait for all current task to be completed
with self.all_done:
self.all_done.wait_for(lambda: self.missing == 0)
Expand All @@ -152,24 +185,9 @@ def join(self):

# Wait for all tasks to be done and stop all threads
def done(self):
if not self.num_threads:
# no task will be handled after self.abort
while self.tasks and not self.abort:
task = heapq.heappop(self.tasks)
if self.pin:
cores = list(os.sched_getaffinity(0))
os.sched_setaffinity(0, {cores[0]})
self.f(task.task)
os.sched_setaffinity(0, cores)
else:
self.f(task.task)

self.finish = True

if not self.num_threads:
return

# notify all workes with permission to leave main loop
# notify all workers with permission to leave main loop
with self.todo:
self.todo.notify_all()

Expand All @@ -178,17 +196,14 @@ def done(self):
t.join()

# mutex is no longer needed
# report first error occured during execution
# report first error occurred during execution
if self.first_error is not None:
raise self.first_error

# Discard all remaining work in the queue and stop all workers.
# Call done() to join the threads.
def stop(self):
self.abort = True

if not self.num_threads:
return
super().stop()

with self.mutex:
# drop all items in the queue at once
Expand All @@ -199,3 +214,16 @@ def stop(self):
# notify .join() if queue runs empty
if self.missing == 0:
self.all_done.notify_all()


def create(f, pin=False, num_threads=True):
    """Build the right queue for running f(task) on each queued item.

    num_threads: True selects the configured default (config.args.jobs);
    None/False/0 disables parallelization and returns a SequentialQueue.
    Core pinning is only honoured on platforms that support it.
    """
    if num_threads is True:
        num_threads = config.args.jobs
    # Pinning relies on sched_setaffinity, unavailable on Windows/BSD.
    if pin and (util.is_windows() or util.is_bsd()):
        pin = False

    if not num_threads:
        return SequentialQueue(f, pin)
    return ParallelQueue(f, pin, num_threads)
4 changes: 2 additions & 2 deletions bin/problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def build_program(p):
p.build(localbar)
localbar.done()

p = parallel.Parallel(build_program)
p = parallel.create(build_program)
for pr in programs:
p.put(pr)
p.done()
Expand Down Expand Up @@ -435,7 +435,7 @@ def build_program(p):
build_ok &= p.build(localbar)
localbar.done()

p = parallel.Parallel(build_program)
p = parallel.create(build_program)
for pr in validators:
p.put(pr)
p.done()
Expand Down
2 changes: 1 addition & 1 deletion bin/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ def process_run(run, p):
bar.count = None
p.stop()

p = parallel.Parallel(lambda run: process_run(run, p), pin=True)
p = parallel.create(lambda run: process_run(run, p), pin=True)

for run in runs:
p.put(run)
Expand Down

0 comments on commit 745667b

Please sign in to comment.