From b8ab069ab58e334bd54782e793a191642a1c22f9 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 20 Nov 2023 11:20:36 -0800 Subject: [PATCH] Eos timer (#541) * Add EOS timeout * add log for finding debug files * more logging * add nil check --- pkg/pipeline/controller.go | 15 +++++++++++++-- pkg/pipeline/debug.go | 1 + pkg/pipeline/sink/uploader/uploader.go | 8 +++++--- pkg/pipeline/watch.go | 3 +++ 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index ea9fffc6..6714cc67 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -58,6 +58,7 @@ type Controller struct { limitTimer *time.Timer playing core.Fuse eos core.Fuse + eosTimer *time.Timer stopped core.Fuse } @@ -231,6 +232,7 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { return c.Info } + logger.Debugw("closing sinks") for _, si := range c.sinks { for _, s := range si { if err := s.Close(); err != nil { @@ -378,7 +380,7 @@ func (c *Controller) SendEOS(ctx context.Context) { defer span.End() c.eos.Once(func() { - logger.Debugw("Sending EOS") + logger.Debugw("sending EOS") if c.limitTimer != nil { c.limitTimer.Stop() @@ -405,7 +407,12 @@ func (c *Controller) SendEOS(ctx context.Context) { case livekit.EgressStatus_EGRESS_ENDING, livekit.EgressStatus_EGRESS_LIMIT_REACHED: - go c.p.SendEOS() + go func() { + c.eosTimer = time.AfterFunc(time.Second*30, func() { + c.OnError(errors.ErrPipelineFrozen) + }) + c.p.SendEOS() + }() } if c.SourceType == types.SourceTypeWeb { @@ -430,6 +437,8 @@ func (c *Controller) Close() { if c.SourceType == types.SourceTypeSDK || !c.eos.IsBroken() { c.updateDuration(c.src.GetEndedAt()) } + + logger.Debugw("closing source") c.src.Close() now := time.Now().UnixNano() @@ -518,6 +527,7 @@ func (c *Controller) updateStartTime(startedAt int64) { case types.EgressTypeSegments: o[0].(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt + case types.EgressTypeImages: for _, c := range o { c.(*config.ImageConfig).ImagesInfo.StartedAt = startedAt @@ -563,6 +573,7 @@ func (c *Controller) updateDuration(endedAt int64) { } segmentsInfo.EndedAt = endedAt segmentsInfo.Duration = endedAt - segmentsInfo.StartedAt + case types.EgressTypeImages: for _, c := range o { imageInfo := c.(*config.ImageConfig).ImagesInfo diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index ec24b420..9ef45eeb 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -68,6 +68,7 @@ func (c *Controller) uploadDebugFiles() { select { case <-done: + logger.Infow("debug files uploaded") return case <-time.After(time.Second * 3): logger.Errorw("failed to upload debug files", errors.New("timed out")) diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index cb5cde09..a540e863 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -26,6 +26,7 @@ import ( "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" ) const ( @@ -83,11 +84,12 @@ type remoteUploader struct { func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) { start := time.Now() location, size, err := u.upload(localFilepath, storageFilepath, outputType) - elapsed := time.Since(start).Milliseconds() + elapsed := time.Since(start) + logger.Debugw("upload complete", "fileType", fileType, "time", elapsed.String()) // success if err == nil { - u.monitor.IncUploadCountSuccess(fileType, float64(elapsed)) + u.monitor.IncUploadCountSuccess(fileType, float64(elapsed.Milliseconds())) if deleteAfterUpload { _ = os.Remove(localFilepath) } @@ -96,7 +98,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp } // failure - u.monitor.IncUploadCountFailure(fileType, float64(elapsed)) + u.monitor.IncUploadCountFailure(fileType, float64(elapsed.Milliseconds())) if u.backup != "" { stat, err := os.Stat(localFilepath) if err != nil { diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index ca316125..e368b232 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -121,6 +121,9 @@ func (c *Controller) messageWatch(msg *gst.Message) bool { switch msg.Type() { case gst.MessageEOS: logger.Infow("EOS received") + if c.eosTimer != nil { + c.eosTimer.Stop() + } c.p.Stop() return false case gst.MessageWarning: