Skip to content

Commit

Permalink
almost done but not yet
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc committed Aug 9, 2024
1 parent ad02c78 commit 7ae231a
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 35 deletions.
4 changes: 2 additions & 2 deletions worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ class DepLibUring

class LibUring;

thread_local static LibUring* liburing;
// Whether liburing is enabled or not after runtime checks.
static bool enabled{ false };
static bool enabled;
thread_local static LibUring* liburing;

private:
// Private singleton.
Expand Down
3 changes: 2 additions & 1 deletion worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <sys/utsname.h>

/* Static variables. */

bool DepLibUring::enabled{ false };
/* liburing instance per thread. */
thread_local DepLibUring::LibUring* DepLibUring::liburing{ nullptr };
/* Completion queue entry array used to retrieve processes tasks. */
Expand Down Expand Up @@ -186,6 +186,7 @@ void DepLibUring::StartPollingCQEs()

MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not supported");


DepLibUring::liburing->StartPollingCQEs();
}

Expand Down
7 changes: 5 additions & 2 deletions worker/src/DepUsrSCTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,11 @@ void DepUsrSCTP::Checker::OnTimer(TimerHandle* /*timer*/)
usrsctp_handle_timers(elapsedMs);

#ifdef MS_LIBURING_SUPPORTED
// Submit all prepared submission entries.
DepLibUring::Submit();
if (DepLibUring::IsEnabled())
{
// Submit all prepared submission entries.
DepLibUring::Submit();
}
#endif

this->lastCalledAtMs = nowMs;
Expand Down
32 changes: 22 additions & 10 deletions worker/src/RTC/Router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,11 @@ namespace RTC
std::shared_ptr<RTC::RtpPacket> sharedPacket;

#ifdef MS_LIBURING_SUPPORTED
// Activate liburing usage.
DepLibUring::SetActive();
if (DepLibUring::IsEnabled())
{
// Activate liburing usage.
DepLibUring::SetActive();
}
#endif

for (auto* consumer : consumers)
Expand All @@ -683,8 +686,11 @@ namespace RTC
}

#ifdef MS_LIBURING_SUPPORTED
// Submit all prepared submission entries.
DepLibUring::Submit();
if (DepLibUring::IsEnabled())
{
// Submit all prepared submission entries.
DepLibUring::Submit();
}
#endif
}

Expand Down Expand Up @@ -925,10 +931,13 @@ namespace RTC
if (!dataConsumers.empty())
{
#ifdef MS_LIBURING_SUPPORTED
// Activate liburing usage.
// The effective sending could be synchronous, thus we would send those
// messages within a single system call.
DepLibUring::SetActive();
if (DepLibUring::IsEnabled())
{
// Activate liburing usage.
// The effective sending could be synchronous, thus we would send those
// messages within a single system call.
DepLibUring::SetActive();
}
#endif

for (auto* dataConsumer : dataConsumers)
Expand All @@ -937,8 +946,11 @@ namespace RTC
}

#ifdef MS_LIBURING_SUPPORTED
// Submit all prepared submission entries.
DepLibUring::Submit();
if (DepLibUring::IsEnabled())
{
// Submit all prepared submission entries.
DepLibUring::Submit();
}
#endif
}
}
Expand Down
14 changes: 10 additions & 4 deletions worker/src/RTC/RtpStreamSend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,11 @@ namespace RTC
this->nackCount++;

#ifdef MS_LIBURING_SUPPORTED
// Activate liburing usage.
DepLibUring::SetActive();
if (DepLibUring::IsEnabled())
{
// Activate liburing usage.
DepLibUring::SetActive();
}
#endif

for (auto it = nackPacket->Begin(); it != nackPacket->End(); ++it)
Expand Down Expand Up @@ -173,8 +176,11 @@ namespace RTC
}

#ifdef MS_LIBURING_SUPPORTED
// Submit all prepared submission entries.
DepLibUring::Submit();
if (DepLibUring::IsEnabled())
{
// Submit all prepared submission entries.
DepLibUring::Submit();
}
#endif
}

Expand Down
1 change: 1 addition & 0 deletions worker/src/RTC/SrtpSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ namespace RTC
uint8_t* encryptBuffer = EncryptBuffer;

