-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
clients: Implement storage fallback for recordings #1303
Changes from 5 commits
35c9df9
346b28a
613a4b0
63e8ee6
0f8d91b
a051c8f
3328231
f83502e
8623feb
bc71d54
557ae90
2e73ea3
a87b5cc
90d6964
0655802
d5122a8
cac0c5f
391709e
44e0fd8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,8 +44,15 @@ func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.Medi | |
return convertToMediaPlaylist(playlist, playlistType) | ||
} | ||
|
||
func DownloadRenditionManifestWithBackup(requestID string, primaryManifestURL, osTransferURL *url.URL) (*url.URL, error) { | ||
manifestURL, playlist, playlistType, err := downloadManifestWithBackup(requestID, primaryManifestURL.String()) | ||
// RecordingBackupCheck checks whether manifests and segments are available on the primary or | ||
// the backup store and returns a URL to new manifest with absolute segment URLs pointing to either primary or | ||
// backup locations depending on where the segments are available. | ||
func RecordingBackupCheck(requestID string, primaryManifestURL, osTransferURL *url.URL) (*url.URL, error) { | ||
if config.GetStorageBackupURL(primaryManifestURL.String()) == "" { | ||
return primaryManifestURL, nil | ||
} | ||
|
||
playlist, playlistType, err := downloadManifestWithBackup(requestID, primaryManifestURL.String()) | ||
if err != nil { | ||
return nil, fmt.Errorf("error downloading manifest: %w", err) | ||
} | ||
|
@@ -54,10 +61,6 @@ func DownloadRenditionManifestWithBackup(requestID string, primaryManifestURL, o | |
return nil, err | ||
} | ||
|
||
// If we had to fall back to the backup manifest then we need a new manifest with absolute URLs because | ||
// successful segment uploads will exist in the primary store and therefore the relative URLs will be broken. | ||
newPlaylistNeeded := manifestURL != primaryManifestURL.String() | ||
|
||
// Check whether segments are available from primary or backup storage | ||
dStorage := NewDStorageDownload() | ||
for _, segment := range mediaPlaylist.GetAllSegments() { | ||
|
@@ -68,7 +71,7 @@ func DownloadRenditionManifestWithBackup(requestID string, primaryManifestURL, o | |
var actualSegURL string | ||
err = backoff.Retry(func() error { | ||
var rc io.ReadCloser | ||
rc, actualSegURL, err = GetFileWithBackup(context.TODO(), requestID, segURL.String(), dStorage) | ||
rc, actualSegURL, err = GetFileWithBackup(context.Background(), requestID, segURL.String(), dStorage) | ||
if rc != nil { | ||
rc.Close() | ||
} | ||
|
@@ -77,28 +80,20 @@ func DownloadRenditionManifestWithBackup(requestID string, primaryManifestURL, o | |
if err != nil { | ||
return nil, fmt.Errorf("failed to find segment file %s: %w", segURL.Redacted(), err) | ||
} | ||
if actualSegURL != segURL.String() { | ||
// If we had to fall back to a backup segment then we'll need a new manifest with absolute URLs | ||
newPlaylistNeeded = true | ||
} | ||
segment.URI = actualSegURL | ||
} | ||
|
||
if newPlaylistNeeded { | ||
// TODO log line | ||
|
||
// write the manifest to storage and update the manifestURL variable | ||
outputStorageURL := osTransferURL.JoinPath("input.m3u8") | ||
err = backoff.Retry(func() error { | ||
return UploadToOSURL(outputStorageURL.String(), "", strings.NewReader(mediaPlaylist.String()), ManifestUploadTimeout) | ||
}, UploadRetryBackoff()) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to upload rendition playlist: %w", err) | ||
} | ||
manifestURL, err = SignURL(outputStorageURL) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to sign manifest url: %w", err) | ||
} | ||
// write the manifest to storage and update the manifestURL variable | ||
outputStorageURL := osTransferURL.JoinPath("input.m3u8") | ||
err = backoff.Retry(func() error { | ||
return UploadToOSURL(outputStorageURL.String(), "", strings.NewReader(mediaPlaylist.String()), ManifestUploadTimeout) | ||
}, UploadRetryBackoff()) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to upload rendition playlist: %w", err) | ||
} | ||
manifestURL, err := SignURL(outputStorageURL) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to sign manifest url: %w", err) | ||
} | ||
|
||
newURL, err := url.Parse(manifestURL) | ||
|
@@ -122,7 +117,7 @@ func convertToMediaPlaylist(playlist m3u8.Playlist, playlistType m3u8.ListType) | |
return *mediaPlaylist, nil | ||
} | ||
|
||
func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (string, m3u8.Playlist, m3u8.ListType, error) { | ||
func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (m3u8.Playlist, m3u8.ListType, error) { | ||
var playlist, playlistBackup m3u8.Playlist | ||
var playlistType, playlistTypeBackup m3u8.ListType | ||
var size, sizeBackup int | ||
|
@@ -150,28 +145,35 @@ func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (string, | |
// (only not found errors passthrough below) | ||
primaryNotFound, backupNotFound := errors.IsObjectNotFound(errPrimary), errors.IsObjectNotFound(errBackup) | ||
if primaryNotFound && backupNotFound { | ||
return "", nil, 0, errPrimary | ||
return nil, 0, errPrimary | ||
} | ||
if errPrimary != nil && !primaryNotFound { | ||
return "", nil, 0, errPrimary | ||
return nil, 0, errPrimary | ||
} | ||
if errBackup != nil && !backupNotFound { | ||
return "", nil, 0, errBackup | ||
return nil, 0, errBackup | ||
} | ||
|
||
// Return the largest manifest as the most recent version | ||
hasBackup := backupManifestURL != "" && errBackup == nil | ||
if hasBackup && (errPrimary != nil || sizeBackup > size) { | ||
return backupManifestURL, playlistBackup, playlistTypeBackup, nil | ||
return playlistBackup, playlistTypeBackup, nil | ||
} | ||
return sourceManifestOSURL, playlist, playlistType, errPrimary | ||
return playlist, playlistType, errPrimary | ||
} | ||
|
||
func downloadManifest(requestID, sourceManifestOSURL string) (playlist m3u8.Playlist, playlistType m3u8.ListType, size int, err error) { | ||
dStorage := NewDStorageDownload() | ||
start := time.Now() | ||
err = backoff.Retry(func() error { | ||
rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage) | ||
if err != nil { | ||
if time.Since(start) > 10*time.Second && errors.IsObjectNotFound(err) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this one? We anyway have the deadline in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll add a comment for this, we annoyingly had to include this because there's a chance that we will get a not found due to eventual consistency, so we still want to retry not found errors. However we don't want to wait quite as long as normal errors because it'll be quite a common case where the backup doesn't exist. So we don't to add a whole 50s of delay to every recording job essentially. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be worth a lil constant for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I'm fine with with extracting a constant plus adding a comment. |
||
// bail out of the retries earlier for not found errors because it will be quite a common scenario | ||
// where the backup manifest does not exist and we don't want to wait the whole 50s of retries for | ||
// every recording job | ||
return backoff.Permanent(err) | ||
} | ||
return err | ||
} | ||
defer rc.Close() | ||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,30 @@ | ||||||||
package config | ||||||||
|
||||||||
import "testing" | ||||||||
|
||||||||
func TestGetStorageBackupURL(t *testing.T) { | ||||||||
StorageFallbackURLs = map[string]string{"https://storj.livepeer.com/catalyst-recordings-com/hls": "https://google.livepeer.com/catalyst-recordings-com/hls"} | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to restore the default value after tests?
Suggested change
|
||||||||
tests := []struct { | ||||||||
name string | ||||||||
urlStr string | ||||||||
want string | ||||||||
}{ | ||||||||
{ | ||||||||
name: "should replace", | ||||||||
urlStr: "https://storj.livepeer.com/catalyst-recordings-com/hls/foo", | ||||||||
want: "https://google.livepeer.com/catalyst-recordings-com/hls/foo", | ||||||||
}, | ||||||||
{ | ||||||||
name: "should not replace", | ||||||||
urlStr: "https://blah.livepeer.com/catalyst-recordings-com/hls/foo", | ||||||||
want: "", | ||||||||
}, | ||||||||
} | ||||||||
for _, tt := range tests { | ||||||||
t.Run(tt.name, func(t *testing.T) { | ||||||||
if got := GetStorageBackupURL(tt.urlStr); got != tt.want { | ||||||||
t.Errorf("GetStorageBackupURL() = %v, want %v", got, tt.want) | ||||||||
} | ||||||||
}) | ||||||||
} | ||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is weird, if the primary works fine but the backup is broken, we should not return an error I guess. We should return the playlist from the primary bucket, isn't that the case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's not found in the backup then we return the primary, but unfortunately we need to successfully check for the backups existence because we need to pick the largest of the two manifests lower down. If we failed to check the backup exists then it could be it does exist and is the more complete, larger of the two but we've missed that and gone ahead with the primary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, we want to fail completely if there is an error from the backup?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the backup should always be working but just giving us a 404 most of the time