From 567aec364b989d499b4e47c80ce26560247412b9 Mon Sep 17 00:00:00 2001 From: Emran M <3213391+emranemran@users.noreply.github.com> Date: Wed, 14 Feb 2024 01:13:01 -0800 Subject: [PATCH] upload: add TeeReader to count bytes during upload (#54) We occasionally see an EOF error when attempting to upload to S3 storage. An EOF from io.ReadCloser interfaces typically indicates that no more input is available. This commit is adding a TeeReader to count the number of bytes that were read from the local input file when attempting to upload -- this should help us diagnose upload errors where segments seem to be missing from s3 buckets. --- core/uploader.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/core/uploader.go b/core/uploader.go index 069c5fa..b1ca96f 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -18,6 +18,15 @@ import ( "github.com/livepeer/go-tools/drivers" ) +type ByteCounter struct { + Count int64 +} + +func (bc *ByteCounter) Write(p []byte) (n int, err error) { + bc.Count += int64(len(p)) + return n, nil +} + func newExponentialBackOffExecutor() *backoff.ExponentialBackOff { backOff := backoff.NewExponentialBackOff() backOff.InitialInterval = 200 * time.Millisecond @@ -55,6 +64,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout } } + byteCounter := &ByteCounter{} if strings.HasSuffix(output, ".ts") || strings.HasSuffix(output, ".mp4") { // For segments we just write them in one go here and return early. // (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.) @@ -63,16 +73,19 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout return nil, fmt.Errorf("failed to read file") } + // To count how many bytes we are trying to read then write (upload) to s3 storage + teeReader := io.TeeReader(bytes.NewReader(fileContents), byteCounter) + var out *drivers.SaveDataOutput err = backoff.Retry(func() error { - out, err = session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout) + out, err = session.SaveData(context.Background(), "", teeReader, fields, segmentWriteTimeout) if err != nil { - glog.Errorf("failed upload attempt for %s: %v", outputURI.Redacted(), err) + glog.Errorf("failed upload attempt for %s (%d bytes): %v", outputURI.Redacted(), byteCounter.Count, err) } return err }, UploadRetryBackoff()) if err != nil { - return nil, fmt.Errorf("failed to upload video %s: %w", outputURI.Redacted(), err) + return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), byteCounter.Count, err) } if err = extractThumb(session, output, fileContents); err != nil {