Skip to content

Commit

Permalink
Support Using Streams after Connection Closure (#3938)
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks authored Nov 30, 2023
1 parent 4fa54d9 commit 6e1c3d6
Show file tree
Hide file tree
Showing 21 changed files with 384 additions and 351 deletions.
250 changes: 103 additions & 147 deletions src/core/connection.c

Large diffs are not rendered by default.

17 changes: 13 additions & 4 deletions src/core/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ typedef union QUIC_CONNECTION_STATE {
BOOLEAN ClosedLocally : 1; // Locally closed.
BOOLEAN ClosedRemotely : 1; // Remotely closed.
BOOLEAN AppClosed : 1; // Application (not transport) closed connection.
BOOLEAN HandleShutdown : 1; // Shutdown callback delivered for handle.
BOOLEAN ShutdownComplete : 1; // Shutdown callback delivered for handle.
BOOLEAN HandleClosed : 1; // Handle closed by application layer.
BOOLEAN Uninitialized : 1; // Uninitialize started/completed.
BOOLEAN Freed : 1; // Freed. Used for Debugging.

//
Expand Down Expand Up @@ -120,9 +119,9 @@ typedef union QUIC_CONNECTION_STATE {
BOOLEAN ShutdownCompleteTimedOut : 1;

//
// The application needs to be notified of a shutdown complete event.
// The connection is shutdown and the completion for it needs to be run.
//
BOOLEAN SendShutdownCompleteNotif : 1;
BOOLEAN ProcessShutdownComplete : 1;

//
// Indicates whether this connection shares bindings with others.
Expand Down Expand Up @@ -223,6 +222,7 @@ typedef enum QUIC_CONNECTION_REF {
QUIC_CONN_REF_LOOKUP_TABLE, // Per registered CID.
QUIC_CONN_REF_LOOKUP_RESULT, // For connections returned from lookups.
QUIC_CONN_REF_WORKER, // Worker is (queued for) processing.
QUIC_CONN_REF_TIMER_WHEEL, // The timer wheel is tracking the connection.
QUIC_CONN_REF_ROUTE, // Route resolution is undergoing.
QUIC_CONN_REF_STREAM, // A stream depends on the connection.

Expand Down Expand Up @@ -1117,6 +1117,15 @@ QuicConnRegister(
_Inout_ QUIC_REGISTRATION* Registration
);

//
// Unregisters the connection from the registration.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicConnUnregister(
_Inout_ QUIC_CONNECTION* Connection
);

//
// Tracing rundown for the connection.
//
Expand Down
4 changes: 3 additions & 1 deletion src/core/listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,9 @@ QuicListenerClaimConnection(
Connection->ClientCallbackHandler != NULL,
"App MUST set callback handler or close connection!");

Connection->State.UpdateWorker = TRUE;
if (!Connection->State.ShutdownComplete) {
Connection->State.UpdateWorker = TRUE;
}

return !Connection->State.HandleClosed;
}
Expand Down
156 changes: 86 additions & 70 deletions src/core/timer_wheel.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ QuicTimerWheelUninitialize(
StillInTimerWheel,
Connection,
"Still in timer wheel! Connection was likely leaked!");
CXPLAT_DBG_ASSERT(!Connection);
Entry = Entry->Flink;
}
CXPLAT_TEL_ASSERT(CxPlatListIsEmpty(&TimerWheel->Slots[i]));
Expand Down Expand Up @@ -277,6 +278,8 @@ QuicTimerWheelRemoveConnection(
if (Connection == TimerWheel->NextConnection) {
QuicTimerWheelUpdate(TimerWheel);
}

QuicConnRelease(Connection, QUIC_CONN_REF_TIMER_WHEEL);
}
}

