Skip to content

Commit

Permalink
Eos timer (#541)
Browse files Browse the repository at this point in the history
* Add EOS timeout

* add log for finding debug files

* more logging

* add nil check
  • Loading branch information
frostbyte73 authored Nov 20, 2023
1 parent fe33d99 commit b8ab069
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
15 changes: 13 additions & 2 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Controller struct {
limitTimer *time.Timer
playing core.Fuse
eos core.Fuse
eosTimer *time.Timer
stopped core.Fuse
}

Expand Down Expand Up @@ -231,6 +232,7 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
return c.Info
}

logger.Debugw("closing sinks")
for _, si := range c.sinks {
for _, s := range si {
if err := s.Close(); err != nil {
Expand Down Expand Up @@ -378,7 +380,7 @@ func (c *Controller) SendEOS(ctx context.Context) {
defer span.End()

c.eos.Once(func() {
logger.Debugw("Sending EOS")
logger.Debugw("sending EOS")

if c.limitTimer != nil {
c.limitTimer.Stop()
Expand All @@ -405,7 +407,12 @@ func (c *Controller) SendEOS(ctx context.Context) {

case livekit.EgressStatus_EGRESS_ENDING,
livekit.EgressStatus_EGRESS_LIMIT_REACHED:
go c.p.SendEOS()
go func() {
c.eosTimer = time.AfterFunc(time.Second*30, func() {
c.OnError(errors.ErrPipelineFrozen)
})
c.p.SendEOS()
}()
}

if c.SourceType == types.SourceTypeWeb {
Expand All @@ -430,6 +437,8 @@ func (c *Controller) Close() {
if c.SourceType == types.SourceTypeSDK || !c.eos.IsBroken() {
c.updateDuration(c.src.GetEndedAt())
}

logger.Debugw("closing source")
c.src.Close()

now := time.Now().UnixNano()
Expand Down Expand Up @@ -518,6 +527,7 @@ func (c *Controller) updateStartTime(startedAt int64) {

case types.EgressTypeSegments:
o[0].(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt

case types.EgressTypeImages:
for _, c := range o {
c.(*config.ImageConfig).ImagesInfo.StartedAt = startedAt
Expand Down Expand Up @@ -563,6 +573,7 @@ func (c *Controller) updateDuration(endedAt int64) {
}
segmentsInfo.EndedAt = endedAt
segmentsInfo.Duration = endedAt - segmentsInfo.StartedAt

case types.EgressTypeImages:
for _, c := range o {
imageInfo := c.(*config.ImageConfig).ImagesInfo
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (c *Controller) uploadDebugFiles() {

select {
case <-done:
logger.Infow("debug files uploaded")
return
case <-time.After(time.Second * 3):
logger.Errorw("failed to upload debug files", errors.New("timed out"))
Expand Down
8 changes: 5 additions & 3 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)

const (
Expand Down Expand Up @@ -83,11 +84,12 @@ type remoteUploader struct {
func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) {
start := time.Now()
location, size, err := u.upload(localFilepath, storageFilepath, outputType)
elapsed := time.Since(start).Milliseconds()
elapsed := time.Since(start)
logger.Debugw("upload complete", "fileType", fileType, "time", elapsed.String())

// success
if err == nil {
u.monitor.IncUploadCountSuccess(fileType, float64(elapsed))
u.monitor.IncUploadCountSuccess(fileType, float64(elapsed.Milliseconds()))
if deleteAfterUpload {
_ = os.Remove(localFilepath)
}
Expand All @@ -96,7 +98,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp
}

// failure
u.monitor.IncUploadCountFailure(fileType, float64(elapsed))
u.monitor.IncUploadCountFailure(fileType, float64(elapsed.Milliseconds()))
if u.backup != "" {
stat, err := os.Stat(localFilepath)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ func (c *Controller) messageWatch(msg *gst.Message) bool {
switch msg.Type() {
case gst.MessageEOS:
logger.Infow("EOS received")
if c.eosTimer != nil {
c.eosTimer.Stop()
}
c.p.Stop()
return false
case gst.MessageWarning:
Expand Down

0 comments on commit b8ab069

Please sign in to comment.