diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp index 9ef71cf4f..ed4e92a58 100644 --- a/src/rpp/rpp/observables/connectable_observable.hpp +++ b/src/rpp/rpp/observables/connectable_observable.hpp @@ -69,21 +69,21 @@ namespace rpp * @ingroup observables */ template - class connectable_observable final : public decltype(std::declval().get_observable()) + class connectable_observable : public decltype(std::declval().get_observable()) { using base = decltype(std::declval().get_observable()); public: static_assert(rpp::constraint::subject); - connectable_observable(const OriginalObservable& original_observable, const Subject& subject = Subject{}) + connectable_observable(const OriginalObservable& original_observable, const Subject& subject) : base{subject.get_observable()} , m_original_observable{original_observable} , m_subject{subject} { } - connectable_observable(OriginalObservable && original_observable, const Subject& subject = Subject{}) + connectable_observable(OriginalObservable && original_observable, const Subject& subject) : base{subject.get_observable()} , m_original_observable{std::move(original_observable)} , m_subject{subject} @@ -166,6 +166,15 @@ namespace rpp return std::move(*this) | std::forward(op); } + auto as_dynamic_connectable() const & + { + return rpp::dynamic_connectable_observable{m_original_observable.as_dynamic(), m_subject}; + } + auto as_dynamic_connectable()&& + { + return rpp::dynamic_connectable_observable{std::move(m_original_observable).as_dynamic(), std::move(m_subject)}; + } + private: RPP_NO_UNIQUE_ADDRESS OriginalObservable m_original_observable; Subject m_subject; diff --git a/src/rpp/rpp/observables/dynamic_connectable_observable.hpp b/src/rpp/rpp/observables/dynamic_connectable_observable.hpp new file mode 100644 index 000000000..ecf467b53 --- /dev/null +++ b/src/rpp/rpp/observables/dynamic_connectable_observable.hpp @@ -0,0 +1,41 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include + +namespace rpp +{ + template + class dynamic_connectable_observable final : public connectable_observable>, Subject> + { + public: + static_assert(rpp::constraint::subject); + + using base = connectable_observable>, Subject>; + + using base::base; + + template> Strategy> + requires (!rpp::constraint::decayed_same_as>>) + dynamic_connectable_observable(const rpp::connectable_observable& original) + : dynamic_connectable_observable{original.as_dynamic_connectable()} + { + } + + template> Strategy> + requires (!rpp::constraint::decayed_same_as>>) + dynamic_connectable_observable(rpp::connectable_observable&& original) + : dynamic_connectable_observable{std::move(original).as_dynamic_connectable()} + { + } + }; +} // namespace rpp diff --git a/src/rpp/rpp/observables/dynamic_observable.hpp b/src/rpp/rpp/observables/dynamic_observable.hpp index a215d41b9..113fbfb6c 100644 --- a/src/rpp/rpp/observables/dynamic_observable.hpp +++ b/src/rpp/rpp/observables/dynamic_observable.hpp @@ -63,8 +63,7 @@ namespace rpp::details::observables template static const vtable* create() noexcept { - static vtable s_res{ - .subscribe = forwarding_subscribe}; + static vtable s_res{.subscribe = forwarding_subscribe}; return &s_res; } }; @@ -103,7 +102,4 @@ namespace rpp { } }; - - template - using dynamic_connectable_observable = connectable_observable>, Subject>; } // namespace rpp diff --git a/src/rpp/rpp/observables/fwd.hpp b/src/rpp/rpp/observables/fwd.hpp index 3de20b121..aee37ea4b 100644 --- a/src/rpp/rpp/observables/fwd.hpp +++ b/src/rpp/rpp/observables/fwd.hpp @@ -172,6 +172,9 @@ namespace rpp template class dynamic_observable; + template + class dynamic_connectable_observable; + template Strategy> class blocking_observable; diff --git a/src/tests/rpp/test_connectable_observable.cpp b/src/tests/rpp/test_connectable_observable.cpp index f93c58bd0..cdf07f3e1 100644 --- a/src/tests/rpp/test_connectable_observable.cpp +++ b/src/tests/rpp/test_connectable_observable.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -82,6 +83,8 @@ TEST_CASE("connectable observable") test(rpp::connectable_observable{source, rpp::subjects::publish_subject{}}); SUBCASE("dynamic_connectable created manually") test(rpp::dynamic_connectable_observable>{source, rpp::subjects::publish_subject{}}); + SUBCASE("dynamic_connectable coneverted") + test(rpp::dynamic_connectable_observable>{rpp::connectable_observable{source, rpp::subjects::publish_subject{}}}); SUBCASE("connectable created via multicast") test(source | rpp::ops::multicast(rpp::subjects::publish_subject{})); SUBCASE("connectable created via templated multicast") diff --git a/src/tests/rpp/test_observables.cpp b/src/tests/rpp/test_observables.cpp index a205308f4..146c9959f 100644 --- a/src/tests/rpp/test_observables.cpp +++ b/src/tests/rpp/test_observables.cpp @@ -99,10 +99,20 @@ TEST_CASE("create observable works properly as observable") test(observable.as_dynamic()); } + SUBCASE("dynamic observable cast") + { + test(rpp::dynamic_observable{observable}); + } + SUBCASE("dynamic observable via move") { test(std::move(observable).as_dynamic()); // NOLINT } + + SUBCASE("dynamic observable cast via move") + { + test(rpp::dynamic_observable{std::move(observable)}); // NOLINT + } } TEST_CASE("blocking_observable blocks subscribe call") diff --git a/src/tests/rpp/test_observers.cpp b/src/tests/rpp/test_observers.cpp index 253686c50..fc904882a 100644 --- a/src/tests/rpp/test_observers.cpp +++ b/src/tests/rpp/test_observers.cpp @@ -187,7 +187,10 @@ TEST_CASE("set_upstream without base disposable makes it main disposalbe") test_observer(original_observer); SUBCASE("dynamic observer") - test_observer(std::move(original_observer).as_dynamic()); + test_observer(std::move(original_observer).as_dynamic()); // NOLINT + + SUBCASE("dynamic observer via cast") + test_observer(rpp::dynamic_observer{std::move(original_observer)}); // NOLINT } TEST_CASE("set_upstream can be called multiple times")