diff --git a/pkg/gcc/interceptor.go b/pkg/gcc/interceptor.go index d029d16a..eeee5ca3 100644 --- a/pkg/gcc/interceptor.go +++ b/pkg/gcc/interceptor.go @@ -51,7 +51,8 @@ func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, i := &Interceptor{ NoOp: interceptor.NoOp{}, lock: sync.Mutex{}, - bitrate: 0, + bitrate: 100_000, + latestStats: &Stats{}, pacer: nil, FeedbackAdapter: nil, loss: nil, @@ -67,8 +68,8 @@ func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, } } - i.pacer = NewLeakyBucketPacer() i.FeedbackAdapter = NewFeedbackAdapter() + i.pacer = NewLeakyBucketPacer(i.bitrate) i.loss = newLossBasedBWE(i.bitrate) i.delay = newDelayBasedBWE(i.bitrate) @@ -154,11 +155,15 @@ func (c *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor. if err != nil { return 0, nil, err } + buf := make([]byte, i) + + copy(buf, b[:i]) + if attr == nil { attr = make(interceptor.Attributes) } - pkts, err := attr.GetRTCPPackets(b[:i]) + pkts, err := attr.GetRTCPPackets(buf[:i]) if err != nil { return 0, nil, err } @@ -195,9 +200,11 @@ func (c *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer inter } attributes.Set(twccExtensionAttributesKey, hdrExtID) } + co := make([]byte, len(payload)) + copy(co, payload) c.packet <- &packetAndAttributes{ - header: *header, - payload: payload, + header: header.Clone(), + payload: co, attributes: attributes, } @@ -212,7 +219,7 @@ func (c *Interceptor) Close() error { } func (c *Interceptor) loop() { - ticker := time.NewTicker(500 * time.Millisecond) + ticker := time.NewTicker(200 * time.Millisecond) for { select { case <-c.close: diff --git a/pkg/gcc/leaky_bucket_pacer.go b/pkg/gcc/leaky_bucket_pacer.go index 32205796..b78dd7df 100644 --- a/pkg/gcc/leaky_bucket_pacer.go +++ b/pkg/gcc/leaky_bucket_pacer.go @@ -31,11 +31,11 @@ type LeakyBucketPacer struct { } // NewLeakyBucketPacer initializes a new LeakyBucketPacer -func NewLeakyBucketPacer() *LeakyBucketPacer { +func NewLeakyBucketPacer(initialBitrate int) *LeakyBucketPacer { p := &LeakyBucketPacer{ log: logging.NewDefaultLoggerFactory().NewLogger("pacer"), f: 1.5, - targetBitrate: 150_000, + targetBitrate: initialBitrate, pacingInterval: 5 * time.Millisecond, itemCh: make(chan item), bitrateCh: make(chan int), diff --git a/pkg/gcc/noop_pacer.go b/pkg/gcc/noop_pacer.go index 33a65155..2a8a5790 100644 --- a/pkg/gcc/noop_pacer.go +++ b/pkg/gcc/noop_pacer.go @@ -27,6 +27,9 @@ func NewNoOpPacer() *NoOpPacer { } } +func (p *NoOpPacer) SetTargetBitrate(int) { +} + // AddStream adds a stream and corresponding writer to the p func (p *NoOpPacer) AddStream(ssrc uint32, writer interceptor.RTPWriter) { p.lock.Lock()