diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index cb7cb980..b0cd7ffa 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -160,11 +160,13 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { } // start with defaults + now := time.Now().UnixNano() p.Info = &info.EgressInfo{ EgressId: request.EgressId, RoomId: request.RoomId, Status: livekit.EgressStatus_EGRESS_STARTING, - UpdatedAt: time.Now().UnixNano(), + StartedAt: now, + UpdatedAt: now, } p.AudioConfig = AudioConfig{ AudioBitrate: 128, diff --git a/pkg/pipeline/builder/stream.go b/pkg/pipeline/builder/stream.go index 7cb95d3d..623d30e2 100644 --- a/pkg/pipeline/builder/stream.go +++ b/pkg/pipeline/builder/stream.go @@ -134,6 +134,9 @@ func (sb *StreamBin) AddStream(stream *config.Stream) error { if err = sink.Set("location", stream.ParsedUrl); err != nil { return errors.ErrGstPipelineError(err) } + if err = sink.SetProperty("async-connect", false); err != nil { + return errors.ErrGstPipelineError(err) + } case types.OutputTypeSRT: sink, err = gst.NewElementWithName("srtsink", fmt.Sprintf("srtsink_%s", stream.Name)) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 13d9d1ac..7bb26f29 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -197,7 +197,6 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo { ctx, span := tracer.Start(ctx, "Pipeline.Run") defer span.End() - c.Info.StartedAt = time.Now().UnixNano() defer c.Close() // session limit timer @@ -380,6 +379,8 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) { } c.Info.Details = fmt.Sprintf("end reason: %s", reason) + logger.Debugw("stopping pipeline", "reason", reason) + switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: c.Info.SetAborted(info.MsgStoppedBeforeStarted) @@ -403,7 +404,8 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) { } if c.SourceType == types.SourceTypeWeb { - c.updateDuration(c.src.GetEndedAt()) + // web source uses the current time + c.updateEndTime() } }) } @@ -413,8 +415,8 @@ func (c *Controller) sendEOS() { c.OnError(errors.ErrPipelineFrozen) }) go func() { - logger.Debugw("sending EOS") c.p.SendEOS() + logger.Debugw("eos sent") }() } @@ -432,7 +434,8 @@ func (c *Controller) OnError(err error) { func (c *Controller) Close() { if c.SourceType == types.SourceTypeSDK || !c.eos.IsBroken() { - c.updateDuration(c.src.GetEndedAt()) + // sdk source will use the timestamp of the last packet pushed to the pipeline + c.updateEndTime() } // update status @@ -543,11 +546,11 @@ func (c *Controller) updateStreamStartTime(streamID string) { logger.Debugw("stream started", "url", stream.RedactedUrl) stream.StreamInfo.StartedAt = time.Now().UnixNano() c.Info.UpdatedAt = time.Now().UnixNano() + c.streamUpdated(context.Background()) return false } return true }) - c.streamUpdated(context.Background()) } } @@ -573,7 +576,9 @@ func (c *Controller) streamUpdated(ctx context.Context) { _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) } -func (c *Controller) updateDuration(endedAt int64) { +func (c *Controller) updateEndTime() { + endedAt := c.src.GetEndedAt() + for egressType, o := range c.Outputs { if len(o) == 0 { continue diff --git a/test/edge.go b/test/edge.go index 38f171b4..8360d4c4 100644 --- a/test/edge.go +++ b/test/edge.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/rpc" @@ -40,6 +41,7 @@ func (r *Runner) testEdgeCases(t *testing.T) { r.testRtmpFailure(t) r.testSrtFailure(t) r.testTrackDisconnection(t) + r.testEmptyStreamBin(t) }) } @@ -247,3 +249,50 @@ func (r *Runner) testTrackDisconnection(t *testing.T) { r.runFileTest(t, req, test) }) } + +func (r *Runner) testEmptyStreamBin(t *testing.T) { + r.runRoomTest(t, "Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_RoomComposite{ + RoomComposite: &livekit.RoomCompositeEgressRequest{ + RoomName: r.room.Name(), + Layout: "grid-light", + StreamOutputs: []*livekit.StreamOutput{{ + Urls: []string{rtmpUrl1, badRtmpUrl1}, + }}, + SegmentOutputs: []*livekit.SegmentedFileOutput{{ + FilenamePrefix: path.Join(r.FilePrefix, "empty_stream_{time}"), + PlaylistName: "empty_stream_{time}", + }}, + }, + }, + } + + info := r.sendRequest(t, req) + egressID := info.EgressId + time.Sleep(time.Second * 15) + + // get params + p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) + require.NoError(t, err) + + r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ + rtmpUrl1Redacted: livekit.StreamInfo_ACTIVE, + badRtmpUrl1Redacted: livekit.StreamInfo_FAILED, + }) + _, err = r.client.UpdateStream(context.Background(), egressID, &livekit.UpdateStreamRequest{ + EgressId: egressID, + RemoveOutputUrls: []string{rtmpUrl1}, + }) + require.NoError(t, err) + r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ + rtmpUrl1Redacted: livekit.StreamInfo_FINISHED, + badRtmpUrl1Redacted: livekit.StreamInfo_FAILED, + }) + + time.Sleep(time.Second * 10) + res := r.stopEgress(t, egressID) + r.verifySegments(t, p, livekit.SegmentedFileSuffix_INDEX, res, false) + }) +}