Skip to content

Commit

Permalink
Fix epoll TCP implementation (#3940)
Browse files Browse the repository at this point in the history
  • Loading branch information
csujedihy authored Oct 25, 2023
1 parent 8e1eb1a commit df4cc62
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 85 deletions.
171 changes: 86 additions & 85 deletions src/platform/datapath_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ typedef struct CXPLAT_SEND_DATA {
//
uint8_t Buffer[CXPLAT_LARGE_IO_BUFFER_SIZE];

//
// The total number of bytes buffer sent (only used for TCP).
//
uint32_t TotalBytesSent;

//
// IO vectors used for sends on the socket.
//
Expand Down Expand Up @@ -1299,7 +1304,6 @@ CxPlatSocketCreateTcpInternal(
{
QUIC_STATUS Status;
uint16_t PartitionIndex;
BOOLEAN IsServerSocket = RemoteAddress == NULL;

CXPLAT_DBG_ASSERT(Datapath->TcpHandlers.Receive != NULL);

Expand Down Expand Up @@ -1334,6 +1338,8 @@ CxPlatSocketCreateTcpInternal(
} else {
Binding->LocalAddress.Ip.sa_family = QUIC_ADDRESS_FAMILY_INET6;
}

Binding->RecvBufLen = Datapath->RecvBlockSize - Datapath->RecvBlockBufferOffset;
PartitionIndex =
RemoteAddress ?
((uint16_t)(CxPlatProcCurrentNumber() % Datapath->PartitionCount)) : 0;
Expand All @@ -1360,16 +1366,6 @@ CxPlatSocketCreateTcpInternal(
goto Exit;
}

if (IsServerSocket) {
//
// The return value is being ignored here, as if a system does not support
// bpf we still want the server to work. If this happens, the sockets will
// round robin, but each flow will be sent to the same socket, just not
// based on RSS.
//
(void)CxPlatSocketConfigureRss(SocketContext, 1);
}

if (Type == CXPLAT_SOCKET_TCP_SERVER) {
*NewBinding = Binding;
Binding = NULL;
Expand Down Expand Up @@ -1960,78 +1956,83 @@ CxPlatSocketReceiveMessages(
}

void
CxPlatSocketTcpRecvComplete(
CxPlatSocketReceiveTcpData(
_In_ CXPLAT_SOCKET_CONTEXT* SocketContext
)
{
CXPLAT_DATAPATH_PARTITION* DatapathPartition = SocketContext->DatapathPartition;
DATAPATH_RX_IO_BLOCK* IoBlock = NULL;

uint32_t RetryCount = 0;
//
// Read in a loop until the blocking error is encountered unless EOF or other failures
// are met.
//
do {
IoBlock = CxPlatPoolAlloc(&DatapathPartition->RecvBlockPool);
} while (IoBlock == NULL && ++RetryCount < 10);
if (IoBlock == NULL) {
QuicTraceEvent(
AllocFailure,
"Allocation of '%s' failed. (%llu bytes)",
"DATAPATH_RX_IO_BLOCK",
0);
goto Exit;
}
uint32_t RetryCount = 0;
do {
IoBlock = CxPlatPoolAlloc(&DatapathPartition->RecvBlockPool);
} while (IoBlock == NULL && ++RetryCount < 10);
if (IoBlock == NULL) {
QuicTraceEvent(
AllocFailure,
"Allocation of '%s' failed. (%llu bytes)",
"DATAPATH_RX_IO_BLOCK",
0);
goto Exit;
}

IoBlock->OwningPool = &DatapathPartition->RecvBlockPool;
IoBlock->Route.State = RouteResolved;
IoBlock->Route.Queue = SocketContext;
IoBlock->RefCount = 0;
IoBlock->OwningPool = &DatapathPartition->RecvBlockPool;
IoBlock->Route.State = RouteResolved;
IoBlock->Route.Queue = SocketContext;
IoBlock->RefCount = 0;

uint8_t* Buffer = (uint8_t*)IoBlock + DatapathPartition->Datapath->RecvBlockBufferOffset;
int NumberOfBytesTransferred = read(SocketContext->SocketFd, Buffer, CXPLAT_LARGE_IO_BUFFER_SIZE);
uint8_t* Buffer = (uint8_t*)IoBlock + DatapathPartition->Datapath->RecvBlockBufferOffset;
int NumberOfBytesTransferred = read(SocketContext->SocketFd, Buffer, SocketContext->Binding->RecvBufLen);

if (NumberOfBytesTransferred == 0) {
if (!SocketContext->Binding->DisconnectIndicated) {
SocketContext->Binding->DisconnectIndicated = TRUE;
SocketContext->Binding->Datapath->TcpHandlers.Connect(
if (NumberOfBytesTransferred == 0) {
if (!SocketContext->Binding->DisconnectIndicated) {
SocketContext->Binding->DisconnectIndicated = TRUE;
SocketContext->Binding->Datapath->TcpHandlers.Connect(
SocketContext->Binding,
SocketContext->Binding->ClientContext,
FALSE);
}
goto Exit;
} else if (NumberOfBytesTransferred < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
QuicTraceEvent(
DatapathErrorStatus,
"[data][%p] ERROR, %u, %s.",
SocketContext->Binding,
errno,
"read failed");
}
goto Exit;
} else {
DATAPATH_RX_PACKET* Datagram = (DATAPATH_RX_PACKET*)(IoBlock + 1);
Datagram->IoBlock = IoBlock;
CXPLAT_RECV_DATA* Data = &Datagram->Data;

Data->Next = NULL;
Data->Buffer = Buffer;
Data->BufferLength = NumberOfBytesTransferred;
Data->Route = &IoBlock->Route;
Data->PartitionIndex = SocketContext->DatapathPartition->PartitionIndex;
Data->TypeOfService = 0;
Data->Allocated = TRUE;
Data->Route->DatapathType = Data->DatapathType = CXPLAT_DATAPATH_TYPE_USER;
Data->QueuedOnConnection = FALSE;
IoBlock->RefCount++;
IoBlock = NULL;

SocketContext->Binding->Datapath->TcpHandlers.Receive(
SocketContext->Binding,
SocketContext->Binding->ClientContext,
FALSE);
Data);
}
goto Exit;
} else if (NumberOfBytesTransferred < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
QuicTraceEvent(
DatapathErrorStatus,
"[data][%p] ERROR, %u, %s.",
SocketContext->Binding,
errno,
"read failed");
}
goto Exit;
} else {
DATAPATH_RX_PACKET* Datagram = (DATAPATH_RX_PACKET*)(IoBlock + 1);
Datagram->IoBlock = IoBlock;
CXPLAT_RECV_DATA* Data = &Datagram->Data;

Data->Next = NULL;
Data->Buffer = Buffer;
Data->BufferLength = NumberOfBytesTransferred;
Data->Route = &IoBlock->Route;
Data->PartitionIndex = SocketContext->DatapathPartition->PartitionIndex;
Data->TypeOfService = 0;
Data->Allocated = TRUE;
Data->Route->DatapathType = Data->DatapathType = CXPLAT_DATAPATH_TYPE_USER;
Data->QueuedOnConnection = FALSE;
IoBlock->RefCount++;
IoBlock = NULL;

SocketContext->Binding->Datapath->TcpHandlers.Receive(
SocketContext->Binding,
SocketContext->Binding->ClientContext,
Data);
}
} while (TRUE);

Exit:

if (IoBlock) {
CxPlatPoolFree(&DatapathPartition->RecvBlockPool, IoBlock);
}
Expand All @@ -2049,7 +2050,7 @@ CxPlatSocketReceive(
CxPlatSocketReceiveMessages(SocketContext);
}
} else {
CxPlatSocketTcpRecvComplete(SocketContext);
CxPlatSocketReceiveTcpData(SocketContext);
}
}

Expand Down Expand Up @@ -2096,6 +2097,7 @@ SendDataAlloc(
SendData->ClientBuffer.Buffer = SendData->Buffer;
SendData->ClientBuffer.Length = 0;
SendData->TotalSize = 0;
SendData->TotalBytesSent = 0;
SendData->SegmentSize =
(Socket->Type != CXPLAT_SOCKET_UDP ||
Socket->Datapath->Features & CXPLAT_DATAPATH_FEATURE_SEND_SEGMENTATION)
Expand Down Expand Up @@ -2459,24 +2461,17 @@ CxPlatSendDataSendTcp(
_In_ CXPLAT_SEND_DATA* SendData
)
{
uint32_t TotalSize = SendData->TotalSize;
uint8_t *Buffer = SendData->Buffer;
while (TotalSize > 0) {
while (SendData->TotalSize > SendData->TotalBytesSent) {
int BytesSent =
send(
SendData->SocketContext->SocketFd,
Buffer,
TotalSize,
SendData->Buffer + SendData->TotalBytesSent,
SendData->TotalSize - SendData->TotalBytesSent,
0);
if (BytesSent < 0) {
// forcibly send inline
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}
return FALSE;
}
Buffer += BytesSent;
TotalSize -= BytesSent;
SendData->TotalBytesSent += BytesSent;
}

return TRUE;
Expand All @@ -2493,14 +2488,20 @@ CxPlatSendDataSend(

QUIC_STATUS Status = QUIC_STATUS_SUCCESS;
CXPLAT_SOCKET_CONTEXT* SocketContext = SendData->SocketContext;
BOOLEAN Success =
SocketType == CXPLAT_SOCKET_UDP ?
BOOLEAN Success;

if (SocketType == CXPLAT_SOCKET_UDP) {
Success =
#ifdef UDP_SEGMENT
SendData->SegmentationSupported ?
CxPlatSendDataSendSegmented(SendData) :
CxPlatSendDataSendSegmented(SendData) : CxPlatSendDataSendMessages(SendData);
#else
CxPlatSendDataSendMessages(SendData);
#endif
CxPlatSendDataSendMessages(SendData) :
CxPlatSendDataSendTcp(SendData);
} else {
Success = CxPlatSendDataSendTcp(SendData);
}

if (!Success) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
Status = QUIC_STATUS_PENDING;
Expand Down
5 changes: 5 additions & 0 deletions src/platform/platform_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,11 @@ typedef struct CXPLAT_SOCKET {
//
uint16_t Mtu;

//
// The size of a receive buffer's payload.
//
uint32_t RecvBufLen;

//
// Indicates the binding connected to a remote IP address.
//
Expand Down

0 comments on commit df4cc62

Please sign in to comment.