Skip to content

Commit

Permalink
WIP, replace buffer queue in wasapi capture with ringbuf
Browse files Browse the repository at this point in the history
  • Loading branch information
HEnquist committed Nov 10, 2024
1 parent 1f510c5 commit b5ad42a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ parking_lot = { version = "0.12.1", features = ["hardware-lock-elision"] }
crossbeam-channel = "0.5"
rayon = "1.10.0"
audio_thread_priority = { version = "0.32.0", default-features = false }
ringbuf = "0.4.7"

[build-dependencies]
version_check = "0.9"
Expand Down
79 changes: 35 additions & 44 deletions src/wasapidevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::thread;
use std::time::Duration;
use wasapi;
use wasapi::DeviceCollection;
use ringbuf::{traits::*, HeapRb};
use ringbuf::wrap::caching::Caching;

use audio_thread_priority::{
demote_current_thread_from_real_time, promote_current_thread_to_real_time,
Expand Down Expand Up @@ -445,16 +447,16 @@ fn playback_loop(
}

struct CaptureChannels {
pub tx_filled: Sender<(u64, usize, Vec<u8>)>,
pub rx_empty: Receiver<Vec<u8>>,
pub tx_filled: Sender<(u64, usize)>,
pub ringbuf: Caching<Arc<HeapRb<u8>>, true, false>,
}

// Capture loop, capture samples and send in chunks of "chunksize" frames to channel
fn capture_loop(
audio_client: wasapi::AudioClient,
capture_client: wasapi::AudioCaptureClient,
handle: wasapi::Handle,
channels: CaptureChannels,
mut channels: CaptureChannels,
tx_disconnectreason: Sender<DisconnectReason>,
blockalign: usize,
stop_signal: Arc<AtomicBool>,
Expand All @@ -479,7 +481,7 @@ fn capture_loop(

let mut inactive = false;

let mut saved_buffer: Option<Vec<u8>> = None;
let mut data = vec![0u8; 4*blockalign*1024];

// Raise priority
let _thread_handle = match promote_current_thread_to_real_time(0, 1) {
Expand Down Expand Up @@ -511,8 +513,7 @@ fn capture_loop(
warn!("No data received, pausing stream");
inactive = true;
}
let data = vec![0u8; 0];
match channels.tx_filled.try_send((chunk_nbr, 0, data)) {
match channels.tx_filled.try_send((chunk_nbr, 0)) {
Ok(()) | Err(TrySendError::Full(_)) => {}
Err(TrySendError::Disconnected(_)) => {
error!("Error sending, channel disconnected");
Expand All @@ -537,19 +538,9 @@ fn capture_loop(

// If no available frames, just skip the rest of this loop iteration
if available_frames > 0 {
//let mut data = vec![0u8; available_frames as usize * blockalign as usize];
let mut data = match saved_buffer {
Some(buf) => {
saved_buffer = None;
buf
}
None => channels.rx_empty.recv().unwrap(),
};

let mut nbr_bytes = available_frames as usize * blockalign;
if data.len() < nbr_bytes {
data.resize(nbr_bytes, 0);
}
let nbr_bytes = available_frames as usize * blockalign;

let (nbr_frames_read, flags) =
capture_client.read_from_device(&mut data[0..nbr_bytes])?;
if nbr_frames_read != available_frames {
Expand All @@ -572,13 +563,11 @@ fn capture_loop(
// in shared mode. This device seems to misbehave and not provide
// the buffers right after the event occurs.
// Check if more samples are available and read again.
/*
if let Some(extra_frames) = capture_client.get_next_nbr_frames()? {
if extra_frames > 0 {
trace!("Workaround, reading {} frames more", extra_frames);
debug!("Workaround, reading {} frames more", extra_frames);
let nbr_bytes_extra = extra_frames as usize * blockalign;
if data.len() < (nbr_bytes + nbr_bytes_extra) {
data.resize(nbr_bytes + nbr_bytes_extra, 0);
}
let (nbr_frames_read, flags) = capture_client
.read_from_device(&mut data[nbr_bytes..(nbr_bytes + nbr_bytes_extra)])?;
if nbr_frames_read != extra_frames {
Expand All @@ -596,18 +585,23 @@ fn capture_loop(
}
nbr_bytes += nbr_bytes_extra;
}
} */
let pushed_bytes = channels.ringbuf.push_slice(&data[0..nbr_bytes]);
if pushed_bytes < nbr_bytes {
debug!("Ring buffer is full, dropped {} out of {} bytes", nbr_bytes - pushed_bytes, nbr_bytes);
}

match channels.tx_filled.try_send((chunk_nbr, nbr_bytes, data)) {
Ok(()) => {}
Err(TrySendError::Full((nbr, length, data))) => {
debug!("Dropping captured chunk {} with len {}", nbr, length);
saved_buffer = Some(data);
}
Err(TrySendError::Disconnected(_)) => {
error!("Error sending, channel disconnected");
audio_client.stop_stream()?;
return Err(DeviceError::new("Channel disconnected").into());
if nbr_bytes > 0 {
match channels.tx_filled.try_send((chunk_nbr, pushed_bytes)) {
Ok(()) => {}
Err(TrySendError::Full((nbr, length))) => {
// TODO this should not happen, make sure ringbuffer gets full first..
debug!("Dropping captured chunk {} with len {}", nbr, length);
}
Err(TrySendError::Disconnected(_)) => {
error!("Error sending, channel disconnected");
audio_client.stop_stream()?;
return Err(DeviceError::new("Channel disconnected").into());
}
}
}
chunk_nbr += 1;
Expand Down Expand Up @@ -967,14 +961,13 @@ impl CaptureDevice for WasapiCaptureDevice {
let channel_capacity = 8*chunksize/1024 + 1;
debug!("Using a capture channel capacity of {} buffers.", channel_capacity);
let (tx_dev, rx_dev) = bounded(channel_capacity);
let (tx_dev_free, rx_dev_free) = bounded(channel_capacity+2);
for _ in 0..(channel_capacity+2) {
let data = vec![0u8; 2*1024*bytes_per_sample*channels];
tx_dev_free.send(data).unwrap();
}

let (tx_state_dev, rx_state_dev) = bounded(0);
let (tx_disconnectreason, rx_disconnectreason) = unbounded();

let ringbuffer = HeapRb::<u8>::new(channels * bytes_per_sample * chunksize * 4);
let (device_producer, mut device_consumer) = ringbuffer.split();

trace!("Build input stream");
// wasapi device loop
let stop_signal = Arc::new(AtomicBool::new(false));
Expand All @@ -998,7 +991,7 @@ impl CaptureDevice for WasapiCaptureDevice {
let blockalign = wave_format.get_blockalign();
let channels = CaptureChannels {
tx_filled: tx_dev,
rx_empty: rx_dev_free,
ringbuf: device_producer,
};
let result = capture_loop(audio_client, capture_client, handle, channels, tx_disconnectreason, blockalign as usize, stop_signal_inner);
if let Err(err) = result {
Expand Down Expand Up @@ -1111,7 +1104,7 @@ impl CaptureDevice for WasapiCaptureDevice {
while data_queue.len() < (blockalign * capture_frames) {
trace!("capture device needs more samples to make chunk, reading from channel");
match rx_dev.recv() {
Ok((chunk_nbr, data_bytes, data)) => {
Ok((chunk_nbr, data_bytes)) => {
trace!("got chunk, length {} bytes", data_bytes);
expected_chunk_nbr += 1;
if data_bytes == 0 {
Expand All @@ -1130,11 +1123,9 @@ impl CaptureDevice for WasapiCaptureDevice {
warn!("Samples were dropped, missing {} buffers", chunk_nbr - expected_chunk_nbr);
expected_chunk_nbr = chunk_nbr;
}
for element in data.iter().take(data_bytes) {
data_queue.push_back(*element);
for element in device_consumer.pop_iter().take(data_bytes) {
data_queue.push_back(element);
}
// Return the buffer to the queue
tx_dev_free.send(data).unwrap();
}
Err(err) => {
error!("Channel is closed");
Expand Down

0 comments on commit b5ad42a

Please sign in to comment.