Skip to content

Commit

Permalink
clients: Handle 404s in all file clients
Browse files Browse the repository at this point in the history
  • Loading branch information
victorges committed Jun 21, 2024
1 parent 8873951 commit a1f6ebb
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 16 deletions.
23 changes: 15 additions & 8 deletions clients/arweave_ipfs_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/livepeer/catalyst-api/config"
xerrors "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
)

Expand Down Expand Up @@ -64,35 +65,41 @@ func (d *DStorageDownload) DownloadDStorageFromGatewayList(u, requestID string)
defer func() { d.gatewaysListPosition++ }()
length := len(gateways)
until := d.gatewaysListPosition + length
var lastErr error
for i := d.gatewaysListPosition; i < until; i++ {
d.gatewaysListPosition = i % length
gateway := gateways[d.gatewaysListPosition]
opContent := downloadDStorageResourceFromSingleGateway(gateway, resourceID, requestID)
if opContent != nil {
opContent, err := downloadDStorageResourceFromSingleGateway(gateway, resourceID, requestID)
if err == nil {
return opContent, nil
}
lastErr = err
}

return nil, fmt.Errorf("failed to fetch %s from any of the gateways", u)
return nil, fmt.Errorf("failed to fetch %s from any of the gateways: %w", u, lastErr)
}

func downloadDStorageResourceFromSingleGateway(gateway *url.URL, resourceId, requestID string) io.ReadCloser {
func downloadDStorageResourceFromSingleGateway(gateway *url.URL, resourceId, requestID string) (io.ReadCloser, error) {
fullURL := gateway.JoinPath(resourceId).String()
log.Log(requestID, "downloading from gateway", "resourceID", resourceId, "url", fullURL)
resp, err := http.DefaultClient.Get(fullURL)

if err != nil {
log.LogError(requestID, "failed to fetch content from gateway", err, "url", fullURL)
return nil
return nil, err
}

if resp.StatusCode >= 300 {
if resp.StatusCode == 404 {
resp.Body.Close()
log.Log(requestID, "dstorage gateway not found", "status_code", resp.StatusCode, "url", fullURL)
return nil, xerrors.NewObjectNotFoundError("not found in dstorage", nil)
} else if resp.StatusCode >= 300 {
resp.Body.Close()
log.Log(requestID, "unexpected response from gateway", "status_code", resp.StatusCode, "url", fullURL)
return nil
return nil, fmt.Errorf("unexpected response from gateway: %d", resp.StatusCode)
}

return resp.Body
return resp.Body, nil
}

func IsDStorageResource(dStorage string) bool {
Expand Down
13 changes: 9 additions & 4 deletions clients/input_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clients

import (
"context"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -312,13 +313,17 @@ func getFileHTTP(ctx context.Context, url string) (io.ReadCloser, error) {
if err != nil {
return nil, fmt.Errorf("error on import request: %w", err)
}

if resp.StatusCode >= 300 {
resp.Body.Close()
err := fmt.Errorf("bad status code from import request: %d %s", resp.StatusCode, resp.Status)
if resp.StatusCode < 500 {
err = xerrors.Unretriable(err)

msg := fmt.Sprintf("bad status code from import request: %d %s", resp.StatusCode, resp.Status)
if resp.StatusCode == 404 {
return nil, xerrors.NewObjectNotFoundError(msg, nil)
} else if resp.StatusCode < 500 {
return nil, xerrors.Unretriable(errors.New(msg))
}
return nil, err
return nil, errors.New(msg)
}
return resp.Body, nil
}
Expand Down
8 changes: 7 additions & 1 deletion clients/object_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package clients

import (
"context"
"errors"
"fmt"
"io"
"net/url"
"path/filepath"
"strings"
"time"

xerrors "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"

"github.com/cenkalti/backoff/v4"
Expand All @@ -29,7 +31,7 @@ func DownloadOSURL(osURL string) (io.ReadCloser, error) {
func GetOSURL(osURL, byteRange string) (*drivers.FileInfoReader, error) {
storageDriver, err := drivers.ParseOSURL(osURL, true)
if err != nil {
return nil, fmt.Errorf("failed to parse OS URL %q: %s", log.RedactURL(osURL), err)
return nil, xerrors.Unretriable(fmt.Errorf("failed to parse OS URL %q: %w", log.RedactURL(osURL), err))
}

start := time.Now()
Expand All @@ -50,6 +52,10 @@ func GetOSURL(osURL, byteRange string) (*drivers.FileInfoReader, error) {

if err != nil {
metrics.Metrics.ObjectStoreClient.FailureCount.WithLabelValues(host, "read", bucket).Inc()

if errors.Is(err, drivers.ErrNotExist) {
return nil, xerrors.NewObjectNotFoundError("not found in OS", err)
}
return nil, fmt.Errorf("failed to read from OS URL %q: %w", log.RedactURL(osURL), err)
}

Expand Down
8 changes: 5 additions & 3 deletions clients/object_store_client_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package clients

import (
"github.com/stretchr/testify/require"
"io"
"path"
"strings"
"testing"
"time"

caterrors "github.com/livepeer/catalyst-api/errors"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -39,8 +41,8 @@ func TestItFailsWithInvalidURLs(t *testing.T) {
func TestItFailsWithMissingFile(t *testing.T) {
_, err := DownloadOSURL("/tmp/this/should/not/exist.m3u8")
require.Error(t, err)
require.Contains(t, err.Error(), "failed to read from OS URL")
require.Contains(t, err.Error(), "no such file or directory")
require.Contains(t, err.Error(), "ObjectNotFoundError")
require.True(t, caterrors.IsObjectNotFound(err))
}

func TestPublish(t *testing.T) {
Expand Down

0 comments on commit a1f6ebb

Please sign in to comment.