From c3880b1e48dc0ad71d407ad2eb7b63f4f939f7c9 Mon Sep 17 00:00:00 2001 From: Sergey Stepanov Date: Sun, 11 Aug 2024 11:46:19 +0300 Subject: [PATCH] Use mutex when switching media encoders --- pkg/worker/media/media.go | 48 ++++++++++++++++++++++++++++------ pkg/worker/media/media_test.go | 2 +- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/pkg/worker/media/media.go b/pkg/worker/media/media.go index 39c484428..e4764fa8b 100644 --- a/pkg/worker/media/media.go +++ b/pkg/worker/media/media.go @@ -107,6 +107,9 @@ type WebrtcMediaPipe struct { audioBuf buffer log *logger.Logger + mua sync.RWMutex + muv sync.RWMutex + aConf config.Audio vConf config.Video @@ -132,8 +135,9 @@ func (wmp *WebrtcMediaPipe) SetAudioCb(cb func([]byte, int32)) { wmp.onAudio = func(bytes []byte) { cb(bytes, fr) } } func (wmp *WebrtcMediaPipe) Destroy() { - if wmp.v != nil { - wmp.v.Stop() + v := wmp.Video() + if v != nil { + v.Stop() } } func (wmp *WebrtcMediaPipe) PushAudio(audio []int16) { wmp.audioBuf.write(audio, wmp.encodeAudio) } @@ -145,11 +149,15 @@ func (wmp *WebrtcMediaPipe) Init() error { if err := wmp.initVideo(wmp.VideoW, wmp.VideoH, wmp.VideoScale, wmp.vConf); err != nil { return err } - if wmp.v == nil || wmp.a == nil { - return fmt.Errorf("could intit the encoders, v=%v a=%v", wmp.v != nil, wmp.a != nil) + + a := wmp.Audio() + v := wmp.Video() + + if v == nil || a == nil { + return fmt.Errorf("could intit the encoders, v=%v a=%v", v != nil, a != nil) } - wmp.log.Debug().Msgf("%v", wmp.v.Info()) + wmp.log.Debug().Msgf("%v", v.Info()) wmp.initialized = true return nil } @@ -172,7 +180,7 @@ func (wmp *WebrtcMediaPipe) initAudio(srcHz int, frameSize int) error { } func (wmp *WebrtcMediaPipe) encodeAudio(pcm samples) { - data, err := wmp.a.Encode(pcm) + data, err := wmp.Audio().Encode(pcm) audioPool.Put((*[]int16)(&pcm)) if err != nil { wmp.log.Error().Err(err).Msgf("opus encode fail") @@ -198,7 +206,7 @@ func (wmp *WebrtcMediaPipe) initVideo(w, h int, scale float64, conf config.Video func round(x int, scale float64) int { return (int(float64(x)*scale) + 1) & ^1 } func (wmp *WebrtcMediaPipe) ProcessVideo(v app.Video) []byte { - return wmp.v.Encode(encoder.InFrame(v.Frame)) + return wmp.Video().Encode(encoder.InFrame(v.Frame)) } func (wmp *WebrtcMediaPipe) Reinit() error { @@ -206,7 +214,7 @@ func (wmp *WebrtcMediaPipe) Reinit() error { return nil } - wmp.v.Stop() + wmp.Video().Stop() if err := wmp.initVideo(wmp.VideoW, wmp.VideoH, wmp.VideoScale, wmp.vConf); err != nil { return err } @@ -221,3 +229,27 @@ func (wmp *WebrtcMediaPipe) IsInitialized() bool { return wmp.initialized } func (wmp *WebrtcMediaPipe) SetPixFmt(f uint32) { wmp.oldPf = f; wmp.v.SetPixFormat(f) } func (wmp *WebrtcMediaPipe) SetVideoFlip(b bool) { wmp.oldFlip = b; wmp.v.SetFlip(b) } func (wmp *WebrtcMediaPipe) SetRot(r uint) { wmp.oldRot = r; wmp.v.SetRot(r) } + +func (wmp *WebrtcMediaPipe) Video() *encoder.Video { + wmp.muv.RLock() + defer wmp.muv.RUnlock() + return wmp.v +} + +func (wmp *WebrtcMediaPipe) SetVideo(e *encoder.Video) { + wmp.muv.Lock() + wmp.v = e + wmp.muv.Unlock() +} + +func (wmp *WebrtcMediaPipe) Audio() *opus.Encoder { + wmp.mua.RLock() + defer wmp.mua.RUnlock() + return wmp.a +} + +func (wmp *WebrtcMediaPipe) SetAudio(e *opus.Encoder) { + wmp.mua.Lock() + wmp.a = e + wmp.mua.Unlock() +} diff --git a/pkg/worker/media/media_test.go b/pkg/worker/media/media_test.go index a14fd02ab..2fb49e077 100644 --- a/pkg/worker/media/media_test.go +++ b/pkg/worker/media/media_test.go @@ -33,7 +33,7 @@ func TestEncoders(t *testing.T) { } } -func BenchmarkH264(b *testing.B) { run(1920, 1080, encoder.H264, b.N, nil, nil, b) } +func BenchmarkH264(b *testing.B) { run(640, 480, encoder.H264, b.N, nil, nil, b) } func BenchmarkVP8(b *testing.B) { run(1920, 1080, encoder.VP8, b.N, nil, nil, b) } func run(w, h int, cod encoder.VideoCodec, count int, a *image.RGBA, b *image.RGBA, backend testing.TB) {