Skip to content

Commit

Permalink
Run packet processing from start
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 13, 2024
1 parent 5801ad0 commit f35b29f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 45 deletions.
96 changes: 54 additions & 42 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,14 @@ nano::network::network (nano::node & node_a, uint16_t port_a) :
tcp_channels (node_a, [this] (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel) {
inbound (message, channel);
}),
port (port_a), disconnect_observer ([] () {})
port (port_a)
{
for (std::size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i)
{
packet_processing_threads.emplace_back (nano::thread_attributes::get_default (), [this, i] () {
nano::thread_role::set (nano::thread_role::name::packet_processing);
try
{
tcp_channels.process_messages ();
}
catch (boost::system::error_code & ec)
{
node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
release_assert (false);
}
catch (std::error_code & ec)
{
node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
release_assert (false);
}
catch (std::runtime_error & err)
{
node.logger.critical (nano::log::type::network, "Error: {}", err.what ());
release_assert (false);
}
catch (...)
{
node.logger.critical (nano::log::type::network, "Unknown error");
release_assert (false);
}
});
}
}

nano::network::~network ()
{
stop ();
// All threads must be stopped before this destructor
debug_assert (processing_threads.empty ());
}

void nano::network::start ()
Expand All @@ -68,26 +39,67 @@ void nano::network::start ()
{
ongoing_cleanup ();
}

ongoing_syn_cookie_cleanup ();
ongoing_keepalive ();

if (!node.flags.disable_tcp_realtime)
{
tcp_channels.start ();

for (std::size_t i = 0; i < node.config.network_threads; ++i)
{
processing_threads.emplace_back (nano::thread_attributes::get_default (), [this] () {
nano::thread_role::set (nano::thread_role::name::packet_processing);
run_processing ();
});
}
}
ongoing_keepalive ();
}

void nano::network::stop ()
{
if (!stopped.exchange (true))
stopped = true;

tcp_channels.stop ();
resolver.cancel ();
tcp_message_manager.stop ();

for (auto & thread : processing_threads)
{
tcp_channels.stop ();
resolver.cancel ();
tcp_message_manager.stop ();
port = 0;
for (auto & thread : packet_processing_threads)
{
thread.join ();
}
thread.join ();
}
processing_threads.clear ();

port = 0;
}

void nano::network::run_processing ()
{
try
{
// TODO: Move responsibility of packet queuing and processing to the message_processor class
tcp_channels.process_messages ();
}
catch (boost::system::error_code & ec)
{
node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
release_assert (false);
}
catch (std::error_code & ec)
{
node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
release_assert (false);
}
catch (std::runtime_error & err)
{
node.logger.critical (nano::log::type::network, "Error: {}", err.what ());
release_assert (false);
}
catch (...)
{
node.logger.critical (nano::log::type::network, "Unknown error");
release_assert (false);
}
}

Expand Down
9 changes: 6 additions & 3 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class network final
nano::node_id_handshake::response_payload prepare_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) const;

private:
void run_processing ();
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);

private: // Dependencies
Expand All @@ -140,18 +141,20 @@ class network final
nano::networks const id;
nano::syn_cookies syn_cookies;
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
nano::peer_exclusion excluded_peers;
nano::tcp_message_manager tcp_message_manager;
nano::network_filter publish_filter;
nano::transport::tcp_channels tcp_channels;
std::atomic<uint16_t> port{ 0 };
std::function<void ()> disconnect_observer;

public: // Callbacks
std::function<void ()> disconnect_observer{ [] () {} };
// Called when a new channel is observed
std::function<void (std::shared_ptr<nano::transport::channel>)> channel_observer;
std::function<void (std::shared_ptr<nano::transport::channel>)> channel_observer{ [] (auto) {} };

private:
std::atomic<bool> stopped{ false };
std::vector<boost::thread> processing_threads;

public:
static unsigned const broadcast_interval_ms = 10;
Expand Down

0 comments on commit f35b29f

Please sign in to comment.