diff --git a/pkg/config/output_stream.go b/pkg/config/output_stream.go index 03793f00..afad1a2e 100644 --- a/pkg/config/output_stream.go +++ b/pkg/config/output_stream.go @@ -86,7 +86,9 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str func (s *Stream) UpdateEndTime(endedAt int64) { s.StreamInfo.EndedAt = endedAt if s.StreamInfo.StartedAt == 0 { - logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl) + if s.StreamInfo.Status != livekit.StreamInfo_FAILED { + logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl) + } s.StreamInfo.StartedAt = endedAt } else { s.StreamInfo.Duration = endedAt - s.StreamInfo.StartedAt diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 68db8de2..78cb6954 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -20,7 +20,6 @@ import ( "sync" "time" - "github.com/bep/debounce" "github.com/frostbyte73/core" "github.com/go-gst/go-gst/gst" "go.uber.org/zap" @@ -47,7 +46,6 @@ const ( type Controller struct { *config.PipelineConfig ipcServiceClient ipc.EgressServiceClient - streamUpdates func(func()) // debounce stream updates since they can come in quick succession // gstreamer gstLogger *zap.SugaredLogger @@ -75,7 +73,6 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc. c := &Controller{ PipelineConfig: conf, ipcServiceClient: ipcServiceClient, - streamUpdates: debounce.New(time.Millisecond * 500), gstLogger: logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)), callbacks: &gstreamer.Callbacks{ GstReady: make(chan struct{}), @@ -267,7 +264,6 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream } errs := errors.ErrArray{} - sendUpdate := false // add stream outputs first for _, rawUrl := range req.AddOutputUrls { @@ -287,9 +283,10 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream c.mu.Unlock() // add stream - sendUpdate = true if err = c.streamBin.AddStream(stream); err != nil { stream.StreamInfo.Status = livekit.StreamInfo_FAILED + stream.StreamInfo.Error = err.Error() + stream.UpdateEndTime(time.Now().UnixNano()) errs.AppendErr(err) continue } @@ -305,19 +302,12 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream continue } - sendUpdate = true if err = c.streamFinished(ctx, stream); err != nil { errs.AppendErr(err) } } - if sendUpdate { - c.Info.UpdatedAt = time.Now().UnixNano() - c.streamUpdates(func() { - _, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info)) - }) - } - + c.streamUpdated(ctx) return errs.ToError() } @@ -366,10 +356,7 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st "duration", stream.StreamInfo.Duration, "error", streamErr) - c.streamUpdates(func() { - _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) - }) - + c.streamUpdated(ctx) return c.streamBin.RemoveStream(stream) } @@ -546,14 +533,34 @@ func (c *Controller) updateStreamStartTime(streamID string) { logger.Debugw("stream started", "url", stream.RedactedUrl) stream.StreamInfo.StartedAt = time.Now().UnixNano() c.Info.UpdatedAt = time.Now().UnixNano() - c.streamUpdates(func() { - _, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info)) - }) return false } return true }) + c.streamUpdated(context.Background()) + } +} + +func (c *Controller) streamUpdated(ctx context.Context) { + c.Info.UpdatedAt = time.Now().UnixNano() + + if o := c.GetStreamConfig(); o != nil { + skipUpdate := false + // when adding streams, wait until they've all either started or failed before sending the update + o.Streams.Range(func(_, stream any) bool { + streamInfo := stream.(*config.Stream).StreamInfo + if streamInfo.Status == livekit.StreamInfo_ACTIVE && streamInfo.StartedAt == 0 { + skipUpdate = true + return false + } + return true + }) + if skipUpdate { + return + } } + + _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) } func (c *Controller) updateDuration(endedAt int64) { diff --git a/test/integration.go b/test/integration.go index ae99fc01..ab3f5e45 100644 --- a/test/integration.go +++ b/test/integration.go @@ -161,7 +161,6 @@ func (r *Runner) checkStreamUpdate(t *testing.T, egressID string, expected map[s e = expected[s.Url] } if e == livekit.StreamInfo_FAILED && s.Status == livekit.StreamInfo_ACTIVE { - // expecting another update failureStillActive = true continue }