Skip to content

Commit

Permalink
refactored handler metrics collection per review comments from @frost…
Browse files Browse the repository at this point in the history
  • Loading branch information
dandoug committed Nov 6, 2023
1 parent 6eab146 commit 6718b5d
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 100 deletions.
23 changes: 0 additions & 23 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package config
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -61,9 +60,6 @@ type PipelineConfig struct {
OnUpdate func(context.Context, *livekit.EgressInfo) `yaml:"-"`

Info *livekit.EgressInfo `yaml:"-"`

UploadsCounter *prometheus.CounterVec `yaml:"-"`
UploadsResponseTime *prometheus.HistogramVec `yaml:"-"`
}

type SourceConfig struct {
Expand Down Expand Up @@ -148,25 +144,6 @@ func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*Pipelin
return nil, err
}

constantLabels := prometheus.Labels{"node_id": p.NodeID, "cluster_id": p.ClusterID, "egress_id": req.EgressId}

p.UploadsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "pipeline_uploads",
Help: "Number of uploads per pipeline with type and status labels",
ConstLabels: constantLabels,
}, []string{"type", "status"}) // type: file, manifest, segment, liveplaylist, playlist; status: success,failure

p.UploadsResponseTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "pipline_upload_response_time_ms",
Help: "A histogram of latencies for upload requests in milliseconds.",
Buckets: []float64{10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 15000, 20000, 30000},
ConstLabels: constantLabels,
}, []string{"type", "status"})
prometheus.MustRegister(p.UploadsCounter, p.UploadsResponseTime)
return p, p.Update(req)
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pipeline
import (
"context"
"fmt"
"github.com/livekit/egress/pkg/stats"
"os"
"path"
"strings"
Expand All @@ -37,7 +38,8 @@ func (c *Controller) GetGstPipelineDebugDot() string {
}

func (c *Controller) uploadDebugFiles() {
u, err := uploader.New(c.Debug.ToUploadConfig(), "")
monitor := *stats.NewHandlerMonitor(c.NodeID, c.ClusterID, c.Info.EgressId)
u, err := uploader.New(c.Debug.ToUploadConfig(), "", monitor)
if err != nil {
logger.Errorw("failed to create uploader", err)
return
Expand Down Expand Up @@ -89,7 +91,7 @@ func (c *Controller) uploadTrackFiles(u uploader.Uploader) {
if strings.HasSuffix(f.Name(), ".csv") {
local := path.Join(dir, f.Name())
storage := path.Join(c.Debug.PathPrefix, f.Name())
_, _, err = u.Upload(local, storage, types.OutputTypeBlob, false)
_, _, err = u.Upload(local, storage, types.OutputTypeBlob, false, "track")
if err != nil {
logger.Errorw("failed to upload debug file", err)
return
Expand Down Expand Up @@ -137,7 +139,7 @@ func (c *Controller) uploadDebugFile(u uploader.Uploader, data []byte, fileExten
return
}

_, _, err = u.Upload(local, storage, types.OutputTypeBlob, false)
_, _, err = u.Upload(local, storage, types.OutputTypeBlob, false, "debug")
if err != nil {
logger.Errorw("failed to upload debug file", err)
return
Expand Down
21 changes: 3 additions & 18 deletions pkg/pipeline/sink/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@ package sink

import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"os"
"path"
"time"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/pipeline/sink/uploader"
"github.com/livekit/protocol/logger"
"os"
"path"
)

type FileSink struct {
Expand All @@ -46,19 +43,7 @@ func (s *FileSink) Start() error {
}

func (s *FileSink) Close() error {
var labels prometheus.Labels
start := time.Now()
location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false)
elapsed := time.Since(start).Milliseconds()
if err != nil {
labels = prometheus.Labels{"type": "file", "status": "failure"}
s.conf.UploadsCounter.With(labels).Add(1)
s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed))
return err
}
labels = prometheus.Labels{"type": "file", "status": "success"}
s.conf.UploadsCounter.With(labels).Add(1)
s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed))
location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false, "file")

s.FileInfo.Location = location
s.FileInfo.Size = size
Expand Down
17 changes: 2 additions & 15 deletions pkg/pipeline/sink/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@ package sink

import (
"encoding/json"
"github.com/prometheus/client_golang/prometheus"
"os"
"time"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/pipeline/sink/uploader"
"github.com/livekit/egress/pkg/types"
"os"
)

type Manifest struct {
Expand Down Expand Up @@ -57,17 +54,7 @@ func uploadManifest(p *config.PipelineConfig, u uploader.Uploader, localFilepath
return err
}

var labels prometheus.Labels
start := time.Now()
_, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false)
elapsed := time.Since(start).Milliseconds()
if err != nil {
labels = prometheus.Labels{"type": "manifest", "status": "failure"}
} else {
labels = prometheus.Labels{"type": "manifest", "status": "success"}
}
p.UploadsCounter.With(labels).Add(1)
p.UploadsResponseTime.With(labels).Observe(float64(elapsed))
_, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false, "manifest")

return err
}
Expand Down
38 changes: 3 additions & 35 deletions pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) {
go func() {
defer close(update.uploadComplete)

start := time.Now()
_, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true)
elapsed := time.Since(start).Milliseconds()
if err != nil {
labels := prometheus.Labels{"type": "segment", "status": "failure"}
s.conf.UploadsCounter.With(labels).Add(1)
s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed))
s.callbacks.OnError(err)
return
}
labels := prometheus.Labels{"type": "segment", "status": "success"}
s.conf.UploadsCounter.With(labels).Add(1)
s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed))
_, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true, "segment")
if err != nil {
	// Report the failed segment upload instead of silently discarding it;
	// returning here keeps the segment-info bookkeeping below from running
	// for a segment that was not uploaded.
	s.callbacks.OnError(err)
	return
}

