diff --git a/CHANGELOG.md b/CHANGELOG.md index 8acbafa0..253376c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +- [v2.3.4](#v2.3.4) - [v2.3.3](#v2.3.3) - [v2.3.2](#v2.3.2) - [v2.3.1](#v2.3.1) @@ -31,11 +32,18 @@ - [v1.1.0](#v1.1.0) - [v1.0.0](#v1.0.0) +## v2.3.4 + +**Improvements** + +- Optimise the backend logging thread to read multiple log messages from the same queue, but still fairly read each + queue from all active threads. + ## v2.3.3 **Fixes** -- Previously when multiple threads were loggin, Quill backend logging thread would first try reading the log messages of +- Previously when multiple threads were login, Quill backend logging thread would first try reading the log messages of one thread until the queue was completely empty before reading the log messages of the next thread. When one of the threads was logging a lot, it could result in only displaying the log of that thread, hiding the logs of the other threads. This has now been fixed and all log messages from all threads are read fairly. diff --git a/quill/include/quill/Quill.h b/quill/include/quill/Quill.h index 9afe6915..1e05eac5 100644 --- a/quill/include/quill/Quill.h +++ b/quill/include/quill/Quill.h @@ -29,7 +29,7 @@ namespace quill /** Version Info **/ constexpr uint32_t VersionMajor{2}; constexpr uint32_t VersionMinor{3}; -constexpr uint32_t VersionPatch{3}; +constexpr uint32_t VersionPatch{4}; constexpr uint32_t Version{VersionMajor * 10000 + VersionMinor * 100 + VersionPatch}; /** forward declarations **/ diff --git a/quill/include/quill/detail/backend/BackendWorker.h b/quill/include/quill/detail/backend/BackendWorker.h index b9c57d69..eb2b5c5c 100644 --- a/quill/include/quill/detail/backend/BackendWorker.h +++ b/quill/include/quill/detail/backend/BackendWorker.h @@ -97,15 +97,14 @@ class BackendWorker /** * Populate our local priority queue * @param cached_thread_contexts local thread context cache - * @param is_terminating backend worker is terminating */ QUILL_ATTRIBUTE_HOT inline void _populate_priority_queue( - ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts, bool is_terminating); + ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts); /** * Deserialize an log message from the raw SPSC queue and emplace them to priority queue */ - QUILL_ATTRIBUTE_HOT inline bool _read_queue_and_decode(ThreadContext* thread_context); + QUILL_ATTRIBUTE_HOT inline void _read_queue_and_decode(ThreadContext* thread_context); /** * Checks for events in all queues and processes the one with the minimum timestamp @@ -265,38 +264,17 @@ void BackendWorker::run() } /***/ -void BackendWorker::_populate_priority_queue(ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts, - bool is_terminating) +void BackendWorker::_populate_priority_queue(ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts) { - while (true) + for (ThreadContext* thread_context : cached_thread_contexts) { - uint16_t empty{0}; - for (ThreadContext* thread_context : cached_thread_contexts) - { - if (!is_terminating && (_transit_events.size() >= _max_transit_events)) - { - // transit events queue is full - return; - } - - // copy everything to a priority queue - if (!_read_queue_and_decode(thread_context)) - { - // _read_queue_and_decode returns false for an empty queue - ++empty; - } - } - - if (empty == cached_thread_contexts.size()) - { - // all queues are empty we are done - return; - } + // copy everything to a priority queue + _read_queue_and_decode(thread_context); } } /***/ -bool BackendWorker::_read_queue_and_decode(ThreadContext* thread_context) +void BackendWorker::_read_queue_and_decode(ThreadContext* thread_context) { ThreadContext::SPSCQueueT& spsc_queue = thread_context->spsc_queue(); @@ -308,127 +286,124 @@ bool BackendWorker::_read_queue_and_decode(ThreadContext* thread_context) auto read = spsc_queue.prepare_read(); std::byte* read_buffer = read.first; size_t bytes_available = read.second; - std::byte* const read_begin = read_buffer; - if (bytes_available == 0) + while (bytes_available > 0) { - // the queue is empty - return false; - } - - // The queue is empty. First we want to allocate a new TransitEvent to store the message - // from the queue - void* transit_event_buffer = _free_list_allocator.allocate(sizeof(TransitEvent)); - auto* transit_event = new (transit_event_buffer) TransitEvent{}; - transit_event->thread_id = thread_context->thread_id(); - transit_event->thread_name = thread_context->thread_name(); - - // read the header first, and take copy of the header - read_buffer = detail::align_pointer(read_buffer); - transit_event->header = *(reinterpret_cast(read_buffer)); - read_buffer += sizeof(detail::Header); - - // if we are using rdtsc clock then here we will convert the value to nanoseconds since epoch - // doing the conversion here ensures that every transit that is inserted in the priority queue - // below has a header timestamp of nanoseconds since epoch and makes it even possible to - // have Logger objects using different clocks - if (transit_event->header.logger_details->timestamp_clock_type() == TimestampClockType::Rdtsc) - { - if (!_rdtsc_clock) + std::byte* const read_begin = read_buffer; + + // First we want to allocate a new TransitEvent to store the message from the queue + void* transit_event_buffer = _free_list_allocator.allocate(sizeof(TransitEvent)); + auto* transit_event = new (transit_event_buffer) TransitEvent{}; + transit_event->thread_id = thread_context->thread_id(); + transit_event->thread_name = thread_context->thread_name(); + + // read the header first, and take copy of the header + read_buffer = detail::align_pointer(read_buffer); + transit_event->header = *(reinterpret_cast(read_buffer)); + read_buffer += sizeof(detail::Header); + + // if we are using rdtsc clock then here we will convert the value to nanoseconds since epoch + // doing the conversion here ensures that every transit that is inserted in the priority queue + // below has a header timestamp of nanoseconds since epoch and makes it even possible to + // have Logger objects using different clocks + if (transit_event->header.logger_details->timestamp_clock_type() == TimestampClockType::Rdtsc) { - // Here we lazy initialise rdtsc clock on the backend thread only if the user decides to use it - // Use rdtsc clock based on config. The clock requires a few seconds to init as it is - // taking samples first - _rdtsc_clock = std::make_unique(_config.rdtsc_resync_interval); - } - - // convert the rdtsc value to nanoseconds since epoch - transit_event->header.timestamp = _rdtsc_clock->time_since_epoch(transit_event->header.timestamp); - } + if (!_rdtsc_clock) + { + // Here we lazy initialise rdtsc clock on the backend thread only if the user decides to use it + // Use rdtsc clock based on config. The clock requires a few seconds to init as it is + // taking samples first + _rdtsc_clock = std::make_unique(_config.rdtsc_resync_interval); + } - // we need to check and do not try to format the flush events as that wouldn't be valid - if (transit_event->header.metadata->macro_metadata.event() != MacroMetadata::Event::Flush) - { -#if defined(_WIN32) - if (transit_event->header.metadata->macro_metadata.has_wide_char()) - { - // convert the format string to a narrow string - size_t const size_needed = - get_wide_string_encoding_size(transit_event->header.metadata->macro_metadata.wmessage_format()); - std::string format_str(size_needed, 0); - wide_string_to_narrow(format_str.data(), size_needed, - transit_event->header.metadata->macro_metadata.wmessage_format()); - - assert(!transit_event->header.metadata->macro_metadata.is_structured_log_template() && - "structured log templates are not supported for wide characters"); - - read_buffer = transit_event->header.metadata->format_to_fn( - format_str, read_buffer, transit_event->formatted_msg, _args); + // convert the rdtsc value to nanoseconds since epoch + transit_event->header.timestamp = _rdtsc_clock->time_since_epoch(transit_event->header.timestamp); } - else + + // we need to check and do not try to format the flush events as that wouldn't be valid + if (transit_event->header.metadata->macro_metadata.event() != MacroMetadata::Event::Flush) { -#endif - if (transit_event->header.metadata->macro_metadata.is_structured_log_template()) +#if defined(_WIN32) + if (transit_event->header.metadata->macro_metadata.has_wide_char()) { - // for messages containing named arguments threat them as structured logs - auto const search = _slog_templates.find(transit_event->header.metadata); - if (search != std::cend(_slog_templates)) - { - auto const& [fmt_str, structured_keys] = search->second; + // convert the format string to a narrow string + size_t const size_needed = + get_wide_string_encoding_size(transit_event->header.metadata->macro_metadata.wmessage_format()); + std::string format_str(size_needed, 0); + wide_string_to_narrow(format_str.data(), size_needed, + transit_event->header.metadata->macro_metadata.wmessage_format()); - transit_event->structured_keys = structured_keys; + assert(!transit_event->header.metadata->macro_metadata.is_structured_log_template() && + "structured log templates are not supported for wide characters"); - read_buffer = transit_event->header.metadata->format_to_fn( - fmt_str, read_buffer, transit_event->formatted_msg, _args); + read_buffer = transit_event->header.metadata->format_to_fn( + format_str, read_buffer, transit_event->formatted_msg, _args); + } + else + { +#endif + if (transit_event->header.metadata->macro_metadata.is_structured_log_template()) + { + // for messages containing named arguments threat them as structured logs + auto const search = _slog_templates.find(transit_event->header.metadata); + if (search != std::cend(_slog_templates)) + { + auto const& [fmt_str, structured_keys] = search->second; + + transit_event->structured_keys = structured_keys; + + read_buffer = transit_event->header.metadata->format_to_fn( + fmt_str, read_buffer, transit_event->formatted_msg, _args); + } + else + { + auto [fmt_str, structured_keys] = _process_structured_log_template( + transit_event->header.metadata->macro_metadata.message_format()); + + // insert the results + _slog_templates[transit_event->header.metadata] = std::make_pair(fmt_str, structured_keys); + + transit_event->structured_keys = std::move(structured_keys); + + read_buffer = transit_event->header.metadata->format_to_fn( + fmt_str, read_buffer, transit_event->formatted_msg, _args); + } + + // formatted values for any given keys + for (auto const& arg : _args) + { + transit_event->structured_values.emplace_back(fmt::vformat("{}", fmt::basic_format_args(&arg, 1))); + } } else { - auto [fmt_str, structured_keys] = _process_structured_log_template( - transit_event->header.metadata->macro_metadata.message_format()); - - // insert the results - _slog_templates[transit_event->header.metadata] = std::make_pair(fmt_str, structured_keys); - - transit_event->structured_keys = std::move(structured_keys); - + // regular logs read_buffer = transit_event->header.metadata->format_to_fn( - fmt_str, read_buffer, transit_event->formatted_msg, _args); - } - - // formatted values for any given keys - for (auto const& arg : _args) - { - transit_event->structured_values.emplace_back(fmt::vformat("{}", fmt::basic_format_args(&arg, 1))); + transit_event->header.metadata->macro_metadata.message_format(), read_buffer, + transit_event->formatted_msg, _args); } - } - else - { - // regular logs - read_buffer = transit_event->header.metadata->format_to_fn( - transit_event->header.metadata->macro_metadata.message_format(), read_buffer, - transit_event->formatted_msg, _args); - } #if defined(_WIN32) - } + } #endif - } - else - { - // if this is a flush event then we do not need to format anything for the - // transit_event, but we need to set the transit event's flush_flag pointer instead - uintptr_t flush_flag_tmp; - std::memcpy(&flush_flag_tmp, read_buffer, sizeof(uintptr_t)); - transit_event->flush_flag = reinterpret_cast*>(flush_flag_tmp); - read_buffer += sizeof(uintptr_t); - } - - // Finish reading - assert((read_buffer >= read_begin) && "read_buffer should be greater or equal to read_begin"); - spsc_queue.finish_read(static_cast(read_buffer - read_begin)); + } + else + { + // if this is a flush event then we do not need to format anything for the + // transit_event, but we need to set the transit event's flush_flag pointer instead + uintptr_t flush_flag_tmp; + std::memcpy(&flush_flag_tmp, read_buffer, sizeof(uintptr_t)); + transit_event->flush_flag = reinterpret_cast*>(flush_flag_tmp); + read_buffer += sizeof(uintptr_t); + } - _transit_events.emplace(transit_event); + // Finish reading + assert((read_buffer >= read_begin) && "read_buffer should be greater or equal to read_begin"); + auto const read_size = static_cast(read_buffer - read_begin); + spsc_queue.finish_read(read_size); + bytes_available -= read_size; - return true; + _transit_events.emplace(transit_event); + } } /***/ @@ -565,7 +540,7 @@ void BackendWorker::_main_loop() ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts = _thread_context_collection.backend_thread_contexts_cache(); - _populate_priority_queue(cached_thread_contexts, false); + _populate_priority_queue(cached_thread_contexts); if (QUILL_LIKELY(!_transit_events.empty())) { @@ -613,7 +588,7 @@ void BackendWorker::_exit() while (true) { - _populate_priority_queue(cached_thread_contexts, true); + _populate_priority_queue(cached_thread_contexts); if (!_transit_events.empty()) {