From ca9b5e26b9fdd68334c794cecd733a82180adaea Mon Sep 17 00:00:00 2001 From: t-horikawa Date: Mon, 1 Jul 2024 17:51:29 +0900 Subject: [PATCH] updated wire.h to be the same as tateyama --- modules/ipc/src/main/native/include/wire.h | 41 +++++++++++++++------- 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/modules/ipc/src/main/native/include/wire.h b/modules/ipc/src/main/native/include/wire.h index feb23ac7..a3c346f7 100644 --- a/modules/ipc/src/main/native/include/wire.h +++ b/modules/ipc/src/main/native/include/wire.h @@ -1012,16 +1012,24 @@ class connection_queue } void push(std::size_t sid, std::size_t admin_slots = 0) { boost::interprocess::scoped_lock lock(mutex_); - queue_.at(index(pushed_.load() + admin_slots)) = sid; - pushed_.fetch_add(1); + if (admin_slots > 0 && is_admin(sid)) { + queue_.at(index(pushed_.load() + admin_slots)) = reset_admin(sid); + admin_slots_in_use_.fetch_sub(1, std::memory_order_release); + pushed_.fetch_add(1); + } else { + queue_.at(index(pushed_.load() + admin_slots)) = sid; + pushed_.fetch_add(1, std::memory_order_release); + } std::atomic_thread_fence(std::memory_order_acq_rel); condition_.notify_one(); } [[nodiscard]] std::size_t try_pop() { + boost::interprocess::scoped_lock lock(mutex_); // trade off auto current = poped_.load(); while (true) { - if (pushed_.load() <= current) { - throw std::runtime_error("no request available"); + auto ps = pushed_.load(std::memory_order_acquire); + if ((ps + admin_slots_in_use_.load()) <= current) { + throw std::runtime_error("no request slot is available for normal request"); } if (poped_.compare_exchange_strong(current, current + 1)) { return queue_.at(index(current)); @@ -1029,13 +1037,16 @@ class connection_queue } } [[nodiscard]] std::size_t try_pop(std::uint8_t admin_slots) { + boost::interprocess::scoped_lock lock(mutex_); auto current = poped_.load(); while (true) { - if ((pushed_.load() + admin_slots) <= current) { - throw std::runtime_error("no request available"); + auto ps = pushed_.load(std::memory_order_acquire); + if ((ps + (admin_slots - admin_slots_in_use_.load())) <= current) { + throw std::runtime_error("no request slot is available for admin request"); } if (poped_.compare_exchange_strong(current, current + 1)) { - return queue_.at(index(current)); + admin_slots_in_use_.fetch_add(1); + return set_admin(queue_.at(index(current))); } } } @@ -1064,7 +1075,8 @@ class connection_queue } private: boost::interprocess::vector queue_; - std::size_t capacity_; + std::uint32_t capacity_; + std::atomic_uint8_t admin_slots_in_use_{0}; boost::interprocess::interprocess_mutex mutex_{}; boost::interprocess::interprocess_condition condition_{}; @@ -1136,6 +1148,11 @@ class connection_queue using element_allocator = boost::interprocess::allocator; constexpr static std::size_t session_id_indicating_error = UINT64_MAX; + constexpr static std::size_t admin_bit = 1ULL << 63UL; + + static std::size_t set_admin(std::size_t slot) { return slot | admin_bit; } + static std::size_t reset_admin(std::size_t slot) { return slot & ~admin_bit; } + static bool is_admin(std::size_t slot) { return (slot & admin_bit) != 0; } /** * @brief Construct a new object. @@ -1165,7 +1182,7 @@ class connection_queue return sid; } std::size_t wait(std::size_t sid, std::int64_t timeout = 0) { - auto& entry = v_requested_.at(sid); + auto& entry = v_requested_.at(reset_admin(sid)); try { auto rtnv = entry.wait(timeout); entry.reuse(); @@ -1176,7 +1193,7 @@ class connection_queue } } bool check(std::size_t sid) { - return v_requested_.at(sid).check(); + return v_requested_.at(reset_admin(sid)).check(); } std::size_t listen() { if (q_requested_.wait(terminate_)) { @@ -1190,12 +1207,12 @@ class connection_queue // either accept() or reject() must be called void accept(std::size_t sid, std::size_t session_id) { q_requested_.pop(); - v_requested_.at(sid).accept(session_id); + v_requested_.at(reset_admin(sid)).accept(session_id); } // either accept() or reject() must be called void reject(std::size_t sid) { q_requested_.pop(); - v_requested_.at(sid).reject(); + v_requested_.at(reset_admin(sid)).reject(); q_free_.push(sid, admin_slots_); } void disconnect(std::size_t sid) {