diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 3092c3ee..30fa9ce9 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -214,6 +214,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo { logger.Debugw("waiting for start signal") select { case <-c.stopped.Watch(): + c.src.Close() c.Info.SetAborted(info.MsgStartNotReceived) return c.Info case <-start: @@ -224,6 +225,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo { for _, si := range c.sinks { for _, s := range si { if err := s.Start(); err != nil { + c.src.Close() c.Info.SetFailed(err) return c.Info } @@ -231,10 +233,14 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo { } if err := c.p.Run(); err != nil { + c.src.Close() c.Info.SetFailed(err) return c.Info } + logger.Debugw("closing source") + c.src.Close() + logger.Debugw("closing sinks") for _, si := range c.sinks { for _, s := range si { @@ -443,9 +449,6 @@ func (c *Controller) Close() { c.updateDuration(c.src.GetEndedAt()) } - logger.Debugw("closing source") - c.src.Close() - // update status if c.Info.Status == livekit.EgressStatus_EGRESS_FAILED { if o := c.GetStreamConfig(); o != nil {