Skip to content

Commit

Permalink
mist-api-connector: Export metrics for multistream usage minutes/bytes (
Browse files Browse the repository at this point in the history
#121)

* mapic: Store pushed bytes/minutes on pushInfo object

* mapic: Store pushed time as duration

* mapic: Accumulate prometheus metrics for multistream

* mapic: Only save pushed time/bytes if increasing

* metrics: Reorder multistream metrics

* mapic: Init multistream metrics on stats collector

* metrics: Fix view description
  • Loading branch information
victorges authored Jan 4, 2022
1 parent 59ae0b3 commit fce8857
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
6 changes: 4 additions & 2 deletions internal/app/mistapiconnector/mistapiconnector_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ type (
}

pushStatus struct {
pushStartEmitted bool
pushStopped bool
target *livepeer.MultistreamTarget
profile string
pushStartEmitted bool
pushStopped bool
pushedBytes int64
pushedMediaTime time.Duration
}

streamInfo struct {
Expand Down
14 changes: 13 additions & 1 deletion internal/app/mistapiconnector/stats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/livepeer/livepeer-data/pkg/data"
"github.com/livepeer/livepeer-data/pkg/event"
"github.com/livepeer/stream-tester/apis/mist"
census "github.com/livepeer/stream-tester/internal/metrics"
)

type infoProvider interface {
Expand All @@ -24,6 +25,9 @@ type metricsCollector struct {
}

func startMetricsCollector(ctx context.Context, period time.Duration, nodeID, ownRegion string, mapi *mist.API, producer *event.AMQPProducer, amqpExchange string, infop infoProvider) {
census.IncMultistreamBytes(0)
census.IncMultistreamTime(0)

mc := &metricsCollector{nodeID, ownRegion, mapi, producer, amqpExchange, infop}
go mc.mainLoop(ctx, period)
}
Expand Down Expand Up @@ -85,15 +89,23 @@ func createMetricsEvent(nodeID, region string, info *streamInfo, metrics *stream
defer info.mu.Unlock()
multistream := make([]*data.MultistreamTargetMetrics, len(metrics.pushes))
for i, push := range metrics.pushes {
pushInfo := info.pushStatus[push.OriginalURI]
var metrics *data.MultistreamMetrics
if push.Stats != nil {
metrics = &data.MultistreamMetrics{
ActiveSec: push.Stats.ActiveSeconds,
Bytes: push.Stats.Bytes,
MediaTimeMs: push.Stats.MediaTime,
}
if metrics.Bytes > pushInfo.pushedBytes {
census.IncMultistreamBytes(metrics.Bytes - pushInfo.pushedBytes)
pushInfo.pushedBytes = metrics.Bytes
}
if mediaTime := time.Duration(metrics.MediaTimeMs) * time.Millisecond; mediaTime > pushInfo.pushedMediaTime {
census.IncMultistreamTime(mediaTime - pushInfo.pushedMediaTime)
pushInfo.pushedMediaTime = mediaTime
}
}
pushInfo := info.pushStatus[push.OriginalURI]
multistream[i] = &data.MultistreamTargetMetrics{
Target: pushToMultistreamTargetInfo(pushInfo),
Metrics: metrics,
Expand Down
28 changes: 28 additions & 0 deletions internal/metrics/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type (
mSegmentsToDownload *stats.Int64Measure
mSegmentsToDownloaded *stats.Int64Measure

mMultistreamUsageMb *stats.Float64Measure
mMultistreamUsageMin *stats.Float64Measure

mStartupLatency *stats.Float64Measure
mTranscodeLatency *stats.Float64Measure

Expand Down Expand Up @@ -92,6 +95,9 @@ func InitCensus(nodeID, version, namespace string) {
Census.mSegmentsToDownload = stats.Int64("segments_to_download", "Number of segments queued for download", "tot")
Census.mSegmentsToDownloaded = stats.Int64("segments_downloaded", "Number of segments downloaded", "tot")

Census.mMultistreamUsageMb = stats.Float64("multistream_usage_megabytes", "Total number of megabytes multistreamed, or pushed, to external services", "megabyte")
Census.mMultistreamUsageMin = stats.Float64("multistream_usage_minutes", "Total minutes multistreamed, or pushed, to external services", "min")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Streamtester version: %s", version)
glog.Infof("Node ID %s", nodeID)
Expand Down Expand Up @@ -144,6 +150,20 @@ func InitCensus(nodeID, version, namespace string) {
TagKeys: baseTags,
Aggregation: view.Count(),
},
{
Name: "multistream_usage_megabytes",
Measure: Census.mMultistreamUsageMb,
Description: "Total number of megabytes multistreamed, or pushed, to external services",
TagKeys: baseTags,
Aggregation: view.Sum(),
},
{
Name: "multistream_usage_minutes",
Measure: Census.mMultistreamUsageMin,
Description: "Total minutes multistreamed, or pushed, to external services",
TagKeys: baseTags,
Aggregation: view.Sum(),
},
{
Name: "successful_streams",
Measure: Census.mSuccessfulStreams,
Expand Down Expand Up @@ -250,6 +270,14 @@ func (cs *censusMetricsCounter) SegmentDownloaded() int64 {
return std
}

func IncMultistreamBytes(bytes int64) {
stats.Record(Census.ctx, Census.mMultistreamUsageMb.M(float64(bytes)/1024/1024))
}

func IncMultistreamTime(mediaTime time.Duration) {
stats.Record(Census.ctx, Census.mMultistreamUsageMin.M(mediaTime.Minutes()))
}

// CurrentStreams set number of active streams
// func CurrentStreams(cs int) {
// stats.Record(census.ctx, census.mCurrentStreams.M(int64(cs)))
Expand Down

0 comments on commit fce8857

Please sign in to comment.