From 86a1dcde49a896eafb5b9ce6c6a26a6241298018 Mon Sep 17 00:00:00 2001 From: Mark Kremer Date: Thu, 4 Jan 2024 19:04:09 +0100 Subject: [PATCH] Add a Watcher to detect stream events --- watcher.go | 199 ++++++++++++++++++++++++++++++++++++++++++++++++ watcher_test.go | 192 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 391 insertions(+) create mode 100644 watcher.go create mode 100644 watcher_test.go diff --git a/watcher.go b/watcher.go new file mode 100644 index 0000000..ebade8f --- /dev/null +++ b/watcher.go @@ -0,0 +1,199 @@ +package beep + +import ( + "container/heap" +) + +// Watcher is a mechanism for executing callbacks at specific positions in the audio stream. +// +// Watcher supports two types: +// - Asynchronous watchers: These are called from a separate goroutine after the Stream() function completes. +// They do not block the Stream() operation. +// - Synchronous watchers: These are executed precisely at the specified position in the stream. The source streamer +// is consumed up to the watch position before triggering the callback. Only after this point, the rest of +// the source stream will be consumed. +// +// While synchronous watchers offer better accuracy and performance for the overall application, they come with limitations: +// - Synchronous watchers run within Stream(), so they must not take excessive time to process. Lengthy operations +// could lead to playback glitches, especially when used in conjunction with the speaker. Slow synchronous watchers may +// adversely affect the performance of the Streamer pipeline. +// - When played through the speaker, the speaker remains in a locked state. Attempting to lock the speaker again +// (e.g., when performed in speaker.Add()) can result in a deadlock. +type Watcher struct { + s Streamer + pos int + streamSyncEvents eventQueue + streamAsyncEvents eventQueue + drainedSyncEvents []Trigger + drainedAsyncEvents []Trigger +} + +// Trigger is a callback function invoked by a Watcher when a specific position in the audio stream is reached. +// The 'pos' parameter represents the watched position. For asynchronous watchers, be aware that the stream may +// have progressed beyond 'pos' in the meantime. +type Trigger func(pos int) + +// Watch wraps Streamer s in a Watcher. +func Watch(s Streamer) *Watcher { + return &Watcher{s: s} +} + +// Stream populates the 'samples' slice with audio samples from the source streamer, +// checking registered watchers to trigger associated triggers on relevant events. +func (w *Watcher) Stream(samples [][2]float64) (n int, ok bool) { + defer func() { + var evts []event + for len(w.streamAsyncEvents) > 0 { + evt := w.streamAsyncEvents[0] + if evt.time > w.pos { + break + } + heap.Pop(&w.streamAsyncEvents) + evts = append(evts, evt) + } + if !ok { + for _, cb := range w.drainedAsyncEvents { + evts = append(evts, event{ + time: w.pos, + callback: cb, + }) + } + } + if len(evts) > 0 { + go func() { + for _, evt := range evts { + evt.callback(evt.time) + } + }() + } + }() + + for len(samples) > 0 { + want := len(samples) + + if len(w.streamSyncEvents) > 0 { + evt := w.streamSyncEvents[0] + if evt.time <= w.pos+want { + want = evt.time - w.pos + } + } + + var sn int + sn, ok = w.s.Stream(samples[:want]) + n += sn + w.pos += sn + + for len(w.streamSyncEvents) > 0 { + evt := w.streamSyncEvents[0] + if evt.time > w.pos { + break + } + heap.Pop(&w.streamSyncEvents) + evt.callback(w.pos) + } + + if !ok { + for _, callback := range w.drainedSyncEvents { + callback(w.pos) + } + w.drainedSyncEvents = nil + } + if !ok || sn < want { + return + } + + samples = samples[sn:] + } + return +} + +// Position returns the current playback position. +func (w *Watcher) Position() int { + return w.pos +} + +// Err propagates the original Streamer's errors. +func (w *Watcher) Err() error { + return w.s.Err() +} + +// AtAsync registers a callback to be invoked when the Streamer reaches the specified position 'pos'. +// +// See Watcher for distinctions between synchronous and asynchronous triggers. +func (w *Watcher) AtAsync(pos int, callback Trigger) { + heap.Push(&w.streamAsyncEvents, event{ + time: pos, + callback: callback, + }) +} + +// AtSync registers a callback to be invoked when the Streamer reaches the specified position 'pos'. +// +// See Watcher for distinctions between synchronous and asynchronous triggers. +func (w *Watcher) AtSync(pos int, callback Trigger) { + heap.Push(&w.streamSyncEvents, event{ + time: pos, + callback: callback, + }) +} + +// StartedAsync registers a callback to be invoked when the Streamer begins streaming. +// +// See Watcher for distinctions between synchronous and asynchronous triggers. +func (w *Watcher) StartedAsync(callback Trigger) { + w.AtAsync(0, callback) +} + +// StartedSync registers a callback to be invoked when the Streamer begins streaming. +// +// See Watcher for distinctions between synchronous and asynchronous triggers. +func (w *Watcher) StartedSync(callback Trigger) { + w.AtSync(0, callback) +} + +// EndedAsync registers callback to be invoked when the source Streamer is fully drained (ok == false). +// +// See Watcher for the difference between synchronous and asynchronous triggers. +func (w *Watcher) EndedAsync(callback Trigger) { + w.drainedAsyncEvents = append(w.drainedAsyncEvents, callback) +} + +// EndedSync registers callback to be invoked when the source Streamer is fully drained (ok == false). +// +// See Watcher for distinctions between synchronous and asynchronous triggers. +func (w *Watcher) EndedSync(callback Trigger) { + w.drainedSyncEvents = append(w.drainedSyncEvents, callback) +} + +type event struct { + time int + callback Trigger +} + +// eventQueue can be used as a priority queue. It implements heap.Interface. +type eventQueue []event + +func (eq eventQueue) Len() int { + return len(eq) +} + +func (eq eventQueue) Less(i, j int) bool { + return eq[i].time < eq[j].time +} + +func (eq eventQueue) Swap(i, j int) { + eq[i], eq[j] = eq[j], eq[i] +} + +func (eq *eventQueue) Push(x any) { + item := x.(event) + *eq = append(*eq, item) +} + +func (eq *eventQueue) Pop() any { + last := len(*eq) - 1 + item := (*eq)[last] + (*eq)[last] = event{} + *eq = (*eq)[:last] + return item +} diff --git a/watcher_test.go b/watcher_test.go new file mode 100644 index 0000000..6f42d26 --- /dev/null +++ b/watcher_test.go @@ -0,0 +1,192 @@ +package beep_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/gopxl/beep" + "github.com/gopxl/beep/internal/testtools" + "github.com/gopxl/beep/speaker" +) + +func TestWatcher_Stream_StreamsWithoutEvents(t *testing.T) { + s, _ := testtools.RandomDataStreamer(100) + + l := beep.Watch(s) + + data := testtools.Collect(l) + + assert.Equal(t, 100, len(data)) +} + +func TestWatcher_Stream_ReturnsCorrectSampleCount(t *testing.T) { + s, _ := testtools.RandomDataStreamer(75) + + l := beep.Watch(s) + l.AtSync(25, func(pos int) {}) + + var buf [50][2]float64 + n, ok := l.Stream(buf[:]) + assert.True(t, ok) + assert.Equal(t, 50, n) + + n, ok = l.Stream(buf[:]) + assert.True(t, ok) + assert.Equal(t, 25, n) +} + +func TestWatcher_AtSync(t *testing.T) { + s, _ := testtools.RandomDataStreamer(100) + + numCalled := 0 + + l := beep.Watch(s) + l.AtSync(0, func(pos int) { + numCalled++ + assert.Equal(t, 0, pos) + assert.Equal(t, 0, s.Position()) + assert.Equal(t, 0, l.Position()) + }) + l.AtSync(50, func(pos int) { + numCalled++ + assert.Equal(t, 50, pos) + assert.Equal(t, 50, s.Position()) + assert.Equal(t, 50, l.Position()) + }) + l.AtSync(100, func(pos int) { + numCalled++ + assert.Equal(t, 100, pos) + assert.Equal(t, 100, s.Position()) + assert.Equal(t, 100, l.Position()) + }) + l.AtSync(101, func(pos int) { + assert.FailNow(t, "event after end of streamer was triggered unexpectedly") + }) + + testtools.Collect(l) + + assert.Equal(t, 3, numCalled) +} + +func TestWatcher_AtAsync(t *testing.T) { + s, _ := testtools.RandomDataStreamer(100) + + numCalled := 0 + + l := beep.Watch(s) + l.AtAsync(0, func(pos int) { + numCalled++ + assert.Equal(t, 0, pos) + assert.Equal(t, 100, s.Position()) + assert.Equal(t, 100, l.Position()) + }) + l.AtAsync(50, func(pos int) { + numCalled++ + assert.Equal(t, 50, pos) + assert.Equal(t, 100, s.Position()) + assert.Equal(t, 100, l.Position()) + }) + l.AtAsync(100, func(pos int) { + numCalled++ + assert.Equal(t, 100, pos) + assert.Equal(t, 100, s.Position()) + assert.Equal(t, 100, l.Position()) + }) + l.AtAsync(101, func(pos int) { + assert.FailNow(t, "event after end of streamer was triggered unexpectedly") + }) + + testtools.Collect(l) + + // Wait for goroutines to finish. Increase the time if test is flaky. + time.Sleep(time.Millisecond) + + assert.Equal(t, 3, numCalled) +} + +func TestWatcher_AtAsync_DoesntDeadlockWithSpeaker(t *testing.T) { + s, _ := testtools.RandomDataStreamer(100) + + numCalled := 0 + + l := beep.Watch(s) + l.AtAsync(50, func(pos int) { + speaker.Lock() + numCalled++ + speaker.Unlock() + }) + + // Emulate speaker behaviour by locking the speaker while consuming samples. + speaker.Lock() + testtools.Collect(l) + speaker.Unlock() + + // Wait for goroutines to finish. Increase the time if test is flaky. + time.Sleep(time.Millisecond) + + assert.Equal(t, 1, numCalled) +} + +func TestWatcher_EndedSync(t *testing.T) { + s, _ := testtools.RandomDataStreamer(100) + + numCalled := 0 + + l := beep.Watch(s) + l.EndedSync(func(pos int) { + numCalled++ + assert.Equal(t, 100, pos) + assert.Equal(t, 100, s.Position()) + assert.Equal(t, 100, l.Position()) + }) + + testtools.Collect(l) + + assert.Equal(t, 1, numCalled) +} + +func TestWatcher_EndedAsync(t *testing.T) { + s, _ := testtools.RandomDataStreamer(100) + + numCalled := 0 + + l := beep.Watch(s) + l.EndedAsync(func(pos int) { + numCalled++ + assert.Equal(t, 100, pos) + assert.Equal(t, 100, s.Position()) + assert.Equal(t, 100, l.Position()) + }) + + testtools.Collect(l) + + // Wait for goroutines to finish. Increase the time if test is flaky. + time.Sleep(time.Millisecond) + + assert.Equal(t, 1, numCalled) +} + +func TestWatcher_EndedAsync_DoesntDeadlockWithSpeaker(t *testing.T) { + s, _ := testtools.RandomDataStreamer(100) + + numCalled := 0 + + l := beep.Watch(s) + l.EndedAsync(func(pos int) { + speaker.Lock() + numCalled++ + speaker.Unlock() + }) + + // Emulate speaker behaviour by locking the speaker while consuming samples. + speaker.Lock() + testtools.Collect(l) + speaker.Unlock() + + // Wait for goroutines to finish. Increase the time if test is flaky. + time.Sleep(time.Millisecond) + + assert.Equal(t, 1, numCalled) +}