diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 0a9f771fba..5538ce194f 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -4,6 +4,7 @@ add_executable( fakes/websocket_client.hpp fakes/work_peer.hpp active_transactions.cpp + asio.cpp backlog.cpp block.cpp block_store.cpp diff --git a/nano/core_test/asio.cpp b/nano/core_test/asio.cpp new file mode 100644 index 0000000000..581c3ecdb5 --- /dev/null +++ b/nano/core_test/asio.cpp @@ -0,0 +1,142 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +namespace asio = boost::asio; + +TEST (asio, multithreaded_context) +{ + asio::thread_pool io_ctx{ 8 }; + + asio::strand strand{ io_ctx.get_executor () }; + + asio::ip::tcp::endpoint endpoint{ asio::ip::address_v6::loopback (), 0 }; + asio::ip::tcp::acceptor acceptor{ strand }; + acceptor.open (endpoint.protocol ()); + acceptor.bind (endpoint); + acceptor.listen (boost::asio::socket_base::max_listen_connections); + + std::atomic read_counter{ 0 }; + + auto reader_coro = [&] (asio::ip::tcp::socket socket) -> asio::awaitable { + std::cout << "reader started" << std::endl; + + while (true) + { + std::array buffer; + auto size = co_await socket.async_read_some (asio::buffer (buffer), asio::use_awaitable); + read_counter += size; + } + }; + + struct reader + { + std::future fut; + std::unique_ptr cancellation; + }; + std::vector readers; + + auto acceptor_coro = [&] (asio::ip::tcp::acceptor & acceptor) -> asio::awaitable { + std::cout << "listening started" << std::endl; + + while (true) + { + auto socket = co_await acceptor.async_accept (asio::use_awaitable); + + std::cout << "accepted connection" << std::endl; + + auto cancellation = std::make_unique (); + auto reader_fut = asio::co_spawn ( + strand, + reader_coro (std::move (socket)), + asio::bind_cancellation_slot (cancellation->slot (), asio::use_future)); + + readers.push_back ({ std::move (reader_fut), std::move (cancellation) }); + } + }; + + auto acceptor_fut = asio::co_spawn (strand, acceptor_coro (acceptor), asio::use_future); + + auto sender_coro = [&] () -> asio::awaitable { + std::cout << "sender started" << std::endl; + + asio::ip::tcp::socket socket{ io_ctx }; + co_await socket.async_connect (acceptor.local_endpoint (), asio::use_awaitable); + + std::array buffer; + while (true) + { + co_await socket.async_write_some (asio::buffer (buffer), asio::use_awaitable); + } + }; + + struct sender + { + std::future fut; + std::unique_ptr cancellation; + }; + std::vector senders; + + const auto num_senders = 10; + + for (int i = 0; i < num_senders; ++i) + { + auto cancellation = std::make_unique (); + auto sender_fut = asio::co_spawn ( + strand, + sender_coro (), + asio::bind_cancellation_slot (cancellation->slot (), asio::use_future)); + + senders.push_back ({ std::move (sender_fut), std::move (cancellation) }); + } + + const auto target = 64 * 1024 * 1024; + + while (read_counter < target) + { + std::cout << "read: " << read_counter << std::endl; + std::this_thread::sleep_for (1s); + } + + asio::post (strand, [&] () { + acceptor.close (); + + for (auto & sender : senders) + { + sender.cancellation->emit (asio::cancellation_type::all); + } + for (auto & reader : readers) + { + reader.cancellation->emit (asio::cancellation_type::all); + } + }); + + acceptor_fut.wait (); + + for (auto & sender : senders) + { + sender.fut.wait (); + } + for (auto & reader : readers) + { + reader.fut.wait (); + } +} \ No newline at end of file