diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index a8cab099..bd390d04 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -172,44 +172,7 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { defer span.End() c.Info.StartedAt = time.Now().UnixNano() - defer func() { - now := time.Now().UnixNano() - - if c.SourceType == types.SourceTypeSDK { - c.updateDuration(c.src.GetEndedAt()) - } - - c.Info.UpdatedAt = now - c.Info.EndedAt = now - if c.SourceType == types.SourceTypeSDK { - c.updateDuration(c.src.GetEndedAt()) - } - - // update status - if c.Info.Error != "" { - c.Info.Status = livekit.EgressStatus_EGRESS_FAILED - - if o := c.GetStreamConfig(); o != nil { - for _, streamInfo := range o.StreamInfo { - streamInfo.Status = livekit.StreamInfo_FAILED - } - } - } - - // ensure egress ends with a final state - switch c.Info.Status { - case livekit.EgressStatus_EGRESS_STARTING: - c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED - - case livekit.EgressStatus_EGRESS_ACTIVE, - livekit.EgressStatus_EGRESS_ENDING: - c.Info.Status = livekit.EgressStatus_EGRESS_COMPLETE - } - - for _, s := range c.sinks { - s.Cleanup() - } - }() + defer c.Close() // session limit timer c.startSessionLimitTimer(ctx) @@ -226,7 +189,6 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { logger.Debugw("waiting for start signal") select { case <-c.stopped.Watch(): - c.src.Close() c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED return c.Info case <-start: @@ -236,14 +198,12 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { for _, s := range c.sinks { if err := s.Start(); err != nil { - c.src.Close() c.Info.Error = err.Error() return c.Info } } if err := c.p.Run(); err != nil { - c.src.Close() c.Info.Error = err.Error() return c.Info } @@ -251,65 +211,6 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { return c.Info } -func (c *Controller) startSessionLimitTimer(ctx context.Context) { - var timeout time.Duration - for egressType := range c.Outputs { - var t time.Duration - switch egressType { - case types.EgressTypeFile: - t = c.FileOutputMaxDuration - case types.EgressTypeStream, types.EgressTypeWebsocket: - t = c.StreamOutputMaxDuration - case types.EgressTypeSegments: - t = c.SegmentOutputMaxDuration - } - if t > 0 && (timeout == 0 || t < timeout) { - timeout = t - } - } - - if timeout > 0 { - c.limitTimer = time.AfterFunc(timeout, func() { - switch c.Info.Status { - case livekit.EgressStatus_EGRESS_STARTING, - livekit.EgressStatus_EGRESS_ACTIVE: - c.Info.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED - } - if c.playing.IsBroken() { - c.SendEOS(ctx) - } else { - c.p.Stop() - } - }) - } -} - -func (c *Controller) updateStartTime(startedAt int64) { - for egressType, o := range c.Outputs { - switch egressType { - case types.EgressTypeStream, types.EgressTypeWebsocket: - c.mu.Lock() - for _, streamInfo := range o.(*config.StreamConfig).StreamInfo { - streamInfo.Status = livekit.StreamInfo_ACTIVE - streamInfo.StartedAt = startedAt - } - c.mu.Unlock() - - case types.EgressTypeFile: - o.(*config.FileConfig).FileInfo.StartedAt = startedAt - - case types.EgressTypeSegments: - o.(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt - } - } - - if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING { - c.Info.Status = livekit.EgressStatus_EGRESS_ACTIVE - c.Info.UpdatedAt = time.Now().UnixNano() - c.OnUpdate(context.Background(), c.Info) - } -} - func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStreamRequest) error { ctx, span := tracer.Start(ctx, "Pipeline.UpdateStream") defer span.End() @@ -496,6 +397,100 @@ func (c *Controller) OnError(err error) { go c.p.Stop() } +func (c *Controller) Close() { + c.src.Close() + + now := time.Now().UnixNano() + c.Info.UpdatedAt = now + c.Info.EndedAt = now + if c.SourceType == types.SourceTypeSDK { + c.updateDuration(c.src.GetEndedAt()) + } + + // update status + if c.Info.Error != "" { + c.Info.Status = livekit.EgressStatus_EGRESS_FAILED + if o := c.GetStreamConfig(); o != nil { + for _, streamInfo := range o.StreamInfo { + streamInfo.Status = livekit.StreamInfo_FAILED + } + } + } + + // ensure egress ends with a final state + switch c.Info.Status { + case livekit.EgressStatus_EGRESS_STARTING: + c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED + + case livekit.EgressStatus_EGRESS_ACTIVE, + livekit.EgressStatus_EGRESS_ENDING: + c.Info.Status = livekit.EgressStatus_EGRESS_COMPLETE + } + + for _, s := range c.sinks { + s.Cleanup() + } +} + +func (c *Controller) startSessionLimitTimer(ctx context.Context) { + var timeout time.Duration + for egressType := range c.Outputs { + var t time.Duration + switch egressType { + case types.EgressTypeFile: + t = c.FileOutputMaxDuration + case types.EgressTypeStream, types.EgressTypeWebsocket: + t = c.StreamOutputMaxDuration + case types.EgressTypeSegments: + t = c.SegmentOutputMaxDuration + } + if t > 0 && (timeout == 0 || t < timeout) { + timeout = t + } + } + + if timeout > 0 { + c.limitTimer = time.AfterFunc(timeout, func() { + switch c.Info.Status { + case livekit.EgressStatus_EGRESS_STARTING, + livekit.EgressStatus_EGRESS_ACTIVE: + c.Info.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED + } + if c.playing.IsBroken() { + c.SendEOS(ctx) + } else { + c.p.Stop() + } + }) + } +} + +func (c *Controller) updateStartTime(startedAt int64) { + for egressType, o := range c.Outputs { + switch egressType { + case types.EgressTypeStream, types.EgressTypeWebsocket: + c.mu.Lock() + for _, streamInfo := range o.(*config.StreamConfig).StreamInfo { + streamInfo.Status = livekit.StreamInfo_ACTIVE + streamInfo.StartedAt = startedAt + } + c.mu.Unlock() + + case types.EgressTypeFile: + o.(*config.FileConfig).FileInfo.StartedAt = startedAt + + case types.EgressTypeSegments: + o.(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt + } + } + + if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING { + c.Info.Status = livekit.EgressStatus_EGRESS_ACTIVE + c.Info.UpdatedAt = time.Now().UnixNano() + c.OnUpdate(context.Background(), c.Info) + } +} + func (c *Controller) updateDuration(endedAt int64) { for egressType, o := range c.Outputs { switch egressType {