Skip to content

Commit

Permalink
redo stream updates
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Aug 9, 2024
1 parent 8602b26 commit de94004
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 44 deletions.
4 changes: 3 additions & 1 deletion pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str
func (s *Stream) UpdateEndTime(endedAt int64) {
s.StreamInfo.EndedAt = endedAt
if s.StreamInfo.StartedAt == 0 {
logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl)
if s.StreamInfo.Status != livekit.StreamInfo_FAILED {
logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl)
}
s.StreamInfo.StartedAt = endedAt
} else {
s.StreamInfo.Duration = endedAt - s.StreamInfo.StartedAt
Expand Down
47 changes: 27 additions & 20 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sync"
"time"

"github.com/bep/debounce"
"github.com/frostbyte73/core"
"github.com/go-gst/go-gst/gst"
"go.uber.org/zap"
Expand All @@ -47,7 +46,6 @@ const (
type Controller struct {
*config.PipelineConfig
ipcServiceClient ipc.EgressServiceClient
streamUpdates func(func()) // debounce stream updates since they can come in quick succession

// gstreamer
gstLogger *zap.SugaredLogger
Expand Down Expand Up @@ -75,7 +73,6 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.
c := &Controller{
PipelineConfig: conf,
ipcServiceClient: ipcServiceClient,
streamUpdates: debounce.New(time.Millisecond * 500),
gstLogger: logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)),
callbacks: &gstreamer.Callbacks{
GstReady: make(chan struct{}),
Expand Down Expand Up @@ -267,7 +264,6 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
}

errs := errors.ErrArray{}
sendUpdate := false

// add stream outputs first
for _, rawUrl := range req.AddOutputUrls {
Expand All @@ -287,9 +283,10 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
c.mu.Unlock()

// add stream
sendUpdate = true
if err = c.streamBin.AddStream(stream); err != nil {
stream.StreamInfo.Status = livekit.StreamInfo_FAILED
stream.StreamInfo.Error = err.Error()
stream.UpdateEndTime(time.Now().UnixNano())
errs.AppendErr(err)
continue
}
Expand All @@ -305,19 +302,12 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
continue
}

sendUpdate = true
if err = c.streamFinished(ctx, stream); err != nil {
errs.AppendErr(err)
}
}

if sendUpdate {
c.Info.UpdatedAt = time.Now().UnixNano()
c.streamUpdates(func() {
_, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info))
})
}

c.streamUpdated(ctx)
return errs.ToError()
}

Expand Down Expand Up @@ -366,10 +356,7 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st
"duration", stream.StreamInfo.Duration,
"error", streamErr)

c.streamUpdates(func() {
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
})

c.streamUpdated(ctx)
return c.streamBin.RemoveStream(stream)
}

Expand Down Expand Up @@ -546,14 +533,34 @@ func (c *Controller) updateStreamStartTime(streamID string) {
logger.Debugw("stream started", "url", stream.RedactedUrl)
stream.StreamInfo.StartedAt = time.Now().UnixNano()
c.Info.UpdatedAt = time.Now().UnixNano()
c.streamUpdates(func() {
_, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info))
})
return false
}
return true
})
c.streamUpdated(context.Background())
}
}

func (c *Controller) streamUpdated(ctx context.Context) {
c.Info.UpdatedAt = time.Now().UnixNano()

if o := c.GetStreamConfig(); o != nil {
skipUpdate := false
// when adding streams, wait until they've all either started or failed before sending the update
o.Streams.Range(func(_, stream any) bool {
streamInfo := stream.(*config.Stream).StreamInfo
if streamInfo.Status == livekit.StreamInfo_ACTIVE && streamInfo.StartedAt == 0 {
skipUpdate = true
return false
}
return true
})
if skipUpdate {
return
}
}

_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
}

func (c *Controller) updateDuration(endedAt int64) {
Expand Down
34 changes: 11 additions & 23 deletions test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,32 +146,20 @@ func (r *Runner) checkUpdate(t *testing.T, egressID string, status livekit.Egres
}

func (r *Runner) checkStreamUpdate(t *testing.T, egressID string, expected map[string]livekit.StreamInfo_Status) {
for {
info := r.getUpdate(t, egressID)
require.Equal(t, len(expected), len(info.StreamResults))

failureStillActive := false
for _, s := range info.StreamResults {
require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "")

var e livekit.StreamInfo_Status
if strings.HasSuffix(s.Url, ".contribute.live-video.net/app/{f...1}") {
e = expected[badRtmpUrl1Redacted]
} else {
e = expected[s.Url]
}
if e == livekit.StreamInfo_FAILED && s.Status == livekit.StreamInfo_ACTIVE {
// expecting another update
failureStillActive = true
continue
}
info := r.getUpdate(t, egressID)
require.Equal(t, len(expected), len(info.StreamResults))

require.Equal(t, e, s.Status)
}
for _, s := range info.StreamResults {
require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "")

if !failureStillActive {
return
var e livekit.StreamInfo_Status
if strings.HasSuffix(s.Url, ".contribute.live-video.net/app/{f...1}") {
e = expected[badRtmpUrl1Redacted]
} else {
e = expected[s.Url]
}

require.Equal(t, e, s.Status)
}
}

Expand Down

0 comments on commit de94004

Please sign in to comment.