Skip to content

Commit

Permalink
[WIP] clients: Use storage fallback for probing segments too
Browse files Browse the repository at this point in the history
This is incomplete as we still need to fix the probing made on
the manifest itself instead of just on each segment. Rabbit hole.
  • Loading branch information
victorges committed Jun 24, 2024
1 parent 8801281 commit 3408e19
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 21 deletions.
18 changes: 10 additions & 8 deletions clients/input_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (s *InputCopy) CopyInputToS3(requestID string, inputFile, osTransferURL *ur
}
}

// We're probing the raw HLS input file here and skipping storage fallback logic. Figure out how to handle this.
// TODO: Always copy HLS files above? Only when there are missing files? Create a local manifest pointing to fallback'ed segment URLs?
log.Log(requestID, "starting probe", "source", inputFile.String(), "dest", osTransferURL.String())
inputFileProbe, err := s.Probe.ProbeFile(requestID, signedURL, "-analyzeduration", "15000000")
if err != nil {
Expand Down Expand Up @@ -193,7 +195,7 @@ func CopyAllInputFiles(requestID string, srcInputUrl, dstOutputUrl *url.URL, dec
// Save the mapping between the input m3u8 manifest file to its corresponding OS-transfer destination url
fileList[srcInputUrl.String()] = dstOutputUrl.String()
// Now get a list of the OS-compatible segment URLs from the input manifest file
sourceSegmentUrls, err := GetSourceSegmentURLs(srcInputUrl.String(), playlist)
sourceSegmentUrls, err := GetSourceSegmentURLs(requestID, srcInputUrl.String(), playlist)
if err != nil {
return fmt.Errorf("error generating source segment URLs for HLS input manifest: %s", err)
}
Expand Down Expand Up @@ -287,28 +289,28 @@ func GetFile(ctx context.Context, requestID, url string, dStorage *DStorageDownl
}
}

func GetFileWithBackup(ctx context.Context, requestID, url string, dStorage *DStorageDownload) (io.ReadCloser, error) {
func GetFileWithBackup(ctx context.Context, requestID, url string, dStorage *DStorageDownload) (io.ReadCloser, string, error) {
rc, err := GetFile(ctx, requestID, url, dStorage)
if err == nil {
return rc, nil
return rc, url, nil
}

backupURL := config.GetStorageBackupURL(url)
if backupURL == "" {
return nil, err
return nil, url, err
}
rc, backupErr := GetFile(ctx, requestID, backupURL, dStorage)
if backupErr == nil {
return rc, nil
return rc, backupURL, nil
}

// prioritize retriable errors in the response so we don't skip retries
if !xerrors.IsUnretriable(err) {
return nil, err
return nil, url, err
} else if !xerrors.IsUnretriable(backupErr) {
return nil, backupErr
return nil, backupURL, backupErr
}
return nil, err
return nil, url, err
}

var retryableHttpClient = newRetryableHttpClient()
Expand Down
32 changes: 26 additions & 6 deletions clients/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type SourceSegment struct {
}

// Loop over each segment in a given manifest and convert it from a relative path to a full ObjectStore-compatible URL
func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist) ([]SourceSegment, error) {
func GetSourceSegmentURLs(requestID, sourceManifestURL string, manifest m3u8.MediaPlaylist) ([]SourceSegment, error) {
var urls []SourceSegment
for _, segment := range manifest.Segments {
// The segments list is a ring buffer - see https://github.com/grafov/m3u8/issues/140
Expand All @@ -141,7 +141,7 @@ func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist)
break
}

u, err := ManifestURLToSegmentURL(sourceManifestURL, segment.URI)
u, err := ManifestURLToSegmentURL(requestID, sourceManifestURL, segment.URI)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -247,7 +247,7 @@ func GenerateAndUploadManifests(sourceManifest m3u8.MediaPlaylist, targetOSURL s
return res, nil
}

func ManifestURLToSegmentURL(manifestURL, segmentFilename string) (*url.URL, error) {
func ManifestURLToSegmentURL(requestID, manifestURL, segmentFilename string) (*url.URL, error) {
base, err := url.Parse(manifestURL)
if err != nil {
return nil, fmt.Errorf("failed to parse manifest URL when converting to segment URL: %s", err)
Expand All @@ -258,7 +258,27 @@ func ManifestURLToSegmentURL(manifestURL, segmentFilename string) (*url.URL, err
return nil, fmt.Errorf("failed to parse segment filename when converting to segment URL: %s", err)
}

return base.ResolveReference(relative), nil
segmentURL := base.ResolveReference(relative)
// Try to read the file from object store, to get the backup URL if it's not in the primary storage.
var actualSegURL string
dStorage := NewDStorageDownload()
err = backoff.Retry(func() error {
var rc io.ReadCloser
rc, actualSegURL, err = GetFileWithBackup(context.Background(), requestID, segmentURL.String(), dStorage)
if rc != nil {
rc.Close()
}
return err
}, DownloadRetryBackoff())
if err != nil {
return nil, fmt.Errorf("failed to find segment file: %s", err)
}

segmentURL, err = url.Parse(actualSegURL)
if err != nil {
return nil, fmt.Errorf("failed to parse segment URL: %s", err)
}
return segmentURL, nil
}

func SortTranscodedStats(transcodedStats []*video.RenditionStats) {
Expand All @@ -284,7 +304,7 @@ func ClipInputManifest(requestID, sourceURL, clipTargetUrl string, startTimeUnix

// Generate the absolute path URLS for segmens from the manifest's relative path
// TODO: optimize later and only get absolute path URLs for the start/end segments
sourceSegmentURLs, err := GetSourceSegmentURLs(sourceURL, origManifest)
sourceSegmentURLs, err := GetSourceSegmentURLs(requestID, sourceURL, origManifest)
if err != nil {
return nil, fmt.Errorf("error clipping: failed to get segment urls: %w", err)
}
Expand Down Expand Up @@ -332,7 +352,7 @@ func ClipInputManifest(requestID, sourceURL, clipTargetUrl string, startTimeUnix
segmentURL := sourceSegmentURLs[v.SeqId].URL
dStorage := NewDStorageDownload()
err = backoff.Retry(func() error {
rc, err := GetFileWithBackup(context.Background(), requestID, segmentURL.String(), dStorage)
rc, _, err := GetFileWithBackup(context.Background(), requestID, segmentURL.String(), dStorage)
if err != nil {
return fmt.Errorf("error clipping: failed to download segment %d: %w", v.SeqId, err)
}
Expand Down
6 changes: 3 additions & 3 deletions clients/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ func TestItCanDownloadAValidRenditionManifest(t *testing.T) {
}

func TestItCanConvertRelativeURLsToOSURLs(t *testing.T) {
u, err := ManifestURLToSegmentURL("/tmp/file/something.m3u8", "001.ts")
u, err := ManifestURLToSegmentURL("blah", "/tmp/file/something.m3u8", "001.ts")
require.NoError(t, err)
require.Equal(t, "/tmp/file/001.ts", u.String())

u, err = ManifestURLToSegmentURL("s3+https://REDACTED:[email protected]/something/output.m3u8", "001.ts")
u, err = ManifestURLToSegmentURL("blah", "s3+https://REDACTED:[email protected]/something/output.m3u8", "001.ts")
require.NoError(t, err)
require.Equal(t, "s3+https://REDACTED:[email protected]/something/001.ts", u.String())
}
Expand All @@ -94,7 +94,7 @@ func TestItParsesManifestAndConvertsRelativeURLs(t *testing.T) {
sourceMediaManifest, ok := sourceManifest.(*m3u8.MediaPlaylist)
require.True(t, ok)

us, err := GetSourceSegmentURLs("s3+https://REDACTED:[email protected]/something/output.m3u8", *sourceMediaManifest)
us, err := GetSourceSegmentURLs("blah", "s3+https://REDACTED:[email protected]/something/output.m3u8", *sourceMediaManifest)
require.NoError(t, err)

require.Equal(t, 2, len(us))
Expand Down
2 changes: 1 addition & 1 deletion pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (f *ffmpeg) probeSourceSegments(job *JobInfo, sourceSegments []*m3u8.MediaS
}

func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error {
u, err := clients.ManifestURLToSegmentURL(sourceManifestURL, seg.URI)
u, err := clients.ManifestURLToSegmentURL(requestID, sourceManifestURL, seg.URI)
if err != nil {
return fmt.Errorf("error checking source segments: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion thumbnails/thumbnails.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error
)
// save the segment to memory
err = backoff.Retry(func() error {
rc, err = clients.GetFileWithBackup(context.Background(), requestID, segURL.String(), nil)
rc, _, err = clients.GetFileWithBackup(context.Background(), requestID, segURL.String(), nil)
return err
}, clients.DownloadRetryBackoff())
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
}

// Generate the full segment URLs from the manifest
sourceSegmentURLs, err := clients.GetSourceSegmentURLs(sourceManifestOSURL, sourceManifest)
sourceSegmentURLs, err := clients.GetSourceSegmentURLs(transcodeRequest.RequestID, sourceManifestOSURL, sourceManifest)
if err != nil {
return outputs, segmentsCount, fmt.Errorf("error generating source segment URLs: %s", err)
}
Expand Down Expand Up @@ -536,7 +536,7 @@ func transcodeSegment(
err := backoff.Retry(func() error {
ctx, cancel := context.WithTimeout(context.Background(), clients.MaxCopyFileDuration)
defer cancel()
rc, err := clients.GetFileWithBackup(ctx, transcodeRequest.RequestID, segment.Input.URL.String(), nil)
rc, _, err := clients.GetFileWithBackup(ctx, transcodeRequest.RequestID, segment.Input.URL.String(), nil)
if err != nil {
return fmt.Errorf("failed to download source segment %q: %w", segment.Input, err)
}
Expand Down

0 comments on commit 3408e19

Please sign in to comment.