Skip to content

Commit

Permalink
RedisConnection#Connect(): get rid of spin lock
Browse files Browse the repository at this point in the history
Instead of IoEngine::YieldCurrentCoroutine(yc) until m_Queues.FutureResponseActions.empty(), async-wait a CV which is updated along with m_Queues.FutureResponseActions.
  • Loading branch information
Al2Klimov committed Dec 6, 2024
1 parent 4b884ea commit c0dd426
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
19 changes: 9 additions & 10 deletions lib/icingadb/redisconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int p
m_DbIndex(db), m_CertPath(std::move(certPath)), m_KeyPath(std::move(keyPath)), m_Insecure(insecure),
m_CaPath(std::move(caPath)), m_CrlPath(std::move(crlPath)), m_TlsProtocolmin(std::move(tlsProtocolmin)),
m_CipherList(std::move(cipherList)), m_ConnectTimeout(connectTimeout), m_DebugInfo(std::move(di)), m_Connecting(false), m_Connected(false),
m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io), m_Parent(parent)
m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io), m_NoQueuedReads(io, true), m_LogStatsTimer(io), m_Parent(parent)
{
if (useTls && m_Path.IsEmpty()) {
UpdateTLSContext();
Expand Down Expand Up @@ -302,12 +302,6 @@ void RedisConnection::Connect(asio::yield_context& yc)

boost::asio::deadline_timer timer (m_Strand.context());

auto waitForReadLoop ([this, &yc]() {
while (!m_Queues.FutureResponseActions.empty()) {
IoEngine::YieldCurrentCoroutine(yc);
}
});

for (;;) {
try {
if (m_Path.IsEmpty()) {
Expand Down Expand Up @@ -340,7 +334,7 @@ void RedisConnection::Connect(asio::yield_context& yc)
}

Handshake(conn, yc);
waitForReadLoop();
m_NoQueuedReads.Wait(yc);
m_TlsConn = std::move(conn);
} else {
Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
Expand All @@ -352,7 +346,7 @@ void RedisConnection::Connect(asio::yield_context& yc)

icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
Handshake(conn, yc);
waitForReadLoop();
m_NoQueuedReads.Wait(yc);
m_TcpConn = std::move(conn);
}
} else {
Expand All @@ -365,7 +359,7 @@ void RedisConnection::Connect(asio::yield_context& yc)

conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
Handshake(conn, yc);
waitForReadLoop();
m_NoQueuedReads.Wait(yc);
m_UnixConn = std::move(conn);
}

Expand Down Expand Up @@ -477,6 +471,7 @@ void RedisConnection::ReadLoop(asio::yield_context& yc)
}

m_QueuedReads.Clear();
m_NoQueuedReads.Set();
}
}

Expand Down Expand Up @@ -577,6 +572,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
}

m_QueuedReads.Set();
m_NoQueuedReads.Clear();
}

if (next.FireAndForgetQueries) {
Expand Down Expand Up @@ -613,6 +609,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
}

m_QueuedReads.Set();
m_NoQueuedReads.Clear();
}

if (next.GetResultOfQuery) {
Expand All @@ -638,6 +635,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
}

m_QueuedReads.Set();
m_NoQueuedReads.Clear();
}

if (next.GetResultsOfQueries) {
Expand All @@ -660,6 +658,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});

m_QueuedReads.Set();
m_NoQueuedReads.Clear();
}

if (next.Callback) {
Expand Down
2 changes: 1 addition & 1 deletion lib/icingadb/redisconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ namespace icinga
std::set<QueryPriority> m_SuppressedQueryKinds;

// Indicate that there's something to send/receive
AsioConditionVariable m_QueuedWrites, m_QueuedReads;
AsioConditionVariable m_QueuedWrites, m_QueuedReads, m_NoQueuedReads;

std::function<void(boost::asio::yield_context& yc)> m_ConnectedCallback;

Expand Down

0 comments on commit c0dd426

Please sign in to comment.