Skip to content

Commit

Permalink
async-connect causes state issues
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Aug 29, 2024
1 parent 3ce5588 commit f129ffe
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 11 deletions.
4 changes: 3 additions & 1 deletion pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,13 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error {
}

// start with defaults
now := time.Now().UnixNano()
p.Info = &info.EgressInfo{
EgressId: request.EgressId,
RoomId: request.RoomId,
Status: livekit.EgressStatus_EGRESS_STARTING,
UpdatedAt: time.Now().UnixNano(),
StartedAt: now,
UpdatedAt: now,
}
p.AudioConfig = AudioConfig{
AudioBitrate: 128,
Expand Down
3 changes: 3 additions & 0 deletions pkg/pipeline/builder/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func (sb *StreamBin) AddStream(stream *config.Stream) error {
if err = sink.Set("location", stream.ParsedUrl); err != nil {
return errors.ErrGstPipelineError(err)
}
if err = sink.SetProperty("async-connect", false); err != nil {
return errors.ErrGstPipelineError(err)
}

case types.OutputTypeSRT:
sink, err = gst.NewElementWithName("srtsink", fmt.Sprintf("srtsink_%s", stream.Name))
Expand Down
25 changes: 15 additions & 10 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo {
ctx, span := tracer.Start(ctx, "Pipeline.Run")
defer span.End()

c.Info.StartedAt = time.Now().UnixNano()
defer c.Close()

// session limit timer
Expand Down Expand Up @@ -380,6 +379,8 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) {
}

c.Info.Details = fmt.Sprintf("end reason: %s", reason)
logger.Debugw("stopping pipeline", "reason", reason)

switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.SetAborted(info.MsgStoppedBeforeStarted)
Expand All @@ -392,29 +393,30 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) {
case livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ENDING)
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
c.sendEOS()
c.sendEOS(reason)

case livekit.EgressStatus_EGRESS_ENDING:
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
c.sendEOS()
c.sendEOS(reason)

case livekit.EgressStatus_EGRESS_LIMIT_REACHED:
c.sendEOS()
c.sendEOS(reason)
}

if c.SourceType == types.SourceTypeWeb {
c.updateDuration(c.src.GetEndedAt())
// web source uses the current time
c.updateEndTime()
}
})
}

func (c *Controller) sendEOS() {
func (c *Controller) sendEOS(reason string) {
c.eosTimer = time.AfterFunc(time.Second*30, func() {
c.OnError(errors.ErrPipelineFrozen)
})
go func() {
logger.Debugw("sending EOS")
c.p.SendEOS()
logger.Debugw("eos sent")
}()
}

Expand All @@ -432,7 +434,8 @@ func (c *Controller) OnError(err error) {

func (c *Controller) Close() {
if c.SourceType == types.SourceTypeSDK || !c.eos.IsBroken() {
c.updateDuration(c.src.GetEndedAt())
// sdk source will use the timestamp of the last packet pushed to the pipeline
c.updateEndTime()
}

// update status
Expand Down Expand Up @@ -543,11 +546,11 @@ func (c *Controller) updateStreamStartTime(streamID string) {
logger.Debugw("stream started", "url", stream.RedactedUrl)
stream.StreamInfo.StartedAt = time.Now().UnixNano()
c.Info.UpdatedAt = time.Now().UnixNano()
c.streamUpdated(context.Background())
return false
}
return true
})
c.streamUpdated(context.Background())
}
}

Expand All @@ -573,7 +576,9 @@ func (c *Controller) streamUpdated(ctx context.Context) {
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
}

func (c *Controller) updateDuration(endedAt int64) {
func (c *Controller) updateEndTime() {
endedAt := c.src.GetEndedAt()

for egressType, o := range c.Outputs {
if len(o) == 0 {
continue
Expand Down
49 changes: 49 additions & 0 deletions test/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
Expand All @@ -40,6 +41,7 @@ func (r *Runner) testEdgeCases(t *testing.T) {
r.testRtmpFailure(t)
r.testSrtFailure(t)
r.testTrackDisconnection(t)
r.testEmptyStreamBin(t)
})
}

Expand Down Expand Up @@ -247,3 +249,50 @@ func (r *Runner) testTrackDisconnection(t *testing.T) {
r.runFileTest(t, req, test)
})
}

func (r *Runner) testEmptyStreamBin(t *testing.T) {
r.runRoomTest(t, "Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: &livekit.RoomCompositeEgressRequest{
RoomName: r.room.Name(),
Layout: "grid-light",
StreamOutputs: []*livekit.StreamOutput{{
Urls: []string{rtmpUrl1, badRtmpUrl1},
}},
SegmentOutputs: []*livekit.SegmentedFileOutput{{
FilenamePrefix: path.Join(r.FilePrefix, "empty_stream_{time}"),
PlaylistName: "empty_stream_{time}",
}},
},
},
}

info := r.sendRequest(t, req)
egressID := info.EgressId
time.Sleep(time.Second * 15)

// get params
p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req)
require.NoError(t, err)

r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{
rtmpUrl1Redacted: livekit.StreamInfo_ACTIVE,
badRtmpUrl1Redacted: livekit.StreamInfo_FAILED,
})
_, err = r.client.UpdateStream(context.Background(), egressID, &livekit.UpdateStreamRequest{
EgressId: egressID,
RemoveOutputUrls: []string{rtmpUrl1},
})
require.NoError(t, err)
r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{
rtmpUrl1Redacted: livekit.StreamInfo_FINISHED,
badRtmpUrl1Redacted: livekit.StreamInfo_FAILED,
})

time.Sleep(time.Second * 10)
res := r.stopEgress(t, egressID)
r.verifySegments(t, p, livekit.SegmentedFileSuffix_INDEX, res, false)
})
}

0 comments on commit f129ffe

Please sign in to comment.