Skip to content

Commit

Permalink
feat: Restructure how we do sync / waits
Browse files Browse the repository at this point in the history
The big issue we have right now is that we're limited by the "slots" a counter has. Either users are stuck with the "base" 4 slots, or they have to dynamically allocate to get more. Which is also fragile for the user to know beforehand.

The core realization of this change is that any time a fiber is waiting (IE it needs to be in the counter's queue), it's "asleep", and we can guarantee the stack memory is valid. So instead of the *counter* allocating memory, we allocate memory on the stack for the wait, and use a linked list to store the "queue" of waiting fibers.

BREAKING CHANGE: This removes AtomicCounter, TaskCounter, and AtomicFlag. And replaces them with WaitGroup. WaitGroup functions very similarly to TaskCounter, but users no longer need to worry about how many "waiting fiber slots" they need. Fibtex is also restructured. It's no longer possible to configure the "lock behavior"
  • Loading branch information
RichieSams committed Oct 30, 2023
1 parent 806be82 commit b07d177
Show file tree
Hide file tree
Showing 23 changed files with 868 additions and 934 deletions.
44 changes: 19 additions & 25 deletions README.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ So we have to do it manually.

[source,cc]
----
#include "ftl/task_counter.h"
#include "ftl/task_scheduler.h"
#include "ftl/wait_group.h"
#include <assert.h>
#include <stdint.h>
Expand Down Expand Up @@ -94,14 +94,14 @@ int main() {
}
// Schedule the tasks
ftl::TaskCounter counter(&taskScheduler);
taskScheduler.AddTasks(numTasks, tasks, ftl::TaskPriority::Normal, &counter);
ftl::WaitGroup wg(&taskScheduler);
taskScheduler.AddTasks(numTasks, tasks, ftl::TaskPriority::Normal, &wg);
// FTL creates its own copies of the tasks, so we can safely delete the memory
delete[] tasks;
// Wait for the tasks to complete
taskScheduler.WaitForCounter(&counter);
wg.Wait();
// Add the results
uint64_t result = 0ULL;
Expand Down Expand Up @@ -156,15 +156,14 @@ Now, obviously, this is a contrived example. And as I said above, TBB has an awe
{blank}

## The Architecture from 10,000 ft
(Christian has some great illustrations on pages 8 - 17 of his slides that help explain the flow of fibers and tasks. I suggest looking at those while you're reading)

**Task Queue** - An 'ordinary' queue for holding the tasks that are waiting to be executed. In the current code, there is only one queue. However, a more sophisticated system might have multiple queues with varying priorities.
**Task Queue** - An 'ordinary' queue for holding the tasks that are waiting to be executed. In the current code, there is a "high priority" queue, and a "low priority" queue.

**Fiber Pool** - A pool of fibers used for switching to new tasks while the current task is waiting on a dependency. Fibers execute the tasks

**Worker Threads** - 1 per logical CPU core. These run the fibers.

**Waiting Tasks** - A list of the tasks that are waiting for a dependency to be fufilled. Dependencies are represented with atomic counters
**Waiting Tasks** - All the fibers / tasks that are waiting for a dependency to be fufilled. Dependencies are represented with WaitGroups


Tasks can be created on the stack. They're just a simple struct with a function pointer and an optional void *arg to be passed to the function:
Expand All @@ -189,49 +188,44 @@ You schedule a task for execution by calling TaskScheduler::AddTasks()
[source,cc]
----
ftl::TaskCounter counter(taskScheduler);
taskScheduler->AddTasks(10, tasks, ftl::TaskPriority::High, &counter);
ftl::WaitGroup wg(taskScheduler);
taskScheduler->AddTasks(10, tasks, ftl::TaskPriority::High, &wg);
----
The tasks get added to the queue, and other threads (or the current one, when it is finished with the current task) can start executing them when they get popped off the queue.
AddTasks can optionally take a pointer to a TaskCounter. If you do, the value of the counter will incremented by the number of tasks queued. Every time a task finishes, the counter will be atomically decremented. You can use this functionality to create depencendies between tasks. You do that with the function
AddTasks can optionally take a pointer to a WaitGroup. If you do, the value of the WaitGroup will incremented by the number of tasks queued. Every time a task finishes, the WaitGroup will be atomically decremented. You can use this functionality to create depencendies between tasks. You do that with the function
[source,cc]
----
void TaskScheduler::WaitForCounter(TaskCounter *counter);
void WaitGroup::Wait();
----
This is where fibers come into play. If the counter == 0, the function trivially returns. If not, the scheduler will move the current fiber into the **Waiting Tasks** list and grab a new fiber from the **Fiber Pool**. The new fiber pops a task from the **Task Queue** and starts execution with that.
This is where fibers come into play. If the value of WaitGroup == 0, the function trivially returns. If not, the scheduler will move the current fiber into a list of waiting fibers in the WaitGroup and grab a new fiber from the **Fiber Pool**. The new fiber pops a task from the **Task Queue** and starts execution with that.

But what about the task we stored in **Waiting Tasks**? When will it finish being executed?
But what about the task/fiber we stored in the WaitGroup? When will it finish being executed?

When the TaskCounter hit zero from decrements, we add all the waiting fibers to the **Ready Fibers** list in the TaskScheduler.
Before a fiber tries to pop a task off the **Task Queue**, it checks if there are any **Ready Fibers**. If so, it will return itself to the **Fiber Pool** and switch to the fiber that is ready. The ready fiber will continue execution right where it left off
When the WaitGroup value hits zero from decrements, we add all the waiting fibers back into the queue in the TaskScheduler. The next time a thread switches fibers (either because the current fiber finished, or because it called WaitGroup::Wait() ), the ready Task will be picked up and resumed where it left off.

{blank}

## Advanced Features

### FullAtomicCounter

TaskCounters are implemented with an internal atomic counter. However, access to this atomic counter is protected from the user for performance and algorithmic simplicity reasons.
That said, it can be useful to be able to use WaitForCounter on something non task-related. That's where FullAtomicCounter comes in.

FullAtomicCounter has member functions correlaries for all the "regular" atomic functions (load, store, fetch_add, etc).
Each time they're called, we check all waiting fibers if they're equal to their target value. In comparison, TaskCounter only checks when the final value is zero.
Therefore, FullAtomicCounter has more overhead than TaskCounter, but much greater flexibility

### Fibtex

Generally, you shouldn't use Mutexes in fiber code, for two reasons:

1. If you take a mutex, and call WaitForCounter(), when WaitForCounter resumes, your code could be on another thread. The mutex unlock will be undefined behavior, and probably lead to a deadlock
1. If you take a mutex, and call WaitGroup::Wait(), when Wait() resumes, your code could be on another thread. The mutex unlock will be undefined behavior, and probably lead to a deadlock
2. Mutex contention will block the worker threads. And since we generally don't oversubscribe the threads to the cores, this leaves cores idle.
To solve this, we created Fibtex. It implements the std lockable interface, so you can use it with all your favorite wrappers (std::lock_guard, std::unique_lock, etc.)
It's implemented behind the scenes with a TaskCounter, so if a Fibtex is locked, a waiter can switch to another task and do valuable work

### Thread Pinning

When a fiber is resumed after a WaitGroup::Wait() or a Fibtex::lock(), there is no guarantee that it will resume on the same thread that it was running on when it was suspended. For most code, this is fine. However, certain libraries have strong assumptions. For example, in DirectX, you must do the final frame submit from the same thread that created the swap chain. Thus, some code will need to guarantee that fibers are resumed on the same thread where they were running when suspended. To do this, you can use the argument `pinToCurrentThread`. When set to `true`, the scheduler will guarantee that the resumed fiber will run on the same thread. This argument is available for WaitGroup::Wait() and Fibtext::lock()


{blank}

## Dependencies
Expand Down
7 changes: 3 additions & 4 deletions benchmarks/empty/empty.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
* limitations under the License.
*/

#include "ftl/task_counter.h"
#include "ftl/task_scheduler.h"
#include "ftl/wait_group.h"

#include "catch2/benchmark/catch_benchmark.hpp"
#include "catch2/catch_test_macros.hpp"
Expand All @@ -49,10 +49,9 @@ TEST_CASE("Empty benchmark") {

meter.measure([&taskScheduler, tasks] {
for (unsigned i = 0; i < kNumIterations; ++i) {
ftl::TaskCounter counter(&taskScheduler);
ftl::WaitGroup wg(&taskScheduler);
taskScheduler.AddTasks(kNumTasks, tasks, ftl::TaskPriority::Normal);

taskScheduler.WaitForCounter(&counter);
wg.Wait();
}
});

Expand Down
15 changes: 7 additions & 8 deletions benchmarks/producer_consumer/producer_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
* limitations under the License.
*/

#include "ftl/task_counter.h"
#include "ftl/task_scheduler.h"
#include "ftl/wait_group.h"

#include "catch2/benchmark/catch_benchmark.hpp"
#include "catch2/catch_test_macros.hpp"
Expand All @@ -43,11 +43,11 @@ void Producer(ftl::TaskScheduler *taskScheduler, void *arg) {
tasks[i] = { Consumer, arg };
}

ftl::TaskCounter counter(taskScheduler);
taskScheduler->AddTasks(kNumConsumerTasks, tasks, ftl::TaskPriority::Normal, &counter);
ftl::WaitGroup wg(taskScheduler);
taskScheduler->AddTasks(kNumConsumerTasks, tasks, ftl::TaskPriority::Normal, &wg);
delete[] tasks;

taskScheduler->WaitForCounter(&counter);
wg.Wait();
}

TEST_CASE("ProducerConsumer benchmark") {
Expand All @@ -65,10 +65,9 @@ TEST_CASE("ProducerConsumer benchmark") {

meter.measure([&taskScheduler, tasks] {
for (unsigned i = 0; i < kNumIterations; ++i) {
ftl::TaskCounter counter(&taskScheduler);
taskScheduler.AddTasks(kNumProducerTasks, tasks, ftl::TaskPriority::Normal);

taskScheduler.WaitForCounter(&counter);
ftl::WaitGroup wg(&taskScheduler);
taskScheduler.AddTasks(kNumProducerTasks, tasks, ftl::TaskPriority::Normal, &wg);
wg.Wait();
}
});

Expand Down
8 changes: 4 additions & 4 deletions examples/triangle_num.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ So we have to do it manually.
!! If you update this example, make sure it matches the code in `README.asciidoc` !!
*/

#include "ftl/task_counter.h"
#include "ftl/task_scheduler.h"
#include "ftl/wait_group.h"

#include <assert.h>
#include <stdint.h>
Expand Down Expand Up @@ -74,14 +74,14 @@ int main() {
}

// Schedule the tasks
ftl::TaskCounter counter(&taskScheduler);
taskScheduler.AddTasks(numTasks, tasks, ftl::TaskPriority::Normal, &counter);
ftl::WaitGroup wg(&taskScheduler);
taskScheduler.AddTasks(numTasks, tasks, ftl::TaskPriority::Normal, &wg);

// FTL creates its own copies of the tasks, so we can safely delete the memory
delete[] tasks;

// Wait for the tasks to complete
taskScheduler.WaitForCounter(&counter);
wg.Wait();

// Add the results
uint64_t result = 0ULL;
Expand Down
191 changes: 0 additions & 191 deletions include/ftl/atomic_counter.h

This file was deleted.

Loading

0 comments on commit b07d177

Please sign in to comment.