diff --git a/CHANGELOG.md b/CHANGELOG.md index 8198497..c50ddb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 1.1.5 (2021-02-12) + +### Features + +* Add idle timeout parameter for a pool. If session is sitting idle in the pool for a longer time it will replaced with new one + ## 1.1.4 (2020-04-28) ### Bug Fixes diff --git a/CMakeLists.txt b/CMakeLists.txt index 0a5cca5..e177d4a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0 FATAL_ERROR) set(STREAMCLIENT_VERSION_MAJOR "1") set(STREAMCLIENT_VERSION_MINOR "1") -set(STREAMCLIENT_VERSION_RELEASE "4") +set(STREAMCLIENT_VERSION_RELEASE "5") set(STREAMCLIENT_VERSION_STRING "${STREAMCLIENT_VERSION_MAJOR}.${STREAMCLIENT_VERSION_MINOR}.${STREAMCLIENT_VERSION_RELEASE}") set(STREAMCLIENT_LIB_VERSION ${STREAMCLIENT_VERSION_STRING}) mark_as_advanced(STREAMCLIENT_VERSION_MAJOR STREAMCLIENT_VERSION_MINOR STREAMCLIENT_VERSION_RELEASE STREAMCLIENT_VERSION_STRING STREAMCLIENT_LIB_VERSION) diff --git a/README.md b/README.md index 50278e1..989882d 100644 --- a/README.md +++ b/README.md @@ -195,7 +195,7 @@ brew install -s -v ./boost@1.66.rb --with-icu4c brew link --force --overwrite boost@1.66 ``` -### Build +### Test/Install Prefer [out-of-source](https://gitlab.kitware.com/cmake/community/-/wikis/FAQ#what-is-an-out-of-source-build) building: @@ -204,14 +204,10 @@ mkdir build cd build cmake -DCMAKE_BUILD_TYPE=Debug .. make -j$(nproc) -make install -``` - -You may also test it with: -```bash make test ``` -Or install (sudo may be required): + +To install the lib (sudo may be required): ```bash make install ``` diff --git a/include/stream-client/connector/connection_pool.hpp b/include/stream-client/connector/connection_pool.hpp index 4a16b76..dfa40d1 100644 --- a/include/stream-client/connector/connection_pool.hpp +++ b/include/stream-client/connector/connection_pool.hpp @@ -28,9 +28,9 @@ class base_connection_pool using stream_type = typename connector_type::stream_type; using protocol_type = typename stream_type::protocol_type; - using clock_type = typename stream_type::clock_type; - using time_duration_type = typename stream_type::time_duration_type; - using time_point_type = typename stream_type::time_point_type; + using clock_type = typename connector_type::clock_type; + using time_duration_type = typename connector_type::time_duration_type; + using time_point_type = typename connector_type::time_point_type; /** * Parametrized constructor. @@ -43,11 +43,31 @@ class base_connection_pool * @param[in] size Number of connected sockets to maintain in the pool. * Note that real number of established connections my be @p size + 1. * This happens when you pull a stream with get_session() , the pool establishes new one to replace it, - * and later you return pulled stream back with return_session()/ + * and later you return pulled stream back with return_session(). + * @param[in] idle_timeout sessions which are in the pool for a longer time are replaced with new ones. * @param[in] ...argn Arguments to pass to @p Connector constructor. */ template - base_connection_pool(std::size_t size, ArgN&&... argn); + base_connection_pool(std::size_t size, time_duration_type idle_timeout, ArgN&&... argn); + + /** + * Parametrized constructor. + * Constructs pool for desired connector (protocol). Passed @p argn forwarded to @p Connector constructor. + * This operation starts background thread to fill the pool with opened sockets, + * therefore subsequent get_session() calls may take longer time compared with the state when pool is full. + * + * @tparam ...ArgN Types of argn. + * + * @param[in] size Number of connected sockets to maintain in the pool. + * Note that real number of established connections my be @p size + 1. + * This happens when you pull a stream with get_session() , the pool establishes new one to replace it, + * and later you return pulled stream back with return_session(). + * @param[in] ...argn Arguments to pass to @p Connector constructor. + */ + template ::value>::type* = nullptr> + base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn); /// Copy constructor is not permitted. base_connection_pool(const base_connection_pool& other) = delete; @@ -263,8 +283,10 @@ class base_connection_pool connector_type connector_; ///< Underlying connector used to establish sockets. - std::size_t pool_size_; ///< Number of stream to keep in the @p sesson_pool_. - std::list> sesson_pool_; ///< The list of established sockets + std::size_t pool_max_size_; ///< Number of stream to keep in the @p sesson_pool_. + time_duration_type idle_timeout_; ///< Idle timeout for the sessions in the @p sesson_pool_. + std::list>> + sesson_pool_; ///< The list of established sockets. mutable std::timed_mutex pool_mutex_; ///< @p sesson_pool_ mutex. mutable std::condition_variable_any pool_cv_; ///< @p sesson_pool_ condition variable. diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index c75de15..1473fb2 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -5,14 +5,23 @@ namespace connector { template template -base_connection_pool::base_connection_pool(std::size_t size, ArgN&&... argn) +base_connection_pool::base_connection_pool(std::size_t size, time_duration_type idle_timeout, ArgN&&... argn) : connector_(std::forward(argn)...) - , pool_size_(size) + , pool_max_size_(size) + , idle_timeout_(idle_timeout) , watch_pool(true) { pool_watcher_ = std::thread([this]() { this->watch_pool_routine(); }); } +template +template ::value>::type*> +base_connection_pool::base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn) + : base_connection_pool(size, time_duration_type::max(), std::forward(arg1), std::forward(argn)...) +{ +} + template base_connection_pool::~base_connection_pool() { @@ -38,7 +47,7 @@ base_connection_pool::get_session(boost::system::error_code& ec, cons return nullptr; } - std::unique_ptr session = std::move(sesson_pool_.front()); + std::unique_ptr session = std::move(sesson_pool_.front().second); sesson_pool_.pop_front(); return session; } @@ -56,7 +65,7 @@ void base_connection_pool::return_session(std::unique_ptr::watch_pool_routine() if (!pool_lk.try_lock_for(lock_timeout)) { continue; } - int vacant_places = pool_size_ - sesson_pool_.size(); - if (vacant_places < 0) { - vacant_places = 0; + // remove session which idling past idle_timeout_ + std::size_t pool_current_size = 0; + for (auto pool_it = sesson_pool_.begin(); pool_it != sesson_pool_.end();) { + const auto idle_for = clock_type::now() - pool_it->first; + if (idle_for >= idle_timeout_) { + pool_it = sesson_pool_.erase(pool_it); + } else { + ++pool_it; + ++pool_current_size; + } } + // pool_current_size may be bigger if someone returned previous session + std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0; // at this point we own pool_mutex_, but we want to get new sessions simultaneously; // that's why new mutex to sync adding threads @@ -102,15 +120,15 @@ void base_connection_pool::watch_pool_routine() auto new_session = connector.new_session(); // ensure only single session added at time std::unique_lock add_lk(pool_add); - pool.emplace_back(std::move(new_session)); + pool.emplace_back(clock_type::now(), std::move(new_session)); added = true; } catch (const boost::system::system_error& e) { - // NOOP + // TODO: log errors ? } }; std::list adders; - for (int i = 0; i < vacant_places; ++i) { + for (std::size_t i = 0; i < vacant_places; ++i) { adders.emplace_back(add_session); } for (auto& a : adders) { @@ -121,7 +139,7 @@ void base_connection_pool::watch_pool_routine() if (added) { pool_cv_.notify_all(); } else { - // stop cpu spooling if no session has been added + // stop cpu spooling if nothing has been added std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }