Prunable Pool Allocator (#4517)
nibanks authored Sep 5, 2024
1 parent e6c874a commit c68abdf
Showing 13 changed files with 214 additions and 16 deletions.
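At a glance: this commit introduces "dynamic" pools (CXPLAT_POOL_EX), which wrap a CXPLAT_POOL with a list link and an owner so they can be registered with a platform worker via CxPlatAddDynamicPoolAllocator. Each worker keeps a DynamicPoolList and, on a periodic pass, prunes idle entries from every registered pool with the new CxPlatPoolPrune helper, returning cached memory to the system. Supporting changes add a LastPoolProcessTime field to CXPLAT_EXECUTION_STATE, a PlatformWorkerProcessPools verbose trace event, and convert the Windows user-mode datapath's per-partition receive datagram pool to the new prunable type.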
2 changes: 1 addition & 1 deletion src/core/worker.c
@@ -773,7 +773,7 @@ CXPLAT_THREAD_CALLBACK(QuicWorkerThread, Context)
CXPLAT_EXECUTION_CONTEXT* EC = &Worker->ExecutionContext;

CXPLAT_EXECUTION_STATE State = {
0, CxPlatTimeUs64(), UINT32_MAX, 0, CxPlatCurThreadID()
0, 0, 0, UINT32_MAX, 0, CxPlatCurThreadID()
};

QuicTraceEvent(
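Note: the brace initializer gains an extra leading 0 (here and in the matching initializers in src/perf/lib/Tcp.cpp and src/platform/platform_worker.c below) for the new LastPoolProcessTime field added to CXPLAT_EXECUTION_STATE; LastWorkTime also now starts at 0 rather than at the current time.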
22 changes: 22 additions & 0 deletions src/generated/linux/platform_worker.c.clog.h
@@ -18,6 +18,10 @@
#define _clog_MACRO_QuicTraceLogInfo 1
#define QuicTraceLogInfo(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__)))
#endif
#ifndef _clog_MACRO_QuicTraceLogVerbose
#define _clog_MACRO_QuicTraceLogVerbose 1
#define QuicTraceLogVerbose(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__)))
#endif
#ifndef _clog_MACRO_QuicTraceEvent
#define _clog_MACRO_QuicTraceEvent 1
#define QuicTraceEvent(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__)))
@@ -61,6 +65,24 @@ tracepoint(CLOG_PLATFORM_WORKER_C, PlatformWorkerThreadStop , arg2);\



/*----------------------------------------------------------
// Decoder Ring for PlatformWorkerProcessPools
// [ lib][%p] Processing pools
// QuicTraceLogVerbose(
PlatformWorkerProcessPools,
"[ lib][%p] Processing pools",
Worker);
// arg2 = arg2 = Worker = arg2
----------------------------------------------------------*/
#ifndef _clog_3_ARGS_TRACE_PlatformWorkerProcessPools
#define _clog_3_ARGS_TRACE_PlatformWorkerProcessPools(uniqueId, encoded_arg_string, arg2)\
tracepoint(CLOG_PLATFORM_WORKER_C, PlatformWorkerProcessPools , arg2);\

#endif




/*----------------------------------------------------------
// Decoder Ring for AllocFailure
// Allocation of '%s' failed. (%llu bytes)
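The two files under src/generated and the clog.sidecar entries further down are regenerated CLOG artifacts for the new PlatformWorkerProcessPools verbose trace event emitted by the platform worker's pool-processing pass.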
19 changes: 19 additions & 0 deletions src/generated/linux/platform_worker.c.clog.h.lttng.h
@@ -39,6 +39,25 @@ TRACEPOINT_EVENT(CLOG_PLATFORM_WORKER_C, PlatformWorkerThreadStop,



/*----------------------------------------------------------
// Decoder Ring for PlatformWorkerProcessPools
// [ lib][%p] Processing pools
// QuicTraceLogVerbose(
PlatformWorkerProcessPools,
"[ lib][%p] Processing pools",
Worker);
// arg2 = arg2 = Worker = arg2
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_PLATFORM_WORKER_C, PlatformWorkerProcessPools,
TP_ARGS(
const void *, arg2),
TP_FIELDS(
ctf_integer_hex(uint64_t, arg2, (uint64_t)arg2)
)
)



/*----------------------------------------------------------
// Decoder Ring for AllocFailure
// Allocation of '%s' failed. (%llu bytes)
31 changes: 29 additions & 2 deletions src/inc/quic_platform.h
@@ -461,13 +461,40 @@ typedef struct QUIC_EXECUTION_CONFIG QUIC_EXECUTION_CONFIG;
typedef struct CXPLAT_EXECUTION_CONTEXT CXPLAT_EXECUTION_CONTEXT;

typedef struct CXPLAT_EXECUTION_STATE {
uint64_t TimeNow; // in microseconds
uint64_t LastWorkTime; // in microseconds
uint64_t TimeNow; // in microseconds
uint64_t LastWorkTime; // in microseconds
uint64_t LastPoolProcessTime; // in microseconds
uint32_t WaitTime;
uint32_t NoWorkCount;
CXPLAT_THREAD_ID ThreadID;
} CXPLAT_EXECUTION_STATE;

#ifndef _KERNEL_MODE // Not supported on kernel mode

//
// Supports more dynamic operations, but must be submitted to the platform worker
// to manage.
//
typedef struct CXPLAT_POOL_EX {
CXPLAT_POOL Base;
CXPLAT_LIST_ENTRY Link;
void* Owner;
} CXPLAT_POOL_EX;

void
CxPlatAddDynamicPoolAllocator(
_In_ CXPLAT_WORKER_POOL* WorkerPool,
_Inout_ CXPLAT_POOL_EX* Pool,
_In_ uint16_t Index // Into the execution config processor array
);

void
CxPlatRemoveDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool
);

#endif

//
// Returns FALSE when it's time to cleanup.
//
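For orientation, the Windows datapath later in this commit uses this API roughly as sketched below. This is an illustrative sketch, not code from the commit: the MY_PARTITION type, the 1500-byte size, the QUIC_POOL_DATA tag (borrowed from the datapath change), and the use of the plain CxPlatPoolInitialize(IsPaged, Size, Tag, Pool) initializer are all assumptions made for the example.

//
// Sketch only: a component that owns a prunable (dynamic) pool. Assumes the
// msquic platform headers (quic_platform.h) are already included.
//
typedef struct MY_PARTITION {
    CXPLAT_POOL_EX RecvPool; // Base pool plus the Link/Owner used by the worker
} MY_PARTITION;

void
MyPartitionInitialize(
    _Inout_ MY_PARTITION* Partition,
    _In_ CXPLAT_WORKER_POOL* WorkerPool,
    _In_ uint16_t Index // Which platform worker should manage this pool
    )
{
    CxPlatPoolInitialize(FALSE, 1500, QUIC_POOL_DATA, &Partition->RecvPool.Base);
    //
    // Registration hands the pool to worker 'Index', which prunes idle
    // entries on its periodic pool-processing pass.
    //
    CxPlatAddDynamicPoolAllocator(WorkerPool, &Partition->RecvPool, Index);
}

void
MyPartitionUninitialize(
    _Inout_ MY_PARTITION* Partition
    )
{
    //
    // Remove from the worker's list before tearing down the underlying pool,
    // mirroring the order used in CxPlatProcessorContextRelease below.
    //
    CxPlatRemoveDynamicPoolAllocator(&Partition->RecvPool);
    CxPlatPoolUninitialize(&Partition->RecvPool.Base);
}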
20 changes: 20 additions & 0 deletions src/inc/quic_platform_posix.h
@@ -585,6 +585,26 @@ CxPlatPoolFree(
}
}

inline
BOOLEAN
CxPlatPoolPrune(
_Inout_ CXPLAT_POOL* Pool
)
{
CxPlatLockAcquire(&Pool->Lock);
void* Entry = CxPlatListPopEntry(&Pool->ListHead);
if (Entry != NULL) {
CXPLAT_FRE_ASSERT(Pool->ListDepth > 0);
Pool->ListDepth--;
}
CxPlatLockRelease(&Pool->Lock);
if (Entry == NULL) {
return FALSE;
}
CxPlatFree(Entry, Pool->Tag);
return TRUE;
}

//
// Reference Count Interface
//
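A usage note (illustrative, not part of the commit): each call frees at most one cached entry and returns FALSE once the free list is empty, so a caller that wanted to flush a pool's entire cache could simply loop, as sketched here:

//
// Sketch only: release every entry currently cached on a pool's free list.
// Outstanding allocations are untouched, and the pool itself stays usable.
//
void
MyPoolFlush(
    _Inout_ CXPLAT_POOL* Pool
    )
{
    while (CxPlatPoolPrune(Pool)) {
        //
        // Each successful call popped one entry and returned it to the heap.
        //
    }
}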
14 changes: 14 additions & 0 deletions src/inc/quic_platform_winuser.h
@@ -465,6 +465,20 @@ CxPlatPoolFree(
}
}

inline
BOOLEAN
CxPlatPoolPrune(
_Inout_ CXPLAT_POOL* Pool
)
{
void* Entry = InterlockedPopEntrySList(&Pool->ListHead);
if (Entry == NULL) {
return FALSE;
}
Pool->Free(Entry, Pool->Tag, Pool);
return TRUE;
}

#define CxPlatZeroMemory RtlZeroMemory
#define CxPlatCopyMemory RtlCopyMemory
#define CxPlatMoveMemory RtlMoveMemory
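Platform note: the POSIX version above pops under the pool's lock and frees the entry directly with CxPlatFree, while this Windows user-mode version pops lock-free from the SLIST and releases the entry through the pool's Free callback; each presumably mirrors how that platform's pool already releases overflow entries.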
17 changes: 17 additions & 0 deletions src/manifest/clog.sidecar
@@ -8749,6 +8749,18 @@
"splitArgs": [],
"macroName": "QuicTraceLogWarning"
},
"PlatformWorkerProcessPools": {
"ModuleProperites": {},
"TraceString": "[ lib][%p] Processing pools",
"UniqueId": "PlatformWorkerProcessPools",
"splitArgs": [
{
"DefinationEncoding": "p",
"MacroVariableName": "arg2"
}
],
"macroName": "QuicTraceLogVerbose"
},
"PlatformWorkerThreadStart": {
"ModuleProperites": {},
"TraceString": "[ lib][%p] Worker start",
@@ -15940,6 +15952,11 @@
"TraceID": "PlatformThreadCreateFailed",
"EncodingString": "[ lib] pthread_create failed, retrying without affinitization"
},
{
"UniquenessHash": "3f46cbc3-c609-dab4-07c9-fbe956e68f5c",
"TraceID": "PlatformWorkerProcessPools",
"EncodingString": "[ lib][%p] Processing pools"
},
{
"UniquenessHash": "5c3773e2-ef60-26b9-4b3c-d433ca2656df",
"TraceID": "PlatformWorkerThreadStart",
2 changes: 1 addition & 1 deletion src/perf/lib/Tcp.cpp
@@ -289,7 +289,7 @@ CXPLAT_THREAD_CALLBACK(TcpWorker::WorkerThread, Context)
{
TcpWorker* This = (TcpWorker*)Context;
CXPLAT_EXECUTION_STATE DummyState = {
0, CxPlatTimeUs64(), UINT32_MAX, 0, CxPlatCurThreadID()
0, 0, 0, UINT32_MAX, 0, CxPlatCurThreadID()
};
while (DoWork(This, &DummyState)) {
if (!InterlockedFetchAndClearBoolean(&This->ExecutionContext.Ready)) {
11 changes: 8 additions & 3 deletions src/platform/datapath_winuser.c
@@ -962,7 +962,11 @@ DataPathInitialize(
FALSE,
RecvDatagramLength,
QUIC_POOL_DATA,
&Datapath->Partitions[i].RecvDatagramPool);
&Datapath->Partitions[i].RecvDatagramPool.Base);
CxPlatAddDynamicPoolAllocator(
Datapath->WorkerPool,
&Datapath->Partitions[i].RecvDatagramPool,
i);

CxPlatPoolInitializeEx(
FALSE,
@@ -1025,7 +1029,8 @@ CxPlatProcessorContextRelease(
CxPlatPoolUninitialize(&DatapathProc->LargeSendBufferPool);
CxPlatPoolUninitialize(&DatapathProc->RioSendBufferPool);
CxPlatPoolUninitialize(&DatapathProc->RioLargeSendBufferPool);
CxPlatPoolUninitialize(&DatapathProc->RecvDatagramPool);
CxPlatRemoveDynamicPoolAllocator(&DatapathProc->RecvDatagramPool);
CxPlatPoolUninitialize(&DatapathProc->RecvDatagramPool.Base);
CxPlatPoolUninitialize(&DatapathProc->RioRecvPool);
CxPlatDataPathRelease(DatapathProc->Datapath);
}
@@ -2470,7 +2475,7 @@ CxPlatSocketAllocRxIoBlock(
if (SocketProc->Parent->UseRio) {
OwningPool = &DatapathProc->RioRecvPool;
} else {
OwningPool = &DatapathProc->RecvDatagramPool;
OwningPool = &DatapathProc->RecvDatagramPool.Base;
}

IoBlock = CxPlatPoolAlloc(OwningPool);
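In the datapath, only the per-partition receive datagram pool becomes prunable: it is initialized through its .Base member, registered with the worker pool for the matching partition index, and, in CxPlatProcessorContextRelease, removed from the worker's dynamic list before the underlying pool is uninitialized. The RIO pools are left as plain CXPLAT_POOLs.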
5 changes: 5 additions & 0 deletions src/platform/inline.c
@@ -55,6 +55,11 @@ CxPlatPoolFree(
_In_ void* Entry
);

BOOLEAN
CxPlatPoolPrune(
_Inout_ CXPLAT_POOL* Pool
);

void
CxPlatListInitializeHead(
_Out_ CXPLAT_LIST_ENTRY* ListHead
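As with the other inline platform helpers listed in inline.c, CxPlatPoolPrune gets a declaration here so that an external (non-inlined) definition is emitted for callers that do not inline it.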
2 changes: 1 addition & 1 deletion src/platform/platform_internal.h
@@ -265,7 +265,7 @@ typedef struct QUIC_CACHEALIGN CXPLAT_DATAPATH_PROC {
// Pool of receive datagram contexts and buffers to be shared by all sockets
// on this core.
//
CXPLAT_POOL RecvDatagramPool;
CXPLAT_POOL_EX RecvDatagramPool;

//
// Pool of RIO receive datagram contexts and buffers to be shared by all
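This type change from CXPLAT_POOL to CXPLAT_POOL_EX is what drives the .Base accesses in datapath_winuser.c above: existing pool operations go through RecvDatagramPool.Base, while the CXPLAT_POOL_EX itself is what gets registered with the platform worker.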
79 changes: 77 additions & 2 deletions src/platform/platform_worker.c
@@ -52,6 +52,11 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {
//
CXPLAT_LOCK ECLock;

//
// List of dynamic pools to manage.
//
CXPLAT_LIST_ENTRY DynamicPoolList;

//
// Execution contexts that are waiting to be added to CXPLAT_WORKER::ExecutionContexts.
//
@@ -164,6 +169,7 @@ CxPlatWorkerPoolLazyStart(
CxPlatZeroMemory(WorkerPool->Workers, WorkersSize);
for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) {
CxPlatLockInitialize(&WorkerPool->Workers[i].ECLock);
CxPlatListInitializeHead(&WorkerPool->Workers[i].DynamicPoolList);
WorkerPool->Workers[i].InitializedECLock = TRUE;
WorkerPool->Workers[i].IdealProcessor = ProcessorList ? ProcessorList[i] : (uint16_t)i;
CXPLAT_DBG_ASSERT(WorkerPool->Workers[i].IdealProcessor < CxPlatProcCount());
@@ -293,6 +299,7 @@ CxPlatWorkerPoolUninit(
CxPlatSqeCleanup(&WorkerPool->Workers[i].EventQ, &WorkerPool->Workers[i].ShutdownSqe);
#endif // CXPLAT_SQE_INIT
CxPlatEventQCleanup(&WorkerPool->Workers[i].EventQ);
CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&WorkerPool->Workers[i].DynamicPoolList));
CxPlatLockUninitialize(&WorkerPool->Workers[i].ECLock);
}

@@ -305,6 +312,68 @@ CxPlatWorkerPoolUninit(
CxPlatLockUninitialize(&WorkerPool->WorkerLock);
}

#define DYNAMIC_POOL_PROCESSING_PERIOD 1000000 // 1 second
#define DYNAMIC_POOL_PRUNE_COUNT 8

void
CxPlatAddDynamicPoolAllocator(
_In_ CXPLAT_WORKER_POOL* WorkerPool,
_Inout_ CXPLAT_POOL_EX* Pool,
_In_ uint16_t Index // Into the execution config processor array
)
{
CXPLAT_DBG_ASSERT(WorkerPool);
CXPLAT_FRE_ASSERT(Index < WorkerPool->WorkerCount);
CXPLAT_WORKER* Worker = &WorkerPool->Workers[Index];
Pool->Owner = Worker;
CxPlatLockAcquire(&Worker->ECLock);
CxPlatListInsertTail(&Worker->DynamicPoolList, &Pool->Link);
CxPlatLockRelease(&Worker->ECLock);
}

void
CxPlatRemoveDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool
)
{
CXPLAT_WORKER* Worker = (CXPLAT_WORKER*)Pool->Owner;
CxPlatLockAcquire(&Worker->ECLock);
CxPlatListEntryRemove(&Pool->Link);
CxPlatLockRelease(&Worker->ECLock);
}

void
CxPlatProcessDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool
)
{
for (uint32_t i = 0; i < DYNAMIC_POOL_PRUNE_COUNT; ++i) {
if (!CxPlatPoolPrune((CXPLAT_POOL*)Pool)) {
return;
}
}
}

void
CxPlatProcessDynamicPoolAllocators(
_In_ CXPLAT_WORKER* Worker
)
{
QuicTraceLogVerbose(
PlatformWorkerProcessPools,
"[ lib][%p] Processing pools",
Worker);

CxPlatLockAcquire(&Worker->ECLock);
CXPLAT_LIST_ENTRY* Entry = Worker->DynamicPoolList.Flink;
while (Entry != &Worker->DynamicPoolList) {
CXPLAT_POOL_EX* Pool = CXPLAT_CONTAINING_RECORD(Entry, CXPLAT_POOL_EX, Link);
Entry = Entry->Flink;
CxPlatProcessDynamicPoolAllocator(Pool);
}
CxPlatLockRelease(&Worker->ECLock);
}

CXPLAT_EVENTQ*
CxPlatWorkerPoolGetEventQ(
_In_ const CXPLAT_WORKER_POOL* WorkerPool,
@@ -388,7 +457,6 @@ CxPlatRunExecutionContexts(
#if DEBUG // Debug statistics
++Worker->EcPollCount;
#endif
State->TimeNow = CxPlatTimeUs64();

uint64_t NextTime = UINT64_MAX;
CXPLAT_SLIST_ENTRY** EC = &Worker->ExecutionContexts;
@@ -487,7 +555,7 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context)
Worker->ThreadStarted = TRUE;
#endif

CXPLAT_EXECUTION_STATE State = { 0, CxPlatTimeUs64(), UINT32_MAX, 0, CxPlatCurThreadID() };
CXPLAT_EXECUTION_STATE State = { 0, 0, 0, UINT32_MAX, 0, CxPlatCurThreadID() };

Worker->Running = TRUE;

@@ -497,9 +565,11 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context)
#if DEBUG // Debug statistics
++Worker->LoopCount;
#endif
State.TimeNow = CxPlatTimeUs64();

CxPlatRunExecutionContexts(Worker, &State);
if (State.WaitTime && InterlockedFetchAndClearBoolean(&Worker->Running)) {
State.TimeNow = CxPlatTimeUs64();
CxPlatRunExecutionContexts(Worker, &State); // Run once more to handle race conditions
}

@@ -513,6 +583,11 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context)
CxPlatSchedulerYield();
State.NoWorkCount = 0;
}

if (State.TimeNow - State.LastPoolProcessTime > DYNAMIC_POOL_PROCESSING_PERIOD) {
CxPlatProcessDynamicPoolAllocators(Worker);
State.LastPoolProcessTime = State.TimeNow;
}
}

Shutdown:
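Putting the pieces together: once per DYNAMIC_POOL_PROCESSING_PERIOD (1 second), a worker walks its DynamicPoolList and frees at most DYNAMIC_POOL_PRUNE_COUNT (8) cached entries from each registered pool, so an idle pool shrinks gradually rather than being emptied in one pass. Registration, removal, and the pruning walk all take the worker's ECLock, so a pool cannot be unregistered while it is being pruned. The worker loop also now samples State.TimeNow once at the top of each iteration (and refreshes it before the extra race-handling pass) instead of inside CxPlatRunExecutionContexts, and that timestamp is what LastPoolProcessTime is compared against.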