From 98ebb75fc6054714d5f821eb1b2adaf0331f6a0e Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Wed, 8 Jan 2025 10:08:43 -0600 Subject: [PATCH] DNM demonstrate Reset removal and upated tests --- lib/events/filesessions/fileasync_test.go | 31 +++++++++++++---------- lib/events/stream.go | 18 ------------- 2 files changed, 18 insertions(+), 31 deletions(-) diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go index 7e34693ac0ea9..5dc428ad0b327 100644 --- a/lib/events/filesessions/fileasync_test.go +++ b/lib/events/filesessions/fileasync_test.go @@ -665,19 +665,24 @@ func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *ev parts, err := uploader.GetParts(uploadID) require.NoError(t, err) - var outEvents []apievents.AuditEvent - var reader *events.ProtoReader - for i, part := range parts { - if i == 0 { - reader = events.NewProtoReader(bytes.NewReader(part)) - } else { - err := reader.Reset(bytes.NewReader(part)) - require.NoError(t, err) - } - out, err := reader.ReadAll(ctx) - require.NoError(t, err, "part crash %#v", part) - - outEvents = append(outEvents, out...) + // combine all uploaded parts to create the session recording content + var sessionRecordingContent bytes.Buffer + for _, part := range parts { + bytesWritten, err := sessionRecordingContent.Write(part) + require.NoError(t, err, "error writing part bytes to session recording content") + require.Equal(t, len(part), bytesWritten, "not all bytes were written to session recording content") } + + // Note: it is possible for duplicate event indices to be encountered in cases where the upload process + // encounters an error such as the connection being termianted, since the upload process will retry uploading + // those events for a successful upload. This is not an issue because session recording reader knows to drop + // events found with an event index already read. + reader := events.NewProtoReader(&sessionRecordingContent) + + outEvents, err := reader.ReadAll(ctx) + require.NoError(t, err, "error reading all session recording content") + + require.NoError(t, reader.Close(), "error closing session recording reader") + return outEvents } diff --git a/lib/events/stream.go b/lib/events/stream.go index 14817cc152830..e824637d53225 100644 --- a/lib/events/stream.go +++ b/lib/events/stream.go @@ -1002,24 +1002,6 @@ func (r *ProtoReader) Close() error { return nil } -// Reset sets reader to read from the new reader -// without resetting the stats, could be used -// to deduplicate the events -func (r *ProtoReader) Reset(reader io.Reader) error { - if r.error != nil { - return r.error - } - if r.gzipReader != nil { - if r.error = r.gzipReader.Close(); r.error != nil { - return trace.Wrap(r.error) - } - r.gzipReader = nil - } - r.reader = reader - r.state = protoReaderStateInit - return nil -} - func (r *ProtoReader) setError(err error) error { r.state = protoReaderStateError r.error = err