Skip to content

Commit

Permalink
upload: add TeeReader to count bytes during upload (#54)
Browse files Browse the repository at this point in the history
We occasionally see an EOF error when attempting to upload to S3
storage. An EOF from io.ReadCloser interfaces typically indicates that
no more input is available. This commit is adding a TeeReader to count
the number of bytes that were read from the local input file when
attempting to upload -- this should help us diagnose upload errors where
segments seem to be missing from s3 buckets.
  • Loading branch information
emranemran authored Feb 14, 2024
1 parent eecd216 commit 567aec3
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ import (
"github.com/livepeer/go-tools/drivers"
)

type ByteCounter struct {
Count int64
}

func (bc *ByteCounter) Write(p []byte) (n int, err error) {
bc.Count += int64(len(p))
return n, nil
}

func newExponentialBackOffExecutor() *backoff.ExponentialBackOff {
backOff := backoff.NewExponentialBackOff()
backOff.InitialInterval = 200 * time.Millisecond
Expand Down Expand Up @@ -55,6 +64,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
}
}

byteCounter := &ByteCounter{}
if strings.HasSuffix(output, ".ts") || strings.HasSuffix(output, ".mp4") {
// For segments we just write them in one go here and return early.
// (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.)
Expand All @@ -63,16 +73,19 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
return nil, fmt.Errorf("failed to read file")
}

// To count how many bytes we are trying to read then write (upload) to s3 storage
teeReader := io.TeeReader(bytes.NewReader(fileContents), byteCounter)

var out *drivers.SaveDataOutput
err = backoff.Retry(func() error {
out, err = session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout)
out, err = session.SaveData(context.Background(), "", teeReader, fields, segmentWriteTimeout)
if err != nil {
glog.Errorf("failed upload attempt for %s: %v", outputURI.Redacted(), err)
glog.Errorf("failed upload attempt for %s (%d bytes): %v", outputURI.Redacted(), byteCounter.Count, err)
}
return err
}, UploadRetryBackoff())
if err != nil {
return nil, fmt.Errorf("failed to upload video %s: %w", outputURI.Redacted(), err)
return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), byteCounter.Count, err)
}

if err = extractThumb(session, output, fileContents); err != nil {
Expand Down

0 comments on commit 567aec3

Please sign in to comment.