Skip to content

Commit

Permalink
Merge branch 'release/1.1.5'
Browse files Browse the repository at this point in the history
  • Loading branch information
i.s.vovk committed Feb 12, 2021
2 parents 52a31d7 + 9c4f6fe commit a3559ca
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 26 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 1.1.5 (2021-02-12)

### Features

* Add idle timeout parameter for a pool. If session is sitting idle in the pool for a longer time it will replaced with new one

## 1.1.4 (2020-04-28)

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0 FATAL_ERROR)

set(STREAMCLIENT_VERSION_MAJOR "1")
set(STREAMCLIENT_VERSION_MINOR "1")
set(STREAMCLIENT_VERSION_RELEASE "4")
set(STREAMCLIENT_VERSION_RELEASE "5")
set(STREAMCLIENT_VERSION_STRING "${STREAMCLIENT_VERSION_MAJOR}.${STREAMCLIENT_VERSION_MINOR}.${STREAMCLIENT_VERSION_RELEASE}")
set(STREAMCLIENT_LIB_VERSION ${STREAMCLIENT_VERSION_STRING})
mark_as_advanced(STREAMCLIENT_VERSION_MAJOR STREAMCLIENT_VERSION_MINOR STREAMCLIENT_VERSION_RELEASE STREAMCLIENT_VERSION_STRING STREAMCLIENT_LIB_VERSION)
Expand Down
10 changes: 3 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ brew install -s -v ./[email protected] --with-icu4c
brew link --force --overwrite [email protected]
```

### Build
### Test/Install

Prefer [out-of-source](https://gitlab.kitware.com/cmake/community/-/wikis/FAQ#what-is-an-out-of-source-build) building:

Expand All @@ -204,14 +204,10 @@ mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=Debug ..
make -j$(nproc)
make install
```

You may also test it with:
```bash
make test
```
Or install (sudo may be required):

To install the lib (sudo may be required):
```bash
make install
```
Expand Down
36 changes: 29 additions & 7 deletions include/stream-client/connector/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ class base_connection_pool
using stream_type = typename connector_type::stream_type;
using protocol_type = typename stream_type::protocol_type;

using clock_type = typename stream_type::clock_type;
using time_duration_type = typename stream_type::time_duration_type;
using time_point_type = typename stream_type::time_point_type;
using clock_type = typename connector_type::clock_type;
using time_duration_type = typename connector_type::time_duration_type;
using time_point_type = typename connector_type::time_point_type;

/**
* Parametrized constructor.
Expand All @@ -43,11 +43,31 @@ class base_connection_pool
* @param[in] size Number of connected sockets to maintain in the pool.
* Note that real number of established connections my be @p size + 1.
* This happens when you pull a stream with get_session() , the pool establishes new one to replace it,
* and later you return pulled stream back with return_session()/
* and later you return pulled stream back with return_session().
* @param[in] idle_timeout sessions which are in the pool for a longer time are replaced with new ones.
* @param[in] ...argn Arguments to pass to @p Connector constructor.
*/
template <typename... ArgN>
base_connection_pool(std::size_t size, ArgN&&... argn);
base_connection_pool(std::size_t size, time_duration_type idle_timeout, ArgN&&... argn);

/**
* Parametrized constructor.
* Constructs pool for desired connector (protocol). Passed @p argn forwarded to @p Connector constructor.
* This operation starts background thread to fill the pool with opened sockets,
* therefore subsequent get_session() calls may take longer time compared with the state when pool is full.
*
* @tparam ...ArgN Types of argn.
*
* @param[in] size Number of connected sockets to maintain in the pool.
* Note that real number of established connections my be @p size + 1.
* This happens when you pull a stream with get_session() , the pool establishes new one to replace it,
* and later you return pulled stream back with return_session().
* @param[in] ...argn Arguments to pass to @p Connector constructor.
*/
template <typename Arg1, typename... ArgN,
typename std::enable_if<
!std::is_convertible<Arg1, typename Connector::time_duration_type>::value>::type* = nullptr>
base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn);

/// Copy constructor is not permitted.
base_connection_pool(const base_connection_pool<Connector>& other) = delete;
Expand Down Expand Up @@ -263,8 +283,10 @@ class base_connection_pool

connector_type connector_; ///< Underlying connector used to establish sockets.

