Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eos timer #541

Merged
merged 6 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Controller struct {
limitTimer *time.Timer
playing core.Fuse
eos core.Fuse
eosTimer *time.Timer
stopped core.Fuse
}

Expand Down Expand Up @@ -231,6 +232,7 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
return c.Info
}

logger.Debugw("closing sinks")
for _, si := range c.sinks {
for _, s := range si {
if err := s.Close(); err != nil {
Expand Down Expand Up @@ -378,7 +380,7 @@ func (c *Controller) SendEOS(ctx context.Context) {
defer span.End()

c.eos.Once(func() {
logger.Debugw("Sending EOS")
logger.Debugw("sending EOS")

if c.limitTimer != nil {
c.limitTimer.Stop()
Expand All @@ -405,7 +407,12 @@ func (c *Controller) SendEOS(ctx context.Context) {

case livekit.EgressStatus_EGRESS_ENDING,
livekit.EgressStatus_EGRESS_LIMIT_REACHED:
go c.p.SendEOS()
go func() {
c.eosTimer = time.AfterFunc(time.Second*30, func() {
c.OnError(errors.ErrPipelineFrozen)
})
c.p.SendEOS()
Copy link
Contributor

@biglittlebigben biglittlebigben Nov 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this be confusing if we mark the session as failed, but the media is still available in the end?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the media won't be available if the eos is never received

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(unless it's HLS, in which case the playlist will not be closed)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question is about calling c.OnError(errors.ErrPipelineFrozen) after 30s.

}()
}

if c.SourceType == types.SourceTypeWeb {
Expand All @@ -430,6 +437,8 @@ func (c *Controller) Close() {
if c.SourceType == types.SourceTypeSDK || !c.eos.IsBroken() {
c.updateDuration(c.src.GetEndedAt())
}

logger.Debugw("closing source")
c.src.Close()

now := time.Now().UnixNano()
Expand Down Expand Up @@ -518,6 +527,7 @@ func (c *Controller) updateStartTime(startedAt int64) {

case types.EgressTypeSegments:
o[0].(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt

case types.EgressTypeImages:
for _, c := range o {
c.(*config.ImageConfig).ImagesInfo.StartedAt = startedAt
Expand Down Expand Up @@ -563,6 +573,7 @@ func (c *Controller) updateDuration(endedAt int64) {
}
segmentsInfo.EndedAt = endedAt
segmentsInfo.Duration = endedAt - segmentsInfo.StartedAt

case types.EgressTypeImages:
for _, c := range o {
imageInfo := c.(*config.ImageConfig).ImagesInfo
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (c *Controller) uploadDebugFiles() {

select {
case <-done:
logger.Infow("debug files uploaded")
return
case <-time.After(time.Second * 3):
logger.Errorw("failed to upload debug files", errors.New("timed out"))
Expand Down
8 changes: 5 additions & 3 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)

const (
Expand Down Expand Up @@ -83,11 +84,12 @@ type remoteUploader struct {
func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) {
start := time.Now()
location, size, err := u.upload(localFilepath, storageFilepath, outputType)
elapsed := time.Since(start).Milliseconds()
elapsed := time.Since(start)
logger.Debugw("upload complete", "fileType", fileType, "time", elapsed.String())

// success
if err == nil {
u.monitor.IncUploadCountSuccess(fileType, float64(elapsed))
u.monitor.IncUploadCountSuccess(fileType, float64(elapsed.Milliseconds()))
if deleteAfterUpload {
_ = os.Remove(localFilepath)
}
Expand All @@ -96,7 +98,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp
}

// failure
u.monitor.IncUploadCountFailure(fileType, float64(elapsed))
u.monitor.IncUploadCountFailure(fileType, float64(elapsed.Milliseconds()))
if u.backup != "" {
stat, err := os.Stat(localFilepath)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ func (c *Controller) messageWatch(msg *gst.Message) bool {
switch msg.Type() {
case gst.MessageEOS:
logger.Infow("EOS received")
if c.eosTimer != nil {
c.eosTimer.Stop()
}
c.p.Stop()
return false
case gst.MessageWarning:
Expand Down