Skip to content

Commit

Permalink
Add URI to logs and errors for better debugging (#52)
Browse files Browse the repository at this point in the history
* Add URI to logs and errors for better debugging

* redact uri

* Log out useful headers to pass on to storage providers for debugging
  • Loading branch information
mjh1 authored Jan 30, 2024
1 parent fd91c08 commit eecd216
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 28 deletions.
31 changes: 23 additions & 8 deletions catalyst-uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"encoding/json"
"flag"
"net/http"
"net/url"
"os"
"time"

Expand Down Expand Up @@ -44,24 +46,37 @@ func run() int {
stdout := os.Stdout
os.Stdout, _ = os.Open(os.DevNull)

uri := flag.Arg(0)
if uri == "" {
glog.Fatalf("Could not parse object store URI: %s", uri)
output := flag.Arg(0)
if output == "" {
glog.Fatal("Object store URI was empty")
return 1
}

err = core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout)
uri, err := url.Parse(output)
if err != nil {
glog.Fatalf("Uploader failed for %s: %s", uri, err)
glog.Fatalf("Failed to parse URI: %s", err)
return 1
}

// success, write uploaded file details to stdout
err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri})
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout)
if err != nil {
glog.Fatal(err)
glog.Fatalf("Uploader failed for %s: %s", uri.Redacted(), err)
return 1
}

var respHeaders http.Header
if out != nil {
respHeaders = out.UploaderResponseHeaders
}
glog.Infof("Uploader succeeded for %s. storageRequestID=%s Etag=%s", uri.Redacted(), respHeaders.Get("X-Amz-Request-Id"), respHeaders.Get("Etag"))
// success, write uploaded file details to stdout
if glog.V(5) {
err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri.Redacted()})
if err != nil {
glog.Fatal(err)
return 1
}
}

return 0
}
39 changes: 21 additions & 18 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"io"
"net/url"
"os"
"os/exec"
"path/filepath"
Expand All @@ -32,50 +33,52 @@ func UploadRetryBackoff() backoff.BackOff {

const segmentWriteTimeout = 5 * time.Minute

func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout time.Duration) error {
storageDriver, err := drivers.ParseOSURL(outputURI, true)
func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) (*drivers.SaveDataOutput, error) {
output := outputURI.String()
storageDriver, err := drivers.ParseOSURL(output, true)
if err != nil {
return err
return nil, err
}
session := storageDriver.NewSession("")
if err != nil {
return err
return nil, err
}

// While we wait for storj to implement an easier method for global object deletion we are hacking something
// here to allow us to have recording objects deleted after 7 days.
fields := &drivers.FileProperties{}
if strings.Contains(outputURI, "gateway.storjshare.io/catalyst-recordings-com") {
if strings.Contains(output, "gateway.storjshare.io/catalyst-recordings-com") {
fields = &drivers.FileProperties{
Metadata: map[string]string{
"Object-Expires": "+720h", // Objects will be deleted after 30 days
},
}
}

if strings.HasSuffix(outputURI, ".ts") || strings.HasSuffix(outputURI, ".mp4") {
if strings.HasSuffix(output, ".ts") || strings.HasSuffix(output, ".mp4") {
// For segments we just write them in one go here and return early.
// (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.)
fileContents, err := io.ReadAll(input)
if err != nil {
return fmt.Errorf("failed to read file")
return nil, fmt.Errorf("failed to read file")
}

var out *drivers.SaveDataOutput
err = backoff.Retry(func() error {
_, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout)
out, err = session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout)
if err != nil {
glog.Errorf("failed upload attempt: %v", err)
glog.Errorf("failed upload attempt for %s: %v", outputURI.Redacted(), err)
}
return err
}, UploadRetryBackoff())
if err != nil {
return fmt.Errorf("failed to upload video: %w", err)
return nil, fmt.Errorf("failed to upload video %s: %w", outputURI.Redacted(), err)
}

if err = extractThumb(session, outputURI, fileContents); err != nil {
glog.Errorf("extracting thumbnail failed: %v", err)
if err = extractThumb(session, output, fileContents); err != nil {
glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err)
}
return nil
return out, nil
}

// For the manifest files we want a very short cache ttl as the files are updating every few seconds
Expand Down Expand Up @@ -107,22 +110,22 @@ func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout t
// Just log this error, since it'll effectively be retried after the next interval
glog.Errorf("Failed to write: %v", err)
} else {
glog.V(5).Infof("Wrote %s to storage: %d bytes", outputURI, len(b))
glog.V(5).Infof("Wrote %s to storage: %d bytes", outputURI.Redacted(), len(b))
}
lastWrite = time.Now()
}
}
if err := scanner.Err(); err != nil {
return err
return nil, err
}

// We have to do this final write, otherwise there might be final data that's arrived since the last periodic write
if _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, writeTimeout); err != nil {
// Don't ignore this error, since there won't be any further attempts to write
return fmt.Errorf("failed to write final save: %w", err)
return nil, fmt.Errorf("failed to write final save: %w", err)
}
glog.Infof("Completed writing %s to storage", outputURI)
return nil
glog.Infof("Completed writing %s to storage", outputURI.Redacted())
return nil, nil
}

func extractThumb(session drivers.OSSession, filename string, segment []byte) error {
Expand Down
5 changes: 4 additions & 1 deletion core/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"io"
"net/url"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -32,7 +33,9 @@ func TestItWritesSlowInputIncrementally(t *testing.T) {

// Kick off the upload in a goroutine so that we can check the file is incrementally written
go func() {
err := Upload(slowReader, outputFile.Name(), 100*time.Millisecond, time.Second)
u, err := url.Parse(outputFile.Name())
require.NoError(t, err)
_, err = Upload(slowReader, u, 100*time.Millisecond, time.Second)
require.NoError(t, err, "")
}()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/golang/glog v1.1.0
github.com/google/uuid v1.3.0
github.com/livepeer/go-tools v0.3.4
github.com/livepeer/go-tools v0.3.6
github.com/stretchr/testify v1.8.4
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,8 @@ github.com/livepeer/go-tools v0.3.3 h1:Je2P+ovDIIlFWtlns5v+MmHtdIytsAJS6+XyeZ8sF
github.com/livepeer/go-tools v0.3.3/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/livepeer/go-tools v0.3.4 h1:Otl8VkGA5FdNQMTTN/yh0V72vhSbSQevUTL67AXz6kU=
github.com/livepeer/go-tools v0.3.4/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/livepeer/go-tools v0.3.6 h1:LhRnoVVGFCtfBh6WyKdwJ2bPD/h5gaRvsAszmCqKt1Q=
github.com/livepeer/go-tools v0.3.6/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/lucas-clemente/quic-go v0.19.3/go.mod h1:ADXpNbTQjq1hIzCpB+y/k5iz4n4z4IwqoLb94Kh5Hu8=
github.com/lucas-clemente/quic-go v0.28.1/go.mod h1:oGz5DKK41cJt5+773+BSO9BXDsREY4HLf7+0odGAPO0=
github.com/lucas-clemente/quic-go v0.29.1/go.mod h1:CTcNfLYJS2UuRNB+zcNlgvkjBhxX6Hm3WUxxAQx2mgE=
Expand Down

0 comments on commit eecd216

Please sign in to comment.