Skip to content

Commit

Permalink
Close without holding the lock
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Nov 21, 2024
1 parent 8f54094 commit 29b1f0b
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 30 deletions.
57 changes: 43 additions & 14 deletions nano/node/transport/tcp_channels.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp_channels.hpp>

#include <ranges>

/*
* tcp_channels
*/
Expand Down Expand Up @@ -308,9 +310,9 @@ bool nano::transport::tcp_channels::track_reachout (nano::endpoint const & endpo

void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point cutoff_deadline)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto channels_l = all_channels ();

auto should_close = [this, cutoff_deadline] (auto const & channel) {
auto should_close = [this, cutoff_deadline] (std::shared_ptr<tcp_channel> const & channel) {
// Remove channels that haven't successfully sent a message within the cutoff time
if (auto last = channel->get_last_packet_sent (); last < cutoff_deadline)
{
Expand All @@ -332,27 +334,33 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point
return false;
};

for (auto const & entry : channels)
// Close stale channels without holding the mutex
for (auto const & channel : channels_l)
{
if (should_close (entry.channel))
if (should_close (channel))
{
entry.channel->close ();
channel->close ();
}
}

erase_if (channels, [this] (auto const & entry) {
if (!entry.channel->alive ())
{
node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ());
entry.channel->close ();
return true; // Erase
}
return false;
});
nano::unique_lock<nano::mutex> lock{ mutex };

// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_deadline));
attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);

// Erase dead channels from list, but close them outside of the lock
auto erased = nano::erase_if (channels, [this] (auto const & entry) {
return !entry.channel->alive ();
});

lock.unlock ();

for (auto const & entry : erased)
{
node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ());
entry.channel->close ();
}
}

void nano::transport::tcp_channels::keepalive ()
Expand Down Expand Up @@ -422,6 +430,27 @@ bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
return node.tcp_listener.connect (endpoint.address (), endpoint.port ());
}

/**
 * Returns a snapshot of the `socket` member of every entry currently stored in `channels`.
 * The copy is taken while `mutex` is held; the returned deque can then be used by the
 * caller after the lock has been released.
 */
auto nano::transport::tcp_channels::all_sockets () const -> std::deque<std::shared_ptr<tcp_socket>>
{
	std::deque<std::shared_ptr<tcp_socket>> result;

	nano::lock_guard<nano::mutex> lock{ mutex };
	for (auto const & entry : channels)
	{
		// Copy the shared_ptr so the snapshot does not reference container storage
		result.push_back (entry.socket);
	}
	return result;
}

/**
 * Returns a snapshot of the `server` member of every entry currently stored in `channels`.
 * The copy is taken while `mutex` is held; the returned deque can then be used by the
 * caller after the lock has been released.
 */
auto nano::transport::tcp_channels::all_servers () const -> std::deque<std::shared_ptr<tcp_server>>
{
	std::deque<std::shared_ptr<tcp_server>> result;

	nano::lock_guard<nano::mutex> lock{ mutex };
	for (auto const & entry : channels)
	{
		// Copy the shared_ptr so the snapshot does not reference container storage
		result.push_back (entry.server);
	}
	return result;
}

/**
 * Returns a snapshot of the `channel` member of every entry currently stored in `channels`.
 * The copy is taken while `mutex` is held; the returned deque can then be used by the
 * caller after the lock has been released.
 */
auto nano::transport::tcp_channels::all_channels () const -> std::deque<std::shared_ptr<tcp_channel>>
{
	std::deque<std::shared_ptr<tcp_channel>> result;

	nano::lock_guard<nano::mutex> lock{ mutex };
	for (auto const & entry : channels)
	{
		// Copy the shared_ptr so the snapshot does not reference container storage
		result.push_back (entry.channel);
	}
	return result;
}

nano::container_info nano::transport::tcp_channels::container_info () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
Expand Down
4 changes: 4 additions & 0 deletions nano/node/transport/tcp_channels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class tcp_channels final
// Connection start
bool start_tcp (nano::endpoint const &);

std::deque<std::shared_ptr<tcp_socket>> all_sockets () const;
std::deque<std::shared_ptr<tcp_server>> all_servers () const;
std::deque<std::shared_ptr<tcp_channel>> all_channels () const;

nano::container_info container_info () const;

private: // Dependencies
Expand Down
36 changes: 21 additions & 15 deletions nano/node/transport/tcp_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,33 +164,39 @@ void nano::transport::tcp_listener::run_cleanup ()
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::cleanup);

cleanup ();
timeout ();
cleanup (lock);
debug_assert (lock.owns_lock ());

condition.wait_for (lock, 1s, [this] () { return stopped.load (); });
}
}

void nano::transport::tcp_listener::cleanup ()
void nano::transport::tcp_listener::cleanup (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());

// Erase dead connections
erase_if (connections, [this] (auto const & connection) {
if (!connection.socket->alive ())
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::erase_dead);
logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", fmt::streamed (connection.endpoint));
connection.socket->close ();
return true;
}
return false;
});

// Erase completed attempts
erase_if (attempts, [this] (auto const & attempt) {
return attempt.task.ready ();
});

// Erase dead connections
auto erased = nano::erase_if (connections, [this] (auto const & connection) {
return !connection.socket->alive ();
});

lock.unlock ();

for (auto const & connection : erased)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::erase_dead);
logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", fmt::streamed (connection.endpoint));
connection.socket->close ();
}

lock.lock ();
}

void nano::transport::tcp_listener::timeout ()
Expand All @@ -204,7 +210,7 @@ void nano::transport::tcp_listener::timeout ()
{
if (!attempt.task.ready () && attempt.start < cutoff)
{
attempt.task.cancel ();
attempt.task.cancel (); // Cancel is non-blocking and will return immediately, safe to call under lock

stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout);
logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)",
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/tcp_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class tcp_listener final
asio::awaitable<void> wait_available_slots () const;

void run_cleanup ();
void cleanup ();
void cleanup (nano::unique_lock<nano::mutex> &);
void timeout ();

enum class accept_result
Expand Down

0 comments on commit 29b1f0b

Please sign in to comment.