Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove delay from stream updates #756

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str
func (s *Stream) UpdateEndTime(endedAt int64) {
s.StreamInfo.EndedAt = endedAt
if s.StreamInfo.StartedAt == 0 {
logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl)
if s.StreamInfo.Status != livekit.StreamInfo_FAILED {
logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl)
}
s.StreamInfo.StartedAt = endedAt
} else {
s.StreamInfo.Duration = endedAt - s.StreamInfo.StartedAt
Expand Down
47 changes: 27 additions & 20 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sync"
"time"

"github.com/bep/debounce"
"github.com/frostbyte73/core"
"github.com/go-gst/go-gst/gst"
"go.uber.org/zap"
Expand All @@ -47,7 +46,6 @@ const (
type Controller struct {
*config.PipelineConfig
ipcServiceClient ipc.EgressServiceClient
streamUpdates func(func()) // debounce stream updates since they can come in quick succession

// gstreamer
gstLogger *zap.SugaredLogger
Expand Down Expand Up @@ -75,7 +73,6 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.
c := &Controller{
PipelineConfig: conf,
ipcServiceClient: ipcServiceClient,
streamUpdates: debounce.New(time.Millisecond * 500),
gstLogger: logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)),
callbacks: &gstreamer.Callbacks{
GstReady: make(chan struct{}),
Expand Down Expand Up @@ -267,7 +264,6 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
}

errs := errors.ErrArray{}
sendUpdate := false

// add stream outputs first
for _, rawUrl := range req.AddOutputUrls {
Expand All @@ -287,9 +283,10 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
c.mu.Unlock()

// add stream
sendUpdate = true
if err = c.streamBin.AddStream(stream); err != nil {
stream.StreamInfo.Status = livekit.StreamInfo_FAILED
stream.StreamInfo.Error = err.Error()
stream.UpdateEndTime(time.Now().UnixNano())
errs.AppendErr(err)
continue
}
Expand All @@ -305,19 +302,12 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
continue
}

sendUpdate = true
if err = c.streamFinished(ctx, stream); err != nil {
errs.AppendErr(err)
}
}

if sendUpdate {
c.Info.UpdatedAt = time.Now().UnixNano()
c.streamUpdates(func() {
_, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info))
})
}

c.streamUpdated(ctx)
return errs.ToError()
}

Expand Down Expand Up @@ -366,10 +356,7 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st
"duration", stream.StreamInfo.Duration,
"error", streamErr)

c.streamUpdates(func() {
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
})

c.streamUpdated(ctx)
return c.streamBin.RemoveStream(stream)
}

Expand Down Expand Up @@ -546,14 +533,34 @@ func (c *Controller) updateStreamStartTime(streamID string) {
logger.Debugw("stream started", "url", stream.RedactedUrl)
stream.StreamInfo.StartedAt = time.Now().UnixNano()
c.Info.UpdatedAt = time.Now().UnixNano()
c.streamUpdates(func() {
_, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info))
})
return false
}
return true
})
c.streamUpdated(context.Background())
}
}

func (c *Controller) streamUpdated(ctx context.Context) {
c.Info.UpdatedAt = time.Now().UnixNano()

if o := c.GetStreamConfig(); o != nil {
skipUpdate := false
// when adding streams, wait until they've all either started or failed before sending the update
o.Streams.Range(func(_, stream any) bool {
streamInfo := stream.(*config.Stream).StreamInfo
if streamInfo.Status == livekit.StreamInfo_ACTIVE && streamInfo.StartedAt == 0 {
skipUpdate = true
return false
}
return true
})
if skipUpdate {
return
}
}

_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
}

func (c *Controller) updateDuration(endedAt int64) {
Expand Down
1 change: 0 additions & 1 deletion test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ func (r *Runner) checkStreamUpdate(t *testing.T, egressID string, expected map[s
e = expected[s.Url]
}
if e == livekit.StreamInfo_FAILED && s.Status == livekit.StreamInfo_ACTIVE {
// expecting another update
failureStillActive = true
continue
}
Expand Down
Loading