Expand All @@ -295,96 +298,100 @@ QuicTimerWheelUpdateConnection(
//
CxPlatListEntryRemove(&Connection->TimerLink);

if (ExpirationTime == UINT64_MAX) {
TimerWheel->ConnectionCount--;
}
if (ExpirationTime == UINT64_MAX || Connection->State.ShutdownComplete) {
//
// No more timers left, go ahead and invalidate its link.
//
Connection->TimerLink.Flink = NULL;
QuicTraceLogVerbose(
TimerWheelRemoveConnection,
"[time][%p] Removing Connection %p.",
TimerWheel,
Connection);

} else {
if (Connection == TimerWheel->NextConnection) {
QuicTimerWheelUpdate(TimerWheel);
}

//
// It wasn't in the wheel already, so we must be adding it to the
// wheel.
//
if (ExpirationTime != UINT64_MAX) {
TimerWheel->ConnectionCount++;
QuicConnRelease(Connection, QUIC_CONN_REF_TIMER_WHEEL);
TimerWheel->ConnectionCount--;
return; // Nothing else to do.
}
}

if (ExpirationTime == UINT64_MAX) {
} else if (ExpirationTime != UINT64_MAX && !Connection->State.ShutdownComplete) {
//
// No more timers left, go ahead and invalidate its link.
// It wasn't in the wheel already, so we must be adding it to the wheel.
//
Connection->TimerLink.Flink = NULL;
QuicTraceLogVerbose(
TimerWheelRemoveConnection,
"[time][%p] Removing Connection %p.",
TimerWheel,
Connection);

if (Connection == TimerWheel->NextConnection) {
QuicTimerWheelUpdate(TimerWheel);
}
TimerWheel->ConnectionCount++;
QuicConnAddRef(Connection, QUIC_CONN_REF_TIMER_WHEEL);

} else {
return; // Ignore
}

CXPLAT_DBG_ASSERT(TimerWheel->SlotCount != 0);
uint32_t SlotIndex = TIME_TO_SLOT_INDEX(TimerWheel, ExpirationTime);
//
// We are adding/updating the connection in the timer wheel.
//

//
// Insert the connection into the slot, in the correct order. We search
// the slot's list in reverse order, with the assumption that most new
// timers will on average be later than existing ones.
//
CXPLAT_LIST_ENTRY* ListHead = &TimerWheel->Slots[SlotIndex];
CXPLAT_LIST_ENTRY* Entry = ListHead->Blink;
CXPLAT_DBG_ASSERT(ExpirationTime != UINT64_MAX);
CXPLAT_DBG_ASSERT(!Connection->State.ShutdownComplete);
CXPLAT_DBG_ASSERT(TimerWheel->SlotCount != 0);
uint32_t SlotIndex = TIME_TO_SLOT_INDEX(TimerWheel, ExpirationTime);

while (Entry != ListHead) {
QUIC_CONNECTION* ConnectionEntry =
CXPLAT_CONTAINING_RECORD(Entry, QUIC_CONNECTION, TimerLink);
uint64_t EntryExpirationTime = QuicConnGetNextExpirationTime(ConnectionEntry);
//
// Insert the connection into the slot, in the correct order. We search
// the slot's list in reverse order, with the assumption that most new
// timers will on average be later than existing ones.
//
CXPLAT_LIST_ENTRY* ListHead = &TimerWheel->Slots[SlotIndex];
CXPLAT_LIST_ENTRY* Entry = ListHead->Blink;

if (ExpirationTime > EntryExpirationTime) {
break;
}
while (Entry != ListHead) {
QUIC_CONNECTION* ConnectionEntry =
CXPLAT_CONTAINING_RECORD(Entry, QUIC_CONNECTION, TimerLink);
uint64_t EntryExpirationTime = QuicConnGetNextExpirationTime(ConnectionEntry);

Entry = Entry->Blink;
if (ExpirationTime > EntryExpirationTime) {
break;
}

//
// Insert after the current entry.
//
CxPlatListInsertHead(Entry, &Connection->TimerLink);
Entry = Entry->Blink;
}

//
// Insert after the current entry.
//
CxPlatListInsertHead(Entry, &Connection->TimerLink);

QuicTraceLogVerbose(
TimerWheelUpdateConnection,
"[time][%p] Updating Connection %p.",
TimerWheel,
Connection);

//
// Make sure the next expiration time/connection is still correct.
//
if (ExpirationTime < TimerWheel->NextExpirationTime) {
TimerWheel->NextExpirationTime = ExpirationTime;
TimerWheel->NextConnection = Connection;
QuicTraceLogVerbose(
TimerWheelUpdateConnection,
"[time][%p] Updating Connection %p.",
TimerWheelNextExpiration,
"[time][%p] Next Expiration = {%llu, %p}.",
TimerWheel,
ExpirationTime,
Connection);
} else if (Connection == TimerWheel->NextConnection) {
QuicTimerWheelUpdate(TimerWheel);
}

//
// Make sure the next expiration time/connection is still correct.
//
if (ExpirationTime < TimerWheel->NextExpirationTime) {
TimerWheel->NextExpirationTime = ExpirationTime;
TimerWheel->NextConnection = Connection;
QuicTraceLogVerbose(
TimerWheelNextExpiration,
"[time][%p] Next Expiration = {%llu, %p}.",
TimerWheel,
ExpirationTime,
Connection);
} else if (Connection == TimerWheel->NextConnection) {
QuicTimerWheelUpdate(TimerWheel);
}

//
// Resize the timer wheel if we have too many connections for the
// current size.
//
if (TimerWheel->ConnectionCount >
TimerWheel->SlotCount * QUIC_TIMER_WHEEL_MAX_LOAD_FACTOR) {
QuicTimerWheelResize(TimerWheel);
}
//
// Resize the timer wheel if we have too many connections for the
// current size.
//
if (TimerWheel->ConnectionCount >
TimerWheel->SlotCount * QUIC_TIMER_WHEEL_MAX_LOAD_FACTOR) {
QuicTimerWheelResize(TimerWheel);
}
}

Expand All @@ -400,6 +407,7 @@ QuicTimerWheelGetExpired(
// Iterate through every slot to find all the connections that now have
// expired timers.
//
BOOLEAN NeedsUpdate = FALSE;
for (uint32_t i = 0; i < TimerWheel->SlotCount; ++i) {
CXPLAT_LIST_ENTRY* ListHead = &TimerWheel->Slots[i];
CXPLAT_LIST_ENTRY* Entry = ListHead->Flink;
Expand All @@ -413,7 +421,15 @@ QuicTimerWheelGetExpired(
Entry = Entry->Flink;
CxPlatListEntryRemove(&ConnectionEntry->TimerLink);
CxPlatListInsertTail(OutputListHead, &ConnectionEntry->TimerLink);
if (ConnectionEntry == TimerWheel->NextConnection) {
NeedsUpdate = TRUE;
}
QuicConnAddRef(ConnectionEntry, QUIC_CONN_REF_WORKER);
QuicConnRelease(ConnectionEntry, QUIC_CONN_REF_TIMER_WHEEL);
TimerWheel->ConnectionCount--;
}
}
if (NeedsUpdate) {
QuicTimerWheelUpdate(TimerWheel);
}
}
12 changes: 1 addition & 11 deletions src/core/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ QuicWorkerProcessTimers(
QuicConnTimerExpired(Connection, TimeNow);
QuicConfigurationDetachSilo();
Connection->WorkerThreadID = 0;
QuicConnRelease(Connection, QUIC_CONN_REF_WORKER);
}
}

Expand Down Expand Up @@ -474,12 +475,6 @@ QuicWorkerProcessConnection(
Connection->Stats.Schedule.DrainCount++;

if (Connection->State.UpdateWorker) {
//
// If the connection is uninitialized already, it shouldn't have been
// queued to move to a new worker in the first place.
//
CXPLAT_DBG_ASSERT(!Connection->State.Uninitialized);

//
// The connection was recently placed into this worker and needs any
// pre-existing timers to be transitioned to this worker for processing.
Expand Down Expand Up @@ -543,11 +538,6 @@ QuicWorkerProcessConnection(

if (DoneWithConnection) {
if (Connection->State.UpdateWorker) {
//
// The connection should never be queued to a new worker if it's
// already been uninitialized.
//
CXPLAT_DBG_ASSERT(!Connection->State.Uninitialized);
//
// Now that we know we want to process this connection, assign it
// to the correct registration. Remove it from the current worker's
Expand Down
Loading

0 comments on commit 6e1c3d6

Please sign in to comment.