diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 6714cc67..7a56dabd 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -29,6 +29,7 @@ import ( "github.com/livekit/egress/pkg/pipeline/builder" "github.com/livekit/egress/pkg/pipeline/sink" "github.com/livekit/egress/pkg/pipeline/source" + "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -55,6 +56,7 @@ type Controller struct { // internal mu sync.Mutex gstLogger *zap.SugaredLogger + monitor *stats.HandlerMonitor limitTimer *time.Timer playing core.Fuse eos core.Fuse @@ -74,6 +76,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, ioClient rpc.IOInfoCl }, ioClient: ioClient, gstLogger: logger.GetLogger().(*logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)), + monitor: stats.NewHandlerMonitor(conf.NodeID, conf.ClusterID, conf.Info.EgressId), playing: core.NewFuse(), eos: core.NewFuse(), stopped: core.NewFuse(), @@ -96,7 +99,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, ioClient rpc.IOInfoCl } // create sinks - c.sinks, err = sink.CreateSinks(conf, c.callbacks) + c.sinks, err = sink.CreateSinks(conf, c.callbacks, c.monitor) if err != nil { c.src.Close() return nil, err diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index 9ef45eeb..64b291d9 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -27,7 +27,6 @@ import ( "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/pipeline/sink/uploader" - "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/pprof" @@ -38,8 +37,7 @@ func (c *Controller) GetGstPipelineDebugDot() string { } func (c *Controller) uploadDebugFiles() { - monitor := stats.NewHandlerMonitor(c.NodeID, c.ClusterID, c.Info.EgressId) - u, err := uploader.New(c.Debug.ToUploadConfig(), "", monitor) + u, err := uploader.New(c.Debug.ToUploadConfig(), "", c.monitor) if err != nil { logger.Errorw("failed to create uploader", err) return diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index 7ac6b5ca..aaf5f9eb 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -28,9 +28,8 @@ type Sink interface { Cleanup() } -func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[types.EgressType][]Sink, error) { +func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (map[types.EgressType][]Sink, error) { sinks := make(map[types.EgressType][]Sink) - monitor := stats.NewHandlerMonitor(p.NodeID, p.ClusterID, p.Info.EgressId) for egressType, c := range p.Outputs { if len(c) == 0 { continue