Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clients: Implement storage fallback for recordings #1303

Merged
merged 19 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions clients/input_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,30 @@ func GetFile(ctx context.Context, requestID, url string, dStorage *DStorageDownl
}
}

func GetFileWithBackup(ctx context.Context, requestID, url string, dStorage *DStorageDownload) (io.ReadCloser, string, error) {
rc, err := GetFile(ctx, requestID, url, dStorage)
if err == nil {
return rc, url, nil
}

backupURL := config.GetStorageBackupURL(url)
if backupURL == "" {
return nil, url, err
}
rc, backupErr := GetFile(ctx, requestID, backupURL, dStorage)
if backupErr == nil {
return rc, backupURL, nil
}

// prioritize retriable errors in the response so we don't skip retries
if !catErrs.IsUnretriable(err) {
return nil, url, err
} else if !catErrs.IsUnretriable(backupErr) {
return nil, backupURL, backupErr
}
return nil, url, err
}

var retryableHttpClient = newRetryableHttpClient()

func newRetryableHttpClient() *http.Client {
Expand Down
156 changes: 137 additions & 19 deletions clients/manifest.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clients

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -11,10 +12,13 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/grafov/m3u8"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/video"
)

Expand All @@ -33,27 +37,78 @@ func DownloadRetryBackoffLong() backoff.BackOff {
var DownloadRetryBackoff = DownloadRetryBackoffLong

func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.MediaPlaylist, error) {
var playlist m3u8.Playlist
var playlistType m3u8.ListType
playlist, playlistType, _, err := downloadManifest(requestID, sourceManifestOSURL)
if err != nil {
return m3u8.MediaPlaylist{}, err
}
return convertToMediaPlaylist(playlist, playlistType)
}

func DownloadRenditionManifestWithBackup(requestID string, primaryManifestURL, osTransferURL *url.URL) (*url.URL, error) {
manifestURL, playlist, playlistType, err := downloadManifestWithBackup(requestID, primaryManifestURL.String())
if err != nil {
return nil, fmt.Errorf("error downloading manifest: %w", err)
}
mediaPlaylist, err := convertToMediaPlaylist(playlist, playlistType)
if err != nil {
return nil, err
}

// If we had to fall back to the backup manifest then we need a new manifest with absolute URLs because
// successful segment uploads will exist in the primary store and therefore the relative URLs will be broken.
newPlaylistNeeded := manifestURL != primaryManifestURL.String()
mjh1 marked this conversation as resolved.
Show resolved Hide resolved

// Check whether segments are available from primary or backup storage
dStorage := NewDStorageDownload()
err := backoff.Retry(func() error {
rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage)
for _, segment := range mediaPlaylist.GetAllSegments() {
segURL, err := ManifestURLToSegmentURL(primaryManifestURL.String(), segment.URI)
victorges marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("error downloading manifest: %s", err)
return nil, fmt.Errorf("error getting segment URL: %w", err)
}
defer rc.Close()
var actualSegURL string
err = backoff.Retry(func() error {
var rc io.ReadCloser
rc, actualSegURL, err = GetFileWithBackup(context.TODO(), requestID, segURL.String(), dStorage)
if rc != nil {
rc.Close()
}
return err
}, DownloadRetryBackoff())
mjh1 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("failed to find segment file %s: %w", segURL.Redacted(), err)
}
if actualSegURL != segURL.String() {
// If we had to fall back to a backup segment then we'll need a new manifest with absolute URLs
newPlaylistNeeded = true
}
segment.URI = actualSegURL
mjh1 marked this conversation as resolved.
Show resolved Hide resolved
}

playlist, playlistType, err = m3u8.DecodeFrom(rc, true)
if newPlaylistNeeded {
// TODO log line
mjh1 marked this conversation as resolved.
Show resolved Hide resolved

// write the manifest to storage and update the manifestURL variable
outputStorageURL := osTransferURL.JoinPath("input.m3u8")
mjh1 marked this conversation as resolved.
Show resolved Hide resolved
err = backoff.Retry(func() error {
return UploadToOSURL(outputStorageURL.String(), "", strings.NewReader(mediaPlaylist.String()), ManifestUploadTimeout)
}, UploadRetryBackoff())
if err != nil {
return fmt.Errorf("error decoding manifest: %s", err)
return nil, fmt.Errorf("failed to upload rendition playlist: %w", err)
}
return nil
}, DownloadRetryBackoff())
manifestURL, err = SignURL(outputStorageURL)
if err != nil {
return nil, fmt.Errorf("failed to sign manifest url: %w", err)
}
}

