diff --git a/docs/Settings.md b/docs/Settings.md index 357440a954..db8dcfa143 100644 --- a/docs/Settings.md +++ b/docs/Settings.md @@ -209,6 +209,7 @@ These parameters are access by calling [GetParam](./api/GetParam.md) or [SetPara | `QUIC_PARAM_STREAM_IDEAL_SEND_BUFFER_SIZE`
2 | uint64_t - bytes | Get-only | Ideal buffer size to queue to the stream. Assumes only one stream sends steadily. | | `QUIC_PARAM_STREAM_PRIORITY`
3 | uint16_t | Get/Set | Stream priority. | | `QUIC_PARAM_STREAM_STATISTICS`
4 | QUIC_STREAM_STATISTICS | Get-only | Stream-level statistics. | +| `QUIC_PARAM_STREAM_RELIABLE_OFFSET`
5 | uint64_t | Get/Set | Part of the new Reliable Reset preview feature. Sets/Gets the number of bytes a sender must send before closing SEND path. ## See Also diff --git a/docs/api/StreamShutdown.md b/docs/api/StreamShutdown.md index b5d58dd390..f6791dce9f 100644 --- a/docs/api/StreamShutdown.md +++ b/docs/api/StreamShutdown.md @@ -54,6 +54,10 @@ The stream can also be gracefully shutdown via the `QUIC_SEND_FLAG_FIN` flag. Se Any stream (even one that hasn't been started) may be called to shutdown. If the stream has not been started yet, then the shutdown is effectively queued. If the app never calls [StreamStart](StreamStart.md) then the shutdown will never been sent out on the wire. +# Reliable Reset + +If an app decides to enable Preview Features, the shutdown path can be configured with the QUIC_PARAM_STREAM_RELIABLE_OFFSET Stream parameter, which determines the number of bytes a sender must deliver before it can shut down their SEND path. + # See Also [StreamOpen](StreamOpen.md)
diff --git a/scripts/test.ps1 b/scripts/test.ps1 index c0d23f56f7..46c0a4e13d 100644 --- a/scripts/test.ps1 +++ b/scripts/test.ps1 @@ -67,6 +67,9 @@ This script runs the MsQuic tests. .Parameter DuoNic Uses DuoNic instead of loopback (DuoNic must already be installed via 'prepare-machine.ps1 -InstallDuoNic'). +.Parameter NumIterations + Number of times to run this particular command. Catches tricky edge cases due to random nature of networks. + .EXAMPLE test.ps1 @@ -85,6 +88,8 @@ This script runs the MsQuic tests. .EXAMPLE test.ps1 -LogProfile Full.Verbose -Compress +.EXAMPLE + test.ps1 -Filter ParameterValidation* -NumIterations 10 #> param ( @@ -172,7 +177,10 @@ param ( [switch]$UseQtip = $false, [Parameter(Mandatory = $false)] - [string]$OsRunner = "" + [string]$OsRunner = "", + + [Parameter(Mandatory = $false)] + [int]$NumIterations = 1 ) Set-StrictMode -Version 'Latest' @@ -344,12 +352,17 @@ if (![string]::IsNullOrWhiteSpace($ExtraArtifactDir)) { $TestArguments += " -ExtraArtifactDir $ExtraArtifactDir" } -# Run the script. -if (!$Kernel -and !$SkipUnitTests) { - Invoke-Expression ($RunTest + " -Path $MsQuicPlatTest " + $TestArguments) - Invoke-Expression ($RunTest + " -Path $MsQuicCoreTest " + $TestArguments) +for ($iteration = 1; $iteration -le $NumIterations; $iteration++) { + if ($NumIterations -gt 1) { + Write-Host "------- Iteration $iteration -------" + } + # Run the script. + if (!$Kernel -and !$SkipUnitTests) { + Invoke-Expression ($RunTest + " -Path $MsQuicPlatTest " + $TestArguments) + Invoke-Expression ($RunTest + " -Path $MsQuicCoreTest " + $TestArguments) + } + Invoke-Expression ($RunTest + " -Path $MsQuicTest " + $TestArguments) } -Invoke-Expression ($RunTest + " -Path $MsQuicTest " + $TestArguments) if ($CodeCoverage) { # Merge code coverage results diff --git a/src/core/connection.c b/src/core/connection.c index fb288af1c3..e4b1225f2c 100644 --- a/src/core/connection.c +++ b/src/core/connection.c @@ -4705,7 +4705,8 @@ QuicConnRecvFrames( case QUIC_FRAME_STREAM_6: case QUIC_FRAME_STREAM_7: case QUIC_FRAME_MAX_STREAM_DATA: - case QUIC_FRAME_STREAM_DATA_BLOCKED: { + case QUIC_FRAME_STREAM_DATA_BLOCKED: + case QUIC_FRAME_RELIABLE_RESET_STREAM: { if (Closed) { if (!QuicStreamFrameSkip( FrameType, PayloadLength, Payload, &Offset)) { @@ -5316,7 +5317,7 @@ QuicConnRecvFrames( case QUIC_FRAME_IMMEDIATE_ACK: // Always accept the frame, because we always enable support. AckImmediately = TRUE; break; - + case QUIC_FRAME_TIMESTAMP: { // Always accept the frame, because we always enable support. if (!Connection->State.TimestampRecvNegotiated) { QuicTraceEvent( @@ -5343,9 +5344,7 @@ QuicConnRecvFrames( Packet->SendTimestamp = Frame.Timestamp; break; } - - case QUIC_FRAME_RELIABLE_RESET_STREAM: - // TODO - Implement this frame. + default: // // No default case necessary, as we have already validated the frame diff --git a/src/core/frame.c b/src/core/frame.c index d706f681d8..7e1a923e11 100644 --- a/src/core/frame.c +++ b/src/core/frame.c @@ -2006,6 +2006,31 @@ QuicFrameLog( Frame.Timestamp); break; } + + case QUIC_FRAME_RELIABLE_RESET_STREAM: { + QUIC_RELIABLE_RESET_STREAM_EX Frame; + if (!QuicReliableResetFrameDecode(PacketLength, Packet, Offset, &Frame)) { + QuicTraceLogVerbose( + FrameLogReliableResetStreamInvalid, + "[%c][%cX][%llu] RELIABLE_RESET_STREAM [Invalid]", + PtkConnPre(Connection), + PktRxPre(Rx), + PacketNumber); + return FALSE; + } + + QuicTraceLogVerbose( + FrameLogReliableResetStream, + "[%c][%cX][%llu] RELIABLE_RESET_STREAM ID:%llu ErrorCode:0x%llX FinalSize:%llu ReliableSize:%llu", + PtkConnPre(Connection), + PktRxPre(Rx), + PacketNumber, + Frame.StreamID, + Frame.ErrorCode, + Frame.FinalSize, + Frame.ReliableSize); + break; + } default: CXPLAT_FRE_ASSERT(FALSE); diff --git a/src/core/loss_detection.c b/src/core/loss_detection.c index 1cd29899d1..643168146b 100644 --- a/src/core/loss_detection.c +++ b/src/core/loss_detection.c @@ -543,6 +543,10 @@ QuicLossDetectionOnPacketAcknowledged( QuicStreamOnResetAck(Packet->Frames[i].RESET_STREAM.Stream); break; + case QUIC_FRAME_RELIABLE_RESET_STREAM: + QuicStreamOnResetReliableAck(Packet->Frames[i].RELIABLE_RESET_STREAM.Stream); + break; + case QUIC_FRAME_CRYPTO: QuicCryptoOnAck(&Connection->Crypto, &Packet->Frames[i]); break; @@ -699,6 +703,15 @@ QuicLossDetectionRetransmitFrames( FALSE); break; + case QUIC_FRAME_RELIABLE_RESET_STREAM: + NewDataQueued |= + QuicSendSetStreamSendFlag( + &Connection->Send, + Packet->Frames[i].RELIABLE_RESET_STREAM.Stream, + QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT, + FALSE); + break; + case QUIC_FRAME_STOP_SENDING: NewDataQueued |= QuicSendSetStreamSendFlag( diff --git a/src/core/send.c b/src/core/send.c index a0b4714461..0fcc615c7a 100644 --- a/src/core/send.c +++ b/src/core/send.c @@ -387,6 +387,7 @@ QuicSendSetStreamSendFlag( if (Stream->Flags.LocalCloseAcked) { SendFlags &= ~(QUIC_STREAM_SEND_FLAG_SEND_ABORT | + QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT | QUIC_STREAM_SEND_FLAG_DATA_BLOCKED | QUIC_STREAM_SEND_FLAG_DATA | QUIC_STREAM_SEND_FLAG_OPEN | diff --git a/src/core/send.h b/src/core/send.h index 24ea97761a..5ccb148775 100644 --- a/src/core/send.h +++ b/src/core/send.h @@ -195,13 +195,14 @@ QuicPacketTypeToEncryptLevelV2( // stream. The order reflects the order the data is framed into a packet. // -#define QUIC_STREAM_SEND_FLAG_DATA_BLOCKED 0x0001U -#define QUIC_STREAM_SEND_FLAG_MAX_DATA 0x0002U -#define QUIC_STREAM_SEND_FLAG_SEND_ABORT 0x0004U -#define QUIC_STREAM_SEND_FLAG_RECV_ABORT 0x0008U -#define QUIC_STREAM_SEND_FLAG_DATA 0x0010U -#define QUIC_STREAM_SEND_FLAG_OPEN 0x0020U -#define QUIC_STREAM_SEND_FLAG_FIN 0x0040U +#define QUIC_STREAM_SEND_FLAG_DATA_BLOCKED 0x0001U +#define QUIC_STREAM_SEND_FLAG_MAX_DATA 0x0002U +#define QUIC_STREAM_SEND_FLAG_SEND_ABORT 0x0004U +#define QUIC_STREAM_SEND_FLAG_RECV_ABORT 0x0008U +#define QUIC_STREAM_SEND_FLAG_DATA 0x0010U +#define QUIC_STREAM_SEND_FLAG_OPEN 0x0020U +#define QUIC_STREAM_SEND_FLAG_FIN 0x0040U +#define QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT 0x0080U #define QUIC_STREAM_SEND_FLAGS_ALL 0xFFFFU @@ -211,7 +212,8 @@ inline BOOLEAN HasStreamControlFrames(uint32_t Flags) (QUIC_STREAM_SEND_FLAG_DATA_BLOCKED | QUIC_STREAM_SEND_FLAG_MAX_DATA | QUIC_STREAM_SEND_FLAG_SEND_ABORT | - QUIC_STREAM_SEND_FLAG_RECV_ABORT); + QUIC_STREAM_SEND_FLAG_RECV_ABORT | + QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT); } inline BOOLEAN HasStreamDataFrames(uint32_t Flags) diff --git a/src/core/sent_packet_metadata.c b/src/core/sent_packet_metadata.c index 17b48f04fb..ca3567eb3c 100644 --- a/src/core/sent_packet_metadata.c +++ b/src/core/sent_packet_metadata.c @@ -46,6 +46,9 @@ QuicSentPacketMetadataReleaseFrames( case QUIC_FRAME_STREAM: QuicStreamSentMetadataDecrement(Metadata->Frames[i].STREAM.Stream); break; + case QUIC_FRAME_RELIABLE_RESET_STREAM: + QuicStreamSentMetadataDecrement(Metadata->Frames[i].RELIABLE_RESET_STREAM.Stream); + break; #pragma warning(pop) default: // diff --git a/src/core/sent_packet_metadata.h b/src/core/sent_packet_metadata.h index 669d1d9fca..88b899df3e 100644 --- a/src/core/sent_packet_metadata.h +++ b/src/core/sent_packet_metadata.h @@ -27,6 +27,9 @@ typedef struct QUIC_SENT_FRAME_METADATA { struct { QUIC_STREAM* Stream; } RESET_STREAM; + struct { + QUIC_STREAM* Stream; + } RELIABLE_RESET_STREAM; struct { QUIC_STREAM* Stream; } STOP_SENDING; diff --git a/src/core/stream.c b/src/core/stream.c index cefa45b56e..7183a9626a 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -574,6 +574,13 @@ QuicStreamShutdown( // and shutdown complete events now, if they haven't already been // delivered. // + if (Stream->Flags.RemoteCloseResetReliable || Stream->Flags.LocalCloseResetReliable) { + QuicTraceLogStreamWarning( + ShutdownImmediatePendingReliableReset, + Stream, + "Invalid immediate shutdown request (pending reliable reset)."); + return; + } QuicStreamIndicateSendShutdownComplete(Stream, FALSE); QuicStreamIndicateShutdownComplete(Stream); } @@ -633,7 +640,7 @@ QuicStreamParamSet( case QUIC_PARAM_STREAM_PRIORITY: { - if (BufferLength != sizeof(Stream->SendPriority)) { + if (BufferLength != sizeof(Stream->SendPriority) || Buffer == NULL) { Status = QUIC_STATUS_INVALID_PARAMETER; break; } @@ -659,6 +666,58 @@ QuicStreamParamSet( break; } + case QUIC_PARAM_STREAM_RELIABLE_OFFSET: + + if (BufferLength != sizeof(uint64_t) || Buffer == NULL) { + Status = QUIC_STATUS_INVALID_PARAMETER; + break; + } + + if (!Stream->Connection->State.ReliableResetStreamNegotiated || + *(uint64_t*)Buffer > Stream->QueuedSendOffset) { + Status = QUIC_STATUS_INVALID_STATE; + break; + } + + if (Stream->Flags.LocalCloseReset) { + Status = QUIC_STATUS_INVALID_STATE; + break; + } + + if (!Stream->Flags.LocalCloseResetReliable) { + // + // We haven't called shutdown reliable yet. App can set ReliableOffsetSend to be whatever. + // + Stream->ReliableOffsetSend = *(uint64_t*)Buffer; + } else if (*(uint64_t*)Buffer < Stream->ReliableOffsetSend) { + // + // TODO - Determine if we need to support this feature in future iterations. + // + // We have previously called shutdown reliable. + // Now we are loosening the conditions of the ReliableReset, + // but we have already sent the peer a stale frame, we must retransmit + // this new frame, and update the metadata. + // + QuicTraceLogStreamInfo( + MultipleReliableResetSendNotSupported, + Stream, + "Multiple RELIABLE_RESET frames sending not supported."); + Status = QUIC_STATUS_INVALID_STATE; + break; + } else { + Status = QUIC_STATUS_INVALID_STATE; + break; + } + + QuicTraceLogStreamInfo( + ReliableSendOffsetSet, + Stream, + "Reliable send offset set to %llu", + *(uint64_t*)Buffer); + + Status = QUIC_STATUS_SUCCESS; + break; + default: Status = QUIC_STATUS_INVALID_PARAMETER; break; @@ -838,6 +897,44 @@ QuicStreamParamGet( Status = QUIC_STATUS_SUCCESS; break; } + + case QUIC_PARAM_STREAM_RELIABLE_OFFSET: + if (*BufferLength < sizeof(uint64_t)) { + *BufferLength = sizeof(uint64_t); + Status = QUIC_STATUS_BUFFER_TOO_SMALL; + break; + } + if (Buffer == NULL) { + Status = QUIC_STATUS_INVALID_PARAMETER; + break; + } + if (Stream->ReliableOffsetSend == 0) { + Status = QUIC_STATUS_INVALID_STATE; + break; + } + *(uint64_t*) Buffer = Stream->ReliableOffsetSend; + Status = QUIC_STATUS_SUCCESS; + break; + + case QUIC_PARAM_STREAM_RELIABLE_OFFSET_RECV: + if (*BufferLength < sizeof(uint64_t)) { + *BufferLength = sizeof(uint64_t); + Status = QUIC_STATUS_BUFFER_TOO_SMALL; + break; + } + if (Buffer == NULL) { + Status = QUIC_STATUS_INVALID_PARAMETER; + break; + } + if (!Stream->Flags.RemoteCloseResetReliable) { + Status = QUIC_STATUS_INVALID_STATE; + break; + } + + *(uint64_t*)Buffer = Stream->RecvMaxLength; + Status = QUIC_STATUS_SUCCESS; + break; + default: Status = QUIC_STATUS_INVALID_PARAMETER; break; diff --git a/src/core/stream.h b/src/core/stream.h index 3c35daaafe..263c270647 100644 --- a/src/core/stream.h +++ b/src/core/stream.h @@ -122,6 +122,9 @@ typedef union QUIC_STREAM_FLAGS { BOOLEAN LocalNotAllowed : 1; // Peer's unidirectional stream. BOOLEAN LocalCloseFin : 1; // Locally closed (graceful). BOOLEAN LocalCloseReset : 1; // Locally closed (locally aborted). + BOOLEAN LocalCloseResetReliable : 1; // Indicates that we should shutdown the send path once we sent/ACK'd ReliableOffsetSend bytes. + BOOLEAN LocalCloseResetReliableAcked : 1; // Indicates the peer has acknowledged we will stop sending once we sent/ACK'd ReliableOffsetSend bytes. + BOOLEAN RemoteCloseResetReliable : 1; // Indicates that the peer initiaited a reliable reset. Keep Recv path available for RecvMaxLength bytes. BOOLEAN ReceivedStopSending : 1; // Peer sent STOP_SENDING frame. BOOLEAN LocalCloseAcked : 1; // Any close acknowledged. BOOLEAN FinAcked : 1; // Our FIN was acknowledged. @@ -165,7 +168,9 @@ typedef enum QUIC_STREAM_SEND_STATE { QUIC_STREAM_SEND_RESET, QUIC_STREAM_SEND_RESET_ACKED, QUIC_STREAM_SEND_FIN, - QUIC_STREAM_SEND_FIN_ACKED + QUIC_STREAM_SEND_FIN_ACKED, + QUIC_STREAM_SEND_RELIABLE_RESET, + QUIC_STREAM_SEND_RELIABLE_RESET_ACKED } QUIC_STREAM_SEND_STATE; typedef enum QUIC_STREAM_RECV_STATE { @@ -174,7 +179,8 @@ typedef enum QUIC_STREAM_RECV_STATE { QUIC_STREAM_RECV_PAUSED, QUIC_STREAM_RECV_STOPPED, QUIC_STREAM_RECV_RESET, - QUIC_STREAM_RECV_FIN + QUIC_STREAM_RECV_FIN, + QUIC_STREAM_RECV_RELIABLE_RESET } QUIC_STREAM_RECV_STATE; // @@ -345,6 +351,12 @@ typedef struct QUIC_STREAM { // uint64_t RecoveryNextOffset; uint64_t RecoveryEndOffset; + + // + // If > 0, bytes up to offset must be re-transmitted and ACK'd from peer before we can abort this stream. + // + uint64_t ReliableOffsetSend; + #define RECOV_WINDOW_OPEN(S) ((S)->RecoveryNextOffset < (S)->RecoveryEndOffset) // @@ -448,6 +460,12 @@ QuicStreamSendGetState( { if (Stream->Flags.LocalNotAllowed) { return QUIC_STREAM_SEND_DISABLED; + } else if (Stream->Flags.LocalCloseResetReliable) { + if (Stream->Flags.LocalCloseResetReliableAcked) { + return QUIC_STREAM_SEND_RELIABLE_RESET_ACKED; + } else { + return QUIC_STREAM_SEND_RELIABLE_RESET; + } } else if (Stream->Flags.LocalCloseAcked) { if (Stream->Flags.FinAcked) { return QUIC_STREAM_SEND_FIN_ACKED; @@ -473,6 +491,8 @@ QuicStreamRecvGetState( return QUIC_STREAM_RECV_DISABLED; } else if (Stream->Flags.RemoteCloseReset) { return QUIC_STREAM_RECV_RESET; + } else if (Stream->Flags.RemoteCloseResetReliable) { + return QUIC_STREAM_RECV_RELIABLE_RESET; } else if (Stream->Flags.RemoteCloseFin) { return QUIC_STREAM_RECV_FIN; } else if (Stream->Flags.SentStopSending) { @@ -902,6 +922,24 @@ QuicStreamOnResetAck( _In_ QUIC_STREAM* Stream ); +// +// Called when an ACK is received for a RELIABLE_RESET frame we sent. +// +_IRQL_requires_max_(PASSIVE_LEVEL) +void +QuicStreamOnResetReliableAck( + _In_ QUIC_STREAM* Stream + ); + +// +// Cleanups up state once we finish processing a RELIABLE_RESET frame. +// +_IRQL_requires_max_(PASSIVE_LEVEL) +void +QuicStreamCleanupReliableReset( + _In_ QUIC_STREAM* Stream + ); + // // Dumps send state to the logs. // diff --git a/src/core/stream_recv.c b/src/core/stream_recv.c index 028bf393d0..a50ea255cd 100644 --- a/src/core/stream_recv.c +++ b/src/core/stream_recv.c @@ -162,6 +162,86 @@ QuicStreamRecvQueueFlush( } } +// +// Deliver a notification to the app that the peer has aborted their send path. +// +_IRQL_requires_max_(PASSIVE_LEVEL) +void +QuicStreamIndicatePeerSendAbortedEvent( + _In_ QUIC_STREAM* Stream, + _In_ QUIC_VAR_INT ErrorCode + ) +{ + QuicTraceLogStreamInfo( + RemoteCloseReset, + Stream, + "Closed remotely (reset)"); + QUIC_STREAM_EVENT Event; + Event.Type = QUIC_STREAM_EVENT_PEER_SEND_ABORTED; + Event.PEER_SEND_ABORTED.ErrorCode = ErrorCode; + QuicTraceLogStreamVerbose( + IndicatePeerSendAbort, + Stream, + "Indicating QUIC_STREAM_EVENT_PEER_SEND_ABORTED (0x%llX)", + ErrorCode); + (void)QuicStreamIndicateEvent(Stream, &Event); +} + +// +// Processes a received RELIABLE_RESET frame's payload. +// +_IRQL_requires_max_(PASSIVE_LEVEL) +void +QuicStreamProcessReliableResetFrame( + _In_ QUIC_STREAM* Stream, + _In_ QUIC_VAR_INT ErrorCode, + _In_ QUIC_VAR_INT ReliableOffset + ) +{ + if (!Stream->Connection->State.ReliableResetStreamNegotiated) { + // + // The peer tried to use an exprimental feature without + // negotiating first. Kill the connection. + // + QuicTraceLogStreamWarning( + ReliableResetNotNegotiatedError, + Stream, + "Received ReliableReset without negotiation."); + QuicConnTransportError(Stream->Connection, QUIC_ERROR_TRANSPORT_PARAMETER_ERROR); + return; + } + + if (Stream->RecvMaxLength == 0 || ReliableOffset < Stream->RecvMaxLength) { + // + // As outlined in the spec, if we receive multiple CLOSE_STREAM frames, we only accept strictly + // decreasing offsets. + // + Stream->RecvMaxLength = ReliableOffset; + Stream->Flags.RemoteCloseResetReliable = TRUE; + + QuicTraceLogStreamInfo( + ReliableRecvOffsetSet, + Stream, + "Reliable recv offset set to %llu", + ReliableOffset); + } + + if (Stream->RecvBuffer.BaseOffset >= Stream->RecvMaxLength) { + QuicTraceEvent( + StreamRecvState, + "[strm][%p] Recv State: %hhu", + Stream, + QuicStreamRecvGetState(Stream)); + QuicStreamIndicatePeerSendAbortedEvent(Stream, ErrorCode); + QuicStreamRecvShutdown(Stream, TRUE, ErrorCode); + } else { + // + // We still have data to deliver to the app, just cache the error code for later. + // + Stream->RecvShutdownErrorCode = ErrorCode; + } +} + // // Processes a received RESET_STREAM frame's payload. // @@ -241,20 +321,7 @@ QuicStreamProcessResetFrame( QuicStreamRecvGetState(Stream)); if (!Stream->Flags.SentStopSending) { - QuicTraceLogStreamInfo( - RemoteCloseReset, - Stream, - "Closed remotely (reset)"); - - QUIC_STREAM_EVENT Event; - Event.Type = QUIC_STREAM_EVENT_PEER_SEND_ABORTED; - Event.PEER_SEND_ABORTED.ErrorCode = ErrorCode; - QuicTraceLogStreamVerbose( - IndicatePeerSendAbort, - Stream, - "Indicating QUIC_STREAM_EVENT_PEER_SEND_ABORTED (0x%llX)", - ErrorCode); - (void)QuicStreamIndicateEvent(Stream, &Event); + QuicStreamIndicatePeerSendAbortedEvent(Stream, ErrorCode); } // @@ -380,9 +447,18 @@ QuicStreamProcessStreamFrame( goto Error; } - if (EndOffset > Stream->RecvMaxLength) { + if (Stream->Flags.RemoteCloseResetReliable) { + if (Stream->RecvBuffer.BaseOffset >= Stream->RecvMaxLength) { + // + // We've aborted reliably, but the stream goes past reliable offset, we can just + // ignore it. + // + Status = QUIC_STATUS_SUCCESS; + goto Error; + } + } else if (EndOffset > Stream->RecvMaxLength) { // - // Frame goes past the FIN. + // Frame goes past the FIN, and the stream is not reset reliably. // Status = QUIC_STATUS_INVALID_PARAMETER; goto Error; @@ -617,6 +693,20 @@ QuicStreamRecv( break; } + case QUIC_FRAME_RELIABLE_RESET_STREAM: { + QUIC_RELIABLE_RESET_STREAM_EX Frame; + if (!QuicReliableResetFrameDecode(BufferLength, Buffer, Offset, &Frame)) { + return QUIC_STATUS_INVALID_PARAMETER; + } + + QuicStreamProcessReliableResetFrame( + Stream, + Frame.ErrorCode, + Frame.ReliableSize); + + break; + } + default: // QUIC_FRAME_STREAM* { QUIC_STREAM_EX Frame; @@ -1060,6 +1150,18 @@ QuicStreamReceiveComplete( &Stream->Connection->Send, Stream, QUIC_STREAM_SEND_FLAG_MAX_DATA | QUIC_STREAM_SEND_FLAG_RECV_ABORT); + } else if (Stream->Flags.RemoteCloseResetReliable && Stream->RecvBuffer.BaseOffset >= Stream->RecvMaxLength) { + // + // ReliableReset was initiated by the peer, and we sent enough data to the app, we can alert the app + // we're done and shutdown the RECV direction of this stream. + // + QuicTraceEvent( + StreamRecvState, + "[strm][%p] Recv State: %hhu", + Stream, + QuicStreamRecvGetState(Stream)); + QuicStreamIndicatePeerSendAbortedEvent(Stream, Stream->RecvShutdownErrorCode); + QuicStreamRecvShutdown(Stream, TRUE, Stream->RecvShutdownErrorCode); } return FALSE; diff --git a/src/core/stream_send.c b/src/core/stream_send.c index 9fa05dbd83..042e6c7028 100644 --- a/src/core/stream_send.c +++ b/src/core/stream_send.c @@ -159,7 +159,11 @@ QuicStreamSendShutdown( QUIC_STREAM_SEND_FLAG_FIN, DelaySend); - } else { + } else if (Stream->ReliableOffsetSend == 0 || Stream->Flags.LocalCloseResetReliable) { + // + // Enter abortive branch if we are not aborting reliablely or we have done it already. + // Essentially, Reset trumps Reliable Reset, so if we have to call shutdown again, we reset. + // // // Can't be blocked by (stream) FC any more if we've aborted sending any @@ -230,11 +234,30 @@ QuicStreamSendShutdown( &Stream->Connection->Send, Stream, QUIC_STREAM_SEND_FLAG_DATA_BLOCKED | + QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT | QUIC_STREAM_SEND_FLAG_DATA | QUIC_STREAM_SEND_FLAG_OPEN | QUIC_STREAM_SEND_FLAG_FIN); } - } + } else { + if (Stream->Flags.LocalCloseReset) { + // + // We have already closed the stream (graceful or abortive) so we + // can't reliably abort it. + // + goto Exit; + } + Stream->Flags.LocalCloseResetReliable = TRUE; + Stream->SendShutdownErrorCode = ErrorCode; + // + // Queue up a RESET RELIABLE STREAM frame to be sent. We will clear up any flags later. + // + QuicSendSetStreamSendFlag( + &Stream->Connection->Send, + Stream, + QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT, + FALSE); +} QuicStreamSendDumpState(Stream); @@ -506,7 +529,6 @@ QuicStreamSendFlush( BOOLEAN Start = FALSE; while (ApiSendRequests != NULL) { - QUIC_SEND_REQUEST* SendRequest = ApiSendRequests; ApiSendRequests = ApiSendRequests->Next; SendRequest->Next = NULL; @@ -516,7 +538,7 @@ QuicStreamSendFlush( if (!Stream->Flags.SendEnabled) { // - // Only possible if they queue muliple sends, with a FIN flag set + // Only possible if they queue multiple sends, with a FIN flag set // NOT in the last one. // QuicStreamCompleteSendRequest(Stream, SendRequest, TRUE, FALSE); @@ -1069,7 +1091,6 @@ QuicStreamSendWrite( } if (Stream->SendFlags & QUIC_STREAM_SEND_FLAG_SEND_ABORT) { - QUIC_RESET_STREAM_EX Frame = { Stream->ID, Stream->SendShutdownErrorCode, Stream->MaxSentLength }; if (QuicResetStreamFrameEncode( @@ -1087,6 +1108,24 @@ QuicStreamSendWrite( } } + if (Stream->SendFlags & QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT) { + QUIC_RELIABLE_RESET_STREAM_EX Frame = { Stream->ID, Stream->SendShutdownErrorCode, Stream->MaxSentLength, Stream->ReliableOffsetSend }; + + if (QuicReliableResetFrameEncode( + &Frame, + &Builder->DatagramLength, + AvailableBufferLength, + Builder->Datagram->Buffer)) { + + Stream->SendFlags &= ~QUIC_STREAM_SEND_FLAG_RELIABLE_ABORT; + if (QuicPacketBuilderAddStreamFrame(Builder, Stream, QUIC_FRAME_RELIABLE_RESET_STREAM)) { + return TRUE; + } + } else { + RanOutOfRoom = TRUE; + } + } + if (Stream->SendFlags & QUIC_STREAM_SEND_FLAG_RECV_ABORT) { QUIC_STOP_SENDING_EX Frame = { Stream->ID, Stream->RecvShutdownErrorCode }; @@ -1177,6 +1216,14 @@ QuicStreamOnLoss( return FALSE; } + if (Stream->Flags.LocalCloseResetReliableAcked && Stream->UnAckedOffset >= Stream->ReliableOffsetSend) { + // + // Ignore any STREAM frame packet loss if we aborted reliably and + // received acks for enough data. + // + return FALSE; + } + uint32_t AddSendFlags = 0; uint64_t Start = FrameMetadata->StreamOffset; @@ -1477,6 +1524,23 @@ QuicStreamOnAck( } } + // + // If this stream has been reset reliably, we only close if we have received enough bytes. + // + const BOOLEAN ReliableResetShutdown = + !Stream->Flags.LocalCloseAcked && + Stream->Flags.LocalCloseResetReliableAcked && + Stream->UnAckedOffset >= Stream->ReliableOffsetSend; + if (ReliableResetShutdown) { + QuicTraceEvent( + StreamSendState, + "[strm][%p] Send State: %hhu", + Stream, + QuicStreamSendGetState(Stream)); + QuicStreamCleanupReliableReset(Stream); + QuicStreamSendShutdown(Stream, FALSE, TRUE, FALSE, Stream->SendShutdownErrorCode); + } + if (!QuicStreamHasPendingStreamData(Stream)) { // // Make sure the stream isn't queued to send any stream data. @@ -1495,6 +1559,27 @@ QuicStreamOnAck( QuicStreamValidateRecoveryState(Stream); } +_IRQL_requires_max_(PASSIVE_LEVEL) +void +QuicStreamCleanupReliableReset( + _In_ QUIC_STREAM* Stream + ) +{ + // + // Cleanup. Cancels all send requests with offsets after ReliableOffsetSend. + // Assume this function gets called only when we're about to close the SEND + // Path of this stream once sufficient data has been received and ACK'd by the peer. + // + while (Stream->SendRequests) { + QUIC_SEND_REQUEST* Req = Stream->SendRequests; + Stream->SendRequests = Req->Next; + if (Stream->SendRequests == NULL) { + Stream->SendRequestsTail = &Stream->SendRequests; + } + QuicStreamCompleteSendRequest(Stream, Req, FALSE, TRUE); + } +} + _IRQL_requires_max_(PASSIVE_LEVEL) void QuicStreamOnResetAck( @@ -1513,6 +1598,37 @@ QuicStreamOnResetAck( } } +_IRQL_requires_max_(PASSIVE_LEVEL) +void +QuicStreamOnResetReliableAck( + _In_ QUIC_STREAM* Stream + ) +{ + QuicTraceLogStreamVerbose( + ResetReliableAck, + Stream, + "Reset Reliable ACKed in OnResetReliableAck. Send side. UnAckedOffset=%llu, ReliableOffsetSend=%llu", + Stream->UnAckedOffset, Stream->ReliableOffsetSend); + + if (Stream->UnAckedOffset >= Stream->ReliableOffsetSend && !Stream->Flags.LocalCloseAcked) { + Stream->Flags.LocalCloseResetReliableAcked = TRUE; + QuicTraceEvent( + StreamSendState, + "[strm][%p] Send State: %hhu", + Stream, + QuicStreamSendGetState(Stream)); + QuicStreamCleanupReliableReset(Stream); + QuicStreamSendShutdown(Stream, FALSE, TRUE, FALSE, Stream->SendShutdownErrorCode); + } else { + QuicTraceEvent( + StreamSendState, + "[strm][%p] Send State: %hhu", + Stream, + QuicStreamSendGetState(Stream)); + Stream->Flags.LocalCloseResetReliableAcked = TRUE; + } +} + _IRQL_requires_max_(PASSIVE_LEVEL) void QuicStreamSendDumpState( diff --git a/src/core/stream_set.c b/src/core/stream_set.c index da85ef12f4..5d37a8a15c 100644 --- a/src/core/stream_set.c +++ b/src/core/stream_set.c @@ -160,7 +160,6 @@ QuicStreamSetShutdown( CxPlatHashtableEnumerateBegin(StreamSet->StreamTable, &Enumerator); while ((Entry = CxPlatHashtableEnumerateNext(StreamSet->StreamTable, &Enumerator)) != NULL) { QUIC_STREAM* Stream = CXPLAT_CONTAINING_RECORD(Entry, QUIC_STREAM, TableEntry); - QuicStreamShutdown( Stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND | diff --git a/src/core/unittest/main.h b/src/core/unittest/main.h index 1ad53dacd6..b0c604e81e 100644 --- a/src/core/unittest/main.h +++ b/src/core/unittest/main.h @@ -87,6 +87,8 @@ std::ostream& operator << (std::ostream& o, const QUIC_FRAME_TYPE& type) { return o << "QUIC_FRAME_ACK_FREQUENCY"; case QUIC_FRAME_IMMEDIATE_ACK: return o << "QUIC_FRAME_IMMEDIATE_ACK"; + case QUIC_FRAME_RELIABLE_RESET_STREAM: + return o << "QUIC_FRAME_RELIABLE_RESET_STREAM"; default: return o << "UNRECOGNIZED_FRAME_TYPE(" << (uint32_t) type << ")"; } diff --git a/src/cs/lib/msquic_generated.cs b/src/cs/lib/msquic_generated.cs index 7d82104cb6..5f504f9628 100644 --- a/src/cs/lib/msquic_generated.cs +++ b/src/cs/lib/msquic_generated.cs @@ -3275,6 +3275,9 @@ internal static unsafe partial class MsQuic [NativeTypeName("#define QUIC_PARAM_STREAM_STATISTICS 0X08000004")] internal const uint QUIC_PARAM_STREAM_STATISTICS = 0X08000004; + [NativeTypeName("#define QUIC_PARAM_STREAM_RELIABLE_OFFSET 0x08000005")] + internal const uint QUIC_PARAM_STREAM_RELIABLE_OFFSET = 0x08000005; + [NativeTypeName("#define QUIC_API_VERSION_2 2")] internal const uint QUIC_API_VERSION_2 = 2; } diff --git a/src/generated/linux/frame.c.clog.h b/src/generated/linux/frame.c.clog.h index 2e2c70ffdb..99477a4200 100644 --- a/src/generated/linux/frame.c.clog.h +++ b/src/generated/linux/frame.c.clog.h @@ -1271,6 +1271,58 @@ tracepoint(CLOG_FRAME_C, FrameLogTimestamp , arg2, arg3, arg4, arg5);\ +/*---------------------------------------------------------- +// Decoder Ring for FrameLogReliableResetStreamInvalid +// [%c][%cX][%llu] RELIABLE_RESET_STREAM [Invalid] +// QuicTraceLogVerbose( + FrameLogReliableResetStreamInvalid, + "[%c][%cX][%llu] RELIABLE_RESET_STREAM [Invalid]", + PtkConnPre(Connection), + PktRxPre(Rx), + PacketNumber); +// arg2 = arg2 = PtkConnPre(Connection) = arg2 +// arg3 = arg3 = PktRxPre(Rx) = arg3 +// arg4 = arg4 = PacketNumber = arg4 +----------------------------------------------------------*/ +#ifndef _clog_5_ARGS_TRACE_FrameLogReliableResetStreamInvalid +#define _clog_5_ARGS_TRACE_FrameLogReliableResetStreamInvalid(uniqueId, encoded_arg_string, arg2, arg3, arg4)\ +tracepoint(CLOG_FRAME_C, FrameLogReliableResetStreamInvalid , arg2, arg3, arg4);\ + +#endif + + + + +/*---------------------------------------------------------- +// Decoder Ring for FrameLogReliableResetStream +// [%c][%cX][%llu] RELIABLE_RESET_STREAM ID:%llu ErrorCode:0x%llX FinalSize:%llu ReliableSize:%llu +// QuicTraceLogVerbose( + FrameLogReliableResetStream, + "[%c][%cX][%llu] RELIABLE_RESET_STREAM ID:%llu ErrorCode:0x%llX FinalSize:%llu ReliableSize:%llu", + PtkConnPre(Connection), + PktRxPre(Rx), + PacketNumber, + Frame.StreamID, + Frame.ErrorCode, + Frame.FinalSize, + Frame.ReliableSize); +// arg2 = arg2 = PtkConnPre(Connection) = arg2 +// arg3 = arg3 = PktRxPre(Rx) = arg3 +// arg4 = arg4 = PacketNumber = arg4 +// arg5 = arg5 = Frame.StreamID = arg5 +// arg6 = arg6 = Frame.ErrorCode = arg6 +// arg7 = arg7 = Frame.FinalSize = arg7 +// arg8 = arg8 = Frame.ReliableSize = arg8 +----------------------------------------------------------*/ +#ifndef _clog_9_ARGS_TRACE_FrameLogReliableResetStream +#define _clog_9_ARGS_TRACE_FrameLogReliableResetStream(uniqueId, encoded_arg_string, arg2, arg3, arg4, arg5, arg6, arg7, arg8)\ +tracepoint(CLOG_FRAME_C, FrameLogReliableResetStream , arg2, arg3, arg4, arg5, arg6, arg7, arg8);\ + +#endif + + + + /*---------------------------------------------------------- // Decoder Ring for ConnError // [conn][%p] ERROR, %s. diff --git a/src/generated/linux/frame.c.clog.h.lttng.h b/src/generated/linux/frame.c.clog.h.lttng.h index b5c312b66f..bbb7dc77f1 100644 --- a/src/generated/linux/frame.c.clog.h.lttng.h +++ b/src/generated/linux/frame.c.clog.h.lttng.h @@ -1609,6 +1609,76 @@ TRACEPOINT_EVENT(CLOG_FRAME_C, FrameLogTimestamp, +/*---------------------------------------------------------- +// Decoder Ring for FrameLogReliableResetStreamInvalid +// [%c][%cX][%llu] RELIABLE_RESET_STREAM [Invalid] +// QuicTraceLogVerbose( + FrameLogReliableResetStreamInvalid, + "[%c][%cX][%llu] RELIABLE_RESET_STREAM [Invalid]", + PtkConnPre(Connection), + PktRxPre(Rx), + PacketNumber); +// arg2 = arg2 = PtkConnPre(Connection) = arg2 +// arg3 = arg3 = PktRxPre(Rx) = arg3 +// arg4 = arg4 = PacketNumber = arg4 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_FRAME_C, FrameLogReliableResetStreamInvalid, + TP_ARGS( + unsigned char, arg2, + unsigned char, arg3, + unsigned long long, arg4), + TP_FIELDS( + ctf_integer(unsigned char, arg2, arg2) + ctf_integer(unsigned char, arg3, arg3) + ctf_integer(uint64_t, arg4, arg4) + ) +) + + + +/*---------------------------------------------------------- +// Decoder Ring for FrameLogReliableResetStream +// [%c][%cX][%llu] RELIABLE_RESET_STREAM ID:%llu ErrorCode:0x%llX FinalSize:%llu ReliableSize:%llu +// QuicTraceLogVerbose( + FrameLogReliableResetStream, + "[%c][%cX][%llu] RELIABLE_RESET_STREAM ID:%llu ErrorCode:0x%llX FinalSize:%llu ReliableSize:%llu", + PtkConnPre(Connection), + PktRxPre(Rx), + PacketNumber, + Frame.StreamID, + Frame.ErrorCode, + Frame.FinalSize, + Frame.ReliableSize); +// arg2 = arg2 = PtkConnPre(Connection) = arg2 +// arg3 = arg3 = PktRxPre(Rx) = arg3 +// arg4 = arg4 = PacketNumber = arg4 +// arg5 = arg5 = Frame.StreamID = arg5 +// arg6 = arg6 = Frame.ErrorCode = arg6 +// arg7 = arg7 = Frame.FinalSize = arg7 +// arg8 = arg8 = Frame.ReliableSize = arg8 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_FRAME_C, FrameLogReliableResetStream, + TP_ARGS( + unsigned char, arg2, + unsigned char, arg3, + unsigned long long, arg4, + unsigned long long, arg5, + unsigned long long, arg6, + unsigned long long, arg7, + unsigned long long, arg8), + TP_FIELDS( + ctf_integer(unsigned char, arg2, arg2) + ctf_integer(unsigned char, arg3, arg3) + ctf_integer(uint64_t, arg4, arg4) + ctf_integer(uint64_t, arg5, arg5) + ctf_integer(uint64_t, arg6, arg6) + ctf_integer(uint64_t, arg7, arg7) + ctf_integer(uint64_t, arg8, arg8) + ) +) + + + /*---------------------------------------------------------- // Decoder Ring for ConnError // [conn][%p] ERROR, %s. diff --git a/src/generated/linux/stream.c.clog.h b/src/generated/linux/stream.c.clog.h index fed24b8ffe..c201623442 100644 --- a/src/generated/linux/stream.c.clog.h +++ b/src/generated/linux/stream.c.clog.h @@ -69,6 +69,24 @@ tracepoint(CLOG_STREAM_C, EventSilentDiscard , arg1);\ +/*---------------------------------------------------------- +// Decoder Ring for ShutdownImmediatePendingReliableReset +// [strm][%p] Invalid immediate shutdown request (pending reliable reset). +// QuicTraceLogStreamWarning( + ShutdownImmediatePendingReliableReset, + Stream, + "Invalid immediate shutdown request (pending reliable reset)."); +// arg1 = arg1 = Stream = arg1 +----------------------------------------------------------*/ +#ifndef _clog_3_ARGS_TRACE_ShutdownImmediatePendingReliableReset +#define _clog_3_ARGS_TRACE_ShutdownImmediatePendingReliableReset(uniqueId, arg1, encoded_arg_string)\ +tracepoint(CLOG_STREAM_C, ShutdownImmediatePendingReliableReset , arg1);\ + +#endif + + + + /*---------------------------------------------------------- // Decoder Ring for UpdatePriority // [strm][%p] New send priority = %hu @@ -89,6 +107,44 @@ tracepoint(CLOG_STREAM_C, UpdatePriority , arg1, arg3);\ +/*---------------------------------------------------------- +// Decoder Ring for MultipleReliableResetSendNotSupported +// [strm][%p] Multiple RELIABLE_RESET frames sending not supported. +// QuicTraceLogStreamInfo( + MultipleReliableResetSendNotSupported, + Stream, + "Multiple RELIABLE_RESET frames sending not supported."); +// arg1 = arg1 = Stream = arg1 +----------------------------------------------------------*/ +#ifndef _clog_3_ARGS_TRACE_MultipleReliableResetSendNotSupported +#define _clog_3_ARGS_TRACE_MultipleReliableResetSendNotSupported(uniqueId, arg1, encoded_arg_string)\ +tracepoint(CLOG_STREAM_C, MultipleReliableResetSendNotSupported , arg1);\ + +#endif + + + + +/*---------------------------------------------------------- +// Decoder Ring for ReliableSendOffsetSet +// [strm][%p] Reliable send offset set to %llu +// QuicTraceLogStreamInfo( + ReliableSendOffsetSet, + Stream, + "Reliable send offset set to %llu", + *(uint64_t*)Buffer); +// arg1 = arg1 = Stream = arg1 +// arg3 = arg3 = *(uint64_t*)Buffer = arg3 +----------------------------------------------------------*/ +#ifndef _clog_4_ARGS_TRACE_ReliableSendOffsetSet +#define _clog_4_ARGS_TRACE_ReliableSendOffsetSet(uniqueId, arg1, encoded_arg_string, arg3)\ +tracepoint(CLOG_STREAM_C, ReliableSendOffsetSet , arg1, arg3);\ + +#endif + + + + /*---------------------------------------------------------- // Decoder Ring for ConfiguredForDelayedIDFC // [strm][%p] Configured for delayed ID FC updates diff --git a/src/generated/linux/stream.c.clog.h.lttng.h b/src/generated/linux/stream.c.clog.h.lttng.h index d0457dd5aa..b7796f6d6d 100644 --- a/src/generated/linux/stream.c.clog.h.lttng.h +++ b/src/generated/linux/stream.c.clog.h.lttng.h @@ -39,6 +39,25 @@ TRACEPOINT_EVENT(CLOG_STREAM_C, EventSilentDiscard, +/*---------------------------------------------------------- +// Decoder Ring for ShutdownImmediatePendingReliableReset +// [strm][%p] Invalid immediate shutdown request (pending reliable reset). +// QuicTraceLogStreamWarning( + ShutdownImmediatePendingReliableReset, + Stream, + "Invalid immediate shutdown request (pending reliable reset)."); +// arg1 = arg1 = Stream = arg1 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_STREAM_C, ShutdownImmediatePendingReliableReset, + TP_ARGS( + const void *, arg1), + TP_FIELDS( + ctf_integer_hex(uint64_t, arg1, arg1) + ) +) + + + /*---------------------------------------------------------- // Decoder Ring for UpdatePriority // [strm][%p] New send priority = %hu @@ -62,6 +81,48 @@ TRACEPOINT_EVENT(CLOG_STREAM_C, UpdatePriority, +/*---------------------------------------------------------- +// Decoder Ring for MultipleReliableResetSendNotSupported +// [strm][%p] Multiple RELIABLE_RESET frames sending not supported. +// QuicTraceLogStreamInfo( + MultipleReliableResetSendNotSupported, + Stream, + "Multiple RELIABLE_RESET frames sending not supported."); +// arg1 = arg1 = Stream = arg1 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_STREAM_C, MultipleReliableResetSendNotSupported, + TP_ARGS( + const void *, arg1), + TP_FIELDS( + ctf_integer_hex(uint64_t, arg1, arg1) + ) +) + + + +/*---------------------------------------------------------- +// Decoder Ring for ReliableSendOffsetSet +// [strm][%p] Reliable send offset set to %llu +// QuicTraceLogStreamInfo( + ReliableSendOffsetSet, + Stream, + "Reliable send offset set to %llu", + *(uint64_t*)Buffer); +// arg1 = arg1 = Stream = arg1 +// arg3 = arg3 = *(uint64_t*)Buffer = arg3 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_STREAM_C, ReliableSendOffsetSet, + TP_ARGS( + const void *, arg1, + unsigned long long, arg3), + TP_FIELDS( + ctf_integer_hex(uint64_t, arg1, arg1) + ctf_integer(uint64_t, arg3, arg3) + ) +) + + + /*---------------------------------------------------------- // Decoder Ring for ConfiguredForDelayedIDFC // [strm][%p] Configured for delayed ID FC updates diff --git a/src/generated/linux/stream_recv.c.clog.h b/src/generated/linux/stream_recv.c.clog.h index 89e31fe828..57e768e3f0 100644 --- a/src/generated/linux/stream_recv.c.clog.h +++ b/src/generated/linux/stream_recv.c.clog.h @@ -33,6 +33,24 @@ #ifdef __cplusplus extern "C" { #endif +/*---------------------------------------------------------- +// Decoder Ring for ReliableResetNotNegotiatedError +// [strm][%p] Received ReliableReset without negotiation. +// QuicTraceLogStreamWarning( + ReliableResetNotNegotiatedError, + Stream, + "Received ReliableReset without negotiation."); +// arg1 = arg1 = Stream = arg1 +----------------------------------------------------------*/ +#ifndef _clog_3_ARGS_TRACE_ReliableResetNotNegotiatedError +#define _clog_3_ARGS_TRACE_ReliableResetNotNegotiatedError(uniqueId, arg1, encoded_arg_string)\ +tracepoint(CLOG_STREAM_RECV_C, ReliableResetNotNegotiatedError , arg1);\ + +#endif + + + + /*---------------------------------------------------------- // Decoder Ring for ResetEarly // [strm][%p] Tried to reset at earlier final size! @@ -109,9 +127,9 @@ tracepoint(CLOG_STREAM_RECV_C, ReceiveBeyondFlowControl , arg1);\ // Decoder Ring for RemoteCloseReset // [strm][%p] Closed remotely (reset) // QuicTraceLogStreamInfo( - RemoteCloseReset, - Stream, - "Closed remotely (reset)"); + RemoteCloseReset, + Stream, + "Closed remotely (reset)"); // arg1 = arg1 = Stream = arg1 ----------------------------------------------------------*/ #ifndef _clog_3_ARGS_TRACE_RemoteCloseReset @@ -123,6 +141,26 @@ tracepoint(CLOG_STREAM_RECV_C, RemoteCloseReset , arg1);\ +/*---------------------------------------------------------- +// Decoder Ring for ReliableRecvOffsetSet +// [strm][%p] Reliable recv offset set to %llu +// QuicTraceLogStreamInfo( + ReliableRecvOffsetSet, + Stream, + "Reliable recv offset set to %llu", + ReliableOffset); +// arg1 = arg1 = Stream = arg1 +// arg3 = arg3 = ReliableOffset = arg3 +----------------------------------------------------------*/ +#ifndef _clog_4_ARGS_TRACE_ReliableRecvOffsetSet +#define _clog_4_ARGS_TRACE_ReliableRecvOffsetSet(uniqueId, arg1, encoded_arg_string, arg3)\ +tracepoint(CLOG_STREAM_RECV_C, ReliableRecvOffsetSet , arg1, arg3);\ + +#endif + + + + /*---------------------------------------------------------- // Decoder Ring for LocalCloseStopSending // [strm][%p] Closed locally (stop sending) @@ -181,10 +219,10 @@ tracepoint(CLOG_STREAM_RECV_C, QueueRecvFlush , arg1);\ // Decoder Ring for IndicatePeerSendAbort // [strm][%p] Indicating QUIC_STREAM_EVENT_PEER_SEND_ABORTED (0x%llX) // QuicTraceLogStreamVerbose( - IndicatePeerSendAbort, - Stream, - "Indicating QUIC_STREAM_EVENT_PEER_SEND_ABORTED (0x%llX)", - ErrorCode); + IndicatePeerSendAbort, + Stream, + "Indicating QUIC_STREAM_EVENT_PEER_SEND_ABORTED (0x%llX)", + ErrorCode); // arg1 = arg1 = Stream = arg1 // arg3 = arg3 = ErrorCode = arg3 ----------------------------------------------------------*/ diff --git a/src/generated/linux/stream_recv.c.clog.h.lttng.h b/src/generated/linux/stream_recv.c.clog.h.lttng.h index a0d85ab99a..87d5cf587b 100644 --- a/src/generated/linux/stream_recv.c.clog.h.lttng.h +++ b/src/generated/linux/stream_recv.c.clog.h.lttng.h @@ -1,6 +1,25 @@ +/*---------------------------------------------------------- +// Decoder Ring for ReliableResetNotNegotiatedError +// [strm][%p] Received ReliableReset without negotiation. +// QuicTraceLogStreamWarning( + ReliableResetNotNegotiatedError, + Stream, + "Received ReliableReset without negotiation."); +// arg1 = arg1 = Stream = arg1 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_STREAM_RECV_C, ReliableResetNotNegotiatedError, + TP_ARGS( + const void *, arg1), + TP_FIELDS( + ctf_integer_hex(uint64_t, arg1, arg1) + ) +) + + + /*---------------------------------------------------------- // Decoder Ring for ResetEarly // [strm][%p] Tried to reset at earlier final size! @@ -81,9 +100,9 @@ TRACEPOINT_EVENT(CLOG_STREAM_RECV_C, ReceiveBeyondFlowControl, // Decoder Ring for RemoteCloseReset // [strm][%p] Closed remotely (reset) // QuicTraceLogStreamInfo( - RemoteCloseReset, - Stream, - "Closed remotely (reset)"); + RemoteCloseReset, + Stream, + "Closed remotely (reset)"); // arg1 = arg1 = Stream = arg1 ----------------------------------------------------------*/ TRACEPOINT_EVENT(CLOG_STREAM_RECV_C, RemoteCloseReset, @@ -96,6 +115,29 @@ TRACEPOINT_EVENT(CLOG_STREAM_RECV_C, RemoteCloseReset, +/*---------------------------------------------------------- +// Decoder Ring for ReliableRecvOffsetSet +// [strm][%p] Reliable recv offset set to %llu +// QuicTraceLogStreamInfo( + ReliableRecvOffsetSet, + Stream, + "Reliable recv offset set to %llu", + ReliableOffset); +// arg1 = arg1 = Stream = arg1 +// arg3 = arg3 = ReliableOffset = arg3 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_STREAM_RECV_C, ReliableRecvOffsetSet, + TP_ARGS( + const void *, arg1, + unsigned long long, arg3), + TP_FIELDS( + ctf_integer_hex(uint64_t, arg1, arg1) + ctf_integer(uint64_t, arg3, arg3) + ) +) + + + /*---------------------------------------------------------- // Decoder Ring for LocalCloseStopSending // [strm][%p] Closed locally (stop sending) @@ -157,10 +199,10 @@ TRACEPOINT_EVENT(CLOG_STREAM_RECV_C, QueueRecvFlush, // Decoder Ring for IndicatePeerSendAbort // [strm][%p] Indicating QUIC_STREAM_EVENT_PEER_SEND_ABORTED (0x%llX) // QuicTraceLogStreamVerbose( - IndicatePeerSendAbort, - Stream, - "Indicating QUIC_STREAM_EVENT_PEER_SEND_ABORTED (0x%llX)", - ErrorCode); + IndicatePeerSendAbort, + Stream, + "Indicating QUIC_STREAM_EVENT_PEER_SEND_ABORTED (0x%llX)", + ErrorCode); // arg1 = arg1 = Stream = arg1 // arg3 = arg3 = ErrorCode = arg3 ----------------------------------------------------------*/ diff --git a/src/generated/linux/stream_send.c.clog.h b/src/generated/linux/stream_send.c.clog.h index 94874bab6c..2c4508c60d 100644 --- a/src/generated/linux/stream_send.c.clog.h +++ b/src/generated/linux/stream_send.c.clog.h @@ -289,6 +289,27 @@ tracepoint(CLOG_STREAM_SEND_C, SendQueueDrained , arg1);\ +/*---------------------------------------------------------- +// Decoder Ring for ResetReliableAck +// [strm][%p] Reset Reliable ACKed in OnResetReliableAck. Send side. UnAckedOffset=%llu, ReliableOffsetSend=%llu +// QuicTraceLogStreamVerbose( + ResetReliableAck, + Stream, + "Reset Reliable ACKed in OnResetReliableAck. Send side. UnAckedOffset=%llu, ReliableOffsetSend=%llu", + Stream->UnAckedOffset, Stream->ReliableOffsetSend); +// arg1 = arg1 = Stream = arg1 +// arg3 = arg3 = Stream->UnAckedOffset = arg3 +// arg4 = arg4 = Stream->ReliableOffsetSend = arg4 +----------------------------------------------------------*/ +#ifndef _clog_5_ARGS_TRACE_ResetReliableAck +#define _clog_5_ARGS_TRACE_ResetReliableAck(uniqueId, arg1, encoded_arg_string, arg3, arg4)\ +tracepoint(CLOG_STREAM_SEND_C, ResetReliableAck , arg1, arg3, arg4);\ + +#endif + + + + /*---------------------------------------------------------- // Decoder Ring for SendDump // [strm][%p] SF:%hX FC:%llu QS:%llu MAX:%llu UNA:%llu NXT:%llu RECOV:%llu-%llu diff --git a/src/generated/linux/stream_send.c.clog.h.lttng.h b/src/generated/linux/stream_send.c.clog.h.lttng.h index edb1505670..f37ae2c3cb 100644 --- a/src/generated/linux/stream_send.c.clog.h.lttng.h +++ b/src/generated/linux/stream_send.c.clog.h.lttng.h @@ -308,6 +308,32 @@ TRACEPOINT_EVENT(CLOG_STREAM_SEND_C, SendQueueDrained, +/*---------------------------------------------------------- +// Decoder Ring for ResetReliableAck +// [strm][%p] Reset Reliable ACKed in OnResetReliableAck. Send side. UnAckedOffset=%llu, ReliableOffsetSend=%llu +// QuicTraceLogStreamVerbose( + ResetReliableAck, + Stream, + "Reset Reliable ACKed in OnResetReliableAck. Send side. UnAckedOffset=%llu, ReliableOffsetSend=%llu", + Stream->UnAckedOffset, Stream->ReliableOffsetSend); +// arg1 = arg1 = Stream = arg1 +// arg3 = arg3 = Stream->UnAckedOffset = arg3 +// arg4 = arg4 = Stream->ReliableOffsetSend = arg4 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_STREAM_SEND_C, ResetReliableAck, + TP_ARGS( + const void *, arg1, + unsigned long long, arg3, + unsigned long long, arg4), + TP_FIELDS( + ctf_integer_hex(uint64_t, arg1, arg1) + ctf_integer(uint64_t, arg3, arg3) + ctf_integer(uint64_t, arg4, arg4) + ) +) + + + /*---------------------------------------------------------- // Decoder Ring for SendDump // [strm][%p] SF:%hX FC:%llu QS:%llu MAX:%llu UNA:%llu NXT:%llu RECOV:%llu-%llu diff --git a/src/inc/msquic.h b/src/inc/msquic.h index 235acfef80..8a12585efd 100644 --- a/src/inc/msquic.h +++ b/src/inc/msquic.h @@ -929,6 +929,9 @@ typedef struct QUIC_SCHANNEL_CONTEXT_ATTRIBUTE_EX_W { #define QUIC_PARAM_STREAM_IDEAL_SEND_BUFFER_SIZE 0x08000002 // uint64_t - bytes #define QUIC_PARAM_STREAM_PRIORITY 0x08000003 // uint16_t - 0 (low) to 0xFFFF (high) - 0x7FFF (default) #define QUIC_PARAM_STREAM_STATISTICS 0X08000004 // QUIC_STREAM_STATISTICS +#ifdef QUIC_API_ENABLE_PREVIEW_FEATURES +#define QUIC_PARAM_STREAM_RELIABLE_OFFSET 0x08000005 // uint64_t +#endif typedef _IRQL_requires_max_(PASSIVE_LEVEL) diff --git a/src/inc/msquic.hpp b/src/inc/msquic.hpp index ee9efe5182..a0b22586d3 100644 --- a/src/inc/msquic.hpp +++ b/src/inc/msquic.hpp @@ -1441,6 +1441,40 @@ struct MsQuicStream { Statistics); } + #ifdef QUIC_API_ENABLE_PREVIEW_FEATURES + QUIC_STATUS + SetReliableOffset(_In_ uint64_t Offset) noexcept { + return + MsQuic->SetParam( + Handle, + QUIC_PARAM_STREAM_RELIABLE_OFFSET, + sizeof(Offset), + &Offset); + } + + QUIC_STATUS + GetReliableOffset(_Out_ uint64_t* Offset) const noexcept { + uint32_t Size = sizeof(*Offset); + return + MsQuic->GetParam( + Handle, + QUIC_PARAM_STREAM_RELIABLE_OFFSET, + &Size, + Offset); + } + + QUIC_STATUS + GetReliableOffsetRecv(_Out_ uint64_t* Offset) const noexcept { + uint32_t Size = sizeof(*Offset); + return + MsQuic->GetParam( + Handle, + QUIC_PARAM_STREAM_RELIABLE_OFFSET_RECV, + &Size, + Offset); + } + #endif + QUIC_STATUS GetInitStatus() const noexcept { return InitStatus; } bool IsValid() const { return QUIC_SUCCEEDED(InitStatus); } MsQuicStream(const MsQuicStream& Other) = delete; diff --git a/src/inc/msquicp.h b/src/inc/msquicp.h index b9aa0a586c..99820321b7 100644 --- a/src/inc/msquicp.h +++ b/src/inc/msquicp.h @@ -134,6 +134,10 @@ typedef struct QUIC_PRIVATE_TRANSPORT_PARAMETER { #define QUIC_PARAM_CONN_KEEP_ALIVE_PADDING 0x85000003 // uint16_t #define QUIC_PARAM_CONN_DISABLE_VNE_TP_GENERATION 0x85000004 // BOOLEAN +#ifdef QUIC_API_ENABLE_PREVIEW_FEATURES +#define QUIC_PARAM_STREAM_RELIABLE_OFFSET_RECV 0x88000000 // uint64_t +#endif + #if defined(__cplusplus) } #endif diff --git a/src/manifest/MsQuicEtw.man b/src/manifest/MsQuicEtw.man index 3ee4917c47..83741b35f1 100644 --- a/src/manifest/MsQuicEtw.man +++ b/src/manifest/MsQuicEtw.man @@ -308,6 +308,14 @@ message="$(string.Enum.QUIC_STREAM_SEND_STATE.FIN_ACKED)" value="5" /> + + + + + + Generate() { ::std::vector list; - for (uint32_t Test = 0; Test < 8; ++Test) + for (uint32_t Test = 0; Test < 9; ++Test) list.push_back({ Test }); return list; } diff --git a/src/test/bin/winkernel/control.cpp b/src/test/bin/winkernel/control.cpp index df0fb17f1b..3f6285fa64 100644 --- a/src/test/bin/winkernel/control.cpp +++ b/src/test/bin/winkernel/control.cpp @@ -485,6 +485,8 @@ size_t QUIC_IOCTL_BUFFER_SIZES[] = sizeof(QUIC_RUN_FEATURE_NEGOTIATION), sizeof(QUIC_RUN_FEATURE_NEGOTIATION), 0, + 0, + 0, }; CXPLAT_STATIC_ASSERT( @@ -1363,6 +1365,14 @@ QuicTestCtlEvtIoDeviceControl( Params->FeatureNegotiationParams.ServerSupport, Params->FeatureNegotiationParams.ClientSupport)); break; + + case IOCTL_QUIC_RUN_STREAM_RELIABLE_RESET: + QuicTestCtlRun(QuicTestStreamReliableReset()); + break; + + case IOCTL_QUIC_RUN_STREAM_RELIABLE_RESET_MULTIPLE_SENDS: + QuicTestCtlRun(QuicTestStreamReliableResetMultipleSends()); + break; #endif case IOCTL_QUIC_RUN_STATELESS_RESET_KEY: diff --git a/src/test/lib/ApiTest.cpp b/src/test/lib/ApiTest.cpp index fc6f51076a..ae756083af 100644 --- a/src/test/lib/ApiTest.cpp +++ b/src/test/lib/ApiTest.cpp @@ -4344,13 +4344,13 @@ void QuicTest_QUIC_PARAM_CONN_ORIG_DEST_CID(MsQuicRegistration& Registration, Ms // uint32_t SizeOfBuffer = 8; uint8_t Buffer[8] = {0}; - uint8_t ZeroBuffer[8] = {0}; + uint8_t ZeroBuffer[8] = {0}; TestScopeLogger LogScope1("GetParam test success case"); TEST_QUIC_STATUS( - QUIC_STATUS_SUCCESS, + QUIC_STATUS_SUCCESS, Connection.GetParam( QUIC_PARAM_CONN_ORIG_DEST_CID, - &SizeOfBuffer, + &SizeOfBuffer, Buffer ) ) @@ -4368,10 +4368,10 @@ void QuicTest_QUIC_PARAM_CONN_ORIG_DEST_CID(MsQuicRegistration& Registration, Ms uint32_t SizeOfBuffer = 8; TestScopeLogger LogScope1("GetParam null buffer check"); TEST_QUIC_STATUS( - QUIC_STATUS_INVALID_PARAMETER, + QUIC_STATUS_INVALID_PARAMETER, Connection.GetParam( QUIC_PARAM_CONN_ORIG_DEST_CID, - &SizeOfBuffer, + &SizeOfBuffer, nullptr ) ) @@ -4389,10 +4389,10 @@ void QuicTest_QUIC_PARAM_CONN_ORIG_DEST_CID(MsQuicRegistration& Registration, Ms TestScopeLogger LogScope1("GetParam buffer too small check"); uint8_t Buffer[1]; TEST_QUIC_STATUS( - QUIC_STATUS_BUFFER_TOO_SMALL, + QUIC_STATUS_BUFFER_TOO_SMALL, Connection.GetParam( QUIC_PARAM_CONN_ORIG_DEST_CID, - &SizeOfBuffer, + &SizeOfBuffer, Buffer ) ) @@ -4408,18 +4408,18 @@ void QuicTest_QUIC_PARAM_CONN_ORIG_DEST_CID(MsQuicRegistration& Registration, Ms 4433)); uint32_t SizeOfBuffer = 100; uint8_t Buffer[100] = {0}; - uint8_t ZeroBuffer[100] = {0}; + uint8_t ZeroBuffer[100] = {0}; TestScopeLogger LogScope1("GetParam size of buffer bigger than needed"); TEST_QUIC_STATUS( - QUIC_STATUS_SUCCESS, + QUIC_STATUS_SUCCESS, Connection.GetParam( QUIC_PARAM_CONN_ORIG_DEST_CID, - &SizeOfBuffer, + &SizeOfBuffer, Buffer ) ) TEST_NOT_EQUAL(memcmp(Buffer, ZeroBuffer, sizeof(Buffer)), 0); - // + // // There is no way the CID written should be 100 bytes according to the RFC. // TEST_TRUE(SizeOfBuffer < 100); @@ -4436,19 +4436,19 @@ void QuicTest_QUIC_PARAM_CONN_ORIG_DEST_CID(MsQuicRegistration& Registration, Ms uint32_t SizeOfBuffer = 0; TestScopeLogger LogScope1("GetParam check OrigDestCID size with nullptr"); TEST_QUIC_STATUS( - QUIC_STATUS_BUFFER_TOO_SMALL, + QUIC_STATUS_BUFFER_TOO_SMALL, Connection.GetParam( QUIC_PARAM_CONN_ORIG_DEST_CID, - &SizeOfBuffer, + &SizeOfBuffer, nullptr ) ) TEST_TRUE(SizeOfBuffer >= 8); TEST_QUIC_STATUS( - QUIC_STATUS_INVALID_PARAMETER, + QUIC_STATUS_INVALID_PARAMETER, Connection.GetParam( QUIC_PARAM_CONN_ORIG_DEST_CID, - &SizeOfBuffer, + &SizeOfBuffer, nullptr ) ) @@ -5004,6 +5004,74 @@ void QuicTestStreamParam() TEST_EQUAL(Length, sizeof(QUIC_STREAM_STATISTICS)); } } + // + // QUIC_PARAM_STREAM_RELIABLE_OFFSET + // QUIC_PARAM_STREAM_RELIABLE_OFFSET_RECV + // + { + TestScopeLogger LogScope0("QUIC_PARAM_STREAM_RELIABLE_OFFSET"); + MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_NONE); + uint32_t BufferSize = 1; + + // + // GetParam Test Invalid States. + // + { + TestScopeLogger LogScope1("GetParam for invalid states"); + TEST_QUIC_STATUS( + QUIC_STATUS_BUFFER_TOO_SMALL, + MsQuic->GetParam( + Stream.Handle, + QUIC_PARAM_STREAM_RELIABLE_OFFSET, + &BufferSize, + NULL)); + BufferSize = 1; + TEST_QUIC_STATUS( + QUIC_STATUS_BUFFER_TOO_SMALL, + MsQuic->GetParam( + Stream.Handle, + QUIC_PARAM_STREAM_RELIABLE_OFFSET_RECV, + &BufferSize, + NULL)); + + BufferSize = 64; + + TEST_QUIC_STATUS( + QUIC_STATUS_INVALID_PARAMETER, + MsQuic->GetParam( + Stream.Handle, + QUIC_PARAM_STREAM_RELIABLE_OFFSET_RECV, + &BufferSize, + NULL)); + TEST_QUIC_STATUS( + QUIC_STATUS_INVALID_PARAMETER, + MsQuic->GetParam( + Stream.Handle, + QUIC_PARAM_STREAM_RELIABLE_OFFSET_RECV, + &BufferSize, + NULL)); + + // + // Should return invalid state since we haven't set it yet. + // + uint64_t Buffer = 10000; + TEST_QUIC_STATUS( + QUIC_STATUS_INVALID_STATE, + MsQuic->GetParam( + Stream.Handle, + QUIC_PARAM_STREAM_RELIABLE_OFFSET, + &BufferSize, + &Buffer)); + Buffer = 10000; + TEST_QUIC_STATUS( + QUIC_STATUS_INVALID_STATE, + MsQuic->GetParam( + Stream.Handle, + QUIC_PARAM_STREAM_RELIABLE_OFFSET_RECV, + &BufferSize, + &Buffer)); + } + } } void diff --git a/src/test/lib/DataTest.cpp b/src/test/lib/DataTest.cpp index 1de9e36121..4eac48b7ee 100644 --- a/src/test/lib/DataTest.cpp +++ b/src/test/lib/DataTest.cpp @@ -13,7 +13,6 @@ #ifdef QUIC_CLOG #include "DataTest.cpp.clog.h" #endif - #if defined(_KERNEL_MODE) static bool UseQTIP = false; #elif defined(QUIC_API_ENABLE_PREVIEW_FEATURES) @@ -3280,3 +3279,204 @@ QuicTestConnectAndIdleForDestCidChange( } } } + +#define BUFFER_SIZE 10000 +#define RELIABLE_SIZE 5000 +#define BUFFER_SIZE_MULTI_SENDS 10000 +#define RELIABLE_SIZE_MULTI_SENDS 20000 +// +// These Context Structs are useful helpers for the StreamReliableReset test suite. +// It keeps track of the order of absolute offsets of all the send requests received, and the total number of bytes received. +// If everything works, SendCompleteOrder MUST be monotonically increasing. +// +struct SendContext { + BOOLEAN Successful; + uint64_t SeqNum; +}; +struct StreamReliableReset { + + CxPlatEvent ClientStreamShutdownComplete; + CxPlatEvent ServerStreamShutdownComplete; + uint64_t ReceivedBufferSize; + uint64_t SequenceNum; + QUIC_UINT62 ShutdownErrorCode; + static QUIC_STATUS ClientStreamCallback(_In_ MsQuicStream*, _In_opt_ void* ClientContext, _Inout_ QUIC_STREAM_EVENT* Event) { + auto TestContext = (StreamReliableReset*)ClientContext; + if (Event->Type == QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE) { + TestContext->ClientStreamShutdownComplete.Set(); + } + // Get the send context of the Event + if (Event->Type == QUIC_STREAM_EVENT_SEND_COMPLETE) { + auto Context = (SendContext*)Event->SEND_COMPLETE.ClientContext; + Context->Successful = Event->SEND_COMPLETE.Canceled == FALSE; + Context->SeqNum = TestContext->SequenceNum++; + } + return QUIC_STATUS_SUCCESS; + } + + static QUIC_STATUS ServerStreamCallback(_In_ MsQuicStream*, _In_opt_ void* ServerContext, _Inout_ QUIC_STREAM_EVENT* Event) { + auto TestContext = (StreamReliableReset*)ServerContext; + if (Event->Type == QUIC_STREAM_EVENT_RECEIVE) { + TestContext->ReceivedBufferSize += Event->RECEIVE.TotalBufferLength; + } + if (Event->Type == QUIC_STREAM_EVENT_PEER_SEND_ABORTED) { + TestContext->ShutdownErrorCode = Event->PEER_SEND_ABORTED.ErrorCode; + } + if (Event->Type == QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE) { + TestContext->ServerStreamShutdownComplete.Set(); + } + return QUIC_STATUS_SUCCESS; + } + + static QUIC_STATUS ConnCallback(_In_ MsQuicConnection*, _In_opt_ void* Context, _Inout_ QUIC_CONNECTION_EVENT* Event) { + if (Event->Type == QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED) { + new(std::nothrow) MsQuicStream(Event->PEER_STREAM_STARTED.Stream, CleanUpAutoDelete, ServerStreamCallback, Context); + } + return QUIC_STATUS_SUCCESS; + } +}; + + +void +QuicTestStreamReliableReset( + ) +{ + MsQuicRegistration Registration(true); + TEST_TRUE(Registration.IsValid()); + + MsQuicSettings TestSettings; + TestSettings.SetReliableResetEnabled(true); + TestSettings.SetPeerBidiStreamCount(1); + + MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", TestSettings, ServerSelfSignedCredConfig); + TEST_QUIC_SUCCEEDED(ServerConfiguration.GetInitStatus()); + + MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", TestSettings, MsQuicCredentialConfig()); + TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus()); + + StreamReliableReset Context; + uint8_t SendDataBuffer[BUFFER_SIZE]; + + QUIC_BUFFER SendBuffer { sizeof(SendDataBuffer), SendDataBuffer }; + Context.ReceivedBufferSize = 0; + + MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, StreamReliableReset::ConnCallback, &Context); + TEST_QUIC_SUCCEEDED(Listener.GetInitStatus()); + TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest")); + QuicAddr ServerLocalAddr; + TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr)); + + MsQuicConnection Connection(Registration); + TEST_QUIC_SUCCEEDED(Connection.GetInitStatus()); + + TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort())); + TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Connection.HandshakeComplete); + TEST_TRUE(Listener.LastConnection->HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Listener.LastConnection->HandshakeComplete); + CxPlatSleep(50); // Wait for things to idle out + + for (uint64_t Bitmap = 0; Bitmap < 8; ++Bitmap) { // Try dropping the first 3 packets + char Name[64]; sprintf_s(Name, sizeof(Name), "Try Reliably Shutting Down Stream %llu", (unsigned long long)Bitmap); + TestScopeLogger logScope(Name); + BitmapLossHelper LossHelper(Bitmap); + MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_NONE, CleanUpManual, StreamReliableReset::ClientStreamCallback, &Context); + TEST_QUIC_SUCCEEDED(Stream.GetInitStatus()); + TEST_QUIC_SUCCEEDED(Stream.Start()); + SendContext send1 = {FALSE, 0}; + TEST_QUIC_SUCCEEDED(Stream.Send(&SendBuffer, 1, QUIC_SEND_FLAG_DELAY_SEND, &send1)); + TEST_QUIC_STATUS( + QUIC_STATUS_INVALID_STATE, + Stream.SetReliableOffset(UINT64_MAX)); + TEST_QUIC_SUCCEEDED(Stream.SetReliableOffset(RELIABLE_SIZE)); + const QUIC_UINT62 AbortSendShutdownErrorCode = 0x696969696969; + const QUIC_UINT62 AbortRecvShutdownErrorCode = 0x420420420420; + TEST_QUIC_SUCCEEDED(Stream.Shutdown(AbortSendShutdownErrorCode, QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND)); // Queues up a shutdown operation. + TEST_QUIC_SUCCEEDED(Stream.Shutdown(AbortRecvShutdownErrorCode, QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE)); + TEST_QUIC_STATUS(QUIC_STATUS_INVALID_STATE, Stream.SetReliableOffset(RELIABLE_SIZE)); + // Should behave similar to QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, with some restrictions. + TEST_TRUE(Context.ClientStreamShutdownComplete.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Context.ServerStreamShutdownComplete.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Context.ReceivedBufferSize >= RELIABLE_SIZE); + + // We shouldn't be able to change ReliableSize now that the stream has already been reset. + TEST_QUIC_STATUS(QUIC_STATUS_INVALID_STATE, Stream.SetReliableOffset(1)); + + // Test that the error code we got was for the SEND shutdown. + TEST_TRUE(Context.ShutdownErrorCode == AbortSendShutdownErrorCode); + } +} +void +QuicTestStreamReliableResetMultipleSends( + ) +{ + MsQuicRegistration Registration(true); + TEST_TRUE(Registration.IsValid()); + + MsQuicSettings TestSettings; + TestSettings.SetReliableResetEnabled(true); + TestSettings.SetPeerBidiStreamCount(1); + + MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", TestSettings, ServerSelfSignedCredConfig); + TEST_QUIC_SUCCEEDED(ServerConfiguration.GetInitStatus()); + + MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", TestSettings, MsQuicCredentialConfig()); + TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus()); + + StreamReliableReset Context; + uint8_t SendDataBuffer[BUFFER_SIZE_MULTI_SENDS]; + + QUIC_BUFFER SendBuffer { sizeof(SendDataBuffer), SendDataBuffer }; + Context.ReceivedBufferSize = 0; + Context.SequenceNum = 0; + Context.ShutdownErrorCode = 0; + + MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, StreamReliableReset::ConnCallback, &Context); + TEST_QUIC_SUCCEEDED(Listener.GetInitStatus()); + TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest")); + QuicAddr ServerLocalAddr; + TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr)); + + MsQuicConnection Connection(Registration); + TEST_QUIC_SUCCEEDED(Connection.GetInitStatus()); + + TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort())); + TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Connection.HandshakeComplete); + TEST_TRUE(Listener.LastConnection->HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Listener.LastConnection->HandshakeComplete); + CxPlatSleep(50); // Wait for things to idle out + + MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_NONE, CleanUpManual, StreamReliableReset::ClientStreamCallback, &Context); + TEST_QUIC_SUCCEEDED(Stream.GetInitStatus()); + TEST_QUIC_SUCCEEDED(Stream.Start()); + SendContext send1 = {FALSE, 0}; + SendContext send2 = {FALSE, 0}; + SendContext send3 = {FALSE, 0}; + SendContext send4 = {FALSE, 0}; + SendContext send5 = {FALSE, 0}; + TEST_QUIC_SUCCEEDED(Stream.Send(&SendBuffer, 1, QUIC_SEND_FLAG_DELAY_SEND, &send1)); + TEST_QUIC_SUCCEEDED(Stream.Send(&SendBuffer, 1, QUIC_SEND_FLAG_DELAY_SEND, &send2)); + TEST_QUIC_SUCCEEDED(Stream.Send(&SendBuffer, 1, QUIC_SEND_FLAG_DELAY_SEND, &send3)); + TEST_QUIC_SUCCEEDED(Stream.Send(&SendBuffer, 1, QUIC_SEND_FLAG_DELAY_SEND, &send4)); + TEST_QUIC_SUCCEEDED(Stream.Send(&SendBuffer, 1, QUIC_SEND_FLAG_DELAY_SEND, &send5)); + TEST_QUIC_SUCCEEDED(Stream.SetReliableOffset(RELIABLE_SIZE_MULTI_SENDS)); + + const QUIC_UINT62 AbortShutdownErrorCode = 0x696969696969; + TEST_QUIC_SUCCEEDED(Stream.Shutdown(AbortShutdownErrorCode)); // Queues up a shutdown operation. + // Should behave similar to QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, with some restrictions. + TEST_TRUE(Context.ClientStreamShutdownComplete.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Context.ServerStreamShutdownComplete.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Context.ReceivedBufferSize >= RELIABLE_SIZE_MULTI_SENDS); + + // Test order of completion, and that our first 2 sends MUST be successful. + TEST_TRUE(send1.Successful); + TEST_TRUE(send2.Successful); + TEST_TRUE(send1.SeqNum < send2.SeqNum); + TEST_TRUE(send2.SeqNum < send3.SeqNum); + TEST_TRUE(send3.SeqNum < send4.SeqNum); + TEST_TRUE(send4.SeqNum < send5.SeqNum); + + // Test Error code matches what we sent. + TEST_TRUE(Context.ShutdownErrorCode == AbortShutdownErrorCode); +} diff --git a/src/test/lib/EventTest.cpp b/src/test/lib/EventTest.cpp index 98bc1b4005..bade9c0dc0 100644 --- a/src/test/lib/EventTest.cpp +++ b/src/test/lib/EventTest.cpp @@ -1448,6 +1448,135 @@ QuicTestValidateStreamEvents8( } // Connections scope } +void +QuicTestValidateStreamEvents9( + _In_ MsQuicRegistration& Registration, + _In_ MsQuicListener& Listener, + _In_ QuicAddr& ServerLocalAddr + ) +{ + TestScopeLogger ScopeLogger(__FUNCTION__); + + MsQuicSettings Settings; + Settings.SetPeerBidiStreamCount(1).SetMinimumMtu(1280).SetMaximumMtu(1280); + Settings.SetReliableResetEnabled(true); + MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", Settings, ServerSelfSignedCredConfig); + TEST_TRUE(ServerConfiguration.IsValid()); + + MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", Settings, MsQuicCredentialConfig()); + TEST_TRUE(ClientConfiguration.IsValid()); + + { // Connections scope + ConnValidator Client, Server(ServerConfiguration); + + Listener.Context = &Server; + + TEST_QUIC_SUCCEEDED( + MsQuic->ConnectionOpen( + Registration, + ConnValidatorCallback, + &Client, + &Client.Handle)); + + { // Stream scope + + StreamValidator ClientStream( + new(std::nothrow) StreamEventValidator* [5] { + new(std::nothrow) StreamStartCompleteEventValidator(), + new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_SEND_COMPLETE, 0, true), + new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE), + new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE, QUIC_EVENT_ACTION_SHUTDOWN_CONNECTION), + nullptr + }); + StreamValidator ServerStream( + new(std::nothrow) StreamEventValidator* [6] { + new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_RECEIVE), + new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_PEER_SEND_ABORTED), + new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED), + new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE), + new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE), + nullptr + }); + + Client.SetExpectedEvents( + new(std::nothrow) ConnEventValidator* [8] { + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_RELIABLE_RESET_NEGOTIATED), + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_STREAMS_AVAILABLE), + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_DATAGRAM_STATE_CHANGED), + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_CONNECTED), + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_STREAMS_AVAILABLE, 0, true), + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_RESUMPTION_TICKET_RECEIVED, 0, true), // TODO - Schannel does resumption regardless + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE), + nullptr + }); + Server.SetExpectedEvents( + new(std::nothrow) ConnEventValidator* [6] { + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_RELIABLE_RESET_NEGOTIATED), + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_CONNECTED), + new(std::nothrow) NewStreamEventValidator(&ServerStream), + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER), + new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE), + nullptr + }); + + TEST_QUIC_SUCCEEDED( + MsQuic->StreamOpen( + Client.Handle, + QUIC_STREAM_OPEN_FLAG_NONE, + StreamValidatorCallback, + &ClientStream, + &ClientStream.Handle)); + TEST_QUIC_SUCCEEDED( + MsQuic->StreamStart( + ClientStream.Handle, + QUIC_STREAM_START_FLAG_NONE)); + + TEST_QUIC_SUCCEEDED( + MsQuic->ConnectionStart( + Client.Handle, + ClientConfiguration, + QuicAddrGetFamily(&ServerLocalAddr.SockAddr), + QUIC_TEST_LOOPBACK_FOR_AF( + QuicAddrGetFamily(&ServerLocalAddr.SockAddr)), + ServerLocalAddr.GetPort())); + + TEST_TRUE(Client.HandshakeComplete.WaitTimeout(1000)); + CxPlatSleep(200); + + uint8_t Buffer[1] = {0x1}; + QUIC_BUFFER SendBuffer = { sizeof(Buffer), (uint8_t*) Buffer }; + TEST_QUIC_SUCCEEDED( + MsQuic->StreamSend( + ClientStream.Handle, + &SendBuffer, + 1, + QUIC_SEND_FLAG_NONE, + nullptr)); + + uint64_t ReliableOffset = 1; + TEST_QUIC_SUCCEEDED( + MsQuic->SetParam( + ClientStream.Handle, + QUIC_PARAM_STREAM_RELIABLE_OFFSET, + sizeof(ReliableOffset), + &ReliableOffset + ) + ); + + CxPlatSleep(100); // Wait for the sends to be processed + + TEST_QUIC_SUCCEEDED( + MsQuic->StreamShutdown( + ClientStream.Handle, + QUIC_STREAM_SHUTDOWN_FLAG_ABORT, + 0)); + + TEST_TRUE(Client.Complete.WaitTimeout(2000)); + TEST_TRUE(Server.Complete.WaitTimeout(1000)); + } // Stream scope + } // Connections scope +} + void QuicTestValidateStreamEvents(uint32_t Test) { MsQuicRegistration Registration(true); @@ -1473,7 +1602,8 @@ void QuicTestValidateStreamEvents(uint32_t Test) QuicTestValidateStreamEvents5, QuicTestValidateStreamEvents6, QuicTestValidateStreamEvents7, - QuicTestValidateStreamEvents8 + QuicTestValidateStreamEvents8, + QuicTestValidateStreamEvents9 }; Tests[Test](Registration, Listener, ServerLocalAddr); diff --git a/src/tools/spin/spinquic.cpp b/src/tools/spin/spinquic.cpp index dddcfa07fc..02cc2748cb 100644 --- a/src/tools/spin/spinquic.cpp +++ b/src/tools/spin/spinquic.cpp @@ -782,7 +782,7 @@ void SpinQuicSetRandomStreamParam(HQUIC Stream, uint16_t ThreadID) { SetParamHelper Helper; - switch (0x08000000 | (GetRandom(5))) { + switch (0x08000000 | (GetRandom(6))) { case QUIC_PARAM_STREAM_ID: // QUIC_UINT62 break; // Get Only case QUIC_PARAM_STREAM_0RTT_LENGTH: // QUIC_ADDR @@ -794,6 +794,8 @@ void SpinQuicSetRandomStreamParam(HQUIC Stream, uint16_t ThreadID) break; case QUIC_PARAM_STREAM_STATISTICS: break; // Get Only + case QUIC_PARAM_STREAM_RELIABLE_OFFSET: + Helper.SetUint64(QUIC_PARAM_STREAM_RELIABLE_OFFSET, (uint64_t)GetRandom(UINT64_MAX)); default: break; }