Skip to content

Commit

Permalink
[parallel] If not num_threads, use queue instead of tail recursion
Browse files Browse the repository at this point in the history
Previously, running `bt fuzz --jobs 0` would crash with a stack overflow
after 244 iterations (±4 stack frames per iteration). This happened
because after every GeneratorTask, a new task was started by calling
Parallel.put, which in turn called its lambda, which called the new
GeneratorTask, which called Parallel.put again in finish_task, etc.

I've rewritten the task handling in the case of `not num_threads`
by moving it all to Parallel.done. Parallel.put now puts the task in
the same priority queue as in the case where we _do_ want parallelism
(num_threads > 0).
The fuzzer should now be able to run more than 244 iterations again 😄
  • Loading branch information
mpsijm committed Feb 3, 2024
1 parent d53e1b2 commit 1e31e39
Showing 1 changed file with 26 additions and 20 deletions.
46 changes: 26 additions & 20 deletions bin/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,30 +121,24 @@ def _interrupt_handler(self, sig, frame):

# Add one task. Higher priority => done first
def put(self, task, priority=0):
# no task should be added after .done() was called
assert not self.finish

# no task will be handled after self.abort so skip adding
if self.abort:
return

if not self.num_threads:
# no task should be added after .done() was called
assert not self.finish
# no task will be handled after self.abort
if not self.abort:
if self.pin:
cores = list(os.sched_getaffinity(0))
os.sched_setaffinity(0, {cores[0]})
self.f(task)
os.sched_setaffinity(0, cores)
else:
self.f(task)
self.total_tasks += 1
heapq.heappush(self.tasks, ParallelItem(task, priority, self.total_tasks))
return

with self.mutex:
# no task should be added after .done() was called
assert not self.finish
# no task will be handled after self.abort so skip adding
if not self.abort:
# mark task as to be done and notify workers
self.missing += 1
self.total_tasks += 1
heapq.heappush(self.tasks, ParallelItem(task, priority, self.total_tasks))
self.todo.notify()
# mark task as to be done and notify workers
self.missing += 1
self.total_tasks += 1
heapq.heappush(self.tasks, ParallelItem(task, priority, self.total_tasks))
self.todo.notify()

def join(self):
if not self.num_threads:
Expand All @@ -158,6 +152,18 @@ def join(self):

# Wait for all tasks to be done and stop all threads
def done(self):
if not self.num_threads:
# no task will be handled after self.abort
while self.tasks and not self.abort:
task = heapq.heappop(self.tasks)
if self.pin:
cores = list(os.sched_getaffinity(0))
os.sched_setaffinity(0, {cores[0]})
self.f(task.task)
os.sched_setaffinity(0, cores)
else:
self.f(task.task)

self.finish = True

if not self.num_threads:
Expand Down

0 comments on commit 1e31e39

Please sign in to comment.