newURL, err := url.Parse(manifestURL)
if err != nil {
return m3u8.MediaPlaylist{}, err
return nil, fmt.Errorf("failed to parse new manifest URL: %w", err)
}
return newURL, nil
}

func convertToMediaPlaylist(playlist m3u8.Playlist, playlistType m3u8.ListType) (m3u8.MediaPlaylist, error) {
// We shouldn't ever receive Master playlists from the previous section
if playlistType != m3u8.MEDIA {
return m3u8.MediaPlaylist{}, fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported")
Expand All @@ -64,10 +119,79 @@ func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.Medi
if !ok || mediaPlaylist == nil {
return m3u8.MediaPlaylist{}, fmt.Errorf("failed to parse playlist as MediaPlaylist")
}

return *mediaPlaylist, nil
}

func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (string, m3u8.Playlist, m3u8.ListType, error) {
var playlist, playlistBackup m3u8.Playlist
var playlistType, playlistTypeBackup m3u8.ListType
var size, sizeBackup int
var errPrimary, errBackup error

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
playlist, playlistType, size, errPrimary = downloadManifest(requestID, sourceManifestOSURL)
}()

backupManifestURL := config.GetStorageBackupURL(sourceManifestOSURL)
if backupManifestURL != "" {
wg.Add(1)
go func() {
defer wg.Done()
playlistBackup, playlistTypeBackup, sizeBackup, errBackup = downloadManifest(requestID, backupManifestURL)
}()
}
wg.Wait()

// If the file is not found in either storage, return the not found err from
// the primary. Otherwise, return any error that is not a simple not found
// (only not found errors passthrough below)
primaryNotFound, backupNotFound := errors.IsObjectNotFound(errPrimary), errors.IsObjectNotFound(errBackup)
if primaryNotFound && backupNotFound {
return "", nil, 0, errPrimary
}
if errPrimary != nil && !primaryNotFound {
return "", nil, 0, errPrimary
}
if errBackup != nil && !backupNotFound {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is weird, if the primary works fine but the backup is broken, we should not return an error I guess. We should return the playlist from the primary bucket, isn't that the case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not found in the backup then we return the primary, but unfortunately we need to successfully check for the backups existence because we need to pick the largest of the two manifests lower down. If we failed to check the backup exists then it could be it does exist and is the more complete, larger of the two but we've missed that and gone ahead with the primary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we want to fail completely if there is an error from the backup?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the backup should always be working but just giving us a 404 most of the time

return "", nil, 0, errBackup
}

// Return the largest manifest as the most recent version
hasBackup := backupManifestURL != "" && errBackup == nil
if hasBackup && (errPrimary != nil || sizeBackup > size) {
return backupManifestURL, playlistBackup, playlistTypeBackup, nil
}
return sourceManifestOSURL, playlist, playlistType, errPrimary
}

func downloadManifest(requestID, sourceManifestOSURL string) (playlist m3u8.Playlist, playlistType m3u8.ListType, size int, err error) {
dStorage := NewDStorageDownload()
err = backoff.Retry(func() error {
rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage)
if err != nil {
return err
}
defer rc.Close()

data := new(bytes.Buffer)
_, err = data.ReadFrom(rc)
if err != nil {
return fmt.Errorf("error reading manifest: %s", err)
}

size = data.Len()
playlist, playlistType, err = m3u8.Decode(*data, true)
if err != nil {
return fmt.Errorf("error decoding manifest: %s", err)
}
return nil
}, DownloadRetryBackoff())
return
}

type SourceSegment struct {
URL *url.URL
DurationMillis int64
Expand All @@ -76,13 +200,7 @@ type SourceSegment struct {
// Loop over each segment in a given manifest and convert it from a relative path to a full ObjectStore-compatible URL
func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist) ([]SourceSegment, error) {
var urls []SourceSegment
for _, segment := range manifest.Segments {
// The segments list is a ring buffer - see https://github.com/grafov/m3u8/issues/140
// and so we only know we've hit the end of the list when we find a nil element
if segment == nil {
break
}

for _, segment := range manifest.GetAllSegments() {
u, err := ManifestURLToSegmentURL(sourceManifestURL, segment.URI)
if err != nil {
return nil, err
Expand Down
Loading