From 6718b5d8cd59de7ae23ed265b29f0f99a6d3b0cc Mon Sep 17 00:00:00 2001 From: Dan Douglas Date: Mon, 6 Nov 2023 08:55:18 -0800 Subject: [PATCH] refactored handler metrics collection per review comments from @frostbyte73 --- pkg/config/pipeline.go | 23 ------------ pkg/pipeline/debug.go | 8 +++-- pkg/pipeline/sink/file.go | 21 ++--------- pkg/pipeline/sink/manifest.go | 17 ++------- pkg/pipeline/sink/segments.go | 38 ++------------------ pkg/pipeline/sink/sink.go | 6 ++-- pkg/pipeline/sink/uploader/uploader.go | 15 +++++--- pkg/stats/handler_monitor.go | 48 ++++++++++++++++++++++++++ 8 files changed, 76 insertions(+), 100 deletions(-) create mode 100644 pkg/stats/handler_monitor.go diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 22c667c6..99abd757 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -17,7 +17,6 @@ package config import ( "context" "fmt" - "github.com/prometheus/client_golang/prometheus" "net/url" "strings" "time" @@ -61,9 +60,6 @@ type PipelineConfig struct { OnUpdate func(context.Context, *livekit.EgressInfo) `yaml:"-"` Info *livekit.EgressInfo `yaml:"-"` - - UploadsCounter *prometheus.CounterVec `yaml:"-"` - UploadsResponseTime *prometheus.HistogramVec `yaml:"-"` } type SourceConfig struct { @@ -148,25 +144,6 @@ func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*Pipelin return nil, err } - constantLabels := prometheus.Labels{"node_id": p.NodeID, "cluster_id": p.ClusterID, "egress_id": req.EgressId} - - p.UploadsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "livekit", - Subsystem: "egress", - Name: "pipeline_uploads", - Help: "Number of uploads per pipeline with type and status labels", - ConstLabels: constantLabels, - }, []string{"type", "status"}) // type: file, manifest, segment, liveplaylist, playlist; status: success,failure - - p.UploadsResponseTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "livekit", - Subsystem: "egress", - Name: 
"pipline_upload_response_time_ms", - Help: "A histogram of latencies for upload requests in milliseconds.", - Buckets: []float64{10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 15000, 20000, 30000}, - ConstLabels: constantLabels, - }, []string{"type", "status"}) - prometheus.MustRegister(p.UploadsCounter, p.UploadsResponseTime) return p, p.Update(req) } diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index 2ff1f1b1..89f73e12 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -17,6 +17,7 @@ package pipeline import ( "context" "fmt" + "github.com/livekit/egress/pkg/stats" "os" "path" "strings" @@ -37,7 +38,8 @@ func (c *Controller) GetGstPipelineDebugDot() string { } func (c *Controller) uploadDebugFiles() { - u, err := uploader.New(c.Debug.ToUploadConfig(), "") + monitor := *stats.NewHandlerMonitor(c.NodeID, c.ClusterID, c.Info.EgressId) + u, err := uploader.New(c.Debug.ToUploadConfig(), "", monitor) if err != nil { logger.Errorw("failed to create uploader", err) return @@ -89,7 +91,7 @@ func (c *Controller) uploadTrackFiles(u uploader.Uploader) { if strings.HasSuffix(f.Name(), ".csv") { local := path.Join(dir, f.Name()) storage := path.Join(c.Debug.PathPrefix, f.Name()) - _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false) + _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false, "track") if err != nil { logger.Errorw("failed to upload debug file", err) return @@ -137,7 +139,7 @@ func (c *Controller) uploadDebugFile(u uploader.Uploader, data []byte, fileExten return } - _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false) + _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false, "debug") if err != nil { logger.Errorw("failed to upload debug file", err) return diff --git a/pkg/pipeline/sink/file.go b/pkg/pipeline/sink/file.go index 5de8a4d4..0eb9c2ac 100644 --- a/pkg/pipeline/sink/file.go +++ b/pkg/pipeline/sink/file.go @@ -16,14 +16,11 @@ package sink import ( "fmt" - 
"github.com/prometheus/client_golang/prometheus" - "os" - "path" - "time" - "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/pipeline/sink/uploader" "github.com/livekit/protocol/logger" + "os" + "path" ) type FileSink struct { @@ -46,19 +43,11 @@ func (s *FileSink) Start() error { } func (s *FileSink) Close() error { - var labels prometheus.Labels - start := time.Now() - location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false) - elapsed := time.Since(start).Milliseconds() - if err != nil { - labels = prometheus.Labels{"type": "file", "status": "failure"} - s.conf.UploadsCounter.With(labels).Add(1) - s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed)) - return err - } - labels = prometheus.Labels{"type": "file", "status": "success"} - s.conf.UploadsCounter.With(labels).Add(1) - s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed)) + location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false, "file") + if err != nil { + // do not record FileInfo for an upload that failed + return err + } s.FileInfo.Location = location s.FileInfo.Size = size diff --git a/pkg/pipeline/sink/manifest.go b/pkg/pipeline/sink/manifest.go index 1e072309..8235c8b8 100644 --- a/pkg/pipeline/sink/manifest.go +++ b/pkg/pipeline/sink/manifest.go @@ -16,13 +16,10 @@ package sink import ( "encoding/json" - "github.com/prometheus/client_golang/prometheus" - "os" - "time" - "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/pipeline/sink/uploader" "github.com/livekit/egress/pkg/types" + "os" ) type Manifest struct { @@ -57,17 +54,7 @@ func uploadManifest(p *config.PipelineConfig, u uploader.Uploader, localFilepath return err } - var labels prometheus.Labels - start := time.Now() - _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) - elapsed := time.Since(start).Milliseconds() - if err != nil { - labels = prometheus.Labels{"type": "manifest", "status": "failure"} - } else { - labels = prometheus.Labels{"type": "manifest", "status": 
"success"} - } - p.UploadsCounter.With(labels).Add(1) - p.UploadsResponseTime.With(labels).Observe(float64(elapsed)) + _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false, "manifest") return err } diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 8e68cd9d..49149210 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -165,19 +165,12 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) { go func() { defer close(update.uploadComplete) - start := time.Now() - _, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true) - elapsed := time.Since(start).Milliseconds() - if err != nil { - labels := prometheus.Labels{"type": "segment", "status": "failure"} - s.conf.UploadsCounter.With(labels).Add(1) - s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed)) - s.callbacks.OnError(err) - return - } - labels := prometheus.Labels{"type": "segment", "status": "success"} - s.conf.UploadsCounter.With(labels).Add(1) - s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed)) + _, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true, "segment") + if err != nil { + // surface the upload failure to the pipeline instead of discarding it + s.callbacks.OnError(err) + return + } // lock segment info updates s.infoLock.Lock() @@ -340,17 +333,7 @@ func (s *SegmentSink) uploadPlaylist() error { var err error playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - var labels prometheus.Labels - start := time.Now() - s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) - elapsed := time.Since(start).Milliseconds() - if err != nil { - labels = prometheus.Labels{"type": "playlist", "status": "failure"} - } else { - labels = prometheus.Labels{"type": "playlist", "status": "success"} - } - s.conf.UploadsCounter.With(labels).Add(1) - s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed)) + s.SegmentsInfo.PlaylistLocation, 
_, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false, "playlist") return err } @@ -358,16 +341,6 @@ func (s *SegmentSink) uploadLivePlaylist() error { var err error liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) - var labels prometheus.Labels - start := time.Now() - s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) - elapsed := time.Since(start).Milliseconds() - if err != nil { - labels = prometheus.Labels{"type": "live_playlist", "status": "failure"} - } else { - labels = prometheus.Labels{"type": "live_playlist", "status": "success"} - } - s.conf.UploadsCounter.With(labels).Add(1) - s.conf.UploadsResponseTime.With(labels).Observe(float64(elapsed)) + s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false, "live_playlist") return err } diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index 05e4a41b..cb9092bf 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -18,6 +18,7 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/gstreamer" "github.com/livekit/egress/pkg/pipeline/sink/uploader" + "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/pkg/types" ) @@ -32,11 +33,12 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ for egressType, c := range p.Outputs { var s Sink var err error + monitor := *stats.NewHandlerMonitor(p.NodeID, p.ClusterID, p.Info.EgressId) switch egressType { case types.EgressTypeFile: o := c.(*config.FileConfig) - u, err := uploader.New(o.UploadConfig, p.BackupStorage) + u, err := uploader.New(o.UploadConfig, p.BackupStorage, monitor) if err != nil { return nil, err } @@ -46,7 +48,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ case types.EgressTypeSegments: o := 
c.(*config.SegmentConfig) - u, err := uploader.New(o.UploadConfig, p.BackupStorage) + u, err := uploader.New(o.UploadConfig, p.BackupStorage, monitor) if err != nil { return nil, err } diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index 8c117cc2..991ad1dd 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -16,6 +16,7 @@ package uploader import ( "fmt" + "github.com/livekit/egress/pkg/stats" "github.com/prometheus/client_golang/prometheus" "os" "path" @@ -35,14 +36,14 @@ const ( ) type Uploader interface { - Upload(string, string, types.OutputType, bool) (string, int64, error) + Upload(string, string, types.OutputType, bool, string) (string, int64, error) } type uploader interface { upload(string, string, types.OutputType) (string, int64, error) } -func New(conf config.UploadConfig, backup string) (Uploader, error) { +func New(conf config.UploadConfig, backup string, monitor stats.HandlerMonitor) (Uploader, error) { var u uploader var err error @@ -65,6 +66,7 @@ func New(conf config.UploadConfig, backup string) (Uploader, error) { remoteUploader := &remoteUploader{ uploader: u, backup: backup, + monitor: monitor, } remoteUploader.backupCounter = prometheus.NewCounterVec( @@ -85,17 +87,23 @@ type remoteUploader struct { backup string backupCounter *prometheus.CounterVec + monitor stats.HandlerMonitor } -func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) { +// Upload sends localFilepath to storageFilepath, timing the attempt and recording its outcome in the monitor under the fileType label. +func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) { + start := time.Now() location, size, err := u.upload(localFilepath, storageFilepath, outputType) + elapsed := time.Since(start).Milliseconds() if err == nil { + u.monitor.IncUploadCountSuccess(fileType, float64(elapsed)) if deleteAfterUpload { _ = 
os.Remove(localFilepath) } return location, size, nil } + u.monitor.IncUploadCountFailure(fileType, float64(elapsed)) if u.backup != "" { stat, err := os.Stat(localFilepath) @@ -117,7 +125,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp type localUploader struct{} -func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bool) (string, int64, error) { +func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bool, _ string) (string, int64, error) { stat, err := os.Stat(localFilepath) if err != nil { return "", 0, err diff --git a/pkg/stats/handler_monitor.go b/pkg/stats/handler_monitor.go new file mode 100644 index 00000000..3a2b3289 --- /dev/null +++ b/pkg/stats/handler_monitor.go @@ -0,0 +1,67 @@ +package stats + +import ( + "errors" + + "github.com/prometheus/client_golang/prometheus" +) + +// HandlerMonitor records upload counts and upload latencies, labelled by upload type and status. +type HandlerMonitor struct { + uploadsCounter *prometheus.CounterVec + uploadsResponseTime *prometheus.HistogramVec +} + +// NewHandlerMonitor builds the upload counter and latency histogram for the given node, cluster +// and egress IDs and registers them with the default Prometheus registry. It may be called more +// than once with the same IDs; collectors that are already registered are reused. +func NewHandlerMonitor(nodeId string, clusterId string, egressId string) *HandlerMonitor { + m := &HandlerMonitor{} + + constantLabels := prometheus.Labels{"node_id": nodeId, "cluster_id": clusterId, "egress_id": egressId} + + m.uploadsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "pipeline_uploads", + Help: "Number of uploads per pipeline with type and status labels", + ConstLabels: constantLabels, + }, []string{"type", "status"}) // type: file, manifest, segment, live_playlist, playlist, track, debug; status: success, failure + + m.uploadsResponseTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "pipline_upload_response_time_ms", + Help: "A histogram of latencies for upload requests in milliseconds.", + Buckets: []float64{10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 15000, 20000, 30000}, + ConstLabels: constantLabels, + }, []string{"type", "status"}) +
// MustRegister panics on a duplicate registration, so register explicitly and reuse any collector an earlier monitor already registered + are := &prometheus.AlreadyRegisteredError{} + if err := prometheus.Register(m.uploadsCounter); errors.As(err, are) { + m.uploadsCounter = are.ExistingCollector.(*prometheus.CounterVec) + } else if err != nil { + panic(err) + } + if err := prometheus.Register(m.uploadsResponseTime); errors.As(err, are) { + m.uploadsResponseTime = are.ExistingCollector.(*prometheus.HistogramVec) + } else if err != nil { + panic(err) + } + + return m +} + +// IncUploadCountSuccess counts one successful upload of uploadType and observes its latency in milliseconds. +func (m *HandlerMonitor) IncUploadCountSuccess(uploadType string, elapsed float64) { + labels := prometheus.Labels{"type": uploadType, "status": "success"} + m.uploadsCounter.With(labels).Add(1) + m.uploadsResponseTime.With(labels).Observe(elapsed) +} + +// IncUploadCountFailure counts one failed upload of uploadType and observes its latency in milliseconds. +func (m *HandlerMonitor) IncUploadCountFailure(uploadType string, elapsed float64) { + labels := prometheus.Labels{"type": uploadType, "status": "failure"} + m.uploadsCounter.With(labels).Add(1) + m.uploadsResponseTime.With(labels).Observe(elapsed) +}