Skip to content

Commit

Permalink
Merge branch 'release/v1.3.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
i-vovk committed Feb 16, 2022
2 parents bdb48b9 + 0ef707b commit 15969c7
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 91 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ elif [[ "$1" == "docs" ]] ; then

elif [[ "$1" == "lint" ]] ; then
shift 1
./lint.py --color=always --style=file --build-path=./build --recursive "$@" include/
./lint.py --color=always --style=file --exclude-tidy=*.ipp --build-path=./build --recursive "$@" include/

elif [[ "$1" == "build" ]] ; then
shift 1
Expand Down
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
## 1.3.0 (2022-02-16)

### Features

* Support pool refill strategy and implement conservative and greedy (default) - (#20)

### Misc

* cmake: Use system googletest library if found
* cmake: Mute googletest warning
* workflow: Check .ipp files with clang-format


## 1.2.0 (2021-11-24)

### Features
Expand Down
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.0 FATAL_ERROR)

set(STREAMCLIENT_VERSION_MAJOR "1")
set(STREAMCLIENT_VERSION_MINOR "2")
set(STREAMCLIENT_VERSION_MINOR "3")
set(STREAMCLIENT_VERSION_RELEASE "0")
set(STREAMCLIENT_SUMMARY "C++ library")
set(STREAMCLIENT_REPOSITORY_URL "https://github.com/TinkoffCreditSystems/stream-client")
Expand All @@ -25,7 +25,7 @@ set(CMAKE_CXX_STANDARD 14) # todo: upgrade to 17
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wno-compound-token-split-by-macro")

