Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve compile time #377

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quill/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ set(SOURCE_FILES
src/detail/HandlerCollection.cpp
src/detail/LoggerCollection.cpp
src/detail/SignalHandler.cpp
src/detail/ThreadContext.cpp
src/detail/ThreadContextCollection.cpp

src/handlers/ConsoleHandler.cpp
src/handlers/FileHandler.cpp
Expand Down
25 changes: 3 additions & 22 deletions quill/include/quill/detail/ThreadContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,8 @@ class alignas(CACHE_LINE_ALIGNED) ThreadContext
/**
* Constructor
*/
explicit ThreadContext(QueueType queue_type, uint32_t default_queue_capacity,
uint32_t initial_transit_event_buffer_capacity, bool huge_pages)
: _transit_event_buffer(initial_transit_event_buffer_capacity)
{
if ((queue_type == QueueType::UnboundedBlocking) ||
(queue_type == QueueType::UnboundedNoMaxLimit) || (queue_type == QueueType::UnboundedDropping))
{
_spsc_queue.emplace<UnboundedQueue>(default_queue_capacity, huge_pages);
}
else
{
_spsc_queue.emplace<BoundedQueue>(default_queue_capacity, huge_pages);
}
}
ThreadContext(QueueType queue_type, uint32_t default_queue_capacity,
uint32_t initial_transit_event_buffer_capacity, bool huge_pages);

/**
* Deleted
Expand Down Expand Up @@ -148,14 +136,7 @@ class alignas(CACHE_LINE_ALIGNED) ThreadContext
* counter. Called by the backend worker thread
* @return current value of the message failure counter
*/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t get_and_reset_message_failure_counter() noexcept
{
if (QUILL_LIKELY(_message_failure_counter.load(std::memory_order_relaxed) == 0))
{
return 0;
}
return _message_failure_counter.exchange(0, std::memory_order_relaxed);
}
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t get_and_reset_message_failure_counter() noexcept;

private:
std::variant<std::monostate, UnboundedQueue, BoundedQueue> _spsc_queue; /** queue for this thread, events are pushed here */
Expand Down
65 changes: 7 additions & 58 deletions quill/include/quill/detail/ThreadContextCollection.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#pragma once

#include "quill/Config.h"
#include "quill/detail/ThreadContext.h"
#include "quill/detail/misc/Attributes.h" // for QUILL_ATTRIBUTE_HOT
#include "quill/detail/misc/Common.h" // for CACHE_LINE_ALIGNED
#include <atomic> // for atomic
Expand All @@ -23,8 +24,6 @@ struct Config;
namespace detail
{

class ThreadContext;

/**
* ThreadContextCollection class
* a) Creates or returns the existing thread local ThreadContext instance to the thread that called Logger.log()
Expand Down Expand Up @@ -161,26 +160,7 @@ class ThreadContextCollection
* If there are no invalidated contexts or no new contexts the existing cache is returned
* @return All current owned thread contexts
*/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT backend_thread_contexts_cache_t const& backend_thread_contexts_cache()
{
// Check if _thread_contexts has changed. This can happen only when a new thread context is added by any Logger
if (QUILL_UNLIKELY(_has_new_thread_context()))
{
// if the thread _thread_contexts was changed we lock and remake our reference cache
std::lock_guard<std::mutex> const lock {_mutex};
_thread_context_cache.clear();

// Remake thread context ref
for (auto const& elem : _thread_contexts)
{
// We do skip invalidated && empty queue thread contexts as this is very rare, so instead
// we just add them and expect them to be cleaned in the next iteration
_thread_context_cache.push_back(elem.get());
}
}

return _thread_context_cache;
}
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT backend_thread_contexts_cache_t const& backend_thread_contexts_cache();

/**
* Clears thread context cache from invalid and empty thread contexts
Expand All @@ -200,60 +180,29 @@ class ThreadContextCollection
* @note Only accessed by the backend thread
* @return true if the shared data structure was changed by any calls to Logger
*/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _has_new_thread_context() noexcept
{
// Again relaxed memory model as in case it is false we will acquire the lock
if (_new_thread_context.load(std::memory_order_relaxed))
{
// if the variable was updated to true, set it to false,
// There should not be any race condition here as this is the only place _changed is set to
// false, and we will return true anyway
_new_thread_context.store(false, std::memory_order_relaxed);
return true;
}
return false;
}
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _has_new_thread_context() noexcept;

/**
* Indicate that the context has changed. A new thread context has been added or removed
* @note Only called by the caller threads
*/
void _set_new_thread_context() noexcept
{
// Set changed is used with the lock, we can have relaxed memory order here as the lock
// is acq/rel anyway
return _new_thread_context.store(true, std::memory_order_relaxed);
}
void _set_new_thread_context() noexcept;

/**
* Increment the counter for a removed thread context. This notifies the backend thread to look for an invalidated context
*/
void _add_invalid_thread_context() noexcept
{
// relaxed is fine, see _has_invalid_thread_context explanation
_invalid_thread_context.fetch_add(1, std::memory_order_relaxed);
}
void _add_invalid_thread_context() noexcept;

/**
* Reduce the value of thread context removed counter. This is decreased by the backend thread
* when we found and removed the invalided context
*/
void _sub_invalid_thread_context() noexcept
{
// relaxed is fine, see _has_invalid_thread_context explanation
_invalid_thread_context.fetch_sub(1, std::memory_order_relaxed);
}
void _sub_invalid_thread_context() noexcept;

/**
* @return True if there is an invalid thread context
*/
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _has_invalid_thread_context() const noexcept
{
// Here we do relaxed because if the value is not zero we will look inside ThreadContext invalid
// flag that is also a relaxed atomic, and then we will look into the SPSC queue size that is
// also atomic Even if we don't read everything in order we will check again in the next circle
return _invalid_thread_context.load(std::memory_order_relaxed) != 0;
}
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _has_invalid_thread_context() const noexcept;

/**
* Remove a thread context from our main thread context collection
Expand Down
122 changes: 6 additions & 116 deletions quill/include/quill/detail/backend/BackendWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ class BackendWorker
/**
* Process a single transit event
*/
QUILL_ATTRIBUTE_HOT inline void _process_transit_event(TransitEvent& transit_event);
QUILL_ATTRIBUTE_HOT void _process_transit_event(TransitEvent& transit_event);

/**
* Write a transit event
*/
QUILL_ATTRIBUTE_HOT inline void _write_transit_event(TransitEvent const& transit_event) const;
QUILL_ATTRIBUTE_HOT void _write_transit_event(TransitEvent const& transit_event) const;

/**
* Process the lowest timestamp from the queues and write it to the log file
Expand All @@ -156,7 +156,7 @@ class BackendWorker
/**
* Force flush all active Handlers
*/
QUILL_ATTRIBUTE_HOT inline void _force_flush();
QUILL_ATTRIBUTE_HOT void _force_flush();

/**
* Check for dropped messages - only when bounded queue is used
Expand Down Expand Up @@ -474,108 +474,17 @@ void BackendWorker::_process_transit_events(ThreadContextCollection::backend_thr
TransitEvent* transit_event = transit_buffer->front();
assert(transit_event && "transit_buffer is set only when transit_event is valid");

_process_transit_event(*transit_event);

// Remove this event and move to the next.
transit_buffer->pop_front();
}

/***/
void BackendWorker::_process_transit_event(TransitEvent& transit_event)
{
MacroMetadata const macro_metadata = transit_event.metadata();

// If backend_process(...) throws we want to skip this event and move to the next, so we catch the
// error here instead of catching it in the parent try/catch block of main_loop
QUILL_TRY
{
if (macro_metadata.event() == MacroMetadata::Event::Log)
{
if (transit_event.log_level() != LogLevel::Backtrace)
{
_write_transit_event(transit_event);

// We also need to check the severity of the log message here against the backtrace
// Check if we should also flush the backtrace messages:
// After we forwarded the message we will check the severity of this message for this logger
// If the severity of the message is higher than the backtrace flush severity we will also
// flush the backtrace of the logger
if (QUILL_UNLIKELY(transit_event.log_level() >= transit_event.header.logger_details->backtrace_flush_level()))
{
_backtrace_log_message_storage.process(transit_event.header.logger_details->name(),
[this](TransitEvent const& te)
{ _write_transit_event(te); });
}
}
else
{
// this is a backtrace log and we will store it
_backtrace_log_message_storage.store(std::move(transit_event));
}
}
else if (macro_metadata.event() == MacroMetadata::Event::InitBacktrace)
{
// we can just convert the capacity back to int here and use it
_backtrace_log_message_storage.set_capacity(
transit_event.header.logger_details->name(),
static_cast<uint32_t>(std::stoul(
std::string{transit_event.formatted_msg.begin(), transit_event.formatted_msg.end()})));
}
else if (macro_metadata.event() == MacroMetadata::Event::FlushBacktrace)
{
// process all records in backtrace for this logger_name and log them by calling backend_process_backtrace_log_message
_backtrace_log_message_storage.process(transit_event.header.logger_details->name(),
[this](TransitEvent const& te)
{ _write_transit_event(te); });
}
else if (macro_metadata.event() == MacroMetadata::Event::Flush)
{
_handler_collection.active_handlers(_active_handlers_cache);
_force_flush();

// this is a flush event, so we need to notify the caller to continue now
transit_event.flush_flag->store(true);

// we also need to reset the flush_flag as the TransitEvents are re-used
transit_event.flush_flag = nullptr;
}

// Since after processing an event we never force flush but leave it up to the OS instead,
// set this to true to keep track of unflushed messages we have
_has_unflushed_messages = true;
}
QUILL_TRY { _process_transit_event(*transit_event); }
#if !defined(QUILL_NO_EXCEPTIONS)
QUILL_CATCH(std::exception const& e) { _notification_handler(e.what()); }
QUILL_CATCH_ALL()
{
_notification_handler(std::string{"Caught unhandled exception."});
} // clang-format on
#endif
}

/***/
void BackendWorker::_write_transit_event(TransitEvent const& transit_event) const
{
// Forward the record to all the logger handlers
MacroMetadata const macro_metadata = transit_event.metadata();

for (auto& handler : transit_event.header.logger_details->handlers())
{
auto const& formatted_log_message_buffer = handler->formatter().format(
std::chrono::nanoseconds{transit_event.header.timestamp}, transit_event.thread_id,
transit_event.thread_name, _process_id, transit_event.header.logger_details->name(),
transit_event.log_level_as_str(), macro_metadata, transit_event.formatted_msg);

// If all filters are okay we write this message to the file
if (handler->apply_filters(transit_event.thread_id,
std::chrono::nanoseconds{transit_event.header.timestamp},
transit_event.log_level(), macro_metadata, formatted_log_message_buffer))
{
// log to the handler, also pass the log_message_timestamp this is only needed in some
// cases like daily file rotation
handler->write(formatted_log_message_buffer, transit_event);
}
}
// Remove this event and move to the next.
transit_buffer->pop_front();
}

/***/
Expand Down Expand Up @@ -651,25 +560,6 @@ bool BackendWorker::_process_and_write_single_message(const ThreadContextCollect
return true;
}

/**
 * Flushes every active handler, but only when at least one message has been
 * written since the last flush; clears the unflushed flag afterwards.
 */
void BackendWorker::_force_flush()
{
  if (!_has_unflushed_messages)
  {
    // Nothing was written since the last flush - skip touching the handlers.
    return;
  }

  for (auto const& weak_handler : _active_handlers_cache)
  {
    // The cache stores weak references; a handler may have been destroyed
    // since the cache was built, so only flush the ones still alive.
    if (std::shared_ptr<Handler> handler = weak_handler.lock())
    {
      handler->flush();
    }
  }

  _has_unflushed_messages = false;
}

/***/
void BackendWorker::_check_message_failures(ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts,
backend_worker_notification_handler_t const& notification_handler) noexcept
Expand Down
30 changes: 30 additions & 0 deletions quill/src/detail/ThreadContext.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include "quill/detail/ThreadContext.h"

namespace quill::detail
{
/**
 * Constructs the per-thread context, sizing the transit event buffer and
 * selecting the SPSC queue implementation that matches the configured type.
 */
ThreadContext::ThreadContext(QueueType queue_type, uint32_t default_queue_capacity,
                             uint32_t initial_transit_event_buffer_capacity, bool huge_pages)
  : _transit_event_buffer(initial_transit_event_buffer_capacity)
{
  // All three unbounded variants share the same underlying queue type; only
  // the bounded variants use the fixed-capacity queue.
  bool const use_unbounded_queue = (queue_type == QueueType::UnboundedBlocking) ||
    (queue_type == QueueType::UnboundedNoMaxLimit) || (queue_type == QueueType::UnboundedDropping);

  if (use_unbounded_queue)
  {
    _spsc_queue.emplace<UnboundedQueue>(default_queue_capacity, huge_pages);
  }
  else
  {
    _spsc_queue.emplace<BoundedQueue>(default_queue_capacity, huge_pages);
  }
}

/**
 * Returns the number of failed (dropped) message pushes and resets the counter.
 * Called by the backend worker thread.
 * @return the counter value at the time of the reset, or 0 when nothing failed
 */
size_t ThreadContext::get_and_reset_message_failure_counter() noexcept
{
  // Fast path: a plain relaxed load avoids the read-modify-write when there
  // have been no failures, which is the overwhelmingly common case.
  size_t const failures = _message_failure_counter.load(std::memory_order_relaxed);
  return QUILL_LIKELY(failures == 0)
    ? size_t{0}
    : _message_failure_counter.exchange(0, std::memory_order_relaxed);
}
} // namespace quill::detail
Loading