Skip to content

Commit

Permalink
clients: Handle not found errors gracefully (#1302)
Browse files Browse the repository at this point in the history
* go.mod: Update go-tools for 404 errors

* errors: Make NotFoundError a type

* errors: Make Unretriable errors break retry loops

* clients: Handle 404s in all file clients

* transcode: Make sure we propagate not found err upstream

Otherwise the backoff retry loop wouldn't stop.

This was the only use of clients.GetFile that wasn't wrapping
the error properly.

* nit: consistent import naming catErrs

* update go-tools to tagged version

---------

Co-authored-by: Max Holland <[email protected]>
  • Loading branch information
victorges and mjh1 authored Jul 3, 2024
1 parent 8b8f2c4 commit 7a3e994
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 42 deletions.
23 changes: 15 additions & 8 deletions clients/arweave_ipfs_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/livepeer/catalyst-api/config"
catErrs "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
)

Expand Down Expand Up @@ -64,35 +65,41 @@ func (d *DStorageDownload) DownloadDStorageFromGatewayList(u, requestID string)
defer func() { d.gatewaysListPosition++ }()
length := len(gateways)
until := d.gatewaysListPosition + length
var lastErr error
for i := d.gatewaysListPosition; i < until; i++ {
d.gatewaysListPosition = i % length
gateway := gateways[d.gatewaysListPosition]
opContent := downloadDStorageResourceFromSingleGateway(gateway, resourceID, requestID)
if opContent != nil {
opContent, err := downloadDStorageResourceFromSingleGateway(gateway, resourceID, requestID)
if err == nil {
return opContent, nil
}
lastErr = err
}

return nil, fmt.Errorf("failed to fetch %s from any of the gateways", u)
return nil, fmt.Errorf("failed to fetch %s from any of the gateways: %w", u, lastErr)
}