// lock segment info updates
s.infoLock.Lock()
Expand Down Expand Up @@ -340,34 +328,14 @@ func (s *SegmentSink) uploadPlaylist() error {
var err error
playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename)
playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename)
var labels prometheus.Labels
start := time.Now()
s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
elapsed := time.Since(start).Milliseconds()
if err != nil {
labels = prometheus.Labels{"type": "playlist", "status": "failure"}
} else {
labels = prometheus.Labels{"type": "playlist", "status": "success"}
}
s.conf.UploadsCounter.With(labels).Add(1)
s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed))
s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false, "playlist")
return err
}

func (s *SegmentSink) uploadLivePlaylist() error {
var err error
liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename)
liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename)
var labels prometheus.Labels
start := time.Now()
s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
elapsed := time.Since(start).Milliseconds()
if err != nil {
labels = prometheus.Labels{"type": "live_playlist", "status": "failure"}
} else {
labels = prometheus.Labels{"type": "live_playlist", "status": "success"}
}
s.conf.UploadsCounter.With(labels).Add(1)
s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed))
s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false, "live_playlist")
return err
}
6 changes: 4 additions & 2 deletions pkg/pipeline/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/gstreamer"
"github.com/livekit/egress/pkg/pipeline/sink/uploader"
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/pkg/types"
)