## Compile as RelWithDebInfo info by default
if(NOT CMAKE_BUILD_TYPE)
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[![Language C++](https://img.shields.io/badge/language-c++-blue.svg?logo=c%2B%2B)](https://isocpp.org)
[![Github releases](https://img.shields.io/github/release/TinkoffCreditSystems/stream-client.svg)](https://github.com/TinkoffCreditSystems/stream-client/releases)
[![Coverage Status](https://coveralls.io/repos/github/TinkoffCreditSystems/stream-client/badge.svg?branch=develop)](https://coveralls.io/github/TinkoffCreditSystems/stream-client?branch=develop)
[![Coverage Status](https://coveralls.io/repos/github/Tinkoff/stream-client/badge.svg?branch=develop)](https://coveralls.io/github/Tinkoff/stream-client?branch=develop)
[![License](https://img.shields.io/github/license/TinkoffCreditSystems/stream-client.svg)](./LICENSE)

This is a lightweight, header-only, Boost-based library providing client-side network primitives to easily organize and implement data transmission with remote endpoints.
Expand Down Expand Up @@ -119,6 +119,11 @@ client->receive(boost::asio::buffer(&recv_data[0], send_data.size()));
Represents container occupied with opened sockets. Uses [connector](#connector) to open new sockets in the background thread which is triggered once there are vacant places in the pool. User can call *get_session()* to obtain a socket from the pool and *return_session()* to give it back.
There are two strategies to refill the pool:
- greedy (`stream_client::connector::greedy_strategy`). If there are vacant places it will try to fill them with new sessions simultaneously.
- conservative (`stream_client::connector::conservative_strategy`). Will try to fill up to 2/3 of vacant places in the poll. If failed will back of for some time and retry later. Also, after failures it will create only one new session.
Both of them are defined in terms of `stream_client::connector::pool_strategy` interface, so you are free to implement new one.
Limitations:
1. Sockets that are already in the pool are not checked or maintained in any way. Hence, the pool doesn't guarantee that all sockets are opened at an arbitrary point in time due to the complexity of such checks for all supported protocols.
2. Nothing specific done with sockets upon their return within *return_session()*. Therefore, if they have or will have pending data to read, it will stay there until reading.
Expand All @@ -138,6 +143,8 @@ Connection pools:
* `stream_client::connector::http_pool` - pool of `stream_client::http::http_client` sockets.
* `stream_client::connector::https_pool` - pool of `stream_client::http::https_client` sockets.
*All these pools are using `stream_client::connector::greedy_strategy`.*
#### Example
```c++
const std::chrono::milliseconds resolve_timeout(5000);
Expand Down
5 changes: 4 additions & 1 deletion include/stream-client/connector/connection_pool.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "connector.hpp"
#include "pool_strategy.hpp"

#include <atomic>
#include <list>
Expand All @@ -20,8 +21,9 @@ namespace connector {
* @note Thread-safe. Single instance support concurrent operation.
*
* @tparam Connector Type of connector to use to create sockets.
* @tparam Strategy Type of reconnection strategy. For more info look in pool_strategy.hpp.
*/
template <typename Connector>
template <typename Connector, typename Strategy = greedy_strategy<Connector>>
class base_connection_pool
{
public:
Expand Down Expand Up @@ -312,6 +314,7 @@ class base_connection_pool
/// Background routine used to maintain the pool.
void watch_pool_routine();

Strategy reconnection_;
connector_type connector_; ///< Underlying connector used to establish sockets.

std::size_t pool_max_size_; ///< Number of stream to keep in the @p sesson_pool_.
Expand Down
76 changes: 30 additions & 46 deletions include/stream-client/connector/impl/connection_pool.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
namespace stream_client {
namespace connector {

template <typename Connector>
template <typename Connector, typename Strategy>
template <typename... ArgN>
base_connection_pool<Connector>::base_connection_pool(std::size_t size, time_duration_type idle_timeout, ArgN&&... argn)
base_connection_pool<Connector, Strategy>::base_connection_pool(std::size_t size, time_duration_type idle_timeout,
ArgN&&... argn)
: connector_(std::forward<ArgN>(argn)...)
, pool_max_size_(size)
, idle_timeout_(idle_timeout)
Expand All @@ -14,26 +15,26 @@ base_connection_pool<Connector>::base_connection_pool(std::size_t size, time_dur
pool_watcher_ = std::thread([this]() { this->watch_pool_routine(); });
}

template <typename Connector>
template <typename Connector, typename Strategy>
template <typename Arg1, typename... ArgN,
typename std::enable_if<!std::is_convertible<Arg1, typename Connector::time_duration_type>::value>::type*>
base_connection_pool<Connector>::base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn)
base_connection_pool<Connector, Strategy>::base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn)
: base_connection_pool(size, time_duration_type::max(), std::forward<Arg1>(arg1), std::forward<ArgN>(argn)...)
{
}

template <typename Connector>
base_connection_pool<Connector>::~base_connection_pool()
template <typename Connector, typename Strategy>
base_connection_pool<Connector, Strategy>::~base_connection_pool()
{
watch_pool_.store(false, std::memory_order_release);
if (pool_watcher_.joinable()) {
pool_watcher_.join();
}
}

template <typename Connector>
std::unique_ptr<typename base_connection_pool<Connector>::stream_type>
base_connection_pool<Connector>::get_session(boost::system::error_code& ec, const time_point_type& deadline)
template <typename Connector, typename Strategy>
std::unique_ptr<typename base_connection_pool<Connector, Strategy>::stream_type>
base_connection_pool<Connector, Strategy>::get_session(boost::system::error_code& ec, const time_point_type& deadline)
{
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
if (!pool_lk.try_lock_until(deadline)) {
Expand All @@ -52,9 +53,10 @@ base_connection_pool<Connector>::get_session(boost::system::error_code& ec, cons
return session;
}

template <typename Connector>
std::unique_ptr<typename base_connection_pool<Connector>::stream_type>
base_connection_pool<Connector>::try_get_session(boost::system::error_code& ec, const time_point_type& deadline)
template <typename Connector, typename Strategy>
std::unique_ptr<typename base_connection_pool<Connector, Strategy>::stream_type>
base_connection_pool<Connector, Strategy>::try_get_session(boost::system::error_code& ec,
const time_point_type& deadline)
{
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
if (!pool_lk.try_lock_until(deadline)) {
Expand All @@ -73,8 +75,8 @@ base_connection_pool<Connector>::try_get_session(boost::system::error_code& ec,
return session;
}

template <typename Connector>
void base_connection_pool<Connector>::return_session(std::unique_ptr<stream_type>&& session)
template <typename Connector, typename Strategy>
void base_connection_pool<Connector, Strategy>::return_session(std::unique_ptr<stream_type>&& session)
{
if (!session || !session->next_layer().is_open()) {
return;
Expand All @@ -88,11 +90,12 @@ void base_connection_pool<Connector>::return_session(std::unique_ptr<stream_type

sesson_pool_.emplace_back(clock_type::now(), std::move(session));
pool_lk.unlock();
pool_cv_.notify_all();
pool_cv_.notify_one();
}

template <typename Connector>
bool base_connection_pool<Connector>::is_connected(boost::system::error_code& ec, const time_point_type& deadline) const
template <typename Connector, typename Strategy>
bool base_connection_pool<Connector, Strategy>::is_connected(boost::system::error_code& ec,
const time_point_type& deadline) const
{
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
if (!pool_lk.try_lock_until(deadline)) {
Expand All @@ -107,8 +110,8 @@ bool base_connection_pool<Connector>::is_connected(boost::system::error_code& ec
return true;
}

template <typename Connector>
void base_connection_pool<Connector>::watch_pool_routine()
template <typename Connector, typename Strategy>
void base_connection_pool<Connector, Strategy>::watch_pool_routine()
{
static const auto lock_timeout = std::chrono::milliseconds(100);

Expand Down Expand Up @@ -137,37 +140,18 @@ void base_connection_pool<Connector>::watch_pool_routine()
// pool_current_size may be bigger if someone returned previous session
std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0;

// creating new sessions may be slow and we want to add them simultaneously;
// that's why we need to sync adding threads and lock pool
auto add_session = [this]() {
try {
// getting new session is time consuming operation
auto new_session = connector_.new_session();

// ensure only single session added at time
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_);
sesson_pool_.emplace_back(clock_type::now(), std::move(new_session));
pool_lk.unlock();

// unblock one waiting thread
pool_cv_.notify_one();
} catch (const boost::system::system_error& e) {
// TODO: log errors ?
if (vacant_places) {
auto append_func = [this](std::unique_ptr<stream_type>&& session) {
this->return_session(std::move(session));
};
const auto need_more = reconnection_.refill(connector_, vacant_places, append_func);
if (need_more) {
continue;
}
};

std::list<std::thread> adders;
for (std::size_t i = 0; i < vacant_places; ++i) {
adders.emplace_back(add_session);
}
for (auto& a : adders) {
a.join();
}

// stop cpu spooling if nothing has been added
if (vacant_places == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}

Expand Down
114 changes: 114 additions & 0 deletions include/stream-client/connector/impl/pool_strategy.ipp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#pragma once

#include <boost/system/system_error.hpp>

#include <atomic>
#include <chrono>
#include <list>
#include <random>
#include <thread>
#include <vector>

namespace stream_client {
namespace connector {

template <typename Connector>
const unsigned long conservative_strategy<Connector>::kMaxBackoffMs = 10000; // 10 seconds maximum delay

template <typename Connector>
const unsigned long conservative_strategy<Connector>::kDefaultDelayMs = 50; // 50 milliseconds is default initial delay

template <typename Connector>
const unsigned long conservative_strategy<Connector>::kDefaultDelayMul = 3; // 3 is default delay multiplier

template <typename Connector>
bool greedy_strategy<Connector>::refill(connector_type& connector, std::size_t vacant_places,
append_func_type append_func)
{
// creating new sessions may be slow and we want to add them simultaneously
auto add_session = [&]() {
try {
auto new_session = connector.new_session();
append_func(std::move(new_session));
} catch (const boost::system::system_error& e) {
// TODO: log errors ?
}
};

std::list<std::thread> adders;
for (std::size_t i = 0; i < vacant_places; ++i) {
adders.emplace_back(add_session);
}
for (auto& a : adders) {
a.join();
}

return vacant_places > 0;
}

template <typename Connector>
conservative_strategy<Connector>::conservative_strategy(unsigned long first_delay_ms, unsigned delay_multiplier)
: initial_delay_ms_(first_delay_ms)
, delay_multiplier_(delay_multiplier)
, current_delay_ms_(0)
, r_generator_(r_device_())
{
if (delay_multiplier_ < 1) {
throw std::runtime_error("delay multiplier should be >= 1");
}
}

template <typename Connector>
bool conservative_strategy<Connector>::refill(connector_type& connector, std::size_t vacant_places,
append_func_type append_func)
{
if (clock_type::now() < wait_until_) {
return false;
}

std::atomic_bool is_added{false};

// creating new sessions may be slow and we want to add them simultaneously
auto add_session = [&]() {
try {
auto new_session = connector.new_session();
append_func(std::move(new_session));
is_added = true;
} catch (const boost::system::system_error& e) {
// TODO: log errors ?
}
};

std::vector<std::thread> adders;
const size_t parallel = (vacant_places + 2) / 3 - 1;
if (!current_delay_ms_ && parallel > 0) {
adders.reserve(parallel);
for (std::size_t i = 0; i < parallel; ++i) {
adders.emplace_back(add_session);
}
}
add_session();
for (auto& a : adders) {
a.join();
}

if (is_added) {
current_delay_ms_ = 0;
return true;
}

if (!current_delay_ms_) {
current_delay_ms_ = initial_delay_ms_;
} else {
current_delay_ms_ *= delay_multiplier_;
}
const auto rand_val = double(r_generator_()) / r_generator_.max();
current_delay_ms_ *= rand_val;
current_delay_ms_ = std::min(kMaxBackoffMs, current_delay_ms_);
wait_until_ = clock_type::now() + std::chrono::milliseconds(current_delay_ms_);

return false;
}

} // namespace connector
} // namespace stream_client
Loading

0 comments on commit 15969c7

Please sign in to comment.