Skip to content

Commit

Permalink
use boost::context::fiber (not boost::fiber)
Browse files — browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Aug 22, 2024
1 parent 9684f59 commit 771e5cc
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 131 deletions.
6 changes: 3 additions & 3 deletions include/base_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
#include "cpu.h"
#include "scheme.h"
#include "version.h"
#include "destroy_manager.h"

#include "glog/logging.h"

namespace yakushima {

class destroy_manager;

class base_node { // NOLINT
public:
class key_tuple {
Expand Down Expand Up @@ -74,7 +73,8 @@ class base_node { // NOLINT
* @param destroy_manager the manager of the destroy operation.
* @param destroy_barrier the barrier that control the job finish.
*/
// parallel entry: jobs forked under an existing (parent) barrier
virtual status destroy(manager&, barrier&) = 0;
// top-level entry: no parent barrier; the implementation creates its own root barrier
virtual status destroy(manager&) = 0;

/**
* @details display function for analysis and debug.
Expand Down
44 changes: 31 additions & 13 deletions include/border_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,28 +81,46 @@ class alignas(CACHE_LINE_SIZE) border_node final : public base_node { // NOLINT
/**
* @brief release all heap objects and clean up.
*/
/**
 * @brief release all heap objects and clean up.
 * @details Parallel variant: each entry that owns a next layer is destroyed as
 * a job pushed to the manager; plain values are destroyed inline. A child
 * barrier chained to @a p makes the parent wait for every job forked here.
 * @param m the manager of the destroy operation.
 * @param p the parent barrier that controls the job finish.
 */
status destroy(manager& m, barrier& p) override {
// child barrier chained to the parent so the parent also waits for our jobs
barrier b{m, p};
std::size_t cnk = get_permutation_cnk();
for (std::size_t i = 0; i < cnk; ++i) {
// living link or value
std::size_t index = permutation_.get_index_of_rank(i);
if (lv_.at(index).get_next_layer() != nullptr) {
// has some layer, considering parallel
m.push(b, [this, index, &m, &b](){
// cleanup process
lv_.at(index).destroy(m, b);
});
} else {
// not some layer, not considering parallel
lv_.at(index).destroy(m, b);
}
}
// block until every job forked under b has joined
b.wait();

return status::OK_DESTROY_BORDER;
}
/**
 * @brief release all heap objects and clean up (top-level entry).
 * @details Creates its own root barrier, forks one manager job per entry that
 * owns a next layer, destroys plain values inline, then waits for all jobs.
 * @param m the manager of the destroy operation.
 */
status destroy(manager& m) override {
// root barrier: no parent, waited on by the calling thread
barrier b{m};
std::size_t cnk = get_permutation_cnk();
for (std::size_t i = 0; i < cnk; ++i) {
// living link or value
std::size_t index = permutation_.get_index_of_rank(i);
if (lv_.at(index).get_next_layer() != nullptr) {
// has some layer, considering parallel
m.push(b, [this, index, &m, &b](){
// cleanup process
lv_.at(index).destroy(m, b);
});
} else {
// not some layer, not considering parallel
lv_.at(index).destroy(m, b);
}
}
// block until every forked destroy job has joined
b.wait();

return status::OK_DESTROY_BORDER;
}

Expand Down
277 changes: 202 additions & 75 deletions include/destroy_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,104 +16,231 @@

namespace yakushima {

class destroy_barrier {
class alignas(CACHE_LINE_SIZE) manager {
public:
destroy_barrier() : destroy_barrier([](){}) {
}
explicit destroy_barrier(const std::function<void(void)> f) : cleanup_(f) {
}
void wait() {
std::unique_lock<std::mutex> lock(mtx_);
cond_.wait(lock, [this](){ return 0 == job_count_; });
}
void begin() {
job_count_++;
}
void end() {
job_count_--;
if (job_count_ == 0) {
cleanup_();
cond_.notify_all();
}
}
class worker;

private:
std::function<void(void)> cleanup_;
std::atomic_uint job_count_{};
mutable std::mutex mtx_{};
mutable boost::fibers::condition_variable_any cond_{};
};
class alignas(CACHE_LINE_SIZE) barrier {
public:
barrier(manager& m) : manager_(m), worker_(manager_.get_worker()), parent_(nullptr) {
}
barrier(manager& m, barrier& b) : manager_(m), worker_(manager_.get_worker()), parent_(&b) {
}

class destroy_manager {
public:
constexpr static size_t max_threads = 4;
explicit destroy_manager(bool inactive) : inactive_(inactive) {
if (!inactive_) {
num_threads_ = std::thread::hardware_concurrency() < max_threads ? std::thread::hardware_concurrency() : max_threads;
boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >(num_threads_);
if (num_threads_ > 1) {
threads_.reserve(num_threads_ - 1);
for (std::size_t i = 0; i < num_threads_ - 1; i++) {
threads_.emplace_back(std::thread(std::ref(*this)));
}
void fork() {
forks_++;
}
void join() {
joins_++;
if (parent_) {
worker_->notify();
} else {
inactive_ = true;
if (ready()) {
std::unique_lock<std::mutex> lock(mtx_);
cond_.notify_one();
}
}
}
}
destroy_manager() : destroy_manager(false) {
}
~destroy_manager() {
if (!inactive_) {
{
void wait() {
if (parent_) {
manager_.get_worker()->push(this);
base_fiber_ = std::move(worker_fiber_).resume();
if (forks_ != joins_) {
std::abort();
}
} else {
std::unique_lock<std::mutex> lock(mtx_);
finished_.store(true);
cond_.wait(lock, [this](){ return forks_ == joins_; });
lock.unlock();
cond_.notify_all();
}
for( auto&& e: threads_) {
e.join();
}
bool ready() {
return forks_.load() == joins_.load();
}
boost::context::fiber fiber() {
return std::move(base_fiber_);
}

private:
manager& manager_;
worker* worker_;
barrier* parent_;

std::atomic_uint forks_{0};
std::atomic_uint joins_{0};

// for the first thread
std::mutex mtx_{};
std::condition_variable cond_{};

boost::context::fiber base_fiber_{};
boost::context::fiber worker_fiber_{
[this](boost::context::fiber && f) {
base_fiber_ = std::move(f);
manager_.dispatch();
return std::move(f);
}
};
void fork(const std::function<void(void)> f) {
forks_++;
manager_.push(*this, f);
}
}

destroy_manager(destroy_manager const&) = delete;
destroy_manager(destroy_manager&&) = delete;
destroy_manager& operator = (destroy_manager const&) = delete;
destroy_manager& operator = (destroy_manager&&) = delete;
friend class manager;
friend class worker;
};

class alignas(CACHE_LINE_SIZE) worker {
public:
worker() = delete;
explicit worker(std::size_t id) : id_(id) {}

void put(const std::function<void(void)>& f) {
if (inactive_) {
f();
return;
void operator()() {
worker_fiber_ = std::move(worker_fiber_).resume();
}
boost::fibers::fiber([this, f](){
fiber_count_++;
f();
if ( 0 == --fiber_count_) {

private:
std::size_t id_;
std::deque<boost::context::fiber> fibers_{};
std::deque<barrier*> barriers_{};

// for termination
std::mutex mtx_{};
std::condition_variable cond_{};
bool terminate_{};
barrier* current_barrier_{};

boost::context::fiber base_fiber_{};
boost::context::fiber worker_fiber_{
[this](boost::context::fiber && f) {
base_fiber_ = std::move(f);
dispatch();
return std::move(f);
}
};

void push(boost::context::fiber&& f) {
std::unique_lock<std::mutex> lock(mtx_);
fibers_.emplace_back(std::move(f));
lock.unlock();
notify();
}
void push(barrier* b) {
std::unique_lock<std::mutex> lock(mtx_);
barriers_.emplace_front(b);
}
void dispatch() {
while (true) {
std::unique_lock<std::mutex> lock(mtx_);
if ( 0 == --fiber_count_) {
cond_.wait(lock, [this](){ return has_work() || terminate_; });
lock.unlock();
if (terminate_) {
base_fiber_ = std::move(base_fiber_).resume();
}
if (!barriers_.empty()) {
barrier* b{};
std::unique_lock<std::mutex> lock(mtx_);
for (auto it = barriers_.begin(); it != barriers_.end(); it++) {
if ((*it)->ready()) {
b = *it;
barriers_.erase(it);
break;
}
}
lock.unlock();
if (b) {
worker_fiber_ = std::move(b->fiber()).resume();
}
}
if (!fibers_.empty()) {
std::unique_lock<std::mutex> lock(mtx_);
auto f = std::move(fibers_.front());
fibers_.pop_front();
lock.unlock();
cond_.notify_all();
f = std::move(f).resume();
}
}
}).detach();
}
}
void notify() {
std::unique_lock<std::mutex> lock(mtx_);
cond_.notify_one();
}
bool terminate() {
if (has_work()) {
return false;
}
terminate_ = true;
notify();
return true;
}
bool has_work() {
if (!fibers_.empty()) {
return true;
}
for (auto it = barriers_.begin(); it != barriers_.end(); it++) {
if ((*it)->ready()) {
return true;
}
}
return false;
}

void operator()() {
boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >(num_threads_);
std::unique_lock<std::mutex> lock(mtx_);
cond_.wait(lock, [this](){ return 0 == fiber_count_.load() && finished_.load(); });
friend class manager;
friend class barrier;
};

// manager
manager() = delete;
~manager() {
for (auto&& w: workers_) {
while(!w->terminate());
}
for (auto&& t: threads_) {
t.join();
}
}
explicit manager(std::size_t size) : size_{size} {
for (std::size_t i = 0; i < size_; i++) {
workers_.emplace_back(std::make_unique<worker>(i));
threads_.emplace_back(std::thread(std::ref(*workers_.back())));
indexes_.insert(std::make_pair(threads_.back().get_id(), i));
}
}
void push(barrier& b, const std::function<void(void)> func) {
auto i = index_.load();
while(true) {
if (index_.compare_exchange_strong(i, (i < (size_ - 1)) ? (i + 1) : 0)) {
break;
}
}
b.fork();
auto& w = workers_.at(i);
w->push(boost::context::fiber{
[&b, func](boost::context::fiber && f) {
func();
b.join();
return std::move(f);
}}
);
}
void dispatch() {
workers_.at(indexes_.at(std::this_thread::get_id()))->dispatch();
}
worker* get_worker() {
if( auto it = indexes_.find(std::this_thread::get_id()); it != indexes_.end()) {
return workers_.at(it->second).get();
}
return nullptr;
}

private:
bool inactive_;
mutable std::mutex mtx_{};
mutable boost::fibers::condition_variable_any cond_{};
std::size_t size_;
std::vector<std::unique_ptr<worker>> workers_{};
std::vector<std::thread> threads_{};
std::atomic_bool finished_{};
std::atomic_uint fiber_count_{0};
std::size_t num_threads_{};
std::map<std::thread::id, std::size_t> indexes_{};
std::atomic_ulong index_{};
};

using barrier = manager::barrier;

} // namespace yakushima
Loading

0 comments on commit 771e5cc

Please sign in to comment.