Skip to content

Commit

Permalink
Simplify platform worker
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks committed Jan 3, 2025
1 parent b33ddb3 commit 69b1f03
Showing 1 changed file with 20 additions and 26 deletions.
46 changes: 20 additions & 26 deletions src/platform/platform_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
#include "platform_worker.c.clog.h"
#endif

typedef struct CXPLAT_WORKER_SQE {
CXPLAT_SQE Base;
struct CXPLAT_WORKER* Worker;
} CXPLAT_WORKER_SQE;

typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {

//
Expand All @@ -35,17 +30,17 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {
//
// Submission queue entry for shutting down the worker thread.
//
CXPLAT_WORKER_SQE ShutdownSqe;
CXPLAT_SQE ShutdownSqe;

//
// Submission queue entry for waking the thread to poll.
//
CXPLAT_WORKER_SQE WakeSqe;
CXPLAT_SQE WakeSqe;

//
// Submission queue entry for update the polling set.
//
CXPLAT_WORKER_SQE UpdatePollSqe;
CXPLAT_SQE UpdatePollSqe;

//
// Serializes access to the execution contexts.
Expand Down Expand Up @@ -110,7 +105,8 @@ ShutdownCompletion(
_In_ CXPLAT_CQE* Cqe
)
{
CXPLAT_WORKER* Worker = ((CXPLAT_WORKER_SQE*)CxPlatCqeGetSqe(Cqe))->Worker;
CXPLAT_WORKER* Worker =
CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_WORKER, ShutdownSqe);
Worker->StoppedThread = TRUE;
}

Expand All @@ -132,7 +128,8 @@ UpdatePollCompletion(
_In_ CXPLAT_CQE* Cqe
)
{
CXPLAT_WORKER* Worker = ((CXPLAT_WORKER_SQE*)CxPlatCqeGetSqe(Cqe))->Worker;
CXPLAT_WORKER* Worker =
CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_WORKER, UpdatePollSqe);
CxPlatUpdateExecutionContexts(Worker);
}

Expand Down Expand Up @@ -223,26 +220,23 @@ CxPlatWorkerPoolLazyStart(
goto Error;
}
Worker->InitializedEventQ = TRUE;
Worker->ShutdownSqe.Worker = &WorkerPool->Workers[i];
if (!CxPlatSqeInitialize(&Worker->EventQ, ShutdownCompletion, &Worker->ShutdownSqe.Base)) {
if (!CxPlatSqeInitialize(&Worker->EventQ, ShutdownCompletion, &Worker->ShutdownSqe)) {
QuicTraceEvent(
LibraryError,
"[ lib] ERROR, %s.",
"CxPlatSqeInitialize(shutdown)");
goto Error;
}
Worker->InitializedShutdownSqe = TRUE;
Worker->WakeSqe.Worker = &WorkerPool->Workers[i];
if (!CxPlatSqeInitialize(&Worker->EventQ, WakeCompletion, &Worker->WakeSqe.Base)) {
if (!CxPlatSqeInitialize(&Worker->EventQ, WakeCompletion, &Worker->WakeSqe)) {
QuicTraceEvent(
LibraryError,
"[ lib] ERROR, %s.",
"CxPlatSqeInitialize(wake)");
goto Error;
}
Worker->InitializedWakeSqe = TRUE;
Worker->UpdatePollSqe.Worker = &WorkerPool->Workers[i];
if (!CxPlatSqeInitialize(&Worker->EventQ, UpdatePollCompletion, &Worker->UpdatePollSqe.Base)) {
if (!CxPlatSqeInitialize(&Worker->EventQ, UpdatePollCompletion, &Worker->UpdatePollSqe)) {
QuicTraceEvent(
LibraryError,
"[ lib] ERROR, %s.",
Expand Down Expand Up @@ -270,7 +264,7 @@ CxPlatWorkerPoolLazyStart(
CXPLAT_WORKER* Worker = &WorkerPool->Workers[i];
if (Worker->InitializedThread) {
Worker->StoppingThread = TRUE;
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->ShutdownSqe.Base);
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->ShutdownSqe);
CxPlatThreadWait(&Worker->Thread);
CxPlatThreadDelete(&Worker->Thread);
#if DEBUG
Expand All @@ -280,13 +274,13 @@ CxPlatWorkerPoolLazyStart(
Worker->DestroyedThread = TRUE;
}
if (Worker->InitializedUpdatePollSqe) {
CxPlatSqeCleanup(&Worker->EventQ, &Worker->UpdatePollSqe.Base);
CxPlatSqeCleanup(&Worker->EventQ, &Worker->UpdatePollSqe);
}
if (Worker->InitializedWakeSqe) {
CxPlatSqeCleanup(&Worker->EventQ, &Worker->WakeSqe.Base);
CxPlatSqeCleanup(&Worker->EventQ, &Worker->WakeSqe);
}
if (Worker->InitializedShutdownSqe) {
CxPlatSqeCleanup(&Worker->EventQ, &Worker->ShutdownSqe.Base);
CxPlatSqeCleanup(&Worker->EventQ, &Worker->ShutdownSqe);
}
if (Worker->InitializedEventQ) {
CxPlatEventQCleanup(&Worker->EventQ);
Expand Down Expand Up @@ -317,17 +311,17 @@ CxPlatWorkerPoolUninit(
for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) {
CXPLAT_WORKER* Worker = &WorkerPool->Workers[i];
Worker->StoppingThread = TRUE;
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->ShutdownSqe.Base);
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->ShutdownSqe);
CxPlatThreadWait(&Worker->Thread);
CxPlatThreadDelete(&Worker->Thread);
#if DEBUG
CXPLAT_DBG_ASSERT(Worker->ThreadStarted);
CXPLAT_DBG_ASSERT(Worker->ThreadFinished);
#endif
Worker->DestroyedThread = TRUE;
CxPlatSqeCleanup(&Worker->EventQ, &Worker->UpdatePollSqe.Base);
CxPlatSqeCleanup(&Worker->EventQ, &Worker->WakeSqe.Base);
CxPlatSqeCleanup(&Worker->EventQ, &Worker->ShutdownSqe.Base);
CxPlatSqeCleanup(&Worker->EventQ, &Worker->UpdatePollSqe);
CxPlatSqeCleanup(&Worker->EventQ, &Worker->WakeSqe);
CxPlatSqeCleanup(&Worker->EventQ, &Worker->ShutdownSqe);
CxPlatEventQCleanup(&Worker->EventQ);
CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&Worker->DynamicPoolList));
CxPlatLockUninitialize(&Worker->ECLock);
Expand Down Expand Up @@ -434,7 +428,7 @@ CxPlatAddExecutionContext(
CxPlatLockRelease(&Worker->ECLock);

if (QueueEvent) {
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->UpdatePollSqe.Base);
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->UpdatePollSqe);
}
}

Expand All @@ -445,7 +439,7 @@ CxPlatWakeExecutionContext(
{
CXPLAT_WORKER* Worker = (CXPLAT_WORKER*)Context->CxPlatContext;
if (!InterlockedFetchAndSetBoolean(&Worker->Running)) {
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->WakeSqe.Base);
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->WakeSqe);
}
}

Expand Down

0 comments on commit 69b1f03

Please sign in to comment.