Skip to content

Commit

Permalink
fix dynamic connectable (#686)
Browse files Browse the repository at this point in the history
* fix dynamic connectable

* fix
  • Loading branch information
victimsnino authored Nov 21, 2024
1 parent 06c51f0 commit 96c6e33
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 9 deletions.
15 changes: 12 additions & 3 deletions src/rpp/rpp/observables/connectable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,21 @@ namespace rpp
* @ingroup observables
*/
template<rpp::constraint::observable OriginalObservable, typename Subject>
class connectable_observable final : public decltype(std::declval<Subject>().get_observable())
class connectable_observable : public decltype(std::declval<Subject>().get_observable())
{
using base = decltype(std::declval<Subject>().get_observable());

public:
static_assert(rpp::constraint::subject<Subject>);

connectable_observable(const OriginalObservable& original_observable, const Subject& subject = Subject{})
connectable_observable(const OriginalObservable& original_observable, const Subject& subject)
: base{subject.get_observable()}
, m_original_observable{original_observable}
, m_subject{subject}
{
}

connectable_observable(OriginalObservable && original_observable, const Subject& subject = Subject{})
connectable_observable(OriginalObservable && original_observable, const Subject& subject)
: base{subject.get_observable()}
, m_original_observable{std::move(original_observable)}
, m_subject{subject}
Expand Down Expand Up @@ -166,6 +166,15 @@ namespace rpp
return std::move(*this) | std::forward<Op>(op);
}

auto as_dynamic_connectable() const &
{
return rpp::dynamic_connectable_observable<Subject>{m_original_observable.as_dynamic(), m_subject};
}
auto as_dynamic_connectable()&&
{
return rpp::dynamic_connectable_observable<Subject>{std::move(m_original_observable).as_dynamic(), std::move(m_subject)};
}

private:
RPP_NO_UNIQUE_ADDRESS OriginalObservable m_original_observable;
Subject m_subject;
Expand Down
41 changes: 41 additions & 0 deletions src/rpp/rpp/observables/dynamic_connectable_observable.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#pragma once

#include <rpp/observables/connectable_observable.hpp>
#include <rpp/observables/dynamic_observable.hpp>

namespace rpp
{
template<typename Subject>
class dynamic_connectable_observable final : public connectable_observable<rpp::dynamic_observable<rpp::subjects::utils::extract_subject_type_t<Subject>>, Subject>
{
public:
static_assert(rpp::constraint::subject<Subject>);

using base = connectable_observable<rpp::dynamic_observable<rpp::subjects::utils::extract_subject_type_t<Subject>>, Subject>;

using base::base;

template<rpp::constraint::observable_strategy<rpp::subjects::utils::extract_subject_type_t<Subject>> Strategy>
requires (!rpp::constraint::decayed_same_as<Strategy, rpp::dynamic_observable<rpp::subjects::utils::extract_subject_type_t<Subject>>>)
dynamic_connectable_observable(const rpp::connectable_observable<Strategy, Subject>& original)
: dynamic_connectable_observable{original.as_dynamic_connectable()}
{
}

template<rpp::constraint::observable_strategy<rpp::subjects::utils::extract_subject_type_t<Subject>> Strategy>
requires (!rpp::constraint::decayed_same_as<Strategy, rpp::dynamic_observable<rpp::subjects::utils::extract_subject_type_t<Subject>>>)
dynamic_connectable_observable(rpp::connectable_observable<Strategy, Subject>&& original)
: dynamic_connectable_observable{std::move(original).as_dynamic_connectable()}
{
}
};
} // namespace rpp
6 changes: 1 addition & 5 deletions src/rpp/rpp/observables/dynamic_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ namespace rpp::details::observables
template<rpp::constraint::observable Observable>
static const vtable* create() noexcept
{
static vtable s_res{
.subscribe = forwarding_subscribe<Type, Observable>};
static vtable s_res{.subscribe = forwarding_subscribe<Type, Observable>};
return &s_res;
}
};
Expand Down Expand Up @@ -103,7 +102,4 @@ namespace rpp
{
}
};

