Skip to content

Commit

Permalink
clients: Implement storage fallback for recordings (#1303)
Browse files Browse the repository at this point in the history
* config: Create storage-fallback-urls flag

* config: Make the StorageFallbackURLs a global

* clients: Download manifest from backup OS

* clients: Download segments from backup URL

* [WIP] clients: Use storage fallback for probing segments too

This is incomplete as we still need to fix the probing made on
the manifest itself instead of just on each segment. Rabbit hole.

* [DEV-revert] box-only: Start a local proxy server to simulate an unreliable storage

Just proxies to Minio with a random error chance

* Add cucumber test

* Refactor

* remove comment

* fix unit test. (cukes still need fixing)

* Fix cukes

* add unit tests

* Address TODOs and review comments

* return variable no longer needed

* review comments

* Review comments

---------

Co-authored-by: Max Holland <[email protected]>
  • Loading branch information
victorges and mjh1 authored Jul 25, 2024
1 parent 21cbded commit ac85dfd
Show file tree
Hide file tree
Showing 19 changed files with 526 additions and 105 deletions.
24 changes: 24 additions & 0 deletions clients/input_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,30 @@ func GetFile(ctx context.Context, requestID, url string, dStorage *DStorageDownl
}
}

func GetFileWithBackup(ctx context.Context, requestID, url string, dStorage *DStorageDownload) (io.ReadCloser, string, error) {
rc, err := GetFile(ctx, requestID, url, dStorage)
if err == nil {
return rc, url, nil
}

backupURL := config.GetStorageBackupURL(url)
if backupURL == "" {
return nil, url, err
}
rc, backupErr := GetFile(ctx, requestID, backupURL, dStorage)
if backupErr == nil {
return rc, backupURL, nil
}

// prioritize retriable errors in the response so we don't skip retries
if !catErrs.IsUnretriable(err) {
return nil, url, err
} else if !catErrs.IsUnretriable(backupErr) {
return nil, backupURL, backupErr
}
return nil, url, err
}

var retryableHttpClient = newRetryableHttpClient()

func newRetryableHttpClient() *http.Client {
Expand Down
171 changes: 146 additions & 25 deletions clients/manifest.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clients

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -11,19 +12,23 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/grafov/m3u8"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/video"
)

const (
MasterManifestFilename = "index.m3u8"
DashManifestFilename = "index.mpd"
ClipManifestFilename = "clip.m3u8"
ManifestUploadTimeout = 5 * time.Minute
Fmp4PostfixDir = "fmp4"
MasterManifestFilename = "index.m3u8"
DashManifestFilename = "index.mpd"
ClipManifestFilename = "clip.m3u8"
ManifestUploadTimeout = 5 * time.Minute
Fmp4PostfixDir = "fmp4"
manifestNotFoundTolerance = 10 * time.Second
)

func DownloadRetryBackoffLong() backoff.BackOff {
Expand All @@ -33,27 +38,73 @@ func DownloadRetryBackoffLong() backoff.BackOff {
var DownloadRetryBackoff = DownloadRetryBackoffLong

func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.MediaPlaylist, error) {
var playlist m3u8.Playlist
var playlistType m3u8.ListType
playlist, playlistType, _, err := downloadManifest(requestID, sourceManifestOSURL)
if err != nil {
return m3u8.MediaPlaylist{}, err
}
return convertToMediaPlaylist(playlist, playlistType)
}

// RecordingBackupCheck checks whether manifests and segments are available on the primary or
// the backup store and returns a URL to new manifest with absolute segment URLs pointing to either primary or
// backup locations depending on where the segments are available.
func RecordingBackupCheck(requestID string, primaryManifestURL, osTransferURL *url.URL) (*url.URL, error) {
if config.GetStorageBackupURL(primaryManifestURL.String()) == "" {
return primaryManifestURL, nil
}

playlist, playlistType, err := downloadManifestWithBackup(requestID, primaryManifestURL.String())
if err != nil {
return nil, fmt.Errorf("error downloading manifest: %w", err)
}
mediaPlaylist, err := convertToMediaPlaylist(playlist, playlistType)
if err != nil {
return nil, err
}

// Check whether segments are available from primary or backup storage
dStorage := NewDStorageDownload()
err := backoff.Retry(func() error {
rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage)
for _, segment := range mediaPlaylist.GetAllSegments() {
segURL, err := ManifestURLToSegmentURL(primaryManifestURL.String(), segment.URI)
if err != nil {
return fmt.Errorf("error downloading manifest: %s", err)
return nil, fmt.Errorf("error getting segment URL: %w", err)
}
defer rc.Close()

playlist, playlistType, err = m3u8.DecodeFrom(rc, true)
var actualSegURL string
err = backoff.Retry(func() error {
var rc io.ReadCloser
rc, actualSegURL, err = GetFileWithBackup(context.Background(), requestID, segURL.String(), dStorage)
if rc != nil {
rc.Close()
}
return err
}, DownloadRetryBackoff())
if err != nil {
return fmt.Errorf("error decoding manifest: %s", err)
return nil, fmt.Errorf("failed to find segment file %s: %w", segURL.Redacted(), err)
}
return nil
}, DownloadRetryBackoff())
segment.URI = actualSegURL
}

// write the manifest to storage and update the manifestURL variable
outputStorageURL := osTransferURL.JoinPath("input.m3u8")
err = backoff.Retry(func() error {
return UploadToOSURL(outputStorageURL.String(), "", strings.NewReader(mediaPlaylist.String()), ManifestUploadTimeout)
}, UploadRetryBackoff())
if err != nil {
return m3u8.MediaPlaylist{}, err
return nil, fmt.Errorf("failed to upload rendition playlist: %w", err)
}
manifestURL, err := SignURL(outputStorageURL)
if err != nil {
return nil, fmt.Errorf("failed to sign manifest url: %w", err)
}

newURL, err := url.Parse(manifestURL)
if err != nil {
return nil, fmt.Errorf("failed to parse new manifest URL: %w", err)
}
return newURL, nil
}

