diff --git a/api/http_internal.go b/api/http_internal.go index c7e182dc5..c0f9d7962 100644 --- a/api/http_internal.go +++ b/api/http_internal.go @@ -59,7 +59,9 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato router := httprouter.New() withLogging := middleware.LogRequest() withAuth := middleware.IsAuthorized - withCapacityChecking := middleware.HasCapacity + + capacityMiddleware := middleware.CapacityMiddleware{} + withCapacityChecking := capacityMiddleware.HasCapacity geoHandlers := &geolocation.GeolocationHandlersCollection{ Config: cli, diff --git a/balancer/balancer.go b/balancer/balancer.go index 84c6847c9..ce6493795 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -335,10 +335,12 @@ func (b *BalancerImpl) GetBestNode(ctx context.Context, redirectPrefixes []strin return fallbackNode(fallbackAddr, fallbackPrefix, playbackID, redirectPrefixes[0], err) } +// `playbackID`s matching the pattern `....-....-.....-....` +var regexpStreamKey = regexp.MustCompile(`^(?:\w{4}-){3}\w{4}$`) + func fallbackNode(fallbackAddr, fallbackPrefix, playbackID, defaultPrefix string, err error) (string, string, error) { // Check for `playbackID`s matching the pattern `....-....-.....-....` - r := regexp.MustCompile(`^(?:\w{4}-){3}\w{4}$`) - if r.MatchString(playbackID) { + if regexpStreamKey.MatchString(playbackID) { return fallbackAddr, playbackID, nil } diff --git a/clients/input_copy.go b/clients/input_copy.go index 0509c8e8c..2137ac718 100644 --- a/clients/input_copy.go +++ b/clients/input_copy.go @@ -45,49 +45,47 @@ func NewInputCopy() *InputCopy { } // CopyInputToS3 copies the input video to our S3 transfer bucket and probes the file. 
-func (s *InputCopy) CopyInputToS3(requestID string, inputFile, osTransferURL *url.URL, decryptor *crypto.DecryptionKeys) (inputVideoProbe video.InputVideo, signedURL string, err error) { +func (s *InputCopy) CopyInputToS3(requestID string, inputFile, osTransferURL *url.URL, decryptor *crypto.DecryptionKeys) (video.InputVideo, string, error) { + var signedURL string + var err error if IsHLSInput(inputFile) { log.Log(requestID, "skipping copy for hls") signedURL = inputFile.String() } else { - err = CopyAllInputFiles(requestID, inputFile, osTransferURL, decryptor) - if err != nil { - err = fmt.Errorf("failed to copy file(s): %w", err) - return + if err := CopyAllInputFiles(requestID, inputFile, osTransferURL, decryptor); err != nil { + return video.InputVideo{}, "", fmt.Errorf("failed to copy file(s): %w", err) } signedURL, err = getSignedURL(osTransferURL) if err != nil { - return + return video.InputVideo{}, "", err } } log.Log(requestID, "starting probe", "source", inputFile.String(), "dest", osTransferURL.String()) - inputVideoProbe, err = s.Probe.ProbeFile(requestID, signedURL) + inputFileProbe, err := s.Probe.ProbeFile(requestID, signedURL) if err != nil { log.Log(requestID, "probe failed", "err", err, "source", inputFile.String(), "dest", osTransferURL.String()) - err = fmt.Errorf("error probing MP4 input file from S3: %w", err) - return + return video.InputVideo{}, "", fmt.Errorf("error probing MP4 input file from S3: %w", err) } + log.Log(requestID, "probe succeeded", "source", inputFile.String(), "dest", osTransferURL.String()) - videoTrack, err := inputVideoProbe.GetTrack(video.TrackTypeVideo) - if err != nil { - err = fmt.Errorf("no video track found in input video: %w", err) - return + videoTrack, err := inputFileProbe.GetTrack(video.TrackTypeVideo) + hasVideoTrack := err == nil + if hasVideoTrack { + log.Log(requestID, "probed video track:", "container", inputFileProbe.Format, "codec", videoTrack.Codec, "bitrate", videoTrack.Bitrate, "duration", 
videoTrack.DurationSec, "w", videoTrack.Width, "h", videoTrack.Height, "pix-format", videoTrack.PixelFormat, "FPS", videoTrack.FPS) } - audioTrack, _ := inputVideoProbe.GetTrack(video.TrackTypeAudio) - if videoTrack.FPS <= 0 { + if hasVideoTrack && videoTrack.FPS <= 0 { // unsupported, includes things like motion jpegs - err = fmt.Errorf("invalid framerate: %f", videoTrack.FPS) - return + return video.InputVideo{}, "", fmt.Errorf("invalid framerate: %f", videoTrack.FPS) } - if inputVideoProbe.SizeBytes > config.MaxInputFileSizeBytes { - err = fmt.Errorf("input file %d bytes was greater than %d bytes", inputVideoProbe.SizeBytes, config.MaxInputFileSizeBytes) - return + if inputFileProbe.SizeBytes > config.MaxInputFileSizeBytes { + return video.InputVideo{}, "", fmt.Errorf("input file %d bytes was greater than %d bytes", inputFileProbe.SizeBytes, config.MaxInputFileSizeBytes) } - log.Log(requestID, "probed video track:", "container", inputVideoProbe.Format, "codec", videoTrack.Codec, "bitrate", videoTrack.Bitrate, "duration", videoTrack.DurationSec, "w", videoTrack.Width, "h", videoTrack.Height, "pix-format", videoTrack.PixelFormat, "FPS", videoTrack.FPS) + + audioTrack, _ := inputFileProbe.GetTrack(video.TrackTypeAudio) log.Log(requestID, "probed audio track", "codec", audioTrack.Codec, "bitrate", audioTrack.Bitrate, "duration", audioTrack.DurationSec, "channels", audioTrack.Channels) - return + return inputFileProbe, signedURL, nil } func getSignedURL(osTransferURL *url.URL) (string, error) { diff --git a/clients/mediaconvert.go b/clients/mediaconvert.go index f0fe62ed6..4fcc8044f 100644 --- a/clients/mediaconvert.go +++ b/clients/mediaconvert.go @@ -122,11 +122,9 @@ func (mc *MediaConvert) Transcode(ctx context.Context, args TranscodeJobArgs) (o ) videoTrack, err := mcArgs.InputFileInfo.GetTrack(video.TrackTypeVideo) - if err != nil { - return nil, fmt.Errorf("no video track found in input video: %w", err) - } + hasVideoTrack := err == nil - if 
len(mcArgs.Profiles) == 0 { + if hasVideoTrack && len(mcArgs.Profiles) == 0 { mcArgs.Profiles, err = video.GetDefaultPlaybackProfiles(videoTrack) if err != nil { return nil, fmt.Errorf("failed to get playback profiles: %w", err) @@ -148,6 +146,7 @@ func (mc *MediaConvert) Transcode(ctx context.Context, args TranscodeJobArgs) (o mcArgs.MP4OutputLocation = mc.s3TransferBucket.JoinPath(mcMp4OutputRelPath) } + // Do the actual transcode err = mc.coreAwsTranscode(ctx, mcArgs, true) if err == ErrJobAcceleration { err = mc.coreAwsTranscode(ctx, mcArgs, false) @@ -374,11 +373,39 @@ func outputGroups(hlsOutputFile, mp4OutputFile string, profiles []video.EncodedP } func outputs(container string, profiles []video.EncodedProfile) []*mediaconvert.Output { - outs := make([]*mediaconvert.Output, 0, len(profiles)) - for _, profile := range profiles { - outs = append(outs, output(container, profile.Name, profile.Height, profile.Bitrate)) + // If we don't have any video profiles, it means we're in audio-only mode + if len(profiles) == 0 { + return audioOnlyOutputs(container, "audioonly") + } else { + outs := make([]*mediaconvert.Output, 0, len(profiles)) + for _, profile := range profiles { + outs = append(outs, output(container, profile.Name, profile.Height, profile.Bitrate)) + } + return outs + } +} + +func audioOnlyOutputs(container, name string) []*mediaconvert.Output { + return []*mediaconvert.Output{ + { + AudioDescriptions: []*mediaconvert.AudioDescription{ + { + CodecSettings: &mediaconvert.AudioCodecSettings{ + Codec: aws.String("AAC"), + AacSettings: &mediaconvert.AacSettings{ + Bitrate: aws.Int64(96000), + CodingMode: aws.String("CODING_MODE_2_0"), + SampleRate: aws.Int64(48000), + }, + }, + }, + }, + ContainerSettings: &mediaconvert.ContainerSettings{ + Container: aws.String(container), + }, + NameModifier: aws.String(name), + }, } - return outs } func output(container, name string, height, maxBitrate int64) *mediaconvert.Output { diff --git 
a/clients/transcode_provider.go b/clients/transcode_provider.go index 7d0b62ffb..7d495e9fb 100644 --- a/clients/transcode_provider.go +++ b/clients/transcode_provider.go @@ -47,8 +47,14 @@ func ParseTranscodeProviderURL(input string) (TranscodeProvider, error) { return nil, err } // mediaconvert://:@?region=&role=&s3_aux_bucket= - if u.Scheme == "mediaconvert" { + if u.Scheme == "mediaconvert" || u.Scheme == "mediaconverthttp" { endpoint := fmt.Sprintf("https://%s", u.Host) + + // Only used by integration tests to avoid having to stub a TLS server + if u.Scheme == "mediaconverthttp" { + endpoint = fmt.Sprintf("http://%s", u.Host) + } + if u.Host == "" { return nil, fmt.Errorf("missing endpoint in url: %s", u.String()) } diff --git a/config/cli.go b/config/cli.go index f396bea42..00ebda342 100644 --- a/config/cli.go +++ b/config/cli.go @@ -60,6 +60,8 @@ type Cli struct { StreamHealthHookURL string BroadcasterURL string SourcePlaybackHosts map[string]string + CdnRedirectPrefix *url.URL + CdnRedirectPlaybackIDs []string } // Return our own URL for callback trigger purposes diff --git a/handlers/geolocation/gelocation_test.go b/handlers/geolocation/gelocation_test.go index d0e82e441..3888a1f11 100644 --- a/handlers/geolocation/gelocation_test.go +++ b/handlers/geolocation/gelocation_test.go @@ -6,14 +6,17 @@ import ( "math/rand" "net/http" "net/http/httptest" + "net/url" "testing" "github.com/golang/mock/gomock" "github.com/julienschmidt/httprouter" "github.com/livepeer/catalyst-api/cluster" "github.com/livepeer/catalyst-api/config" + "github.com/livepeer/catalyst-api/metrics" mockbalancer "github.com/livepeer/catalyst-api/mocks/balancer" mockcluster "github.com/livepeer/catalyst-api/mocks/cluster" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" ) @@ -51,11 +54,12 @@ func TestPlaybackIDParserWithPrefix(t *testing.T) { for i := 0; i < rand.Int()%16+1; i++ { id := randomPlaybackID(rand.Int()%24 + 1) path := 
fmt.Sprintf("/hls/%s+%s/index.m3u8", randomPrefix(), id) - _, playbackID, _, parsed := parsePlaybackID(path) - if !parsed { + pathType, _, playbackID, _ := parsePlaybackID(path) + if pathType == "" { t.Fail() } require.Equal(t, id, playbackID) + require.Equal(t, pathType, "hls") } } @@ -64,8 +68,8 @@ func TestPlaybackIDParserWithSegment(t *testing.T) { id := randomPlaybackID(rand.Int()%24 + 1) seg := "2_1" path := fmt.Sprintf("/hls/%s+%s/%s/index.m3u8", randomPrefix(), id, seg) - _, playbackID, suffix, parsed := parsePlaybackID(path) - if !parsed { + pathType, _, playbackID, suffix := parsePlaybackID(path) + if pathType == "" { t.Fail() } require.Equal(t, id, playbackID) @@ -77,8 +81,8 @@ func TestPlaybackIDParserWithoutPrefix(t *testing.T) { for i := 0; i < rand.Int()%16+1; i++ { id := randomPlaybackID(rand.Int()%24 + 1) path := fmt.Sprintf("/hls/%s/index.m3u8", id) - _, playbackID, _, parsed := parsePlaybackID(path) - if !parsed { + pathType, _, playbackID, _ := parsePlaybackID(path) + if pathType == "" { t.Fail() } require.Equal(t, id, playbackID) @@ -327,6 +331,55 @@ func TestNodeHostPortRedirect(t *testing.T) { hasHeader("Location", "https://right-host/any/path") } +func TestCdnRedirect(t *testing.T) { + n := mockHandlers(t) + CdnRedirectedPlaybackId := "def_ZXY-999" + n.Config.NodeHost = "someurl.com" + n.Config.CdnRedirectPrefix, _ = url.Parse("https://external-cdn.com/mist") + n.Config.CdnRedirectPlaybackIDs = []string{CdnRedirectedPlaybackId} + + // to be redirected to the closest node + requireReq(t, fmt.Sprintf("/hls/%s/index.m3u8", playbackID)). + result(n). + hasStatus(http.StatusTemporaryRedirect). + hasHeader("Location", fmt.Sprintf("http://%s/hls/%s/index.m3u8", closestNodeAddr, playbackID)) + + // playbackID is configured to be redirected to CDN but the path is /json_video... so redirect it to the closest node + requireReq(t, fmt.Sprintf("/json_video+%s.js", CdnRedirectedPlaybackId)). + result(n). + hasStatus(http.StatusTemporaryRedirect). 
+ hasHeader("Location", fmt.Sprintf("http://%s/json_video+%s.js", closestNodeAddr, CdnRedirectedPlaybackId)) + + // playbackID is configured to be redirected to CDN but it's /webrtc + require.Equal(t, testutil.CollectAndCount(metrics.Metrics.CDNRedirectWebRTC406), 0) + requireReq(t, fmt.Sprintf("/webrtc/%s", CdnRedirectedPlaybackId)). + result(n). + hasStatus(http.StatusNotAcceptable) + require.Equal(t, testutil.CollectAndCount(metrics.Metrics.CDNRedirectWebRTC406), 1) + require.Equal(t, testutil.ToFloat64(metrics.Metrics.CDNRedirectWebRTC406.WithLabelValues("unknown")), float64(0)) + require.Equal(t, testutil.ToFloat64(metrics.Metrics.CDNRedirectWebRTC406.WithLabelValues(CdnRedirectedPlaybackId)), float64(1)) + + // this playbackID is configured to be redirected to CDN + require.Equal(t, testutil.CollectAndCount(metrics.Metrics.CDNRedirectCount), 0) + + requireReq(t, fmt.Sprintf("/hls/%s/index.m3u8", CdnRedirectedPlaybackId)). + result(n). + hasStatus(http.StatusTemporaryRedirect). + hasHeader("Location", fmt.Sprintf("http://external-cdn.com/mist/hls/%s/index.m3u8", CdnRedirectedPlaybackId)) + + require.Equal(t, testutil.CollectAndCount(metrics.Metrics.CDNRedirectCount), 1) + require.Equal(t, testutil.ToFloat64(metrics.Metrics.CDNRedirectCount.WithLabelValues("unknown")), float64(0)) + require.Equal(t, testutil.ToFloat64(metrics.Metrics.CDNRedirectCount.WithLabelValues(CdnRedirectedPlaybackId)), float64(1)) + + // don't redirect if `CdnRedirectPrefix` is not configured + n.Config.CdnRedirectPrefix = nil + requireReq(t, fmt.Sprintf("/hls/%s/index.m3u8", CdnRedirectedPlaybackId)). + result(n). + hasStatus(http.StatusTemporaryRedirect). 
+ hasHeader("Location", fmt.Sprintf("http://%s/hls/%s/index.m3u8", closestNodeAddr, CdnRedirectedPlaybackId)) + +} + type httpReq struct { *testing.T *http.Request diff --git a/handlers/geolocation/geolocation.go b/handlers/geolocation/geolocation.go index d17f99434..0bde50bef 100644 --- a/handlers/geolocation/geolocation.go +++ b/handlers/geolocation/geolocation.go @@ -14,6 +14,7 @@ import ( "github.com/livepeer/catalyst-api/cluster" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/handlers/misttriggers" + "github.com/livepeer/catalyst-api/metrics" ) type GeolocationHandlersCollection struct { @@ -24,31 +25,53 @@ type GeolocationHandlersCollection struct { // this package handles geolocation for playback and origin discovery for node replication -// redirect an incoming user to a node for playback or 404 handling +// Redirect an incoming user to: CDN (only for /hls), closest node (geolocate) +// or another service (like mist HLS) on the current host for playback. func (c *GeolocationHandlersCollection) RedirectHandler() httprouter.Handle { - return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - nodeHost := c.Config.NodeHost - redirectPrefixes := c.Config.RedirectPrefixes - - if nodeHost != "" { - host := r.Host - if host != nodeHost { - newURL, err := url.Parse(r.URL.String()) - if err != nil { - glog.Errorf("failed to parse incoming url for redirect url=%s err=%s", r.URL.String(), err) - w.WriteHeader(http.StatusInternalServerError) + return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) { + host := r.Host + pathType, prefix, playbackID, pathTmpl := parsePlaybackID(r.URL.Path) + + if c.Config.CdnRedirectPrefix != nil && (pathType == "hls" || pathType == "webrtc") { + if contains(c.Config.CdnRedirectPlaybackIDs, playbackID) { + if pathType == "webrtc" { + // For webRTC streams on the `CdnRedirectPlaybackIDs` list we return `406` + // so the player can fallback to a new HLS request. 
For webRTC streams not + // on the CdnRedirectPlaybackIDs list we do regular geolocation. + w.WriteHeader(http.StatusNotAcceptable) // 406 error + metrics.Metrics.CDNRedirectWebRTC406.WithLabelValues(playbackID).Inc() + glog.V(6).Infof("%s not supported for CDN-redirected %s", r.URL.Path, playbackID) return } + newURL, _ := url.Parse(r.URL.String()) newURL.Scheme = protocol(r) - newURL.Host = nodeHost + newURL.Host = c.Config.CdnRedirectPrefix.Host + newURL.Path, _ = url.JoinPath(c.Config.CdnRedirectPrefix.Path, newURL.Path) http.Redirect(w, r, newURL.String(), http.StatusTemporaryRedirect) - glog.V(6).Infof("NodeHost redirect host=%s nodeHost=%s from=%s to=%s", host, nodeHost, r.URL, newURL) + metrics.Metrics.CDNRedirectCount.WithLabelValues(playbackID).Inc() + glog.V(6).Infof("CDN redirect host=%s from=%s to=%s", host, r.URL, newURL) + return + } + } + + nodeHost := c.Config.NodeHost + + if nodeHost != "" && nodeHost != host { + newURL, err := url.Parse(r.URL.String()) + if err != nil { + glog.Errorf("failed to parse incoming url for redirect url=%s err=%s", r.URL.String(), err) + w.WriteHeader(http.StatusInternalServerError) return } + newURL.Scheme = protocol(r) + newURL.Host = nodeHost + http.Redirect(w, r, newURL.String(), http.StatusTemporaryRedirect) + glog.V(6).Infof("NodeHost redirect host=%s nodeHost=%s from=%s to=%s", host, nodeHost, r.URL, newURL) + return } - prefix, playbackID, pathTmpl, isValid := parsePlaybackID(r.URL.Path) - if !isValid { + if pathType == "" { + glog.Warningf("Can not parse playbackID from path %s", r.URL.Path) w.WriteHeader(http.StatusNotFound) return } @@ -56,6 +79,7 @@ func (c *GeolocationHandlersCollection) RedirectHandler() httprouter.Handle { lat := r.Header.Get("X-Latitude") lon := r.Header.Get("X-Longitude") + redirectPrefixes := c.Config.RedirectPrefixes bestNode, fullPlaybackID, err := c.Balancer.GetBestNode(context.Background(), redirectPrefixes, playbackID, lat, lon, prefix) if err != nil { glog.Errorf("failed to find 
either origin or fallback server for playbackID=%s err=%s", playbackID, err) @@ -117,60 +141,63 @@ func parsePlus(plusString string) (string, string) { return prefix, playbackID } +var regexpHLSPath = regexp.MustCompile(`^/hls/([\w+-]+)/(.*index.m3u8.*)$`) + // Incoming requests might come with some prefix attached to the // playback ID. We try to drop that here by splitting at `+` and // picking the last piece. For eg. // incoming path = '/hls/video+4712oox4msvs9qsf/index.m3u8' // playbackID = '4712oox4msvs9qsf' -func parsePlaybackIDHLS(path string) (string, string, string, bool) { - r := regexp.MustCompile(`^/hls/([\w+-]+)/(.*index.m3u8.*)$`) - m := r.FindStringSubmatch(path) +func parsePlaybackIDHLS(path string) (string, string, string, string) { + m := regexpHLSPath.FindStringSubmatch(path) if len(m) < 3 { - return "", "", "", false + return "", "", "", "" } prefix, playbackID := parsePlus(m[1]) if playbackID == "" { - return "", "", "", false + return "", "", "", "" } pathTmpl := "/hls/%s/" + m[2] - return prefix, playbackID, pathTmpl, true + return "hls", prefix, playbackID, pathTmpl } -func parsePlaybackIDJS(path string) (string, string, string, bool) { - r := regexp.MustCompile(`^/json_([\w+-]+).js$`) - m := r.FindStringSubmatch(path) +var regexpJSONPath = regexp.MustCompile(`^/json_([\w+-]+).js$`) + +func parsePlaybackIDJS(path string) (string, string, string, string) { + m := regexpJSONPath.FindStringSubmatch(path) if len(m) < 2 { - return "", "", "", false + return "", "", "", "" } prefix, playbackID := parsePlus(m[1]) if playbackID == "" { - return "", "", "", false + return "", "", "", "" } - return prefix, playbackID, "/json_%s.js", true + return "json", prefix, playbackID, "/json_%s.js" } -func parsePlaybackIDWebRTC(path string) (string, string, string, bool) { - r := regexp.MustCompile(`^/webrtc/([\w+-]+)$`) - m := r.FindStringSubmatch(path) +var regexpWebRTCPath = regexp.MustCompile(`^/webrtc/([\w+-]+)$`) + +func parsePlaybackIDWebRTC(path string) 
(string, string, string, string) { + m := regexpWebRTCPath.FindStringSubmatch(path) if len(m) < 2 { - return "", "", "", false + return "", "", "", "" } prefix, playbackID := parsePlus(m[1]) if playbackID == "" { - return "", "", "", false + return "", "", "", "" } - return prefix, playbackID, "/webrtc/%s", true + return "webrtc", prefix, playbackID, "/webrtc/%s" } -func parsePlaybackID(path string) (string, string, string, bool) { - parsers := []func(string) (string, string, string, bool){parsePlaybackIDHLS, parsePlaybackIDJS, parsePlaybackIDWebRTC} +func parsePlaybackID(path string) (string, string, string, string) { + parsers := []func(string) (string, string, string, string){parsePlaybackIDHLS, parsePlaybackIDJS, parsePlaybackIDWebRTC} for _, parser := range parsers { - prefix, playbackID, suffix, isValid := parser(path) - if isValid { - return prefix, playbackID, suffix, isValid + pathType, prefix, playbackID, suffix := parser(path) + if pathType != "" { + return pathType, prefix, playbackID, suffix } } - return "", "", "", false + return "", "", "", "" } func protocol(r *http.Request) string { @@ -179,3 +206,12 @@ func protocol(r *http.Request) string { } return "http" } + +func contains[T comparable](list []T, v T) bool { + for _, elm := range list { + if elm == v { + return true + } + } + return false +} diff --git a/main.go b/main.go index a4fa045da..c315cfff5 100644 --- a/main.go +++ b/main.go @@ -87,6 +87,8 @@ func main() { fs.StringVar(&cli.NodeName, "node", hostname, "Name of this node within the cluster") config.SpaceSliceFlag(fs, &cli.BalancerArgs, "balancer-args", []string{}, "arguments passed to MistUtilLoad") fs.StringVar(&cli.NodeHost, "node-host", "", "Hostname this node should handle requests for. Requests on any other domain will trigger a redirect. 
Useful as a 404 handler to send users to another node.") + config.CommaSliceFlag(fs, &cli.CdnRedirectPlaybackIDs, "cdn-redirect-playback-ids", []string{}, "PlaybackIDs to be redirected to CDN.") + config.URLVarFlag(fs, &cli.CdnRedirectPrefix, "cdn-redirect-prefix", "", "CDN URL where streams selected by -cdn-redirect-playback-ids are redirected. E.g. https://externalcdn.livepeer.com/mist/") fs.Float64Var(&cli.NodeLatitude, "node-latitude", 0, "Latitude of this Catalyst node. Used for load balancing.") fs.Float64Var(&cli.NodeLongitude, "node-longitude", 0, "Longitude of this Catalyst node. Used for load balancing.") config.CommaSliceFlag(fs, &cli.RedirectPrefixes, "redirect-prefixes", []string{}, "Set of valid prefixes of playback id which are handled by mistserver") diff --git a/mapic/mistapiconnector_app.go b/mapic/mistapiconnector_app.go index 5f0c519a3..2c25b5630 100644 --- a/mapic/mistapiconnector_app.go +++ b/mapic/mistapiconnector_app.go @@ -29,7 +29,6 @@ const audioEnabledStreamSuffix = "rec" const waitForPushError = 7 * time.Second const waitForPushErrorIncreased = 2 * time.Minute const keepStreamAfterEnd = 15 * time.Second -const statsCollectionPeriod = 10 * time.Second const ownExchangeName = "lp_mist_api_connector" const webhooksExchangeName = "webhook_default_exchange" @@ -102,6 +101,7 @@ type ( broker misttriggers.TriggerBroker mist clients.MistAPIClient multistreamUpdated chan struct{} + metricsCollector *metricsCollector } ) @@ -150,12 +150,12 @@ func (mc *mac) Start(ctx context.Context) error { glog.Infof("AMQP url is empty!") } if producer != nil && mc.config.MistScrapeMetrics { - startMetricsCollector(ctx, statsCollectionPeriod, mc.nodeID, mc.ownRegion, mc.mist, lapi, producer, ownExchangeName, mc) + mc.metricsCollector = createMetricsCollector(mc.nodeID, mc.ownRegion, mc.mist, lapi, producer, ownExchangeName, mc) } mc.multistreamUpdated = make(chan struct{}, 1) go func() { - mc.reconcileMultistreamLoop(ctx) + mc.reconcileLoop(ctx) }() 
<-ctx.Done() @@ -497,9 +497,9 @@ func (mc *mac) shouldEnableAudio(stream *api.Stream) bool { return audio } -// reconcileMultistreamLoop calls reconcileMultistream periodically or when multistreamUpdated is triggered on demand. -func (mc *mac) reconcileMultistreamLoop(ctx context.Context) { - ticker := time.NewTicker(1 * time.Minute) +// reconcileLoop calls reconcileMultistream and processStats periodically or when multistreamUpdated is triggered on demand. +func (mc *mac) reconcileLoop(ctx context.Context) { + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { @@ -508,7 +508,13 @@ func (mc *mac) reconcileMultistreamLoop(ctx context.Context) { case <-ticker.C: case <-mc.multistreamUpdated: } - mc.reconcileMultistream() + mistState, err := mc.mist.GetState() + if err != nil { + glog.Errorf("error executing query on Mist, cannot reconcile err=%v", err) + continue + } + mc.reconcileMultistream(mistState) + mc.processStats(mistState) } } @@ -518,7 +524,7 @@ func (mc *mac) reconcileMultistreamLoop(ctx context.Context) { // - Mist removed its push for some reason // Note that we use Mist AUTO_PUSH (which in turn makes sure that the PUSH is always available). // Note also that we only create AUTO_PUSH for active streams which are ingest (not playback). 
-func (mc *mac) reconcileMultistream() { +func (mc *mac) reconcileMultistream(mistState clients.MistState) { type key struct { stream string target string @@ -531,12 +537,6 @@ func (mc *mac) reconcileMultistream() { (strings.HasPrefix(strings.ToLower(k.target), "rtmp:") || strings.HasPrefix(strings.ToLower(k.target), "srt:")) } - mistState, err := mc.mist.GetState() - if err != nil { - glog.Errorf("error executing query on Mist, cannot reconcile multistream err=%v", err) - return - } - // Get the existing PUSH_AUTO from Mist var filteredMistPushAutoList []*clients.MistPushAuto mistMap := map[key]bool{} @@ -614,6 +614,12 @@ func (mc *mac) reconcileMultistream() { } } +func (mc *mac) processStats(mistState clients.MistState) { + if mc.metricsCollector != nil { + mc.metricsCollector.collectMetricsLogged(mc.ctx, 60*time.Second, mistState) + } +} + func (mc *mac) getPushUrl(stream *api.Stream, targetRef *api.MultistreamTargetRef) (*api.MultistreamTarget, string, error) { target, err := mc.lapi.GetMultistreamTarget(targetRef.ID) if err != nil { diff --git a/mapic/mistapiconnector_app_test.go b/mapic/mistapiconnector_app_test.go index 38f77004b..6dc6cf93d 100644 --- a/mapic/mistapiconnector_app_test.go +++ b/mapic/mistapiconnector_app_test.go @@ -1,13 +1,14 @@ package mistapiconnector import ( + "testing" + "github.com/golang/mock/gomock" "github.com/livepeer/catalyst-api/clients" "github.com/livepeer/catalyst-api/config" mockmistclient "github.com/livepeer/catalyst-api/mocks/clients" "github.com/livepeer/go-api-client" "github.com/stretchr/testify/require" - "testing" ) func TestReconcileMultistream(t *testing.T) { @@ -157,7 +158,9 @@ func TestReconcileMultistream(t *testing.T) { return nil }).AnyTimes() - mc.reconcileMultistream() + mistState, err := mm.GetState() + require.NoError(t, err) + mc.reconcileMultistream(mistState) expectedAutoToAdd := []streamTarget{ { diff --git a/mapic/stats_collector.go b/mapic/stats_collector.go index 38f194c79..c3dd9e99a 100644 --- 
a/mapic/stats_collector.go +++ b/mapic/stats_collector.go @@ -19,6 +19,7 @@ const lastSeenBumpPeriod = 30 * time.Second type infoProvider interface { getStreamInfo(mistID string) (*streamInfo, error) + wildcardPlaybackID(stream *api.Stream) string } type metricsCollector struct { @@ -30,45 +31,27 @@ type metricsCollector struct { infoProvider } -func startMetricsCollector(ctx context.Context, period time.Duration, nodeID, ownRegion string, mapi clients.MistAPIClient, lapi *api.Client, producer event.AMQPProducer, amqpExchange string, infop infoProvider) { +func createMetricsCollector(nodeID, ownRegion string, mapi clients.MistAPIClient, lapi *api.Client, producer event.AMQPProducer, amqpExchange string, infop infoProvider) *metricsCollector { mc := &metricsCollector{nodeID, ownRegion, mapi, lapi, producer, amqpExchange, infop} - mc.collectMetricsLogged(ctx, period) - go mc.mainLoop(ctx, period) + return mc } -func (c *metricsCollector) mainLoop(loopCtx context.Context, period time.Duration) { - ticker := time.NewTicker(period) - defer ticker.Stop() - for { - select { - case <-loopCtx.Done(): - return - case <-ticker.C: - c.collectMetricsLogged(loopCtx, period) - } - } -} - -func (c *metricsCollector) collectMetricsLogged(ctx context.Context, timeout time.Duration) { +func (c *metricsCollector) collectMetricsLogged(ctx context.Context, timeout time.Duration, mistState clients.MistState) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - if err := c.collectMetrics(ctx); err != nil { + if err := c.collectMetrics(ctx, mistState); err != nil { glog.Errorf("Error collecting mist metrics. err=%v", err) } } -func (c *metricsCollector) collectMetrics(ctx context.Context) error { +func (c *metricsCollector) collectMetrics(ctx context.Context, mistState clients.MistState) error { defer func() { if rec := recover(); rec != nil { glog.Errorf("Panic in metrics collector. 
value=%v", rec) } }() - mistStats, err := c.mist.GetState() - if err != nil { - return err - } - streamsMetrics := compileStreamMetrics(&mistStats) + streamsMetrics := compileStreamMetrics(&mistState) eg := errgroup.Group{} eg.SetLimit(5) @@ -87,11 +70,12 @@ func (c *metricsCollector) collectMetrics(ctx context.Context) error { glog.Errorf("Error getting stream info for streamId=%s err=%q", streamID, err) continue } - if info.isLazy { - // avoid spamming metrics for playback-only catalyst instances. This means - // that if mapic restarts we will stop sending metrics from previous - // streams as well, but that's a minor issue (curr stream health is dying). - glog.Infof("Skipping metrics for lazily created stream info. streamId=%q metrics=%+v", streamID, metrics) + + stream := c.infoProvider.wildcardPlaybackID(info.stream) + isIngest := isIngestStream(stream, info, mistState) + + if !isIngest { + glog.V(8).Infof("Skipping non-ingest stream. streamId=%q", streamID) continue } diff --git a/metrics/metrics.go b/metrics/metrics.go index f25d7ed04..78e1d6d7a 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -27,6 +27,8 @@ type CatalystAPIMetrics struct { UploadVODRequestDurationSec *prometheus.SummaryVec TranscodeSegmentDurationSec prometheus.Histogram PlaybackRequestDurationSec *prometheus.SummaryVec + CDNRedirectCount *prometheus.CounterVec + CDNRedirectWebRTC406 *prometheus.CounterVec TranscodingStatusUpdate ClientMetrics BroadcasterClient ClientMetrics @@ -64,7 +66,14 @@ func NewMetrics() *CatalystAPIMetrics { Name: "catalyst_playback_request_duration_seconds", Help: "The latency of the requests made to /asset/hls in seconds broken up by success and status code", }, []string{"success", "status_code", "version"}), - + CDNRedirectCount: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "cdn_redirect_count", + Help: "Number of requests redirected to CDN", + }, []string{"playbackID"}), + CDNRedirectWebRTC406: promauto.NewCounterVec(prometheus.CounterOpts{ 
+ Name: "cdn_redirect_webrtc_406", + Help: "Number of WebRTC requests rejected with HTTP 406 because playback should be served from external CDN", + }, []string{"playbackID"}), // Clients metrics TranscodingStatusUpdate: ClientMetrics{ diff --git a/middleware/capacity.go b/middleware/capacity.go index 0073fe0ca..940c5f003 100644 --- a/middleware/capacity.go +++ b/middleware/capacity.go @@ -2,18 +2,27 @@ package middleware import ( "net/http" + "sync/atomic" "github.com/julienschmidt/httprouter" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/pipeline" ) -func HasCapacity(vodEngine *pipeline.Coordinator, next httprouter.Handle) httprouter.Handle { +type CapacityMiddleware struct { + requestsInFlight atomic.Int64 +} + +func (c *CapacityMiddleware) HasCapacity(vodEngine *pipeline.Coordinator, next httprouter.Handle) httprouter.Handle { return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { - if len(vodEngine.Jobs.GetKeys()) >= config.MAX_JOBS_IN_FLIGHT { + requestsInFlight := c.requestsInFlight.Add(1) + defer c.requestsInFlight.Add(-1) + + if len(vodEngine.Jobs.GetKeys())+int(requestsInFlight) >= config.MAX_JOBS_IN_FLIGHT { w.WriteHeader(http.StatusTooManyRequests) return } + next(w, r, ps) } } diff --git a/middleware/capacity_test.go b/middleware/capacity_test.go index ffbfc9e21..bed2f6d0c 100644 --- a/middleware/capacity_test.go +++ b/middleware/capacity_test.go @@ -1,6 +1,7 @@ package middleware import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -9,8 +10,10 @@ import ( "github.com/julienschmidt/httprouter" "github.com/livepeer/catalyst-api/clients" + "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/pipeline" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestItCallsNextMiddlewareWhenCapacityAvailable(t *testing.T) { @@ -21,7 +24,8 @@ func TestItCallsNextMiddlewareWhenCapacityAvailable(t *testing.T) { } // Set up the HTTP handler - handler := 
HasCapacity(pipeline.NewStubCoordinator(), next) + cm := CapacityMiddleware{} + handler := cm.HasCapacity(pipeline.NewStubCoordinator(), next) // Call the handler responseRecorder := httptest.NewRecorder() @@ -32,7 +36,7 @@ func TestItCallsNextMiddlewareWhenCapacityAvailable(t *testing.T) { require.True(t, nextCalled) } -func TestItErrorsWhenNoCapacityAvailable(t *testing.T) { +func TestItErrorsWhenNoJobCapacityAvailable(t *testing.T) { // Create a next handler in the middleware chain, to confirm the request was passed onwards var nextCalled bool next := func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { @@ -53,7 +57,8 @@ func TestItErrorsWhenNoCapacityAvailable(t *testing.T) { time.Sleep(1 * time.Second) // Set up the HTTP handler - handler := HasCapacity(coordinator, next) + cm := CapacityMiddleware{} + handler := cm.HasCapacity(coordinator, next) // Call the handler responseRecorder := httptest.NewRecorder() @@ -65,3 +70,50 @@ func TestItErrorsWhenNoCapacityAvailable(t *testing.T) { // Confirm the handler didn't call the next middleware require.False(t, nextCalled) } + +// As well as looking at jobs in progress, we should also take into account +// in-flight HTTP requests to avoid the race condition where we get a lot of +// requests at once and let them all through +func TestItTakesIntoAccountInFlightHTTPRequests(t *testing.T) { + // Create a next handler in the middleware chain, to confirm the request was passed onwards + next := func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { + time.Sleep(2 * time.Second) // Sleep to simulate a request that doesn't immediately finish + } + + pipeFfmpeg, release := pipeline.NewBlockingStubHandler() + defer release() + coordinator := pipeline.NewStubCoordinatorOpts(pipeline.StrategyCatalystFfmpegDominance, nil, pipeFfmpeg, nil, "") + coordinator.InputCopy = &clients.StubInputCopy{} + + // Set up the HTTP handler + cm := CapacityMiddleware{} + handler := cm.HasCapacity(coordinator, next) + 
+ // Call the handler + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + g, _ := errgroup.WithContext(timeout) + var responseCodes []int = make([]int, 100) + for i := 0; i < 100; i++ { + i := i + g.Go( + func() error { + responseRecorder := httptest.NewRecorder() + handler(responseRecorder, nil, nil) + responseCodes[i] = responseRecorder.Code + return nil + }, + ) + } + require.NoError(t, g.Wait()) + + var rejectedRequestCount = 0 + for _, responseCode := range responseCodes { + if responseCode == http.StatusTooManyRequests { + rejectedRequestCount++ + } + } + + // Confirm the handler didn't let too many requests through + require.Equal(t, rejectedRequestCount, 100-config.MAX_JOBS_IN_FLIGHT+1) +} diff --git a/pipeline/coordinator.go b/pipeline/coordinator.go index 08a54c8a5..76cf558e3 100644 --- a/pipeline/coordinator.go +++ b/pipeline/coordinator.go @@ -35,8 +35,6 @@ const ( StrategyExternalDominance Strategy = "external" // Only execute the FFMPEG / Livepeer pipeline StrategyCatalystFfmpegDominance Strategy = "catalyst_ffmpeg" - // Execute the FFMPEG / Livepeer pipeline in foreground and the external transcoder in background. - StrategyBackgroundExternal Strategy = "background_external" // Execute the FFMPEG pipeline first and fallback to the external transcoding // provider on errors. 
StrategyFallbackExternal Strategy = "fallback_external" @@ -49,7 +47,7 @@ const ( func (s Strategy) IsValid() bool { switch s { - case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyBackgroundExternal, StrategyFallbackExternal: + case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal: return true default: return false @@ -75,9 +73,6 @@ type UploadJobPayload struct { GenerateMP4 bool Encryption *EncryptionPayload InputFileInfo video.InputVideo - SignedSourceURL string - InFallbackMode bool - LivepeerSupported bool SourceCopy bool } @@ -96,16 +91,12 @@ type UploadJobResult struct { type JobInfo struct { mu sync.Mutex UploadJobPayload + PipelineInfo StreamName string // this is only set&used internally in the mist pipeline SegmentingTargetURL string - SourceOutputURL string - handler Handler - hasFallback bool statusClient clients.TranscodeStatusClient - startTime time.Time - result chan bool SourcePlaybackDone time.Time DownloadDone time.Time @@ -126,12 +117,24 @@ type JobInfo struct { sourceSampleRate int sourceSampleBits int - transcodedSegments int targetSegmentSizeSecs int64 - pipeline string catalystRegion string numProfiles int - state string + inFallbackMode bool + SignedSourceURL string + LivepeerSupported bool +} + +// PipelineInfo represents the state of an individual pipeline, i.e. ffmpeg or mediaconvert +// These fields have been split out from JobInfo to ensure that they are zeroed out in startOneUploadJob() when a fallback pipeline runs +type PipelineInfo struct { + startTime time.Time + result chan bool + handler Handler + hasFallback bool + transcodedSegments int + pipeline string + state string } func (j *JobInfo) ReportProgress(stage clients.TranscodeStatus, completionRatio float64) { @@ -234,18 +237,23 @@ func NewStubCoordinatorOpts(strategy Strategy, statusClient clients.TranscodeSta // This has the main logic regarding the pipeline strategy. 
It starts jobs and // handles processing the response and triggering a fallback if appropriate. func (c *Coordinator) StartUploadJob(p UploadJobPayload) { - // A bit hacky - this is effectively a dummy job object to allow us to reuse the runHandlerAsync and - // progress reporting logic. The real job objects still get created in startOneUploadJob(). + streamName := config.SegmentingStreamName(p.RequestID) + log.AddContext(p.RequestID, "stream_name", streamName) si := &JobInfo{ UploadJobPayload: p, statusClient: c.statusClient, - startTime: time.Now(), + StreamName: streamName, numProfiles: len(p.Profiles), - state: "segmenting", catalystRegion: os.Getenv("MY_REGION"), + PipelineInfo: PipelineInfo{ + startTime: time.Now(), + state: "segmenting", + }, } si.ReportProgress(clients.TranscodeStatusPreparing, 0) + c.Jobs.Store(streamName, si) + log.Log(si.RequestID, "Wrote to jobs cache") c.runHandlerAsync(si, func() (*HandlerOutput, error) { sourceURL, err := url.Parse(p.SourceFile) @@ -275,15 +283,16 @@ func (c *Coordinator) StartUploadJob(p UploadJobPayload) { return nil, fmt.Errorf("error copying input to storage: %w", err) } - p.SourceFile = osTransferURL.String() // OS URL used by mist - p.SignedSourceURL = signedNewSourceURL // http(s) URL used by mediaconvert - p.InputFileInfo = inputVideoProbe - p.GenerateMP4 = ShouldGenerateMP4(sourceURL, p.Mp4TargetURL, p.FragMp4TargetURL, p.Mp4OnlyShort, p.InputFileInfo.Duration) + si.SourceFile = osTransferURL.String() // OS URL used by mist + si.SignedSourceURL = signedNewSourceURL // http(s) URL used by mediaconvert + si.InputFileInfo = inputVideoProbe + si.GenerateMP4 = ShouldGenerateMP4(sourceURL, p.Mp4TargetURL, p.FragMp4TargetURL, p.Mp4OnlyShort, si.InputFileInfo.Duration) + si.DownloadDone = time.Now() - log.AddContext(p.RequestID, "new_source_url", p.SourceFile) - log.AddContext(p.RequestID, "signed_url", p.SignedSourceURL) + log.AddContext(p.RequestID, "new_source_url", si.SourceFile) + log.AddContext(p.RequestID, 
"signed_url", si.SignedSourceURL) - c.startUploadJob(p) + c.startUploadJob(si) return nil, nil }) } @@ -307,7 +316,7 @@ func ShouldGenerateMP4(sourceURL, mp4TargetUrl *url.URL, fragMp4TargetUrl *url.U return false } -func (c *Coordinator) startUploadJob(p UploadJobPayload) { +func (c *Coordinator) startUploadJob(p *JobInfo) { strategy := c.strategy if p.PipelineStrategy.IsValid() { strategy = p.PipelineStrategy @@ -318,20 +327,17 @@ func (c *Coordinator) startUploadJob(p UploadJobPayload) { switch strategy { case StrategyExternalDominance: - c.startOneUploadJob(p, c.pipeExternal, true, false) + c.startOneUploadJob(p, c.pipeExternal, false) case StrategyCatalystFfmpegDominance: - c.startOneUploadJob(p, c.pipeFfmpeg, true, false) - case StrategyBackgroundExternal: - c.startOneUploadJob(p, c.pipeFfmpeg, true, false) - c.startOneUploadJob(p, c.pipeExternal, false, false) + c.startOneUploadJob(p, c.pipeFfmpeg, false) case StrategyFallbackExternal: // nolint:errcheck go recovered(func() (t bool, e error) { - success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true, true) + success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true) if !success { - p.InFallbackMode = true + p.inFallbackMode = true log.Log(p.RequestID, "Entering fallback pipeline") - c.startOneUploadJob(p, c.pipeExternal, true, false) + c.startOneUploadJob(p, c.pipeExternal, false) } return }) @@ -341,6 +347,11 @@ func (c *Coordinator) startUploadJob(p UploadJobPayload) { // checkLivepeerCompatible checks if the input codecs are compatible with our Livepeer pipeline and overrides the pipeline strategy // to external if they are incompatible func checkLivepeerCompatible(requestID string, strategy Strategy, iv video.InputVideo) (bool, Strategy) { + if _, err := iv.GetTrack(video.TrackTypeVideo); err != nil { + log.Log(requestID, "audio-only inputs not supported by Livepeer pipeline") + return livepeerNotSupported(strategy) + } + for _, track := range iv.Tracks { // if the codecs are not compatible then override to 
external pipeline to avoid sending to Livepeer if (track.Type == video.TrackTypeVideo && strings.ToLower(track.Codec) != "h264") || @@ -406,18 +417,8 @@ func checkDisplayAspectRatio(track video.InputTrack, requestID string) bool { // The `hasFallback` argument means the caller has a special logic to handle // failures (today this means re-running the job in another pipeline). If it's // set to true, error callbacks from this job will not be sent. -func (c *Coordinator) startOneUploadJob(p UploadJobPayload, handler Handler, foreground, hasFallback bool) <-chan bool { - if !foreground { - p.RequestID = fmt.Sprintf("bg_%s", p.RequestID) - if p.HlsTargetURL != nil { - p.HlsTargetURL = p.HlsTargetURL.JoinPath("..", handler.Name(), path.Base(p.HlsTargetURL.Path)) - } - // this will prevent the callbacks for this job from actually being sent - p.CallbackURL = "" - } - streamName := config.SegmentingStreamName(p.RequestID) - log.AddContext(p.RequestID, "stream_name", streamName) - log.AddContext(p.RequestID, "handler", handler.Name()) +func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, hasFallback bool) <-chan bool { + log.AddContext(si.RequestID, "handler", handler.Name()) var pipeline = handler.Name() if pipeline == "external" { @@ -426,51 +427,45 @@ func (c *Coordinator) startOneUploadJob(p UploadJobPayload, handler Handler, for // Codecs are parsed here primarily to write codec stats for each job var videoCodec, audioCodec string - videoTrack, err := p.InputFileInfo.GetTrack(video.TrackTypeVideo) + videoTrack, err := si.InputFileInfo.GetTrack(video.TrackTypeVideo) if err != nil { videoCodec = "n/a" } else { videoCodec = videoTrack.Codec } - audioTrack, err := p.InputFileInfo.GetTrack(video.TrackTypeAudio) + audioTrack, err := si.InputFileInfo.GetTrack(video.TrackTypeAudio) if err != nil { audioCodec = "n/a" } else { audioCodec = audioTrack.Codec } - si := &JobInfo{ - UploadJobPayload: p, - StreamName: streamName, - handler: handler, - hasFallback: 
hasFallback, - statusClient: c.statusClient, - startTime: time.Now(), - result: make(chan bool, 1), - - pipeline: pipeline, - numProfiles: len(p.Profiles), - state: "segmenting", - transcodedSegments: 0, - targetSegmentSizeSecs: p.TargetSegmentSizeSecs, - catalystRegion: os.Getenv("MY_REGION"), - sourceCodecVideo: videoCodec, - sourceCodecAudio: audioCodec, - sourceWidth: videoTrack.Width, - sourceHeight: videoTrack.Height, - sourceFPS: videoTrack.FPS, - sourceBitrateVideo: videoTrack.Bitrate, - sourceBitrateAudio: audioTrack.Bitrate, - sourceChannels: audioTrack.Channels, - sourceSampleRate: audioTrack.SampleRate, - sourceSampleBits: audioTrack.SampleBits, - sourceBytes: p.InputFileInfo.SizeBytes, - sourceDurationMs: int64(math.Round(p.InputFileInfo.Duration) * 1000), - DownloadDone: time.Now(), - } - si.ReportProgress(clients.TranscodeStatusPreparing, 0) + si.PipelineInfo = PipelineInfo{ + startTime: time.Now(), + result: make(chan bool, 1), + handler: handler, + hasFallback: hasFallback, + transcodedSegments: 0, + pipeline: pipeline, + state: "segmenting", + } + + si.targetSegmentSizeSecs = si.TargetSegmentSizeSecs + si.sourceBytes = si.InputFileInfo.SizeBytes + si.sourceDurationMs = int64(math.Round(si.InputFileInfo.Duration) * 1000) + si.sourceCodecVideo = videoCodec + si.sourceCodecAudio = audioCodec + si.sourceWidth = videoTrack.Width + si.sourceHeight = videoTrack.Height + si.sourceFPS = videoTrack.FPS + si.sourceBitrateVideo = videoTrack.Bitrate + si.sourceBitrateAudio = audioTrack.Bitrate + si.sourceChannels = audioTrack.Channels + si.sourceSampleRate = audioTrack.SampleRate + si.sourceSampleBits = audioTrack.SampleBits - c.Jobs.Store(streamName, si) + si.ReportProgress(clients.TranscodeStatusPreparing, 0) + c.Jobs.Store(si.StreamName, si) log.Log(si.RequestID, "Wrote to jobs cache") c.runHandlerAsync(si, func() (*HandlerOutput, error) { @@ -533,7 +528,7 @@ func (c *Coordinator) finishJob(job *JobInfo, out *HandlerOutput, err error) { 
fmt.Sprint(job.numProfiles), job.state, config.Version, - strconv.FormatBool(job.InFallbackMode), + strconv.FormatBool(job.inFallbackMode), strconv.FormatBool(job.LivepeerSupported), } @@ -580,7 +575,7 @@ func (c *Coordinator) sendDBMetrics(job *JobInfo, out *HandlerOutput) { // If it's a fallback, we want a unique Request ID so that it doesn't clash with the row that's already been created for the first pipeline metricsRequestID := job.RequestID - if job.InFallbackMode { + if job.inFallbackMode { metricsRequestID = "fb_" + metricsRequestID } @@ -631,7 +626,7 @@ func (c *Coordinator) sendDBMetrics(job *JobInfo, out *HandlerOutput) { job.sourceDurationMs, log.RedactURL(job.SourceFile), targetURL, - job.InFallbackMode, + job.inFallbackMode, job.SourcePlaybackDone.Unix(), job.DownloadDone.Unix(), job.SegmentingDone.Unix(), diff --git a/pipeline/coordinator_test.go b/pipeline/coordinator_test.go index 6cd6afcdf..d60dd1f20 100644 --- a/pipeline/coordinator_test.go +++ b/pipeline/coordinator_test.go @@ -173,64 +173,6 @@ func TestCoordinatorSourceCopy(t *testing.T) { require.Zero(len(calls)) } -func TestCoordinatorBackgroundJobsStrategies(t *testing.T) { - require := require.New(t) - - callbackHandler, callbacks := callbacksRecorder() - fgHandler, foregroundCalls := recordingHandler(nil) - backgroundCalls := make(chan *JobInfo, 10) - bgHandler := &StubHandler{ - handleStartUploadJob: func(job *JobInfo) (*HandlerOutput, error) { - backgroundCalls <- job - // Test that background job is really hidden: status callbacks are not reported (empty URL) - job.ReportProgress(clients.TranscodeStatusPreparing, 0.2) - - time.Sleep(1 * time.Second) - require.Zero(len(callbacks)) - return testHandlerResult, nil - }, - } - - doTest := func(strategy Strategy) { - var coord *Coordinator - if strategy == StrategyBackgroundExternal { - coord = NewStubCoordinatorOpts(strategy, callbackHandler, fgHandler, bgHandler, "") - } else { - t.Fatalf("Unexpected strategy: %s", strategy) - } - - 
inputFile, _, cleanup := setupTransferDir(t, coord) - defer cleanup() - job := testJob - job.SourceFile = "file://" + inputFile.Name() - coord.StartUploadJob(job) - - requireReceive(t, callbacks, 1*time.Second) // discard initial TranscodeStatusPreparing message - msg := requireReceive(t, callbacks, 1*time.Second) - require.NotZero(msg.URL) - require.Equal(clients.TranscodeStatusPreparing, msg.Status) - - fgJob := requireReceive(t, foregroundCalls, 1*time.Second) - require.Equal("123", fgJob.RequestID) - bgJob := requireReceive(t, backgroundCalls, 1*time.Second) - require.Equal("bg_123", bgJob.RequestID) - require.NotEqual(fgJob.StreamName, bgJob.StreamName) - - // Test that foreground job is the real one: status callbacks ARE reported - msg = requireReceive(t, callbacks, 1*time.Second) - require.NotZero(msg.URL) - require.Equal(clients.TranscodeStatusCompleted, msg.Status) - require.Equal("123", msg.RequestID) - - time.Sleep(1 * time.Second) - require.Zero(len(foregroundCalls)) - require.Zero(len(backgroundCalls)) - require.Zero(len(callbacks)) - } - - doTest(StrategyBackgroundExternal) -} - func TestCoordinatorFallbackStrategySuccess(t *testing.T) { require := require.New(t) @@ -336,7 +278,7 @@ func TestAllowsOverridingStrategyOnRequest(t *testing.T) { defer cleanup() // Override the strategy to background external, which will call the external provider *and* the ffmpeg provider p := testJob - p.PipelineStrategy = StrategyBackgroundExternal + p.PipelineStrategy = StrategyFallbackExternal p.SourceFile = "file://" + inputFile.Name() coord.StartUploadJob(p) @@ -347,8 +289,8 @@ func TestAllowsOverridingStrategyOnRequest(t *testing.T) { // Sanity check that ffmpeg also ran externalJob := requireReceive(t, externalCalls, 1*time.Second) - require.Equal("bg_"+ffmpegJob.RequestID, externalJob.RequestID) - require.Equal("catalyst_vod_bg_"+ffmpegJob.RequestID, externalJob.StreamName) + require.Equal(ffmpegJob.RequestID, externalJob.RequestID) + 
require.Equal("catalyst_vod_"+ffmpegJob.RequestID, externalJob.StreamName) } func setJobInfoFields(job *JobInfo) { @@ -385,7 +327,7 @@ func TestPipelineCollectedMetrics(t *testing.T) { db, dbMock, err := sqlmock.New() require.NoError(err) - coord := NewStubCoordinatorOpts(StrategyBackgroundExternal, callbackHandler, ffmpeg, external, "") + coord := NewStubCoordinatorOpts(StrategyFallbackExternal, callbackHandler, ffmpeg, external, "") coord.MetricsDB = db inputFile, transferDir, cleanup := setupTransferDir(t, coord) @@ -396,7 +338,7 @@ func TestPipelineCollectedMetrics(t *testing.T) { dbMock. ExpectExec("insert into \"vod_completed\".*"). - WithArgs(sqlmock.AnyArg(), 0, sqlmock.AnyArg(), sqlmock.AnyArg(), "vid codec", "audio codec", "stub", "test region", "completed", 1, sqlmock.AnyArg(), 2, 3, 4, 5, sourceFile, "s3+https://user:xxxxx@storage.google.com/bucket/key", false, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()). + WithArgs(sqlmock.AnyArg(), 0, "123", sqlmock.AnyArg(), "vid codec", "audio codec", "stub", "test region", "completed", 1, sqlmock.AnyArg(), 2, 3, 4, 5, sourceFile, "s3+https://user:xxxxx@storage.google.com/bucket/key", false, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()). 
WillReturnResult(sqlmock.NewResult(1, 1)) coord.StartUploadJob(job) @@ -463,7 +405,7 @@ func Test_ProbeErrors(t *testing.T) { { name: "audio only", assetType: "audio", - expectedErr: "error copying input to storage: no video track found in input video: no 'video' tracks found", + expectedErr: "", }, { name: "filesize greater than max", diff --git a/pipeline/external.go b/pipeline/external.go index 681916034..4f7a59513 100644 --- a/pipeline/external.go +++ b/pipeline/external.go @@ -18,6 +18,10 @@ func (m *external) Name() string { } func (e *external) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) { + if e == nil || e.transcoder == nil { + return nil, fmt.Errorf("no external transcoder configured") + } + sourceFileUrl, err := url.Parse(job.SignedSourceURL) if err != nil { return nil, fmt.Errorf("invalid source file URL: %w", err) diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index c26f1710e..2e9d4b726 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -52,7 +52,6 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) { sourceOutputURL := sourceOutputBaseURL.JoinPath(job.RequestID) segmentingTargetURL := sourceOutputURL.JoinPath(config.SEGMENTING_SUBDIR, config.SEGMENTING_TARGET_MANIFEST) - job.SourceOutputURL = sourceOutputURL.String() job.SegmentingTargetURL = segmentingTargetURL.String() log.AddContext(job.RequestID, "segmented_url", job.SegmentingTargetURL) job.ReportProgress(clients.TranscodeStatusPreparing, 0.3) @@ -79,7 +78,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) { TranscodeAPIUrl: job.TranscodeAPIUrl, Profiles: job.Profiles, SourceManifestURL: job.SegmentingTargetURL, - SourceOutputURL: job.SourceOutputURL, + SourceOutputURL: sourceOutputURL.String(), HlsTargetURL: toStr(job.HlsTargetURL), Mp4TargetUrl: toStr(job.Mp4TargetURL), FragMp4TargetUrl: toStr(job.FragMp4TargetURL), diff --git a/test/cucumber_test.go b/test/cucumber_test.go index 91b2afa48..0e153cf04 100644 
--- a/test/cucumber_test.go +++ b/test/cucumber_test.go @@ -13,8 +13,6 @@ import ( var baseURL = "http://127.0.0.1:18989" var baseInternalURL = "http://127.0.0.1:17979" -var sourceOutputDir string -var app *exec.Cmd func init() { // Build the app @@ -46,49 +44,14 @@ func init() { } } -func startApp() error { - - sourceOutputDir = fmt.Sprintf("file://%s/%s/", os.TempDir(), "livepeer/source") - app = exec.Command( - "./app", - "-http-addr=127.0.0.1:18989", - "-http-internal-addr=127.0.0.1:17979", - "-cluster-addr=127.0.0.1:19935", - "-broadcaster-url=http://127.0.0.1:18935", - `-metrics-db-connection-string=`+steps.DB_CONNECTION_STRING, - "-private-bucket", - "fixtures/playback-bucket", - "-gate-url=http://localhost:13000/api/access-control/gate", - "-source-output", - sourceOutputDir, - "-no-mist", - ) - outfile, err := os.Create("logs/app.log") - if err != nil { - return err - } - defer outfile.Close() - app.Stdout = outfile - app.Stderr = outfile - if err := app.Start(); err != nil { - return err - } - - // Wait for app to start - steps.WaitForStartup(baseURL + "/ok") - - return nil -} - func InitializeScenario(ctx *godog.ScenarioContext) { // Allows our steps to share data between themselves, e.g the response of the last HTTP call (which future steps can check is correct) var stepContext = steps.StepContext{ BaseURL: baseURL, BaseInternalURL: baseInternalURL, - SourceOutputDir: sourceOutputDir, } - ctx.Step(`^the VOD API is running$`, startApp) + ctx.Step(`^the VOD API is running$`, stepContext.StartApp) ctx.Step(`^the Client app is authenticated$`, stepContext.SetAuthHeaders) ctx.Step(`^an object store is available$`, stepContext.StartObjectStore) ctx.Step(`^Studio API server is running at "([^"]*)"$`, stepContext.StartStudioAPI) @@ -96,7 +59,6 @@ func InitializeScenario(ctx *godog.ScenarioContext) { ctx.Step(`^a Broadcaster is running at "([^"]*)"$`, stepContext.StartBroadcaster) ctx.Step(`^a Postgres database is running$`, stepContext.StartDatabase) 
ctx.Step(`^a callback server is running at "([^"]*)"$`, stepContext.StartCallbackHandler) - ctx.Step(`^I query the "([^"]*)" endpoint( with "([^"]*)")?$`, stepContext.CreateRequest) ctx.Step(`^I query the internal "([^"]*)" endpoint$`, stepContext.CreateGetRequestInternal) ctx.Step(`^I submit to the "([^"]*)" endpoint with "([^"]*)"$`, stepContext.CreatePostRequest) @@ -122,12 +84,16 @@ func InitializeScenario(ctx *godog.ScenarioContext) { ctx.Step(`^a source copy (has|has not) been written to disk$`, stepContext.SourceCopyWrittenToDisk) ctx.Step(`^a row is written to the database containing the following values$`, stepContext.CheckDatabase) + // Mediaconvert Steps + ctx.Step(`^Mediaconvert is running at "([^"]*)"$`, stepContext.StartMediaconvert) + ctx.Step(`^Mediaconvert receives a valid job creation request within "([^"]*)" seconds$`, stepContext.MediaconvertReceivesAValidRequestJobCreationRequest) + ctx.After(func(ctx context.Context, sc *godog.Scenario, err error) (context.Context, error) { - if app != nil && app.Process != nil { - if err := app.Process.Kill(); err != nil { + if steps.App != nil && steps.App.Process != nil { + if err := steps.App.Process.Kill(); err != nil { fmt.Println("Error while killing app process:", err.Error()) } - if err := app.Wait(); err != nil { + if err := steps.App.Wait(); err != nil { if err.Error() != "signal: killed" { fmt.Println("Error while waiting for app to exit:", err.Error()) } @@ -139,6 +105,7 @@ func InitializeScenario(ctx *godog.ScenarioContext) { _ = stepContext.MinioAdmin.ServiceStop(ctx) } _ = stepContext.Broadcaster.Shutdown(ctx) + _ = stepContext.Mediaconvert.Shutdown(ctx) _ = stepContext.CallbackHandler.Shutdown(ctx) if stepContext.Database != nil { _ = stepContext.Database.Stop() diff --git a/test/features/vod.feature b/test/features/vod.feature index 6ed5b587c..ad1eb7efd 100644 --- a/test/features/vod.feature +++ b/test/features/vod.feature @@ -9,6 +9,7 @@ Feature: VOD Streaming And an object store is 
available And Studio API server is running at "localhost:13000" And a Broadcaster is running at "localhost:18935" + And Mediaconvert is running at "localhost:11111" And a callback server is running at "localhost:3333" And ffmpeg is available @@ -63,3 +64,14 @@ Feature: VOD Streaming | payload | | a valid ffmpeg upload vod request with a source manifest | | a valid ffmpeg upload vod request with a source manifest and source copying | + + Scenario Outline: Submit an audio-only asset for ingestion + When I submit to the internal "/api/vod" endpoint with "<payload>" + And receive a response within "3" seconds + Then I get an HTTP response with code "200" + And I receive a Request ID in the response body + And Mediaconvert receives a valid job creation request within "5" seconds + + Examples: + | payload | + | a valid upload vod request (audio-only) | diff --git a/test/fixtures/audio.mp4 b/test/fixtures/audio.mp4 new file mode 100644 index 000000000..02533f113 Binary files /dev/null and b/test/fixtures/audio.mp4 differ diff --git a/test/steps/http.go b/test/steps/http.go index f8108aada..b72309787 100644 --- a/test/steps/http.go +++ b/test/steps/http.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "os" + "os/exec" "path" "path/filepath" "regexp" @@ -16,6 +17,8 @@ import ( "github.com/cucumber/godog" ) +var App *exec.Cmd + type VODUploadResponse struct { RequestID string `json:"request_id"` } @@ -73,7 +76,11 @@ func (s *StepContext) postRequest(baseURL, endpoint, payload string, headers map if err != nil { return fmt.Errorf("failed to create a source file: %s", err) } - sourceBytes, err := os.ReadFile("fixtures/tiny.mp4") + var sourceFixture = "fixtures/tiny.mp4" + if payload == "a valid upload vod request (audio-only)" { + sourceFixture = "fixtures/audio.mp4" + } + sourceBytes, err := os.ReadFile(sourceFixture) if err != nil { return fmt.Errorf("failed to read example source file: %s", err) } @@ -106,8 +113,9 @@ func (s *StepContext) postRequest(baseURL, endpoint, payload string, 
headers map } s.TranscodedOutputDir = destinationDir - if payload == "a valid upload vod request" { + if strings.HasPrefix(payload, "a valid upload vod request") { req := DefaultUploadRequest + req.PipelineStrategy = "fallback_external" req.URL = "file://" + sourceFile.Name() if payload, err = req.ToJSON(); err != nil { return fmt.Errorf("failed to build upload request JSON: %s", err) @@ -128,6 +136,9 @@ func (s *StepContext) postRequest(baseURL, endpoint, payload string, headers map }, }, } + if strings.Contains(payload, "and fmp4") { + req.OutputLocations[0].Outputs.FMP4 = "enabled" + } if payload, err = req.ToJSON(); err != nil { return fmt.Errorf("failed to build upload request JSON: %s", err) } @@ -169,6 +180,41 @@ func (s *StepContext) postRequest(baseURL, endpoint, payload string, headers map return nil } +func (s *StepContext) StartApp() error { + s.SourceOutputDir = fmt.Sprintf("file://%s/%s/", os.TempDir(), "livepeer/source") + + App = exec.Command( + "./app", + "-http-addr=127.0.0.1:18989", + "-http-internal-addr=127.0.0.1:17979", + "-cluster-addr=127.0.0.1:19935", + "-broadcaster-url=http://127.0.0.1:18935", + `-metrics-db-connection-string=`+DB_CONNECTION_STRING, + "-private-bucket", + "fixtures/playback-bucket", + "-gate-url=http://localhost:13000/api/access-control/gate", + "-external-transcoder=mediaconverthttp://examplekey:examplepass@127.0.0.1:11111?region=us-east-1&role=arn:aws:iam::exampleaccountid:examplerole&s3_aux_bucket=s3://example-bucket", + "-source-output", + s.SourceOutputDir, + "-no-mist", + ) + outfile, err := os.Create("logs/app.log") + if err != nil { + return err + } + defer outfile.Close() + App.Stdout = outfile + App.Stderr = outfile + if err := App.Start(); err != nil { + return err + } + + // Wait for app to start + WaitForStartup(s.BaseURL + "/ok") + + return nil +} + func (s *StepContext) SetAuthHeaders() { s.authHeaders = "Bearer IAmAuthorized" } diff --git a/test/steps/mediaconvert.go b/test/steps/mediaconvert.go new file 
mode 100644 index 000000000..df70f7b19 --- /dev/null +++ b/test/steps/mediaconvert.go @@ -0,0 +1,148 @@ +package steps + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/julienschmidt/httprouter" +) + +type JobCreationRequest struct { + AccelerationSettings struct { + Mode string `json:"mode"` + } `json:"accelerationSettings"` + ClientRequestToken string `json:"clientRequestToken"` + Role string `json:"role"` + Settings struct { + Inputs []struct { + AudioSelectors struct { + AudioSelector1 struct { + DefaultSelection string `json:"defaultSelection"` + } `json:"Audio Selector 1"` + } `json:"audioSelectors"` + FileInput string `json:"fileInput"` + TimecodeSource string `json:"timecodeSource"` + VideoSelector struct { + Rotate string `json:"rotate"` + } `json:"videoSelector"` + } `json:"inputs"` + OutputGroups []struct { + CustomName string `json:"customName"` + Name string `json:"name"` + OutputGroupSettings struct { + HlsGroupSettings struct { + Destination string `json:"destination"` + MinSegmentLength int `json:"minSegmentLength"` + SegmentLength int `json:"segmentLength"` + } `json:"hlsGroupSettings"` + Type string `json:"type"` + } `json:"outputGroupSettings"` + Outputs []struct { + AudioDescriptions []struct { + CodecSettings struct { + AacSettings struct { + Bitrate int `json:"bitrate"` + CodingMode string `json:"codingMode"` + SampleRate int `json:"sampleRate"` + } `json:"aacSettings"` + Codec string `json:"codec"` + } `json:"codecSettings"` + } `json:"audioDescriptions"` + ContainerSettings struct { + Container string `json:"container"` + } `json:"containerSettings"` + NameModifier string `json:"nameModifier"` + } `json:"outputs"` + } `json:"outputGroups"` + TimecodeConfig struct { + Source string `json:"source"` + } `json:"timecodeConfig"` + } `json:"settings"` +} + +func (s *StepContext) StartMediaconvert(listen string) error { + router := httprouter.New() + router.POST("/2017-08-29/jobs", func(w http.ResponseWriter, r 
*http.Request, params httprouter.Params) { + requestBody, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "couldn't read mediaconvert CreateJob request body:"+err.Error(), http.StatusInternalServerError) + return + } + s.MediaconvertJobsReceived = append(s.MediaconvertJobsReceived, requestBody) + + body, err := json.Marshal(map[string]interface{}{ + "job": map[string]string{ + "id": "job-id-123", + }, + }) + if err != nil { + http.Error(w, "couldn't marshal mediaconvert CreateJobOutput:"+err.Error(), http.StatusInternalServerError) + return + } + if _, err := w.Write(body); err != nil { + http.Error(w, "couldn't write mediaconvert response:"+err.Error(), http.StatusInternalServerError) + return + } + }) + router.GET("/2017-08-29/jobs/:id", func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { + id := params.ByName("id") + body, err := json.Marshal(map[string]interface{}{ + "job": map[string]string{ + "id": id, + "status": "COMPLETE", + }, + }) + if err != nil { + http.Error(w, "couldn't marshal mediaconvert CreateJobOutput:"+err.Error(), http.StatusInternalServerError) + return + } + if _, err := w.Write(body); err != nil { + http.Error(w, "couldn't write mediaconvert response:"+err.Error(), http.StatusInternalServerError) + return + } + }) + router.GET("/ok", func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) { + w.WriteHeader(http.StatusOK) + }) + + s.Mediaconvert = http.Server{ + Addr: listen, + Handler: router, + } + go func() { + _ = s.Mediaconvert.ListenAndServe() + }() + + WaitForStartup("http://" + listen + "/ok") + return nil +} + +func (s *StepContext) MediaconvertReceivesAValidRequestJobCreationRequest(withinSecs int) error { + for x := 0; x < withinSecs; x++ { + if len(s.MediaconvertJobsReceived) == 1 { + var job JobCreationRequest + if err := json.Unmarshal(s.MediaconvertJobsReceived[0], &job); err != nil { + return fmt.Errorf("could not parse mediaconvert job creation request: %w", err) + } + + if 
len(job.Settings.OutputGroups) != 1 { + return fmt.Errorf("only expected 1 output group in the mediaconvert job creation request but received %d", len(job.Settings.OutputGroups)) + } + + if len(job.Settings.OutputGroups[0].Outputs) != 1 { + return fmt.Errorf("only expected 1 output in the mediaconvert job creation request but received %d", len(job.Settings.OutputGroups[0].Outputs)) + } + + if job.Settings.OutputGroups[0].Outputs[0].NameModifier != "audioonly" { + return fmt.Errorf("expected an audioonly output but received %s", job.Settings.OutputGroups[0].Outputs[0].NameModifier) + } + + return nil + } + time.Sleep(time.Second) + } + return fmt.Errorf("did not receive a valid Mediaconvert job creation request with %d seconds (actually received %d)", withinSecs, len(s.MediaconvertJobsReceived)) +} diff --git a/test/steps/steps.go b/test/steps/steps.go index b6c83b1ae..7510262b3 100644 --- a/test/steps/steps.go +++ b/test/steps/steps.go @@ -21,6 +21,8 @@ type StepContext struct { TranscodedOutputDir string Studio http.Server Broadcaster http.Server + Mediaconvert http.Server + MediaconvertJobsReceived []([]byte) CallbackHandler http.Server Database *embeddedpostgres.EmbeddedPostgres BroadcasterSegmentsReceived map[string]int // Map of ManifestID -> Num Segments diff --git a/test/steps/upload_request.go b/test/steps/upload_request.go index 6195a238a..5bd43f375 100644 --- a/test/steps/upload_request.go +++ b/test/steps/upload_request.go @@ -7,6 +7,7 @@ import ( type Output struct { HLS string `json:"hls,omitempty"` MP4 string `json:"mp4,omitempty"` + FMP4 string `json:"fragmented_mp4,omitempty"` SourceMp4 bool `json:"source_mp4"` } diff --git a/video/probe.go b/video/probe.go index ae3e44bd9..bf33177ef 100644 --- a/video/probe.go +++ b/video/probe.go @@ -67,10 +67,33 @@ func (p Probe) runProbe(url string, ffProbeOptions ...string) (iv InputVideo, er } func parseProbeOutput(probeData *ffprobe.ProbeData) (InputVideo, error) { + // We rely on this being present to get 
required information about the input video, so error out if it isn't + if probeData.Format == nil { + return InputVideo{}, fmt.Errorf("error parsing input video: format information missing") + } + // parse filesize + size, err := strconv.ParseInt(probeData.Format.Size, 10, 64) + if err != nil { + return InputVideo{}, fmt.Errorf("error parsing filesize from probed data: %w", err) + } + // check for a valid video stream videoStream := probeData.FirstVideoStream() if videoStream == nil { - return InputVideo{}, errors.New("error checking for video: no video stream found") + audioStream := probeData.FirstAudioStream() + if audioStream == nil { + return InputVideo{}, errors.New("error checking for video: no video or audio stream found") + } + + // Audio-only input: build the InputVideo from the audio stream alone. NOTE(review): DurationTs is in timebase units, not seconds — confirm this should not use the stream's duration in seconds. + iv := InputVideo{ + Format: findFormat(probeData.Format.FormatName), + Tracks: []InputTrack{}, + Duration: float64(audioStream.DurationTs), + SizeBytes: size, + } + return addAudioTrack(probeData, iv) + } // check for unsupported video stream(s) for _, codec := range unsupportedVideoCodecList { @@ -81,10 +104,6 @@ func parseProbeOutput(probeData *ffprobe.ProbeData) (InputVideo, error) { if strings.ToLower(videoStream.CodecName) == "vp9" && strings.Contains(probeData.Format.FormatName, "mp4") { return InputVideo{}, fmt.Errorf("error checking for video: VP9 in an MP4 container is not supported") } - // We rely on this being present to get required information about the input video, so error out if it isn't - if probeData.Format == nil { - return InputVideo{}, fmt.Errorf("error parsing input video: format information missing") - } // parse bitrate bitRateValue := videoStream.BitRate if bitRateValue == "" { @@ -92,7 +111,6 @@ func parseProbeOutput(probeData *ffprobe.ProbeData) (InputVideo, error) { var ( bitrate int64 - err error ) if bitRateValue == "" { bitrate = DefaultProfile720p.Bitrate @@ -107,11 +125,7 @@ func parseProbeOutput(probeData *ffprobe.ProbeData) (InputVideo, error) { // correct bitrates
cannot be probed for hls manifests, so override with default bitrate bitrate = DefaultProfile720p.Bitrate } - // parse filesize - size, err := strconv.ParseInt(probeData.Format.Size, 10, 64) - if err != nil { - return InputVideo{}, fmt.Errorf("error parsing filesize from probed data: %w", err) - } + // parse fps fps, err := parseFps(videoStream.AvgFrameRate) if err != nil { diff --git a/video/probe_test.go b/video/probe_test.go index cb98d3208..f2c7648b9 100644 --- a/video/probe_test.go +++ b/video/probe_test.go @@ -7,19 +7,11 @@ import ( "gopkg.in/vansante/go-ffprobe.v2" ) -func TestItRejectsWhenNoVideoTrackPresent(t *testing.T) { - _, err := parseProbeOutput(&ffprobe.ProbeData{ - Streams: []*ffprobe.Stream{ - { - CodecType: "audio", - }, - }, - }) - require.ErrorContains(t, err, "no video stream found") -} - func TestItRejectsWhenMJPEGVideoTrackPresent(t *testing.T) { _, err := parseProbeOutput(&ffprobe.ProbeData{ + Format: &ffprobe.Format{ + Size: "1", + }, Streams: []*ffprobe.Stream{ { CodecType: "video", @@ -30,6 +22,9 @@ func TestItRejectsWhenMJPEGVideoTrackPresent(t *testing.T) { require.ErrorContains(t, err, "mjpeg is not supported") _, err = parseProbeOutput(&ffprobe.ProbeData{ + Format: &ffprobe.Format{ + Size: "1", + }, Streams: []*ffprobe.Stream{ { CodecType: "video",