template<typename Subject>
using dynamic_connectable_observable = connectable_observable<rpp::dynamic_observable<rpp::subjects::utils::extract_subject_type_t<Subject>>, Subject>;
} // namespace rpp
3 changes: 3 additions & 0 deletions src/rpp/rpp/observables/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ namespace rpp
template<constraint::decayed_type Type>
class dynamic_observable;

template<typename Subject>
class dynamic_connectable_observable;

template<constraint::decayed_type Type, constraint::observable_strategy<Type> Strategy>
class blocking_observable;

Expand Down
3 changes: 3 additions & 0 deletions src/tests/rpp/test_connectable_observable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <doctest/doctest.h>

#include <rpp/observables/connectable_observable.hpp>
#include <rpp/observables/dynamic_connectable_observable.hpp>
#include <rpp/observables/dynamic_observable.hpp>
#include <rpp/observers/mock_observer.hpp>
#include <rpp/operators/map.hpp>
Expand Down Expand Up @@ -82,6 +83,8 @@ TEST_CASE("connectable observable")
test(rpp::connectable_observable{source, rpp::subjects::publish_subject<int>{}});
SUBCASE("dynamic_connectable created manually")
test(rpp::dynamic_connectable_observable<rpp::subjects::publish_subject<int>>{source, rpp::subjects::publish_subject<int>{}});
SUBCASE("dynamic_connectable coneverted")
test(rpp::dynamic_connectable_observable<rpp::subjects::publish_subject<int>>{rpp::connectable_observable{source, rpp::subjects::publish_subject<int>{}}});
SUBCASE("connectable created via multicast")
test(source | rpp::ops::multicast(rpp::subjects::publish_subject<int>{}));
SUBCASE("connectable created via templated multicast")
Expand Down
10 changes: 10 additions & 0 deletions src/tests/rpp/test_observables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,20 @@ TEST_CASE("create observable works properly as observable")
test(observable.as_dynamic());
}

SUBCASE("dynamic observable cast")
{
test(rpp::dynamic_observable<int>{observable});
}

SUBCASE("dynamic observable via move")
{
test(std::move(observable).as_dynamic()); // NOLINT
}

SUBCASE("dynamic observable cast via move")
{
test(rpp::dynamic_observable<int>{std::move(observable)}); // NOLINT
}
}

TEST_CASE("blocking_observable blocks subscribe call")
Expand Down
5 changes: 4 additions & 1 deletion src/tests/rpp/test_observers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ TEST_CASE("set_upstream without base disposable makes it main disposalbe")
test_observer(original_observer);

SUBCASE("dynamic observer")
test_observer(std::move(original_observer).as_dynamic());
test_observer(std::move(original_observer).as_dynamic()); // NOLINT

SUBCASE("dynamic observer via cast")
test_observer(rpp::dynamic_observer<int>{std::move(original_observer)}); // NOLINT
}

TEST_CASE("set_upstream can be called multiple times")
Expand Down

