diff --git a/CHANGELOG.md b/CHANGELOG.md
index a69f52d8..00e71a7e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,4 @@
+- [v2.4.1](#v2.4.1)
 - [v2.4.0](#v2.4.0)
 - [v2.3.4](#v2.3.4)
 - [v2.3.3](#v2.3.3)
@@ -33,6 +34,15 @@
 - [v1.1.0](#v1.1.0)
 - [v1.0.0](#v1.0.0)
 
+## v2.4.1
+
+**Improvements**
+
+- Previously the backend worker thread would read the available log messages from the queue, but would not read the
+  remaining messages when the ring buffer had wrapped around. It will now read all the messages.
+- Removed the `min_available_bytes` cache optimisation from the SPSC queue. It is no longer needed, as we now read
+  all the messages at once instead of reading them message by message.
+
 ## v2.4.0
 
 **Improvements**
diff --git a/quill/include/quill/Quill.h b/quill/include/quill/Quill.h
index 9928a1f5..797e1bce 100644
--- a/quill/include/quill/Quill.h
+++ b/quill/include/quill/Quill.h
@@ -29,7 +29,7 @@ namespace quill
 /** Version Info **/
 constexpr uint32_t VersionMajor{2};
 constexpr uint32_t VersionMinor{4};
-constexpr uint32_t VersionPatch{0};
+constexpr uint32_t VersionPatch{1};
 constexpr uint32_t Version{VersionMajor * 10000 + VersionMinor * 100 + VersionPatch};
 
 /** forward declarations **/
diff --git a/quill/include/quill/detail/backend/BackendWorker.h b/quill/include/quill/detail/backend/BackendWorker.h
index f48abd06..33fbe020 100644
--- a/quill/include/quill/detail/backend/BackendWorker.h
+++ b/quill/include/quill/detail/backend/BackendWorker.h
@@ -101,10 +101,20 @@ class BackendWorker
   QUILL_ATTRIBUTE_HOT inline void _populate_priority_queue(
     ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts);
 
+  /**
+   * Reads all available bytes from the queue
+   * @param thread_context
+   * @param ts_now
+   */
+  QUILL_ATTRIBUTE_HOT inline void _read_from_queue(ThreadContext* thread_context, uint64_t ts_now);
+
   /**
    * Deserialize an log message from the raw SPSC queue and emplace them to priority queue
    */
-  QUILL_ATTRIBUTE_HOT inline void _read_queue_and_decode(ThreadContext* thread_context, uint64_t ts_now);
+  QUILL_ATTRIBUTE_HOT inline void _read_queue_messages_and_decode(ThreadContext* thread_context,
+                                                                  ThreadContext::SPSCQueueT& queue,
+                                                                  std::byte* read_buffer,
+                                                                  size_t bytes_available, uint64_t ts_now);
 
   /**
    * Checks for events in all queues and processes the one with the minimum timestamp
@@ -279,12 +289,12 @@ void BackendWorker::_populate_priority_queue(ThreadContextCollection::backend_th
   for (ThreadContext* thread_context : cached_thread_contexts)
   {
     // copy everything to a priority queue
-    _read_queue_and_decode(thread_context, ts_now);
+    _read_from_queue(thread_context, ts_now);
   }
 }
 
 /***/
-void BackendWorker::_read_queue_and_decode(ThreadContext* thread_context, uint64_t ts_now)
+void BackendWorker::_read_from_queue(ThreadContext* thread_context, uint64_t ts_now)
 {
   ThreadContext::SPSCQueueT& spsc_queue = thread_context->spsc_queue();
 
@@ -293,10 +303,31 @@ void BackendWorker::_read_queue_and_decode(ThreadContext* thread_context, uint64
 
   // The producer will add items to the buffer :
   // |timestamp|metadata*|logger_details*|args...|
 
-  auto read = spsc_queue.prepare_read();
-  std::byte* read_buffer = read.first;
-  size_t bytes_available = read.second;
+  auto [read_buffer, bytes_available, has_more] = spsc_queue.prepare_read();
+
+  // here we read all the messages until the end of the buffer
+  _read_queue_messages_and_decode(thread_context, spsc_queue, read_buffer, bytes_available, ts_now);
+
+  if (has_more)
+  {
+    // if there are more bytes to read it is because we need to wrap around the ring buffer,
+    // and we will perform one more read
+    std::tie(read_buffer, bytes_available, has_more) = spsc_queue.prepare_read();
+    _read_queue_messages_and_decode(thread_context, spsc_queue, read_buffer, bytes_available, ts_now);
+  }
+
+  assert(!has_more && "It is not possible to have more bytes to read");
+
+  // Note: If the bounded queue gets filled it will allocate a new bounded queue and will have
+  // more bytes to read. The case where the queue gets reallocated is not handled and we will
+  // read the new queue the next time we call this function
+}
+
+/***/
+void BackendWorker::_read_queue_messages_and_decode(ThreadContext* thread_context,
+                                                    ThreadContext::SPSCQueueT& queue, std::byte* read_buffer,
+                                                    size_t bytes_available, uint64_t ts_now)
+{
   while (bytes_available > 0)
   {
     std::byte* const read_begin = read_buffer;
@@ -436,7 +467,7 @@ void BackendWorker::_read_queue_and_decode(ThreadContext* thread_context, uint64
     // Finish reading
     assert((read_buffer >= read_begin) && "read_buffer should be greater or equal to read_begin");
     auto const read_size = static_cast<size_t>(read_buffer - read_begin);
-    spsc_queue.finish_read(read_size);
+    queue.finish_read(read_size);
     bytes_available -= read_size;
 
     _transit_events.emplace(transit_event);
diff --git a/quill/include/quill/detail/spsc_queue/BoundedQueue.h b/quill/include/quill/detail/spsc_queue/BoundedQueue.h
index 83ecc0db..2414a7d8 100644
--- a/quill/include/quill/detail/spsc_queue/BoundedQueue.h
+++ b/quill/include/quill/detail/spsc_queue/BoundedQueue.h
@@ -10,6 +10,7 @@
 #include
 #include
 #include
+#include <tuple>
 #include
 
 #include "quill/detail/misc/Attributes.h"
@@ -32,7 +33,6 @@ class BoundedQueue
     _end_of_recorded_space = _storage + capacity();
     _min_free_space = capacity();
-    _min_avail_bytes = 0;
 
     _producer_pos.store(_storage);
     _consumer_pos.store(_storage);
   }
@@ -134,46 +134,38 @@ class BoundedQueue
    * Prepare to read from the buffer
    * @return a pair of the buffer location to read and the number of available bytes
    */
-  QUILL_NODISCARD_ALWAYS_INLINE_HOT std::pair<std::byte*, size_t> prepare_read() noexcept
+  QUILL_NODISCARD_ALWAYS_INLINE_HOT std::tuple<std::byte*, size_t, bool> prepare_read() noexcept
   {
-    if (_min_avail_bytes > 0)
-    {
-      // fast read path
-      return std::pair{_consumer_pos.load(std::memory_order_relaxed), _min_avail_bytes};
-    }
-
     // Save a consistent copy of producerPos
     // Prevent reading new producerPos but old endOf...
-    std::byte* const consumer_pos = _consumer_pos.load(std::memory_order_relaxed);
-    std::byte* const producer_pos = _producer_pos.load(std::memory_order_acquire);
+    std::byte* consumer_pos = _consumer_pos.load(std::memory_order_relaxed);
+    std::byte* producer_pos = _producer_pos.load(std::memory_order_acquire);
 
-    if (producer_pos >= consumer_pos)
-    {
-      // here the consumer is behind the producer
-      _min_avail_bytes = static_cast<size_t>(producer_pos - consumer_pos);
-      return std::pair{consumer_pos, _min_avail_bytes};
-    }
-    else
+    size_t bytes_available;
+
+    if (consumer_pos > producer_pos)
     {
       // consumer is ahead of the producer
       // xxxp0000cxxxEOB
-      // _end_of_recorded_space is only set when the producer wraps by the producer, here we
-      // already know that the producer has wrapped around therefore the _end_of_recorded_space
-      // has been set
-      _min_avail_bytes = static_cast<size_t>(_end_of_recorded_space - consumer_pos);
+      bytes_available = static_cast<size_t>(_end_of_recorded_space - consumer_pos);
 
-      if (_min_avail_bytes > 0)
-      {
-        return std::pair{consumer_pos, _min_avail_bytes};
-      }
-      else
+      if (bytes_available > 0)
       {
-        // Roll over because there is nothing to read until end of buffer
-        _consumer_pos.store(_storage, std::memory_order_release);
-        _min_avail_bytes = static_cast<size_t>(producer_pos - _storage);
-        return std::pair{_storage, _min_avail_bytes};
+        // There are bytes to read until the end of the buffer, and we also want to notify the
+        // user that there are more bytes to read
+        return std::tuple{consumer_pos, bytes_available, true};
       }
+
+      // Roll over because there is nothing to read until end of buffer
+      _consumer_pos.store(_storage, std::memory_order_release);
     }
+
+    // here the consumer is behind the producer
+    consumer_pos = _consumer_pos.load(std::memory_order_relaxed);
+    bytes_available = static_cast<size_t>(producer_pos - consumer_pos);
+
+    // there won't be more bytes to read as we haven't wrapped around
+    return std::tuple{consumer_pos, bytes_available, false};
   }
 
   /**
@@ -183,7 +175,6 @@ class BoundedQueue
    */
   QUILL_ALWAYS_INLINE_HOT void finish_read(size_t nbytes) noexcept
   {
-    _min_avail_bytes -= nbytes;
     _consumer_pos.store(_consumer_pos.load(std::memory_order_relaxed) + nbytes,
                         std::memory_order_release);
   }
@@ -221,10 +212,6 @@ class BoundedQueue
    * the next bytes from. This value is only updated by the consumer.
    */
   alignas(CACHELINE_SIZE) std::atomic<std::byte*> _consumer_pos;
-
-  /** Min value on the number of bytes the consumer can read **/
-  size_t _min_avail_bytes;
-  char _pad0[CACHELINE_SIZE - sizeof(std::atomic<std::byte*>) - sizeof(size_t)] = "\0";
 };
 
 } // namespace quill::detail
diff --git a/quill/include/quill/detail/spsc_queue/UnboundedQueue.h b/quill/include/quill/detail/spsc_queue/UnboundedQueue.h
index 7c21d22d..7576b4eb 100644
--- a/quill/include/quill/detail/spsc_queue/UnboundedQueue.h
+++ b/quill/include/quill/detail/spsc_queue/UnboundedQueue.h
@@ -134,9 +134,9 @@ class UnboundedQueue
    * Prepare to read from the buffer
    * @return a pair of the buffer location to read and the number of available bytes
    */
-  QUILL_NODISCARD_ALWAYS_INLINE_HOT std::pair<std::byte*, size_t> prepare_read()
+  QUILL_NODISCARD_ALWAYS_INLINE_HOT std::tuple<std::byte*, size_t, bool> prepare_read()
   {
-    auto [read_pos, available_bytes] = _consumer->bounded_queue.prepare_read();
+    auto [read_pos, available_bytes, has_more] = _consumer->bounded_queue.prepare_read();
 
     if (available_bytes == 0)
     {
@@ -148,18 +148,18 @@
         // a new buffer was added by the producer, this happens only when we have allocated a new queue
         // try the existing buffer once more
-        std::tie(read_pos, available_bytes) = _consumer->bounded_queue.prepare_read();
+        std::tie(read_pos, available_bytes, has_more) = _consumer->bounded_queue.prepare_read();
 
         if (available_bytes == 0)
         {
           // switch to the new buffer, existing one is deleted
           delete _consumer;
           _consumer = next_node;
-          std::tie(read_pos, available_bytes) = _consumer->bounded_queue.prepare_read();
+          std::tie(read_pos, available_bytes, has_more) = _consumer->bounded_queue.prepare_read();
         }
       }
     }
 
-    return std::pair{read_pos, available_bytes};
+    return std::tuple{read_pos, available_bytes, has_more};
   }
 
   /**
diff --git a/quill/test/BoundedQueueTest.cpp b/quill/test/BoundedQueueTest.cpp
index 396df478..b2da8dc1 100644
--- a/quill/test/BoundedQueueTest.cpp
+++ b/quill/test/BoundedQueueTest.cpp
@@ -22,7 +22,7 @@ TEST_CASE("read_write_buffer")
 
   {
     auto const res = buffer.prepare_read();
-    REQUIRE_EQ(res.second, 32);
+    REQUIRE_EQ(std::get<1>(res), 32);
     buffer.finish_read(32u);
   }
@@ -35,7 +35,7 @@ TEST_CASE("read_write_buffer")
   {
     // Nothing to read but consumer will also wrap
     auto const res = buffer.prepare_read();
-    REQUIRE_EQ(res.second, 0);
+    REQUIRE_EQ(std::get<1>(res), 0);
   }
 
   {
@@ -78,11 +78,11 @@ TEST_CASE("read_write_multithreaded_plain_ints")
   {
     for (uint32_t i = 0; i < 8192; ++i)
     {
-      auto [read_buffer, bytes] = buffer.prepare_read();
+      auto [read_buffer, bytes, has_more] = buffer.prepare_read();
       while (bytes == 0)
       {
         std::this_thread::sleep_for(std::chrono::microseconds{2});
-        std::tie(read_buffer, bytes) = buffer.prepare_read();
+        std::tie(read_buffer, bytes, has_more) = buffer.prepare_read();
       }
 
       auto value = reinterpret_cast(read_buffer);
diff --git a/quill/test/UnboundedQueueTest.cpp b/quill/test/UnboundedQueueTest.cpp
index 98b79b11..eb80799e 100644
--- a/quill/test/UnboundedQueueTest.cpp
+++ b/quill/test/UnboundedQueueTest.cpp
@@ -45,11 +45,11 @@ TEST_CASE("read_write_multithreaded_plain_ints")
   {
     for (uint32_t i = 0; i < 8192; ++i)
     {
-      auto [read_buffer, bytes] = buffer.prepare_read();
+      auto [read_buffer, bytes, has_more] = buffer.prepare_read();
       while (bytes == 0)
       {
         std::this_thread::sleep_for(std::chrono::microseconds{2});
-        std::tie(read_buffer, bytes) = buffer.prepare_read();
+        std::tie(read_buffer, bytes, has_more) = buffer.prepare_read();
       }
 
       auto value = reinterpret_cast(read_buffer);
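
The consumer-side contract this patch introduces can be summarised as: `prepare_read()` now returns a `(read position, bytes available, has_more)` tuple, and the reader performs exactly one extra pass when `has_more` signals that the ring buffer has wrapped. The standalone sketch below mirrors that two-pass drain loop; `WrappedBuffer`, `drain`, and `main` are names invented for this illustration and are not part of the quill API, and the segment-based buffer only emulates a wrapped ring buffer rather than reproducing `BoundedQueue`.

```cpp
#include <cstddef>
#include <cstdio>
#include <tuple>
#include <utility>
#include <vector>

// Emulates a bounded ring buffer whose readable bytes are split in two pieces
// because the producer wrapped: one segment at the end of the storage and one
// at the start. Illustration only; not the real quill BoundedQueue.
class WrappedBuffer
{
public:
  WrappedBuffer(std::vector<std::byte> tail, std::vector<std::byte> head)
    : _segments{std::move(tail), std::move(head)}
  {
  }

  // Same shape as the new BoundedQueue::prepare_read(): the read position, the
  // number of contiguous bytes available, and whether a wrap-around read is pending.
  std::tuple<std::byte const*, std::size_t, bool> prepare_read() const
  {
    if (_current >= _segments.size())
    {
      return {nullptr, 0, false};
    }
    auto const& segment = _segments[_current];
    bool const has_more = (_current + 1) < _segments.size();
    return {segment.data() + _offset, segment.size() - _offset, has_more};
  }

  // Same shape as BoundedQueue::finish_read(): the consumer reports what it consumed.
  void finish_read(std::size_t nbytes)
  {
    if (nbytes == 0)
    {
      return;
    }
    _offset += nbytes;
    if (_offset == _segments[_current].size())
    {
      ++_current;
      _offset = 0;
    }
  }

private:
  std::vector<std::vector<std::byte>> _segments;
  std::size_t _current{0};
  std::size_t _offset{0};
};

// Mirrors the structure of BackendWorker::_read_from_queue(): read everything up
// to the end of the buffer, then perform exactly one more read if has_more says
// the remaining bytes sit at the start of the storage after the wrap.
std::size_t drain(WrappedBuffer& queue)
{
  std::size_t total = 0;

  auto [read_buffer, bytes_available, has_more] = queue.prepare_read();
  (void)read_buffer; // a real consumer would decode log messages starting here
  total += bytes_available;
  queue.finish_read(bytes_available);

  if (has_more)
  {
    std::tie(read_buffer, bytes_available, has_more) = queue.prepare_read();
    total += bytes_available;
    queue.finish_read(bytes_available);
  }

  return total;
}

int main()
{
  // 24 bytes at the tail of the storage plus 8 wrapped bytes at the start.
  WrappedBuffer queue{std::vector<std::byte>(24), std::vector<std::byte>(8)};
  std::printf("drained %zu bytes in a single pass\n", drain(queue)); // 32
  return 0;
}
```

With the pre-2.4.1 pair-returning `prepare_read()` there was no `has_more` flag, so the portion that wrapped to the start of the buffer was only picked up on a later call; returning the flag lets the backend worker drain the queue in a single visit, which is also why the `_min_avail_bytes` fast-path cache became unnecessary.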