Commit

empty all the queues when destructing
odygrd committed Nov 20, 2022
1 parent fb97af4 commit 7388fbe
Showing 7 changed files with 82 additions and 54 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,4 @@
- [v2.4.1](#v2.4.1)
- [v2.4.0](#v2.4.0)
- [v2.3.4](#v2.3.4)
- [v2.3.3](#v2.3.3)
@@ -33,6 +34,15 @@
- [v1.1.0](#v1.1.0)
- [v1.0.0](#v1.0.0)

## v2.4.1

**Improvements**

- Previously, the backend worker thread read the log messages from the queue but did not read the messages remaining
  after the buffer had wrapped around. It now reads all the messages.
- Removed the `min_available_bytes` cache from the SPSC queue as it is no longer needed; we now read all the messages
  at once instead of message by message.

## v2.4.0

**Improvements**
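The v2.4.1 entry above describes the behaviour change: on each pass the backend now drains the region up to the end of the ring buffer and, if the producer has wrapped around, performs exactly one more read from the start. Below is a minimal consumer-side sketch of that contract; `MockQueue` and `drain_all` are illustrative stand-ins rather than quill types, assuming only the tuple-returning `prepare_read()` / `finish_read()` interface introduced by this commit.

```cpp
#include <cstddef>
#include <cstdio>
#include <tuple>
#include <utility>
#include <vector>

// A toy, single-threaded stand-in for the SPSC ring buffer: per "lap" it hands
// out at most two contiguous regions (up to the end of the storage, then from
// the start), which is exactly why prepare_read() now reports a has_more flag.
class MockQueue
{
public:
  MockQueue(std::vector<std::byte> tail_region, std::vector<std::byte> wrapped_region)
    : _regions{std::move(tail_region), std::move(wrapped_region)}
  {
  }

  std::tuple<std::byte*, std::size_t, bool> prepare_read() noexcept
  {
    if (_index >= _regions.size())
    {
      return {nullptr, 0, false};
    }
    bool const has_more = (_index + 1) < _regions.size() && !_regions[_index + 1].empty();
    return {_regions[_index].data() + _offset, _regions[_index].size() - _offset, has_more};
  }

  void finish_read(std::size_t nbytes) noexcept
  {
    _offset += nbytes;
    if (_offset == _regions[_index].size())
    {
      ++_index;
      _offset = 0;
    }
  }

private:
  std::vector<std::vector<std::byte>> _regions;
  std::size_t _index{0};
  std::size_t _offset{0};
};

// Drain everything currently visible: read up to the end of the buffer, and if
// the producer has wrapped around, read once more from the start. This mirrors
// the shape of BackendWorker::_read_from_queue() in this commit.
template <typename Queue>
std::size_t drain_all(Queue& queue)
{
  std::size_t total = 0;

  auto [read_buffer, bytes_available, has_more] = queue.prepare_read();
  // ... decode the messages in [read_buffer, read_buffer + bytes_available) here ...
  queue.finish_read(bytes_available);
  total += bytes_available;

  if (has_more)
  {
    // more bytes exist past the wrap point, so perform exactly one more read
    std::tie(read_buffer, bytes_available, has_more) = queue.prepare_read();
    queue.finish_read(bytes_available);
    total += bytes_available;
  }

  return total;
}

int main()
{
  // 24 bytes before the wrap point, 8 bytes written after the producer wrapped
  MockQueue queue{std::vector<std::byte>(24), std::vector<std::byte>(8)};
  std::printf("drained %zu bytes\n", drain_all(queue)); // prints: drained 32 bytes
  return 0;
}
```

Because a single producer lap can produce at most two contiguous regions, two reads are always enough, which is what the `assert(!has_more && ...)` in the backend worker below relies on.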
2 changes: 1 addition & 1 deletion quill/include/quill/Quill.h
@@ -29,7 +29,7 @@ namespace quill
/** Version Info **/
constexpr uint32_t VersionMajor{2};
constexpr uint32_t VersionMinor{4};
constexpr uint32_t VersionPatch{0};
constexpr uint32_t VersionPatch{1};
constexpr uint32_t Version{VersionMajor * 10000 + VersionMinor * 100 + VersionPatch};

/** forward declarations **/
45 changes: 38 additions & 7 deletions quill/include/quill/detail/backend/BackendWorker.h
@@ -101,10 +101,20 @@ class BackendWorker
QUILL_ATTRIBUTE_HOT inline void _populate_priority_queue(
ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts);

/**
* Reads all available bytes from the queue
* @param thread_context
* @param ts_now
*/
QUILL_ATTRIBUTE_HOT inline void _read_from_queue(ThreadContext* thread_context, uint64_t ts_now);

/**
   * Deserialize log messages from the raw SPSC queue and emplace them into the priority queue
*/
QUILL_ATTRIBUTE_HOT inline void _read_queue_and_decode(ThreadContext* thread_context, uint64_t ts_now);
QUILL_ATTRIBUTE_HOT inline void _read_queue_messages_and_decode(ThreadContext* thread_context,
ThreadContext::SPSCQueueT& queue,
std::byte* read_buffer,
size_t bytes_available, uint64_t ts_now);

/**
* Checks for events in all queues and processes the one with the minimum timestamp
@@ -279,12 +289,12 @@ void BackendWorker::_populate_priority_queue(ThreadContextCollection::backend_th
for (ThreadContext* thread_context : cached_thread_contexts)
{
// copy everything to a priority queue
_read_queue_and_decode(thread_context, ts_now);
_read_from_queue(thread_context, ts_now);
}
}

/***/
void BackendWorker::_read_queue_and_decode(ThreadContext* thread_context, uint64_t ts_now)
void BackendWorker::_read_from_queue(ThreadContext* thread_context, uint64_t ts_now)
{
ThreadContext::SPSCQueueT& spsc_queue = thread_context->spsc_queue();

@@ -293,10 +303,31 @@ void BackendWorker::_read_queue_and_decode(ThreadContext* thread_context, uint64
// The producer will add items to the buffer :
// |timestamp|metadata*|logger_details*|args...|

auto read = spsc_queue.prepare_read();
std::byte* read_buffer = read.first;
size_t bytes_available = read.second;
auto [read_buffer, bytes_available, has_more] = spsc_queue.prepare_read();

// here we read all the messages until the end of the buffer
_read_queue_messages_and_decode(thread_context, spsc_queue, read_buffer, bytes_available, ts_now);

if (has_more)
{
// if there are more bytes to read it is because we need to wrap around the ring buffer,
// and we will perform one more read
std::tie(read_buffer, bytes_available, has_more) = spsc_queue.prepare_read();
_read_queue_messages_and_decode(thread_context, spsc_queue, read_buffer, bytes_available, ts_now);
}

assert(!has_more && "It is not possible to have more bytes to read");

  // Note: when the bounded queue fills up, a new bounded queue is allocated and it may also
  // have bytes to read. That reallocation case is not handled here; we will read the new
  // queue the next time this function is called
}

/***/
void BackendWorker::_read_queue_messages_and_decode(ThreadContext* thread_context,
ThreadContext::SPSCQueueT& queue, std::byte* read_buffer,
size_t bytes_available, uint64_t ts_now)
{
while (bytes_available > 0)
{
std::byte* const read_begin = read_buffer;
@@ -436,7 +467,7 @@ void BackendWorker::_read_queue_and_decode(ThreadContext* thread_context, uint64
// Finish reading
assert((read_buffer >= read_begin) && "read_buffer should be greater or equal to read_begin");
auto const read_size = static_cast<size_t>(read_buffer - read_begin);
spsc_queue.finish_read(read_size);
queue.finish_read(read_size);
bytes_available -= read_size;

_transit_events.emplace(transit_event);
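For context on why `_read_queue_messages_and_decode` pushes each decoded event into `_transit_events`: as the comments above note, the backend copies events from every thread's queue into a priority queue and then processes the one with the minimum timestamp. The sketch below shows only that ordering idea; `TransitEvent` and the comparator are simplified stand-ins, not quill's actual types.

```cpp
#include <cstdint>
#include <cstdio>
#include <queue>
#include <string>
#include <vector>

// Simplified stand-in for a decoded log event; quill's real transit event
// carries much more (metadata, logger details, formatted arguments).
struct TransitEvent
{
  uint64_t timestamp;
  std::string message;
};

// Order the priority queue so the smallest timestamp is processed first,
// regardless of which thread's SPSC queue the event came from.
struct ByEarliestTimestamp
{
  bool operator()(TransitEvent const& lhs, TransitEvent const& rhs) const noexcept
  {
    return lhs.timestamp > rhs.timestamp;
  }
};

int main()
{
  std::priority_queue<TransitEvent, std::vector<TransitEvent>, ByEarliestTimestamp> transit_events;

  // Events decoded from two different thread queues; each queue is ordered on
  // its own, but the streams interleave in time across threads.
  for (TransitEvent const& e : {TransitEvent{20, "thread A - second"},
                                TransitEvent{5, "thread A - first"},
                                TransitEvent{12, "thread B - first"}})
  {
    transit_events.push(e);
  }

  // The backend pops in global timestamp order: 5, 12, 20.
  while (!transit_events.empty())
  {
    std::printf("%llu %s\n", static_cast<unsigned long long>(transit_events.top().timestamp),
                transit_events.top().message.c_str());
    transit_events.pop();
  }
  return 0;
}
```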
57 changes: 22 additions & 35 deletions quill/include/quill/detail/spsc_queue/BoundedQueue.h
@@ -10,6 +10,7 @@
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <tuple>
#include <utility>

#include "quill/detail/misc/Attributes.h"
@@ -32,7 +33,6 @@ class BoundedQueue

_end_of_recorded_space = _storage + capacity();
_min_free_space = capacity();
_min_avail_bytes = 0;
_producer_pos.store(_storage);
_consumer_pos.store(_storage);
}
@@ -134,46 +134,38 @@ class BoundedQueue
* Prepare to read from the buffer
   * @return a tuple of the buffer location to read, the number of available bytes, and a flag
   *         indicating whether more bytes remain to read after wrapping around
*/
QUILL_NODISCARD_ALWAYS_INLINE_HOT std::pair<std::byte*, size_t> prepare_read() noexcept
QUILL_NODISCARD_ALWAYS_INLINE_HOT std::tuple<std::byte*, size_t, bool> prepare_read() noexcept
{
if (_min_avail_bytes > 0)
{
// fast read path
return std::pair<std::byte*, size_t>{_consumer_pos.load(std::memory_order_relaxed), _min_avail_bytes};
}

// Save a consistent copy of producerPos
// Prevent reading new producerPos but old endOf...
std::byte* const consumer_pos = _consumer_pos.load(std::memory_order_relaxed);
std::byte* const producer_pos = _producer_pos.load(std::memory_order_acquire);
std::byte* consumer_pos = _consumer_pos.load(std::memory_order_relaxed);
std::byte* producer_pos = _producer_pos.load(std::memory_order_acquire);

if (producer_pos >= consumer_pos)
{
// here the consumer is behind the producer
_min_avail_bytes = static_cast<size_t>(producer_pos - consumer_pos);
return std::pair<std::byte*, size_t>{consumer_pos, _min_avail_bytes};
}
else
size_t bytes_available;

if (consumer_pos > producer_pos)
{
// consumer is ahead of the producer
// xxxp0000cxxxEOB
      // _end_of_recorded_space is only set by the producer when it wraps around. Here we
      // already know that the producer has wrapped around, therefore _end_of_recorded_space
      // has been set
_min_avail_bytes = static_cast<size_t>(_end_of_recorded_space - consumer_pos);
bytes_available = static_cast<size_t>(_end_of_recorded_space - consumer_pos);

if (_min_avail_bytes > 0)
{
return std::pair<std::byte*, size_t>{consumer_pos, _min_avail_bytes};
}
else
if (bytes_available > 0)
{
// Roll over because there is nothing to read until end of buffer
_consumer_pos.store(_storage, std::memory_order_release);
_min_avail_bytes = static_cast<size_t>(producer_pos - _storage);
return std::pair<std::byte*, size_t>{_storage, _min_avail_bytes};
        // There are bytes to read until the end of the buffer, and we also want to notify the
        // user that there are more bytes to read after the wrap-around
return std::tuple<std::byte*, size_t, bool>{consumer_pos, bytes_available, true};
}

// Roll over because there is nothing to read until end of buffer
_consumer_pos.store(_storage, std::memory_order_release);
}

// here the consumer is behind the producer
consumer_pos = _consumer_pos.load(std::memory_order_relaxed);
bytes_available = static_cast<size_t>(producer_pos - consumer_pos);

// there won't be more bytes to read as we haven't wrapped around
return std::tuple<std::byte*, size_t, bool>{consumer_pos, bytes_available, false};
}

/**
Expand All @@ -183,7 +175,6 @@ class BoundedQueue
*/
QUILL_ALWAYS_INLINE_HOT void finish_read(size_t nbytes) noexcept
{
_min_avail_bytes -= nbytes;
_consumer_pos.store(_consumer_pos.load(std::memory_order_relaxed) + nbytes, std::memory_order_release);
}

@@ -221,10 +212,6 @@
* the next bytes from. This value is only updated by the consumer.
*/
alignas(CACHELINE_SIZE) std::atomic<std::byte*> _consumer_pos;

/** Min value on the number of bytes the consumer can read **/
size_t _min_avail_bytes;

char _pad0[CACHELINE_SIZE - sizeof(std::atomic<std::byte*>) - sizeof(size_t)] = "\0";
};
} // namespace quill::detail
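The two branches of `prepare_read()` are easier to see with plain offsets instead of pointers. The sketch below models only the read-side decision (hand out the tail region first with the more-bytes flag set, then roll over to the start); `RingView` and `peek_readable` are illustrative names, not part of quill, and all producer-side bookkeeping is omitted.

```cpp
#include <cstddef>
#include <tuple>

struct RingView
{
  std::size_t consumer;               // next read offset
  std::size_t producer;               // next write offset (already published)
  std::size_t end_of_recorded_space;  // set by the producer when it wraps around
};

// Returns {read offset, readable bytes, more-bytes-after-wrap flag}, mirroring
// the pointer-based prepare_read() above with plain offsets.
std::tuple<std::size_t, std::size_t, bool> peek_readable(RingView& v)
{
  if (v.consumer > v.producer)
  {
    // The producer has wrapped around: first hand out the tail region
    // [consumer, end_of_recorded_space) ...
    std::size_t const tail = v.end_of_recorded_space - v.consumer;
    if (tail > 0)
    {
      return {v.consumer, tail, true};
    }
    // ... and once the tail has been consumed, roll the consumer back to the start.
    v.consumer = 0;
  }
  // The consumer is behind (or level with) the producer: one contiguous region,
  // nothing left to read after it.
  return {v.consumer, v.producer - v.consumer, false};
}

int main()
{
  // The producer wrote up to offset 48, wrapped, and wrote 12 more bytes at the
  // start; the consumer has read up to offset 16.
  RingView v{16, 12, 48};
  auto [pos, bytes, has_more] = peek_readable(v);    // {16, 32, true}
  v.consumer = pos + bytes;                          // pretend we consumed the tail
  std::tie(pos, bytes, has_more) = peek_readable(v); // rolls over, then {0, 12, false}
  return has_more ? 1 : 0;
}
```

This also shows why the removed `_min_avail_bytes` cache is no longer useful: once the caller drains both regions in one go, there is nothing left to cache between calls.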
10 changes: 5 additions & 5 deletions quill/include/quill/detail/spsc_queue/UnboundedQueue.h
@@ -134,9 +134,9 @@ class UnboundedQueue
* Prepare to read from the buffer
   * @return a tuple of the buffer location to read, the number of available bytes, and a flag
   *         indicating whether more bytes remain to read after wrapping around
*/
QUILL_NODISCARD_ALWAYS_INLINE_HOT std::pair<std::byte*, std::size_t> prepare_read()
QUILL_NODISCARD_ALWAYS_INLINE_HOT std::tuple<std::byte*, std::size_t, bool> prepare_read()
{
auto [read_pos, available_bytes] = _consumer->bounded_queue.prepare_read();
auto [read_pos, available_bytes, has_more] = _consumer->bounded_queue.prepare_read();

if (available_bytes == 0)
{
@@ -148,18 +148,18 @@
// a new buffer was added by the producer, this happens only when we have allocated a new queue

// try the existing buffer once more
std::tie(read_pos, available_bytes) = _consumer->bounded_queue.prepare_read();
std::tie(read_pos, available_bytes, has_more) = _consumer->bounded_queue.prepare_read();

if (available_bytes == 0)
{
// switch to the new buffer, existing one is deleted
delete _consumer;
_consumer = next_node;
std::tie(read_pos, available_bytes) = _consumer->bounded_queue.prepare_read();
std::tie(read_pos, available_bytes, has_more) = _consumer->bounded_queue.prepare_read();
}
}
}
return std::pair<std::byte*, std::size_t>{read_pos, available_bytes};
return std::tuple<std::byte*, std::size_t, bool>{read_pos, available_bytes, has_more};
}

/**
8 changes: 4 additions & 4 deletions quill/test/BoundedQueueTest.cpp
@@ -22,7 +22,7 @@ TEST_CASE("read_write_buffer")

{
auto const res = buffer.prepare_read();
REQUIRE_EQ(res.second, 32);
REQUIRE_EQ(std::get<1>(res), 32);
buffer.finish_read(32u);
}

@@ -35,7 +35,7 @@
{
// Nothing to read but consumer will also wrap
auto const res = buffer.prepare_read();
REQUIRE_EQ(res.second, 0);
REQUIRE_EQ(std::get<1>(res), 0);
}

{
@@ -78,11 +78,11 @@ TEST_CASE("read_write_multithreaded_plain_ints")
{
for (uint32_t i = 0; i < 8192; ++i)
{
auto [read_buffer, bytes] = buffer.prepare_read();
auto [read_buffer, bytes, has_more] = buffer.prepare_read();
while (bytes == 0)
{
std::this_thread::sleep_for(std::chrono::microseconds{2});
std::tie(read_buffer, bytes) = buffer.prepare_read();
std::tie(read_buffer, bytes, has_more) = buffer.prepare_read();
}

auto value = reinterpret_cast<uint32_t const*>(read_buffer);
4 changes: 2 additions & 2 deletions quill/test/UnboundedQueueTest.cpp
@@ -45,11 +45,11 @@ TEST_CASE("read_write_multithreaded_plain_ints")
{
for (uint32_t i = 0; i < 8192; ++i)
{
auto [read_buffer, bytes] = buffer.prepare_read();
auto [read_buffer, bytes, has_more] = buffer.prepare_read();
while (bytes == 0)
{
std::this_thread::sleep_for(std::chrono::microseconds{2});
std::tie(read_buffer, bytes) = buffer.prepare_read();
std::tie(read_buffer, bytes, has_more) = buffer.prepare_read();
}

auto value = reinterpret_cast<uint32_t const*>(read_buffer);
