Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add URI to logs and errors for better debugging #52

Merged
merged 3 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions catalyst-uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import (
"encoding/json"
"flag"
"net/http"
"net/url"
"os"
"time"

Expand Down Expand Up @@ -44,24 +46,37 @@
stdout := os.Stdout
os.Stdout, _ = os.Open(os.DevNull)

uri := flag.Arg(0)
if uri == "" {
glog.Fatalf("Could not parse object store URI: %s", uri)
output := flag.Arg(0)
if output == "" {
glog.Fatal("Object store URI was empty")

Check warning on line 51 in catalyst-uploader.go

View check run for this annotation

Codecov / codecov/patch

catalyst-uploader.go#L49-L51

Added lines #L49 - L51 were not covered by tests
return 1
}

err = core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout)
uri, err := url.Parse(output)

Check warning on line 55 in catalyst-uploader.go

View check run for this annotation

Codecov / codecov/patch

catalyst-uploader.go#L55

Added line #L55 was not covered by tests
if err != nil {
glog.Fatalf("Uploader failed for %s: %s", uri, err)
glog.Fatalf("Failed to parse URI: %s", err)

Check warning on line 57 in catalyst-uploader.go

View check run for this annotation

Codecov / codecov/patch

catalyst-uploader.go#L57

Added line #L57 was not covered by tests
return 1
}

// success, write uploaded file details to stdout
err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri})
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout)

Check warning on line 61 in catalyst-uploader.go

View check run for this annotation

Codecov / codecov/patch

catalyst-uploader.go#L61

Added line #L61 was not covered by tests
if err != nil {
glog.Fatal(err)
glog.Fatalf("Uploader failed for %s: %s", uri.Redacted(), err)

Check warning on line 63 in catalyst-uploader.go

View check run for this annotation

Codecov / codecov/patch

catalyst-uploader.go#L63

Added line #L63 was not covered by tests
return 1
}

var respHeaders http.Header
if out != nil {
respHeaders = out.UploaderResponseHeaders
}
glog.Infof("Uploader succeeded for %s. storageRequestID=%s Etag=%s", uri.Redacted(), respHeaders.Get("X-Amz-Request-Id"), respHeaders.Get("Etag"))
// success, write uploaded file details to stdout
if glog.V(5) {
err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri.Redacted()})
if err != nil {
glog.Fatal(err)
return 1
}

Check warning on line 78 in catalyst-uploader.go

View check run for this annotation

Codecov / codecov/patch

catalyst-uploader.go#L67-L78

Added lines #L67 - L78 were not covered by tests
}

return 0
}
39 changes: 21 additions & 18 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"context"
"fmt"
"io"
"net/url"
"os"
"os/exec"
"path/filepath"
Expand All @@ -32,50 +33,52 @@

const segmentWriteTimeout = 5 * time.Minute

func Upload(input io.Reader, outputURI string, waitBetweenWrites, writeTimeout time.Duration) error {
storageDriver, err := drivers.ParseOSURL(outputURI, true)
func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration) (*drivers.SaveDataOutput, error) {
output := outputURI.String()
storageDriver, err := drivers.ParseOSURL(output, true)
if err != nil {
return err
return nil, err

Check warning on line 40 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L40

Added line #L40 was not covered by tests
}
session := storageDriver.NewSession("")
if err != nil {
return err
return nil, err

Check warning on line 44 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L44

Added line #L44 was not covered by tests
}

// While we wait for storj to implement an easier method for global object deletion we are hacking something
// here to allow us to have recording objects deleted after 7 days.
fields := &drivers.FileProperties{}
if strings.Contains(outputURI, "gateway.storjshare.io/catalyst-recordings-com") {
if strings.Contains(output, "gateway.storjshare.io/catalyst-recordings-com") {
fields = &drivers.FileProperties{
Metadata: map[string]string{
"Object-Expires": "+720h", // Objects will be deleted after 30 days
},
}
}

if strings.HasSuffix(outputURI, ".ts") || strings.HasSuffix(outputURI, ".mp4") {
if strings.HasSuffix(output, ".ts") || strings.HasSuffix(output, ".mp4") {
// For segments we just write them in one go here and return early.
// (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.)
fileContents, err := io.ReadAll(input)
if err != nil {
return fmt.Errorf("failed to read file")
return nil, fmt.Errorf("failed to read file")

Check warning on line 63 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L63

Added line #L63 was not covered by tests
}

var out *drivers.SaveDataOutput

Check warning on line 66 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L66

Added line #L66 was not covered by tests
err = backoff.Retry(func() error {
_, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout)
out, err = session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, segmentWriteTimeout)

Check warning on line 68 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L68

Added line #L68 was not covered by tests
if err != nil {
glog.Errorf("failed upload attempt: %v", err)
glog.Errorf("failed upload attempt for %s: %v", outputURI.Redacted(), err)

Check warning on line 70 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L70

Added line #L70 was not covered by tests
}
return err
}, UploadRetryBackoff())
if err != nil {
return fmt.Errorf("failed to upload video: %w", err)
return nil, fmt.Errorf("failed to upload video %s: %w", outputURI.Redacted(), err)

Check warning on line 75 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L75

Added line #L75 was not covered by tests
}

if err = extractThumb(session, outputURI, fileContents); err != nil {
glog.Errorf("extracting thumbnail failed: %v", err)
if err = extractThumb(session, output, fileContents); err != nil {
glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err)

Check warning on line 79 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L78-L79

Added lines #L78 - L79 were not covered by tests
}
return nil
return out, nil

Check warning on line 81 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L81

Added line #L81 was not covered by tests
}

// For the manifest files we want a very short cache ttl as the files are updating every few seconds
Expand Down Expand Up @@ -107,22 +110,22 @@
// Just log this error, since it'll effectively be retried after the next interval
glog.Errorf("Failed to write: %v", err)
} else {
glog.V(5).Infof("Wrote %s to storage: %d bytes", outputURI, len(b))
glog.V(5).Infof("Wrote %s to storage: %d bytes", outputURI.Redacted(), len(b))
}
lastWrite = time.Now()
}
}
if err := scanner.Err(); err != nil {
return err
return nil, err

Check warning on line 119 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L119

Added line #L119 was not covered by tests
}

