From 32c05e92490759f3d2373b2d99183b1a31bac459 Mon Sep 17 00:00:00 2001 From: "i.s.vovk" Date: Fri, 12 Feb 2021 15:58:23 +0300 Subject: [PATCH 1/5] implement idling eviction for sessions in a pool --- .../connector/connection_pool.hpp | 26 ++++++++++++-- .../connector/impl/connection_pool.ipp | 34 ++++++++++++++----- 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/include/stream-client/connector/connection_pool.hpp b/include/stream-client/connector/connection_pool.hpp index 4a16b76..bc7959c 100644 --- a/include/stream-client/connector/connection_pool.hpp +++ b/include/stream-client/connector/connection_pool.hpp @@ -43,7 +43,25 @@ class base_connection_pool * @param[in] size Number of connected sockets to maintain in the pool. * Note that real number of established connections my be @p size + 1. * This happens when you pull a stream with get_session() , the pool establishes new one to replace it, - * and later you return pulled stream back with return_session()/ + * and later you return pulled stream back with return_session(). + * @param[in] idle_timeout sessions which are in the pool for a longer time are replaced with new ones. + * @param[in] ...argn Arguments to pass to @p Connector constructor. + */ + template + base_connection_pool(std::size_t size, time_duration_type idle_timeout, ArgN&&... argn); + + /** + * Parametrized constructor. + * Constructs pool for desired connector (protocol). Passed @p argn forwarded to @p Connector constructor. + * This operation starts background thread to fill the pool with opened sockets, + * therefore subsequent get_session() calls may take longer time compared with the state when pool is full. + * + * @tparam ...ArgN Types of argn. + * + * @param[in] size Number of connected sockets to maintain in the pool. + * Note that real number of established connections my be @p size + 1. + * This happens when you pull a stream with get_session() , the pool establishes new one to replace it, + * and later you return pulled stream back with return_session(). * @param[in] ...argn Arguments to pass to @p Connector constructor. */ template @@ -263,8 +281,10 @@ class base_connection_pool connector_type connector_; ///< Underlying connector used to establish sockets. - std::size_t pool_size_; ///< Number of stream to keep in the @p sesson_pool_. - std::list> sesson_pool_; ///< The list of established sockets + std::size_t pool_max_size_; ///< Number of stream to keep in the @p sesson_pool_. + time_duration_type idle_timeout_; ///< Idle timeout for the sessions in the @p sesson_pool_. + std::list>> + sesson_pool_; ///< The list of established sockets. mutable std::timed_mutex pool_mutex_; ///< @p sesson_pool_ mutex. mutable std::condition_variable_any pool_cv_; ///< @p sesson_pool_ condition variable. diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index c75de15..9e4c1ba 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -5,14 +5,22 @@ namespace connector { template template -base_connection_pool::base_connection_pool(std::size_t size, ArgN&&... argn) +base_connection_pool::base_connection_pool(std::size_t size, time_duration_type idle_timeout, ArgN&&... argn) : connector_(std::forward(argn)...) - , pool_size_(size) + , pool_max_size_(size) + , idle_timeout_(idle_timeout) , watch_pool(true) { pool_watcher_ = std::thread([this]() { this->watch_pool_routine(); }); } +template +template +base_connection_pool::base_connection_pool(std::size_t size, ArgN&&... argn) + : base_connection_pool(size, time_duration_type::max(), std::forward(argn)...) +{ +} + template base_connection_pool::~base_connection_pool() { @@ -38,7 +46,7 @@ base_connection_pool::get_session(boost::system::error_code& ec, cons return nullptr; } - std::unique_ptr session = std::move(sesson_pool_.front()); + std::unique_ptr session = std::move(sesson_pool_.front().second); sesson_pool_.pop_front(); return session; } @@ -56,7 +64,7 @@ void base_connection_pool::return_session(std::unique_ptr::watch_pool_routine() if (!pool_lk.try_lock_for(lock_timeout)) { continue; } - int vacant_places = pool_size_ - sesson_pool_.size(); - if (vacant_places < 0) { - vacant_places = 0; + // remove session which idling past idle_timeout_ + std::size_t pool_current_size = 0; + for (auto pool_it = sesson_pool_.begin(); pool_it != sesson_pool_.end();) { + const auto idle_for = clock_type::now() - pool_it->first; + if (idle_for >= idle_timeout_) { + pool_it = sesson_pool_.erase(pool_it); + } else { + ++pool_it; + ++pool_current_size; + } } + std::size_t vacant_places = pool_max_size_ - pool_current_size; // at this point we own pool_mutex_, but we want to get new sessions simultaneously; // that's why new mutex to sync adding threads @@ -102,7 +118,7 @@ void base_connection_pool::watch_pool_routine() auto new_session = connector.new_session(); // ensure only single session added at time std::unique_lock add_lk(pool_add); - pool.emplace_back(std::move(new_session)); + pool.emplace_back(clock_type::now(), std::move(new_session)); added = true; } catch (const boost::system::system_error& e) { // NOOP @@ -110,7 +126,7 @@ void base_connection_pool::watch_pool_routine() }; std::list adders; - for (int i = 0; i < vacant_places; ++i) { + for (std::size_t i = 0; i < vacant_places; ++i) { adders.emplace_back(add_session); } for (auto& a : adders) { From f41fa408291a4276c44245d430e66cf116343440 Mon Sep 17 00:00:00 2001 From: "i.s.vovk" Date: Fri, 12 Feb 2021 16:05:40 +0300 Subject: [PATCH 2/5] Update readme --- README.md | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 50278e1..989882d 100644 --- a/README.md +++ b/README.md @@ -195,7 +195,7 @@ brew install -s -v ./boost@1.66.rb --with-icu4c brew link --force --overwrite boost@1.66 ``` -### Build +### Test/Install Prefer [out-of-source](https://gitlab.kitware.com/cmake/community/-/wikis/FAQ#what-is-an-out-of-source-build) building: @@ -204,14 +204,10 @@ mkdir build cd build cmake -DCMAKE_BUILD_TYPE=Debug .. make -j$(nproc) -make install -``` - -You may also test it with: -```bash make test ``` -Or install (sudo may be required): + +To install the lib (sudo may be required): ```bash make install ``` From 17fb457f681f43789a564524840938b0f7b77c91 Mon Sep 17 00:00:00 2001 From: "i.s.vovk" Date: Fri, 12 Feb 2021 18:48:30 +0300 Subject: [PATCH 3/5] disambiguate base_connection_pool ctors --- include/stream-client/connector/connection_pool.hpp | 12 +++++++----- .../stream-client/connector/impl/connection_pool.ipp | 7 ++++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/include/stream-client/connector/connection_pool.hpp b/include/stream-client/connector/connection_pool.hpp index bc7959c..dfa40d1 100644 --- a/include/stream-client/connector/connection_pool.hpp +++ b/include/stream-client/connector/connection_pool.hpp @@ -28,9 +28,9 @@ class base_connection_pool using stream_type = typename connector_type::stream_type; using protocol_type = typename stream_type::protocol_type; - using clock_type = typename stream_type::clock_type; - using time_duration_type = typename stream_type::time_duration_type; - using time_point_type = typename stream_type::time_point_type; + using clock_type = typename connector_type::clock_type; + using time_duration_type = typename connector_type::time_duration_type; + using time_point_type = typename connector_type::time_point_type; /** * Parametrized constructor. @@ -64,8 +64,10 @@ class base_connection_pool * and later you return pulled stream back with return_session(). * @param[in] ...argn Arguments to pass to @p Connector constructor. */ - template - base_connection_pool(std::size_t size, ArgN&&... argn); + template ::value>::type* = nullptr> + base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn); /// Copy constructor is not permitted. base_connection_pool(const base_connection_pool& other) = delete; diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index 9e4c1ba..a271cb9 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -15,9 +15,10 @@ base_connection_pool::base_connection_pool(std::size_t size, time_dur } template -template -base_connection_pool::base_connection_pool(std::size_t size, ArgN&&... argn) - : base_connection_pool(size, time_duration_type::max(), std::forward(argn)...) +template ::value>::type*> +base_connection_pool::base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn) + : base_connection_pool(size, time_duration_type::max(), std::forward(arg1), std::forward(argn)...) { } From 6a03a95561527603b7db09c2859280f7df522337 Mon Sep 17 00:00:00 2001 From: "i.s.vovk" Date: Fri, 12 Feb 2021 21:11:19 +0300 Subject: [PATCH 4/5] prevent session overflow in the pool --- include/stream-client/connector/impl/connection_pool.ipp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index a271cb9..1473fb2 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -107,7 +107,8 @@ void base_connection_pool::watch_pool_routine() ++pool_current_size; } } - std::size_t vacant_places = pool_max_size_ - pool_current_size; + // pool_current_size may be bigger if someone returned previous session + std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0; // at this point we own pool_mutex_, but we want to get new sessions simultaneously; // that's why new mutex to sync adding threads @@ -122,7 +123,7 @@ void base_connection_pool::watch_pool_routine() pool.emplace_back(clock_type::now(), std::move(new_session)); added = true; } catch (const boost::system::system_error& e) { - // NOOP + // TODO: log errors ? } }; @@ -138,7 +139,7 @@ void base_connection_pool::watch_pool_routine() if (added) { pool_cv_.notify_all(); } else { - // stop cpu spooling if no session has been added + // stop cpu spooling if nothing has been added std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } From 9c4f6fefb84e47fefee507fbbbc8c5a1b1234fb8 Mon Sep 17 00:00:00 2001 From: "i.s.vovk" Date: Fri, 12 Feb 2021 18:49:29 +0300 Subject: [PATCH 5/5] bump version up to 1.1.5; update CHANGELOG --- CHANGELOG.md | 6 ++++++ CMakeLists.txt | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8198497..c50ddb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 1.1.5 (2021-02-12) + +### Features + +* Add idle timeout parameter for a pool. If session is sitting idle in the pool for a longer time it will replaced with new one + ## 1.1.4 (2020-04-28) ### Bug Fixes diff --git a/CMakeLists.txt b/CMakeLists.txt index 0a5cca5..e177d4a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0 FATAL_ERROR) set(STREAMCLIENT_VERSION_MAJOR "1") set(STREAMCLIENT_VERSION_MINOR "1") -set(STREAMCLIENT_VERSION_RELEASE "4") +set(STREAMCLIENT_VERSION_RELEASE "5") set(STREAMCLIENT_VERSION_STRING "${STREAMCLIENT_VERSION_MAJOR}.${STREAMCLIENT_VERSION_MINOR}.${STREAMCLIENT_VERSION_RELEASE}") set(STREAMCLIENT_LIB_VERSION ${STREAMCLIENT_VERSION_STRING}) mark_as_advanced(STREAMCLIENT_VERSION_MAJOR STREAMCLIENT_VERSION_MINOR STREAMCLIENT_VERSION_RELEASE STREAMCLIENT_VERSION_STRING STREAMCLIENT_LIB_VERSION)