diff --git a/bin/parallel.py b/bin/parallel.py index a00cfe40..d412752d 100644 --- a/bin/parallel.py +++ b/bin/parallel.py @@ -121,30 +121,24 @@ def _interrupt_handler(self, sig, frame): # Add one task. Higher priority => done first def put(self, task, priority=0): + # no task should be added after .done() was called + assert not self.finish + + # no task will be handled after self.abort so skip adding + if self.abort: + return + if not self.num_threads: - # no task should be added after .done() was called - assert not self.finish - # no task will be handled after self.abort - if not self.abort: - if self.pin: - cores = list(os.sched_getaffinity(0)) - os.sched_setaffinity(0, {cores[0]}) - self.f(task) - os.sched_setaffinity(0, cores) - else: - self.f(task) + self.total_tasks += 1 + heapq.heappush(self.tasks, ParallelItem(task, priority, self.total_tasks)) return with self.mutex: - # no task should be added after .done() was called - assert not self.finish - # no task will be handled after self.abort so skip adding - if not self.abort: - # mark task as to be done and notify workers - self.missing += 1 - self.total_tasks += 1 - heapq.heappush(self.tasks, ParallelItem(task, priority, self.total_tasks)) - self.todo.notify() + # mark task as to be done and notify workers + self.missing += 1 + self.total_tasks += 1 + heapq.heappush(self.tasks, ParallelItem(task, priority, self.total_tasks)) + self.todo.notify() def join(self): if not self.num_threads: @@ -158,6 +152,18 @@ def join(self): # Wait for all tasks to be done and stop all threads def done(self): + if not self.num_threads: + # no task will be handled after self.abort + while self.tasks and not self.abort: + task = heapq.heappop(self.tasks) + if self.pin: + cores = list(os.sched_getaffinity(0)) + os.sched_setaffinity(0, {cores[0]}) + self.f(task.task) + os.sched_setaffinity(0, cores) + else: + self.f(task.task) + self.finish = True if not self.num_threads: