Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/rafal/transcode-quality-params' …
Browse files Browse the repository at this point in the history
…into rafal/transcode-quality-params
  • Loading branch information
leszko committed Sep 8, 2023
2 parents 9dcabff + 921c2bb commit 5e3d658
Show file tree
Hide file tree
Showing 28 changed files with 661 additions and 345 deletions.
4 changes: 3 additions & 1 deletion api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
router := httprouter.New()
withLogging := middleware.LogRequest()
withAuth := middleware.IsAuthorized
withCapacityChecking := middleware.HasCapacity

capacityMiddleware := middleware.CapacityMiddleware{}
withCapacityChecking := capacityMiddleware.HasCapacity

geoHandlers := &geolocation.GeolocationHandlersCollection{
Config: cli,
Expand Down
6 changes: 4 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,12 @@ func (b *BalancerImpl) GetBestNode(ctx context.Context, redirectPrefixes []strin
return fallbackNode(fallbackAddr, fallbackPrefix, playbackID, redirectPrefixes[0], err)
}

// `playbackID`s matching the pattern `....-....-.....-....`
var regexpStreamKey = regexp.MustCompile(`^(?:\w{4}-){3}\w{4}$`)

func fallbackNode(fallbackAddr, fallbackPrefix, playbackID, defaultPrefix string, err error) (string, string, error) {
// Check for `playbackID`s matching the pattern `....-....-.....-....`
r := regexp.MustCompile(`^(?:\w{4}-){3}\w{4}$`)
if r.MatchString(playbackID) {
if regexpStreamKey.MatchString(playbackID) {
return fallbackAddr, playbackID, nil
}

Expand Down
42 changes: 20 additions & 22 deletions clients/input_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,49 +45,47 @@ func NewInputCopy() *InputCopy {
}

// CopyInputToS3 copies the input video to our S3 transfer bucket and probes the file.
func (s *InputCopy) CopyInputToS3(requestID string, inputFile, osTransferURL *url.URL, decryptor *crypto.DecryptionKeys) (inputVideoProbe video.InputVideo, signedURL string, err error) {
func (s *InputCopy) CopyInputToS3(requestID string, inputFile, osTransferURL *url.URL, decryptor *crypto.DecryptionKeys) (video.InputVideo, string, error) {
var signedURL string
var err error
if IsHLSInput(inputFile) {
log.Log(requestID, "skipping copy for hls")
signedURL = inputFile.String()
} else {
err = CopyAllInputFiles(requestID, inputFile, osTransferURL, decryptor)
if err != nil {
err = fmt.Errorf("failed to copy file(s): %w", err)
return
if err := CopyAllInputFiles(requestID, inputFile, osTransferURL, decryptor); err != nil {
return video.InputVideo{}, "", fmt.Errorf("failed to copy file(s): %w", err)
}

signedURL, err = getSignedURL(osTransferURL)
if err != nil {
return
return video.InputVideo{}, "", err
}
}

log.Log(requestID, "starting probe", "source", inputFile.String(), "dest", osTransferURL.String())
inputVideoProbe, err = s.Probe.ProbeFile(requestID, signedURL)
inputFileProbe, err := s.Probe.ProbeFile(requestID, signedURL)
if err != nil {
log.Log(requestID, "probe failed", "err", err, "source", inputFile.String(), "dest", osTransferURL.String())
err = fmt.Errorf("error probing MP4 input file from S3: %w", err)
return
return video.InputVideo{}, "", fmt.Errorf("error probing MP4 input file from S3: %w", err)
}

log.Log(requestID, "probe succeeded", "source", inputFile.String(), "dest", osTransferURL.String())
videoTrack, err := inputVideoProbe.GetTrack(video.TrackTypeVideo)
if err != nil {
err = fmt.Errorf("no video track found in input video: %w", err)
return
videoTrack, err := inputFileProbe.GetTrack(video.TrackTypeVideo)
hasVideoTrack := err == nil
if hasVideoTrack {
log.Log(requestID, "probed video track:", "container", inputFileProbe.Format, "codec", videoTrack.Codec, "bitrate", videoTrack.Bitrate, "duration", videoTrack.DurationSec, "w", videoTrack.Width, "h", videoTrack.Height, "pix-format", videoTrack.PixelFormat, "FPS", videoTrack.FPS)
}
audioTrack, _ := inputVideoProbe.GetTrack(video.TrackTypeAudio)
if videoTrack.FPS <= 0 {
if hasVideoTrack && videoTrack.FPS <= 0 {
// unsupported, includes things like motion jpegs
err = fmt.Errorf("invalid framerate: %f", videoTrack.FPS)
return
return video.InputVideo{}, "", fmt.Errorf("invalid framerate: %f", videoTrack.FPS)
}
if inputVideoProbe.SizeBytes > config.MaxInputFileSizeBytes {
err = fmt.Errorf("input file %d bytes was greater than %d bytes", inputVideoProbe.SizeBytes, config.MaxInputFileSizeBytes)
return
if inputFileProbe.SizeBytes > config.MaxInputFileSizeBytes {
return video.InputVideo{}, "", fmt.Errorf("input file %d bytes was greater than %d bytes", inputFileProbe.SizeBytes, config.MaxInputFileSizeBytes)
}
log.Log(requestID, "probed video track:", "container", inputVideoProbe.Format, "codec", videoTrack.Codec, "bitrate", videoTrack.Bitrate, "duration", videoTrack.DurationSec, "w", videoTrack.Width, "h", videoTrack.Height, "pix-format", videoTrack.PixelFormat, "FPS", videoTrack.FPS)

audioTrack, _ := inputFileProbe.GetTrack(video.TrackTypeAudio)
log.Log(requestID, "probed audio track", "codec", audioTrack.Codec, "bitrate", audioTrack.Bitrate, "duration", audioTrack.DurationSec, "channels", audioTrack.Channels)
return
return inputFileProbe, signedURL, nil
}

func getSignedURL(osTransferURL *url.URL) (string, error) {
Expand Down
43 changes: 35 additions & 8 deletions clients/mediaconvert.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,9 @@ func (mc *MediaConvert) Transcode(ctx context.Context, args TranscodeJobArgs) (o
)

videoTrack, err := mcArgs.InputFileInfo.GetTrack(video.TrackTypeVideo)
if err != nil {
return nil, fmt.Errorf("no video track found in input video: %w", err)
}
hasVideoTrack := err == nil

if len(mcArgs.Profiles) == 0 {
if hasVideoTrack && len(mcArgs.Profiles) == 0 {
mcArgs.Profiles, err = video.GetDefaultPlaybackProfiles(videoTrack)
if err != nil {
return nil, fmt.Errorf("failed to get playback profiles: %w", err)
Expand All @@ -148,6 +146,7 @@ func (mc *MediaConvert) Transcode(ctx context.Context, args TranscodeJobArgs) (o
mcArgs.MP4OutputLocation = mc.s3TransferBucket.JoinPath(mcMp4OutputRelPath)
}

// Do the actual transcode
err = mc.coreAwsTranscode(ctx, mcArgs, true)
if err == ErrJobAcceleration {
err = mc.coreAwsTranscode(ctx, mcArgs, false)
Expand Down Expand Up @@ -374,11 +373,39 @@ func outputGroups(hlsOutputFile, mp4OutputFile string, profiles []video.EncodedP
}

func outputs(container string, profiles []video.EncodedProfile) []*mediaconvert.Output {
outs := make([]*mediaconvert.Output, 0, len(profiles))
for _, profile := range profiles {
outs = append(outs, output(container, profile.Name, profile.Height, profile.Bitrate))
// If we don't have any video profiles, it means we're in audio-only mode
if len(profiles) == 0 {
return audioOnlyOutputs(container, "audioonly")
} else {
outs := make([]*mediaconvert.Output, 0, len(profiles))
for _, profile := range profiles {
outs = append(outs, output(container, profile.Name, profile.Height, profile.Bitrate))
}
return outs
}
}

func audioOnlyOutputs(container, name string) []*mediaconvert.Output {
return []*mediaconvert.Output{
{
AudioDescriptions: []*mediaconvert.AudioDescription{
{
CodecSettings: &mediaconvert.AudioCodecSettings{
Codec: aws.String("AAC"),
AacSettings: &mediaconvert.AacSettings{
Bitrate: aws.Int64(96000),
CodingMode: aws.String("CODING_MODE_2_0"),
SampleRate: aws.Int64(48000),
},
},
},
},
ContainerSettings: &mediaconvert.ContainerSettings{
Container: aws.String(container),
},
NameModifier: aws.String(name),
},
}
return outs
}

func output(container, name string, height, maxBitrate int64) *mediaconvert.Output {
Expand Down
8 changes: 7 additions & 1 deletion clients/transcode_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,14 @@ func ParseTranscodeProviderURL(input string) (TranscodeProvider, error) {
return nil, err
}
// mediaconvert://<key id>:<key secret>@<endpoint host>?region=<aws region>&role=<arn for role>&s3_aux_bucket=<s3 bucket url>
if u.Scheme == "mediaconvert" {
if u.Scheme == "mediaconvert" || u.Scheme == "mediaconverthttp" {
endpoint := fmt.Sprintf("https://%s", u.Host)

// Only used by integration tests to avoid having to stub a TLS server
if u.Scheme == "mediaconverthttp" {
endpoint = fmt.Sprintf("http://%s", u.Host)
}

if u.Host == "" {
return nil, fmt.Errorf("missing endpoint in url: %s", u.String())
}
Expand Down
2 changes: 2 additions & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Cli struct {
StreamHealthHookURL string
BroadcasterURL string
SourcePlaybackHosts map[string]string
CdnRedirectPrefix *url.URL
CdnRedirectPlaybackIDs []string
}

// Return our own URL for callback trigger purposes
Expand Down
65 changes: 59 additions & 6 deletions handlers/geolocation/gelocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/golang/mock/gomock"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/metrics"
mockbalancer "github.com/livepeer/catalyst-api/mocks/balancer"
mockcluster "github.com/livepeer/catalyst-api/mocks/cluster"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -51,11 +54,12 @@ func TestPlaybackIDParserWithPrefix(t *testing.T) {
for i := 0; i < rand.Int()%16+1; i++ {
id := randomPlaybackID(rand.Int()%24 + 1)
path := fmt.Sprintf("/hls/%s+%s/index.m3u8", randomPrefix(), id)
_, playbackID, _, parsed := parsePlaybackID(path)
if !parsed {
pathType, _, playbackID, _ := parsePlaybackID(path)
if pathType == "" {
t.Fail()
}
require.Equal(t, id, playbackID)
require.Equal(t, pathType, "hls")
}
}

Expand All @@ -64,8 +68,8 @@ func TestPlaybackIDParserWithSegment(t *testing.T) {
id := randomPlaybackID(rand.Int()%24 + 1)
seg := "2_1"
path := fmt.Sprintf("/hls/%s+%s/%s/index.m3u8", randomPrefix(), id, seg)
_, playbackID, suffix, parsed := parsePlaybackID(path)
if !parsed {
pathType, _, playbackID, suffix := parsePlaybackID(path)
if pathType == "" {
t.Fail()
}
require.Equal(t, id, playbackID)
Expand All @@ -77,8 +81,8 @@ func TestPlaybackIDParserWithoutPrefix(t *testing.T) {
for i := 0; i < rand.Int()%16+1; i++ {
id := randomPlaybackID(rand.Int()%24 + 1)
path := fmt.Sprintf("/hls/%s/index.m3u8", id)
_, playbackID, _, parsed := parsePlaybackID(path)
if !parsed {
pathType, _, playbackID, _ := parsePlaybackID(path)
if pathType == "" {
t.Fail()
}
require.Equal(t, id, playbackID)
Expand Down Expand Up @@ -327,6 +331,55 @@ func TestNodeHostPortRedirect(t *testing.T) {
hasHeader("Location", "https://right-host/any/path")
}

func TestCdnRedirect(t *testing.T) {
n := mockHandlers(t)
CdnRedirectedPlaybackId := "def_ZXY-999"
n.Config.NodeHost = "someurl.com"
n.Config.CdnRedirectPrefix, _ = url.Parse("https://external-cdn.com/mist")
n.Config.CdnRedirectPlaybackIDs = []string{CdnRedirectedPlaybackId}

// to be redirected to the closest node
requireReq(t, fmt.Sprintf("/hls/%s/index.m3u8", playbackID)).
result(n).
hasStatus(http.StatusTemporaryRedirect).
hasHeader("Location", fmt.Sprintf("http://%s/hls/%s/index.m3u8", closestNodeAddr, playbackID))

// playbackID is configured to be redirected to CDN but the path is /json_video... so redirect it to the closest node
requireReq(t, fmt.Sprintf("/json_video+%s.js", CdnRedirectedPlaybackId)).
result(n).
hasStatus(http.StatusTemporaryRedirect).
hasHeader("Location", fmt.Sprintf("http://%s/json_video+%s.js", closestNodeAddr, CdnRedirectedPlaybackId))

// playbackID is configured to be redirected to CDN but it's /webrtc
require.Equal(t, testutil.CollectAndCount(metrics.Metrics.CDNRedirectWebRTC406), 0)
requireReq(t, fmt.Sprintf("/webrtc/%s", CdnRedirectedPlaybackId)).
result(n).
hasStatus(http.StatusNotAcceptable)
require.Equal(t, testutil.CollectAndCount(metrics.Metrics.CDNRedirectWebRTC406), 1)
require.Equal(t, testutil.ToFloat64(metrics.Metrics.CDNRedirectWebRTC406.WithLabelValues("unknown")), float64(0))
require.Equal(t, testutil.ToFloat64(metrics.Metrics.CDNRedirectWebRTC406.WithLabelValues(CdnRedirectedPlaybackId)), float64(1))

// this playbackID is configured to be redirected to CDN
require.Equal(t, testutil.CollectAndCount(metrics.Metrics.CDNRedirectCount), 0)

requireReq(t, fmt.Sprintf("/hls/%s/index.m3u8", CdnRedirectedPlaybackId)).
result(n).
hasStatus(http.StatusTemporaryRedirect).
hasHeader("Location", fmt.Sprintf("http://external-cdn.com/mist/hls/%s/index.m3u8", CdnRedirectedPlaybackId))

require.Equal(t, testutil.CollectAndCount(metrics.Metrics.CDNRedirectCount), 1)
require.Equal(t, testutil.ToFloat64(metrics.Metrics.CDNRedirectCount.WithLabelValues("unknown")), float64(0))
require.Equal(t, testutil.ToFloat64(metrics.Metrics.CDNRedirectCount.WithLabelValues(CdnRedirectedPlaybackId)), float64(1))

// don't redirect if `CdnRedirectPrefix` is not configured
n.Config.CdnRedirectPrefix = nil
requireReq(t, fmt.Sprintf("/hls/%s/index.m3u8", CdnRedirectedPlaybackId)).
result(n).
hasStatus(http.StatusTemporaryRedirect).
hasHeader("Location", fmt.Sprintf("http://%s/hls/%s/index.m3u8", closestNodeAddr, CdnRedirectedPlaybackId))

}

type httpReq struct {
*testing.T
*http.Request
Expand Down
Loading

0 comments on commit 5e3d658

Please sign in to comment.