Skip to content

Commit

Permalink
Move tcp_message_manager into tcp_channels class
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 15, 2024
1 parent b67d125 commit e1d1517
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 27 deletions.
2 changes: 0 additions & 2 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ nano::network::network (nano::node & node, uint16_t port) :
id{ nano::network_constants::active_network },
syn_cookies{ node.network_params.network.max_peers_per_ip, node.logger },
resolver{ node.io_ctx },
tcp_message_manager{ node.config.tcp_incoming_connections_max },
publish_filter{ 256 * 1024 },
tcp_channels{ node, [this] (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel) {
inbound (message, channel);
Expand Down Expand Up @@ -73,7 +72,6 @@ void nano::network::stop ()

tcp_channels.stop ();
resolver.cancel ();
tcp_message_manager.stop ();

for (auto & thread : processing_threads)
{
Expand Down
22 changes: 0 additions & 22 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,6 @@ namespace nano
{
class node;

class tcp_message_manager final
{
public:
tcp_message_manager (unsigned incoming_connections_max_a);
void put_message (nano::tcp_message_item const & item_a);
nano::tcp_message_item get_message ();
// Stop container and notify waiting threads
void stop ();

private:
nano::mutex mutex;
nano::condition_variable producer_condition;
nano::condition_variable consumer_condition;
std::deque<nano::tcp_message_item> entries;
unsigned max_entries;
static unsigned const max_entries_per_connection = 16;
bool stopped{ false };

friend class network_tcp_message_manager_Test;
};

/**
* Node ID cookies for node ID handshakes
*/
Expand Down Expand Up @@ -142,7 +121,6 @@ class network final
nano::syn_cookies syn_cookies;
boost::asio::ip::udp::resolver resolver;
nano::peer_exclusion excluded_peers;
nano::tcp_message_manager tcp_message_manager;
nano::network_filter publish_filter;
nano::transport::tcp_channels tcp_channels;
std::atomic<uint16_t> port{ 0 };
Expand Down
6 changes: 5 additions & 1 deletion nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ void nano::transport::channel_tcp::operator() (nano::object_stream & obs) const

nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink) :
node{ node },
message_manager{ node.config.tcp_incoming_connections_max },
sink{ std::move (sink) }
{
}
Expand Down Expand Up @@ -295,7 +296,7 @@ void nano::transport::tcp_channels::process_messages ()
{
while (!stopped)
{
auto item (node.network.tcp_message_manager.get_message ());
auto item = message_manager.get_message ();
if (item.message != nullptr)
{
process_message (*item.message, item.endpoint, item.node_id, item.socket);
Expand Down Expand Up @@ -364,6 +365,9 @@ void nano::transport::tcp_channels::stop ()
{
stopped = true;
nano::unique_lock<nano::mutex> lock{ mutex };

message_manager.stop ();

// Close all TCP sockets
for (auto const & channel : channels)
{
Expand Down
31 changes: 30 additions & 1 deletion nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,28 @@ class tcp_message_item final
nano::account node_id;
std::shared_ptr<nano::transport::socket> socket;
};

class tcp_message_manager final
{
public:
tcp_message_manager (unsigned incoming_connections_max_a);
void put_message (nano::tcp_message_item const & item_a);
nano::tcp_message_item get_message ();
// Stop container and notify waiting threads
void stop ();

private:
nano::mutex mutex;
nano::condition_variable producer_condition;
nano::condition_variable consumer_condition;
std::deque<nano::tcp_message_item> entries;
unsigned max_entries;
static unsigned const max_entries_per_connection = 16;
bool stopped{ false };

friend class network_tcp_message_manager_Test;
};

namespace transport
{
class tcp_server;
Expand Down Expand Up @@ -136,10 +158,14 @@ namespace transport
// Connection start
void start_tcp (nano::endpoint const &);
void start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const &, nano::endpoint const &, std::shared_ptr<std::vector<uint8_t>> const &);

private: // Dependencies
nano::node & node;

public:
nano::tcp_message_manager message_manager;

private:
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink;
class endpoint_tag
{
};
Expand Down Expand Up @@ -255,6 +281,9 @@ namespace transport
mi::member<tcp_endpoint_attempt, std::chrono::steady_clock::time_point, &tcp_endpoint_attempt::last_attempt>>>>
attempts;
// clang-format on

private:
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink;
std::atomic<bool> stopped{ false };

friend class network_peer_max_tcp_attempts_subnetwork_Test;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr<nano::message>
{
return;
}
node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket });
node->network.tcp_channels.message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket });
}

/*
Expand Down

0 comments on commit e1d1517

Please sign in to comment.