Skip to content

Commit

Permalink
clients: Download manifest from backup OS
Browse files Browse the repository at this point in the history
  • Loading branch information
victorges committed Jun 21, 2024
1 parent 44190d9 commit d6ce834
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 51 deletions.
94 changes: 76 additions & 18 deletions clients/manifest.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clients

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -11,10 +12,13 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/grafov/m3u8"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/video"
)

Expand All @@ -33,25 +37,9 @@ func DownloadRetryBackoffLong() backoff.BackOff {
var DownloadRetryBackoff = DownloadRetryBackoffLong

func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.MediaPlaylist, error) {
var playlist m3u8.Playlist
var playlistType m3u8.ListType

dStorage := NewDStorageDownload()
err := backoff.Retry(func() error {
rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage)
if err != nil {
return fmt.Errorf("error downloading manifest: %s", err)
}
defer rc.Close()

playlist, playlistType, err = m3u8.DecodeFrom(rc, true)
if err != nil {
return fmt.Errorf("error decoding manifest: %s", err)
}
return nil
}, DownloadRetryBackoff())
playlist, playlistType, err := downloadManifestWithBackup(requestID, sourceManifestOSURL)
if err != nil {
return m3u8.MediaPlaylist{}, err
return m3u8.MediaPlaylist{}, fmt.Errorf("error downloading manifest: %w", err)
}

// We shouldn't ever receive Master playlists from the previous section
Expand All @@ -68,6 +56,76 @@ func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.Medi
return *mediaPlaylist, nil
}

func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (m3u8.Playlist, m3u8.ListType, error) {
var playlist, playlistBackup m3u8.Playlist
var playlistType, playlistTypeBackup m3u8.ListType
var size, sizeBackup int
var errPrimary, errBackup error

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
playlist, playlistType, size, errPrimary = downloadManifest(requestID, sourceManifestOSURL)
}()

backupManifestURL := config.GetStorageBackupURL(sourceManifestOSURL)
if backupManifestURL != "" {
wg.Add(1)
go func() {
defer wg.Done()
playlistBackup, playlistTypeBackup, sizeBackup, errBackup = downloadManifest(requestID, backupManifestURL)
}()
}
wg.Wait()

// If the file is not found in either storage, return the not found err from
// the primary. Otherwise, return any error that is not a simple not found
// (only not found errors passthrough below)
primaryNotFound, backupNotFound := errors.IsObjectNotFound(errPrimary), errors.IsObjectNotFound(errBackup)
if primaryNotFound && backupNotFound {
return nil, 0, errPrimary
}
if errPrimary != nil && !primaryNotFound {
return nil, 0, errPrimary
}
if errBackup != nil && !backupNotFound {
return nil, 0, errBackup
}

// Return the largest manifest as the most recent version
hasBackup := backupManifestURL != "" && errBackup == nil
if hasBackup && (errPrimary != nil || sizeBackup > size) {
return playlistBackup, playlistTypeBackup, nil
}
return playlist, playlistType, errPrimary
}

func downloadManifest(requestID, sourceManifestOSURL string) (playlist m3u8.Playlist, playlistType m3u8.ListType, size int, err error) {
dStorage := NewDStorageDownload()
err = backoff.Retry(func() error {
rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage)
if err != nil {
return err
}
defer rc.Close()

data := new(bytes.Buffer)
_, err = data.ReadFrom(rc)
if err != nil {
return fmt.Errorf("error reading manifest: %s", err)
}

size = data.Len()
playlist, playlistType, err = m3u8.Decode(*data, true)
if err != nil {
return fmt.Errorf("error decoding manifest: %s", err)
}
return nil
}, DownloadRetryBackoff())
return
}

type SourceSegment struct {
URL *url.URL
DurationMillis int64
Expand Down
1 change: 1 addition & 0 deletions clients/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestDownloadRenditionManifestFailsWhenItCantFindTheManifest(t *testing.T) {
_, err := DownloadRenditionManifest("blah", "/tmp/something/x.m3u8")
require.Error(t, err)
require.Contains(t, err.Error(), "error downloading manifest")
require.Contains(t, err.Error(), "ObjectNotFoundError")
}

func TestDownloadRenditionManifestFailsWhenItCantParseTheManifest(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions config/storage_backup_url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package config

import "strings"

// GetStorageBackupURL returns the backup URL for the given URL or an empty string if it doesn't exist. The backup URL
// is found by checking the `StorageFallbackURLs` global config map. If any of the primary URL prefixes (keys in map)
// are in `urlStr`, it is replaced with the backup URL prefix (associated value of the key in the map).
func GetStorageBackupURL(urlStr string) string {
for primary, backup := range StorageFallbackURLs {
if strings.HasPrefix(urlStr, primary) {
return strings.Replace(urlStr, primary, backup, 1)
}
}
return ""
}
37 changes: 4 additions & 33 deletions thumbnails/thumbnails.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,6 @@ const outputDir = "thumbnails"
// Wait a maximum of 5 mins for thumbnails to finish
var thumbWaitBackoff = backoff.WithMaxRetries(backoff.NewConstantBackOff(30*time.Second), 10)

func getMediaManifest(requestID string, input string) (*m3u8.MediaPlaylist, error) {
var (
rc io.ReadCloser
err error
)
err = backoff.Retry(func() error {
rc, err = clients.GetFile(context.Background(), requestID, input, nil)
return err
}, clients.DownloadRetryBackoff())
if err != nil {
return nil, fmt.Errorf("error downloading manifest: %w", err)
}
defer rc.Close()

manifest, playlistType, err := m3u8.DecodeFrom(rc, true)
if err != nil {
return nil, fmt.Errorf("failed to decode manifest: %w", err)
}

if playlistType != m3u8.MEDIA {
return nil, fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported")
}
mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist)
if !ok || mediaPlaylist == nil {
return nil, fmt.Errorf("failed to parse playlist as MediaPlaylist")
}
return mediaPlaylist, nil
}

func getSegmentOffset(mediaPlaylist *m3u8.MediaPlaylist) (int64, error) {
segments := mediaPlaylist.GetAllSegments()
if len(segments) < 1 {
Expand All @@ -71,7 +42,7 @@ func getSegmentOffset(mediaPlaylist *m3u8.MediaPlaylist) (int64, error) {

func GenerateThumbsVTT(requestID string, input string, output *url.URL) error {
// download and parse the manifest
mediaPlaylist, err := getMediaManifest(requestID, input)
mediaPlaylist, err := clients.DownloadRenditionManifest(requestID, input)
if err != nil {
return err
}
Expand All @@ -83,7 +54,7 @@ func GenerateThumbsVTT(requestID string, input string, output *url.URL) error {
if err != nil {
return err
}
segmentOffset, err := getSegmentOffset(mediaPlaylist)
segmentOffset, err := getSegmentOffset(&mediaPlaylist)
if err != nil {
return err
}
Expand Down Expand Up @@ -172,15 +143,15 @@ func GenerateThumb(segmentURI string, input []byte, output *url.URL, segmentOffs

func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error {
// parse manifest and generate one thumbnail per segment
mediaPlaylist, err := getMediaManifest(requestID, input)
mediaPlaylist, err := clients.DownloadRenditionManifest(requestID, input)
if err != nil {
return err
}
inputURL, err := url.Parse(input)
if err != nil {
return err
}
segmentOffset, err := getSegmentOffset(mediaPlaylist)
segmentOffset, err := getSegmentOffset(&mediaPlaylist)
if err != nil {
return err
}
Expand Down

0 comments on commit d6ce834

Please sign in to comment.