Skip to content

Commit

Permalink
more refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dandoug committed Nov 6, 2023
1 parent 6718b5d commit 25fc2bd
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 38 deletions.
29 changes: 8 additions & 21 deletions pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package sink

import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/livekit/egress/pkg/stats"
"os"
"path"
"strings"
Expand Down Expand Up @@ -71,7 +71,7 @@ type SegmentUpdate struct {
uploadComplete chan struct{}
}

func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks) (*SegmentSink, error) {
func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks, monitor stats.HandlerMonitor) (*SegmentSink, error) {
playlistName := path.Join(o.LocalDir, o.PlaylistFilename)
playlist, err := m3u8.NewEventPlaylistWriter(playlistName, o.SegmentDuration)
if err != nil {
Expand Down Expand Up @@ -108,27 +108,14 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg
}

// Register gauges that track the number of segments and playlist updates pending upload
segmentsUploadsGauge := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "segments_uploads_channel_size",
Help: "number of segment uploads pending in channel",
ConstLabels: prometheus.Labels{"egress_id": s.conf.Info.EgressId},
}, func() float64 {
return float64(len(s.closedSegments))
})
playlistUploadsGauge := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "playlist_uploads_channel_size",
Help: "number of playlist updates pending in channel",
ConstLabels: prometheus.Labels{"egress_id": s.conf.Info.EgressId},
}, func() float64 {
monitor.RegisterPlaylistChannelSizeGauge(s.conf.NodeID, s.conf.ClusterID, s.conf.Info.EgressId,
func() float64 {
return float64(len(s.playlistUpdates))
})
prometheus.MustRegister(segmentsUploadsGauge, playlistUploadsGauge)
monitor.RegisterSegmentsChannelSizeGauge(s.conf.NodeID, s.conf.ClusterID, s.conf.Info.EgressId,
func() float64 {
return float64(len(s.closedSegments))
})

return s, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[
return nil, err
}

s, err = newSegmentSink(u, p, o, callbacks)
s, err = newSegmentSink(u, p, o, callbacks, monitor)
if err != nil {
return nil, err
}
Expand Down
18 changes: 3 additions & 15 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package uploader
import (
"fmt"
"github.com/livekit/egress/pkg/stats"
"github.com/prometheus/client_golang/prometheus"
"os"
"path"
"time"
Expand Down Expand Up @@ -69,25 +68,14 @@ func New(conf config.UploadConfig, backup string, monitor stats.HandlerMonitor)
monitor: monitor,
}

remoteUploader.backupCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "backup_storage_writes",
Help: "number of writes to backup storage location by output type",
// we don't have access to egress_id, so on it being added in server process
}, []string{"output_type"})
prometheus.MustRegister(remoteUploader.backupCounter)

return remoteUploader, nil
}

type remoteUploader struct {
uploader

backup string
backupCounter *prometheus.CounterVec
monitor stats.HandlerMonitor
backup string
monitor stats.HandlerMonitor
}

func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) {
Expand All @@ -114,7 +102,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp
if err = os.Rename(localFilepath, backupFilepath); err != nil {
return "", 0, err
}
u.backupCounter.With(prometheus.Labels{"output_type": string(outputType)}).Add(1)
u.monitor.IncBackupStorageWrites(string(outputType))

return backupFilepath, stat.Size(), nil
}
Expand Down
40 changes: 39 additions & 1 deletion pkg/stats/handler_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
type HandlerMonitor struct {
uploadsCounter *prometheus.CounterVec
uploadsResponseTime *prometheus.HistogramVec
backupCounter *prometheus.CounterVec
}

func NewHandlerMonitor(nodeId string, clusterId string, egressId string) *HandlerMonitor {
Expand All @@ -30,7 +31,16 @@ func NewHandlerMonitor(nodeId string, clusterId string, egressId string) *Handle
Buckets: []float64{10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 15000, 20000, 30000},
ConstLabels: constantLabels,
}, []string{"type", "status"})
prometheus.MustRegister(m.uploadsCounter, m.uploadsResponseTime)

m.backupCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "backup_storage_writes",
Help: "number of writes to backup storage location by output type",
ConstLabels: constantLabels,
}, []string{"output_type"})

prometheus.MustRegister(m.uploadsCounter, m.uploadsResponseTime, m.backupCounter)

return m
}
Expand All @@ -46,3 +56,31 @@ func (m *HandlerMonitor) IncUploadCountFailure(uploadType string, elapsed float6
m.uploadsCounter.With(labels).Add(1)
m.uploadsResponseTime.With(labels).Observe(elapsed)
}

func (m *HandlerMonitor) IncBackupStorageWrites(outputType string) {
m.backupCounter.With(prometheus.Labels{"output_type": outputType}).Add(1)
}

func (m *HandlerMonitor) RegisterSegmentsChannelSizeGauge(nodeId string, clusterId string, egressId string, channelSizeFunction func() float64) {
segmentsUploadsGauge := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "segments_uploads_channel_size",
Help: "number of segment uploads pending in channel",
ConstLabels: prometheus.Labels{"node_id": nodeId, "cluster_id": clusterId, "egress_id": egressId},
}, channelSizeFunction)
prometheus.MustRegister(segmentsUploadsGauge)
}

func (m *HandlerMonitor) RegisterPlaylistChannelSizeGauge(nodeId string, clusterId string, egressId string, channelSizeFunction func() float64) {
playlistUploadsGauge := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "playlist_uploads_channel_size",
Help: "number of playlist updates pending in channel",
ConstLabels: prometheus.Labels{"node_id": nodeId, "cluster_id": clusterId, "egress_id": egressId},
}, channelSizeFunction)
prometheus.MustRegister(playlistUploadsGauge)
}

0 comments on commit 25fc2bd

Please sign in to comment.