// We have to do this final write, otherwise there might be final data that's arrived since the last periodic write
if _, err := session.SaveData(context.Background(), "", bytes.NewReader(fileContents), fields, writeTimeout); err != nil {
// Don't ignore this error, since there won't be any further attempts to write
return fmt.Errorf("failed to write final save: %w", err)
return nil, fmt.Errorf("failed to write final save: %w", err)

Check warning on line 125 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L125

Added line #L125 was not covered by tests
}
glog.Infof("Completed writing %s to storage", outputURI)
return nil
glog.Infof("Completed writing %s to storage", outputURI.Redacted())
return nil, nil

Check warning on line 128 in core/uploader.go

View check run for this annotation

Codecov / codecov/patch

core/uploader.go#L127-L128

Added lines #L127 - L128 were not covered by tests
}

func extractThumb(session drivers.OSSession, filename string, segment []byte) error {
Expand Down
5 changes: 4 additions & 1 deletion core/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"io"
"net/url"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -32,7 +33,9 @@ func TestItWritesSlowInputIncrementally(t *testing.T) {

// Kick off the upload in a goroutine so that we can check the file is incrementally written
go func() {
err := Upload(slowReader, outputFile.Name(), 100*time.Millisecond, time.Second)
u, err := url.Parse(outputFile.Name())
require.NoError(t, err)
_, err = Upload(slowReader, u, 100*time.Millisecond, time.Second)
require.NoError(t, err, "")
}()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/golang/glog v1.1.0
github.com/google/uuid v1.3.0
github.com/livepeer/go-tools v0.3.4
github.com/livepeer/go-tools v0.3.6
github.com/stretchr/testify v1.8.4
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,8 @@ github.com/livepeer/go-tools v0.3.3 h1:Je2P+ovDIIlFWtlns5v+MmHtdIytsAJS6+XyeZ8sF
github.com/livepeer/go-tools v0.3.3/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/livepeer/go-tools v0.3.4 h1:Otl8VkGA5FdNQMTTN/yh0V72vhSbSQevUTL67AXz6kU=
github.com/livepeer/go-tools v0.3.4/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/livepeer/go-tools v0.3.6 h1:LhRnoVVGFCtfBh6WyKdwJ2bPD/h5gaRvsAszmCqKt1Q=
github.com/livepeer/go-tools v0.3.6/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/lucas-clemente/quic-go v0.19.3/go.mod h1:ADXpNbTQjq1hIzCpB+y/k5iz4n4z4IwqoLb94Kh5Hu8=
github.com/lucas-clemente/quic-go v0.28.1/go.mod h1:oGz5DKK41cJt5+773+BSO9BXDsREY4HLf7+0odGAPO0=
github.com/lucas-clemente/quic-go v0.29.1/go.mod h1:CTcNfLYJS2UuRNB+zcNlgvkjBhxX6Hm3WUxxAQx2mgE=
Expand Down
Loading