diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index c7ad1875..618c4899 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -37,8 +37,8 @@ import ( ) const ( - pipelineSource = "pipeline" - eosTimeout = time.Second * 30 + pipelineName = "pipeline" + eosTimeout = time.Second * 30 ) type Controller struct { @@ -111,11 +111,14 @@ func New(ctx context.Context, conf *config.PipelineConfig) (*Controller, error) func (c *Controller) BuildPipeline() error { logger.Debugw("building pipeline") - p, err := gstreamer.NewPipeline("pipeline", c.Latency, c.callbacks) + p, err := gstreamer.NewPipeline(pipelineName, c.Latency, c.callbacks) if err != nil { return errors.ErrGstPipelineError(err) } + p.SetWatch(c.messageWatch) + p.AddOnStop(c.OnStop) + if c.AudioEnabled { audioBin, err := builder.BuildAudioBin(p, c.PipelineConfig) if err != nil { @@ -160,9 +163,6 @@ func (c *Controller) BuildPipeline() error { } } - p.SetWatch(c.messageWatch) - p.AddOnStop(c.OnStop) - if err = p.Link(); err != nil { return err } @@ -486,6 +486,11 @@ func (c *Controller) SendEOS(ctx context.Context) { c.p.SendEOS() }() } + + switch c.src.(type) { + case *source.WebSource: + c.updateDuration(c.src.GetEndedAt()) + } }) } @@ -507,8 +512,16 @@ func (c *Controller) OnStop() error { if c.eosTimer != nil { c.eosTimer.Stop() } - endedAt := c.src.GetEndedAt() + switch c.src.(type) { + case *source.SDKSource: + c.updateDuration(c.src.GetEndedAt()) + } + + return nil +} + +func (c *Controller) updateDuration(endedAt int64) { for egressType, o := range c.Outputs { switch egressType { case types.EgressTypeStream, types.EgressTypeWebsocket: @@ -538,6 +551,4 @@ func (c *Controller) OnStop() error { segmentsInfo.Duration = endedAt - segmentsInfo.StartedAt } } - - return nil } diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index bc766e8c..3307cfc3 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -231,7 +231,7 @@ func (c *Controller) handleMessageStateChanged(msg *gst.Message) { } s := msg.Source() - if s == pipelineSource { + if s == pipelineName { logger.Infow("pipeline playing") c.playing.Break() @@ -254,10 +254,6 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { if s != nil { switch s.Name() { case msgFragmentOpened: - if timer := c.eosTimer; timer != nil { - timer.Reset(eosTimeout) - } - filepath, t, err := getSegmentParamsFromGstStructure(s) if err != nil { logger.Errorw("failed to retrieve segment parameters from event", err) @@ -270,10 +266,6 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { } case msgFragmentClosed: - if timer := c.eosTimer; timer != nil { - timer.Reset(eosTimeout) - } - filepath, t, err := getSegmentParamsFromGstStructure(s) if err != nil { logger.Errorw("failed to retrieve segment parameters from event", err, "location", filepath, "runningTime", t) diff --git a/test/ffprobe.go b/test/ffprobe.go index d10a4eab..a79ea1c9 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -237,7 +237,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre bitrate, err := strconv.Atoi(stream.BitRate) require.NoError(t, err) require.NotZero(t, bitrate) - require.Less(t, int32(bitrate), p.VideoBitrate*1010) + require.Less(t, int32(bitrate), p.VideoBitrate*1050) // framerate frac := strings.Split(stream.AvgFrameRate, "/")