clients: Implement storage fallback for recordings #1303

Merged 19 commits on Jul 25, 2024
Changes from 5 commits
66 changes: 34 additions & 32 deletions clients/manifest.go
@@ -44,8 +44,15 @@ func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.Medi
return convertToMediaPlaylist(playlist, playlistType)
}

func DownloadRenditionManifestWithBackup(requestID string, primaryManifestURL, osTransferURL *url.URL) (*url.URL, error) {
manifestURL, playlist, playlistType, err := downloadManifestWithBackup(requestID, primaryManifestURL.String())
// RecordingBackupCheck checks whether manifests and segments are available on the primary or
// the backup store and returns a URL to a new manifest with absolute segment URLs pointing to either
// primary or backup locations, depending on where the segments are available.
func RecordingBackupCheck(requestID string, primaryManifestURL, osTransferURL *url.URL) (*url.URL, error) {
if config.GetStorageBackupURL(primaryManifestURL.String()) == "" {
return primaryManifestURL, nil
}

playlist, playlistType, err := downloadManifestWithBackup(requestID, primaryManifestURL.String())
if err != nil {
return nil, fmt.Errorf("error downloading manifest: %w", err)
}
@@ -54,10 +61,6 @@ func DownloadRenditionManifestWithBackup(requestID string, primaryManifestURL, o
return nil, err
}

// If we had to fall back to the backup manifest then we need a new manifest with absolute URLs because
// successful segment uploads will exist in the primary store and therefore the relative URLs will be broken.
newPlaylistNeeded := manifestURL != primaryManifestURL.String()

// Check whether segments are available from primary or backup storage
dStorage := NewDStorageDownload()
for _, segment := range mediaPlaylist.GetAllSegments() {
@@ -68,7 +71,7 @@ func DownloadRenditionManifestWithBackup(requestID string, primaryManifestURL, o
var actualSegURL string
err = backoff.Retry(func() error {
var rc io.ReadCloser
rc, actualSegURL, err = GetFileWithBackup(context.TODO(), requestID, segURL.String(), dStorage)
rc, actualSegURL, err = GetFileWithBackup(context.Background(), requestID, segURL.String(), dStorage)
if rc != nil {
rc.Close()
}
@@ -77,28 +80,20 @@ func DownloadRenditionManifestWithBackup(requestID string, primaryManifestURL, o
if err != nil {
return nil, fmt.Errorf("failed to find segment file %s: %w", segURL.Redacted(), err)
}
if actualSegURL != segURL.String() {
// If we had to fall back to a backup segment then we'll need a new manifest with absolute URLs
newPlaylistNeeded = true
}
segment.URI = actualSegURL
}

if newPlaylistNeeded {
// TODO log line

// write the manifest to storage and update the manifestURL variable
outputStorageURL := osTransferURL.JoinPath("input.m3u8")
err = backoff.Retry(func() error {
return UploadToOSURL(outputStorageURL.String(), "", strings.NewReader(mediaPlaylist.String()), ManifestUploadTimeout)
}, UploadRetryBackoff())
if err != nil {
return nil, fmt.Errorf("failed to upload rendition playlist: %w", err)
}
manifestURL, err = SignURL(outputStorageURL)
if err != nil {
return nil, fmt.Errorf("failed to sign manifest url: %w", err)
}
// write the manifest to storage and update the manifestURL variable
outputStorageURL := osTransferURL.JoinPath("input.m3u8")
err = backoff.Retry(func() error {
return UploadToOSURL(outputStorageURL.String(), "", strings.NewReader(mediaPlaylist.String()), ManifestUploadTimeout)
}, UploadRetryBackoff())
if err != nil {
return nil, fmt.Errorf("failed to upload rendition playlist: %w", err)
}
manifestURL, err := SignURL(outputStorageURL)
if err != nil {
return nil, fmt.Errorf("failed to sign manifest url: %w", err)
}

newURL, err := url.Parse(manifestURL)
@@ -122,7 +117,7 @@ func convertToMediaPlaylist(playlist m3u8.Playlist, playlistType m3u8.ListType)
return *mediaPlaylist, nil
}

func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (string, m3u8.Playlist, m3u8.ListType, error) {
func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (m3u8.Playlist, m3u8.ListType, error) {
var playlist, playlistBackup m3u8.Playlist
var playlistType, playlistTypeBackup m3u8.ListType
var size, sizeBackup int
@@ -150,28 +145,35 @@ func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (string,
// (only not found errors passthrough below)
primaryNotFound, backupNotFound := errors.IsObjectNotFound(errPrimary), errors.IsObjectNotFound(errBackup)
if primaryNotFound && backupNotFound {
return "", nil, 0, errPrimary
return nil, 0, errPrimary
}
if errPrimary != nil && !primaryNotFound {
return "", nil, 0, errPrimary
return nil, 0, errPrimary
}
if errBackup != nil && !backupNotFound {
Contributor: This is weird: if the primary works fine but the backup is broken, we should not return an error, I guess. We should return the playlist from the primary bucket, isn't that the case?

Contributor: If it's not found in the backup then we return the primary, but unfortunately we need to successfully check for the backup's existence because we need to pick the larger of the two manifests lower down. If we failed to check that the backup exists, it could be that it does exist and is the more complete, larger of the two, but we've missed that and gone ahead with the primary.

Contributor: So, we want to fail completely if there is an error from the backup?

Contributor: Yes, the backup should always be working but just giving us a 404 most of the time.

return "", nil, 0, errBackup
return nil, 0, errBackup
}

// Return the largest manifest as the most recent version
hasBackup := backupManifestURL != "" && errBackup == nil
if hasBackup && (errPrimary != nil || sizeBackup > size) {
return backupManifestURL, playlistBackup, playlistTypeBackup, nil
return playlistBackup, playlistTypeBackup, nil
}
return sourceManifestOSURL, playlist, playlistType, errPrimary
return playlist, playlistType, errPrimary
}
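To make the behaviour discussed in the thread above easier to follow, the selection rules of downloadManifestWithBackup can be summarized as a small helper. This is an illustrative sketch only: pickManifest and its signature are invented for this example, it assumes a backup store is configured, and the m3u8 import path is assumed to match whatever clients/manifest.go already uses.

```go
package clients

import (
	"github.com/grafov/m3u8" // assumed; use the same m3u8 package as clients/manifest.go
	"github.com/livepeer/catalyst-api/errors"
)

// pickManifest mirrors the selection rules above: any error other than
// "object not found" is fatal (including backup errors, because the sizes of
// both manifests must be comparable), and when both manifests exist the
// larger one is treated as the most recent.
func pickManifest(primary, backup m3u8.Playlist, size, sizeBackup int, errPrimary, errBackup error) (m3u8.Playlist, error) {
	primaryNotFound := errors.IsObjectNotFound(errPrimary)
	backupNotFound := errors.IsObjectNotFound(errBackup)
	switch {
	case primaryNotFound && backupNotFound:
		return nil, errPrimary // neither store has the manifest
	case errPrimary != nil && !primaryNotFound:
		return nil, errPrimary // real failure reading the primary
	case errBackup != nil && !backupNotFound:
		return nil, errBackup // real failure reading the backup, so the size comparison cannot be trusted
	case errBackup == nil && (errPrimary != nil || sizeBackup > size):
		return backup, nil // backup is the only copy, or the more complete one
	default:
		return primary, errPrimary
	}
}
```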

func downloadManifest(requestID, sourceManifestOSURL string) (playlist m3u8.Playlist, playlistType m3u8.ListType, size int, err error) {
dStorage := NewDStorageDownload()
start := time.Now()
err = backoff.Retry(func() error {
rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage)
if err != nil {
if time.Since(start) > 10*time.Second && errors.IsObjectNotFound(err) {
Contributor: What is this one? We anyway have the deadline in DownloadRetryBackOff() set to 10 * 5s = 50s, so why do we also need this check?

Contributor (@mjh1, Jul 16, 2024): I'll add a comment for this. We annoyingly had to include it because there's a chance that we will get a not found due to eventual consistency, so we still want to retry not-found errors. However, we don't want to wait quite as long as for normal errors, because the backup not existing will be quite a common case, and we don't want to add a whole 50s of delay to essentially every recording job.

Member Author: Might be worth a lil constant for the 10s too, maybe const MANIFEST_NOT_FOUND_INCONSISTENCY_TOLERANCE = 10s?

Contributor (@leszko, Jul 17, 2024): Yeah, I'm fine with extracting a constant plus adding a comment.

// bail out of the retries earlier for not found errors because it will be quite a common scenario
// where the backup manifest does not exist and we don't want to wait the whole 50s of retries for
// every recording job
return backoff.Permanent(err)
}
return err
}
defer rc.Close()
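Following the suggestion in the thread above, pulling the 10s threshold out into a named constant could look roughly like this. This is a sketch of the review suggestion, not code from the PR: the constant is a Go-cased spelling of the proposed MANIFEST_NOT_FOUND_INCONSISTENCY_TOLERANCE, parsePlaylist is a placeholder for the parsing the real function performs, the retry-policy helper name follows the reviewer's comment, and the snippet relies on the same imports as clients/manifest.go.

```go
// manifestNotFoundTolerance bounds how long "object not found" errors keep being
// retried when downloading a manifest. Not-found responses can be transient
// (eventual consistency), so they are retried briefly, but a missing backup
// manifest is a common case and should not add the full ~50s retry budget to
// every recording job.
const manifestNotFoundTolerance = 10 * time.Second

func downloadManifest(requestID, sourceManifestOSURL string) (playlist m3u8.Playlist, playlistType m3u8.ListType, size int, err error) {
	dStorage := NewDStorageDownload()
	start := time.Now()
	err = backoff.Retry(func() error {
		rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage)
		if err != nil {
			if time.Since(start) > manifestNotFoundTolerance && errors.IsObjectNotFound(err) {
				// bail out of the retries early for not-found errors: a missing backup
				// manifest is common, so don't burn the whole retry budget on it
				return backoff.Permanent(err)
			}
			return err
		}
		defer rc.Close()
		playlist, playlistType, size, err = parsePlaylist(rc) // placeholder for the real parsing logic
		return err
	}, DownloadRetryBackOff())
	return playlist, playlistType, size, err
}
```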
73 changes: 33 additions & 40 deletions clients/manifest_test.go
@@ -365,56 +365,49 @@ seg-1.ts
`

tests := []struct {
name string
primaryManifest string
backupManifest string
primarySegments []string
backupSegments []string
expectedAbsPaths bool
name string
primaryManifest string
backupManifest string
primarySegments []string
backupSegments []string
}{
{
name: "happy. all segments and manifest available on primary",
primaryManifest: completeManifest,
backupManifest: "",
primarySegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"},
expectedAbsPaths: false,
name: "happy. all segments and manifest available on primary",
primaryManifest: completeManifest,
backupManifest: "",
primarySegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"},
},
{
name: "all segments and manifest available on backup",
primaryManifest: inCompleteManifest,
backupManifest: completeManifest,
backupSegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"},
expectedAbsPaths: true,
name: "all segments and manifest available on backup",
primaryManifest: inCompleteManifest,
backupManifest: completeManifest,
backupSegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"},
},
{
name: "all segments on backup and newest manifest on primary",
primaryManifest: completeManifest,
backupManifest: inCompleteManifest,
backupSegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"},
expectedAbsPaths: true,
name: "all segments on backup and newest manifest on primary",
primaryManifest: completeManifest,
backupManifest: inCompleteManifest,
backupSegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"},
},
{
name: "all segments on primary and newest manifest on backup",
primaryManifest: inCompleteManifest,
backupManifest: completeManifest,
primarySegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"},
expectedAbsPaths: true,
name: "all segments on primary and newest manifest on backup",
primaryManifest: inCompleteManifest,
backupManifest: completeManifest,
primarySegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"},
},
{
name: "segments split between primary and backup, newest manifest on primary",
primaryManifest: completeManifest,
backupManifest: inCompleteManifest,
primarySegments: []string{"seg-0.ts", "seg-2.ts"},
backupSegments: []string{"seg-1.ts", "seg-3.ts"},
expectedAbsPaths: true,
name: "segments split between primary and backup, newest manifest on primary",
primaryManifest: completeManifest,
backupManifest: inCompleteManifest,
primarySegments: []string{"seg-0.ts", "seg-2.ts"},
backupSegments: []string{"seg-1.ts", "seg-3.ts"},
},
{
name: "segments split between primary and backup, newest manifest on backup",
primaryManifest: inCompleteManifest,
backupManifest: completeManifest,
primarySegments: []string{"seg-0.ts", "seg-2.ts"},
backupSegments: []string{"seg-1.ts", "seg-3.ts"},
expectedAbsPaths: true,
name: "segments split between primary and backup, newest manifest on backup",
primaryManifest: inCompleteManifest,
backupManifest: completeManifest,
primarySegments: []string{"seg-0.ts", "seg-2.ts"},
backupSegments: []string{"seg-1.ts", "seg-3.ts"},
},
}
for _, tt := range tests {
@@ -445,7 +438,7 @@
require.NoError(t, err)
}

renditionUrl, err := DownloadRenditionManifestWithBackup("requestID", toUrl(t, filepath.Join(dir, "primary", "index.m3u8")), toUrl(t, filepath.Join(dir, "transfer")))
renditionUrl, err := RecordingBackupCheck("requestID", toUrl(t, filepath.Join(dir, "primary", "index.m3u8")), toUrl(t, filepath.Join(dir, "transfer")))
require.NoError(t, err)

file, err := os.Open(renditionUrl.String())
@@ -460,7 +453,7 @@

require.Len(t, mediaPlaylist.GetAllSegments(), 4)
for i, segment := range mediaPlaylist.GetAllSegments() {
require.Equal(t, tt.expectedAbsPaths, filepath.IsAbs(segment.URI))
require.True(t, filepath.IsAbs(segment.URI))
require.True(t, strings.HasSuffix(segment.URI, fmt.Sprintf("seg-%d.ts", i)))
}
})
30 changes: 30 additions & 0 deletions config/storage_backup_url_test.go
@@ -0,0 +1,30 @@
package config

import "testing"

func TestGetStorageBackupURL(t *testing.T) {
StorageFallbackURLs = map[string]string{"https://storj.livepeer.com/catalyst-recordings-com/hls": "https://google.livepeer.com/catalyst-recordings-com/hls"}
Member Author: Do we need to restore the default value after tests?

Suggested change:
StorageFallbackURLs = map[string]string{"https://storj.livepeer.com/catalyst-recordings-com/hls": "https://google.livepeer.com/catalyst-recordings-com/hls"}
defer func() { StorageFallbackURLs = nil }()

tests := []struct {
name string
urlStr string
want string
}{
{
name: "should replace",
urlStr: "https://storj.livepeer.com/catalyst-recordings-com/hls/foo",
want: "https://google.livepeer.com/catalyst-recordings-com/hls/foo",
},
{
name: "should not replace",
urlStr: "https://blah.livepeer.com/catalyst-recordings-com/hls/foo",
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetStorageBackupURL(tt.urlStr); got != tt.want {
t.Errorf("GetStorageBackupURL() = %v, want %v", got, tt.want)
}
})
}
}
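For context, the behaviour pinned down by this test implies an implementation along these lines: a URL whose prefix matches a key in StorageFallbackURLs has that prefix swapped for the configured backup prefix, and anything else yields an empty string. This is a sketch inferred from the test, not the actual config package code.

```go
package config

import "strings"

// StorageFallbackURLs maps a primary storage URL prefix to its backup prefix.
var StorageFallbackURLs map[string]string

// GetStorageBackupURL returns the backup URL for urlStr by swapping a matching
// primary prefix for its configured backup prefix, or "" if no prefix matches.
func GetStorageBackupURL(urlStr string) string {
	for primary, backup := range StorageFallbackURLs {
		if strings.HasPrefix(urlStr, primary) {
			return strings.Replace(urlStr, primary, backup, 1)
		}
	}
	return ""
}
```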
3 changes: 2 additions & 1 deletion errors/errors.go
@@ -7,6 +7,7 @@ import (
"net/http"
"strings"

"github.com/cenkalti/backoff/v4"
"github.com/livepeer/catalyst-api/log"
"github.com/xeipuuv/gojsonschema"
)
@@ -72,7 +73,7 @@ type unretriableError struct{ error }
func Unretriable(err error) error {
// Notice that permanent errors get unwrapped by the backoff lib when they're used to stop the retry loop. So we need
// to keep the unretriableError inside it so it's propagated upstream.
return err // TODO temporary change to fix tests
return backoff.Permanent(unretriableError{err})
}

// IsUnretriable returns whether the given error is an unretriable error.
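The comment in Unretriable is doing real work here: backoff.Retry unwraps a *backoff.PermanentError when it stops the retry loop and returns the inner error, so the unretriableError marker has to sit inside the Permanent wrapper to survive that round trip. The sketch below shows how the pieces presumably fit together; the IsUnretriable body is an assumed errors.As-based check for illustration, not copied from the repo.

```go
package errors

import (
	stderrors "errors"

	"github.com/cenkalti/backoff/v4"
)

// unretriableError marks an error that retry loops should not retry.
type unretriableError struct{ error }

// Unretriable wraps err so that backoff.Retry stops immediately. Retry returns
// the inner error of a *backoff.PermanentError, so the marker type must be the
// value wrapped by Permanent in order to remain visible to callers afterwards.
func Unretriable(err error) error {
	return backoff.Permanent(unretriableError{err})
}

// IsUnretriable reports whether err, or any error it wraps, carries the
// unretriable marker (assumed implementation, shown for illustration).
func IsUnretriable(err error) bool {
	var ue unretriableError
	return stderrors.As(err, &ue)
}
```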
38 changes: 2 additions & 36 deletions main.go
@@ -8,8 +8,6 @@ import (
"fmt"
"log"
"math/rand"
"net/http"
"net/http/httputil"
"os"
"os/signal"
"strings"
@@ -41,35 +39,7 @@ import (
"golang.org/x/sync/errgroup"
)

func startProxyServer() error {
proxy := &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = "localhost:9000"
},
}

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
errorChance := float32(0.5)
if r.Method != "GET" {
errorChance = 0.95
}
if rand.Float32() < errorChance {
glog.Errorf("Random error for path=%s", r.URL.Path)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
proxy.ServeHTTP(w, r)
})

return http.ListenAndServe(":9420", nil)
}

func main() {
go func() {
log.Fatal(startProxyServer())
}()

err := flag.Set("logtostderr", "true")
if err != nil {
glog.Fatal(err)
@@ -147,7 +117,7 @@ func main() {
fs.StringVar(&cli.EncryptKey, "encrypt", "", "Key for encrypting network traffic within Serf. Must be a base64-encoded 32-byte key.")
fs.StringVar(&cli.VodDecryptPublicKey, "catalyst-public-key", "", "Public key of the catalyst node for encryption")
fs.StringVar(&cli.VodDecryptPrivateKey, "catalyst-private-key", "", "Private key of the catalyst node for encryption")
config.CommaMapFlag(fs, &cli.StorageFallbackURLs, "storage-fallback-urls", map[string]string{}, `Comma-separated map of primary to backup storage URLs. If a file fails downloading from one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`)
config.CommaMapFlag(fs, &cli.StorageFallbackURLs, "storage-fallback-urls", map[string]string{}, `Comma-separated map of primary to backup storage URLs. If a file fails downloading from one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced. E.g. https://storj.livepeer.com/catalyst-recordings-com/hls=https://google.livepeer.com/catalyst-recordings-com/hls`)
fs.StringVar(&cli.GateURL, "gate-url", "http://localhost:3004/api/access-control/gate", "Address to contact playback gating API for access control verification")
fs.StringVar(&cli.DataURL, "data-url", "http://localhost:3004/api/data", "Address of the Livepeer Data Endpoint")
config.InvertedBoolFlag(fs, &cli.MistTriggerSetup, "mist-trigger-setup", true, "Overwrite Mist triggers with the ones built into catalyst-api")
@@ -206,11 +176,7 @@ func main() {
return
}

if cli.MistUser != "" || cli.MistPassword != "" {
glog.Warning("DEPRECATION NOTICE: mist-user and mist-password are no longer used and will be removed in a later version")
}

// TODO: I don't love the global variables for these. Me neither.
// TODO: I don't love the global variables for these
config.ImportIPFSGatewayURLs = cli.ImportIPFSGatewayURLs
config.ImportArweaveGatewayURLs = cli.ImportArweaveGatewayURLs
config.HTTPInternalAddress = cli.HTTPInternalAddress
3 changes: 2 additions & 1 deletion pipeline/coordinator.go
@@ -290,7 +290,8 @@ func (c *Coordinator) StartUploadJob(p UploadJobPayload) {

// Update osTransferURL if needed
if clients.IsHLSInput(sourceURL) {
sourceURL, err = clients.DownloadRenditionManifestWithBackup(p.RequestID, sourceURL, osTransferURL.JoinPath(".."))
// Handle falling back to backup bucket for manifest and segments
sourceURL, err = clients.RecordingBackupCheck(p.RequestID, sourceURL, osTransferURL.JoinPath(".."))
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pipeline/ffmpeg.go
@@ -152,7 +152,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {

sourceManifest, err := clients.DownloadRenditionManifest(transcodeRequest.RequestID, transcodeRequest.SourceManifestURL)
if err != nil {
return nil, fmt.Errorf("error downloading source manifest %s: %w", transcodeRequest.SourceManifestURL, err)
return nil, fmt.Errorf("error downloading source manifest %s: %w", log.RedactURL(transcodeRequest.SourceManifestURL), err)
}

sourceSegments := sourceManifest.GetAllSegments()
7 changes: 4 additions & 3 deletions test/steps/ffmpeg.go
@@ -31,19 +31,20 @@ func (s *StepContext) AllOfTheSourceSegmentsAreWrittenToStorageWithinSeconds(num
session := osDriver.NewSession(filepath.Join(s.latestRequestID, "source"))

var latestNumSegments int
for x := 0; x < secs; x++ {
for x := 0; x < secs; x++ { // retry loop
if x > 0 {
time.Sleep(time.Second)
}
page, err := session.ListFiles(context.Background(), "", "")
if err != nil {
log.Println("failed to list files: ", err)
time.Sleep(time.Second)
continue
}

latestNumSegments = len(page.Files())
if latestNumSegments == numSegments+1 {
return nil
}
time.Sleep(time.Second)
}
return fmt.Errorf("did not find the expected number of source segments in %s (wanted %d, got %d)", s.SourceOutputDir, numSegments, latestNumSegments)
}