func convertToMediaPlaylist(playlist m3u8.Playlist, playlistType m3u8.ListType) (m3u8.MediaPlaylist, error) {
// We shouldn't ever receive Master playlists from the previous section
if playlistType != m3u8.MEDIA {
return m3u8.MediaPlaylist{}, fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported")
Expand All @@ -64,10 +115,86 @@ func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.Medi
if !ok || mediaPlaylist == nil {
return m3u8.MediaPlaylist{}, fmt.Errorf("failed to parse playlist as MediaPlaylist")
}

return *mediaPlaylist, nil
}

func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (m3u8.Playlist, m3u8.ListType, error) {
var playlist, playlistBackup m3u8.Playlist
var playlistType, playlistTypeBackup m3u8.ListType
var size, sizeBackup int
var errPrimary, errBackup error

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
playlist, playlistType, size, errPrimary = downloadManifest(requestID, sourceManifestOSURL)
}()

backupManifestURL := config.GetStorageBackupURL(sourceManifestOSURL)
if backupManifestURL != "" {
wg.Add(1)
go func() {
defer wg.Done()
playlistBackup, playlistTypeBackup, sizeBackup, errBackup = downloadManifest(requestID, backupManifestURL)
}()
}
wg.Wait()

// If the file is not found in either storage, return the not found err from
// the primary. Otherwise, return any error that is not a simple not found
// (only not found errors passthrough below)
primaryNotFound, backupNotFound := errors.IsObjectNotFound(errPrimary), errors.IsObjectNotFound(errBackup)
if primaryNotFound && backupNotFound {
return nil, 0, errPrimary
}
if errPrimary != nil && !primaryNotFound {
return nil, 0, errPrimary
}
if errBackup != nil && !backupNotFound {
return nil, 0, errBackup
}

// Return the largest manifest as the most recent version
hasBackup := backupManifestURL != "" && errBackup == nil
if hasBackup && (errPrimary != nil || sizeBackup > size) {
return playlistBackup, playlistTypeBackup, nil
}
return playlist, playlistType, errPrimary
}

func downloadManifest(requestID, sourceManifestOSURL string) (playlist m3u8.Playlist, playlistType m3u8.ListType, size int, err error) {
dStorage := NewDStorageDownload()
start := time.Now()
err = backoff.Retry(func() error {
rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage)
if err != nil {
if time.Since(start) > manifestNotFoundTolerance && errors.IsObjectNotFound(err) {
// bail out of the retries earlier for not found errors because it will be quite a common scenario
// where the backup manifest does not exist and we don't want to wait the whole 50s of retries for
// every recording job
return backoff.Permanent(err)
}
return err
}
defer rc.Close()

data := new(bytes.Buffer)
_, err = data.ReadFrom(rc)
if err != nil {
return fmt.Errorf("error reading manifest: %s", err)
}

size = data.Len()
playlist, playlistType, err = m3u8.Decode(*data, true)
if err != nil {
return fmt.Errorf("error decoding manifest: %s", err)
}
return nil
}, DownloadRetryBackoff())
return
}

type SourceSegment struct {
URL *url.URL
DurationMillis int64
Expand All @@ -76,13 +203,7 @@ type SourceSegment struct {
// Loop over each segment in a given manifest and convert it from a relative path to a full ObjectStore-compatible URL
func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist) ([]SourceSegment, error) {
var urls []SourceSegment
for _, segment := range manifest.Segments {
// The segments list is a ring buffer - see https://github.com/grafov/m3u8/issues/140
// and so we only know we've hit the end of the list when we find a nil element
if segment == nil {
break
}

for _, segment := range manifest.GetAllSegments() {
u, err := ManifestURLToSegmentURL(sourceManifestURL, segment.URI)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit ac85dfd

Please sign in to comment.