Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clients: Implement storage fallback for recordings #1303

Merged
merged 19 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions clients/input_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,30 @@ func GetFile(ctx context.Context, requestID, url string, dStorage *DStorageDownl
}
}

// GetFileWithBackup fetches url with GetFile and, if that fails, tries the
// backup location that config.GetStorageBackupURL reports for the same URL.
//
// On success it returns the open reader together with the URL that actually
// served the file (either url or the backup URL); the caller must close the
// reader. On failure the reader is nil and the returned URL/error pair
// describes the attempt being reported. When both attempts fail, an error
// that is not unretriable is preferred over an unretriable one, so that a
// caller wrapping this function in a retry loop does not stop retrying early.
func GetFileWithBackup(ctx context.Context, requestID, url string, dStorage *DStorageDownload) (io.ReadCloser, string, error) {
	primary, primaryErr := GetFile(ctx, requestID, url, dStorage)
	if primaryErr == nil {
		return primary, url, nil
	}

	backupURL := config.GetStorageBackupURL(url)
	if backupURL == "" {
		// No backup is configured for this URL, so the primary failure is final.
		return nil, url, primaryErr
	}

	backup, backupErr := GetFile(ctx, requestID, backupURL, dStorage)
	if backupErr == nil {
		return backup, backupURL, nil
	}

	// Both attempts failed. Report the backup error only when it is the sole
	// retriable one; in every other combination the primary error wins.
	if catErrs.IsUnretriable(primaryErr) && !catErrs.IsUnretriable(backupErr) {
		return nil, backupURL, backupErr
	}
	return nil, url, primaryErr
}

// retryableHttpClient is a package-level HTTP client created once, at package
// initialisation, by newRetryableHttpClient.
var retryableHttpClient = newRetryableHttpClient()

func newRetryableHttpClient() *http.Client {
Expand Down
160 changes: 140 additions & 20 deletions clients/manifest.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clients

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -11,10 +12,13 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/grafov/m3u8"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/video"
)

Expand All @@ -33,27 +37,73 @@ func DownloadRetryBackoffLong() backoff.BackOff {
// DownloadRetryBackoff is the backoff factory used when retrying downloads. It
// defaults to DownloadRetryBackoffLong.
// NOTE(review): presumably declared as a variable so tests can swap in a
// shorter backoff — confirm against callers.
var DownloadRetryBackoff = DownloadRetryBackoffLong

// DownloadRenditionManifest downloads the manifest at sourceManifestOSURL using
// downloadManifest and returns it converted to a media playlist.
//
// A download error is returned as-is alongside a zero-value playlist; otherwise
// the result (and any error) of convertToMediaPlaylist is returned directly.
func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.MediaPlaylist, error) {
	// The manifest size reported by downloadManifest is not needed here.
	manifest, listType, _, err := downloadManifest(requestID, sourceManifestOSURL)
	if err != nil {
		return m3u8.MediaPlaylist{}, err
	}
	return convertToMediaPlaylist(manifest, listType)
}

// RecordingBackupCheck checks whether the manifest and its segments are available on the
// primary or the backup store. It returns the URL of a new manifest, uploaded as
// "input.m3u8" under osTransferURL and signed with SignURL, whose segment URIs are absolute
// URLs pointing at either the primary or the backup location, depending on where each
// segment was found.
//
// When config.GetStorageBackupURL returns an empty string for primaryManifestURL, the
// function returns primaryManifestURL unchanged without downloading anything.
//
// NOTE(review): the body below appears to interleave removed and added diff lines (it
// references sourceManifestOSURL, which is not a parameter of this function, and contains
// back-to-back return statements) — confirm against the merged file before relying on it.
func RecordingBackupCheck(requestID string, primaryManifestURL, osTransferURL *url.URL) (*url.URL, error) {
if config.GetStorageBackupURL(primaryManifestURL.String()) == "" {
return primaryManifestURL, nil
}

playlist, playlistType, err := downloadManifestWithBackup(requestID, primaryManifestURL.String())
if err != nil {
return nil, fmt.Errorf("error downloading manifest: %w", err)
}
mediaPlaylist, err := convertToMediaPlaylist(playlist, playlistType)
if err != nil {
return nil, err
}

// Check whether segments are available from primary or backup storage
dStorage := NewDStorageDownload()
err := backoff.Retry(func() error {
rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage)
for _, segment := range mediaPlaylist.GetAllSegments() {
segURL, err := ManifestURLToSegmentURL(primaryManifestURL.String(), segment.URI)
victorges marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("error downloading manifest: %s", err)
return nil, fmt.Errorf("error getting segment URL: %w", err)
}
defer rc.Close()

playlist, playlistType, err = m3u8.DecodeFrom(rc, true)
var actualSegURL string
err = backoff.Retry(func() error {
var rc io.ReadCloser
rc, actualSegURL, err = GetFileWithBackup(context.Background(), requestID, segURL.String(), dStorage)
if rc != nil {
rc.Close()
}
return err
}, DownloadRetryBackoff())
if err != nil {
return fmt.Errorf("error decoding manifest: %s", err)
return nil, fmt.Errorf("failed to find segment file %s: %w", segURL.Redacted(), err)
}
return nil
}, DownloadRetryBackoff())
segment.URI = actualSegURL
}

