Skip to content

Commit

Permalink
Status messaging, keyframe intervals (#611)
Browse files Browse the repository at this point in the history
* Status messaging, keyframe intervals

* use ABORTED if limit reached but not started
  • Loading branch information
frostbyte73 authored Feb 15, 2024
1 parent bfc460d commit 1c57bde
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 23 deletions.
8 changes: 4 additions & 4 deletions pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error {
}
}

if segmentConf := p.Outputs[types.EgressTypeSegments]; segmentConf != nil && len(segmentConf) > 0 {
// double the segment length
p.KeyFrameInterval = float64(2 * segmentConf[0].(*SegmentConfig).SegmentDuration)
} else if p.KeyFrameInterval == 0 && p.Outputs[types.EgressTypeStream] != nil {
if p.KeyFrameInterval == 0 && p.Outputs[types.EgressTypeStream] != nil {
// default 4s for streams
p.KeyFrameInterval = 4
} else if segmentConf := p.Outputs[types.EgressTypeSegments]; segmentConf != nil && len(segmentConf) > 0 {
// double the segment length
p.KeyFrameInterval = float64(2 * segmentConf[0].(*SegmentConfig).SegmentDuration)
}

err := p.updateImageOutputs(req)
Expand Down
5 changes: 1 addition & 4 deletions pkg/gstreamer/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package gstreamer
import (
"time"

"github.com/frostbyte73/core"
"github.com/go-gst/go-glib/glib"
"github.com/go-gst/go-gst/gst"

Expand All @@ -35,7 +34,6 @@ type Pipeline struct {
loop *glib.MainLoop
binsAdded bool
elementsAdded bool
stopped core.Fuse
}

// A pipeline can have either elements or src and sink bins. If you add both you will get a wrong hierarchy error
Expand All @@ -55,8 +53,7 @@ func NewPipeline(name string, latency uint64, callbacks *Callbacks) (*Pipeline,
latency: latency,
queues: make(map[string]*gst.Element),
},
loop: glib.NewMainLoop(glib.MainContextDefault(), false),
stopped: core.NewFuse(),
loop: glib.NewMainLoop(glib.MainContextDefault(), false),
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.
if !errors.IsFatal(err) {
// user error, send update
now := time.Now().UnixNano()
conf.Info.UpdatedAt = now
conf.Info.EndedAt = now
conf.Info.Status = livekit.EgressStatus_EGRESS_FAILED
conf.Info.Error = err.Error()
conf.Info.UpdatedAt = now
conf.Info.EndedAt = now
_, _ = h.ioClient.UpdateEgress(context.Background(), conf.Info)
}
return nil, err
Expand Down
28 changes: 15 additions & 13 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,15 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
for _, si := range c.sinks {
for _, s := range si {
if err := s.Start(); err != nil {
c.Info.Status = livekit.EgressStatus_EGRESS_FAILED
c.Info.Error = err.Error()
return c.Info
}
}
}

if err := c.p.Run(); err != nil {
c.Info.Status = livekit.EgressStatus_EGRESS_FAILED
c.Info.Error = err.Error()
return c.Info
}
Expand All @@ -240,6 +242,7 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
for _, si := range c.sinks {
for _, s := range si {
if err := s.Close(); err != nil {
c.Info.Status = livekit.EgressStatus_EGRESS_FAILED
c.Info.Error = err.Error()
return c.Info
}
Expand Down Expand Up @@ -400,18 +403,14 @@ func (c *Controller) SendEOS(ctx context.Context) {
c.p.Stop()

case livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.UpdatedAt = time.Now().UnixNano()
if c.Info.Error != "" {
c.Info.Status = livekit.EgressStatus_EGRESS_FAILED
c.p.Stop()
} else {
c.Info.Status = livekit.EgressStatus_EGRESS_ENDING
_, _ = c.ioClient.UpdateEgress(ctx, c.Info)
}
c.Info.Status = livekit.EgressStatus_EGRESS_ENDING
fallthrough

case livekit.EgressStatus_EGRESS_ENDING,
livekit.EgressStatus_EGRESS_LIMIT_REACHED:
c.Info.UpdatedAt = time.Now().UnixNano()
_, _ = c.ioClient.UpdateEgress(ctx, c.Info)

go func() {
c.eosTimer = time.AfterFunc(time.Second*30, func() {
c.OnError(errors.ErrPipelineFrozen)
Expand All @@ -431,7 +430,8 @@ func (c *Controller) OnError(err error) {
c.uploadDebugFiles()
}

if c.Info.Error == "" && (!c.eos.IsBroken() || c.FinalizationRequired) {
if c.Info.Status != livekit.EgressStatus_EGRESS_FAILED && (!c.eos.IsBroken() || c.FinalizationRequired) {
c.Info.Status = livekit.EgressStatus_EGRESS_FAILED
c.Info.Error = err.Error()
}

Expand All @@ -451,8 +451,7 @@ func (c *Controller) Close() {
c.Info.EndedAt = now

// update status
if c.Info.Error != "" && c.Info.Status != livekit.EgressStatus_EGRESS_ABORTED {
c.Info.Status = livekit.EgressStatus_EGRESS_FAILED
if c.Info.Status == livekit.EgressStatus_EGRESS_FAILED {
if o := c.GetStreamConfig(); o != nil {
for _, streamInfo := range o.StreamInfo {
streamInfo.Status = livekit.StreamInfo_FAILED
Expand Down Expand Up @@ -501,9 +500,12 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) {
if timeout > 0 {
c.limitTimer = time.AfterFunc(timeout, func() {
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING,
livekit.EgressStatus_EGRESS_ACTIVE:
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED
c.Info.Error = "Session limit reached before start signal"
case livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED
c.Info.Error = "Session limit reached"
}
if c.playing.IsBroken() {
c.SendEOS(ctx)
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/version"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
)
Expand Down Expand Up @@ -194,7 +195,11 @@ func (s *Service) killProcess(egressID string, maxUsage float64) {

if h, ok := s.activeHandlers[egressID]; ok {
logger.Errorw("killing egress", errors.ErrCPUExhausted, "egressID", egressID, "usage", maxUsage)
now := time.Now().UnixNano()
h.info.Status = livekit.EgressStatus_EGRESS_FAILED
h.info.Error = errors.ErrCPUExhausted.Error()
h.info.UpdatedAt = now
h.info.EndedAt = now
h.kill()
}
}
Expand Down

0 comments on commit 1c57bde

Please sign in to comment.