Expand All @@ -32,11 +33,12 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[
for egressType, c := range p.Outputs {
var s Sink
var err error
monitor := *stats.NewHandlerMonitor(p.NodeID, p.ClusterID, p.Info.EgressId)
switch egressType {
case types.EgressTypeFile:
o := c.(*config.FileConfig)

u, err := uploader.New(o.UploadConfig, p.BackupStorage)
u, err := uploader.New(o.UploadConfig, p.BackupStorage, monitor)
if err != nil {
return nil, err
}
Expand All @@ -46,7 +48,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[
case types.EgressTypeSegments:
o := c.(*config.SegmentConfig)

u, err := uploader.New(o.UploadConfig, p.BackupStorage)
u, err := uploader.New(o.UploadConfig, p.BackupStorage, monitor)
if err != nil {
return nil, err
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package uploader

import (
"fmt"
"github.com/livekit/egress/pkg/stats"
"github.com/prometheus/client_golang/prometheus"
"os"
"path"
Expand All @@ -35,14 +36,14 @@ const (
)

type Uploader interface {
Upload(string, string, types.OutputType, bool) (string, int64, error)
Upload(string, string, types.OutputType, bool, string) (string, int64, error)
}

type uploader interface {
upload(string, string, types.OutputType) (string, int64, error)
}

func New(conf config.UploadConfig, backup string) (Uploader, error) {
func New(conf config.UploadConfig, backup string, monitor stats.HandlerMonitor) (Uploader, error) {
var u uploader
var err error

Expand All @@ -65,6 +66,7 @@ func New(conf config.UploadConfig, backup string) (Uploader, error) {
remoteUploader := &remoteUploader{
uploader: u,
backup: backup,
monitor: monitor,
}

remoteUploader.backupCounter = prometheus.NewCounterVec(
Expand All @@ -85,17 +87,22 @@ type remoteUploader struct {

backup string
backupCounter *prometheus.CounterVec
monitor stats.HandlerMonitor
}

func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) {
func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) {
start := time.Now()
location, size, err := u.upload(localFilepath, storageFilepath, outputType)
elapsed := time.Since(start).Milliseconds()
if err == nil {
	// Primary upload succeeded: record it under status="success".
	u.monitor.IncUploadCountSuccess(fileType, float64(elapsed))
	if deleteAfterUpload {
		// Best-effort cleanup; a failure to remove the local file is
		// deliberately not reported as an upload error.
		_ = os.Remove(localFilepath)
	}

	return location, size, nil
}
// Primary upload failed: record it under status="failure" before the
// backup handling below runs.
u.monitor.IncUploadCountFailure(fileType, float64(elapsed))

if u.backup != "" {
stat, err := os.Stat(localFilepath)
Expand All @@ -117,7 +124,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp

type localUploader struct{}

func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bool) (string, int64, error) {
func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bool, _ string) (string, int64, error) {
stat, err := os.Stat(localFilepath)
if err != nil {
return "", 0, err
Expand Down
48 changes: 48 additions & 0 deletions pkg/stats/handler_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package stats

import (
	"errors"

	"github.com/prometheus/client_golang/prometheus"
)

// HandlerMonitor groups the Prometheus collectors used to record upload
// outcomes: a counter of uploads and a histogram of upload response times.
// Both are vectors partitioned by the "type" and "status" labels.
type HandlerMonitor struct {
// uploadsCounter is incremented by one for every recorded upload.
uploadsCounter *prometheus.CounterVec
// uploadsResponseTime observes the elapsed value passed with every recorded upload.
uploadsResponseTime *prometheus.HistogramVec
}

// NewHandlerMonitor creates the upload metrics (a counter and a response-time
// histogram) for one egress and registers them with the default Prometheus
// registry.
//
// nodeId, clusterId and egressId become the constant labels "node_id",
// "cluster_id" and "egress_id" on both metrics.
//
// It is safe to call more than once with the same IDs: if collectors with the
// same name and constant labels are already registered, the returned monitor
// reuses them instead of panicking on the duplicate registration. Any other
// registration error still panics, as prometheus.MustRegister would.
func NewHandlerMonitor(nodeId string, clusterId string, egressId string) *HandlerMonitor {
	constantLabels := prometheus.Labels{"node_id": nodeId, "cluster_id": clusterId, "egress_id": egressId}

	// Variable labels:
	//   type:   caller-supplied upload kind, e.g. file, manifest, segment,
	//           playlist, live_playlist, track, debug
	//   status: success or failure
	uploadsCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace:   "livekit",
		Subsystem:   "egress",
		Name:        "pipeline_uploads",
		Help:        "Number of uploads per pipeline with type and status labels",
		ConstLabels: constantLabels,
	}, []string{"type", "status"})

	// NOTE(review): "pipline" looks like a typo for "pipeline". The name is
	// kept unchanged here because renaming a published metric would break
	// existing queries and dashboards; rename deliberately if desired.
	uploadsResponseTime := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace:   "livekit",
		Subsystem:   "egress",
		Name:        "pipline_upload_response_time_ms",
		Help:        "A histogram of latencies for upload requests in milliseconds.",
		Buckets:     []float64{10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 15000, 20000, 30000},
		ConstLabels: constantLabels,
	}, []string{"type", "status"})

	// The type assertions panic if a collector of a different kind was
	// registered under the same descriptor, which is a programming error.
	return &HandlerMonitor{
		uploadsCounter:      registerOrExisting(uploadsCounter).(*prometheus.CounterVec),
		uploadsResponseTime: registerOrExisting(uploadsResponseTime).(*prometheus.HistogramVec),
	}
}

// registerOrExisting registers c with the default registry and returns the
// collector that ends up registered: c itself, or the previously registered
// equivalent when the registry reports a duplicate. Other errors panic.
func registerOrExisting(c prometheus.Collector) prometheus.Collector {
	err := prometheus.Register(c)
	if err == nil {
		return c
	}

	var already prometheus.AlreadyRegisteredError
	if errors.As(err, &already) {
		return already.ExistingCollector
	}
	panic(err)
}

// IncUploadCountSuccess records one successful upload of the given type: the
// uploads counter is bumped by one and elapsed is observed on the
// response-time histogram, both under status="success".
func (m *HandlerMonitor) IncUploadCountSuccess(uploadType string, elapsed float64) {
	outcome := prometheus.Labels{
		"status": "success",
		"type":   uploadType,
	}

	counter := m.uploadsCounter.With(outcome)
	counter.Inc()

	latency := m.uploadsResponseTime.With(outcome)
	latency.Observe(elapsed)
}

// IncUploadCountFailure records one failed upload of the given type: the
// uploads counter is bumped by one and elapsed is observed on the
// response-time histogram, both under status="failure".
func (m *HandlerMonitor) IncUploadCountFailure(uploadType string, elapsed float64) {
	failed := prometheus.Labels{}
	failed["type"] = uploadType
	failed["status"] = "failure"

	m.uploadsCounter.With(failed).Inc()
	m.uploadsResponseTime.With(failed).Observe(elapsed)
}

0 comments on commit 6718b5d

Please sign in to comment.