Skip to content

Commit

Permalink
Fix a race condition in data store flare operations
Browse files Browse the repository at this point in the history
The race (as seen via Zeek usage) goes like:

Thread A: enqueue item, get suspended
Thread B: sees mailbox has items
Thread B: dequeue item
Thread B: extinguish flare
Thread A: resume, fire flare

That ordering can leave the flare in an active state without any actual
items remaining in the mailbox.

This patch adds a mutex/lock such that extinguishing of the flare cannot
be interleaved between the enqueue and firing of the flare.

This likely relates to zeek/zeek#838,
zeek/zeek#716, as well as this thread
http://mailman.icsi.berkeley.edu/pipermail/zeek/2020-February/015062.html

(cherry picked from commit d2737cd)
  • Loading branch information
jsiwek committed Mar 9, 2020
1 parent c1d9e10 commit e65cca6
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 4 deletions.
24 changes: 24 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@

1.3.1 | 2020-03-09 13:02:50 -0700

* Release 1.3.0.

* Fix a race condition in data store flare operations (Jon Siwek, Corelight)

The race (as seen via Zeek usage that causes high CPU load) goes like:

Thread A: enqueue item, get suspended
Thread B: sees mailbox has items
Thread B: dequeue item
Thread B: extinguish flare
Thread A: resume, fire flare

That ordering can leave the flare in an active state without any actual
items remaining in the mailbox.

This patch adds a mutex/lock such that extinguishing of the flare cannot
be interleaved between the enqueue and firing of the flare.

This likely relates to https://github.com/zeek/zeek/issues/838,
https://github.com/zeek/zeek/issues/716, as well as this thread
http://mailman.icsi.berkeley.edu/pipermail/zeek/2020-February/015062.html

1.3.0 | 2020-02-08 11:02:01 -0800

* Release 1.3.0.
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.0
1.3.1
5 changes: 3 additions & 2 deletions include/broker/detail/flare_actor.hh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <atomic>
#include <mutex>
#include <chrono>
#include <limits>

Expand Down Expand Up @@ -52,7 +52,8 @@ public:

private:
flare flare_;
std::atomic<int> flare_count_;
int flare_count_;
std::mutex flare_mtx_;
};

} // namespace detail
Expand Down
2 changes: 1 addition & 1 deletion include/broker/version.hh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ using type = unsigned;

constexpr type major = 1;
constexpr type minor = 3;
constexpr type patch = 0;
constexpr type patch = 1;
constexpr auto suffix = "";

constexpr type protocol = 2;
Expand Down
7 changes: 7 additions & 0 deletions src/detail/flare_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,27 @@ void flare_actor::act() {

void flare_actor::await_data() {
BROKER_DEBUG("awaiting data");
std::unique_lock<std::mutex> lock{flare_mtx_};
if (flare_count_ > 0 )
return;
lock.unlock();
flare_.await_one();
}

bool flare_actor::await_data(timeout_type timeout) {
BROKER_DEBUG("awaiting data with timeout");
std::unique_lock<std::mutex> lock{flare_mtx_};
if (flare_count_ > 0)
return true;
lock.unlock();
auto res = flare_.await_one(timeout);
return res;
}

void flare_actor::enqueue(caf::mailbox_element_ptr ptr, caf::execution_unit*) {
auto mid = ptr->mid;
auto sender = ptr->sender;
std::unique_lock<std::mutex> lock{flare_mtx_};
switch (mailbox().enqueue(ptr.release())) {
case caf::detail::enqueue_result::unblocked_reader: {
BROKER_DEBUG("firing flare");
Expand All @@ -65,6 +70,7 @@ void flare_actor::enqueue(caf::mailbox_element_ptr ptr, caf::execution_unit*) {
}

caf::mailbox_element_ptr flare_actor::dequeue() {
std::unique_lock<std::mutex> lock{flare_mtx_};
auto rval = blocking_actor::dequeue();

if (rval)
Expand All @@ -78,6 +84,7 @@ const char* flare_actor::name() const {
}

void flare_actor::extinguish_one() {
std::unique_lock<std::mutex> lock{flare_mtx_};
auto extinguished = flare_.extinguish_one();
CAF_ASSERT(extinguished);
--flare_count_;
Expand Down

0 comments on commit e65cca6

Please sign in to comment.