func downloadDStorageResourceFromSingleGateway(gateway *url.URL, resourceId, requestID string) io.ReadCloser {
func downloadDStorageResourceFromSingleGateway(gateway *url.URL, resourceId, requestID string) (io.ReadCloser, error) {
fullURL := gateway.JoinPath(resourceId).String()
log.Log(requestID, "downloading from gateway", "resourceID", resourceId, "url", fullURL)
resp, err := http.DefaultClient.Get(fullURL)

if err != nil {
log.LogError(requestID, "failed to fetch content from gateway", err, "url", fullURL)
return nil
return nil, err
}

if resp.StatusCode >= 300 {
if resp.StatusCode == 404 {
resp.Body.Close()
log.Log(requestID, "dstorage gateway not found", "status_code", resp.StatusCode, "url", fullURL)
return nil, catErrs.NewObjectNotFoundError("not found in dstorage", nil)
} else if resp.StatusCode >= 300 {
resp.Body.Close()
log.Log(requestID, "unexpected response from gateway", "status_code", resp.StatusCode, "url", fullURL)
return nil
return nil, fmt.Errorf("unexpected response from gateway: %d", resp.StatusCode)
}

return resp.Body
return resp.Body, nil
}

func IsDStorageResource(dStorage string) bool {
Expand Down
17 changes: 11 additions & 6 deletions clients/input_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clients

import (
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -14,7 +15,7 @@ import (
"github.com/hashicorp/go-retryablehttp"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/crypto"
xerrors "github.com/livepeer/catalyst-api/errors"
catErrs "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/catalyst-api/video"
"github.com/livepeer/go-tools/drivers"
Expand Down Expand Up @@ -306,19 +307,23 @@ func newRetryableHttpClient() *http.Client {
func getFileHTTP(ctx context.Context, url string) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, xerrors.Unretriable(fmt.Errorf("error creating http request: %w", err))
return nil, catErrs.Unretriable(fmt.Errorf("error creating http request: %w", err))
}
resp, err := retryableHttpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error on import request: %w", err)
}

if resp.StatusCode >= 300 {
resp.Body.Close()
err := fmt.Errorf("bad status code from import request: %d %s", resp.StatusCode, resp.Status)
if resp.StatusCode < 500 {
err = xerrors.Unretriable(err)

msg := fmt.Sprintf("bad status code from import request: %d %s", resp.StatusCode, resp.Status)
if resp.StatusCode == 404 {
return nil, catErrs.NewObjectNotFoundError(msg, nil)
} else if resp.StatusCode < 500 {
return nil, catErrs.Unretriable(errors.New(msg))
}
return nil, err
return nil, errors.New(msg)
}
return resp.Body, nil
}
Expand Down
8 changes: 7 additions & 1 deletion clients/object_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package clients

import (
"context"
"errors"
"fmt"
"io"
"net/url"
"path/filepath"
"strings"
"time"

catErrs "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"

"github.com/cenkalti/backoff/v4"
Expand All @@ -29,7 +31,7 @@ func DownloadOSURL(osURL string) (io.ReadCloser, error) {
func GetOSURL(osURL, byteRange string) (*drivers.FileInfoReader, error) {
storageDriver, err := drivers.ParseOSURL(osURL, true)
if err != nil {
return nil, fmt.Errorf("failed to parse OS URL %q: %s", log.RedactURL(osURL), err)
return nil, catErrs.Unretriable(fmt.Errorf("failed to parse OS URL %q: %w", log.RedactURL(osURL), err))
}

start := time.Now()
Expand All @@ -50,6 +52,10 @@ func GetOSURL(osURL, byteRange string) (*drivers.FileInfoReader, error) {

if err != nil {
metrics.Metrics.ObjectStoreClient.FailureCount.WithLabelValues(host, "read", bucket).Inc()

if errors.Is(err, drivers.ErrNotExist) {
return nil, catErrs.NewObjectNotFoundError("not found in OS", err)
}
return nil, fmt.Errorf("failed to read from OS URL %q: %w", log.RedactURL(osURL), err)
}

Expand Down
8 changes: 5 additions & 3 deletions clients/object_store_client_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package clients

import (
"github.com/stretchr/testify/require"
"io"
"path"
"strings"
"testing"
"time"

catErrs "github.com/livepeer/catalyst-api/errors"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -39,8 +41,8 @@ func TestItFailsWithInvalidURLs(t *testing.T) {
func TestItFailsWithMissingFile(t *testing.T) {
_, err := DownloadOSURL("/tmp/this/should/not/exist.m3u8")
require.Error(t, err)
require.Contains(t, err.Error(), "failed to read from OS URL")
require.Contains(t, err.Error(), "no such file or directory")
require.Contains(t, err.Error(), "ObjectNotFoundError")
require.True(t, catErrs.IsObjectNotFound(err))
}

func TestPublish(t *testing.T) {
Expand Down
54 changes: 45 additions & 9 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package errors
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"

"github.com/cenkalti/backoff/v4"
"github.com/livepeer/catalyst-api/log"
"github.com/xeipuuv/gojsonschema"
)
Expand Down Expand Up @@ -63,21 +65,55 @@ func WriteHTTPBadBodySchema(where string, w http.ResponseWriter, errors []gojson
return writeHttpError(w, sb.String(), http.StatusBadRequest, nil)
}

// Special wrapper for errors that should set the `Unretriable` field in the
// error callback sent on VOD upload jobs.
type UnretriableError struct{ error }
type unretriableError struct{ error }

// Unretriable returns an error that should be treated as final. This effectively means that the error stops backoff
// retry loops automatically and that it should be propagated back to the caller as such. This is done through the
// status callback through the "unretriable" field.
func Unretriable(err error) error {
return UnretriableError{err}
// Notice that permanent errors get unwrapped by the backoff lib when they're used to stop the retry loop. So we need
// to keep the unretriableError inside it so it's propagated upstream.
return backoff.Permanent(unretriableError{err})
}

// Returns whether the given error is an unretriable error.
// IsUnretriable returns whether the given error is an unretriable error.
func IsUnretriable(err error) bool {
return errors.As(err, &UnretriableError{})
return errors.As(err, &unretriableError{})
}

func (e unretriableError) Unwrap() error {
return e.error
}

type ObjectNotFoundError struct {
msg string
cause error
}

func (e ObjectNotFoundError) Error() string {
return e.msg
}

func (e ObjectNotFoundError) Unwrap() error {
return e.cause
}

func NewObjectNotFoundError(msg string, cause error) error {
if cause != nil {
msg = fmt.Sprintf("ObjectNotFoundError: %s: %s", msg, cause)
} else {
msg = fmt.Sprintf("ObjectNotFoundError: %s", msg)
}
// every not found is unretriable
return Unretriable(ObjectNotFoundError{msg: msg, cause: cause})
}

// IsObjectNotFound checks if the error is an ObjectNotFoundError.
func IsObjectNotFound(err error) bool {
return errors.As(err, &ObjectNotFoundError{})
}

var (
ObjectNotFoundError = errors.New("ObjectNotFoundError")
UnauthorisedError = errors.New("UnauthorisedError")
InvalidJWT = errors.New("InvalidJWTError")
UnauthorisedError = errors.New("UnauthorisedError")
InvalidJWT = errors.New("InvalidJWTError")
)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/julienschmidt/httprouter v1.3.0
github.com/lib/pq v1.10.9
github.com/livepeer/go-api-client v0.4.23
github.com/livepeer/go-tools v0.3.7
github.com/livepeer/go-tools v0.3.8
github.com/livepeer/joy4 v0.1.1
github.com/livepeer/livepeer-data v0.8.1
github.com/livepeer/m3u8 v0.11.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+O
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
github.com/livepeer/go-api-client v0.4.23 h1:buvWJGxLzwsmvkXaxdI2bOUOAiYZJtcTMfFWMqnOrco=
github.com/livepeer/go-api-client v0.4.23/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-tools v0.3.7 h1:CaiwL7r85EkBd0GUxFyNAp/xMmrjTr/GgIlqoiMtoog=
github.com/livepeer/go-tools v0.3.7/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/livepeer/go-tools v0.3.8 h1:/SRoFeuWW3/p5aTZ9xieGWO3o04S70ME2yH0SVEEK4w=
github.com/livepeer/go-tools v0.3.8/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/livepeer/joy4 v0.1.1 h1:Tz7gVcmvpG/nfUKHU+XJn6Qke/k32mTWMiH9qB0bhnM=
github.com/livepeer/joy4 v0.1.1/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
github.com/livepeer/livepeer-data v0.8.1 h1:FOlCGbV0ws9hY+F88MZmQhLuvPn5nyl1vuKNWaxCW3c=
Expand Down
2 changes: 1 addition & 1 deletion handlers/playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (p *PlaybackHandler) Handle(w http.ResponseWriter, req *http.Request, param
func handleError(err error, req *http.Request, requestID string, w http.ResponseWriter) {
log.LogError(requestID, "error in playback handler", err, "url", req.URL)
switch {
case errors.Is(err, catErrs.ObjectNotFoundError):
case catErrs.IsObjectNotFound(err):
catErrs.WriteHTTPNotFound(w, "not found", nil)
case errors.Is(err, catErrs.UnauthorisedError):
catErrs.WriteHTTPUnauthorized(w, "denied", nil)
Expand Down
11 changes: 1 addition & 10 deletions playback/playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ import (
"net/url"
"strings"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/grafov/m3u8"
"github.com/livepeer/catalyst-api/clients"
caterrs "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/go-tools/drivers"
)

Expand Down Expand Up @@ -120,13 +117,7 @@ func osFetch(buckets []*url.URL, playbackID, file, byteRange string) (*drivers.F
return f, nil
}
// if this is the final bucket in the list then the error set here will be used in the final return
var awsErr awserr.Error
if errors.As(err, &awsErr) && awsErr.Code() == s3.ErrCodeNoSuchKey ||
strings.Contains(err.Error(), "no such file") {
err = fmt.Errorf("invalid request: %w %v", caterrs.ObjectNotFoundError, err)
} else {
err = fmt.Errorf("failed to get file for playback: %w", err)
}
err = fmt.Errorf("failed to get file for playback: %w", err)
}
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func transcodeSegment(
defer cancel()
rc, err := clients.GetFile(ctx, transcodeRequest.RequestID, segment.Input.URL.String(), nil)
if err != nil {
return fmt.Errorf("failed to download source segment %q: %s", segment.Input, err)
return fmt.Errorf("failed to download source segment %q: %w", segment.Input, err)
}
defer rc.Close()

Expand Down

0 comments on commit 7a3e994

Please sign in to comment.