diff --git a/cmd/recordtester/recordtester.go b/cmd/recordtester/recordtester.go index 31b65875..0be2825a 100644 --- a/cmd/recordtester/recordtester.go +++ b/cmd/recordtester/recordtester.go @@ -177,6 +177,7 @@ func main() { catalystPipelineStrategy := fs.String("catalyst-pipeline-strategy", "", "Which catalyst pipeline strategy to use regarding. The appropriate values are defined by catalyst-api itself.") recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API") recordingSpecStr := fs.String("recording-spec", "", "JSON object with the `recordingSpec` field to use in the test streams. Forwarded to the streams created in the API") + skipSourcePlayback := fs.Bool("skip-source-playback", false, "Whether to skip the source playback check on recordings processing validation") // Discord related flags discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel") @@ -347,6 +348,7 @@ func main() { Ingest: ingest, RecordObjectStoreId: *recordObjectStoreId, RecordingSpec: recordingSpec, + SkipSourcePlayback: *skipSourcePlayback, UseForceURL: *forceRecordingUrl, RecordingWaitTime: *recordingWaitTime, UseHTTP: *useHttp, diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index 19fee3dd..9f870d49 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -53,6 +53,7 @@ type ( Ingest *api.Ingest RecordObjectStoreId string RecordingSpec *api.RecordingSpec + SkipSourcePlayback bool UseForceURL bool RecordingWaitTime time.Duration UseHTTP bool @@ -61,19 +62,11 @@ type ( } recordTester struct { - ctx context.Context - cancel context.CancelFunc - lapi *api.Client - lanalyzers testers.AnalyzerByRegion - ingest *api.Ingest - recordObjectStoreId string - recordingSpec *api.RecordingSpec - useForceURL bool - recordingWaitTime time.Duration - useHTTP bool - mp4 bool - streamHealth bool - serfOpts SerfOptions + RecordTesterOptions + serfOpts SerfOptions + + ctx context.Context + cancel context.CancelFunc // mutable fields streamID string @@ -86,18 +79,9 @@ type ( func NewRecordTester(gctx context.Context, opts RecordTesterOptions, serfOpts SerfOptions) IRecordTester { ctx, cancel := context.WithCancel(gctx) rt := &recordTester{ - lapi: opts.API, - lanalyzers: opts.Analyzers, - ingest: opts.Ingest, + RecordTesterOptions: opts, ctx: ctx, cancel: cancel, - recordObjectStoreId: opts.RecordObjectStoreId, - recordingSpec: opts.RecordingSpec, - useForceURL: opts.UseForceURL, - recordingWaitTime: opts.RecordingWaitTime, - useHTTP: opts.UseHTTP, - mp4: opts.TestMP4, - streamHealth: opts.TestStreamHealth, serfOpts: serfOpts, } return rt @@ -112,7 +96,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. } apiTry := 0 for { - broadcasters, err = rt.lapi.Broadcasters() + broadcasters, err = rt.API.Broadcasters() if err != nil { if testers.Timedout(err) && apiTry < 3 { apiTry++ @@ -126,9 +110,9 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. glog.V(model.DEBUG).Infof("Got broadcasters: %+v", broadcasters) glog.V(model.DEBUG).Infof("Streaming video file '%s'\n", fileName) - if rt.useHTTP && len(broadcasters) == 0 { + if rt.UseHTTP && len(broadcasters) == 0 { return 254, errors.New("empty list of broadcasters") - } else if (!rt.useHTTP && ingest.Ingest == "") || ingest.Playback == "" { + } else if (!rt.UseHTTP && ingest.Ingest == "") || ingest.Playback == "" { return 254, errors.New("empty ingest URLs") } @@ -136,11 +120,11 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00")) var stream *api.Stream for { - stream, err = rt.lapi.CreateStream(api.CreateStreamReq{ + stream, err = rt.API.CreateStream(api.CreateStreamReq{ Name: streamName, Record: true, - RecordingSpec: rt.recordingSpec, - RecordObjectStoreId: rt.recordObjectStoreId, + RecordingSpec: rt.RecordingSpec, + RecordObjectStoreId: rt.RecordObjectStoreId, }) if err != nil { if testers.Timedout(err) && apiTry < 3 { @@ -166,9 +150,9 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. rtmpURL := fmt.Sprintf("%s/%s", ingest.Ingest, stream.StreamKey) testerFuncs := []testers.StartTestFunc{} - if rt.streamHealth { + if rt.TestStreamHealth { testerFuncs = append(testerFuncs, func(ctx context.Context, mediaURL string, waitForTarget time.Duration, opts testers.Streamer2Options) testers.Finite { - return testers.NewStreamHealth(ctx, stream.ID, rt.lanalyzers, 2*time.Minute) + return testers.NewStreamHealth(ctx, stream.ID, rt.Analyzers, 2*time.Minute) }) } @@ -187,7 +171,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. } glog.V(model.SHORT).Infof("RTMP: %s streamId=%s playbackId=%s", rtmpURL, stream.ID, stream.PlaybackID) glog.V(model.SHORT).Infof("MEDIA: %s streamId=%s playbackId=%s", mediaURL, stream.ID, stream.PlaybackID) - if rt.useHTTP { + if rt.UseHTTP { sterr := rt.doOneHTTPStream(fileName, streamName, broadcasters[0], testDuration, stream) if sterr != nil { glog.Warningf("Streaming returned error err=%v streamId=%s playbackId=%s", sterr, stream.ID, stream.PlaybackID) @@ -258,18 +242,18 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. } lapiNoAPIKey := api.NewAPIClient(api.ClientOptions{ - Server: rt.lapi.GetServer(), + Server: rt.API.GetServer(), AccessToken: "", // test playback info call without API key Timeout: 8 * time.Second, }) - if code, err := checkPlaybackInfo(stream.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil { + if code, err := checkPlaybackInfo(stream.PlaybackID, rt.API, lapiNoAPIKey); err != nil { return code, err } glog.Infof("Waiting 10 seconds. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID) time.Sleep(10 * time.Second) // now get sessions - sessions, err := rt.lapi.GetSessionsNew(stream.ID, false) + sessions, err := rt.API.GetSessionsNew(stream.ID, false) if err != nil { glog.Errorf("Error getting sessions err=%v streamId=%s playbackId=%s", err, stream.ID, stream.PlaybackID) return 252, err @@ -304,8 +288,8 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. glog.Infof("Streaming done, waiting for recording URL to appear. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID) - deadline := time.Now().Add(rt.recordingWaitTime) - if rt.useForceURL { + deadline := time.Now().Add(rt.RecordingWaitTime) + if rt.UseForceURL { deadline = time.Now().Add(5 * time.Second) } @@ -329,7 +313,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. for _, sess := range sessions { // currently the assetID is the same as the sessionID so we could just query on that but just in case that // ever changes, we can use the ListAssets call to find the asset - assets, _, err := rt.lapi.ListAssets(api.ListOptions{ + assets, _, err := rt.API.ListAssets(api.ListOptions{ Limit: 1, Filters: map[string]interface{}{ "sourceSessionId": sess.ID, @@ -347,7 +331,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. } asset := assets[0] - if code, err := checkPlaybackInfo(asset.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil { + if code, err := checkPlaybackInfo(asset.PlaybackID, rt.API, lapiNoAPIKey); err != nil { errCode, errs = code, append(errs, err) } else { // if we get playback before the processing is done it means source playback was provided @@ -362,12 +346,12 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. } } } - if !sourcePlayback { + if !sourcePlayback && !rt.SkipSourcePlayback { return 246, errors.New("source playback was not provided") } // check actual recordings playback - sessions, err = rt.lapi.GetSessionsNew(stream.ID, false) + sessions, err = rt.API.GetSessionsNew(stream.ID, false) if err != nil { glog.Errorf("Error getting sessions err=%v streamId=%s playbackId=%s", err, stream.ID, stream.PlaybackID) return 252, err @@ -383,7 +367,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. for _, sess := range sessions { statusShould := api.RecordingStatusReady - if rt.useForceURL { + if rt.UseForceURL { statusShould = api.RecordingStatusWaiting } if sess.RecordingStatus != statusShould { @@ -399,7 +383,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. if err = rt.isCancelled(); err != nil { return 0, err } - if rt.mp4 { + if rt.TestMP4 { es, err := rt.checkRecordingMp4(stream, sess.Mp4Url, testDuration) if err != nil { return es, err @@ -417,7 +401,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. glog.Infof("Done Record Test. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID) - rt.lapi.DeleteStream(stream.ID) + rt.API.DeleteStream(stream.ID) return 0, nil } @@ -440,14 +424,14 @@ func checkPlaybackInfo(playbackID string, withKey, withoutKey *api.Client) (int, } func (rt *recordTester) getIngestInfo() (*api.Ingest, error) { - if rt.ingest != nil { - return rt.ingest, nil + if rt.Ingest != nil { + return rt.Ingest, nil } var ingests []api.Ingest apiTry := 0 for { var err error - ingests, err = rt.lapi.Ingest(false) + ingests, err = rt.API.Ingest(false) if err != nil { if testers.Timedout(err) && apiTry < 3 { apiTry++ @@ -469,11 +453,11 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str var err error apiTry := 0 for { - session, err = rt.lapi.CreateStream(api.CreateStreamReq{ + session, err = rt.API.CreateStream(api.CreateStreamReq{ Name: streamName, Record: true, - RecordingSpec: rt.recordingSpec, - RecordObjectStoreId: rt.recordObjectStoreId, + RecordingSpec: rt.RecordingSpec, + RecordObjectStoreId: rt.RecordObjectStoreId, ParentID: stream.ID, }) if err != nil { @@ -558,7 +542,6 @@ func (rt *recordTester) checkRecordingMp4(stream *api.Stream, url string, stream } func (rt *recordTester) checkRecordingHls(stream *api.Stream, url string, streamDuration time.Duration) (int, error) { - es := 0 started := time.Now() downloader := testers.NewM3utester2(rt.ctx, url, false, false, false, false, 5*time.Second, nil, false) <-downloader.Done() @@ -568,24 +551,32 @@ func (rt *recordTester) checkRecordingHls(stream *api.Stream, url string, stream } vs := downloader.VODStats() rt.vodStats = vs - expectedProfiles := len(api.StandardProfiles) + 1 - if rt.recordingSpec != nil && rt.recordingSpec.Profiles != nil { - expectedProfiles = len(*rt.recordingSpec.Profiles) + 1 - } - if len(vs.SegmentsNum) != expectedProfiles { - glog.Warningf("Number of renditions doesn't match! Has %d should %d. streamId=%s playbackId=%s", len(vs.SegmentsNum), len(api.StandardProfiles)+1, stream.ID, stream.PlaybackID) - es = 35 + + numProfiles := len(vs.SegmentsNum) + if rt.RecordingSpec != nil && rt.RecordingSpec.Profiles != nil { + expectedProfiles := len(*rt.RecordingSpec.Profiles) + 1 + if numProfiles != expectedProfiles { + glog.Warningf("Number of renditions doesn't match! Has %d should %d. streamId=%s playbackId=%s", numProfiles, expectedProfiles, stream.ID, stream.PlaybackID) + return 35, fmt.Errorf("number of renditions doesn't match (expected: %d actual: %d)", expectedProfiles, numProfiles) + } + } else { + // if there's no explicit recording spec we can only expect there's at least 2 profiles (source and transcoded) + expectedProfiles := 2 + if numProfiles < expectedProfiles { + glog.Warningf("Number of renditions too low! Has %d should have at least %d. streamId=%s playbackId=%s", numProfiles, expectedProfiles, stream.ID, stream.PlaybackID) + return 35, fmt.Errorf("number of renditions too low (expected at least: %d actual: %d)", expectedProfiles, numProfiles) + } } + glog.V(model.DEBUG).Infof("Stats: %s streamId=%s playbackId=%s", vs.String(), stream.ID, stream.PlaybackID) glog.V(model.DEBUG).Infof("Stats raw: %+v streamId=%s playbackId=%s", vs, stream.ID, stream.PlaybackID) if ok, ers := vs.IsOk(streamDuration, false); !ok { glog.Warningf("NOT OK! (%s) streamId=%s playbackId=%s", ers, stream.ID, stream.PlaybackID) - es = 36 - return es, errors.New(ers) + return 36, errors.New(ers) } else { glog.Infof("All ok! streamId=%s playbackId=%s", stream.ID, stream.PlaybackID) } - return es, nil + return 0, nil } func (rt *recordTester) Cancel() { @@ -602,7 +593,7 @@ func (rt *recordTester) VODStats() model.VODStats { func (rt *recordTester) Clean() { if rt.streamID != "" { - rt.lapi.DeleteStream(rt.streamID) + rt.API.DeleteStream(rt.streamID) } }