std::size_t pool_size_; ///< Number of stream to keep in the @p sesson_pool_.
std::list<std::unique_ptr<stream_type>> sesson_pool_; ///< The list of established sockets
std::size_t pool_max_size_; ///< Number of stream to keep in the @p sesson_pool_.
time_duration_type idle_timeout_; ///< Idle timeout for the sessions in the @p sesson_pool_.
std::list<std::pair<time_point_type, std::unique_ptr<stream_type>>>
sesson_pool_; ///< The list of established sockets.
mutable std::timed_mutex pool_mutex_; ///< @p sesson_pool_ mutex.
mutable std::condition_variable_any pool_cv_; ///< @p sesson_pool_ condition variable.

Expand Down
40 changes: 29 additions & 11 deletions include/stream-client/connector/impl/connection_pool.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,23 @@ namespace connector {

template <typename Connector>
template <typename... ArgN>
base_connection_pool<Connector>::base_connection_pool(std::size_t size, ArgN&&... argn)
base_connection_pool<Connector>::base_connection_pool(std::size_t size, time_duration_type idle_timeout, ArgN&&... argn)
: connector_(std::forward<ArgN>(argn)...)
, pool_size_(size)
, pool_max_size_(size)
, idle_timeout_(idle_timeout)
, watch_pool(true)
{
pool_watcher_ = std::thread([this]() { this->watch_pool_routine(); });
}

template <typename Connector>
template <typename Arg1, typename... ArgN,
typename std::enable_if<!std::is_convertible<Arg1, typename Connector::time_duration_type>::value>::type*>
base_connection_pool<Connector>::base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn)
: base_connection_pool(size, time_duration_type::max(), std::forward<Arg1>(arg1), std::forward<ArgN>(argn)...)
{
}

template <typename Connector>
base_connection_pool<Connector>::~base_connection_pool()
{
Expand All @@ -38,7 +47,7 @@ base_connection_pool<Connector>::get_session(boost::system::error_code& ec, cons
return nullptr;
}

std::unique_ptr<stream_type> session = std::move(sesson_pool_.front());
std::unique_ptr<stream_type> session = std::move(sesson_pool_.front().second);
sesson_pool_.pop_front();
return session;
}
Expand All @@ -56,7 +65,7 @@ void base_connection_pool<Connector>::return_session(std::unique_ptr<stream_type
return;
}

sesson_pool_.emplace_back(std::move(session));
sesson_pool_.emplace_back(clock_type::now(), std::move(session));
pool_lk.unlock();
pool_cv_.notify_all();
}
Expand Down Expand Up @@ -87,10 +96,19 @@ void base_connection_pool<Connector>::watch_pool_routine()
if (!pool_lk.try_lock_for(lock_timeout)) {
continue;
}
int vacant_places = pool_size_ - sesson_pool_.size();
if (vacant_places < 0) {
vacant_places = 0;
// remove session which idling past idle_timeout_
std::size_t pool_current_size = 0;
for (auto pool_it = sesson_pool_.begin(); pool_it != sesson_pool_.end();) {
const auto idle_for = clock_type::now() - pool_it->first;
if (idle_for >= idle_timeout_) {
pool_it = sesson_pool_.erase(pool_it);
} else {
++pool_it;
++pool_current_size;
}
}
// pool_current_size may be bigger if someone returned previous session
std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0;

// at this point we own pool_mutex_, but we want to get new sessions simultaneously;
// that's why new mutex to sync adding threads
Expand All @@ -102,15 +120,15 @@ void base_connection_pool<Connector>::watch_pool_routine()
auto new_session = connector.new_session();
// ensure only single session added at time
std::unique_lock<std::mutex> add_lk(pool_add);
pool.emplace_back(std::move(new_session));
pool.emplace_back(clock_type::now(), std::move(new_session));
added = true;
} catch (const boost::system::system_error& e) {
// NOOP
// TODO: log errors ?
}
};

std::list<std::thread> adders;
for (int i = 0; i < vacant_places; ++i) {
for (std::size_t i = 0; i < vacant_places; ++i) {
adders.emplace_back(add_session);
}
for (auto& a : adders) {
Expand All @@ -121,7 +139,7 @@ void base_connection_pool<Connector>::watch_pool_routine()
if (added) {
pool_cv_.notify_all();
} else {
// stop cpu spooling if no session has been added
// stop cpu spooling if nothing has been added
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
Expand Down

0 comments on commit a3559ca

Please sign in to comment.