From c8e7374a8868a823ff74e775b2c10826fc4dace6 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 17 Nov 2023 20:21:52 -0800 Subject: [PATCH 1/4] Add EOS timeout --- pkg/pipeline/controller.go | 10 +++++++++- pkg/pipeline/watch.go | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index ea9fffc6..1c5f5288 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -58,6 +58,7 @@ type Controller struct { limitTimer *time.Timer playing core.Fuse eos core.Fuse + eosTimer *time.Timer stopped core.Fuse } @@ -405,7 +406,12 @@ func (c *Controller) SendEOS(ctx context.Context) { case livekit.EgressStatus_EGRESS_ENDING, livekit.EgressStatus_EGRESS_LIMIT_REACHED: - go c.p.SendEOS() + go func() { + c.eosTimer = time.AfterFunc(time.Second*30, func() { + c.OnError(errors.ErrPipelineFrozen) + }) + c.p.SendEOS() + }() } if c.SourceType == types.SourceTypeWeb { @@ -518,6 +524,7 @@ func (c *Controller) updateStartTime(startedAt int64) { case types.EgressTypeSegments: o[0].(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt + case types.EgressTypeImages: for _, c := range o { c.(*config.ImageConfig).ImagesInfo.StartedAt = startedAt @@ -563,6 +570,7 @@ func (c *Controller) updateDuration(endedAt int64) { } segmentsInfo.EndedAt = endedAt segmentsInfo.Duration = endedAt - segmentsInfo.StartedAt + case types.EgressTypeImages: for _, c := range o { imageInfo := c.(*config.ImageConfig).ImagesInfo diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index ca316125..61301773 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -121,6 +121,7 @@ func (c *Controller) messageWatch(msg *gst.Message) bool { switch msg.Type() { case gst.MessageEOS: logger.Infow("EOS received") + c.eosTimer.Stop() c.p.Stop() return false case gst.MessageWarning: From 3832d5b7e652bd228914ac791f6bd6ceea91a863 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 17 Nov 2023 20:23:57 -0800 Subject: [PATCH 2/4] add log for finding debug files --- pkg/pipeline/debug.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index ec24b420..9ef45eeb 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -68,6 +68,7 @@ func (c *Controller) uploadDebugFiles() { select { case <-done: + logger.Infow("debug files uploaded") return case <-time.After(time.Second * 3): logger.Errorw("failed to upload debug files", errors.New("timed out")) From 5fdfe3f425d18a7c0590c6b1b712300dfec7fde1 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Sat, 18 Nov 2023 14:12:08 -0800 Subject: [PATCH 3/4] more logging --- pkg/pipeline/controller.go | 5 ++++- pkg/pipeline/sink/uploader/uploader.go | 8 +++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 1c5f5288..6714cc67 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -232,6 +232,7 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { return c.Info } + logger.Debugw("closing sinks") for _, si := range c.sinks { for _, s := range si { if err := s.Close(); err != nil { @@ -379,7 +380,7 @@ func (c *Controller) SendEOS(ctx context.Context) { defer span.End() c.eos.Once(func() { - logger.Debugw("Sending EOS") + logger.Debugw("sending EOS") if c.limitTimer != nil { c.limitTimer.Stop() @@ -436,6 +437,8 @@ func (c *Controller) Close() { if c.SourceType == types.SourceTypeSDK || !c.eos.IsBroken() { c.updateDuration(c.src.GetEndedAt()) } + + logger.Debugw("closing source") c.src.Close() now := time.Now().UnixNano() diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index cb5cde09..a540e863 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -26,6 +26,7 @@ import ( "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" ) const ( @@ -83,11 +84,12 @@ type remoteUploader struct { func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) { start := time.Now() location, size, err := u.upload(localFilepath, storageFilepath, outputType) - elapsed := time.Since(start).Milliseconds() + elapsed := time.Since(start) + logger.Debugw("upload complete", "fileType", fileType, "time", elapsed.String()) // success if err == nil { - u.monitor.IncUploadCountSuccess(fileType, float64(elapsed)) + u.monitor.IncUploadCountSuccess(fileType, float64(elapsed.Milliseconds())) if deleteAfterUpload { _ = os.Remove(localFilepath) } @@ -96,7 +98,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp } // failure - u.monitor.IncUploadCountFailure(fileType, float64(elapsed)) + u.monitor.IncUploadCountFailure(fileType, float64(elapsed.Milliseconds())) if u.backup != "" { stat, err := os.Stat(localFilepath) if err != nil { From 36c6551a44e615d2a1cf5214bf271a009d8e74d7 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Sat, 18 Nov 2023 15:31:34 -0800 Subject: [PATCH 4/4] add nil check --- pkg/pipeline/watch.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 61301773..e368b232 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -121,7 +121,9 @@ func (c *Controller) messageWatch(msg *gst.Message) bool { switch msg.Type() { case gst.MessageEOS: logger.Infow("EOS received") - c.eosTimer.Stop() + if c.eosTimer != nil { + c.eosTimer.Stop() + } c.p.Stop() return false case gst.MessageWarning: