Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 19, 2024
1 parent eacf6d3 commit 3f49cc1
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 47 deletions.
10 changes: 5 additions & 5 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ using namespace std::chrono_literals;
TEST (network, tcp_connection)
{
nano::test::system system;
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
auto port = system.get_available_port ();
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), port);
acceptor.open (endpoint.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
acceptor.bind (endpoint);
acceptor.listen ();
boost::asio::ip::tcp::socket incoming (system.io_ctx);
boost::asio::ip::tcp::socket incoming (*system.io_ctx);
std::atomic<bool> done1 (false);
std::string message1;
acceptor.async_accept (incoming, [&done1, &message1] (boost::system::error_code const & ec_a) {
Expand All @@ -39,7 +39,7 @@ TEST (network, tcp_connection)
}
done1 = true;
});
boost::asio::ip::tcp::socket connector (system.io_ctx);
boost::asio::ip::tcp::socket connector (*system.io_ctx);
std::atomic<bool> done2 (false);
std::string message2;
connector.async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), acceptor.local_endpoint ().port ()),
Expand Down Expand Up @@ -538,13 +538,13 @@ TEST (network, ipv6_bind_send_ipv4)
std::array<uint8_t, 16> bytes1{};
std::atomic<bool> finish1{ false };
nano::endpoint endpoint3;
boost::asio::ip::udp::socket socket1 (system.io_ctx, endpoint1);
boost::asio::ip::udp::socket socket1 (*system.io_ctx, endpoint1);
socket1.async_receive_from (boost::asio::buffer (bytes1.data (), bytes1.size ()), endpoint3, [&finish1] (boost::system::error_code const & error, size_t size_a) {
ASSERT_FALSE (error);
ASSERT_EQ (16, size_a);
finish1 = true;
});
boost::asio::ip::udp::socket socket2 (system.io_ctx, endpoint2);
boost::asio::ip::udp::socket socket2 (*system.io_ctx, endpoint2);
nano::endpoint endpoint5 (boost::asio::ip::address_v4::loopback (), socket1.local_endpoint ().port ());
nano::endpoint endpoint6 (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4::loopback ()), socket2.local_endpoint ().port ());
socket2.async_send_to (boost::asio::buffer (std::array<uint8_t, 16>{}, 16), endpoint5, [] (boost::system::error_code const & error, size_t size_a) {
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ TEST (node, stop)
nano::test::system system (1);
ASSERT_NE (system.nodes[0]->wallets.items.end (), system.nodes[0]->wallets.items.begin ());
system.nodes[0]->stop ();
system.io_ctx.run ();
system.io_ctx->run ();
ASSERT_TRUE (true);
}

Expand Down Expand Up @@ -68,10 +68,10 @@ TEST (node, work_generate)
TEST (node, block_store_path_failure)
{
nano::test::system system;
auto service (std::make_shared<boost::asio::io_context> ());
auto io_ctx = std::make_shared<boost::asio::io_context> ();
auto path (nano::unique_path ());
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
auto node (std::make_shared<nano::node> (*service, system.get_available_port (), path, pool));
auto node (std::make_shared<nano::node> (io_ctx, system.get_available_port (), path, pool));
ASSERT_TRUE (node->wallets.items.empty ());
node->stop ();
}
Expand All @@ -97,7 +97,7 @@ TEST (node_DeathTest, readonly_block_store_not_exist)
TEST (node, password_fanout)
{
nano::test::system system;
boost::asio::io_context io_ctx;
auto io_ctx = std::make_shared<boost::asio::io_context> ();
auto path (nano::unique_path ());
nano::node_config config;
config.peering_port = system.get_available_port ();
Expand Down
20 changes: 10 additions & 10 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ TEST (socket, drop_policy)
nano::inactive_node inactivenode (nano::unique_path (), node_flags);
auto node = inactivenode.node;

nano::thread_runner runner (node->io_ctx, 1);
nano::thread_runner runner (node->io_ctx_shared, 1);

std::vector<std::shared_ptr<nano::transport::socket>> connections;

Expand Down Expand Up @@ -469,7 +469,7 @@ TEST (socket, concurrent_writes)

// This gives more realistic execution than using system#poll, allowing writes to
// queue up and drain concurrently.
nano::thread_runner runner (node->io_ctx, 1);
nano::thread_runner runner (node->io_ctx_shared, 1);

constexpr size_t max_connections = 4;
constexpr size_t client_count = max_connections;
Expand Down Expand Up @@ -622,13 +622,13 @@ TEST (socket_timeout, read)

// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);

// asynchronously accept an incoming connection and create a newsock and do not send any data
boost::asio::ip::tcp::socket newsock (system.io_ctx);
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
});
Expand Down Expand Up @@ -668,13 +668,13 @@ TEST (socket_timeout, write)

// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);

// asynchronously accept an incoming connection and create a newsock and do not receive any data
boost::asio::ip::tcp::socket newsock (system.io_ctx);
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
});
Expand Down Expand Up @@ -719,13 +719,13 @@ TEST (socket_timeout, read_overlapped)

// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);

// asynchronously accept an incoming connection and send one byte only
boost::asio::ip::tcp::socket newsock (system.io_ctx);
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
acceptor.async_accept (newsock, [&newsock] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);

Expand Down Expand Up @@ -777,13 +777,13 @@ TEST (socket_timeout, write_overlapped)

// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);

// asynchronously accept an incoming connection and read 2 bytes only
boost::asio::ip::tcp::socket newsock (system.io_ctx);
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
acceptor.async_accept (newsock, [&newsock, &buffer] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
Expand Down
6 changes: 4 additions & 2 deletions nano/load_test/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ int main (int argc, char * const * argv)
std::this_thread::sleep_for (std::chrono::seconds (7));
std::cout << "Connecting nodes..." << std::endl;

boost::asio::io_context ioc;
std::shared_ptr<boost::asio::io_context> ioc_shared = std::make_shared<boost::asio::io_context> ();
boost::asio::io_context & ioc{ *ioc_shared };

debug_assert (!nano::signal_handler_impl);
nano::signal_handler_impl = [&ioc] () {
Expand Down Expand Up @@ -715,7 +716,8 @@ int main (int argc, char * const * argv)
// Stop main node
stop_rpc (ioc, primary_node_results);
});
nano::thread_runner runner (ioc, simultaneous_process_calls);

nano::thread_runner runner (ioc_shared, simultaneous_process_calls);
t.join ();
runner.join ();

Expand Down
8 changes: 4 additions & 4 deletions nano/nano_node/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,8 @@ int main (int argc, char * const * argv)
}
}
std::cout << boost::str (boost::format ("Starting generating %1% blocks...\n") % (count * 2));
boost::asio::io_context io_ctx1;
boost::asio::io_context io_ctx2;
auto io_ctx1 = std::make_shared<boost::asio::io_context> ();
auto io_ctx2 = std::make_shared<boost::asio::io_context> ();
nano::work_pool work{ network_params.network, std::numeric_limits<unsigned>::max () };
auto path1 (nano::unique_path ());
auto path2 (nano::unique_path ());
Expand Down Expand Up @@ -1283,8 +1283,8 @@ int main (int argc, char * const * argv)
auto end (std::chrono::high_resolution_clock::now ());
auto time (std::chrono::duration_cast<std::chrono::microseconds> (end - begin).count ());
std::cout << boost::str (boost::format ("%|1$ 12d| us \n%2% frontiers per second\n") % time % ((count + 1) * 1000000 / time));
io_ctx1.stop ();
io_ctx2.stop ();
io_ctx1->stop ();
io_ctx2->stop ();
runner1.join ();
runner2.join ();
node1->stop ();
Expand Down
5 changes: 3 additions & 2 deletions nano/nano_wallet/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ int run_wallet (QApplication & application, int argc, char * const * argv, std::
config.node.websocket_config.tls_config = tls_config;
}

boost::asio::io_context io_ctx;
std::shared_ptr<boost::asio::io_context> io_ctx = std::make_shared<boost::asio::io_context> ();

nano::thread_runner runner (io_ctx, config.node.io_threads);

std::shared_ptr<nano::node> node;
Expand Down Expand Up @@ -191,7 +192,7 @@ int run_wallet (QApplication & application, int argc, char * const * argv, std::
}
rpc_config.tls_config = tls_config;
rpc_handler = std::make_unique<nano::inprocess_rpc_handler> (*node, ipc, config.rpc);
rpc = nano::get_rpc (io_ctx, rpc_config, *rpc_handler);
rpc = nano::get_rpc (*io_ctx, rpc_config, *rpc_handler);
rpc->start ();
}
else
Expand Down
9 changes: 5 additions & 4 deletions nano/node/ipc/ipc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,13 @@ class socket_transport : public nano::ipc::transport
{
public:
socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE endpoint_a, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) :
server (server_a), config_transport (config_transport_a)
server (server_a),
config_transport (config_transport_a)
{
// Using a per-transport event dispatcher?
if (concurrency_a > 0)
{
io_ctx = std::make_unique<boost::asio::io_context> ();
io_ctx = std::make_shared<boost::asio::io_context> ();
}

boost::asio::socket_base::reuse_address option (true);
Expand All @@ -482,7 +483,7 @@ class socket_transport : public nano::ipc::transport
// A separate io_context for domain sockets may facilitate better performance on some systems.
if (concurrency_a > 0)
{
runner = std::make_unique<nano::thread_runner> (*io_ctx, static_cast<unsigned> (concurrency_a));
runner = std::make_unique<nano::thread_runner> (io_ctx, static_cast<unsigned> (concurrency_a));
}
}

Expand Down Expand Up @@ -544,7 +545,7 @@ class socket_transport : public nano::ipc::transport
nano::ipc::ipc_server & server;
nano::ipc::ipc_config_transport & config_transport;
std::unique_ptr<nano::thread_runner> runner;
std::unique_ptr<boost::asio::io_context> io_ctx;
std::shared_ptr<boost::asio::io_context> io_ctx;
std::unique_ptr<ACCEPTOR_TYPE> acceptor;
};

Expand Down
4 changes: 2 additions & 2 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, uint16_t pe
}

nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesystem::path const & application_path_a, nano::node_config const & config_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) :
io_ctx_impl{ io_ctx_a },
io_ctx{ *io_ctx_impl },
io_ctx_shared{ io_ctx_a },
io_ctx{ *io_ctx_shared },
node_id{ load_or_create_node_id (application_path_a) },
write_database_queue (!flags_a.force_use_write_database_queue && (config_a.rocksdb_config.enable)),
node_initialized_latch (1),
Expand Down
4 changes: 1 addition & 3 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,10 @@ class node final : public std::enable_shared_from_this<node>
nano::account get_node_id () const;
nano::telemetry_data local_telemetry () const;

private:
std::shared_ptr<boost::asio::io_context> io_ctx_impl;

public:
const nano::keypair node_id;
nano::write_database_queue write_database_queue;
std::shared_ptr<boost::asio::io_context> io_ctx_shared;
boost::asio::io_context & io_ctx;
boost::latch node_initialized_latch;
nano::node_config config;
Expand Down
12 changes: 6 additions & 6 deletions nano/rpc_test/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1741,7 +1741,7 @@ TEST (rpc, version)
auto const rpc_ctx = add_rpc (system, node1);
boost::property_tree::ptree request1;
request1.put ("action", "version");
test_response response1 (request1, rpc_ctx.rpc->listening_port (), system.io_ctx);
test_response response1 (request1, rpc_ctx.rpc->listening_port (), *system.io_ctx);
ASSERT_TIMELY (5s, response1.status != 0);
ASSERT_EQ (200, response1.status);
ASSERT_EQ ("1", response1.json.get<std::string> ("rpc_version"));
Expand Down Expand Up @@ -2506,7 +2506,7 @@ TEST (rpc, bootstrap)
request.put ("action", "bootstrap");
request.put ("address", "::ffff:127.0.0.1");
request.put ("port", node1->network.endpoint ().port ());
test_response response (request, rpc_ctx.rpc->listening_port (), system0.io_ctx);
test_response response (request, rpc_ctx.rpc->listening_port (), *system0.io_ctx);
while (response.status == 0)
{
system0.poll ();
Expand Down Expand Up @@ -6046,8 +6046,8 @@ TEST (rpc, simultaneous_calls)
const auto ipc_tcp_port = ipc_server.listening_tcp_port ();
ASSERT_TRUE (ipc_tcp_port.has_value ());
rpc_config.rpc_process.num_ipc_connections = 8;
nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config, ipc_tcp_port.value ());
nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor);
nano::ipc_rpc_processor ipc_rpc_processor (*system.io_ctx, rpc_config, ipc_tcp_port.value ());
nano::rpc rpc (*system.io_ctx, rpc_config, ipc_rpc_processor);
rpc.start ();
boost::property_tree::ptree request;
request.put ("action", "account_block_count");
Expand All @@ -6057,7 +6057,7 @@ TEST (rpc, simultaneous_calls)
std::array<std::unique_ptr<test_response>, num> test_responses;
for (int i = 0; i < num; ++i)
{
test_responses[i] = std::make_unique<test_response> (request, system.io_ctx);
test_responses[i] = std::make_unique<test_response> (request, *system.io_ctx);
}

std::promise<void> promise;
Expand Down Expand Up @@ -6087,7 +6087,7 @@ TEST (rpc, simultaneous_calls)
rpc.stop ();
system.stop ();
ipc_server.stop ();
system.io_ctx.stop ();
system.io_ctx->stop ();
runner.join ();
}

Expand Down
6 changes: 3 additions & 3 deletions nano/rpc_test/rpc_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ nano::test::rpc_context::rpc_context (std::shared_ptr<nano::rpc> & rpc_a, std::u

void nano::test::wait_response_impl (nano::test::system & system, rpc_context const & rpc_ctx, boost::property_tree::ptree & request, std::chrono::duration<double, std::nano> const & time, boost::property_tree::ptree & response_json)
{
test_response response (request, rpc_ctx.rpc->listening_port (), system.io_ctx);
test_response response (request, rpc_ctx.rpc->listening_port (), *system.io_ctx);
ASSERT_TIMELY (time, response.status != 0);
ASSERT_EQ (200, response.status);
response_json = response.json;
Expand All @@ -49,8 +49,8 @@ nano::test::rpc_context nano::test::add_rpc (nano::test::system & system, std::s
nano::rpc_config rpc_config (node_a->network_params.network, system.get_available_port (), true);
const auto ipc_tcp_port = ipc_server->listening_tcp_port ();
debug_assert (ipc_tcp_port.has_value ());
auto ipc_rpc_processor (std::make_unique<nano::ipc_rpc_processor> (system.io_ctx, rpc_config, ipc_tcp_port.value ()));
auto rpc (std::make_shared<nano::rpc> (system.io_ctx, rpc_config, *ipc_rpc_processor));
auto ipc_rpc_processor (std::make_unique<nano::ipc_rpc_processor> (*system.io_ctx, rpc_config, ipc_tcp_port.value ()));
auto rpc (std::make_shared<nano::rpc> (*system.io_ctx, rpc_config, *ipc_rpc_processor));
rpc->start ();

return rpc_context{ rpc, ipc_server, ipc_rpc_processor, node_rpc_config };
Expand Down
4 changes: 2 additions & 2 deletions nano/slow_test/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class rpc_wrapper
node_rpc_config{},
rpc_config{ node.network_params.network, port, true },
ipc{ node, node_rpc_config },
ipc_rpc_processor{ system.io_ctx, rpc_config },
rpc{ system.io_ctx, rpc_config, ipc_rpc_processor }
ipc_rpc_processor{ *system.io_ctx, rpc_config },
rpc{ *system.io_ctx, rpc_config, ipc_rpc_processor }
{
}

Expand Down

0 comments on commit 3f49cc1

Please sign in to comment.