diff --git a/include/blackhole/sink/asynchronous.hpp b/include/blackhole/sink/asynchronous.hpp index e8c0c2e8..cbffc95e 100644 --- a/include/blackhole/sink/asynchronous.hpp +++ b/include/blackhole/sink/asynchronous.hpp @@ -39,7 +39,8 @@ class builder { public: /// Constructs a sink builder from some other sink. /// - /// The ownership of the specified sink is transferred to the builder. + /// The ownership of the specified sink is transferred to the builder. The default overflow + /// policy will block the calling thread while the internal message queue is full. /// /// \param wrapped The target sink (usually the blocking one) that is need to make asynchronous. explicit builder(std::unique_ptr wrapped); @@ -48,6 +49,14 @@ class builder { auto factor(std::size_t value) & -> builder&; auto factor(std::size_t value) && -> builder&&; + /// Sets the drop overflow policy. + auto drop() & -> builder&; + auto drop() && -> builder&&; + + /// Sets the wait overflow policy. + auto wait() & -> builder&; + auto wait() && -> builder&&; + /// Consumes this builder yielding a newly created asynchronous sink with the options /// configured. auto build() && -> std::unique_ptr; diff --git a/src/sink/asynchronous.cpp b/src/sink/asynchronous.cpp index b5ddcb41..d60d0602 100644 --- a/src/sink/asynchronous.cpp +++ b/src/sink/asynchronous.cpp @@ -16,11 +16,12 @@ inline namespace v1 { class builder::inner_t { public: std::unique_ptr wrapped; + std::unique_ptr overflow_policy; std::size_t factor; }; builder::builder(std::unique_ptr wrapped) : - d(new inner_t{std::move(wrapped), 10}) + d(new inner_t{std::move(wrapped), sink::overflow_policy_factory_t().create("wait"), 10}) {} auto builder::factor(std::size_t value) & -> builder& { @@ -32,6 +33,24 @@ auto builder::factor(std::size_t value) && -> builder&& { return std::move(factor(value)); } +auto builder::wait() & -> builder& { + d->overflow_policy = sink::overflow_policy_factory_t().create("wait"); + return *this; +} + +auto builder::drop() && -> builder&& { + return std::move(wait()); +} + +auto builder::drop() & -> builder& { + d->overflow_policy = sink::overflow_policy_factory_t().create("drop"); + return *this; +} + +auto builder::wait() && -> builder&& { + return std::move(wait()); +} + auto builder::build() && -> std::unique_ptr { return blackhole::make_unique(std::move(d->wrapped), d->factor); } diff --git a/src/sink/asynchronous.hpp b/src/sink/asynchronous.hpp index 46c41dda..951a00ef 100644 --- a/src/sink/asynchronous.hpp +++ b/src/sink/asynchronous.hpp @@ -20,7 +20,7 @@ class overflow_policy_t { }; public: - virtual ~overflow_policy_t() {} + virtual ~overflow_policy_t() = default; /// Handles record queue overflow. /// diff --git a/tests/src/unit/sink/asynchronous.cpp b/tests/src/unit/sink/asynchronous.cpp index a5dc8afc..fc25c603 100644 --- a/tests/src/unit/sink/asynchronous.cpp +++ b/tests/src/unit/sink/asynchronous.cpp @@ -85,6 +85,38 @@ TEST(asynchronous_t, BuilderSetFactorFlow) { EXPECT_EQ(32, dynamic_cast(*sink).capacity()); } +TEST(asynchronous_t, BuilderSetDropOverflowPolicy) { + std::unique_ptr wrapped(new mock::sink_t); + + builder builder(std::move(wrapped)); + builder.drop(); + std::move(builder).build(); +} + +TEST(asynchronous_t, BuilderSetDropOverflowPolicyFlow) { + std::unique_ptr wrapped(new mock::sink_t); + + builder(std::move(wrapped)) + .drop() + .build(); +} + +TEST(asynchronous_t, BuilderSetWaitOverflowPolicy) { + std::unique_ptr wrapped(new mock::sink_t); + + builder builder(std::move(wrapped)); + builder.wait(); + std::move(builder).build(); +} + +TEST(asynchronous_t, BuilderSetWaitOverflowPolicyFlow) { + std::unique_ptr wrapped(new mock::sink_t); + + builder(std::move(wrapped)) + .wait() + .build(); +} + } // namespace } // namespace sink } // namespace v1