Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use mutex when switching media encoders #468

Merged
merged 1 commit into from
Aug 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions pkg/worker/media/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type WebrtcMediaPipe struct {
audioBuf buffer
log *logger.Logger

mua sync.RWMutex
muv sync.RWMutex

aConf config.Audio
vConf config.Video

Expand All @@ -132,8 +135,9 @@ func (wmp *WebrtcMediaPipe) SetAudioCb(cb func([]byte, int32)) {
wmp.onAudio = func(bytes []byte) { cb(bytes, fr) }
}
func (wmp *WebrtcMediaPipe) Destroy() {
if wmp.v != nil {
wmp.v.Stop()
v := wmp.Video()
if v != nil {
v.Stop()
}
}
func (wmp *WebrtcMediaPipe) PushAudio(audio []int16) { wmp.audioBuf.write(audio, wmp.encodeAudio) }
Expand All @@ -145,11 +149,15 @@ func (wmp *WebrtcMediaPipe) Init() error {
if err := wmp.initVideo(wmp.VideoW, wmp.VideoH, wmp.VideoScale, wmp.vConf); err != nil {
return err
}
if wmp.v == nil || wmp.a == nil {
return fmt.Errorf("could intit the encoders, v=%v a=%v", wmp.v != nil, wmp.a != nil)

a := wmp.Audio()
v := wmp.Video()

if v == nil || a == nil {
return fmt.Errorf("could intit the encoders, v=%v a=%v", v != nil, a != nil)
}

wmp.log.Debug().Msgf("%v", wmp.v.Info())
wmp.log.Debug().Msgf("%v", v.Info())
wmp.initialized = true
return nil
}
Expand All @@ -172,7 +180,7 @@ func (wmp *WebrtcMediaPipe) initAudio(srcHz int, frameSize int) error {
}

func (wmp *WebrtcMediaPipe) encodeAudio(pcm samples) {
data, err := wmp.a.Encode(pcm)
data, err := wmp.Audio().Encode(pcm)
audioPool.Put((*[]int16)(&pcm))
if err != nil {
wmp.log.Error().Err(err).Msgf("opus encode fail")
Expand All @@ -198,15 +206,15 @@ func (wmp *WebrtcMediaPipe) initVideo(w, h int, scale float64, conf config.Video
func round(x int, scale float64) int { return (int(float64(x)*scale) + 1) & ^1 }

func (wmp *WebrtcMediaPipe) ProcessVideo(v app.Video) []byte {
return wmp.v.Encode(encoder.InFrame(v.Frame))
return wmp.Video().Encode(encoder.InFrame(v.Frame))
}

func (wmp *WebrtcMediaPipe) Reinit() error {
if !wmp.initialized {
return nil
}

wmp.v.Stop()
wmp.Video().Stop()
if err := wmp.initVideo(wmp.VideoW, wmp.VideoH, wmp.VideoScale, wmp.vConf); err != nil {
return err
}
Expand All @@ -221,3 +229,27 @@ func (wmp *WebrtcMediaPipe) IsInitialized() bool { return wmp.initialized }
func (wmp *WebrtcMediaPipe) SetPixFmt(f uint32) { wmp.oldPf = f; wmp.v.SetPixFormat(f) }
func (wmp *WebrtcMediaPipe) SetVideoFlip(b bool) { wmp.oldFlip = b; wmp.v.SetFlip(b) }
func (wmp *WebrtcMediaPipe) SetRot(r uint) { wmp.oldRot = r; wmp.v.SetRot(r) }

func (wmp *WebrtcMediaPipe) Video() *encoder.Video {
wmp.muv.RLock()
defer wmp.muv.RUnlock()
return wmp.v
}

func (wmp *WebrtcMediaPipe) SetVideo(e *encoder.Video) {
wmp.muv.Lock()
wmp.v = e
wmp.muv.Unlock()
}

func (wmp *WebrtcMediaPipe) Audio() *opus.Encoder {
wmp.mua.RLock()
defer wmp.mua.RUnlock()
return wmp.a
}

func (wmp *WebrtcMediaPipe) SetAudio(e *opus.Encoder) {
wmp.mua.Lock()
wmp.a = e
wmp.mua.Unlock()
}
2 changes: 1 addition & 1 deletion pkg/worker/media/media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestEncoders(t *testing.T) {
}
}

func BenchmarkH264(b *testing.B) { run(1920, 1080, encoder.H264, b.N, nil, nil, b) }
func BenchmarkH264(b *testing.B) { run(640, 480, encoder.H264, b.N, nil, nil, b) }
func BenchmarkVP8(b *testing.B) { run(1920, 1080, encoder.VP8, b.N, nil, nil, b) }

func run(w, h int, cod encoder.VideoCodec, count int, a *image.RGBA, b *image.RGBA, backend testing.TB) {
Expand Down