backend worker read multiple messages from the queues
odygrd committed Nov 19, 2022
1 parent ffff04b commit 07bbf5d
Showing 3 changed files with 119 additions and 136 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -1,3 +1,4 @@
- [v2.3.4](#v2.3.4)
- [v2.3.3](#v2.3.3)
- [v2.3.2](#v2.3.2)
- [v2.3.1](#v2.3.1)
@@ -31,11 +32,18 @@
- [v1.1.0](#v1.1.0)
- [v1.0.0](#v1.0.0)

## v2.3.4

**Improvements**

- Optimise the backend logging thread to read multiple log messages from the same queue, but still fairly read each
queue from all active threads.

## v2.3.3

**Fixes**

- Previously when multiple threads were loggin, Quill backend logging thread would first try reading the log messages of
- Previously when multiple threads were logging, Quill backend logging thread would first try reading the log messages of
one thread until the queue was completely empty before reading the log messages of the next thread.
When one of the threads was logging a lot, it could result in only displaying the log of that thread, hiding the
logs of the other threads. This has now been fixed and all log messages from all threads are read fairly.
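The two CHANGELOG entries above describe the same scheduling change from two angles: v2.3.3 makes the backend thread visit every active thread's queue instead of draining one queue to exhaustion, and v2.3.4 lets it read several messages from a queue per visit while keeping that fairness. Below is a minimal, self-contained sketch of the idea; `ThreadQueue`, `populate_from_all_queues` and the string-based messages are illustrative stand-ins, not Quill's actual types.

```cpp
#include <cstddef>
#include <deque>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for a per-thread queue of pending log messages;
// Quill's real queues are byte-oriented SPSC ring buffers, not std::deque.
struct ThreadQueue
{
  std::string thread_id;
  std::deque<std::string> pending;
};

// One pass over all active thread queues. Each queue is drained of the
// messages that were pending when the pass reached it, so several messages
// are read from the same queue in one go, yet every queue still gets a turn
// and a single busy thread cannot starve the others.
void populate_from_all_queues(std::vector<ThreadQueue>& queues,
                              std::vector<std::string>& transit_events)
{
  for (ThreadQueue& queue : queues)
  {
    std::size_t const available = queue.pending.size(); // snapshot for this pass
    for (std::size_t i = 0; i < available; ++i)
    {
      transit_events.push_back(queue.thread_id + ": " + queue.pending.front());
      queue.pending.pop_front();
    }
  }
}

int main()
{
  std::vector<ThreadQueue> queues{{"thread-1", {"a", "b", "c"}}, {"thread-2", {"x"}}};
  std::vector<std::string> transit_events;
  populate_from_all_queues(queues, transit_events);
  for (auto const& event : transit_events) { std::cout << event << '\n'; }
}
```

With the behaviour described as the pre-v2.3.3 problem, the loop would instead keep re-reading thread-1's queue until it was empty before ever looking at thread-2's.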
2 changes: 1 addition & 1 deletion quill/include/quill/Quill.h
@@ -29,7 +29,7 @@ namespace quill
/** Version Info **/
constexpr uint32_t VersionMajor{2};
constexpr uint32_t VersionMinor{3};
constexpr uint32_t VersionPatch{3};
constexpr uint32_t VersionPatch{4};
constexpr uint32_t Version{VersionMajor * 10000 + VersionMinor * 100 + VersionPatch};

/** forward declarations **/
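For reference, the hunk above feeds the packed `Version` constant defined just below it in the same header: bumping `VersionPatch` from 3 to 4 moves `Version` from 2 * 10000 + 3 * 100 + 3 = 20303 to 2 * 10000 + 3 * 100 + 4 = 20304. A standalone check of that encoding (not part of the commit):

```cpp
#include <cstdint>

// Mirrors the version constants shown in the Quill.h hunk above.
constexpr std::uint32_t VersionMajor{2};
constexpr std::uint32_t VersionMinor{3};
constexpr std::uint32_t VersionPatch{4};
constexpr std::uint32_t Version{VersionMajor * 10000 + VersionMinor * 100 + VersionPatch};

static_assert(Version == 20304, "v2.3.4 packs to 20304");

int main() { return 0; }
```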
243 changes: 109 additions & 134 deletions quill/include/quill/detail/backend/BackendWorker.h
@@ -97,15 +97,14 @@ class BackendWorker
/**
* Populate our local priority queue
* @param cached_thread_contexts local thread context cache
* @param is_terminating backend worker is terminating
*/
QUILL_ATTRIBUTE_HOT inline void _populate_priority_queue(
ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts, bool is_terminating);
ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts);

/**
* Deserialize an log message from the raw SPSC queue and emplace them to priority queue
*/
QUILL_ATTRIBUTE_HOT inline bool _read_queue_and_decode(ThreadContext* thread_context);
QUILL_ATTRIBUTE_HOT inline void _read_queue_and_decode(ThreadContext* thread_context);

/**
* Checks for events in all queues and processes the one with the minimum timestamp
@@ -265,38 +264,17 @@ void BackendWorker::run()
}

/***/
void BackendWorker::_populate_priority_queue(ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts,
bool is_terminating)
void BackendWorker::_populate_priority_queue(ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts)
{
while (true)
for (ThreadContext* thread_context : cached_thread_contexts)
{
uint16_t empty{0};
for (ThreadContext* thread_context : cached_thread_contexts)
{
if (!is_terminating && (_transit_events.size() >= _max_transit_events))
{
// transit events queue is full
return;
}

// copy everything to a priority queue
if (!_read_queue_and_decode(thread_context))
{
// _read_queue_and_decode returns false for an empty queue
++empty;
}
}

if (empty == cached_thread_contexts.size())
{
// all queues are empty we are done
return;
}
// copy everything to a priority queue
_read_queue_and_decode(thread_context);
}
}

/***/
bool BackendWorker::_read_queue_and_decode(ThreadContext* thread_context)
void BackendWorker::_read_queue_and_decode(ThreadContext* thread_context)
{
ThreadContext::SPSCQueueT& spsc_queue = thread_context->spsc_queue();

@@ -308,127 +286,124 @@ bool BackendWorker::_read_queue_and_decode(ThreadContext* thread_context)
auto read = spsc_queue.prepare_read();
std::byte* read_buffer = read.first;
size_t bytes_available = read.second;
std::byte* const read_begin = read_buffer;

if (bytes_available == 0)
while (bytes_available > 0)
{
// the queue is empty
return false;
}

// The queue is empty. First we want to allocate a new TransitEvent to store the message
// from the queue
void* transit_event_buffer = _free_list_allocator.allocate(sizeof(TransitEvent));
auto* transit_event = new (transit_event_buffer) TransitEvent{};
transit_event->thread_id = thread_context->thread_id();
transit_event->thread_name = thread_context->thread_name();

// read the header first, and take copy of the header
read_buffer = detail::align_pointer<alignof(Header), std::byte>(read_buffer);
transit_event->header = *(reinterpret_cast<detail::Header*>(read_buffer));
read_buffer += sizeof(detail::Header);

// if we are using rdtsc clock then here we will convert the value to nanoseconds since epoch
// doing the conversion here ensures that every transit that is inserted in the priority queue
// below has a header timestamp of nanoseconds since epoch and makes it even possible to
// have Logger objects using different clocks
if (transit_event->header.logger_details->timestamp_clock_type() == TimestampClockType::Rdtsc)
{
if (!_rdtsc_clock)
std::byte* const read_begin = read_buffer;

// First we want to allocate a new TransitEvent to store the message from the queue
void* transit_event_buffer = _free_list_allocator.allocate(sizeof(TransitEvent));
auto* transit_event = new (transit_event_buffer) TransitEvent{};
transit_event->thread_id = thread_context->thread_id();
transit_event->thread_name = thread_context->thread_name();

// read the header first, and take copy of the header
read_buffer = detail::align_pointer<alignof(Header), std::byte>(read_buffer);
transit_event->header = *(reinterpret_cast<detail::Header*>(read_buffer));
read_buffer += sizeof(detail::Header);

// if we are using rdtsc clock then here we will convert the value to nanoseconds since epoch
// doing the conversion here ensures that every transit that is inserted in the priority queue
// below has a header timestamp of nanoseconds since epoch and makes it even possible to
// have Logger objects using different clocks
if (transit_event->header.logger_details->timestamp_clock_type() == TimestampClockType::Rdtsc)
{
// Here we lazy initialise rdtsc clock on the backend thread only if the user decides to use it
// Use rdtsc clock based on config. The clock requires a few seconds to init as it is
// taking samples first
_rdtsc_clock = std::make_unique<RdtscClock>(_config.rdtsc_resync_interval);
}

// convert the rdtsc value to nanoseconds since epoch
transit_event->header.timestamp = _rdtsc_clock->time_since_epoch(transit_event->header.timestamp);
}
if (!_rdtsc_clock)
{
// Here we lazy initialise rdtsc clock on the backend thread only if the user decides to use it
// Use rdtsc clock based on config. The clock requires a few seconds to init as it is
// taking samples first
_rdtsc_clock = std::make_unique<RdtscClock>(_config.rdtsc_resync_interval);
}

// we need to check and do not try to format the flush events as that wouldn't be valid
if (transit_event->header.metadata->macro_metadata.event() != MacroMetadata::Event::Flush)
{
#if defined(_WIN32)
if (transit_event->header.metadata->macro_metadata.has_wide_char())
{
// convert the format string to a narrow string
size_t const size_needed =
get_wide_string_encoding_size(transit_event->header.metadata->macro_metadata.wmessage_format());
std::string format_str(size_needed, 0);
wide_string_to_narrow(format_str.data(), size_needed,
transit_event->header.metadata->macro_metadata.wmessage_format());

assert(!transit_event->header.metadata->macro_metadata.is_structured_log_template() &&
"structured log templates are not supported for wide characters");

read_buffer = transit_event->header.metadata->format_to_fn(
format_str, read_buffer, transit_event->formatted_msg, _args);
// convert the rdtsc value to nanoseconds since epoch
transit_event->header.timestamp = _rdtsc_clock->time_since_epoch(transit_event->header.timestamp);
}
else

// we need to check and do not try to format the flush events as that wouldn't be valid
if (transit_event->header.metadata->macro_metadata.event() != MacroMetadata::Event::Flush)
{
#endif
if (transit_event->header.metadata->macro_metadata.is_structured_log_template())
#if defined(_WIN32)
if (transit_event->header.metadata->macro_metadata.has_wide_char())
{
// for messages containing named arguments threat them as structured logs
auto const search = _slog_templates.find(transit_event->header.metadata);
if (search != std::cend(_slog_templates))
{
auto const& [fmt_str, structured_keys] = search->second;
// convert the format string to a narrow string
size_t const size_needed =
get_wide_string_encoding_size(transit_event->header.metadata->macro_metadata.wmessage_format());
std::string format_str(size_needed, 0);
wide_string_to_narrow(format_str.data(), size_needed,
transit_event->header.metadata->macro_metadata.wmessage_format());

transit_event->structured_keys = structured_keys;
assert(!transit_event->header.metadata->macro_metadata.is_structured_log_template() &&
"structured log templates are not supported for wide characters");

read_buffer = transit_event->header.metadata->format_to_fn(
fmt_str, read_buffer, transit_event->formatted_msg, _args);
read_buffer = transit_event->header.metadata->format_to_fn(
format_str, read_buffer, transit_event->formatted_msg, _args);
}
else
{
#endif
if (transit_event->header.metadata->macro_metadata.is_structured_log_template())
{
// for messages containing named arguments threat them as structured logs
auto const search = _slog_templates.find(transit_event->header.metadata);
if (search != std::cend(_slog_templates))
{
auto const& [fmt_str, structured_keys] = search->second;

transit_event->structured_keys = structured_keys;

read_buffer = transit_event->header.metadata->format_to_fn(
fmt_str, read_buffer, transit_event->formatted_msg, _args);
}
else
{
auto [fmt_str, structured_keys] = _process_structured_log_template(
transit_event->header.metadata->macro_metadata.message_format());

// insert the results
_slog_templates[transit_event->header.metadata] = std::make_pair(fmt_str, structured_keys);

transit_event->structured_keys = std::move(structured_keys);

read_buffer = transit_event->header.metadata->format_to_fn(
fmt_str, read_buffer, transit_event->formatted_msg, _args);
}

// formatted values for any given keys
for (auto const& arg : _args)
{
transit_event->structured_values.emplace_back(fmt::vformat("{}", fmt::basic_format_args(&arg, 1)));
}
}
else
{
auto [fmt_str, structured_keys] = _process_structured_log_template(
transit_event->header.metadata->macro_metadata.message_format());

// insert the results
_slog_templates[transit_event->header.metadata] = std::make_pair(fmt_str, structured_keys);

transit_event->structured_keys = std::move(structured_keys);

// regular logs
read_buffer = transit_event->header.metadata->format_to_fn(
fmt_str, read_buffer, transit_event->formatted_msg, _args);
}

// formatted values for any given keys
for (auto const& arg : _args)
{
transit_event->structured_values.emplace_back(fmt::vformat("{}", fmt::basic_format_args(&arg, 1)));
transit_event->header.metadata->macro_metadata.message_format(), read_buffer,
transit_event->formatted_msg, _args);
}
}
else
{
// regular logs
read_buffer = transit_event->header.metadata->format_to_fn(
transit_event->header.metadata->macro_metadata.message_format(), read_buffer,
transit_event->formatted_msg, _args);
}
#if defined(_WIN32)
}
}
#endif
}
else
{
// if this is a flush event then we do not need to format anything for the
// transit_event, but we need to set the transit event's flush_flag pointer instead
uintptr_t flush_flag_tmp;
std::memcpy(&flush_flag_tmp, read_buffer, sizeof(uintptr_t));
transit_event->flush_flag = reinterpret_cast<std::atomic<bool>*>(flush_flag_tmp);
read_buffer += sizeof(uintptr_t);
}

// Finish reading
assert((read_buffer >= read_begin) && "read_buffer should be greater or equal to read_begin");
spsc_queue.finish_read(static_cast<size_t>(read_buffer - read_begin));
}
else
{
// if this is a flush event then we do not need to format anything for the
// transit_event, but we need to set the transit event's flush_flag pointer instead
uintptr_t flush_flag_tmp;
std::memcpy(&flush_flag_tmp, read_buffer, sizeof(uintptr_t));
transit_event->flush_flag = reinterpret_cast<std::atomic<bool>*>(flush_flag_tmp);
read_buffer += sizeof(uintptr_t);
}

_transit_events.emplace(transit_event);
// Finish reading
assert((read_buffer >= read_begin) && "read_buffer should be greater or equal to read_begin");
auto const read_size = static_cast<size_t>(read_buffer - read_begin);
spsc_queue.finish_read(read_size);
bytes_available -= read_size;

return true;
_transit_events.emplace(transit_event);
}
}

/***/
@@ -565,7 +540,7 @@ void BackendWorker::_main_loop()
ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts =
_thread_context_collection.backend_thread_contexts_cache();

_populate_priority_queue(cached_thread_contexts, false);
_populate_priority_queue(cached_thread_contexts);

if (QUILL_LIKELY(!_transit_events.empty()))
{
@@ -613,7 +588,7 @@ void BackendWorker::_exit()

while (true)
{
_populate_priority_queue(cached_thread_contexts, true);
_populate_priority_queue(cached_thread_contexts);

if (!_transit_events.empty())
{
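For readers skimming the hunk above: the core change in `_read_queue_and_decode` is that a single `prepare_read()` snapshot is now drained in a `while (bytes_available > 0)` loop, with `finish_read()` called and `bytes_available` reduced per decoded event, instead of decoding one event and returning. A simplified, self-contained sketch of that drain pattern follows; `MockByteQueue` and the length-prefixed message format are assumptions for illustration, and Quill's real queue, header layout and `TransitEvent` handling are richer than this.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical byte buffer exposing the prepare_read()/finish_read() shape
// seen in the diff; it is not Quill's SPSC queue.
struct MockByteQueue
{
  std::vector<std::byte> data;
  std::size_t read_pos{0};

  std::pair<std::byte*, std::size_t> prepare_read()
  {
    return {data.data() + read_pos, data.size() - read_pos};
  }
  void finish_read(std::size_t bytes) { read_pos += bytes; }
};

// Drain every message that was available when prepare_read() was called,
// committing each one with finish_read(), instead of decoding one message
// per call as the pre-2.3.4 code did.
void read_queue_and_decode(MockByteQueue& queue)
{
  auto [read_buffer, bytes_available] = queue.prepare_read();

  while (bytes_available > 0)
  {
    std::byte* const read_begin = read_buffer;

    // Each mock "message" is a 4-byte length prefix followed by its payload.
    std::uint32_t payload_size = 0;
    std::memcpy(&payload_size, read_buffer, sizeof(payload_size));
    read_buffer += sizeof(payload_size) + payload_size;

    auto const read_size = static_cast<std::size_t>(read_buffer - read_begin);
    queue.finish_read(read_size);
    bytes_available -= read_size;

    std::cout << "decoded message with " << payload_size << " payload bytes\n";
  }
}

int main()
{
  MockByteQueue queue;
  auto push = [&queue](std::string_view payload)
  {
    auto const size = static_cast<std::uint32_t>(payload.size());
    auto const* size_bytes = reinterpret_cast<std::byte const*>(&size);
    queue.data.insert(queue.data.end(), size_bytes, size_bytes + sizeof(size));
    auto const* payload_bytes = reinterpret_cast<std::byte const*>(payload.data());
    queue.data.insert(queue.data.end(), payload_bytes, payload_bytes + payload.size());
  };
  push("hello");
  push("fair queues");
  read_queue_and_decode(queue); // both messages are decoded in a single pass
}
```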
