From 3bb6d9e23e940d4838b5d1c9f8c75e099d2f90c2 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 30 Jul 2024 13:47:35 -0400 Subject: [PATCH] close src once recording has stopped --- pkg/pipeline/controller.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 3092c3ee..30fa9ce9 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -214,6 +214,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo { logger.Debugw("waiting for start signal") select { case <-c.stopped.Watch(): + c.src.Close() c.Info.SetAborted(info.MsgStartNotReceived) return c.Info case <-start: @@ -224,6 +225,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo { for _, si := range c.sinks { for _, s := range si { if err := s.Start(); err != nil { + c.src.Close() c.Info.SetFailed(err) return c.Info } @@ -231,10 +233,14 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo { } if err := c.p.Run(); err != nil { + c.src.Close() c.Info.SetFailed(err) return c.Info } + logger.Debugw("closing source") + c.src.Close() + logger.Debugw("closing sinks") for _, si := range c.sinks { for _, s := range si { @@ -443,9 +449,6 @@ func (c *Controller) Close() { c.updateDuration(c.src.GetEndedAt()) } - logger.Debugw("closing source") - c.src.Close() - // update status if c.Info.Status == livekit.EgressStatus_EGRESS_FAILED { if o := c.GetStreamConfig(); o != nil {