From de9400453d8c078fcef9fed20454375cb157cace Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 9 Aug 2024 19:23:04 -0400 Subject: [PATCH 1/2] redo stream updates --- pkg/config/output_stream.go | 4 +++- pkg/pipeline/controller.go | 47 +++++++++++++++++++++---------------- test/integration.go | 34 +++++++++------------------ 3 files changed, 41 insertions(+), 44 deletions(-) diff --git a/pkg/config/output_stream.go b/pkg/config/output_stream.go index 03793f00..afad1a2e 100644 --- a/pkg/config/output_stream.go +++ b/pkg/config/output_stream.go @@ -86,7 +86,9 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str func (s *Stream) UpdateEndTime(endedAt int64) { s.StreamInfo.EndedAt = endedAt if s.StreamInfo.StartedAt == 0 { - logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl) + if s.StreamInfo.Status != livekit.StreamInfo_FAILED { + logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl) + } s.StreamInfo.StartedAt = endedAt } else { s.StreamInfo.Duration = endedAt - s.StreamInfo.StartedAt diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 68db8de2..78cb6954 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -20,7 +20,6 @@ import ( "sync" "time" - "github.com/bep/debounce" "github.com/frostbyte73/core" "github.com/go-gst/go-gst/gst" "go.uber.org/zap" @@ -47,7 +46,6 @@ const ( type Controller struct { *config.PipelineConfig ipcServiceClient ipc.EgressServiceClient - streamUpdates func(func()) // debounce stream updates since they can come in quick succession // gstreamer gstLogger *zap.SugaredLogger @@ -75,7 +73,6 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc. c := &Controller{ PipelineConfig: conf, ipcServiceClient: ipcServiceClient, - streamUpdates: debounce.New(time.Millisecond * 500), gstLogger: logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)), callbacks: &gstreamer.Callbacks{ GstReady: make(chan struct{}), @@ -267,7 +264,6 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream } errs := errors.ErrArray{} - sendUpdate := false // add stream outputs first for _, rawUrl := range req.AddOutputUrls { @@ -287,9 +283,10 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream c.mu.Unlock() // add stream - sendUpdate = true if err = c.streamBin.AddStream(stream); err != nil { stream.StreamInfo.Status = livekit.StreamInfo_FAILED + stream.StreamInfo.Error = err.Error() + stream.UpdateEndTime(time.Now().UnixNano()) errs.AppendErr(err) continue } @@ -305,19 +302,12 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream continue } - sendUpdate = true if err = c.streamFinished(ctx, stream); err != nil { errs.AppendErr(err) } } - if sendUpdate { - c.Info.UpdatedAt = time.Now().UnixNano() - c.streamUpdates(func() { - _, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info)) - }) - } - + c.streamUpdated(ctx) return errs.ToError() } @@ -366,10 +356,7 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st "duration", stream.StreamInfo.Duration, "error", streamErr) - c.streamUpdates(func() { - _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) - }) - + c.streamUpdated(ctx) return c.streamBin.RemoveStream(stream) } @@ -546,14 +533,34 @@ func (c *Controller) updateStreamStartTime(streamID string) { logger.Debugw("stream started", "url", stream.RedactedUrl) stream.StreamInfo.StartedAt = time.Now().UnixNano() c.Info.UpdatedAt = time.Now().UnixNano() - c.streamUpdates(func() { - _, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info)) - }) return false } return true }) + c.streamUpdated(context.Background()) + } +} + +func (c *Controller) streamUpdated(ctx context.Context) { + c.Info.UpdatedAt = time.Now().UnixNano() + + if o := c.GetStreamConfig(); o != nil { + skipUpdate := false + // when adding streams, wait until they've all either started or failed before sending the update + o.Streams.Range(func(_, stream any) bool { + streamInfo := stream.(*config.Stream).StreamInfo + if streamInfo.Status == livekit.StreamInfo_ACTIVE && streamInfo.StartedAt == 0 { + skipUpdate = true + return false + } + return true + }) + if skipUpdate { + return + } } + + _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) } func (c *Controller) updateDuration(endedAt int64) { diff --git a/test/integration.go b/test/integration.go index ae99fc01..6ed2f906 100644 --- a/test/integration.go +++ b/test/integration.go @@ -146,32 +146,20 @@ func (r *Runner) checkUpdate(t *testing.T, egressID string, status livekit.Egres } func (r *Runner) checkStreamUpdate(t *testing.T, egressID string, expected map[string]livekit.StreamInfo_Status) { - for { - info := r.getUpdate(t, egressID) - require.Equal(t, len(expected), len(info.StreamResults)) - - failureStillActive := false - for _, s := range info.StreamResults { - require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "") - - var e livekit.StreamInfo_Status - if strings.HasSuffix(s.Url, ".contribute.live-video.net/app/{f...1}") { - e = expected[badRtmpUrl1Redacted] - } else { - e = expected[s.Url] - } - if e == livekit.StreamInfo_FAILED && s.Status == livekit.StreamInfo_ACTIVE { - // expecting another update - failureStillActive = true - continue - } + info := r.getUpdate(t, egressID) + require.Equal(t, len(expected), len(info.StreamResults)) - require.Equal(t, e, s.Status) - } + for _, s := range info.StreamResults { + require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "") - if !failureStillActive { - return + var e livekit.StreamInfo_Status + if strings.HasSuffix(s.Url, ".contribute.live-video.net/app/{f...1}") { + e = expected[badRtmpUrl1Redacted] + } else { + e = expected[s.Url] } + + require.Equal(t, e, s.Status) } } From 850913aba1c797e09b2e0fd7ae4ade151545369f Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 9 Aug 2024 19:52:41 -0400 Subject: [PATCH 2/2] revert test change --- test/integration.go | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/test/integration.go b/test/integration.go index 6ed2f906..ab3f5e45 100644 --- a/test/integration.go +++ b/test/integration.go @@ -146,20 +146,31 @@ func (r *Runner) checkUpdate(t *testing.T, egressID string, status livekit.Egres } func (r *Runner) checkStreamUpdate(t *testing.T, egressID string, expected map[string]livekit.StreamInfo_Status) { - info := r.getUpdate(t, egressID) - require.Equal(t, len(expected), len(info.StreamResults)) - - for _, s := range info.StreamResults { - require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "") + for { + info := r.getUpdate(t, egressID) + require.Equal(t, len(expected), len(info.StreamResults)) + + failureStillActive := false + for _, s := range info.StreamResults { + require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "") + + var e livekit.StreamInfo_Status + if strings.HasSuffix(s.Url, ".contribute.live-video.net/app/{f...1}") { + e = expected[badRtmpUrl1Redacted] + } else { + e = expected[s.Url] + } + if e == livekit.StreamInfo_FAILED && s.Status == livekit.StreamInfo_ACTIVE { + failureStillActive = true + continue + } - var e livekit.StreamInfo_Status - if strings.HasSuffix(s.Url, ".contribute.live-video.net/app/{f...1}") { - e = expected[badRtmpUrl1Redacted] - } else { - e = expected[s.Url] + require.Equal(t, e, s.Status) } - require.Equal(t, e, s.Status) + if !failureStillActive { + return + } } }