Skip to content

Commit

Permalink
feat: add missing overflow policies compositors
Browse files Browse the repository at this point in the history
  • Loading branch information
3Hren committed Jun 28, 2017
1 parent 2397d3d commit 01ab682
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 3 deletions.
11 changes: 10 additions & 1 deletion include/blackhole/sink/asynchronous.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class builder<sink::asynchronous_t> {
public:
/// Constructs a sink builder from some other sink.
///
/// The ownership of the specified sink is transferred to the builder.
/// The ownership of the specified sink is transferred to the builder. The default overflow
/// policy will block the calling thread while the internal message queue is full.
///
/// \param wrapped The target sink (usually the blocking one) that is need to make asynchronous.
explicit builder(std::unique_ptr<sink_t> wrapped);
Expand All @@ -48,6 +49,14 @@ class builder<sink::asynchronous_t> {
auto factor(std::size_t value) & -> builder&;
auto factor(std::size_t value) && -> builder&&;

/// Sets the drop overflow policy.
auto drop() & -> builder&;
auto drop() && -> builder&&;

/// Sets the wait overflow policy.
auto wait() & -> builder&;
auto wait() && -> builder&&;

/// Consumes this builder yielding a newly created asynchronous sink with the options
/// configured.
auto build() && -> std::unique_ptr<sink_t>;
Expand Down
21 changes: 20 additions & 1 deletion src/sink/asynchronous.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ inline namespace v1 {
class builder<sink::asynchronous_t>::inner_t {
public:
std::unique_ptr<sink_t> wrapped;
std::unique_ptr<sink::overflow_policy_t> overflow_policy;
std::size_t factor;
};

builder<sink::asynchronous_t>::builder(std::unique_ptr<sink_t> wrapped) :
d(new inner_t{std::move(wrapped), 10})
d(new inner_t{std::move(wrapped), sink::overflow_policy_factory_t().create("wait"), 10})
{}

auto builder<sink::asynchronous_t>::factor(std::size_t value) & -> builder& {
Expand All @@ -32,6 +33,24 @@ auto builder<sink::asynchronous_t>::factor(std::size_t value) && -> builder&& {
return std::move(factor(value));
}

auto builder<sink::asynchronous_t>::wait() & -> builder& {
d->overflow_policy = sink::overflow_policy_factory_t().create("wait");
return *this;
}

auto builder<sink::asynchronous_t>::drop() && -> builder&& {
return std::move(wait());
}

auto builder<sink::asynchronous_t>::drop() & -> builder& {
d->overflow_policy = sink::overflow_policy_factory_t().create("drop");
return *this;
}

auto builder<sink::asynchronous_t>::wait() && -> builder&& {
return std::move(wait());
}

auto builder<sink::asynchronous_t>::build() && -> std::unique_ptr<sink_t> {
return blackhole::make_unique<sink::asynchronous_t>(std::move(d->wrapped), d->factor);
}
Expand Down
2 changes: 1 addition & 1 deletion src/sink/asynchronous.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class overflow_policy_t {
};

public:
virtual ~overflow_policy_t() {}
virtual ~overflow_policy_t() = default;

/// Handles record queue overflow.
///
Expand Down
32 changes: 32 additions & 0 deletions tests/src/unit/sink/asynchronous.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,38 @@ TEST(asynchronous_t, BuilderSetFactorFlow) {
EXPECT_EQ(32, dynamic_cast<asynchronous_t&>(*sink).capacity());
}

TEST(asynchronous_t, BuilderSetDropOverflowPolicy) {
std::unique_ptr<mock::sink_t> wrapped(new mock::sink_t);

builder<asynchronous_t> builder(std::move(wrapped));
builder.drop();
std::move(builder).build();
}

TEST(asynchronous_t, BuilderSetDropOverflowPolicyFlow) {
std::unique_ptr<mock::sink_t> wrapped(new mock::sink_t);

builder<asynchronous_t>(std::move(wrapped))
.drop()
.build();
}

TEST(asynchronous_t, BuilderSetWaitOverflowPolicy) {
std::unique_ptr<mock::sink_t> wrapped(new mock::sink_t);

builder<asynchronous_t> builder(std::move(wrapped));
builder.wait();
std::move(builder).build();
}

TEST(asynchronous_t, BuilderSetWaitOverflowPolicyFlow) {
std::unique_ptr<mock::sink_t> wrapped(new mock::sink_t);

builder<asynchronous_t>(std::move(wrapped))
.wait()
.build();
}

} // namespace
} // namespace sink
} // namespace v1
Expand Down

0 comments on commit 01ab682

Please sign in to comment.