From 22ef9c733af3724bb9417a7721a4e693c4950a05 Mon Sep 17 00:00:00 2001 From: Odysseas Georgoudis Date: Mon, 27 Nov 2023 16:04:38 +0000 Subject: [PATCH] improve compile time --- quill/CMakeLists.txt | 2 + quill/include/quill/detail/ThreadContext.h | 25 +--- .../quill/detail/ThreadContextCollection.h | 65 +--------- .../quill/detail/backend/BackendWorker.h | 122 +----------------- quill/src/detail/ThreadContext.cpp | 30 +++++ quill/src/detail/ThreadContextCollection.cpp | 72 +++++++++++ quill/src/detail/backend/BackendWorker.cpp | 107 +++++++++++++++ 7 files changed, 227 insertions(+), 196 deletions(-) create mode 100644 quill/src/detail/ThreadContext.cpp create mode 100644 quill/src/detail/ThreadContextCollection.cpp diff --git a/quill/CMakeLists.txt b/quill/CMakeLists.txt index 10dee3dc..76a494c3 100644 --- a/quill/CMakeLists.txt +++ b/quill/CMakeLists.txt @@ -73,6 +73,8 @@ set(SOURCE_FILES src/detail/HandlerCollection.cpp src/detail/LoggerCollection.cpp src/detail/SignalHandler.cpp + src/detail/ThreadContext.cpp + src/detail/ThreadContextCollection.cpp src/handlers/ConsoleHandler.cpp src/handlers/FileHandler.cpp diff --git a/quill/include/quill/detail/ThreadContext.h b/quill/include/quill/detail/ThreadContext.h index d2530493..f9ef4d45 100644 --- a/quill/include/quill/detail/ThreadContext.h +++ b/quill/include/quill/detail/ThreadContext.h @@ -34,20 +34,8 @@ class alignas(CACHE_LINE_ALIGNED) ThreadContext /** * Constructor */ - explicit ThreadContext(QueueType queue_type, uint32_t default_queue_capacity, - uint32_t initial_transit_event_buffer_capacity, bool huge_pages) - : _transit_event_buffer(initial_transit_event_buffer_capacity) - { - if ((queue_type == QueueType::UnboundedBlocking) || - (queue_type == QueueType::UnboundedNoMaxLimit) || (queue_type == QueueType::UnboundedDropping)) - { - _spsc_queue.emplace(default_queue_capacity, huge_pages); - } - else - { - _spsc_queue.emplace(default_queue_capacity, huge_pages); - } - } + ThreadContext(QueueType queue_type, uint32_t default_queue_capacity, + uint32_t initial_transit_event_buffer_capacity, bool huge_pages); /** * Deleted @@ -148,14 +136,7 @@ class alignas(CACHE_LINE_ALIGNED) ThreadContext * counter Called by the backend worker thread * @return current value of the message message counter */ - QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t get_and_reset_message_failure_counter() noexcept - { - if (QUILL_LIKELY(_message_failure_counter.load(std::memory_order_relaxed) == 0)) - { - return 0; - } - return _message_failure_counter.exchange(0, std::memory_order_relaxed); - } + QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t get_and_reset_message_failure_counter() noexcept; private: std::variant _spsc_queue; /** queue for this thread, events are pushed here */ diff --git a/quill/include/quill/detail/ThreadContextCollection.h b/quill/include/quill/detail/ThreadContextCollection.h index 5f569a2b..3c82a0d0 100644 --- a/quill/include/quill/detail/ThreadContextCollection.h +++ b/quill/include/quill/detail/ThreadContextCollection.h @@ -6,6 +6,7 @@ #pragma once #include "quill/Config.h" +#include "quill/detail/ThreadContext.h" #include "quill/detail/misc/Attributes.h" // for QUILL_ATTRIBUTE_HOT #include "quill/detail/misc/Common.h" // for CACHE_LINE_ALIGNED #include // for atomic @@ -23,8 +24,6 @@ struct Config; namespace detail { -class ThreadContext; - /** * ThreadContextCollection class * a) Creates or returns the existing thread local ThreadContext instance to the thread that called Logger.log() @@ -161,26 +160,7 @@ class ThreadContextCollection * If there are no invalidated contexts or no new contexts the existing cache is returned * @return All current owned thread contexts */ - QUILL_NODISCARD QUILL_ATTRIBUTE_HOT backend_thread_contexts_cache_t const& backend_thread_contexts_cache() - { - // Check if _thread_contexts has changed. This can happen only when a new thread context is added by any Logger - if (QUILL_UNLIKELY(_has_new_thread_context())) - { - // if the thread _thread_contexts was changed we lock and remake our reference cache - std::lock_guard const lock {_mutex}; - _thread_context_cache.clear(); - - // Remake thread context ref - for (auto const& elem : _thread_contexts) - { - // We do skip invalidated && empty queue thread contexts as this is very rare, so instead - // we just add them and expect them to be cleaned in the next iteration - _thread_context_cache.push_back(elem.get()); - } - } - - return _thread_context_cache; - } + QUILL_NODISCARD QUILL_ATTRIBUTE_HOT backend_thread_contexts_cache_t const& backend_thread_contexts_cache(); /** * Clears thread context cache from invalid and empty thread contexts @@ -200,60 +180,29 @@ class ThreadContextCollection * @note Only accessed by the backend thread * @return true if the shared data structure was changed by any calls to Logger */ - QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _has_new_thread_context() noexcept - { - // Again relaxed memory model as in case it is false we will acquire the lock - if (_new_thread_context.load(std::memory_order_relaxed)) - { - // if the variable was updated to true, set it to false, - // There should not be any race condition here as this is the only place _changed is set to - // false, and we will return true anyway - _new_thread_context.store(false, std::memory_order_relaxed); - return true; - } - return false; - } + QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _has_new_thread_context() noexcept; /** * Indicate that the context has changed. A new thread context has been added or removed * @note Only called by the caller threads */ - void _set_new_thread_context() noexcept - { - // Set changed is used with the lock, we can have relaxed memory order here as the lock - // is acq/rel anyway - return _new_thread_context.store(true, std::memory_order_relaxed); - } + void _set_new_thread_context() noexcept; /** * Increment the counter for a removed thread context. This notifies the backend thread to look for an invalidated context */ - void _add_invalid_thread_context() noexcept - { - // relaxed is fine, see _has_invalid_thread_context explanation - _invalid_thread_context.fetch_add(1, std::memory_order_relaxed); - } + void _add_invalid_thread_context() noexcept; /** * Reduce the value of thread context removed counter. This is decreased by the backend thread * when we found and removed the invalided context */ - void _sub_invalid_thread_context() noexcept - { - // relaxed is fine, see _has_invalid_thread_context explanation - _invalid_thread_context.fetch_sub(1, std::memory_order_relaxed); - } + void _sub_invalid_thread_context() noexcept; /** * @return True if there is an invalid thread context */ - QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _has_invalid_thread_context() const noexcept - { - // Here we do relaxed because if the value is not zero we will look inside ThreadContext invalid - // flag that is also a relaxed atomic, and then we will look into the SPSC queue size that is - // also atomic Even if we don't read everything in order we will check again in the next circle - return _invalid_thread_context.load(std::memory_order_relaxed) != 0; - } + QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _has_invalid_thread_context() const noexcept; /** * Remove a thread context from our main thread context collection diff --git a/quill/include/quill/detail/backend/BackendWorker.h b/quill/include/quill/detail/backend/BackendWorker.h index a1e93a7a..640d9c87 100644 --- a/quill/include/quill/detail/backend/BackendWorker.h +++ b/quill/include/quill/detail/backend/BackendWorker.h @@ -140,12 +140,12 @@ class BackendWorker /** * Process a single trnasit event */ - QUILL_ATTRIBUTE_HOT inline void _process_transit_event(TransitEvent& transit_event); + QUILL_ATTRIBUTE_HOT void _process_transit_event(TransitEvent& transit_event); /** * Write a transit event */ - QUILL_ATTRIBUTE_HOT inline void _write_transit_event(TransitEvent const& transit_event) const; + QUILL_ATTRIBUTE_HOT void _write_transit_event(TransitEvent const& transit_event) const; /** * Process the lowest timestamp from the queues and write it to the log file @@ -156,7 +156,7 @@ class BackendWorker /** * Force flush all active Handlers */ - QUILL_ATTRIBUTE_HOT inline void _force_flush(); + QUILL_ATTRIBUTE_HOT void _force_flush(); /** * Check for dropped messages - only when bounded queue is used @@ -474,76 +474,7 @@ void BackendWorker::_process_transit_events(ThreadContextCollection::backend_thr TransitEvent* transit_event = transit_buffer->front(); assert(transit_event && "transit_buffer is set only when transit_event is valid"); - _process_transit_event(*transit_event); - - // Remove this event and move to the next. - transit_buffer->pop_front(); -} - -/***/ -void BackendWorker::_process_transit_event(TransitEvent& transit_event) -{ - MacroMetadata const macro_metadata = transit_event.metadata(); - - // If backend_process(...) throws we want to skip this event and move to the next, so we catch the - // error here instead of catching it in the parent try/catch block of main_loop - QUILL_TRY - { - if (macro_metadata.event() == MacroMetadata::Event::Log) - { - if (transit_event.log_level() != LogLevel::Backtrace) - { - _write_transit_event(transit_event); - - // We also need to check the severity of the log message here against the backtrace - // Check if we should also flush the backtrace messages: - // After we forwarded the message we will check the severity of this message for this logger - // If the severity of the message is higher than the backtrace flush severity we will also - // flush the backtrace of the logger - if (QUILL_UNLIKELY(transit_event.log_level() >= transit_event.header.logger_details->backtrace_flush_level())) - { - _backtrace_log_message_storage.process(transit_event.header.logger_details->name(), - [this](TransitEvent const& te) - { _write_transit_event(te); }); - } - } - else - { - // this is a backtrace log and we will store it - _backtrace_log_message_storage.store(std::move(transit_event)); - } - } - else if (macro_metadata.event() == MacroMetadata::Event::InitBacktrace) - { - // we can just convert the capacity back to int here and use it - _backtrace_log_message_storage.set_capacity( - transit_event.header.logger_details->name(), - static_cast(std::stoul( - std::string{transit_event.formatted_msg.begin(), transit_event.formatted_msg.end()}))); - } - else if (macro_metadata.event() == MacroMetadata::Event::FlushBacktrace) - { - // process all records in backtrace for this logger_name and log them by calling backend_process_backtrace_log_message - _backtrace_log_message_storage.process(transit_event.header.logger_details->name(), - [this](TransitEvent const& te) - { _write_transit_event(te); }); - } - else if (macro_metadata.event() == MacroMetadata::Event::Flush) - { - _handler_collection.active_handlers(_active_handlers_cache); - _force_flush(); - - // this is a flush event, so we need to notify the caller to continue now - transit_event.flush_flag->store(true); - - // we also need to reset the flush_flag as the TransitEvents are re-used - transit_event.flush_flag = nullptr; - } - - // Since after processing an event we never force flush but leave it up to the OS instead, - // set this to true to keep track of unflushed messages we have - _has_unflushed_messages = true; - } + QUILL_TRY { _process_transit_event(*transit_event); } #if !defined(QUILL_NO_EXCEPTIONS) QUILL_CATCH(std::exception const& e) { _notification_handler(e.what()); } QUILL_CATCH_ALL() @@ -551,31 +482,9 @@ void BackendWorker::_process_transit_event(TransitEvent& transit_event) _notification_handler(std::string{"Caught unhandled exception."}); } // clang-format on #endif -} -/***/ -void BackendWorker::_write_transit_event(TransitEvent const& transit_event) const -{ - // Forward the record to all the logger handlers - MacroMetadata const macro_metadata = transit_event.metadata(); - - for (auto& handler : transit_event.header.logger_details->handlers()) - { - auto const& formatted_log_message_buffer = handler->formatter().format( - std::chrono::nanoseconds{transit_event.header.timestamp}, transit_event.thread_id, - transit_event.thread_name, _process_id, transit_event.header.logger_details->name(), - transit_event.log_level_as_str(), macro_metadata, transit_event.formatted_msg); - - // If all filters are okay we write this message to the file - if (handler->apply_filters(transit_event.thread_id, - std::chrono::nanoseconds{transit_event.header.timestamp}, - transit_event.log_level(), macro_metadata, formatted_log_message_buffer)) - { - // log to the handler, also pass the log_message_timestamp this is only needed in some - // cases like daily file rotation - handler->write(formatted_log_message_buffer, transit_event); - } - } + // Remove this event and move to the next. + transit_buffer->pop_front(); } /***/ @@ -651,25 +560,6 @@ bool BackendWorker::_process_and_write_single_message(const ThreadContextCollect return true; } -/***/ -void BackendWorker::_force_flush() -{ - if (_has_unflushed_messages) - { - // If we have buffered any messages then flush all active handlers - for (auto const& handler : _active_handlers_cache) - { - std::shared_ptr h = handler.lock(); - if (h) - { - h->flush(); - } - } - - _has_unflushed_messages = false; - } -} - /***/ void BackendWorker::_check_message_failures(ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts, backend_worker_notification_handler_t const& notification_handler) noexcept diff --git a/quill/src/detail/ThreadContext.cpp b/quill/src/detail/ThreadContext.cpp new file mode 100644 index 00000000..e9c0df58 --- /dev/null +++ b/quill/src/detail/ThreadContext.cpp @@ -0,0 +1,30 @@ +#include "quill/detail/ThreadContext.h" + +namespace quill::detail +{ +/***/ +ThreadContext::ThreadContext(QueueType queue_type, uint32_t default_queue_capacity, + uint32_t initial_transit_event_buffer_capacity, bool huge_pages) + : _transit_event_buffer(initial_transit_event_buffer_capacity) +{ + if ((queue_type == QueueType::UnboundedBlocking) || + (queue_type == QueueType::UnboundedNoMaxLimit) || (queue_type == QueueType::UnboundedDropping)) + { + _spsc_queue.emplace(default_queue_capacity, huge_pages); + } + else + { + _spsc_queue.emplace(default_queue_capacity, huge_pages); + } +} + +/***/ +size_t ThreadContext::get_and_reset_message_failure_counter() noexcept +{ + if (QUILL_LIKELY(_message_failure_counter.load(std::memory_order_relaxed) == 0)) + { + return 0; + } + return _message_failure_counter.exchange(0, std::memory_order_relaxed); +} +} // namespace quill::detail diff --git a/quill/src/detail/ThreadContextCollection.cpp b/quill/src/detail/ThreadContextCollection.cpp new file mode 100644 index 00000000..a07ccc58 --- /dev/null +++ b/quill/src/detail/ThreadContextCollection.cpp @@ -0,0 +1,72 @@ +#include "quill/detail/ThreadContextCollection.h" + +namespace quill::detail +{ +/***/ +QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ThreadContextCollection::backend_thread_contexts_cache_t const& ThreadContextCollection::backend_thread_contexts_cache() +{ + // Check if _thread_contexts has changed. This can happen only when a new thread context is added by any Logger + if (QUILL_UNLIKELY(_has_new_thread_context())) + { + // if the thread _thread_contexts was changed we lock and remake our reference cache + std::lock_guard const lock{_mutex}; + _thread_context_cache.clear(); + + // Remake thread context ref + for (auto const& elem : _thread_contexts) + { + // We do skip invalidated && empty queue thread contexts as this is very rare, so instead + // we just add them and expect them to be cleaned in the next iteration + _thread_context_cache.push_back(elem.get()); + } + } + + return _thread_context_cache; +} + +/***/ +bool ThreadContextCollection::_has_new_thread_context() noexcept +{ + // Again relaxed memory model as in case it is false we will acquire the lock + if (_new_thread_context.load(std::memory_order_relaxed)) + { + // if the variable was updated to true, set it to false, + // There should not be any race condition here as this is the only place _changed is set to + // false, and we will return true anyway + _new_thread_context.store(false, std::memory_order_relaxed); + return true; + } + return false; +} + +/***/ +void ThreadContextCollection::_set_new_thread_context() noexcept +{ + // Set changed is used with the lock, we can have relaxed memory order here as the lock + // is acq/rel anyway + return _new_thread_context.store(true, std::memory_order_relaxed); +} + +/***/ +void ThreadContextCollection::_add_invalid_thread_context() noexcept +{ + // relaxed is fine, see _has_invalid_thread_context explanation + _invalid_thread_context.fetch_add(1, std::memory_order_relaxed); +} + +/***/ +void ThreadContextCollection::_sub_invalid_thread_context() noexcept +{ + // relaxed is fine, see _has_invalid_thread_context explanation + _invalid_thread_context.fetch_sub(1, std::memory_order_relaxed); +} + +/***/ +bool ThreadContextCollection::_has_invalid_thread_context() const noexcept +{ + // Here we do relaxed because if the value is not zero we will look inside ThreadContext invalid + // flag that is also a relaxed atomic, and then we will look into the SPSC queue size that is + // also atomic Even if we don't read everything in order we will check again in the next circle + return _invalid_thread_context.load(std::memory_order_relaxed) != 0; +} +} // namespace quill::detail \ No newline at end of file diff --git a/quill/src/detail/backend/BackendWorker.cpp b/quill/src/detail/backend/BackendWorker.cpp index 23263f79..cc045aef 100644 --- a/quill/src/detail/backend/BackendWorker.cpp +++ b/quill/src/detail/backend/BackendWorker.cpp @@ -343,6 +343,113 @@ std::pair> BackendWorker::_process_structu return std::make_pair(fmt_str, keys); } +/***/ +void BackendWorker::_write_transit_event(TransitEvent const& transit_event) const +{ + // Forward the record to all the logger handlers + MacroMetadata const macro_metadata = transit_event.metadata(); + + for (auto& handler : transit_event.header.logger_details->handlers()) + { + auto const& formatted_log_message_buffer = handler->formatter().format( + std::chrono::nanoseconds{transit_event.header.timestamp}, transit_event.thread_id, + transit_event.thread_name, _process_id, transit_event.header.logger_details->name(), + transit_event.log_level_as_str(), macro_metadata, transit_event.formatted_msg); + + // If all filters are okay we write this message to the file + if (handler->apply_filters(transit_event.thread_id, + std::chrono::nanoseconds{transit_event.header.timestamp}, + transit_event.log_level(), macro_metadata, formatted_log_message_buffer)) + { + // log to the handler, also pass the log_message_timestamp this is only needed in some + // cases like daily file rotation + handler->write(formatted_log_message_buffer, transit_event); + } + } +} + +/***/ +void BackendWorker::_process_transit_event(TransitEvent& transit_event) +{ + MacroMetadata const macro_metadata = transit_event.metadata(); + + // If backend_process(...) throws we want to skip this event and move to the next, so we catch the + // error here instead of catching it in the parent try/catch block of main_loop + if (macro_metadata.event() == MacroMetadata::Event::Log) + { + if (transit_event.log_level() != LogLevel::Backtrace) + { + _write_transit_event(transit_event); + + // We also need to check the severity of the log message here against the backtrace + // Check if we should also flush the backtrace messages: + // After we forwarded the message we will check the severity of this message for this logger + // If the severity of the message is higher than the backtrace flush severity we will also + // flush the backtrace of the logger + if (QUILL_UNLIKELY(transit_event.log_level() >= transit_event.header.logger_details->backtrace_flush_level())) + { + _backtrace_log_message_storage.process(transit_event.header.logger_details->name(), + [this](TransitEvent const& te) + { _write_transit_event(te); }); + } + } + else + { + // this is a backtrace log and we will store it + _backtrace_log_message_storage.store(std::move(transit_event)); + } + } + else if (macro_metadata.event() == MacroMetadata::Event::InitBacktrace) + { + // we can just convert the capacity back to int here and use it + _backtrace_log_message_storage.set_capacity( + transit_event.header.logger_details->name(), + static_cast(std::stoul( + std::string{transit_event.formatted_msg.begin(), transit_event.formatted_msg.end()}))); + } + else if (macro_metadata.event() == MacroMetadata::Event::FlushBacktrace) + { + // process all records in backtrace for this logger_name and log them by calling backend_process_backtrace_log_message + _backtrace_log_message_storage.process(transit_event.header.logger_details->name(), + [this](TransitEvent const& te) + { _write_transit_event(te); }); + } + else if (macro_metadata.event() == MacroMetadata::Event::Flush) + { + _handler_collection.active_handlers(_active_handlers_cache); + _force_flush(); + + // this is a flush event, so we need to notify the caller to continue now + transit_event.flush_flag->store(true); + + // we also need to reset the flush_flag as the TransitEvents are re-used + transit_event.flush_flag = nullptr; + } + + // Since after processing an event we never force flush but leave it up to the OS instead, + // set this to true to keep track of unflushed messages we have + _has_unflushed_messages = true; +} + +/***/ +void BackendWorker::_force_flush() +{ + if (_has_unflushed_messages) + { + // If we have buffered any messages then flush all active handlers + for (auto const& handler : _active_handlers_cache) + { + std::shared_ptr h = handler.lock(); + if (h) + { + h->flush(); + } + } + + _has_unflushed_messages = false; + } +} + /***/ bool BackendWorker::_check_all_queues_empty(ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts) {