diff --git a/clients/input_copy.go b/clients/input_copy.go index feb5c2ddf..4956dfaf3 100644 --- a/clients/input_copy.go +++ b/clients/input_copy.go @@ -64,6 +64,8 @@ func (s *InputCopy) CopyInputToS3(requestID string, inputFile, osTransferURL *ur } } + // We're probing the raw HLS input file here and skipping storage fallback logic. Figure out how to handle this. + // TODO: Always copy HLS files above? Only when there are missing files? Create a local manifest pointing to fallback'ed segment URLs? log.Log(requestID, "starting probe", "source", inputFile.String(), "dest", osTransferURL.String()) inputFileProbe, err := s.Probe.ProbeFile(requestID, signedURL, "-analyzeduration", "15000000") if err != nil { @@ -193,7 +195,7 @@ func CopyAllInputFiles(requestID string, srcInputUrl, dstOutputUrl *url.URL, dec // Save the mapping between the input m3u8 manifest file to its corresponding OS-transfer destination url fileList[srcInputUrl.String()] = dstOutputUrl.String() // Now get a list of the OS-compatible segment URLs from the input manifest file - sourceSegmentUrls, err := GetSourceSegmentURLs(srcInputUrl.String(), playlist) + sourceSegmentUrls, err := GetSourceSegmentURLs(requestID, srcInputUrl.String(), playlist) if err != nil { return fmt.Errorf("error generating source segment URLs for HLS input manifest: %s", err) } @@ -287,28 +289,28 @@ func GetFile(ctx context.Context, requestID, url string, dStorage *DStorageDownl } } -func GetFileWithBackup(ctx context.Context, requestID, url string, dStorage *DStorageDownload) (io.ReadCloser, error) { +func GetFileWithBackup(ctx context.Context, requestID, url string, dStorage *DStorageDownload) (io.ReadCloser, string, error) { rc, err := GetFile(ctx, requestID, url, dStorage) if err == nil { - return rc, nil + return rc, url, nil } backupURL := config.GetStorageBackupURL(url) if backupURL == "" { - return nil, err + return nil, url, err } rc, backupErr := GetFile(ctx, requestID, backupURL, dStorage) if backupErr == nil { - return rc, nil + return rc, backupURL, nil } // prioritize retriable errors in the response so we don't skip retries if !xerrors.IsUnretriable(err) { - return nil, err + return nil, url, err } else if !xerrors.IsUnretriable(backupErr) { - return nil, backupErr + return nil, backupURL, backupErr } - return nil, err + return nil, url, err } var retryableHttpClient = newRetryableHttpClient() diff --git a/clients/manifest.go b/clients/manifest.go index 38d504365..d4548a88f 100644 --- a/clients/manifest.go +++ b/clients/manifest.go @@ -132,7 +132,7 @@ type SourceSegment struct { } // Loop over each segment in a given manifest and convert it from a relative path to a full ObjectStore-compatible URL -func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist) ([]SourceSegment, error) { +func GetSourceSegmentURLs(requestID, sourceManifestURL string, manifest m3u8.MediaPlaylist) ([]SourceSegment, error) { var urls []SourceSegment for _, segment := range manifest.Segments { // The segments list is a ring buffer - see https://github.com/grafov/m3u8/issues/140 @@ -141,7 +141,7 @@ func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist) break } - u, err := ManifestURLToSegmentURL(sourceManifestURL, segment.URI) + u, err := ManifestURLToSegmentURL(requestID, sourceManifestURL, segment.URI) if err != nil { return nil, err } @@ -247,7 +247,7 @@ func GenerateAndUploadManifests(sourceManifest m3u8.MediaPlaylist, targetOSURL s return res, nil } -func ManifestURLToSegmentURL(manifestURL, segmentFilename string) (*url.URL, error) { +func ManifestURLToSegmentURL(requestID, manifestURL, segmentFilename string) (*url.URL, error) { base, err := url.Parse(manifestURL) if err != nil { return nil, fmt.Errorf("failed to parse manifest URL when converting to segment URL: %s", err) @@ -258,7 +258,27 @@ func ManifestURLToSegmentURL(manifestURL, segmentFilename string) (*url.URL, err return nil, fmt.Errorf("failed to parse segment filename when converting to segment URL: %s", err) } - return base.ResolveReference(relative), nil + segmentURL := base.ResolveReference(relative) + // Try to read the file from object store, to get the backup URL if it's not in the primary storage. + var actualSegURL string + dStorage := NewDStorageDownload() + err = backoff.Retry(func() error { + var rc io.ReadCloser + rc, actualSegURL, err = GetFileWithBackup(context.Background(), requestID, segmentURL.String(), dStorage) + if rc != nil { + rc.Close() + } + return err + }, DownloadRetryBackoff()) + if err != nil { + return nil, fmt.Errorf("failed to find segment file: %s", err) + } + + segmentURL, err = url.Parse(actualSegURL) + if err != nil { + return nil, fmt.Errorf("failed to parse segment URL: %s", err) + } + return segmentURL, nil } func SortTranscodedStats(transcodedStats []*video.RenditionStats) { @@ -284,7 +304,7 @@ func ClipInputManifest(requestID, sourceURL, clipTargetUrl string, startTimeUnix // Generate the absolute path URLS for segmens from the manifest's relative path // TODO: optimize later and only get absolute path URLs for the start/end segments - sourceSegmentURLs, err := GetSourceSegmentURLs(sourceURL, origManifest) + sourceSegmentURLs, err := GetSourceSegmentURLs(requestID, sourceURL, origManifest) if err != nil { return nil, fmt.Errorf("error clipping: failed to get segment urls: %w", err) } @@ -332,7 +352,7 @@ func ClipInputManifest(requestID, sourceURL, clipTargetUrl string, startTimeUnix segmentURL := sourceSegmentURLs[v.SeqId].URL dStorage := NewDStorageDownload() err = backoff.Retry(func() error { - rc, err := GetFileWithBackup(context.Background(), requestID, segmentURL.String(), dStorage) + rc, _, err := GetFileWithBackup(context.Background(), requestID, segmentURL.String(), dStorage) if err != nil { return fmt.Errorf("error clipping: failed to download segment %d: %w", v.SeqId, err) } diff --git a/clients/manifest_test.go b/clients/manifest_test.go index d27d775ab..39097fdc8 100644 --- a/clients/manifest_test.go +++ b/clients/manifest_test.go @@ -78,11 +78,11 @@ func TestItCanDownloadAValidRenditionManifest(t *testing.T) { } func TestItCanConvertRelativeURLsToOSURLs(t *testing.T) { - u, err := ManifestURLToSegmentURL("/tmp/file/something.m3u8", "001.ts") + u, err := ManifestURLToSegmentURL("blah", "/tmp/file/something.m3u8", "001.ts") require.NoError(t, err) require.Equal(t, "/tmp/file/001.ts", u.String()) - u, err = ManifestURLToSegmentURL("s3+https://REDACTED:REDACTED@storage.googleapis.com/something/output.m3u8", "001.ts") + u, err = ManifestURLToSegmentURL("blah", "s3+https://REDACTED:REDACTED@storage.googleapis.com/something/output.m3u8", "001.ts") require.NoError(t, err) require.Equal(t, "s3+https://REDACTED:REDACTED@storage.googleapis.com/something/001.ts", u.String()) } @@ -94,7 +94,7 @@ func TestItParsesManifestAndConvertsRelativeURLs(t *testing.T) { sourceMediaManifest, ok := sourceManifest.(*m3u8.MediaPlaylist) require.True(t, ok) - us, err := GetSourceSegmentURLs("s3+https://REDACTED:REDACTED@storage.googleapis.com/something/output.m3u8", *sourceMediaManifest) + us, err := GetSourceSegmentURLs("blah", "s3+https://REDACTED:REDACTED@storage.googleapis.com/something/output.m3u8", *sourceMediaManifest) require.NoError(t, err) require.Equal(t, 2, len(us)) diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index 6a37220fd..78647a6f5 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -305,7 +305,7 @@ func (f *ffmpeg) probeSourceSegments(job *JobInfo, sourceSegments []*m3u8.MediaS } func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error { - u, err := clients.ManifestURLToSegmentURL(sourceManifestURL, seg.URI) + u, err := clients.ManifestURLToSegmentURL(requestID, sourceManifestURL, seg.URI) if err != nil { return fmt.Errorf("error checking source segments: %w", err) } diff --git a/thumbnails/thumbnails.go b/thumbnails/thumbnails.go index 8f9bd8967..e7bb70659 100644 --- a/thumbnails/thumbnails.go +++ b/thumbnails/thumbnails.go @@ -169,7 +169,7 @@ func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error ) // save the segment to memory err = backoff.Retry(func() error { - rc, err = clients.GetFileWithBackup(context.Background(), requestID, segURL.String(), nil) + rc, _, err = clients.GetFileWithBackup(context.Background(), requestID, segURL.String(), nil) return err }, clients.DownloadRetryBackoff()) if err != nil { diff --git a/transcode/transcode.go b/transcode/transcode.go index 09ebfdde0..8b1ada48e 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -88,7 +88,7 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } // Generate the full segment URLs from the manifest - sourceSegmentURLs, err := clients.GetSourceSegmentURLs(sourceManifestOSURL, sourceManifest) + sourceSegmentURLs, err := clients.GetSourceSegmentURLs(transcodeRequest.RequestID, sourceManifestOSURL, sourceManifest) if err != nil { return outputs, segmentsCount, fmt.Errorf("error generating source segment URLs: %s", err) } @@ -536,7 +536,7 @@ func transcodeSegment( err := backoff.Retry(func() error { ctx, cancel := context.WithTimeout(context.Background(), clients.MaxCopyFileDuration) defer cancel() - rc, err := clients.GetFileWithBackup(ctx, transcodeRequest.RequestID, segment.Input.URL.String(), nil) + rc, _, err := clients.GetFileWithBackup(ctx, transcodeRequest.RequestID, segment.Input.URL.String(), nil) if err != nil { return fmt.Errorf("failed to download source segment %q: %w", segment.Input, err) }