Skip to content

Commit

Permalink
recordtester: Allow overriding catalyst strategy on VOD tests (#231)
Browse files Browse the repository at this point in the history
* vodtester: Allow overiding catalyst pipeline

* recordtester: Receive catalyst pipeline config in cli arg

* recordtester: Allow running only vod tests

* vodtester: Remove redundant ticker

We already use test duration for the ctx and continuousTest
for the pause, no need for that.

* vodtester: Improve log on waiting task

* vodtester: Fix progress log
victorges authored Dec 7, 2022
1 parent 7d72d3b commit bdee5b2
Showing 5 changed files with 31 additions and 25 deletions.
27 changes: 16 additions & 11 deletions cmd/recordtester/recordtester.go
Original file line number Diff line number Diff line change
@@ -59,7 +59,9 @@ func main() {
useHttp := fs.Bool("http", false, "Do HTTP tests instead of RTMP")
testMP4 := fs.Bool("mp4", false, "Download MP4 of recording")
testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test")
testLive := fs.Bool("live", false, "Check Live workflow")
testVod := fs.Bool("vod", false, "Check VOD workflow")
catalystPipelineStrategy := fs.String("catalyst-pipeline-strategy", "", "Which catalyst pipeline strategy to use regarding. The appropriate values are defined by catalyst-api itself.")
recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API")
discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel")
discordUserName := fs.String("discord-user-name", "", "User name to use when sending messages to Discord")
@@ -218,7 +220,8 @@ func main() {
TestStreamHealth: *testStreamHealth,
}
vtOpts := vodtester.VodTesterOptions{
API: lapi,
API: lapi,
CatalystPipelineStrategy: *catalystPipelineStrategy,
}
if *sim > 1 {
var testers []recordtester.IRecordTester
@@ -264,16 +267,18 @@ func main() {
metricServer := server.NewMetricsServer()
go metricServer.Start(gctx, *bind)
eg, egCtx := errgroup.WithContext(gctx)
eg.Go(func() error {
crtOpts := recordtester.ContinuousRecordTesterOptions{
PagerDutyIntegrationKey: *pagerDutyIntegrationKey,
PagerDutyComponent: *pagerDutyComponent,
PagerDutyLowUrgency: *pagerDutyLowUrgency,
RecordTesterOptions: rtOpts,
}
crt := recordtester.NewContinuousRecordTester(egCtx, crtOpts)
return crt.Start(fileName, *testDuration, *pauseDuration, *continuousTest)
})
if *testLive {
eg.Go(func() error {
crtOpts := recordtester.ContinuousRecordTesterOptions{
PagerDutyIntegrationKey: *pagerDutyIntegrationKey,
PagerDutyComponent: *pagerDutyComponent,
PagerDutyLowUrgency: *pagerDutyLowUrgency,
RecordTesterOptions: rtOpts,
}
crt := recordtester.NewContinuousRecordTester(egCtx, crtOpts)
return crt.Start(fileName, *testDuration, *pauseDuration, *continuousTest)
})
}
if *testVod {
eg.Go(func() error {
cvtOpts := vodtester.ContinuousVodTesterOptions{
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ require (
github.com/golang/glog v1.0.0
github.com/gosuri/uilive v0.0.3 // indirect
github.com/gosuri/uiprogress v0.0.1
github.com/livepeer/go-api-client v0.4.0
github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5
github.com/livepeer/go-livepeer v0.5.31
github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07
github.com/livepeer/leaderboard-serverless v1.0.0
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -710,8 +710,8 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/livepeer/go-api-client v0.4.0 h1:uOd8ztWrbJwmf1HrmeX3fLXHWUuTmLOqYKxmpUdmxNI=
github.com/livepeer/go-api-client v0.4.0/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5 h1:sxyLVN5lD4JB5THu2+48BbhNTifJK67YvW8DyNuPBJI=
github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw=
github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU=
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
4 changes: 1 addition & 3 deletions internal/app/vodtester/continuous_vod_tester.go
Original file line number Diff line number Diff line change
@@ -58,9 +58,7 @@ func NewContinuousVodTester(gctx context.Context, opts ContinuousVodTesterOption

func (cvt *continuousVodTester) Start(fileName string, vodImportUrl string, testDuration, taskPollDuration, pauseBetweenTests time.Duration) error {
messenger.SendMessage(fmt.Sprintf("Starting continuous vod test of %s", cvt.host))
ticker := time.NewTicker(testDuration)
defer ticker.Stop()
for range ticker.C {
for {
msg := fmt.Sprintf(":arrow_right: Starting %s vod test to %s", testDuration, cvt.host)
messenger.SendMessage(msg)

19 changes: 11 additions & 8 deletions internal/app/vodtester/vodtester_app.go
Original file line number Diff line number Diff line change
@@ -23,13 +23,15 @@ type (
}

VodTesterOptions struct {
API *api.Client
API *api.Client
CatalystPipelineStrategy string
}

vodTester struct {
ctx context.Context
cancel context.CancelFunc
lapi *api.Client
ctx context.Context
cancel context.CancelFunc
lapi *api.Client
catalystPipelineStrategy string
}
)

@@ -127,7 +129,7 @@ func (vt *vodTester) Start(fileName string, vodImportUrl string, taskPollDuratio

func (vt *vodTester) uploadViaUrlTester(vodImportUrl string, taskPollDuration time.Duration, assetName string) (*api.Asset, error) {

importAsset, importTask, err := vt.lapi.ImportAsset(vodImportUrl, assetName)
importAsset, importTask, err := vt.lapi.UploadViaURL(vodImportUrl, assetName, vt.catalystPipelineStrategy)
if err != nil {
glog.Errorf("Error importing asset err=%v", err)
return nil, fmt.Errorf("error importing asset: %w", err)
@@ -145,7 +147,7 @@ func (vt *vodTester) uploadViaUrlTester(vodImportUrl string, taskPollDuration ti
func (vt *vodTester) directUploadTester(fileName string, taskPollDuration time.Duration) error {
hostName, _ := os.Hostname()
assetName := fmt.Sprintf("vod_test_upload_direct_%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
requestUpload, err := vt.lapi.RequestUpload(assetName)
requestUpload, err := vt.lapi.RequestUpload(assetName, vt.catalystPipelineStrategy)

if err != nil {
glog.Errorf("Error requesting upload for assetName=%s err=%v", assetName, err)
@@ -184,7 +186,7 @@ func (vt *vodTester) resumableUploadTester(fileName string, taskPollDuration tim

hostName, _ := os.Hostname()
assetName := fmt.Sprintf("vod_test_upload_resumable_%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
requestUpload, err := vt.lapi.RequestUpload(assetName)
requestUpload, err := vt.lapi.RequestUpload(assetName, vt.catalystPipelineStrategy)

if err != nil {
glog.Errorf("Error requesting upload for assetName=%s err=%v", assetName, err)
@@ -223,7 +225,6 @@ func (vt *vodTester) resumableUploadTester(fileName string, taskPollDuration tim
func (vt *vodTester) checkTaskProcessing(taskPollDuration time.Duration, processingTask api.Task) error {
startTime := time.Now()
for {
glog.Infof("Waiting %s for task id=%s to be processed, elapsed=%s", taskPollDuration, processingTask.ID, time.Since(startTime))
time.Sleep(taskPollDuration)

if err := vt.isCancelled(); err != nil {
@@ -244,6 +245,8 @@ func (vt *vodTester) checkTaskProcessing(taskPollDuration time.Duration, process
glog.Errorf("Error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage)
return fmt.Errorf("error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage)
}

glog.Infof("Waiting for task to be processed id=%s pollWait=%s elapsed=%s progressPct=%.1f%%", task.ID, taskPollDuration, time.Since(startTime), 100*task.Status.Progress)
}
}

0 comments on commit bdee5b2

Please sign in to comment.