Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic audio buf #483

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,12 @@ encoder:
# audio frame duration needed for WebRTC (Opus)
# most of the emulators have ~1400 samples per a video frame,
# so we keep the frame buffer roughly half of that size or 2 RTC packets per frame
# (deprecated) due to frames
frame: 10
# dynamic frames for Opus encoder
frames:
- 10
- 5
video:
# h264, vpx (vp8) or vp9
codec: h264
Expand Down
13 changes: 9 additions & 4 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
func TestConfigEnv(t *testing.T) {
var out WorkerConfig

_ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAME", "33")
defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAME") }()
_ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[0]", "10")
_ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[1]", "5")
defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[0]") }()
defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[1]") }()

_ = os.Setenv("CLOUD_GAME_EMULATOR_LIBRETRO_CORES_LIST_PCSX_OPTIONS__PCSX_REARMED_DRC", "x")
defer func() {
Expand All @@ -22,8 +24,11 @@ func TestConfigEnv(t *testing.T) {
t.Fatal(err)
}

if out.Encoder.Audio.Frame != 33 {
t.Errorf("%v is not 33", out.Encoder.Audio.Frame)
for i, x := range []float32{10, 5} {
if out.Encoder.Audio.Frames[i] != x {
t.Errorf("%v is not [10, 5]", out.Encoder.Audio.Frames)
t.Failed()
}
}

v := out.Emulator.Libretro.Cores.List["pcsx"].Options["pcsx_rearmed_drc"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Encoder struct {
}

type Audio struct {
Frame float32
Frames []float32
}

type Video struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/worker/coordinatorhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke
}

m.AudioSrcHz = app.AudioSampleRate()
m.AudioFrame = w.conf.Encoder.Audio.Frame
m.AudioFrames = w.conf.Encoder.Audio.Frames
m.VideoW, m.VideoH = app.ViewportSize()
m.VideoScale = app.Scale()

Expand Down
119 changes: 119 additions & 0 deletions pkg/worker/media/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package media

import (
"errors"
"math"
"unsafe"
)

// buffer is a simple non-concurrent safe buffer for audio samples.
type buffer struct {
stretch bool
frameHz []int

raw samples
buckets []Bucket
cur *Bucket
}

type Bucket struct {
mem samples
ms float32
lv int
dst int
}

func newBuffer(frames []float32, hz int) (*buffer, error) {
if hz < 2000 {
return nil, errors.New("hz should be > than 2000")
}

buf := buffer{}

// preallocate continuous array
s := 0
for _, f := range frames {
s += frame(hz, f)
}
buf.raw = make(samples, s)

next := 0
for _, f := range frames {
s := frame(hz, f)
buf.buckets = append(buf.buckets, Bucket{
mem: buf.raw[next : next+s],
ms: f,
})
next += s
}
buf.cur = &buf.buckets[len(buf.buckets)-1]
return &buf, nil
}

func (b *buffer) choose(l int) {
for _, bb := range b.buckets {
if l >= len(bb.mem) {
b.cur = &bb
break
}
}
}

func (b *buffer) resample(hz int) {
b.stretch = true
for i := range b.buckets {
b.buckets[i].dst = frame(hz, float32(b.buckets[i].ms))
}
}

// write fills the buffer until it's full and then passes the gathered data into a callback.
//
// There are two cases to consider:
// 1. Underflow, when the length of the written data is less than the buffer's available space.
// 2. Overflow, when the length exceeds the current available buffer space.
//
// We overwrite any previous values in the buffer and move the internal write pointer
// by the length of the written data.
// In the first case, we won't call the callback, but it will be called every time
// when the internal buffer overflows until all samples are read.
func (b *buffer) write(s samples, onFull func(samples, float32)) (r int) {
for r < len(s) {
buf := b.cur
w := copy(buf.mem[buf.lv:], s[r:])
r += w
buf.lv += w
if buf.lv == len(buf.mem) {
if b.stretch {
onFull(buf.mem.stretch(buf.dst), buf.ms)
} else {
onFull(buf.mem, buf.ms)
}
b.choose(len(s) - r)
b.cur.lv = 0
}
}
return
}

// frame calculates an audio stereo frame size, i.e. 48k*frame/1000*2
// with round(x / 2) * 2 for the closest even number
func frame(hz int, frame float32) int {
return int(math.Round(float64(hz)*float64(frame)/1000/2) * 2 * 2)
}

// stretch does a simple stretching of audio samples.
// something like: [1,2,3,4,5,6] -> [1,2,x,x,3,4,x,x,5,6,x,x] -> [1,2,1,2,3,4,3,4,5,6,5,6]
func (s samples) stretch(size int) []int16 {
out := buf[:size]
n := len(s)
ratio := float32(size) / float32(n)
sPtr := unsafe.Pointer(&s[0])
for i, l, r := 0, 0, 0; i < n; i += 2 {
l, r = r, int(float32((i+2)>>1)*ratio)<<1 // index in src * ratio -> approximated index in dst *2 due to int16
for j := l; j < r; j += 2 {
*(*int32)(unsafe.Pointer(&out[j])) = *(*int32)(sPtr) // out[j] = s[i]; out[j+1] = s[i+1]
}
sPtr = unsafe.Add(sPtr, uintptr(4))
}
return out
}
77 changes: 77 additions & 0 deletions pkg/worker/media/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package media

import (
"reflect"
"testing"
)

type bufWrite struct {
sample int16
len int
}

func TestBufferWrite(t *testing.T) {
tests := []struct {
bufLen int
writes []bufWrite
expect samples
}{
{
bufLen: 2000,
writes: []bufWrite{
{sample: 1, len: 10},
{sample: 2, len: 20},
{sample: 3, len: 30},
},
expect: samples{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3},
},
{
bufLen: 2000,
writes: []bufWrite{
{sample: 1, len: 3},
{sample: 2, len: 18},
{sample: 3, len: 2},
},
expect: samples{2, 3, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2},
},
}

for _, test := range tests {
var lastResult samples
buf, err := newBuffer([]float32{10, 5}, test.bufLen)
if err != nil {
t.Fatalf("oof, %v", err)
}
for _, w := range test.writes {
buf.write(samplesOf(w.sample, w.len),
func(s samples, ms float32) { lastResult = s },
)
}
if !reflect.DeepEqual(test.expect, lastResult) {
t.Errorf("not expted buffer, %v != %v, %v", lastResult, test.expect, len(buf.cur.mem))
}
}
}

func BenchmarkBufferWrite(b *testing.B) {
fn := func(_ samples, _ float32) {}
l := 2000
buf, err := newBuffer([]float32{10}, l)
if err != nil {
b.Fatalf("oof: %v", err)
}
samples1 := samplesOf(1, l/2)
samples2 := samplesOf(2, l*2)
for i := 0; i < b.N; i++ {
buf.write(samples1, fn)
buf.write(samples2, fn)
}
}

func samplesOf(v int16, len int) (s samples) {
s = make(samples, len)
for i := range s {
s[i] = v
}
return
}
Loading
Loading