Skip to content

Commit

Permalink
revise destroy_manager.h
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Aug 25, 2024
1 parent 702a54e commit 0cfa9e2
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 45 deletions.
1 change: 0 additions & 1 deletion include/base_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class base_node { // NOLINT
* @param destroy_barrier the barrier that control the job finish.
*/
virtual status destroy(manager*) = 0;
virtual status destroy(manager*, barrier*) = 0;

/**
* @details display function for analysis and debug.
Expand Down
11 changes: 4 additions & 7 deletions include/border_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,20 @@ class alignas(CACHE_LINE_SIZE) border_node final : public base_node { // NOLINT
* @brief release all heap objects and clean up.
*/
status destroy(manager* m) override {
return destroy(m, nullptr);
}
status destroy(manager* m, barrier* p) override {
barrier b{m, p};
barrier b{m};
std::size_t cnk = get_permutation_cnk();
for (std::size_t i = 0; i < cnk; ++i) {
// living link or value
std::size_t index = permutation_.get_index_of_rank(i);
if (lv_.at(index).get_next_layer() != nullptr) {
// has some layer, considering parallel
b.push([this, index, m, &b](){
b.push([this, index, m](){
// cleanup process
lv_.at(index).destroy(m, &b);
lv_.at(index).destroy(m);
});
} else {
// not some layer, not considering parallel
lv_.at(index).destroy(m, &b);
lv_.at(index).destroy(m);
}
}
b.wait();
Expand Down
38 changes: 12 additions & 26 deletions include/destroy_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,32 @@ class alignas(CACHE_LINE_SIZE) manager {

class alignas(CACHE_LINE_SIZE) barrier {
public:
explicit barrier(manager* m) : barrier(m, nullptr) {}
barrier(manager* m, barrier* b) : manager_(m), worker_(manager_->current_worker()), parent_(b) {}
explicit barrier(manager* m) : manager_(m), worker_(manager_->current_worker()) {}

void push(std::function<void(void)>&& func) {
fork();
manager_->next_worker()->push(this, std::move(func));
}
void join() {
auto s = status_.fetch_sub(1) - 1;
if (parent_) {
if (s == WAITING) {
worker_->push(this);
}
} else {
if (s == 0) {
std::unique_lock<std::mutex> lock(mtx_);
cond_.notify_one();
}
if (s == WAITING) {
worker_->push(this);
}
}
void wait() {
if (parent_) {
if (status_.fetch_add(WAITING) != 0) {
worker_->dispatch(this);
}
} else {
std::unique_lock<std::mutex> lock(mtx_);
cond_.wait(lock, [this](){ return status_.load() == 0; });
if (status_.fetch_add(WAITING) != 0) {
if (!worker_) { std::abort(); }
worker_->dispatch(this);
}
}

private:
private:
manager* manager_;
worker* worker_;
barrier* parent_;

std::atomic_uint status_{0};
static constexpr uint WAITING = 0x80000000;

// for the source thread
std::mutex mtx_{};
std::condition_variable cond_{};

inline void fork() {
status_.fetch_add(1);
}
Expand All @@ -79,7 +62,7 @@ class alignas(CACHE_LINE_SIZE) manager {
dispatch();
}

private:
private:
std::deque<std::pair<barrier*, std::function<void(void)>>> tasks_{};
std::deque<barrier*> completed_barriers_{};

Expand Down Expand Up @@ -146,13 +129,16 @@ class alignas(CACHE_LINE_SIZE) manager {
};

// manager
manager() = delete;
manager() : manager(std::thread::hardware_concurrency() - 1) {};
explicit manager(std::size_t size) : size_{size} {
for (std::size_t i = 0; i < size_; i++) {
workers_.emplace_back(std::make_unique<worker>());
threads_.emplace_back(std::thread(std::ref(*workers_.back())));
indexes_.insert(std::make_pair(threads_.back().get_id(), i));
}
workers_.emplace_back(std::make_unique<worker>());
indexes_.insert(std::make_pair(std::this_thread::get_id(), size_));
size_++;
}
~manager() {
for (auto&& w: workers_) {
Expand Down
5 changes: 2 additions & 3 deletions include/interface_destroy.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
namespace yakushima {

[[maybe_unused]] static status destroy() {
// manager m{std::thread::hardware_concurrency()};
manager m{2};
manager m{};
if (storage::get_storages()->empty()) { return status::OK_ROOT_IS_NULL; }
std::vector<std::tuple<std::string, tree_instance*, std::size_t>>
tuple_list;
Expand All @@ -27,7 +26,7 @@ namespace yakushima {
for (auto&& elem : tuple_list) {
base_node* root = std::get<1>(elem)->load_root_ptr();
if (root == nullptr) { continue; }
root->destroy(&m, nullptr);
root->destroy(&m);
delete root; // NOLINT
std::get<1>(elem)->store_root_ptr(nullptr);
}
Expand Down
9 changes: 3 additions & 6 deletions include/interior_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,10 @@ class alignas(CACHE_LINE_SIZE) interior_node final // NOLINT
* @pre This function is called by single thread.
*/
status destroy(manager* m) override {
return destroy(m, nullptr);
}
status destroy(manager* m, barrier* p) override {
barrier b{m, p};
barrier b{m};
for (auto i = 0; i < n_keys_ + 1; ++i) {
b.push([this, i, m, &b](){
get_child_at(i)->destroy(m, &b);
b.push([this, i, m](){
get_child_at(i)->destroy(m);
delete get_child_at(i); // NOLINT
});
}
Expand Down
4 changes: 2 additions & 2 deletions include/link_or_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ class link_or_value {
/**
* @details release heap objects.
*/
void destroy(manager* m, barrier* p) {
void destroy(manager* m) {
if (auto* child = get_next_layer(); child != nullptr) {
child->destroy(m, p);
child->destroy(m);
delete child; // NOLINT
} else if (auto* v = get_value(); v != nullptr) {
if (value::need_delete(v)) { value::delete_value(v); }
Expand Down

0 comments on commit 0cfa9e2

Please sign in to comment.