From 771e5ccec18674eb6b8b0d726b626436db69745a Mon Sep 17 00:00:00 2001 From: t-horikawa Date: Thu, 22 Aug 2024 15:22:34 +0900 Subject: [PATCH] use boost::context::fiber (not boost::fiber) --- include/base_node.h | 6 +- include/border_node.h | 44 ++++-- include/destroy_manager.h | 277 ++++++++++++++++++++++++++---------- include/interface_destroy.h | 22 +-- include/interface_put.h | 12 +- include/interior_node.h | 30 ++-- include/link_or_value.h | 13 +- include/storage_impl.h | 12 +- 8 files changed, 285 insertions(+), 131 deletions(-) diff --git a/include/base_node.h b/include/base_node.h index 2b4b4bc..0ed2a26 100644 --- a/include/base_node.h +++ b/include/base_node.h @@ -12,13 +12,12 @@ #include "cpu.h" #include "scheme.h" #include "version.h" +#include "destroy_manager.h" #include "glog/logging.h" namespace yakushima { -class destroy_manager; - class base_node { // NOLINT public: class key_tuple { @@ -74,7 +73,8 @@ class base_node { // NOLINT * @param destroy_manager the manager of the destroy operation. * @param destroy_barrier the barrier that control the job finish. */ - virtual status destroy(destroy_manager&) = 0; + virtual status destroy(manager&, barrier&) = 0; + virtual status destroy(manager&) = 0; /** * @details display function for analysis and debug. diff --git a/include/border_node.h b/include/border_node.h index 12d954f..dd9fc27 100644 --- a/include/border_node.h +++ b/include/border_node.h @@ -81,28 +81,46 @@ class alignas(CACHE_LINE_SIZE) border_node final : public base_node { // NOLINT /** * @brief release all heap objects and clean up. */ - status destroy(destroy_manager& manager) override { + status destroy(manager& m, barrier &p) override { + barrier b{m, p}; std::size_t cnk = get_permutation_cnk(); - destroy_barrier barrier{}; for (std::size_t i = 0; i < cnk; ++i) { // living link or value std::size_t index = permutation_.get_index_of_rank(i); if (lv_.at(index).get_next_layer() != nullptr) { - if (i < (cnk - 1)) { - barrier.begin(); - manager.put([this, index, &manager, &barrier](){ - lv_.at(index).destroy(manager); - barrier.end(); - }); - } else { - lv_.at(index).destroy(manager); - } + // has some layer, considering parallel + m.push(b, [this, index, &m, &b](){ + // cleanup process + lv_.at(index).destroy(m, b); + }); } else { // not some layer, not considering parallel - lv_.at(index).destroy(manager); + lv_.at(index).destroy(m, b); } } - barrier.wait(); + b.wait(); + + return status::OK_DESTROY_BORDER; + } + status destroy(manager& m) override { + barrier b{m}; + std::size_t cnk = get_permutation_cnk(); + for (std::size_t i = 0; i < cnk; ++i) { + // living link or value + std::size_t index = permutation_.get_index_of_rank(i); + if (lv_.at(index).get_next_layer() != nullptr) { + // has some layer, considering parallel + m.push(b, [this, index, &m, &b](){ + // cleanup process + lv_.at(index).destroy(m, b); + }); + } else { + // not some layer, not considering parallel + lv_.at(index).destroy(m, b); + } + } + b.wait(); + return status::OK_DESTROY_BORDER; } diff --git a/include/destroy_manager.h b/include/destroy_manager.h index a00a3ac..8e84193 100644 --- a/include/destroy_manager.h +++ b/include/destroy_manager.h @@ -16,104 +16,231 @@ namespace yakushima { -class destroy_barrier { +class alignas(CACHE_LINE_SIZE) manager { public: - destroy_barrier() : destroy_barrier([](){}) { - } - explicit destroy_barrier(const std::function f) : cleanup_(f) { - } - void wait() { - std::unique_lock lock(mtx_); - cond_.wait(lock, [this](){ return 0 == job_count_; }); - } - void begin() { - job_count_++; - } - void end() { - job_count_--; - if (job_count_ == 0) { - cleanup_(); - cond_.notify_all(); - } - } + class worker; -private: - std::function cleanup_; - std::atomic_uint job_count_{}; - mutable std::mutex mtx_{}; - mutable boost::fibers::condition_variable_any cond_{}; -}; + class alignas(CACHE_LINE_SIZE) barrier { + public: + barrier(manager& m) : manager_(m), worker_(manager_.get_worker()), parent_(nullptr) { + } + barrier(manager& m, barrier& b) : manager_(m), worker_(manager_.get_worker()), parent_(&b) { + } -class destroy_manager { -public: - constexpr static size_t max_threads = 4; - explicit destroy_manager(bool inactive) : inactive_(inactive) { - if (!inactive_) { - num_threads_ = std::thread::hardware_concurrency() < max_threads ? std::thread::hardware_concurrency() : max_threads; - boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >(num_threads_); - if (num_threads_ > 1) { - threads_.reserve(num_threads_ - 1); - for (std::size_t i = 0; i < num_threads_ - 1; i++) { - threads_.emplace_back(std::thread(std::ref(*this))); - } + void fork() { + forks_++; + } + void join() { + joins_++; + if (parent_) { + worker_->notify(); } else { - inactive_ = true; + if (ready()) { + std::unique_lock lock(mtx_); + cond_.notify_one(); + } } } - } - destroy_manager() : destroy_manager(false) { - } - ~destroy_manager() { - if (!inactive_) { - { + void wait() { + if (parent_) { + manager_.get_worker()->push(this); + base_fiber_ = std::move(worker_fiber_).resume(); + if (forks_ != joins_) { + std::abort(); + } + } else { std::unique_lock lock(mtx_); - finished_.store(true); + cond_.wait(lock, [this](){ return forks_ == joins_; }); lock.unlock(); - cond_.notify_all(); } - for( auto&& e: threads_) { - e.join(); + } + bool ready() { + return forks_.load() == joins_.load(); + } + boost::context::fiber fiber() { + return std::move(base_fiber_); + } + + private: + manager& manager_; + worker* worker_; + barrier* parent_; + + std::atomic_uint forks_{0}; + std::atomic_uint joins_{0}; + + // for the first thread + std::mutex mtx_{}; + std::condition_variable cond_{}; + + boost::context::fiber base_fiber_{}; + boost::context::fiber worker_fiber_{ + [this](boost::context::fiber && f) { + base_fiber_ = std::move(f); + manager_.dispatch(); + return std::move(f); } + }; + void fork(const std::function f) { + forks_++; + manager_.push(*this, f); } - } - destroy_manager(destroy_manager const&) = delete; - destroy_manager(destroy_manager&&) = delete; - destroy_manager& operator = (destroy_manager const&) = delete; - destroy_manager& operator = (destroy_manager&&) = delete; + friend class manager; + friend class worker; + }; + + class alignas(CACHE_LINE_SIZE) worker { + public: + worker() = delete; + explicit worker(std::size_t id) : id_(id) {} - void put(const std::function& f) { - if (inactive_) { - f(); - return; + void operator()() { + worker_fiber_ = std::move(worker_fiber_).resume(); } - boost::fibers::fiber([this, f](){ - fiber_count_++; - f(); - if ( 0 == --fiber_count_) { + + private: + std::size_t id_; + std::deque fibers_{}; + std::deque barriers_{}; + + // for termination + std::mutex mtx_{}; + std::condition_variable cond_{}; + bool terminate_{}; + barrier* current_barrier_{}; + + boost::context::fiber base_fiber_{}; + boost::context::fiber worker_fiber_{ + [this](boost::context::fiber && f) { + base_fiber_ = std::move(f); + dispatch(); + return std::move(f); + } + }; + + void push(boost::context::fiber&& f) { + std::unique_lock lock(mtx_); + fibers_.emplace_back(std::move(f)); + lock.unlock(); + notify(); + } + void push(barrier* b) { + std::unique_lock lock(mtx_); + barriers_.emplace_front(b); + } + void dispatch() { + while (true) { std::unique_lock lock(mtx_); - if ( 0 == --fiber_count_) { + cond_.wait(lock, [this](){ return has_work() || terminate_; }); + lock.unlock(); + if (terminate_) { + base_fiber_ = std::move(base_fiber_).resume(); + } + if (!barriers_.empty()) { + barrier* b{}; + std::unique_lock lock(mtx_); + for (auto it = barriers_.begin(); it != barriers_.end(); it++) { + if ((*it)->ready()) { + b = *it; + barriers_.erase(it); + break; + } + } + lock.unlock(); + if (b) { + worker_fiber_ = std::move(b->fiber()).resume(); + } + } + if (!fibers_.empty()) { + std::unique_lock lock(mtx_); + auto f = std::move(fibers_.front()); + fibers_.pop_front(); lock.unlock(); - cond_.notify_all(); + f = std::move(f).resume(); } } - }).detach(); - } + } + void notify() { + std::unique_lock lock(mtx_); + cond_.notify_one(); + } + bool terminate() { + if (has_work()) { + return false; + } + terminate_ = true; + notify(); + return true; + } + bool has_work() { + if (!fibers_.empty()) { + return true; + } + for (auto it = barriers_.begin(); it != barriers_.end(); it++) { + if ((*it)->ready()) { + return true; + } + } + return false; + } - void operator()() { - boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >(num_threads_); - std::unique_lock lock(mtx_); - cond_.wait(lock, [this](){ return 0 == fiber_count_.load() && finished_.load(); }); + friend class manager; + friend class barrier; + }; + +// manager + manager() = delete; + ~manager() { + for (auto&& w: workers_) { + while(!w->terminate()); + } + for (auto&& t: threads_) { + t.join(); + } + } + explicit manager(std::size_t size) : size_{size} { + for (std::size_t i = 0; i < size_; i++) { + workers_.emplace_back(std::make_unique(i)); + threads_.emplace_back(std::thread(std::ref(*workers_.back()))); + indexes_.insert(std::make_pair(threads_.back().get_id(), i)); + } + } + void push(barrier& b, const std::function func) { + auto i = index_.load(); + while(true) { + if (index_.compare_exchange_strong(i, (i < (size_ - 1)) ? (i + 1) : 0)) { + break; + } + } + b.fork(); + auto& w = workers_.at(i); + w->push(boost::context::fiber{ + [&b, func](boost::context::fiber && f) { + func(); + b.join(); + return std::move(f); + }} + ); + } + void dispatch() { + workers_.at(indexes_.at(std::this_thread::get_id()))->dispatch(); + } + worker* get_worker() { + if( auto it = indexes_.find(std::this_thread::get_id()); it != indexes_.end()) { + return workers_.at(it->second).get(); + } + return nullptr; } private: - bool inactive_; - mutable std::mutex mtx_{}; - mutable boost::fibers::condition_variable_any cond_{}; + std::size_t size_; + std::vector> workers_{}; std::vector threads_{}; - std::atomic_bool finished_{}; - std::atomic_uint fiber_count_{0}; - std::size_t num_threads_{}; + std::map indexes_{}; + std::atomic_ulong index_{}; }; +using barrier = manager::barrier; + } // namespace yakushima diff --git a/include/interface_destroy.h b/include/interface_destroy.h index 132ad3d..e83b481 100644 --- a/include/interface_destroy.h +++ b/include/interface_destroy.h @@ -17,35 +17,23 @@ namespace yakushima { [[maybe_unused]] static status destroy() { - destroy_manager manager{}; + manager m{std::thread::hardware_concurrency()}; if (storage::get_storages()->empty()) { return status::OK_ROOT_IS_NULL; } std::vector> tuple_list; scan(storage::get_storages(), "", scan_endpoint::INF, "", scan_endpoint::INF, tuple_list, nullptr, 0); - destroy_barrier barrier{}; for (auto&& elem : tuple_list) { base_node* root = std::get<1>(elem)->load_root_ptr(); if (root == nullptr) { continue; } - if (elem != tuple_list.back()) { - barrier.begin(); - manager.put([root, elem, &manager, &barrier](){ - root->destroy(manager); - delete root; // NOLINT - std::get<1>(elem)->store_root_ptr(nullptr); - barrier.end(); - }); - } else { - root->destroy(manager); - delete root; // NOLINT - std::get<1>(elem)->store_root_ptr(nullptr); - } + root->destroy(m); + delete root; // NOLINT + std::get<1>(elem)->store_root_ptr(nullptr); } - barrier.wait(); base_node* tables_root = storage::get_storages()->load_root_ptr(); if (tables_root != nullptr) { - tables_root->destroy(manager); + tables_root->destroy(m); delete tables_root; // NOLINT storage::get_storages()->store_root_ptr(nullptr); } diff --git a/include/interface_put.h b/include/interface_put.h index b4b6619..52cd8f2 100644 --- a/include/interface_put.h +++ b/include/interface_put.h @@ -53,10 +53,14 @@ put([[maybe_unused]] Token token, tree_instance* ti, std::string_view key_view, base_node* desired{dynamic_cast(new_border)}; if (ti->cas_root_ptr(&expected, &desired)) { return status::OK; } if (expected != nullptr) { - destroy_manager manager{true}; // inactive destroy_manager - // root is not nullptr; - new_border->destroy(manager); - delete new_border; // NOLINT + manager m{1}; + barrier b{m}; + m.push(b, [new_border, &m, &b](){ + // root is not nullptr; + new_border->destroy(m, b); + delete new_border; // NOLINT + }); + b.wait(); break; } // root is nullptr. diff --git a/include/interior_node.h b/include/interior_node.h index 2bcd376..8713404 100644 --- a/include/interior_node.h +++ b/include/interior_node.h @@ -113,22 +113,26 @@ class alignas(CACHE_LINE_SIZE) interior_node final // NOLINT * @brief release all heap objects and clean up. * @pre This function is called by single thread. */ - status destroy(destroy_manager& manager) override { - destroy_barrier barrier{}; + status destroy(manager& m, barrier& p) override { + barrier b{m, p}; for (auto i = 0; i < n_keys_ + 1; ++i) { - if (i < n_keys_) { - barrier.begin(); - manager.put([this, i, &manager, &barrier](){ - get_child_at(i)->destroy(manager); - delete get_child_at(i); // NOLINT - barrier.end(); - }); - } else { - get_child_at(i)->destroy(manager); + m.push(b, [this, i, &m, &b](){ + get_child_at(i)->destroy(m, b); delete get_child_at(i); // NOLINT - } + }); + } + b.wait(); + return status::OK_DESTROY_INTERIOR; + } + status destroy(manager& m) override { + barrier b{m}; + for (auto i = 0; i < n_keys_ + 1; ++i) { + m.push(b, [this, i, &m, &b](){ + get_child_at(i)->destroy(m, b); + delete get_child_at(i); // NOLINT + }); } - barrier.wait(); + b.wait(); return status::OK_DESTROY_INTERIOR; } diff --git a/include/link_or_value.h b/include/link_or_value.h index 30dbfea..e68cc81 100644 --- a/include/link_or_value.h +++ b/include/link_or_value.h @@ -34,9 +34,18 @@ class link_or_value { /** * @details release heap objects. */ - void destroy(destroy_manager& manager) { + void destroy(manager& m, barrier &p) { if (auto* child = get_next_layer(); child != nullptr) { - child->destroy(manager); + child->destroy(m, p); + delete child; // NOLINT + } else if (auto* v = get_value(); v != nullptr) { + if (value::need_delete(v)) { value::delete_value(v); } + } + init_lv(); + } + void destroy(manager& m) { + if (auto* child = get_next_layer(); child != nullptr) { + child->destroy(m); delete child; // NOLINT } else if (auto* v = get_value(); v != nullptr) { if (value::need_delete(v)) { value::delete_value(v); } diff --git a/include/storage_impl.h b/include/storage_impl.h index f4a5318..b294a57 100644 --- a/include/storage_impl.h +++ b/include/storage_impl.h @@ -63,10 +63,14 @@ status storage::delete_storage(std::string_view storage_name) { // NOLINT if (ret_st == status::OK) { base_node* tables_root = ret.first->load_root_ptr(); if (tables_root != nullptr) { - destroy_manager manager{true}; // inactive destroy_manager - tables_root->destroy(manager); - delete tables_root; // NOLINT - ret.first->store_root_ptr(nullptr); + manager m{1}; // inactive destroy_manager + barrier b{m}; + m.push(b, [&ret, tables_root, &m, &b](){ + tables_root->destroy(m, b); + delete tables_root; // NOLINT + ret.first->store_root_ptr(nullptr); + }); + b.wait(); } leave(token); return status::OK;