Skip to content

Commit

Permalink
eliminate the race that occurs between writing data to wire and setti…
Browse files Browse the repository at this point in the history
…ng the close flag
  • Loading branch information
t-horikawa committed Jul 5, 2024
1 parent b236f56 commit d52ddef
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions modules/ipc/src/main/native/include/wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,15 +383,18 @@ class unidirectional_message_wire : public simple_wire<message_header> {
*/
message_header peep(const char* base) {
while (true) {
bool termination_requested = termination_requested_.load();
bool onetime_notification = onetime_notification_.load();
std::atomic_thread_fence(std::memory_order_acq_rel);
if(stored() >= message_header::size) {
copy_header(base);
return header_received_;
}
if (termination_requested_.load()) {
if (termination_requested) {
termination_requested_.store(false);
return {message_header::terminate_request, 0};
}
if (onetime_notification_.load()) {
if (onetime_notification) {
throw std::runtime_error("received shutdown request from outside the communication partner");
}
boost::interprocess::scoped_lock lock(m_mutex_);
Expand Down Expand Up @@ -473,10 +476,12 @@ class unidirectional_response_wire : public simple_wire<response_header> {
}

while (true) {
bool closed_shutdown = closed_.load() || shutdown_.load();
std::atomic_thread_fence(std::memory_order_acq_rel);
if(stored() >= response_header::size) {
break;
}
if (closed_.load() || shutdown_.load()) {
if (closed_shutdown) {
header_received_ = response_header(0, 0, 0);
return header_received_;
}
Expand Down Expand Up @@ -841,13 +846,15 @@ class unidirectional_simple_wires {
boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(timeout))),
#endif
[this, &active_wire](){
bool eor = is_eor();
std::atomic_thread_fence(std::memory_order_acq_rel);
for (auto&& wire: unidirectional_simple_wires_) {
if (wire.has_record()) {
active_wire = &wire;
return true;
}
}
return is_eor();
return eor;
})) {
wait_for_record_ = false;
throw std::runtime_error("record has not been received within the specified time");
Expand Down Expand Up @@ -896,10 +903,7 @@ class unidirectional_simple_wires {
* used by client
*/
[[nodiscard]] bool is_eor() const {
if (!eor_) {
return false;
}
return std::all_of(unidirectional_simple_wires_.begin(), unidirectional_simple_wires_.end(), [](const unidirectional_simple_wire& wire) { return !wire.has_record(); });
return eor_;
}

private:
Expand Down

0 comments on commit d52ddef

Please sign in to comment.