diff --git a/Dockerfile.orch-tester b/Dockerfile.orch-tester index 673890f5..cace7686 100644 --- a/Dockerfile.orch-tester +++ b/Dockerfile.orch-tester @@ -15,7 +15,7 @@ RUN echo $version COPY . . -RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$version' -X 'github.com/livepeer/stream-tester/model.IProduction=true'" -tags mainnet cmd/orch-tester/orch_tester.go +RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$version' -X 'github.com/livepeer/stream-tester/model.IProduction=true'" -tags mainnet cmd/orch-tester/orch_tester.go cmd/orch-tester/broadcaster_metrics.go FROM debian:stretch-slim diff --git a/cmd/orch-tester/broadcaster_metrics.go b/cmd/orch-tester/broadcaster_metrics.go new file mode 100644 index 00000000..620d940b --- /dev/null +++ b/cmd/orch-tester/broadcaster_metrics.go @@ -0,0 +1,138 @@ +package main + +import ( + "go.opencensus.io/stats/view" + "sync" +) + +type broadcasterMetrics struct { + // Distribution metrics grouped by metric name + lastSum map[string]float64 + lastCount map[string]int + rateSum map[string]float64 + rateCount map[string]int + + // Error count metrics by grouped by error_code tag + lastErrs map[string]int + incErrs map[string]int + + mu sync.Mutex +} + +func newBroadcasterMetrics() *broadcasterMetrics { + return &broadcasterMetrics{ + lastSum: map[string]float64{}, + lastCount: map[string]int{}, + rateSum: map[string]float64{}, + rateCount: map[string]int{}, + lastErrs: map[string]int{}, + incErrs: map[string]int{}, + mu: sync.Mutex{}, + } +} + +func (bm *broadcasterMetrics) reset() { + bm.mu.Lock() + defer bm.mu.Unlock() + + bm.rateSum = map[string]float64{} + bm.rateCount = map[string]int{} + bm.incErrs = map[string]int{} +} + +func (bm broadcasterMetrics) ExportView(viewData *view.Data) { + bm.mu.Lock() + defer bm.mu.Unlock() + + name := viewData.View.Name + rows := viewData.Rows + if len(rows) <= 0 { + return + } + + bm.handleDistributionMetrics(rows, name) + bm.handleErrorCountMetrics(rows, name) +} + +func (bm broadcasterMetrics) handleDistributionMetrics(rows []*view.Row, name string) { + supportedMetrics := map[string]bool{ + "source_segment_duration_seconds": true, + "transcode_overall_latency_seconds": true, + "upload_time_seconds": true, + "download_time_seconds": true, + } + if !supportedMetrics[name] { + return + } + + // Sum metrics with different tags + var sum float64 + var count int + for _, r := range rows { + d, ok := r.Data.(*view.DistributionData) + if ok { + sum += d.Sum() + count += int(d.Count) + } + } + + // Get the rate between the last measurement + rateSum := sum - bm.lastSum[name] + rateCount := count - bm.lastCount[name] + + bm.rateSum[name] += rateSum + bm.rateCount[name] += rateCount + bm.lastSum[name] = sum + bm.lastCount[name] = count +} + +func (bm broadcasterMetrics) handleErrorCountMetrics(rows []*view.Row, name string) { + supportedMetrics := map[string]bool{ + "segment_source_upload_failed_total": true, + "discovery_errors_total": true, + "segment_transcode_failed_total": true, + } + if !supportedMetrics[name] { + return + } + + for _, r := range rows { + var errCode string + for _, t := range r.Tags { + if t.Key.Name() == "error_code" { + errCode = t.Value + } + } + if errCode == "" { + return + } + + d, ok := r.Data.(*view.CountData) + if ok { + inc := int(d.Value) - bm.lastErrs[errCode] + bm.incErrs[errCode] += inc + bm.lastErrs[errCode] = int(d.Value) + } + } +} + +func (bm *broadcasterMetrics) avg(m string) float64 { + bm.mu.Lock() + defer bm.mu.Unlock() + + if bm.rateCount[m] <= 0 { + return 0 + } + return bm.rateSum[m] / float64(bm.rateCount[m]) +} + +func (bm *broadcasterMetrics) errorCount() map[string]int { + bm.mu.Lock() + bm.mu.Unlock() + + res := map[string]int{} + for key, value := range bm.incErrs { + res[key] = value + } + return res +} diff --git a/cmd/orch-tester/orch_tester.go b/cmd/orch-tester/orch_tester.go index 56fdf45c..18039bc8 100644 --- a/cmd/orch-tester/orch_tester.go +++ b/cmd/orch-tester/orch_tester.go @@ -16,6 +16,7 @@ import ( streamtesterMetrics "github.com/livepeer/stream-tester/internal/metrics" "github.com/livepeer/stream-tester/internal/server" "github.com/peterbourgon/ff" + "go.opencensus.io/stats/view" "io/ioutil" "log" "math" @@ -78,7 +79,7 @@ func main() { region := flag.String("region", "", "Region this service is operating in") streamTester := flag.String("streamtester", "", "Address for stream-tester server instance") broadcaster := flag.String("broadcaster", "", "Broadcaster address") - metrics := flag.String("metrics", "127.0.0.1"+":"+prometheusPort, "Broadcaster metrics port") + metrics := flag.String("metrics", "", "Broadcaster metrics port") media := flag.String("media", bcastMediaPort, "Broadcaster HTTP port") rtmp := flag.String("rtmp", bcastRTMPPort, "broadcaster RTMP port") leaderboard := flag.String("leaderboard", "127.0.0.1:3001", "HTTP Address of the serverless leadearboard API") @@ -135,6 +136,13 @@ func main() { bcastHost = *broadcaster } + var embeddedBcastMetrics *broadcasterMetrics + if *streamTester == "" && *broadcaster == "" && *metrics == "" { + glog.Infof("Using embedded broadcaster metrics") + embeddedBcastMetrics = newBroadcasterMetrics() + view.RegisterExporter(embeddedBcastMetrics) + } + metricsURL := defaultAddr(*metrics, defaultHost, prometheusPort) streamTesterURL := defaultAddr(*streamTester, defaultHost, streamTesterPort) leaderboardURL := defaultAddr(*leaderboard, defaultHost, "3001") @@ -174,6 +182,9 @@ func main() { glog.Infof("Starting to test orchestrators") for _, o := range orchestrators { time.Sleep(refreshWait) + if embeddedBcastMetrics != nil { + embeddedBcastMetrics.reset() + } req := &streamerModel.StartStreamsReq{ Host: bcastHost, @@ -233,40 +244,17 @@ func main() { // This calculation requires HTTP ingest to be correct apiStats.SuccessRate = (float64(apiStats.SegmentsReceived) / float64(*numProfiles) / float64(apiStats.SegmentsSent)) - avgSegDuration, err := streamer.avgSegDuration() - if err != nil { - glog.Error(err) - } - apiStats.SegDuration = avgSegDuration + apiStats.SegDuration = avgMetric(streamer, embeddedBcastMetrics, "source_segment_duration_seconds") + apiStats.RoundTripTime = avgMetric(streamer, embeddedBcastMetrics, "transcode_overall_latency_seconds") + apiStats.UploadTime = avgMetric(streamer, embeddedBcastMetrics, "upload_time_seconds") + apiStats.DownloadTime = avgMetric(streamer, embeddedBcastMetrics, "download_time_seconds") - avgRoundTripTime, err := streamer.avgRoundTripTime() - if err != nil { - glog.Error(err) - } - apiStats.RoundTripTime = avgRoundTripTime - - avgUploadTime, err := streamer.avgUploadTime() - if err != nil { - glog.Error(err) - } - apiStats.UploadTime = avgUploadTime - - avgDownloadTime, err := streamer.avgDownloadTime() - if err != nil { - glog.Error(err) - } - apiStats.DownloadTime = avgDownloadTime - - transcodeTime := avgRoundTripTime - avgUploadTime - avgDownloadTime + transcodeTime := apiStats.RoundTripTime - apiStats.UploadTime - apiStats.DownloadTime if transcodeTime > 0 { apiStats.TranscodeTime = transcodeTime } - errors, err := streamer.queryErrorCounts() - if err != nil { - glog.Error(err) - } - apiStats.Errors = errors + apiStats.Errors = errorCount(streamer, embeddedBcastMetrics) if err := streamer.postStats(apiStats); err != nil { glog.Error(err) @@ -470,75 +458,52 @@ func (s *streamerClient) getFinishedStats(mid string) (*streamerModel.Stats, err return &stats, nil } -func (s *streamerClient) avgSegDuration() (float64, error) { - val, err := s.queryVectorMetric( - "rate(livepeer_source_segment_duration_seconds_sum[1m])/rate(livepeer_source_segment_duration_seconds_count[1m])", - ) - if err != nil { - return 0, err - } - valFloat := float64(0) - if val.Len() > 0 { - valFloat = float64((*val)[0].Value) - if math.IsNaN(valFloat) { - return 0, nil - } +func avgMetric(streamer *streamerClient, embedded *broadcasterMetrics, metric string) float64 { + if embedded != nil { + return embedded.avg(metric) } - return valFloat, nil -} -func (s *streamerClient) avgUploadTime() (float64, error) { - val, err := s.queryVectorMetric( - "rate(livepeer_upload_time_seconds_sum[1m])/rate(livepeer_upload_time_seconds_count[1m])", - ) + query := fmt.Sprintf("sum(rate(livepeer_%s_sum[1m]))/sum(rate(livepeer_%s_count[1m]))", metric, metric) + val, err := streamer.queryVectorMetric(query) if err != nil { - return 0, err + glog.Error(err) + return 0 } valFloat := float64(0) if val.Len() > 0 { valFloat = float64((*val)[0].Value) if math.IsNaN(valFloat) { - return 0, nil + glog.Error(err) + return 0 } } - return valFloat, nil + return valFloat } -func (s *streamerClient) avgDownloadTime() (float64, error) { - val, err := s.queryVectorMetric( - "rate(livepeer_download_time_seconds_sum[1m])/rate(livepeer_download_time_seconds_count[1m])", - ) - if err != nil { - return 0, err - } - valFloat := float64(0) - if val.Len() > 0 { - valFloat = float64((*val)[0].Value) - if math.IsNaN(valFloat) { - return 0, nil +func errorCount(s *streamerClient, metrics *broadcasterMetrics) []apiModels.Error { + var errors map[string]int + if metrics != nil { + errors = metrics.errorCount() + } else { + res, err := s.queryErrorCounts() + if err != nil { + glog.Error(errors) + return []apiModels.Error{} } + errors = res } - return valFloat, nil -} -func (s *streamerClient) avgRoundTripTime() (float64, error) { - val, err := s.queryVectorMetric( - "sum(rate(livepeer_transcode_overall_latency_seconds_sum[1m]))/sum(rate(livepeer_transcode_overall_latency_seconds_count[1m]))", - ) - if err != nil { - return 0, err - } - valFloat := float64(0) - if val.Len() > 0 { - valFloat = float64((*val)[0].Value) - if math.IsNaN(valFloat) { - return 0, nil - } + errArray := []apiModels.Error{} + for errCode, count := range errors { + errArray = append(errArray, apiModels.Error{ + ErrorCode: errCode, + Count: count, + }) } - return valFloat, nil + return errArray } -func (s *streamerClient) queryErrorCounts() ([]apiModels.Error, error) { +func (s *streamerClient) queryErrorCounts() (map[string]int, error) { errors := make(map[string]int) ctx, cancel := context.WithTimeout(context.Background(), httpTimeout) defer cancel() @@ -592,17 +557,8 @@ func (s *streamerClient) queryErrorCounts() ([]apiModels.Error, error) { continue } errors[string(err.Metric["error_code"])] += int(math.Round(count)) - - } - - errArray := []apiModels.Error{} - for errCode, count := range errors { - errArray = append(errArray, apiModels.Error{ - ErrorCode: errCode, - Count: count, - }) } - return errArray, nil + return errors, nil } func (s *streamerClient) queryVectorMetric(qry string) (*promModels.Vector, error) {