From feea9800990814d733bc2287f80053d285492cde Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Mon, 24 Jun 2024 17:07:50 +0100 Subject: [PATCH] [WIP] clients: Use storage fallback for probing segments too This is incomplete as we still need to fix the probing made on the manifest itself instead of just on each segment. Rabbit hole. --- clients/input_copy.go | 18 ++++++++++-------- clients/manifest.go | 32 ++++++++++++++++++++++++++------ clients/manifest_test.go | 6 +++--- pipeline/ffmpeg.go | 2 +- thumbnails/thumbnails.go | 2 +- transcode/transcode.go | 4 ++-- 6 files changed, 43 insertions(+), 21 deletions(-) diff --git a/clients/input_copy.go b/clients/input_copy.go index feb5c2ddf..4956dfaf3 100644 --- a/clients/input_copy.go +++ b/clients/input_copy.go @@ -64,6 +64,8 @@ func (s *InputCopy) CopyInputToS3(requestID string, inputFile, osTransferURL *ur } } + // We're probing the raw HLS input file here and skipping storage fallback logic. Figure out how to handle this. + // TODO: Always copy HLS files above? Only when there are missing files? Create a local manifest pointing to fallback'ed segment URLs? log.Log(requestID, "starting probe", "source", inputFile.String(), "dest", osTransferURL.String()) inputFileProbe, err := s.Probe.ProbeFile(requestID, signedURL, "-analyzeduration", "15000000") if err != nil { @@ -193,7 +195,7 @@ func CopyAllInputFiles(requestID string, srcInputUrl, dstOutputUrl *url.URL, dec // Save the mapping between the input m3u8 manifest file to its corresponding OS-transfer destination url fileList[srcInputUrl.String()] = dstOutputUrl.String() // Now get a list of the OS-compatible segment URLs from the input manifest file - sourceSegmentUrls, err := GetSourceSegmentURLs(srcInputUrl.String(), playlist) + sourceSegmentUrls, err := GetSourceSegmentURLs(requestID, srcInputUrl.String(), playlist) if err != nil { return fmt.Errorf("error generating source segment URLs for HLS input manifest: %s", err) } @@ -287,28 +289,28 @@ func GetFile(ctx context.Context, requestID, url string, dStorage *DStorageDownl } } -func GetFileWithBackup(ctx context.Context, requestID, url string, dStorage *DStorageDownload) (io.ReadCloser, error) { +func GetFileWithBackup(ctx context.Context, requestID, url string, dStorage *DStorageDownload) (io.ReadCloser, string, error) { rc, err := GetFile(ctx, requestID, url, dStorage) if err == nil { - return rc, nil + return rc, url, nil } backupURL := config.GetStorageBackupURL(url) if backupURL == "" { - return nil, err + return nil, url, err } rc, backupErr := GetFile(ctx, requestID, backupURL, dStorage) if backupErr == nil { - return rc, nil + return rc, backupURL, nil } // prioritize retriable errors in the response so we don't skip retries if !xerrors.IsUnretriable(err) { - return nil, err + return nil, url, err } else if !xerrors.IsUnretriable(backupErr) { - return nil, backupErr + return nil, backupURL, backupErr } - return nil, err + return nil, url, err } var retryableHttpClient = newRetryableHttpClient() diff --git a/clients/manifest.go b/clients/manifest.go index 38d504365..d4548a88f 100644 --- a/clients/manifest.go +++ b/clients/manifest.go @@ -132,7 +132,7 @@ type SourceSegment struct { } // Loop over each segment in a given manifest and convert it from a relative path to a full ObjectStore-compatible URL -func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist) ([]SourceSegment, error) { +func GetSourceSegmentURLs(requestID, sourceManifestURL string, manifest m3u8.MediaPlaylist) ([]SourceSegment, error) { var urls []SourceSegment for _, segment := range manifest.Segments { // The segments list is a ring buffer - see https://github.com/grafov/m3u8/issues/140 @@ -141,7 +141,7 @@ func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist) break } - u, err := ManifestURLToSegmentURL(sourceManifestURL, segment.URI) + u, err := ManifestURLToSegmentURL(requestID, sourceManifestURL, segment.URI) if err != nil { return nil, err } @@ -247,7 +247,7 @@ func GenerateAndUploadManifests(sourceManifest m3u8.MediaPlaylist, targetOSURL s return res, nil } -func ManifestURLToSegmentURL(manifestURL, segmentFilename string) (*url.URL, error) { +func ManifestURLToSegmentURL(requestID, manifestURL, segmentFilename string) (*url.URL, error) { base, err := url.Parse(manifestURL) if err != nil { return nil, fmt.Errorf("failed to parse manifest URL when converting to segment URL: %s", err) @@ -258,7 +258,27 @@ func ManifestURLToSegmentURL(manifestURL, segmentFilename string) (*url.URL, err return nil, fmt.Errorf("failed to parse segment filename when converting to segment URL: %s", err) } - return base.ResolveReference(relative), nil + segmentURL := base.ResolveReference(relative) + // Try to read the file from object store, to get the backup URL if it's not in the primary storage. + var actualSegURL string + dStorage := NewDStorageDownload() + err = backoff.Retry(func() error { + var rc io.ReadCloser + rc, actualSegURL, err = GetFileWithBackup(context.Background(), requestID, segmentURL.String(), dStorage) + if rc != nil { + rc.Close() + } + return err + }, DownloadRetryBackoff()) + if err != nil { + return nil, fmt.Errorf("failed to find segment file: %s", err) + } + + segmentURL, err = url.Parse(actualSegURL) + if err != nil { + return nil, fmt.Errorf("failed to parse segment URL: %s", err) + } + return segmentURL, nil } func SortTranscodedStats(transcodedStats []*video.RenditionStats) { @@ -284,7 +304,7 @@ func ClipInputManifest(requestID, sourceURL, clipTargetUrl string, startTimeUnix // Generate the absolute path URLS for segmens from the manifest's relative path // TODO: optimize later and only get absolute path URLs for the start/end segments - sourceSegmentURLs, err := GetSourceSegmentURLs(sourceURL, origManifest) + sourceSegmentURLs, err := GetSourceSegmentURLs(requestID, sourceURL, origManifest) if err != nil { return nil, fmt.Errorf("error clipping: failed to get segment urls: %w", err) } @@ -332,7 +352,7 @@ func ClipInputManifest(requestID, sourceURL, clipTargetUrl string, startTimeUnix segmentURL := sourceSegmentURLs[v.SeqId].URL dStorage := NewDStorageDownload() err = backoff.Retry(func() error { - rc, err := GetFileWithBackup(context.Background(), requestID, segmentURL.String(), dStorage) + rc, _, err := GetFileWithBackup(context.Background(), requestID, segmentURL.String(), dStorage) if err != nil { return fmt.Errorf("error clipping: failed to download segment %d: %w", v.SeqId, err) } diff --git a/clients/manifest_test.go b/clients/manifest_test.go index d27d775ab..39097fdc8 100644 --- a/clients/manifest_test.go +++ b/clients/manifest_test.go @@ -78,11 +78,11 @@ func TestItCanDownloadAValidRenditionManifest(t *testing.T) { } func TestItCanConvertRelativeURLsToOSURLs(t *testing.T) { - u, err := ManifestURLToSegmentURL("/tmp/file/something.m3u8", "001.ts") + u, err := ManifestURLToSegmentURL("blah", "/tmp/file/something.m3u8", "001.ts") require.NoError(t, err) require.Equal(t, "/tmp/file/001.ts", u.String()) - u, err = ManifestURLToSegmentURL("s3+https://REDACTED:REDACTED@storage.googleapis.com/something/output.m3u8", "001.ts") + u, err = ManifestURLToSegmentURL("blah", "s3+https://REDACTED:REDACTED@storage.googleapis.com/something/output.m3u8", "001.ts") require.NoError(t, err) require.Equal(t, "s3+https://REDACTED:REDACTED@storage.googleapis.com/something/001.ts", u.String()) } @@ -94,7 +94,7 @@ func TestItParsesManifestAndConvertsRelativeURLs(t *testing.T) { sourceMediaManifest, ok := sourceManifest.(*m3u8.MediaPlaylist) require.True(t, ok) - us, err := GetSourceSegmentURLs("s3+https://REDACTED:REDACTED@storage.googleapis.com/something/output.m3u8", *sourceMediaManifest) + us, err := GetSourceSegmentURLs("blah", "s3+https://REDACTED:REDACTED@storage.googleapis.com/something/output.m3u8", *sourceMediaManifest) require.NoError(t, err) require.Equal(t, 2, len(us)) diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index 6a37220fd..78647a6f5 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -305,7 +305,7 @@ func (f *ffmpeg) probeSourceSegments(job *JobInfo, sourceSegments []*m3u8.MediaS } func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error { - u, err := clients.ManifestURLToSegmentURL(sourceManifestURL, seg.URI) + u, err := clients.ManifestURLToSegmentURL(requestID, sourceManifestURL, seg.URI) if err != nil { return fmt.Errorf("error checking source segments: %w", err) } diff --git a/thumbnails/thumbnails.go b/thumbnails/thumbnails.go index 8f9bd8967..e7bb70659 100644 --- a/thumbnails/thumbnails.go +++ b/thumbnails/thumbnails.go @@ -169,7 +169,7 @@ func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error ) // save the segment to memory err = backoff.Retry(func() error { - rc, err = clients.GetFileWithBackup(context.Background(), requestID, segURL.String(), nil) + rc, _, err = clients.GetFileWithBackup(context.Background(), requestID, segURL.String(), nil) return err }, clients.DownloadRetryBackoff()) if err != nil { diff --git a/transcode/transcode.go b/transcode/transcode.go index 09ebfdde0..8b1ada48e 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -88,7 +88,7 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } // Generate the full segment URLs from the manifest - sourceSegmentURLs, err := clients.GetSourceSegmentURLs(sourceManifestOSURL, sourceManifest) + sourceSegmentURLs, err := clients.GetSourceSegmentURLs(transcodeRequest.RequestID, sourceManifestOSURL, sourceManifest) if err != nil { return outputs, segmentsCount, fmt.Errorf("error generating source segment URLs: %s", err) } @@ -536,7 +536,7 @@ func transcodeSegment( err := backoff.Retry(func() error { ctx, cancel := context.WithTimeout(context.Background(), clients.MaxCopyFileDuration) defer cancel() - rc, err := clients.GetFileWithBackup(ctx, transcodeRequest.RequestID, segment.Input.URL.String(), nil) + rc, _, err := clients.GetFileWithBackup(ctx, transcodeRequest.RequestID, segment.Input.URL.String(), nil) if err != nil { return fmt.Errorf("failed to download source segment %q: %w", segment.Input, err) }