Skip to content

Commit

Permalink
Phase 2 of enabling new Stream Frame: Reliable Reset (#3817)
Browse files Browse the repository at this point in the history
  • Loading branch information
ProjectsByJackHe authored Sep 30, 2023
1 parent 649f971 commit 0ba1fc7
Show file tree
Hide file tree
Showing 41 changed files with 1,992 additions and 82 deletions.
1 change: 1 addition & 0 deletions docs/Settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ These parameters are access by calling [GetParam](./api/GetParam.md) or [SetPara
| `QUIC_PARAM_STREAM_IDEAL_SEND_BUFFER_SIZE`<br> 2 | uint64_t - bytes | Get-only | Ideal buffer size to queue to the stream. Assumes only one stream sends steadily. |
| `QUIC_PARAM_STREAM_PRIORITY` <br> 3 | uint16_t | Get/Set | Stream priority. |
| `QUIC_PARAM_STREAM_STATISTICS` <br> 4 | QUIC_STREAM_STATISTICS | Get-only | Stream-level statistics. |
| `QUIC_PARAM_STREAM_RELIABLE_OFFSET` <br> 5 | uint64_t | Get/Set | Part of the new Reliable Reset preview feature. Sets/Gets the number of bytes a sender must send before closing SEND path.

## See Also

Expand Down
4 changes: 4 additions & 0 deletions docs/api/StreamShutdown.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ The stream can also be gracefully shutdown via the `QUIC_SEND_FLAG_FIN` flag. Se
Any stream (even one that hasn't been started) may be called to shutdown. If the stream has not been started yet, then the shutdown is effectively queued. If the app never calls [StreamStart](StreamStart.md) then the shutdown will never been sent out on the wire.
# Reliable Reset
If an app decides to enable Preview Features, the shutdown path can be configured with the QUIC_PARAM_STREAM_RELIABLE_OFFSET Stream parameter, which determines the number of bytes a sender must deliver before it can shut down their SEND path.
# See Also
[StreamOpen](StreamOpen.md)<br>
Expand Down
25 changes: 19 additions & 6 deletions scripts/test.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ This script runs the MsQuic tests.
.Parameter DuoNic
Uses DuoNic instead of loopback (DuoNic must already be installed via 'prepare-machine.ps1 -InstallDuoNic').
.Parameter NumIterations
Number of times to run this particular command. Catches tricky edge cases due to random nature of networks.
.EXAMPLE
test.ps1
Expand All @@ -85,6 +88,8 @@ This script runs the MsQuic tests.
.EXAMPLE
test.ps1 -LogProfile Full.Verbose -Compress
.EXAMPLE
test.ps1 -Filter ParameterValidation* -NumIterations 10
#>

param (
Expand Down Expand Up @@ -172,7 +177,10 @@ param (
[switch]$UseQtip = $false,

[Parameter(Mandatory = $false)]
[string]$OsRunner = ""
[string]$OsRunner = "",

[Parameter(Mandatory = $false)]
[int]$NumIterations = 1
)

Set-StrictMode -Version 'Latest'
Expand Down Expand Up @@ -344,12 +352,17 @@ if (![string]::IsNullOrWhiteSpace($ExtraArtifactDir)) {
$TestArguments += " -ExtraArtifactDir $ExtraArtifactDir"
}

# Run the script.
if (!$Kernel -and !$SkipUnitTests) {
Invoke-Expression ($RunTest + " -Path $MsQuicPlatTest " + $TestArguments)
Invoke-Expression ($RunTest + " -Path $MsQuicCoreTest " + $TestArguments)
for ($iteration = 1; $iteration -le $NumIterations; $iteration++) {
if ($NumIterations -gt 1) {
Write-Host "------- Iteration $iteration -------"
}
# Run the script.
if (!$Kernel -and !$SkipUnitTests) {
Invoke-Expression ($RunTest + " -Path $MsQuicPlatTest " + $TestArguments)
Invoke-Expression ($RunTest + " -Path $MsQuicCoreTest " + $TestArguments)
}
Invoke-Expression ($RunTest + " -Path $MsQuicTest " + $TestArguments)
}
Invoke-Expression ($RunTest + " -Path $MsQuicTest " + $TestArguments)

if ($CodeCoverage) {
# Merge code coverage results
Expand Down
9 changes: 4 additions & 5 deletions src/core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -4705,7 +4705,8 @@ QuicConnRecvFrames(
case QUIC_FRAME_STREAM_6:
case QUIC_FRAME_STREAM_7:
case QUIC_FRAME_MAX_STREAM_DATA:
case QUIC_FRAME_STREAM_DATA_BLOCKED: {
case QUIC_FRAME_STREAM_DATA_BLOCKED:
case QUIC_FRAME_RELIABLE_RESET_STREAM: {
if (Closed) {
if (!QuicStreamFrameSkip(
FrameType, PayloadLength, Payload, &Offset)) {
Expand Down Expand Up @@ -5316,7 +5317,7 @@ QuicConnRecvFrames(
case QUIC_FRAME_IMMEDIATE_ACK: // Always accept the frame, because we always enable support.
AckImmediately = TRUE;
break;

case QUIC_FRAME_TIMESTAMP: { // Always accept the frame, because we always enable support.
if (!Connection->State.TimestampRecvNegotiated) {
QuicTraceEvent(
Expand All @@ -5343,9 +5344,7 @@ QuicConnRecvFrames(
Packet->SendTimestamp = Frame.Timestamp;
break;
}

case QUIC_FRAME_RELIABLE_RESET_STREAM:
// TODO - Implement this frame.

default:
//
// No default case necessary, as we have already validated the frame
Expand Down
25 changes: 25 additions & 0 deletions src/core/frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -2006,6 +2006,31 @@ QuicFrameLog(
Frame.Timestamp);
break;
}

case QUIC_FRAME_RELIABLE_RESET_STREAM: {
QUIC_RELIABLE_RESET_STREAM_EX Frame;
if (!QuicReliableResetFrameDecode(PacketLength, Packet, Offset, &Frame)) {
QuicTraceLogVerbose(
FrameLogReliableResetStreamInvalid,
"[%c][%cX][%llu] RELIABLE_RESET_STREAM [Invalid]",
PtkConnPre(Connection),
PktRxPre(Rx),
PacketNumber);
return FALSE;
}

QuicTraceLogVerbose(
FrameLogReliableResetStream,
"[%c][%cX][%llu] RELIABLE_RESET_STREAM ID:%llu ErrorCode:0x%llX FinalSize:%llu ReliableSize:%llu",
PtkConnPre(Connection),
PktRxPre(Rx),
PacketNumber,
Frame.StreamID,
Frame.ErrorCode,
Frame.FinalSize,
Frame.ReliableSize);
break;
}

default:
CXPLAT_FRE_ASSERT(FALSE);
Expand Down
13 changes: 13 additions & 0 deletions src/core/loss_detection.c
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,10 @@ QuicLossDetectionOnPacketAcknowledged(
QuicStreamOnResetAck(Packet->Frames[i].RESET_STREAM.Stream);
break;

case QUIC_FRAME_RELIABLE_RESET_STREAM:
QuicStreamOnResetReliableAck(Packet->Frames[i].RELIABLE_RESET_STREAM.Stream);
break;

case QUIC_FRAME_CRYPTO:
QuicCryptoOnAck(&Connection->Crypto, &Packet->Frames[i]);
break;
Expand Down Expand Up @@ -699,6 +703,15 @@ QuicLossDetectionRetransmitFrames(
FALSE);
break;

case QUIC_FRAME_RELIABLE_RESET_STREAM:
NewDataQueued |=
QuicSendSetStreamSendFlag(
&Connection->Send,
Packet->Frames[i].RELIABLE_RESET_STREAM.Stream,
QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT,
FALSE);
break;

case QUIC_FRAME_STOP_SENDING:
NewDataQueued |=
QuicSendSetStreamSendFlag(
Expand Down
1 change: 1 addition & 0 deletions src/core/send.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ QuicSendSetStreamSendFlag(
if (Stream->Flags.LocalCloseAcked) {
SendFlags &=
~(QUIC_STREAM_SEND_FLAG_SEND_ABORT |
QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT |
QUIC_STREAM_SEND_FLAG_DATA_BLOCKED |
QUIC_STREAM_SEND_FLAG_DATA |
QUIC_STREAM_SEND_FLAG_OPEN |
Expand Down
18 changes: 10 additions & 8 deletions src/core/send.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,14 @@ QuicPacketTypeToEncryptLevelV2(
// stream. The order reflects the order the data is framed into a packet.
//

#define QUIC_STREAM_SEND_FLAG_DATA_BLOCKED 0x0001U
#define QUIC_STREAM_SEND_FLAG_MAX_DATA 0x0002U
#define QUIC_STREAM_SEND_FLAG_SEND_ABORT 0x0004U
#define QUIC_STREAM_SEND_FLAG_RECV_ABORT 0x0008U
#define QUIC_STREAM_SEND_FLAG_DATA 0x0010U
#define QUIC_STREAM_SEND_FLAG_OPEN 0x0020U
#define QUIC_STREAM_SEND_FLAG_FIN 0x0040U
#define QUIC_STREAM_SEND_FLAG_DATA_BLOCKED 0x0001U
#define QUIC_STREAM_SEND_FLAG_MAX_DATA 0x0002U
#define QUIC_STREAM_SEND_FLAG_SEND_ABORT 0x0004U
#define QUIC_STREAM_SEND_FLAG_RECV_ABORT 0x0008U
#define QUIC_STREAM_SEND_FLAG_DATA 0x0010U
#define QUIC_STREAM_SEND_FLAG_OPEN 0x0020U
#define QUIC_STREAM_SEND_FLAG_FIN 0x0040U
#define QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT 0x0080U

#define QUIC_STREAM_SEND_FLAGS_ALL 0xFFFFU

Expand All @@ -211,7 +212,8 @@ inline BOOLEAN HasStreamControlFrames(uint32_t Flags)
(QUIC_STREAM_SEND_FLAG_DATA_BLOCKED |
QUIC_STREAM_SEND_FLAG_MAX_DATA |
QUIC_STREAM_SEND_FLAG_SEND_ABORT |
QUIC_STREAM_SEND_FLAG_RECV_ABORT);
QUIC_STREAM_SEND_FLAG_RECV_ABORT |
QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT);
}

inline BOOLEAN HasStreamDataFrames(uint32_t Flags)
Expand Down
3 changes: 3 additions & 0 deletions src/core/sent_packet_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ QuicSentPacketMetadataReleaseFrames(
case QUIC_FRAME_STREAM:
QuicStreamSentMetadataDecrement(Metadata->Frames[i].STREAM.Stream);
break;
case QUIC_FRAME_RELIABLE_RESET_STREAM:
QuicStreamSentMetadataDecrement(Metadata->Frames[i].RELIABLE_RESET_STREAM.Stream);
break;
#pragma warning(pop)
default:
//
Expand Down
3 changes: 3 additions & 0 deletions src/core/sent_packet_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ typedef struct QUIC_SENT_FRAME_METADATA {
struct {
QUIC_STREAM* Stream;
} RESET_STREAM;
struct {
QUIC_STREAM* Stream;
} RELIABLE_RESET_STREAM;
struct {
QUIC_STREAM* Stream;
} STOP_SENDING;
Expand Down
99 changes: 98 additions & 1 deletion src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,13 @@ QuicStreamShutdown(
// and shutdown complete events now, if they haven't already been
// delivered.
//
if (Stream->Flags.RemoteCloseResetReliable || Stream->Flags.LocalCloseResetReliable) {
QuicTraceLogStreamWarning(
ShutdownImmediatePendingReliableReset,
Stream,
"Invalid immediate shutdown request (pending reliable reset).");
return;
}
QuicStreamIndicateSendShutdownComplete(Stream, FALSE);
QuicStreamIndicateShutdownComplete(Stream);
}
Expand Down Expand Up @@ -633,7 +640,7 @@ QuicStreamParamSet(

case QUIC_PARAM_STREAM_PRIORITY: {

if (BufferLength != sizeof(Stream->SendPriority)) {
if (BufferLength != sizeof(Stream->SendPriority) || Buffer == NULL) {
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
}
Expand All @@ -659,6 +666,58 @@ QuicStreamParamSet(
break;
}

case QUIC_PARAM_STREAM_RELIABLE_OFFSET:

if (BufferLength != sizeof(uint64_t) || Buffer == NULL) {
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
}

if (!Stream->Connection->State.ReliableResetStreamNegotiated ||
*(uint64_t*)Buffer > Stream->QueuedSendOffset) {
Status = QUIC_STATUS_INVALID_STATE;
break;
}

if (Stream->Flags.LocalCloseReset) {
Status = QUIC_STATUS_INVALID_STATE;
break;
}

if (!Stream->Flags.LocalCloseResetReliable) {
//
// We haven't called shutdown reliable yet. App can set ReliableOffsetSend to be whatever.
//
Stream->ReliableOffsetSend = *(uint64_t*)Buffer;
} else if (*(uint64_t*)Buffer < Stream->ReliableOffsetSend) {
//
// TODO - Determine if we need to support this feature in future iterations.
//
// We have previously called shutdown reliable.
// Now we are loosening the conditions of the ReliableReset,
// but we have already sent the peer a stale frame, we must retransmit
// this new frame, and update the metadata.
//
QuicTraceLogStreamInfo(
MultipleReliableResetSendNotSupported,
Stream,
"Multiple RELIABLE_RESET frames sending not supported.");
Status = QUIC_STATUS_INVALID_STATE;
break;
} else {
Status = QUIC_STATUS_INVALID_STATE;
break;
}

QuicTraceLogStreamInfo(
ReliableSendOffsetSet,
Stream,
"Reliable send offset set to %llu",
*(uint64_t*)Buffer);

Status = QUIC_STATUS_SUCCESS;
break;

default:
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
Expand Down Expand Up @@ -838,6 +897,44 @@ QuicStreamParamGet(
Status = QUIC_STATUS_SUCCESS;
break;
}

case QUIC_PARAM_STREAM_RELIABLE_OFFSET:
if (*BufferLength < sizeof(uint64_t)) {
*BufferLength = sizeof(uint64_t);
Status = QUIC_STATUS_BUFFER_TOO_SMALL;
break;
}
if (Buffer == NULL) {
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
}
if (Stream->ReliableOffsetSend == 0) {
Status = QUIC_STATUS_INVALID_STATE;
break;
}
*(uint64_t*) Buffer = Stream->ReliableOffsetSend;
Status = QUIC_STATUS_SUCCESS;
break;

case QUIC_PARAM_STREAM_RELIABLE_OFFSET_RECV:
if (*BufferLength < sizeof(uint64_t)) {
*BufferLength = sizeof(uint64_t);
Status = QUIC_STATUS_BUFFER_TOO_SMALL;
break;
}
if (Buffer == NULL) {
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
}
if (!Stream->Flags.RemoteCloseResetReliable) {
Status = QUIC_STATUS_INVALID_STATE;
break;
}

*(uint64_t*)Buffer = Stream->RecvMaxLength;
Status = QUIC_STATUS_SUCCESS;
break;

default:
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
Expand Down
Loading

0 comments on commit 0ba1fc7

Please sign in to comment.