Skip to content

Commit

Permalink
revise the use of destroy threads
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Aug 15, 2024
1 parent 2af8828 commit e9ddb7f
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 72 deletions.
7 changes: 6 additions & 1 deletion include/base_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

namespace yakushima {

class destroy_manager;
class destroy_barrier;

class base_node { // NOLINT
public:
class key_tuple {
Expand Down Expand Up @@ -69,8 +72,10 @@ class base_node { // NOLINT
/**
* A virtual function is defined because It wants to distinguish the children class of
* the contents by using polymorphism. So this function is pure virtual function.
* @param destroy_manager the manager of the destroy operation.
* @param destroy_barrier the barrier that control the job finish.
*/
virtual status destroy() = 0;
virtual status destroy(destroy_manager&, destroy_barrier&) = 0;

/**
* @details display function for analysis and debug.
Expand Down
27 changes: 7 additions & 20 deletions include/border_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,18 @@ class alignas(CACHE_LINE_SIZE) border_node final : public base_node { // NOLINT
/**
* @brief release all heap objects and clean up.
*/
status destroy() override {
status destroy(destroy_manager& manager, destroy_barrier& barrier) override {
std::size_t cnk = get_permutation_cnk();
std::vector<std::thread> th_vc;
for (std::size_t i = 0; i < cnk; ++i) {
barrier.begin();
// living link or value
std::size_t index = permutation_.get_index_of_rank(i);
// cleanup process
auto process = [this](std::size_t i) { lv_.at(i).destroy(); };
if (lv_.at(index).get_next_layer() != nullptr) {
// has some layer, considering parallel
if (destroy_manager::check_room()) {
th_vc.emplace_back(process, index);
} else {
process(index);
}
} else {
// not some layer, not considering parallel
process(index);
}
}
for (auto&& th : th_vc) {
th.join();
destroy_manager::return_room();
auto& target = lv_.at(index);
manager.put([this, &target, &manager, &barrier](){

Check failure on line 91 in include/border_node.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-unused-lambda-capture

lambda capture 'this' is not used
target.destroy(manager, barrier);
barrier.end();
});
}

return status::OK_DESTROY_BORDER;
}

Expand Down
116 changes: 101 additions & 15 deletions include/destroy_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,121 @@

#include <atomic>
#include <thread>
#include <vector>
#include <deque>
#include <functional>
#include <mutex>
#include <condition_variable>

namespace yakushima {

class destroy_barrier {
public:
destroy_barrier() {
cleanups_.reserve(10000);
}
void put(const std::function<void(void)> f) {

Check warning on line 21 in include/destroy_manager.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

performance-unnecessary-value-param

the const qualified parameter 'f' is copied for each invocation; consider making it a reference
std::unique_lock<std::mutex> lock(mtx_put_);
cleanups_.emplace_back(f);
}
void wait() {
std::unique_lock<std::mutex> lock(mtx_);
while(true) {
if (job_count_ == 0) {
return;
}
cond_.wait(lock, [this](){ return job_count_ == 0; });
}
}
void begin() {
job_count_++;
}
void end() {
job_count_--;
if (job_count_ == 0) {
for(auto&& f: cleanups_) {
f();
}
cond_.notify_all();
}
}

private:
std::atomic_uint job_count_{};
std::vector<std::function<void(void)>> cleanups_{};
mutable std::mutex mtx_{};
mutable std::condition_variable cond_{};
mutable std::mutex mtx_put_{};
};

class destroy_manager {
public:
static bool check_room() {
std::size_t expected{
destroy_threads_num_.load(std::memory_order_acquire)};
for (;;) {
if (expected >= std::thread::hardware_concurrency()) {
return false;
explicit destroy_manager(bool inactive) : inactive_(inactive) {
if (!inactive_) {
std::size_t max = std::thread::hardware_concurrency();
if (max > 1) {
threads_.reserve(max);
for (std::size_t i = 0; i < max; i++) {
threads_.emplace_back(std::thread(std::ref(*this)));
}
} else {
inactive_ = true;
}
}
}
destroy_manager() : destroy_manager(false) {
}
~destroy_manager() {
if (!inactive_) {
{
std::unique_lock<std::mutex> lock(mtx_);
finished_.store(true);
cond_.notify_all();
}
std::size_t desired = expected + 1;
if (destroy_threads_num_.compare_exchange_weak(
expected, desired, std::memory_order_release,
std::memory_order_acquire)) {
return true;
for( auto&& e: threads_) {
e.join();
}
}
}

static std::size_t get_threads_num() {
return destroy_threads_num_.load(std::memory_order_acquire);
destroy_manager(destroy_manager const&) = delete;
destroy_manager(destroy_manager&&) = delete;
destroy_manager& operator = (destroy_manager const&) = delete;
destroy_manager& operator = (destroy_manager&&) = delete;

void put(const std::function<void(void)>& f) {
if (inactive_) {
f();
return;
}
std::unique_lock<std::mutex> lock(mtx_);
works_.emplace_back(f);
cond_.notify_one();
}

static void return_room() { destroy_threads_num_ -= 1; }
void operator()() {
while (true) {
std::unique_lock<std::mutex> lock(mtx_);
cond_.wait(lock, [this](){ return !works_.empty() || finished_.load(); });
if (works_.empty() && finished_.load()) {
return;
}
if (!works_.empty()) {
auto f = works_.front();
works_.pop_front();
lock.unlock();
f();
}
}
}

private:
static inline std::atomic<std::size_t> destroy_threads_num_{0}; // NOLINT
bool inactive_;
mutable std::mutex mtx_{};
mutable std::condition_variable cond_{};
std::vector<std::thread> threads_{};
std::deque<std::function<void(void)>> works_{};
std::atomic_bool finished_{};
};

} // namespace yakushima
33 changes: 27 additions & 6 deletions include/interface_destroy.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,45 @@
namespace yakushima {

[[maybe_unused]] static status destroy() {
destroy_manager manager{};
if (storage::get_storages()->empty()) { return status::OK_ROOT_IS_NULL; }
std::vector<std::tuple<std::string, tree_instance*, std::size_t>>
tuple_list;
scan(storage::get_storages(), "", scan_endpoint::INF, "",
scan_endpoint::INF, tuple_list, nullptr, 0);
std::vector<std::unique_ptr<destroy_barrier>> barriers{};
for (auto&& elem : tuple_list) {
base_node* root = std::get<1>(elem)->load_root_ptr();
if (root == nullptr) { continue; }
root->destroy();
delete root; // NOLINT
std::get<1>(elem)->store_root_ptr(nullptr);
barriers.emplace_back(std::make_unique<destroy_barrier>());
destroy_barrier& barrier = *barriers.back();
barrier.begin();
barrier.put([root, elem](){
delete root; // NOLINT
std::get<1>(elem)->store_root_ptr(nullptr);
});
manager.put([root, elem, &manager, &barrier](){
root->destroy(manager, barrier);
barrier.end();
});
}
for(auto&& e: barriers) {
e->wait();
}

base_node* tables_root = storage::get_storages()->load_root_ptr();
if (tables_root != nullptr) {
tables_root->destroy();
delete tables_root; // NOLINT
storage::get_storages()->store_root_ptr(nullptr);
destroy_barrier barrier{};
barrier.begin();
barrier.put([tables_root](){
delete tables_root; // NOLINT
storage::get_storages()->store_root_ptr(nullptr);
});
manager.put([tables_root, &manager, &barrier](){
tables_root->destroy(manager, barrier);
barrier.end();
});
barrier.wait();
}
return status::OK_DESTROY_ALL;
}
Expand Down
4 changes: 3 additions & 1 deletion include/interface_put.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ put([[maybe_unused]] Token token, tree_instance* ti, std::string_view key_view,
base_node* desired{dynamic_cast<base_node*>(new_border)};
if (ti->cas_root_ptr(&expected, &desired)) { return status::OK; }
if (expected != nullptr) {
destroy_manager manager{true}; // inactive destroy_manager
destroy_barrier barrier{};
// root is not nullptr;
new_border->destroy();
new_border->destroy(manager, barrier);
delete new_border; // NOLINT
break;
}
Expand Down
28 changes: 11 additions & 17 deletions include/interior_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace yakushima {
class alignas(CACHE_LINE_SIZE) interior_node final // NOLINT
: public base_node { // NOLINT
public:

/**
* @details The structure is "ptr, key, ptr, key, ..., ptr".
* So the child_length is key_slice_length plus 1.
Expand Down Expand Up @@ -112,25 +113,18 @@ class alignas(CACHE_LINE_SIZE) interior_node final // NOLINT
* @brief release all heap objects and clean up.
* @pre This function is called by single thread.
*/
status destroy() override {
std::vector<std::thread> th_vc;
th_vc.reserve(n_keys_ + 1);
status destroy(destroy_manager& manager, destroy_barrier& barrier) override {
for (auto i = 0; i < n_keys_ + 1; ++i) {
auto process = [this, i] {
get_child_at(i)->destroy();
delete get_child_at(i); // NOLINT
};
if (destroy_manager::check_room()) {
th_vc.emplace_back(process);
} else {
process();
}
auto* target_node = get_child_at(i);
barrier.begin();
barrier.put([target_node](){
delete target_node; // NOLINT
});
manager.put([this, target_node, &manager, &barrier](){

Check failure on line 123 in include/interior_node.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-unused-lambda-capture

lambda capture 'this' is not used
target_node->destroy(manager, barrier);
barrier.end();
});
}
for (auto&& th : th_vc) {
th.join();
destroy_manager::return_room();
}

return status::OK_DESTROY_INTERIOR;
}

Expand Down
31 changes: 23 additions & 8 deletions include/link_or_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "atomic_wrapper.h"
#include "base_node.h"
#include "destroy_manager.h"
#include "cpu.h"
#include "log.h"
#include "value.h"
Expand Down Expand Up @@ -34,14 +35,28 @@ class link_or_value {
/**
* @details release heap objects.
*/
void destroy() {
if (auto* child = get_next_layer(); child != nullptr) {
child->destroy();
delete child; // NOLINT
} else if (auto* v = get_value(); v != nullptr) {
if (value::need_delete(v)) { value::delete_value(v); }
}
init_lv();
void destroy(destroy_manager& manager, destroy_barrier& barrier) {
barrier.begin();
manager.put([this, &manager, &barrier](){
if (auto* child = get_next_layer(); child != nullptr) {
child->destroy(manager, barrier);
barrier.put([this, child](){

Check failure on line 43 in include/link_or_value.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-unused-lambda-capture

lambda capture 'this' is not used
delete child; // NOLINT
});
barrier.end();
return;
}
if (auto* v = get_value(); v != nullptr) {
if (value::need_delete(v)) {
barrier.put([this, v](){

Check failure on line 51 in include/link_or_value.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-unused-lambda-capture

lambda capture 'this' is not used
value::delete_value(v);
});
}
barrier.end();
return;
}
barrier.end();
});
}

/**
Expand Down
4 changes: 3 additions & 1 deletion include/storage_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ status storage::delete_storage(std::string_view storage_name) { // NOLINT
if (ret_st == status::OK) {
base_node* tables_root = ret.first->load_root_ptr();
if (tables_root != nullptr) {
tables_root->destroy();
destroy_manager manager{true}; // inactive destroy_manager
destroy_barrier barrier{};
tables_root->destroy(manager, barrier);
delete tables_root; // NOLINT
ret.first->store_root_ptr(nullptr);
}
Expand Down
Loading

0 comments on commit e9ddb7f

Please sign in to comment.