From d6ce834017ff7bbf749d050c3d08aea28552b7e6 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 20 Jun 2024 21:18:58 +0100 Subject: [PATCH] clients: Download manifest from backup OS --- clients/manifest.go | 94 +++++++++++++++++++++++++++++------- clients/manifest_test.go | 1 + config/storage_backup_url.go | 15 ++++++ thumbnails/thumbnails.go | 37 ++------------ 4 files changed, 96 insertions(+), 51 deletions(-) create mode 100644 config/storage_backup_url.go diff --git a/clients/manifest.go b/clients/manifest.go index 4c047bad7..25315997a 100644 --- a/clients/manifest.go +++ b/clients/manifest.go @@ -1,6 +1,7 @@ package clients import ( + "bytes" "context" "fmt" "io" @@ -11,10 +12,13 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/cenkalti/backoff/v4" "github.com/grafov/m3u8" + "github.com/livepeer/catalyst-api/config" + "github.com/livepeer/catalyst-api/errors" "github.com/livepeer/catalyst-api/video" ) @@ -33,25 +37,9 @@ func DownloadRetryBackoffLong() backoff.BackOff { var DownloadRetryBackoff = DownloadRetryBackoffLong func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.MediaPlaylist, error) { - var playlist m3u8.Playlist - var playlistType m3u8.ListType - - dStorage := NewDStorageDownload() - err := backoff.Retry(func() error { - rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage) - if err != nil { - return fmt.Errorf("error downloading manifest: %s", err) - } - defer rc.Close() - - playlist, playlistType, err = m3u8.DecodeFrom(rc, true) - if err != nil { - return fmt.Errorf("error decoding manifest: %s", err) - } - return nil - }, DownloadRetryBackoff()) + playlist, playlistType, err := downloadManifestWithBackup(requestID, sourceManifestOSURL) if err != nil { - return m3u8.MediaPlaylist{}, err + return m3u8.MediaPlaylist{}, fmt.Errorf("error downloading manifest: %w", err) } // We shouldn't ever receive Master playlists from the previous section @@ -68,6 +56,76 @@ func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.Medi return *mediaPlaylist, nil } +func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (m3u8.Playlist, m3u8.ListType, error) { + var playlist, playlistBackup m3u8.Playlist + var playlistType, playlistTypeBackup m3u8.ListType + var size, sizeBackup int + var errPrimary, errBackup error + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + playlist, playlistType, size, errPrimary = downloadManifest(requestID, sourceManifestOSURL) + }() + + backupManifestURL := config.GetStorageBackupURL(sourceManifestOSURL) + if backupManifestURL != "" { + wg.Add(1) + go func() { + defer wg.Done() + playlistBackup, playlistTypeBackup, sizeBackup, errBackup = downloadManifest(requestID, backupManifestURL) + }() + } + wg.Wait() + + // If the file is not found in either storage, return the not found err from + // the primary. Otherwise, return any error that is not a simple not found + // (only not found errors passthrough below) + primaryNotFound, backupNotFound := errors.IsObjectNotFound(errPrimary), errors.IsObjectNotFound(errBackup) + if primaryNotFound && backupNotFound { + return nil, 0, errPrimary + } + if errPrimary != nil && !primaryNotFound { + return nil, 0, errPrimary + } + if errBackup != nil && !backupNotFound { + return nil, 0, errBackup + } + + // Return the largest manifest as the most recent version + hasBackup := backupManifestURL != "" && errBackup == nil + if hasBackup && (errPrimary != nil || sizeBackup > size) { + return playlistBackup, playlistTypeBackup, nil + } + return playlist, playlistType, errPrimary +} + +func downloadManifest(requestID, sourceManifestOSURL string) (playlist m3u8.Playlist, playlistType m3u8.ListType, size int, err error) { + dStorage := NewDStorageDownload() + err = backoff.Retry(func() error { + rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage) + if err != nil { + return err + } + defer rc.Close() + + data := new(bytes.Buffer) + _, err = data.ReadFrom(rc) + if err != nil { + return fmt.Errorf("error reading manifest: %s", err) + } + + size = data.Len() + playlist, playlistType, err = m3u8.Decode(*data, true) + if err != nil { + return fmt.Errorf("error decoding manifest: %s", err) + } + return nil + }, DownloadRetryBackoff()) + return +} + type SourceSegment struct { URL *url.URL DurationMillis int64 diff --git a/clients/manifest_test.go b/clients/manifest_test.go index 4808e328a..d27d775ab 100644 --- a/clients/manifest_test.go +++ b/clients/manifest_test.go @@ -40,6 +40,7 @@ func TestDownloadRenditionManifestFailsWhenItCantFindTheManifest(t *testing.T) { _, err := DownloadRenditionManifest("blah", "/tmp/something/x.m3u8") require.Error(t, err) require.Contains(t, err.Error(), "error downloading manifest") + require.Contains(t, err.Error(), "ObjectNotFoundError") } func TestDownloadRenditionManifestFailsWhenItCantParseTheManifest(t *testing.T) { diff --git a/config/storage_backup_url.go b/config/storage_backup_url.go new file mode 100644 index 000000000..e197cd182 --- /dev/null +++ b/config/storage_backup_url.go @@ -0,0 +1,15 @@ +package config + +import "strings" + +// GetStorageBackupURL returns the backup URL for the given URL or an empty string if it doesn't exist. The backup URL +// is found by checking the `StorageFallbackURLs` global config map. If any of the primary URL prefixes (keys in map) +// are in `urlStr`, it is replaced with the backup URL prefix (associated value of the key in the map). +func GetStorageBackupURL(urlStr string) string { + for primary, backup := range StorageFallbackURLs { + if strings.HasPrefix(urlStr, primary) { + return strings.Replace(urlStr, primary, backup, 1) + } + } + return "" +} diff --git a/thumbnails/thumbnails.go b/thumbnails/thumbnails.go index 76587481d..6e7a8490b 100644 --- a/thumbnails/thumbnails.go +++ b/thumbnails/thumbnails.go @@ -28,35 +28,6 @@ const outputDir = "thumbnails" // Wait a maximum of 5 mins for thumbnails to finish var thumbWaitBackoff = backoff.WithMaxRetries(backoff.NewConstantBackOff(30*time.Second), 10) -func getMediaManifest(requestID string, input string) (*m3u8.MediaPlaylist, error) { - var ( - rc io.ReadCloser - err error - ) - err = backoff.Retry(func() error { - rc, err = clients.GetFile(context.Background(), requestID, input, nil) - return err - }, clients.DownloadRetryBackoff()) - if err != nil { - return nil, fmt.Errorf("error downloading manifest: %w", err) - } - defer rc.Close() - - manifest, playlistType, err := m3u8.DecodeFrom(rc, true) - if err != nil { - return nil, fmt.Errorf("failed to decode manifest: %w", err) - } - - if playlistType != m3u8.MEDIA { - return nil, fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported") - } - mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist) - if !ok || mediaPlaylist == nil { - return nil, fmt.Errorf("failed to parse playlist as MediaPlaylist") - } - return mediaPlaylist, nil -} - func getSegmentOffset(mediaPlaylist *m3u8.MediaPlaylist) (int64, error) { segments := mediaPlaylist.GetAllSegments() if len(segments) < 1 { @@ -71,7 +42,7 @@ func getSegmentOffset(mediaPlaylist *m3u8.MediaPlaylist) (int64, error) { func GenerateThumbsVTT(requestID string, input string, output *url.URL) error { // download and parse the manifest - mediaPlaylist, err := getMediaManifest(requestID, input) + mediaPlaylist, err := clients.DownloadRenditionManifest(requestID, input) if err != nil { return err } @@ -83,7 +54,7 @@ func GenerateThumbsVTT(requestID string, input string, output *url.URL) error { if err != nil { return err } - segmentOffset, err := getSegmentOffset(mediaPlaylist) + segmentOffset, err := getSegmentOffset(&mediaPlaylist) if err != nil { return err } @@ -172,7 +143,7 @@ func GenerateThumb(segmentURI string, input []byte, output *url.URL, segmentOffs func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error { // parse manifest and generate one thumbnail per segment - mediaPlaylist, err := getMediaManifest(requestID, input) + mediaPlaylist, err := clients.DownloadRenditionManifest(requestID, input) if err != nil { return err } @@ -180,7 +151,7 @@ func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error if err != nil { return err } - segmentOffset, err := getSegmentOffset(mediaPlaylist) + segmentOffset, err := getSegmentOffset(&mediaPlaylist) if err != nil { return err }