#ifdef MS_LIBURING_SUPPORTED
if (DepLibUring::IsEnabled())
{
if (!DepLibUring::IsActive())
{
Expand Down
14 changes: 10 additions & 4 deletions worker/src/RTC/Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2158,8 +2158,11 @@ namespace RTC
std::unique_ptr<RTC::RTCP::CompoundPacket> packet{ new RTC::RTCP::CompoundPacket() };

#ifdef MS_LIBURING_SUPPORTED
// Activate liburing usage.
DepLibUring::SetActive();
if (DepLibUring::IsEnabled())
{
// Activate liburing usage.
DepLibUring::SetActive();
}
#endif

for (auto& kv : this->mapConsumers)
Expand Down Expand Up @@ -2207,8 +2210,11 @@ namespace RTC
}

#ifdef MS_LIBURING_SUPPORTED
// Submit all prepared submission entries.
DepLibUring::Submit();
if (DepLibUring::IsEnabled())
{
// Submit all prepared submission entries.
DepLibUring::Submit();
}
#endif
}

Expand Down
14 changes: 10 additions & 4 deletions worker/src/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel)
DepUsrSCTP::CreateChecker();

#ifdef MS_LIBURING_SUPPORTED
// Start polling CQEs, which will create a uv_pool_t handle.
DepLibUring::StartPollingCQEs();
if (DepLibUring::IsEnabled())
{
// Start polling CQEs, which will create a uv_pool_t handle.
DepLibUring::StartPollingCQEs();
}
#endif

// Tell the Node process that we are running.
Expand Down Expand Up @@ -106,8 +109,11 @@ void Worker::Close()
DepUsrSCTP::CloseChecker();

#ifdef MS_LIBURING_SUPPORTED
// Stop polling CQEs, which will close the uv_pool_t handle.
DepLibUring::StopPollingCQEs();
if (DepLibUring::IsEnabled())
{
// Stop polling CQEs, which will close the uv_pool_t handle.
DepLibUring::StopPollingCQEs();
}
#endif

// Close the Channel.
Expand Down
12 changes: 8 additions & 4 deletions worker/src/handles/TcpConnectionHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,14 @@ void TcpConnectionHandle::Start()
}

#ifdef MS_LIBURING_SUPPORTED
err = uv_fileno(reinterpret_cast<uv_handle_t*>(this->uvHandle), std::addressof(this->fd));

if (err != 0)
if (DepLibUring::IsEnabled())
{
MS_THROW_ERROR("uv_fileno() failed: %s", uv_strerror(err));
err = uv_fileno(reinterpret_cast<uv_handle_t*>(this->uvHandle), std::addressof(this->fd));

if (err != 0)
{
MS_THROW_ERROR("uv_fileno() failed: %s", uv_strerror(err));
}
}
#endif
}
Expand Down Expand Up @@ -209,6 +212,7 @@ void TcpConnectionHandle::Write(
}

#ifdef MS_LIBURING_SUPPORTED
if (DepLibUring::IsEnabled())
{
if (!DepLibUring::IsActive())
{
Expand Down
12 changes: 8 additions & 4 deletions worker/src/handles/UdpSocketHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,14 @@ UdpSocketHandle::UdpSocketHandle(uv_udp_t* uvHandle) : uvHandle(uvHandle)
}

#ifdef MS_LIBURING_SUPPORTED
err = uv_fileno(reinterpret_cast<uv_handle_t*>(this->uvHandle), std::addressof(this->fd));

if (err != 0)
if (DepLibUring::IsEnabled())
{
MS_THROW_ERROR("uv_fileno() failed: %s", uv_strerror(err));
err = uv_fileno(reinterpret_cast<uv_handle_t*>(this->uvHandle), std::addressof(this->fd));

if (err != 0)
{
MS_THROW_ERROR("uv_fileno() failed: %s", uv_strerror(err));
}
}
#endif
}
Expand Down Expand Up @@ -144,6 +147,7 @@ void UdpSocketHandle::Send(
}

#ifdef MS_LIBURING_SUPPORTED
if (DepLibUring::IsEnabled())
{
if (!DepLibUring::IsActive())
{
Expand Down

0 comments on commit 7ae231a

Please sign in to comment.