Skip to content

Commit

Permalink
Fix web durations (#473)
Browse files Browse the repository at this point in the history
* fix web durations

* remove onEOS

* relax video bitrate check

* move pipeline onStop before uploads

* remove eos timer checks from segment sink
  • Loading branch information
frostbyte73 authored Aug 25, 2023
1 parent 09976c9 commit 8016d39
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 19 deletions.
29 changes: 20 additions & 9 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import (
)

const (
pipelineSource = "pipeline"
eosTimeout = time.Second * 30
pipelineName = "pipeline"
eosTimeout = time.Second * 30
)

type Controller struct {
Expand Down Expand Up @@ -111,11 +111,14 @@ func New(ctx context.Context, conf *config.PipelineConfig) (*Controller, error)
func (c *Controller) BuildPipeline() error {
logger.Debugw("building pipeline")

p, err := gstreamer.NewPipeline("pipeline", c.Latency, c.callbacks)
p, err := gstreamer.NewPipeline(pipelineName, c.Latency, c.callbacks)
if err != nil {
return errors.ErrGstPipelineError(err)
}

p.SetWatch(c.messageWatch)
p.AddOnStop(c.OnStop)

if c.AudioEnabled {
audioBin, err := builder.BuildAudioBin(p, c.PipelineConfig)
if err != nil {
Expand Down Expand Up @@ -160,9 +163,6 @@ func (c *Controller) BuildPipeline() error {
}
}

p.SetWatch(c.messageWatch)
p.AddOnStop(c.OnStop)

if err = p.Link(); err != nil {
return err
}
Expand Down Expand Up @@ -486,6 +486,11 @@ func (c *Controller) SendEOS(ctx context.Context) {
c.p.SendEOS()
}()
}

switch c.src.(type) {
case *source.WebSource:
c.updateDuration(c.src.GetEndedAt())
}
})
}

Expand All @@ -507,8 +512,16 @@ func (c *Controller) OnStop() error {
if c.eosTimer != nil {
c.eosTimer.Stop()
}
endedAt := c.src.GetEndedAt()

switch c.src.(type) {
case *source.SDKSource:
c.updateDuration(c.src.GetEndedAt())
}

return nil
}

func (c *Controller) updateDuration(endedAt int64) {
for egressType, o := range c.Outputs {
switch egressType {
case types.EgressTypeStream, types.EgressTypeWebsocket:
Expand Down Expand Up @@ -538,6 +551,4 @@ func (c *Controller) OnStop() error {
segmentsInfo.Duration = endedAt - segmentsInfo.StartedAt
}
}

return nil
}
10 changes: 1 addition & 9 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (c *Controller) handleMessageStateChanged(msg *gst.Message) {
}

s := msg.Source()
if s == pipelineSource {
if s == pipelineName {
logger.Infow("pipeline playing")

c.playing.Break()
Expand All @@ -254,10 +254,6 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error {
if s != nil {
switch s.Name() {
case msgFragmentOpened:
if timer := c.eosTimer; timer != nil {
timer.Reset(eosTimeout)
}

filepath, t, err := getSegmentParamsFromGstStructure(s)
if err != nil {
logger.Errorw("failed to retrieve segment parameters from event", err)
Expand All @@ -270,10 +266,6 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error {
}

case msgFragmentClosed:
if timer := c.eosTimer; timer != nil {
timer.Reset(eosTimeout)
}

filepath, t, err := getSegmentParamsFromGstStructure(s)
if err != nil {
logger.Errorw("failed to retrieve segment parameters from event", err, "location", filepath, "runningTime", t)
Expand Down
2 changes: 1 addition & 1 deletion test/ffprobe.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre
bitrate, err := strconv.Atoi(stream.BitRate)
require.NoError(t, err)
require.NotZero(t, bitrate)
require.Less(t, int32(bitrate), p.VideoBitrate*1010)
require.Less(t, int32(bitrate), p.VideoBitrate*1050)

// framerate
frac := strings.Split(stream.AvgFrameRate, "/")
Expand Down

0 comments on commit 8016d39

Please sign in to comment.