1 comment on commit 96c6e33

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 301.92 ns 1.85 ns 1.85 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 314.54 ns 1.85 ns 1.85 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 683.75 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 1023.96 ns 3.42 ns 3.42 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2198.27 ns 112.15 ns 113.40 ns 0.99
defer from array of 1 - defer + create + subscribe + immediate 724.86 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2151.39 ns 59.27 ns 59.19 ns 1.00
interval - interval + take(3) + subscribe + current_thread 2998.03 ns 32.43 ns 32.44 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 30019.32 ns 28186.83 ns 27521.43 ns 1.02
from array of 1000 - create + as_blocking + subscribe + new_thread 39344.38 ns 52020.32 ns 48506.54 ns 1.07
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3462.02 ns 138.66 ns 134.76 ns 1.03

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1093.68 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 832.78 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1010.36 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 967.91 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1246.17 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 909.31 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1111.60 ns 18.20 ns 18.53 ns 0.98
immediate_just(1,2,3)+element_at(1)+subscribe 828.13 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 287.24 ns 1.54 ns 1.54 ns 1.00
current_thread scheduler create worker + schedule 360.95 ns 4.94 ns 4.94 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 826.34 ns 60.82 ns 60.98 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 884.38 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 890.51 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2314.73 ns 146.01 ns 144.61 ns 1.01
immediate_just+buffer(2)+subscribe 1556.91 ns 13.90 ns 14.20 ns 0.98
immediate_just+window(2)+subscribe + subscsribe inner 2399.74 ns 1342.30 ns 1335.11 ns 1.01

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 836.28 ns - - 0.00
immediate_just+take_while(true)+subscribe 862.17 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1988.03 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3428.84 ns 153.57 ns 155.49 ns 0.99
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3687.34 ns 156.44 ns 160.18 ns 0.98
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 134.03 ns 133.13 ns 1.01
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3509.64 ns 461.59 ns 455.38 ns 1.01
immediate_just(1) + zip(immediate_just(2)) + subscribe 2086.05 ns 215.53 ns 216.63 ns 0.99
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 3105.33 ns 233.47 ns 223.79 ns 1.04

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.57 ns 14.67 ns 14.85 ns 0.99
subscribe 100 observers to publish_subject 200340.40 ns 16175.73 ns 16200.72 ns 1.00
100 on_next to 100 observers to publish_subject 27011.26 ns 17464.32 ns 17244.61 ns 1.01

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1364.04 ns 13.28 ns 13.28 ns 1.00
basic sample with immediate scheduler 1454.11 ns 5.55 ns 5.55 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 940.96 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2058.59 ns 1004.57 ns 1007.35 ns 1.00
create(on_error())+retry(1)+subscribe 616.85 ns 117.97 ns 109.61 ns 1.08

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1045.36 ns 0.78 ns 2.19 ns 0.35
Subscribe empty callbacks to empty observable via pipe operator 1051.93 ns 0.75 ns 3.21 ns 0.23

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 2088.75 ns 0.25 ns 1.19 ns 0.21
from array of 1 - create + subscribe + current_thread 2626.56 ns 35.45 ns 240.15 ns 0.15
concat_as_source of just(1 immediate) create + subscribe 5456.86 ns 379.33 ns 380.93 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 1968.00 ns 0.23 ns 0.28 ns 0.85
interval - interval + take(3) + subscribe + immediate 4930.88 ns 113.28 ns 143.68 ns 0.79
interval - interval + take(3) + subscribe + current_thread 6047.12 ns 99.47 ns 184.34 ns 0.54
from array of 1 - create + as_blocking + subscribe + new_thread 88281.73 ns 89639.80 ns 136828.33 ns 0.66
from array of 1000 - create + as_blocking + subscribe + new_thread 102092.67 ns 95092.70 ns 128395.89 ns 0.74
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 8152.24 ns 365.99 ns 401.24 ns 0.91

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 2820.24 ns 0.23 ns 0.28 ns 0.83
immediate_just+filter(true)+subscribe 2088.77 ns 0.23 ns 0.27 ns 0.86
immediate_just(1,2)+skip(1)+subscribe 2735.40 ns 0.23 ns 0.29 ns 0.79
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2102.56 ns 0.48 ns 0.55 ns 0.87
immediate_just(1,2)+first()+subscribe 3167.50 ns 0.23 ns 0.28 ns 0.84
immediate_just(1,2)+last()+subscribe 2350.75 ns 0.23 ns 0.30 ns 0.77
immediate_just+take_last(1)+subscribe 3015.98 ns 0.23 ns 0.27 ns 0.85
immediate_just(1,2,3)+element_at(1)+subscribe 2103.54 ns 0.23 ns 0.26 ns 0.91

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 863.97 ns 0.94 ns 1.26 ns 0.74
current_thread scheduler create worker + schedule 1196.12 ns 34.23 ns 45.01 ns 0.76
current_thread scheduler create worker + schedule + recursive schedule 1998.47 ns 202.88 ns 277.62 ns 0.73

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2086.24 ns 4.20 ns 5.07 ns 0.83
immediate_just+scan(10, std::plus)+subscribe 2304.40 ns 0.47 ns 0.51 ns 0.91
immediate_just+flat_map(immediate_just(v*2))+subscribe 5236.17 ns 375.74 ns 439.91 ns 0.85
immediate_just+buffer(2)+subscribe 2484.02 ns 64.65 ns 77.70 ns 0.83
immediate_just+window(2)+subscribe + subscsribe inner 5286.21 ns 2387.90 ns 2990.38 ns 0.80

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2073.65 ns - - 0.00
immediate_just+take_while(true)+subscribe 2086.08 ns 0.23 ns 0.27 ns 0.85

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 4772.03 ns 4.68 ns 5.72 ns 0.82

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 7457.09 ns 412.06 ns 596.70 ns 0.69
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 8303.77 ns 414.87 ns 537.83 ns 0.77
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 447.72 ns 631.48 ns 0.71
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 7857.53 ns 943.75 ns 1161.62 ns 0.81
immediate_just(1) + zip(immediate_just(2)) + subscribe 5068.48 ns 788.56 ns 1275.03 ns 0.62
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 7395.37 ns 640.50 ns 816.57 ns 0.78

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 74.46 ns 48.66 ns 66.27 ns 0.73
subscribe 100 observers to publish_subject 343849.00 ns 40555.96 ns 48729.14 ns 0.83
100 on_next to 100 observers to publish_subject 59395.68 ns 24922.40 ns 27500.16 ns 0.91

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2837.36 ns 76.32 ns 88.94 ns 0.86
basic sample with immediate scheduler 2741.43 ns 18.71 ns 21.80 ns 0.86

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2349.28 ns 0.23 ns 0.26 ns 0.90

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 6880.06 ns 4347.30 ns 4911.17 ns 0.89
create(on_error())+retry(1)+subscribe 1841.12 ns 284.21 ns 351.29 ns 0.81

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 277.71 ns 1.54 ns 0.63 ns 2.43
Subscribe empty callbacks to empty observable via pipe operator 278.21 ns 1.54 ns 0.63 ns 2.43

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 567.80 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 808.10 ns 4.01 ns 4.01 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2394.95 ns 129.67 ns 128.30 ns 1.01
defer from array of 1 - defer + create + subscribe + immediate 812.99 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2239.26 ns 58.26 ns 58.31 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3189.17 ns 30.86 ns 31.11 ns 0.99
from array of 1 - create + as_blocking + subscribe + new_thread 30463.97 ns 27745.02 ns 27642.74 ns 1.00
from array of 1000 - create + as_blocking + subscribe + new_thread 38758.18 ns 36026.62 ns 32988.93 ns 1.09
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3721.55 ns 148.30 ns 148.21 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1193.14 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 881.47 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1136.20 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 907.92 ns 0.62 ns 0.62 ns 1.00
immediate_just(1,2)+first()+subscribe 1412.80 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 1039.14 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1249.15 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 905.80 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 282.90 ns 0.63 ns 1.54 ns 0.41
current_thread scheduler create worker + schedule 423.60 ns 4.02 ns 4.01 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 877.63 ns 56.24 ns 55.81 ns 1.01

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 869.37 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 1001.66 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2303.98 ns 140.26 ns 151.96 ns 0.92
immediate_just+buffer(2)+subscribe 1582.90 ns 13.89 ns 13.59 ns 1.02
immediate_just+window(2)+subscribe + subscsribe inner 2475.10 ns 921.07 ns 917.15 ns 1.00

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 875.56 ns - - 0.00
immediate_just+take_while(true)+subscribe 875.12 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2054.12 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3340.11 ns 156.07 ns 158.54 ns 0.98
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3785.78 ns 139.09 ns 140.93 ns 0.99
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 142.89 ns 142.43 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3474.20 ns 377.27 ns 377.00 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 2241.96 ns 199.42 ns 195.47 ns 1.02
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 3371.38 ns 224.21 ns 221.34 ns 1.01

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 53.67 ns 17.48 ns 17.49 ns 1.00
subscribe 100 observers to publish_subject 208718.33 ns 16096.95 ns 16275.22 ns 0.99
100 on_next to 100 observers to publish_subject 46209.56 ns 23580.12 ns 23638.56 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1322.98 ns 11.42 ns 11.43 ns 1.00
basic sample with immediate scheduler 1324.04 ns 5.86 ns 5.86 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1027.44 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2214.65 ns 1161.48 ns 1179.24 ns 0.98
create(on_error())+retry(1)+subscribe 685.81 ns 139.62 ns 140.04 ns 1.00

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 560.13 ns 2.16 ns 2.16 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 580.47 ns 2.16 ns 2.16 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1141.67 ns 5.55 ns 5.55 ns 1.00
from array of 1 - create + subscribe + current_thread 1425.71 ns 15.51 ns 15.46 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 3698.92 ns 164.88 ns 169.03 ns 0.98
defer from array of 1 - defer + create + subscribe + immediate 1188.99 ns 5.55 ns 5.55 ns 1.00
interval - interval + take(3) + subscribe + immediate 3338.02 ns 142.69 ns 139.75 ns 1.02
interval - interval + take(3) + subscribe + current_thread 3431.94 ns 60.15 ns 59.79 ns 1.01
from array of 1 - create + as_blocking + subscribe + new_thread 119444.44 ns 113250.00 ns 114333.33 ns 0.99
from array of 1000 - create + as_blocking + subscribe + new_thread 130300.00 ns 131050.00 ns 127711.11 ns 1.03
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 5417.45 ns 196.99 ns 196.17 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1815.20 ns 19.42 ns 19.42 ns 1.00
immediate_just+filter(true)+subscribe 1627.85 ns 18.51 ns 18.51 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1720.62 ns 17.89 ns 17.90 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1363.31 ns 20.69 ns 20.88 ns 0.99
immediate_just(1,2)+first()+subscribe 2365.71 ns 18.20 ns 18.21 ns 1.00
immediate_just(1,2)+last()+subscribe 1460.49 ns 19.12 ns 19.14 ns 1.00
immediate_just+take_last(1)+subscribe 2016.70 ns 64.50 ns 64.94 ns 0.99
immediate_just(1,2,3)+element_at(1)+subscribe 1821.53 ns 20.97 ns 22.07 ns 0.95

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 479.35 ns 4.32 ns 4.32 ns 1.00
current_thread scheduler create worker + schedule 645.48 ns 11.16 ns 11.12 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 1361.92 ns 98.98 ns 99.33 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1324.37 ns 18.82 ns 18.82 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 1441.85 ns 20.96 ns 20.96 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 3879.21 ns 182.32 ns 185.40 ns 0.98
immediate_just+buffer(2)+subscribe 2314.00 ns 63.67 ns 64.10 ns 0.99
immediate_just+window(2)+subscribe + subscsribe inner 4036.17 ns 1301.24 ns 1277.56 ns 1.02

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1315.10 ns 17.57 ns 17.58 ns 1.00
immediate_just+take_while(true)+subscribe 1317.72 ns 18.50 ns 18.51 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3641.60 ns 11.74 ns 11.10 ns 1.06

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5177.95 ns 202.43 ns 198.78 ns 1.02
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5469.04 ns 185.79 ns 185.01 ns 1.00
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 195.28 ns 194.19 ns 1.01
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 6144.33 ns 439.04 ns 452.48 ns 0.97
immediate_just(1) + zip(immediate_just(2)) + subscribe 4005.73 ns 523.70 ns 520.52 ns 1.01
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 5000.00 ns 319.77 ns 316.50 ns 1.01

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 35.98 ns 20.14 ns 20.35 ns 0.99
subscribe 100 observers to publish_subject 269050.00 ns 27892.86 ns 27592.68 ns 1.01
100 on_next to 100 observers to publish_subject 55005.26 ns 32628.57 ns 32596.77 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1900.53 ns 95.11 ns 95.54 ns 1.00
basic sample with immediate scheduler 1938.95 ns 68.81 ns 68.25 ns 1.01

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1460.28 ns 19.13 ns 19.13 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1925.32 ns 358.64 ns 356.91 ns 1.00
create(on_error())+retry(1)+subscribe 1599.70 ns 136.26 ns 137.33 ns 0.99

Please sign in to comment.