Skip to content

Commit

Permalink
Use embedded broadcaster metrics instead of a separate monitoring ser…
Browse files Browse the repository at this point in the history
…vice (#163)

Use embedded Broadcaster metrics instead of external monitoring service
  • Loading branch information
leszko authored Jun 10, 2022
1 parent 251def1 commit 00faa62
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 92 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.orch-tester
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ RUN echo $version

COPY . .

RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$version' -X 'github.com/livepeer/stream-tester/model.IProduction=true'" -tags mainnet cmd/orch-tester/orch_tester.go
RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$version' -X 'github.com/livepeer/stream-tester/model.IProduction=true'" -tags mainnet cmd/orch-tester/orch_tester.go cmd/orch-tester/broadcaster_metrics.go

FROM debian:stretch-slim

Expand Down
138 changes: 138 additions & 0 deletions cmd/orch-tester/broadcaster_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package main

import (
"go.opencensus.io/stats/view"
"sync"
)

// broadcasterMetrics collects Broadcaster metrics handed to ExportView so that
// averages and error counts for the current measurement window can be read
// back later.
//
// The "last*" maps remember the most recent cumulative values that were seen,
// while the "rate*" and "inc*" maps hold the growth accumulated since the
// previous call to reset.
type broadcasterMetrics struct {
	// mu is the lock the methods take before touching the maps below.
	mu sync.Mutex

	// Distribution metrics, grouped by metric name.
	lastSum, rateSum     map[string]float64
	lastCount, rateCount map[string]int

	// Error count metrics: lastErrs holds the last cumulative counts seen,
	// incErrs the increments per error_code tag value since the last reset.
	lastErrs, incErrs map[string]int
}

// newBroadcasterMetrics returns a broadcasterMetrics whose maps are all
// allocated and empty, so they can be written to right away.
func newBroadcasterMetrics() *broadcasterMetrics {
	// The mutex needs no explicit initialisation: its zero value is unlocked.
	bm := new(broadcasterMetrics)

	bm.lastSum = make(map[string]float64)
	bm.lastCount = make(map[string]int)
	bm.rateSum = make(map[string]float64)
	bm.rateCount = make(map[string]int)

	bm.lastErrs = make(map[string]int)
	bm.incErrs = make(map[string]int)

	return bm
}

// reset starts a new measurement window by replacing the accumulated sums,
// counts and error increments with empty maps. The last-seen cumulative
// values (lastSum, lastCount, lastErrs) are kept.
func (bm *broadcasterMetrics) reset() {
	bm.mu.Lock()
	defer bm.mu.Unlock()

	bm.rateSum, bm.rateCount = make(map[string]float64), make(map[string]int)
	bm.incErrs = make(map[string]int)
}

// ExportView folds one exported view snapshot into the accumulated
// distribution and error-count metrics. Snapshots without rows are ignored.
//
// The receiver is a pointer on purpose: the struct embeds a sync.Mutex, and a
// value receiver would lock a private copy of it on every call, giving no
// mutual exclusion with reset, avg and errorCount.
func (bm *broadcasterMetrics) ExportView(viewData *view.Data) {
	// Nothing to record for a missing or empty snapshot.
	if viewData == nil || viewData.View == nil || len(viewData.Rows) == 0 {
		return
	}

	bm.mu.Lock()
	defer bm.mu.Unlock()

	name := viewData.View.Name
	rows := viewData.Rows

	// Each handler ignores view names it does not support.
	bm.handleDistributionMetrics(rows, name)
	bm.handleErrorCountMetrics(rows, name)
}

// handleDistributionMetrics records the growth of a supported distribution
// view since it was last seen. Views with any other name are ignored.
//
// It does no locking itself; ExportView takes bm.mu before calling it. The
// receiver is a pointer so the struct (and the mutex inside it) is not copied
// on every call.
func (bm *broadcasterMetrics) handleDistributionMetrics(rows []*view.Row, name string) {
	switch name {
	case "source_segment_duration_seconds",
		"transcode_overall_latency_seconds",
		"upload_time_seconds",
		"download_time_seconds":
		// Supported, carry on.
	default:
		return
	}

	// Sum metrics with different tags; rows that do not carry distribution
	// data are skipped.
	var sum float64
	var count int
	for _, r := range rows {
		if d, ok := r.Data.(*view.DistributionData); ok {
			sum += d.Sum()
			count += int(d.Count)
		}
	}

	// Add the difference to the previous measurement to the current window,
	// then remember this measurement as the new baseline.
	bm.rateSum[name] += sum - bm.lastSum[name]
	bm.rateCount[name] += count - bm.lastCount[name]
	bm.lastSum[name] = sum
	bm.lastCount[name] = count
}

// handleErrorCountMetrics records, per error_code tag value, how much the
// error counters of a supported view grew since that view was last seen.
// Views with any other name are ignored.
//
// It does no locking itself; ExportView takes bm.mu before calling it. The
// receiver is a pointer so the struct (and the mutex inside it) is not copied
// on every call.
func (bm *broadcasterMetrics) handleErrorCountMetrics(rows []*view.Row, name string) {
	switch name {
	case "segment_source_upload_failed_total",
		"discovery_errors_total",
		"segment_transcode_failed_total":
		// Supported, carry on.
	default:
		return
	}

	// Several rows may share an error code while differing in other tags, so
	// add them up before comparing against the previous measurement.
	totals := make(map[string]int, len(rows))
	for _, r := range rows {
		d, ok := r.Data.(*view.CountData)
		if !ok {
			continue
		}

		var errCode string
		for _, t := range r.Tags {
			if t.Key.Name() == "error_code" {
				errCode = t.Value
			}
		}
		if errCode == "" {
			// Skip only this row; the remaining rows must still be counted.
			continue
		}

		totals[errCode] += int(d.Value)
	}

	for errCode, total := range totals {
		// The baseline is tracked per view and error code. Keying it by the
		// error code alone would let different views that report the same
		// code overwrite each other's baseline and corrupt the increments.
		lastKey := name + "/" + errCode
		bm.incErrs[errCode] += total - bm.lastErrs[lastKey]
		bm.lastErrs[lastKey] = total
	}
}

// avg returns the mean value of distribution metric m over the current
// measurement window, i.e. the accumulated sum divided by the accumulated
// count. It returns 0 when no positive count has been accumulated for m.
func (bm *broadcasterMetrics) avg(m string) float64 {
	bm.mu.Lock()
	defer bm.mu.Unlock()

	n := bm.rateCount[m]
	if n > 0 {
		return bm.rateSum[m] / float64(n)
	}
	// Avoid dividing by zero (or by a negative count).
	return 0
}

// errorCount returns a copy of the error increments accumulated in the current
// measurement window, keyed by error code. The caller owns the returned map
// and may modify it freely.
func (bm *broadcasterMetrics) errorCount() map[string]int {
	bm.mu.Lock()
	// Hold the lock for the whole copy; releasing it right after Lock would
	// leave the iteration below racing with writers.
	defer bm.mu.Unlock()

	res := make(map[string]int, len(bm.incErrs))
	for code, n := range bm.incErrs {
		res[code] = n
	}
	return res
}
138 changes: 47 additions & 91 deletions cmd/orch-tester/orch_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
streamtesterMetrics "github.com/livepeer/stream-tester/internal/metrics"
"github.com/livepeer/stream-tester/internal/server"
"github.com/peterbourgon/ff"
"go.opencensus.io/stats/view"
"io/ioutil"
"log"
"math"
Expand Down Expand Up @@ -78,7 +79,7 @@ func main() {
region := flag.String("region", "", "Region this service is operating in")
streamTester := flag.String("streamtester", "", "Address for stream-tester server instance")
broadcaster := flag.String("broadcaster", "", "Broadcaster address")
metrics := flag.String("metrics", "127.0.0.1"+":"+prometheusPort, "Broadcaster metrics port")
metrics := flag.String("metrics", "", "Broadcaster metrics port")
media := flag.String("media", bcastMediaPort, "Broadcaster HTTP port")
rtmp := flag.String("rtmp", bcastRTMPPort, "broadcaster RTMP port")
leaderboard := flag.String("leaderboard", "127.0.0.1:3001", "HTTP Address of the serverless leadearboard API")
Expand Down Expand Up @@ -135,6 +136,13 @@ func main() {
bcastHost = *broadcaster
}

var embeddedBcastMetrics *broadcasterMetrics
if *streamTester == "" && *broadcaster == "" && *metrics == "" {
glog.Infof("Using embedded broadcaster metrics")
embeddedBcastMetrics = newBroadcasterMetrics()
view.RegisterExporter(embeddedBcastMetrics)
}

metricsURL := defaultAddr(*metrics, defaultHost, prometheusPort)
streamTesterURL := defaultAddr(*streamTester, defaultHost, streamTesterPort)
leaderboardURL := defaultAddr(*leaderboard, defaultHost, "3001")
Expand Down Expand Up @@ -174,6 +182,9 @@ func main() {
glog.Infof("Starting to test orchestrators")
for _, o := range orchestrators {
time.Sleep(refreshWait)
if embeddedBcastMetrics != nil {
embeddedBcastMetrics.reset()
}

req := &streamerModel.StartStreamsReq{
Host: bcastHost,
Expand Down Expand Up @@ -233,40 +244,17 @@ func main() {
// This calculation requires HTTP ingest to be correct
apiStats.SuccessRate = (float64(apiStats.SegmentsReceived) / float64(*numProfiles) / float64(apiStats.SegmentsSent))

avgSegDuration, err := streamer.avgSegDuration()
if err != nil {
glog.Error(err)
}
apiStats.SegDuration = avgSegDuration
apiStats.SegDuration = avgMetric(streamer, embeddedBcastMetrics, "source_segment_duration_seconds")
apiStats.RoundTripTime = avgMetric(streamer, embeddedBcastMetrics, "transcode_overall_latency_seconds")
apiStats.UploadTime = avgMetric(streamer, embeddedBcastMetrics, "upload_time_seconds")
apiStats.DownloadTime = avgMetric(streamer, embeddedBcastMetrics, "download_time_seconds")

avgRoundTripTime, err := streamer.avgRoundTripTime()
if err != nil {
glog.Error(err)
}
apiStats.RoundTripTime = avgRoundTripTime

avgUploadTime, err := streamer.avgUploadTime()
if err != nil {
glog.Error(err)
}
apiStats.UploadTime = avgUploadTime

avgDownloadTime, err := streamer.avgDownloadTime()
if err != nil {
glog.Error(err)
}
apiStats.DownloadTime = avgDownloadTime

transcodeTime := avgRoundTripTime - avgUploadTime - avgDownloadTime
transcodeTime := apiStats.RoundTripTime - apiStats.UploadTime - apiStats.DownloadTime
if transcodeTime > 0 {
apiStats.TranscodeTime = transcodeTime
}

errors, err := streamer.queryErrorCounts()
if err != nil {
glog.Error(err)
}
apiStats.Errors = errors
apiStats.Errors = errorCount(streamer, embeddedBcastMetrics)

if err := streamer.postStats(apiStats); err != nil {
glog.Error(err)
Expand Down Expand Up @@ -470,75 +458,52 @@ func (s *streamerClient) getFinishedStats(mid string) (*streamerModel.Stats, err
return &stats, nil
}

func (s *streamerClient) avgSegDuration() (float64, error) {
val, err := s.queryVectorMetric(
"rate(livepeer_source_segment_duration_seconds_sum[1m])/rate(livepeer_source_segment_duration_seconds_count[1m])",
)
if err != nil {
return 0, err
}
valFloat := float64(0)
if val.Len() > 0 {
valFloat = float64((*val)[0].Value)
if math.IsNaN(valFloat) {
return 0, nil
}
func avgMetric(streamer *streamerClient, embedded *broadcasterMetrics, metric string) float64 {
if embedded != nil {
return embedded.avg(metric)
}
return valFloat, nil
}

func (s *streamerClient) avgUploadTime() (float64, error) {
val, err := s.queryVectorMetric(
"rate(livepeer_upload_time_seconds_sum[1m])/rate(livepeer_upload_time_seconds_count[1m])",
)
query := fmt.Sprintf("sum(rate(livepeer_%s_sum[1m]))/sum(rate(livepeer_%s_count[1m]))", metric, metric)
val, err := streamer.queryVectorMetric(query)
if err != nil {
return 0, err
glog.Error(err)
return 0
}
valFloat := float64(0)
if val.Len() > 0 {
valFloat = float64((*val)[0].Value)
if math.IsNaN(valFloat) {
return 0, nil
glog.Error(err)
return 0
}
}
return valFloat, nil
return valFloat
}

func (s *streamerClient) avgDownloadTime() (float64, error) {
val, err := s.queryVectorMetric(
"rate(livepeer_download_time_seconds_sum[1m])/rate(livepeer_download_time_seconds_count[1m])",
)
if err != nil {
return 0, err
}
valFloat := float64(0)
if val.Len() > 0 {
valFloat = float64((*val)[0].Value)
if math.IsNaN(valFloat) {
return 0, nil
func errorCount(s *streamerClient, metrics *broadcasterMetrics) []apiModels.Error {
var errors map[string]int
if metrics != nil {
errors = metrics.errorCount()
} else {
res, err := s.queryErrorCounts()
if err != nil {
glog.Error(errors)
return []apiModels.Error{}
}
errors = res
}
return valFloat, nil
}

func (s *streamerClient) avgRoundTripTime() (float64, error) {
val, err := s.queryVectorMetric(
"sum(rate(livepeer_transcode_overall_latency_seconds_sum[1m]))/sum(rate(livepeer_transcode_overall_latency_seconds_count[1m]))",
)
if err != nil {
return 0, err
}
valFloat := float64(0)
if val.Len() > 0 {
valFloat = float64((*val)[0].Value)
if math.IsNaN(valFloat) {
return 0, nil
}
errArray := []apiModels.Error{}
for errCode, count := range errors {
errArray = append(errArray, apiModels.Error{
ErrorCode: errCode,
Count: count,
})
}
return valFloat, nil
return errArray
}

func (s *streamerClient) queryErrorCounts() ([]apiModels.Error, error) {
func (s *streamerClient) queryErrorCounts() (map[string]int, error) {
errors := make(map[string]int)
ctx, cancel := context.WithTimeout(context.Background(), httpTimeout)
defer cancel()
Expand Down Expand Up @@ -592,17 +557,8 @@ func (s *streamerClient) queryErrorCounts() ([]apiModels.Error, error) {
continue
}
errors[string(err.Metric["error_code"])] += int(math.Round(count))

}

errArray := []apiModels.Error{}
for errCode, count := range errors {
errArray = append(errArray, apiModels.Error{
ErrorCode: errCode,
Count: count,
})
}
return errArray, nil
return errors, nil
}

func (s *streamerClient) queryVectorMetric(qry string) (*promModels.Vector, error) {
Expand Down

0 comments on commit 00faa62

Please sign in to comment.