diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 33f6914a..8a1b9dcf 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -383,15 +383,14 @@ func (c *Controller) SendEOS(ctx context.Context) { defer span.End() c.eos.Once(func() { - logger.Debugw("sending EOS") - if c.limitTimer != nil { c.limitTimer.Stop() } + switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: c.Info.SetAborted(info.MsgStoppedBeforeStarted) - fallthrough + c.p.Stop() case livekit.EgressStatus_EGRESS_ABORTED, livekit.EgressStatus_EGRESS_FAILED: @@ -399,18 +398,15 @@ func (c *Controller) SendEOS(ctx context.Context) { case livekit.EgressStatus_EGRESS_ACTIVE: c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ENDING) - fallthrough + _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) + c.sendEOS() - case livekit.EgressStatus_EGRESS_ENDING, - livekit.EgressStatus_EGRESS_LIMIT_REACHED: + case livekit.EgressStatus_EGRESS_ENDING: _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) + c.sendEOS() - go func() { - c.eosTimer = time.AfterFunc(time.Second*30, func() { - c.OnError(errors.ErrPipelineFrozen) - }) - c.p.SendEOS() - }() + case livekit.EgressStatus_EGRESS_LIMIT_REACHED: + c.sendEOS() } if c.SourceType == types.SourceTypeWeb { @@ -419,6 +415,16 @@ func (c *Controller) SendEOS(ctx context.Context) { }) } +func (c *Controller) sendEOS() { + go func() { + logger.Debugw("sending EOS") + c.eosTimer = time.AfterFunc(time.Second*30, func() { + c.OnError(errors.ErrPipelineFrozen) + }) + c.p.SendEOS() + }() +} + func (c *Controller) OnError(err error) { if errors.Is(err, errors.ErrPipelineFrozen) && c.Debug.EnableProfiling { c.uploadDebugFiles()