// write the manifest to storage and update the manifestURL variable
outputStorageURL := osTransferURL.JoinPath("input.m3u8")
err = backoff.Retry(func() error {
return UploadToOSURL(outputStorageURL.String(), "", strings.NewReader(mediaPlaylist.String()), ManifestUploadTimeout)
}, UploadRetryBackoff())
if err != nil {
return m3u8.MediaPlaylist{}, err
return nil, fmt.Errorf("failed to upload rendition playlist: %w", err)
}
manifestURL, err := SignURL(outputStorageURL)
if err != nil {
return nil, fmt.Errorf("failed to sign manifest url: %w", err)
}

newURL, err := url.Parse(manifestURL)
if err != nil {
return nil, fmt.Errorf("failed to parse new manifest URL: %w", err)
}
return newURL, nil
}

func convertToMediaPlaylist(playlist m3u8.Playlist, playlistType m3u8.ListType) (m3u8.MediaPlaylist, error) {
// We shouldn't ever receive Master playlists from the previous section
if playlistType != m3u8.MEDIA {
return m3u8.MediaPlaylist{}, fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported")
Expand All @@ -64,10 +114,86 @@ func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.Medi
if !ok || mediaPlaylist == nil {
return m3u8.MediaPlaylist{}, fmt.Errorf("failed to parse playlist as MediaPlaylist")
}

return *mediaPlaylist, nil
}

// downloadManifestWithBackup downloads the manifest at sourceManifestOSURL and, when
// config.GetStorageBackupURL returns a non-empty URL for it, the manifest at that backup
// URL as well. Each download runs in its own goroutine; both are joined with a WaitGroup
// before any result is inspected.
//
// Result selection, in order:
//   - both downloads report object-not-found: the primary's error is returned;
//   - the primary failed with any other error: that error is returned;
//   - the backup failed with any other error: that error is returned, even if the
//     primary download succeeded;
//   - the backup downloaded successfully and either the primary failed or the backup's
//     size is greater: the backup playlist is returned;
//   - otherwise the primary playlist, list type and error are returned.
func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (m3u8.Playlist, m3u8.ListType, error) {
var playlist, playlistBackup m3u8.Playlist
var playlistType, playlistTypeBackup m3u8.ListType
var size, sizeBackup int
var errPrimary, errBackup error

var wg sync.WaitGroup
wg.Add(1)
// Each goroutine writes only to its own set of result variables.
go func() {
defer wg.Done()
playlist, playlistType, size, errPrimary = downloadManifest(requestID, sourceManifestOSURL)
}()

backupManifestURL := config.GetStorageBackupURL(sourceManifestOSURL)
if backupManifestURL != "" {
wg.Add(1)
go func() {
defer wg.Done()
playlistBackup, playlistTypeBackup, sizeBackup, errBackup = downloadManifest(requestID, backupManifestURL)
}()
}
// Wait for every started download before reading the results.
wg.Wait()

// If the file is not found in either storage, return the not found err from
// the primary. Otherwise, return any error that is not a simple not found
// (only not found errors passthrough below)
primaryNotFound, backupNotFound := errors.IsObjectNotFound(errPrimary), errors.IsObjectNotFound(errBackup)
if primaryNotFound && backupNotFound {
return nil, 0, errPrimary
}
if errPrimary != nil && !primaryNotFound {
return nil, 0, errPrimary
}
if errBackup != nil && !backupNotFound {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is weird, if the primary works fine but the backup is broken, we should not return an error I guess. We should return the playlist from the primary bucket, isn't that the case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not found in the backup then we return the primary, but unfortunately we need to successfully check for the backups existence because we need to pick the largest of the two manifests lower down. If we failed to check the backup exists then it could be it does exist and is the more complete, larger of the two but we've missed that and gone ahead with the primary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we want to fail completely if there is an error from the backup?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the backup should always be working but just giving us a 404 most of the time

return nil, 0, errBackup
}

// Return the largest manifest as the most recent version
hasBackup := backupManifestURL != "" && errBackup == nil
if hasBackup && (errPrimary != nil || sizeBackup > size) {
return playlistBackup, playlistTypeBackup, nil
}
return playlist, playlistType, errPrimary
}

// downloadManifest fetches the manifest at sourceManifestOSURL with GetFile, reads the
// whole body into memory and decodes it with m3u8.Decode. The fetch/read/decode sequence
// is retried under DownloadRetryBackoff.
//
// It returns the decoded playlist, its list type, the length in bytes of the downloaded
// body, and the error produced by the retry loop (nil on success).
//
// Object-not-found errors are retried like any other error for the first 10 seconds
// after the call starts; after that they are wrapped in backoff.Permanent, which stops
// the retry loop. The comment next to that return explains why.
func downloadManifest(requestID, sourceManifestOSURL string) (playlist m3u8.Playlist, playlistType m3u8.ListType, size int, err error) {
dStorage := NewDStorageDownload()
// start anchors the 10-second cut-off for not-found errors checked inside the retry closure.
start := time.Now()
err = backoff.Retry(func() error {
rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage)
if err != nil {
if time.Since(start) > 10*time.Second && errors.IsObjectNotFound(err) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this one? We anyway have the deadline in DownloadRetryBackOff() set to 10 * 5s = 50s, why do also need this check?

Copy link
Contributor

@mjh1 mjh1 Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a comment for this, we annoyingly had to include this because there's a chance that we will get a not found due to eventual consistency, so we still want to retry not found errors. However we don't want to wait quite as long as normal errors because it'll be quite a common case where the backup doesn't exist. So we don't to add a whole 50s of delay to every recording job essentially.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth a lil constant for the 10s too. maybe const MANIFEST_NOT_FOUND_INCONSISTENCY_TOLERANCE = 10s?

Copy link
Contributor

@leszko leszko Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm fine with with extracting a constant plus adding a comment.

// bail out of the retries earlier for not found errors because it will be quite a common scenario
// where the backup manifest does not exist and we don't want to wait the whole 50s of retries for
// every recording job
return backoff.Permanent(err)
}
return err
}
defer rc.Close()

data := new(bytes.Buffer)
_, err = data.ReadFrom(rc)
if err != nil {
return fmt.Errorf("error reading manifest: %s", err)
}

size = data.Len()
playlist, playlistType, err = m3u8.Decode(*data, true)
if err != nil {
return fmt.Errorf("error decoding manifest: %s", err)
}
return nil
}, DownloadRetryBackoff())
return
}

type SourceSegment struct {
URL *url.URL
DurationMillis int64
Expand All @@ -76,13 +202,7 @@ type SourceSegment struct {
// Loop over each segment in a given manifest and convert it from a relative path to a full ObjectStore-compatible URL
func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist) ([]SourceSegment, error) {
var urls []SourceSegment
for _, segment := range manifest.Segments {
// The segments list is a ring buffer - see https://github.com/grafov/m3u8/issues/140
// and so we only know we've hit the end of the list when we find a nil element
if segment == nil {
break
}

for _, segment := range manifest.GetAllSegments() {
u, err := ManifestURLToSegmentURL(sourceManifestURL, segment.URI)
if err != nil {
return nil, err
Expand Down
Loading
Loading