Skip to content

Commit

Permalink
Fix epoll
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks committed Jan 2, 2025
1 parent 10f8e7a commit d86e76c
Showing 1 changed file with 30 additions and 44 deletions.
74 changes: 30 additions & 44 deletions src/platform/datapath_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ typedef struct CXPLAT_RECV_MSG_CONTROL_BUFFER {
#else
#define CXPLAT_DBG_ASSERT_CMSG(CMsg, type)
#endif

CXPLAT_EVENT_COMPLETION CxPlatSocketContextUninitializeComplete;
CXPLAT_EVENT_COMPLETION CxPlatSocketContextFlushTxEventComplete;
CXPLAT_EVENT_COMPLETION CxPlatSocketContextProcessIoCompletion;

void
CxPlatDataPathCalculateFeatureSupport(
Expand Down Expand Up @@ -572,13 +576,10 @@ CxPlatSocketContextSqeInitialize(
BOOLEAN IoSqeInitialized = FALSE;
BOOLEAN FlushTxInitialized = FALSE;

SocketContext->ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN;
SocketContext->IoSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_IO;
SocketContext->FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX;

if (!CxPlatSqeInitialize(
SocketContext->DatapathPartition->EventQ,
&SocketContext->ShutdownSqe.Sqe,
CxPlatSocketContextUninitializeComplete,
&SocketContext->ShutdownSqe)) {
Status = errno;
QuicTraceEvent(
Expand All @@ -593,7 +594,7 @@ CxPlatSocketContextSqeInitialize(

if (!CxPlatSqeInitialize(
SocketContext->DatapathPartition->EventQ,
&SocketContext->IoSqe.Sqe,
CxPlatSocketContextProcessIoCompletion,
&SocketContext->IoSqe)) {
Status = errno;
QuicTraceEvent(
Expand All @@ -608,7 +609,7 @@ CxPlatSocketContextSqeInitialize(

if (!CxPlatSqeInitialize(
SocketContext->DatapathPartition->EventQ,
&SocketContext->FlushTxSqe.Sqe,
CxPlatSocketContextFlushTxEventComplete,
&SocketContext->FlushTxSqe)) {
Status = errno;
QuicTraceEvent(
Expand All @@ -627,13 +628,13 @@ CxPlatSocketContextSqeInitialize(

if (QUIC_FAILED(Status)) {
if (ShutdownSqeInitialized) {
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe.Sqe);
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe);
}
if (IoSqeInitialized) {
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe.Sqe);
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe);
}
if (FlushTxInitialized) {
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe.Sqe);
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe);
}
}

Expand Down Expand Up @@ -1127,9 +1128,11 @@ CxPlatSocketRelease(

void
CxPlatSocketContextUninitializeComplete(
_In_ CXPLAT_SOCKET_CONTEXT* SocketContext
_In_ CXPLAT_CQE* Cqe
)
{
CXPLAT_SOCKET_CONTEXT* SocketContext =
CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_SOCKET_CONTEXT, ShutdownSqe);
#if DEBUG
CXPLAT_DBG_ASSERT(!SocketContext->Freed);
SocketContext->Freed = TRUE;
Expand All @@ -1154,9 +1157,9 @@ CxPlatSocketContextUninitializeComplete(
}

if (SocketContext->SqeInitialized) {
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe.Sqe);
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe.Sqe);
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe.Sqe);
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe);
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe);
CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe);
}

CxPlatLockUninitialize(&SocketContext->TxQueueLock);
Expand Down Expand Up @@ -1210,7 +1213,6 @@ CxPlatSocketContextUninitialize(
CXPLAT_FRE_ASSERT(
CxPlatEventQEnqueue(
SocketContext->DatapathPartition->EventQ,
&SocketContext->ShutdownSqe.Sqe,
&SocketContext->ShutdownSqe));
}
}
Expand Down Expand Up @@ -2425,7 +2427,6 @@ SocketSend(
CXPLAT_FRE_ASSERT(
CxPlatEventQEnqueue(
SocketContext->DatapathPartition->EventQ,
&SocketContext->FlushTxSqe.Sqe,
&SocketContext->FlushTxSqe));
}
return;
Expand Down Expand Up @@ -2787,6 +2788,16 @@ CxPlatSocketContextFlushTxQueue(
}
}

void
CxPlatSocketContextFlushTxEventComplete(
_In_ CXPLAT_CQE* Cqe
)
{
CXPLAT_SOCKET_CONTEXT* SocketContext =
CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_SOCKET_CONTEXT, FlushTxSqe);
CxPlatSocketContextFlushTxQueue(SocketContext, FALSE);
}

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
CxPlatSocketGetTcpStatistics(
Expand All @@ -2800,11 +2811,13 @@ CxPlatSocketGetTcpStatistics(
}

void
CxPlatDataPathSocketProcessIoCompletion(
_In_ CXPLAT_SOCKET_CONTEXT* SocketContext,
CxPlatSocketContextProcessIoCompletion(
_In_ CXPLAT_CQE* Cqe
)
{
CXPLAT_SOCKET_CONTEXT* SocketContext =
CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_SOCKET_CONTEXT, IoSqe);

if (CxPlatRundownAcquire(&SocketContext->UpcallRundown)) {
if (EPOLLERR & Cqe->events) {
CxPlatSocketHandleErrors(SocketContext);
Expand All @@ -2827,30 +2840,3 @@ CxPlatDataPathSocketProcessIoCompletion(
CxPlatRundownRelease(&SocketContext->UpcallRundown);
}
}

void
DataPathProcessCqe(
_In_ CXPLAT_CQE* Cqe
)
{
switch (CxPlatCqeType(Cqe)) {
case CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN: {
CXPLAT_SOCKET_CONTEXT* SocketContext =
CXPLAT_CONTAINING_RECORD(CxPlatCqeUserData(Cqe), CXPLAT_SOCKET_CONTEXT, ShutdownSqe);
CxPlatSocketContextUninitializeComplete(SocketContext);
break;
}
case CXPLAT_CQE_TYPE_SOCKET_IO: {
CXPLAT_SOCKET_CONTEXT* SocketContext =
CXPLAT_CONTAINING_RECORD(CxPlatCqeUserData(Cqe), CXPLAT_SOCKET_CONTEXT, IoSqe);
CxPlatDataPathSocketProcessIoCompletion(SocketContext, Cqe);
break;
}
case CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX: {
CXPLAT_SOCKET_CONTEXT* SocketContext =
CXPLAT_CONTAINING_RECORD(CxPlatCqeUserData(Cqe), CXPLAT_SOCKET_CONTEXT, FlushTxSqe);
CxPlatSocketContextFlushTxQueue(SocketContext, FALSE);
break;
}
}
}

0 comments on commit d86e76c

Please sign in to comment.