Skip to content

Commit

Permalink
report messages via the notificaiton handler
Browse files Browse the repository at this point in the history
  • Loading branch information
odygrd committed May 12, 2023
1 parent 8c91d49 commit b5dd703
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 100 deletions.
16 changes: 13 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
- [v2.8.1](#v281)
- [v2.9.0](#v290)
- [v2.8.0](#v280)
- [v2.7.0](#v270)
- [v2.6.0](#v260)
Expand Down Expand Up @@ -41,11 +41,21 @@
- [v1.1.0](#v110)
- [v1.0.0](#v100)

## v2.8.1
## v2.9.0

**Fixes**

- Fixed a bug in TimeRotatingFileHandler ([#287](https://github.com/odygrd/quill/pull/287))
- Fixed a bug in TimeRotatingFileHandler. ([#287](https://github.com/odygrd/quill/pull/287))

**Improvements**

- Renamed `backend_thread_error_handler` to `backend_thread_notifications_handler` in `Config.h`. Previously this
handler was
used only to report errors from the backend worker thread to the user. This callback will also now report
info messages to the user.
- Report unbounded spsc queue reallocation via
the `backend_thread_notifications_handler`. ([#286](https://github.com/odygrd/quill/pull/286))
- Report bounded spsc queue dropped messages via the `backend_thread_notifications_handler`.

## v2.8.0

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/backend_throughput/quill_backend_throughput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ int main()
auto delta_d = std::chrono::duration_cast<std::chrono::duration<double>>(delta).count();

std::cout << fmt::format(
"Throughput is {:.2f} million msgs/sec average, total time elapsed: {} ms for {} "
"Throughput is {:.2f} million msgs/sec average, total time elapsed: {} ms for {} "
"log messages \n",
total_iterations / delta_d / 1e6,
std::chrono::duration_cast<std::chrono::milliseconds>(delta).count(), total_iterations)
Expand Down
8 changes: 3 additions & 5 deletions examples/example_configure_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ int main()
{
quill::Config cfg;

#if !defined(QUILL_NO_EXCEPTIONS)
// Set a custom error handler to handler exceptions - if exceptions are enabled
cfg.backend_thread_error_handler = [](std::string const& s)
{ std::cout << "Hello from error handler. Error: " << s << std::endl; };
#endif

cfg.backend_thread_notification_handler = [](std::string const& s)
{ std::cout << "Hello from notification handler. Message: " << s << std::endl; };

// Setting to an invalid CPU. When we call quill::start() our error handler will be invoked and an error will be logged
cfg.backend_thread_cpu_affinity = static_cast<uint16_t>(321312);

Expand Down
16 changes: 10 additions & 6 deletions quill/include/quill/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,22 @@ struct Config
*/
std::string default_logger_name = "root";

#if !defined(QUILL_NO_EXCEPTIONS)
/**
* The background thread in very rare occasion might thrown an exception which can not be caught
* in the user threads. In that case the backend worker thread will call this callback instead.
*
* Set up a custom error handler to be used if the backend thread has any error.
* Set up a custom notifications handler to be used if the backend thread has any error.
*
* Set an error handler callback e.g :
* backend_thread_error_handler = [](std::string const& s) { std::cerr << s << std::endl; }
* This handler is also used to deliver messages to the user, for example when the unbounded
* queue reallocates or when the bounded queue gets full
*
* When not set here the default is:
* backend_thread_notification_handler = [](std::string const& s) { std::cerr << s << std::endl; }
*
* If you wish to disable being notified use:
* backend_thread_notification_handler = [](std::string const&) { }
*/
backend_worker_error_handler_t backend_thread_error_handler;
#endif
backend_worker_notification_handler_t backend_thread_notification_handler;

/**
* Sets a custom clock that will be used to obtain the timestamp
Expand Down
4 changes: 2 additions & 2 deletions quill/include/quill/Quill.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ namespace quill

/** Version Info **/
constexpr uint32_t VersionMajor{2};
constexpr uint32_t VersionMinor{8};
constexpr uint32_t VersionPatch{1};
constexpr uint32_t VersionMinor{9};
constexpr uint32_t VersionPatch{0};
constexpr uint32_t Version{VersionMajor * 10000 + VersionMinor * 100 + VersionPatch};

/** forward declarations **/
Expand Down
85 changes: 57 additions & 28 deletions quill/include/quill/detail/backend/BackendWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ class BackendWorker
* @param cached_thread_contexts loaded thread contexts
*/
QUILL_ATTRIBUTE_HOT static void _check_dropped_messages(
ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts) noexcept;
ThreadContextCollection::backend_thread_contexts_cache_t const& cached_thread_contexts,
backend_worker_notification_handler_t const& notification_handler) noexcept;

/**
* Process a structured log template message
Expand Down Expand Up @@ -203,17 +204,15 @@ class BackendWorker
std::chrono::system_clock::time_point _last_rdtsc_resync;
uint32_t _backend_worker_thread_id{0}; /** cached backend worker thread id */

backend_worker_notification_handler_t _notification_handler; /** error handler for the backend thread */

bool _backend_thread_yield; /** backend_thread_yield from config **/
bool _has_unflushed_messages{false}; /** There are messages that are buffered by the OS, but not yet flushed */
bool _strict_log_timestamp_order{true};
bool _empty_all_queues_before_exit{true};
bool _use_transit_buffer{true};
std::atomic<bool> _is_running{false}; /** The spawned backend thread status */

#if !defined(QUILL_NO_EXCEPTIONS)
backend_worker_error_handler_t _error_handler; /** error handler for the backend thread */
#endif

alignas(CACHE_LINE_ALIGNED) std::mutex _wake_up_mutex;
std::condition_variable _wake_up_cv;
bool _wake_up{false};
Expand Down Expand Up @@ -254,13 +253,13 @@ void BackendWorker::run()
_rdtsc_resync_interval = _config.rdtsc_resync_interval;
_use_transit_buffer = _config.backend_thread_use_transit_buffer;

#if !defined(QUILL_NO_EXCEPTIONS)
if (_config.backend_thread_error_handler)
if (_config.backend_thread_notification_handler)
{
// set up the default error handler
_error_handler = _config.backend_thread_error_handler;
_notification_handler = _config.backend_thread_notification_handler;
}
#endif

assert(_notification_handler && "_notification_handler is always set");

std::thread worker(
[this]()
Expand All @@ -274,8 +273,8 @@ void BackendWorker::run()
}
}
#if !defined(QUILL_NO_EXCEPTIONS)
QUILL_CATCH(std::exception const& e) { _error_handler(e.what()); }
QUILL_CATCH_ALL() { _error_handler(std::string{"Caught unhandled exception."}); }
QUILL_CATCH(std::exception const& e) { _notification_handler(e.what()); }
QUILL_CATCH_ALL() { _notification_handler(std::string{"Caught unhandled exception."}); }
#endif

QUILL_TRY
Expand All @@ -284,8 +283,8 @@ void BackendWorker::run()
set_thread_name(_config.backend_thread_name.data());
}
#if !defined(QUILL_NO_EXCEPTIONS)
QUILL_CATCH(std::exception const& e) { _error_handler(e.what()); }
QUILL_CATCH_ALL() { _error_handler(std::string{"Caught unhandled exception."}); }
QUILL_CATCH(std::exception const& e) { _notification_handler(e.what()); }
QUILL_CATCH_ALL() { _notification_handler(std::string{"Caught unhandled exception."}); }
#endif

// Cache this thread's id
Expand All @@ -300,21 +299,21 @@ void BackendWorker::run()
// main loop
QUILL_TRY { _main_loop(); }
#if !defined(QUILL_NO_EXCEPTIONS)
QUILL_CATCH(std::exception const& e) { _error_handler(e.what()); }
QUILL_CATCH(std::exception const& e) { _notification_handler(e.what()); }
QUILL_CATCH_ALL()
{
_error_handler(std::string{"Caught unhandled exception."});
_notification_handler(std::string{"Caught unhandled exception."});
} // clang-format on
#endif
}

// exit
QUILL_TRY { _exit(); }
#if !defined(QUILL_NO_EXCEPTIONS)
QUILL_CATCH(std::exception const& e) { _error_handler(e.what()); }
QUILL_CATCH(std::exception const& e) { _notification_handler(e.what()); }
QUILL_CATCH_ALL()
{
_error_handler(std::string{"Caught unhandled exception."});
_notification_handler(std::string{"Caught unhandled exception."});
} // clang-format on
#endif
});
Expand Down Expand Up @@ -379,7 +378,15 @@ uint32_t BackendWorker::_read_queue_messages_and_decode(QueueT& queue, ThreadCon
size_t const queue_capacity = queue.capacity();
uint32_t total_bytes_read{0};

std::byte* read_pos = queue.prepare_read();
std::byte* read_pos;
if constexpr (std::is_same_v<QueueT, UnboundedQueue>)
{
read_pos = queue.prepare_read(_notification_handler);
}
else
{
read_pos = queue.prepare_read();
}

// read max of one full queue and also max_transit events otherwise we can get stuck here forever
// if the producer keeps producing
Expand Down Expand Up @@ -407,7 +414,14 @@ uint32_t BackendWorker::_read_queue_messages_and_decode(QueueT& queue, ThreadCon
total_bytes_read += static_cast<uint32_t>(read_pos - read_begin);

// read again
read_pos = queue.prepare_read();
if constexpr (std::is_same_v<QueueT, UnboundedQueue>)
{
read_pos = queue.prepare_read(_notification_handler);
}
else
{
read_pos = queue.prepare_read();
}
}

if (total_bytes_read != 0)
Expand Down Expand Up @@ -678,12 +692,10 @@ void BackendWorker::_process_transit_event(TransitEvent& transit_event)
}
#if !defined(QUILL_NO_EXCEPTIONS)
QUILL_CATCH(std::exception const& e)
{
_error_handler(e.what());
}
{ _notification_handler(e.what()); }
QUILL_CATCH_ALL()
{
_error_handler(std::string{"Caught unhandled exception."});
_notification_handler(std::string{"Caught unhandled exception."});
} // clang-format on
#endif
}
Expand Down Expand Up @@ -723,13 +735,22 @@ bool BackendWorker::_process_and_write_single_message(const ThreadContextCollect
for (ThreadContext* thread_context : cached_thread_contexts)
{
std::visit(
[&thread_context, &min_ts, &tc](auto& queue)
[&thread_context, &min_ts, &tc, this](auto& queue)
{
// find the minimum timestamp accross all queues
using T = std::decay_t<decltype(queue)>;
if constexpr ((std::is_same_v<T, UnboundedQueue>) || (std::is_same_v<T, BoundedQueue>))
{
std::byte* read_pos = queue.prepare_read();
std::byte* read_pos;
if constexpr (std::is_same_v<T, UnboundedQueue>)
{
read_pos = queue.prepare_read(_notification_handler);
}
else
{
read_pos = queue.prepare_read();
}

if (read_pos && (reinterpret_cast<detail::Header*>(read_pos)->timestamp < min_ts))
{
min_ts = reinterpret_cast<detail::Header*>(read_pos)->timestamp;
Expand All @@ -752,7 +773,15 @@ bool BackendWorker::_process_and_write_single_message(const ThreadContextCollect
using T = std::decay_t<decltype(queue)>;
if constexpr ((std::is_same_v<T, UnboundedQueue>) || (std::is_same_v<T, BoundedQueue>))
{
std::byte* read_pos = queue.prepare_read();
std::byte* read_pos;
if constexpr (std::is_same_v<T, UnboundedQueue>)
{
read_pos = queue.prepare_read(_notification_handler);
}
else
{
read_pos = queue.prepare_read();
}
assert(read_pos);

std::byte* const read_begin = read_pos;
Expand Down Expand Up @@ -842,7 +871,7 @@ void BackendWorker::_main_loop()
_force_flush();

// check for any dropped messages by the threads
_check_dropped_messages(cached_thread_contexts);
_check_dropped_messages(cached_thread_contexts, _notification_handler);

// We can also clear any invalidated or empty thread contexts
_thread_context_collection.clear_invalid_and_empty_thread_contexts();
Expand Down Expand Up @@ -951,7 +980,7 @@ void BackendWorker::_exit()
if (all_empty)
{
// we are done, all queues are now empty
_check_dropped_messages(cached_thread_contexts);
_check_dropped_messages(cached_thread_contexts, _notification_handler);
_force_flush();
break;
}
Expand Down
2 changes: 1 addition & 1 deletion quill/include/quill/detail/misc/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ enum TimestampClockType : uint8_t
/**
* backend worker thread error handler type
*/
using backend_worker_error_handler_t = std::function<void(std::string const&)>;
using backend_worker_notification_handler_t = std::function<void(std::string const&)>;

} // namespace quill

Expand Down
8 changes: 0 additions & 8 deletions quill/include/quill/detail/misc/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,4 @@ QUILL_NODISCARD time_t next_noon_or_midnight_timestamp(time_t timestamp, Timezon
* @return the formatted string as vector of characters
*/
QUILL_NODISCARD std::vector<char> safe_strftime(char const* format_string, time_t timestamp, Timezone timezone);

/**
* Split a string into tokens
* @param s given string
* @param delimiter delimiter
* @return returns a vector of tokens
*/
QUILL_NODISCARD std::vector<std::string> split(std::string const& s, char delimiter);
} // namespace quill::detail
23 changes: 22 additions & 1 deletion quill/include/quill/detail/spsc_queue/UnboundedQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <limits>

#include "BoundedQueue.h"
#include "quill/detail/misc/Common.h"
#include "quill/detail/misc/Os.h"

namespace quill::detail
{
Expand Down Expand Up @@ -153,9 +155,11 @@ class UnboundedQueue

/**
* Prepare to read from the buffer
* @notification_handler a callback used for notifications to the user
* @return a pair of the buffer location to read and the number of available bytes
*/
QUILL_NODISCARD_ALWAYS_INLINE_HOT std::byte* prepare_read()
QUILL_NODISCARD_ALWAYS_INLINE_HOT std::byte* prepare_read(
backend_worker_notification_handler_t const& notification_handler = backend_worker_notification_handler_t{})
{
std::byte* read_pos = _consumer->bounded_queue.prepare_read();

Expand All @@ -177,9 +181,26 @@ class UnboundedQueue
_consumer->bounded_queue.commit_read();

// switch to the new buffer, existing one is deleted
auto const previous_capacity = _consumer->bounded_queue.capacity();
delete _consumer;
_consumer = next_node;
read_pos = _consumer->bounded_queue.prepare_read();

if (notification_handler)
{
char ts[24];
time_t t = time(nullptr);
struct tm p;
quill::detail::localtime_rs(std::addressof(t), std::addressof(p));
strftime(ts, 24, "%X", std::addressof(p));

// we switched to a new here, and we also notify the user of the allocation via the
// notification_handler
notification_handler(
std::string{ts} + " Quill INFO: A new SPSC queue was allocated [new_capacity_bytes: " +
std::to_string(_consumer->bounded_queue.capacity()) +
", previous_capacity_bytes: " + std::to_string(previous_capacity) + "]");
}
}
}
}
Expand Down
Loading

0 comments on commit b5dd703

Please sign in to comment.