Skip to content

Commit

Permalink
Increase max queue size for web (#602)
Browse files Browse the repository at this point in the history
* increase max queue size for web

* use 3s for all egress

* missed some

* one more
  • Loading branch information
frostbyte73 authored Jan 30, 2024
1 parent 4b459c5 commit fda24b8
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 18 deletions.
11 changes: 1 addition & 10 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ import (
lksdk "github.com/livekit/server-sdk-go"
)

const (
webLatency = uint64(2e9)
sdkLatency = uint64(3e9)
)
const Latency = uint64(3e9)

type PipelineConfig struct {
BaseConfig `yaml:",inline"`
Expand All @@ -62,7 +59,6 @@ type PipelineConfig struct {

type SourceConfig struct {
SourceType types.SourceType
Latency uint64
WebSourceParams
SDKSourceParams
}
Expand Down Expand Up @@ -195,7 +191,6 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error {

p.SourceType = types.SourceTypeWeb
p.AwaitStartSignal = true
p.Latency = webLatency

p.Info.RoomName = req.RoomComposite.RoomName
p.Layout = req.RoomComposite.Layout
Expand Down Expand Up @@ -250,7 +245,6 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error {
connectionInfoRequired = false
p.SourceType = types.SourceTypeWeb
p.AwaitStartSignal = req.Web.AwaitStartSignal
p.Latency = webLatency

p.WebUrl = req.Web.Url
webUrl, err := url.Parse(p.WebUrl)
Expand Down Expand Up @@ -297,7 +291,6 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error {
redactEncodedOutputs(clone)

p.SourceType = types.SourceTypeSDK
p.Latency = sdkLatency

p.Info.RoomName = req.Participant.RoomName
p.AudioEnabled = true
Expand Down Expand Up @@ -334,7 +327,6 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error {
redactEncodedOutputs(clone)

p.SourceType = types.SourceTypeSDK
p.Latency = sdkLatency

p.Info.RoomName = req.TrackComposite.RoomName
if audioTrackID := req.TrackComposite.AudioTrackId; audioTrackID != "" {
Expand Down Expand Up @@ -378,7 +370,6 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error {
}

p.SourceType = types.SourceTypeSDK
p.Latency = sdkLatency

p.Info.RoomName = req.Track.RoomName
p.TrackID = req.Track.TrackId
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/builder/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error
return err
}
} else {
queue, err := gstreamer.BuildQueue("audio_queue", p.Latency, true)
queue, err := gstreamer.BuildQueue("audio_queue", config.Latency, true)
if err != nil {
return errors.ErrGstPipelineError(err)
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func (b *AudioBin) addEncoder() error {
}

func addAudioConverter(b *gstreamer.Bin, p *config.PipelineConfig) error {
audioQueue, err := gstreamer.BuildQueue("audio_input_queue", p.Latency, true)
audioQueue, err := gstreamer.BuildQueue("audio_input_queue", config.Latency, true)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/builder/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St
return nil, nil, errors.ErrGstPipelineError(err)
}
// add latency to give time for flvmux to receive and order packets from both streams
if err = mux.SetProperty("latency", p.Latency); err != nil {
if err = mux.SetProperty("latency", config.Latency); err != nil {
return nil, nil, errors.ErrGstPipelineError(err)
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error
return tee.GetRequestPad("src_%u")
}
} else if len(p.GetEncodedOutputs()) > 0 {
queue, err := gstreamer.BuildQueue("video_queue", p.Latency, true)
queue, err := gstreamer.BuildQueue("video_queue", config.Latency, true)
if err != nil {
return errors.ErrGstPipelineError(err)
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (b *VideoBin) buildWebInput() error {
return errors.ErrGstPipelineError(err)
}

videoQueue, err := gstreamer.BuildQueue("video_input_queue", b.conf.Latency, true)
videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -499,7 +499,7 @@ func (b *VideoBin) addSelector() error {
}

func (b *VideoBin) addEncoder() error {
videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", b.conf.Latency, false)
videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", config.Latency, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func (b *VideoBin) addDecodedVideoSink() error {
}

func addVideoConverter(b *gstreamer.Bin, p *config.PipelineConfig) error {
videoQueue, err := gstreamer.BuildQueue("video_input_queue", p.Latency, true)
videoQueue, err := gstreamer.BuildQueue("video_input_queue", config.Latency, true)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, ioClient rpc.IOInfoCl
}

func (c *Controller) BuildPipeline() error {
p, err := gstreamer.NewPipeline(pipelineName, c.Latency, c.callbacks)
p, err := gstreamer.NewPipeline(pipelineName, config.Latency, c.callbacks)
if err != nil {
return errors.ErrGstPipelineError(err)
}
Expand Down

0 comments on commit fda24b8

Please sign in to comment.