diff --git a/libwebrtc/src/native/audio_source.rs b/libwebrtc/src/native/audio_source.rs index 0695d52db..9c3c23855 100644 --- a/libwebrtc/src/native/audio_source.rs +++ b/libwebrtc/src/native/audio_source.rs @@ -16,8 +16,8 @@ use crate::{audio_frame::AudioFrame, audio_source::AudioSourceOptions, RtcError, use cxx::SharedPtr; use std::{sync::Arc, time::Duration}; use tokio::{ - sync::{Mutex as AsyncMutex, MutexGuard}, - time::{interval, MissedTickBehavior}, + sync::{oneshot, Mutex as AsyncMutex, MutexGuard}, + time::{interval, Instant, MissedTickBehavior}, }; use webrtc_sys::audio_track as sys_at; @@ -28,12 +28,13 @@ pub struct NativeAudioSource { sample_rate: u32, num_channels: u32, samples_10ms: usize, + _close_tx: Arc>, } struct AudioSourceInner { buf: Box<[i16]>, - captured_frames: usize, + last_capture: Option, // Amount of data from the previous frame that hasn't been sent to the libwebrtc source // (because it requires 10ms of data) @@ -52,12 +53,13 @@ impl NativeAudioSource { num_channels: u32, ) -> NativeAudioSource { let samples_10ms = (sample_rate / 100 * num_channels) as usize; + let (close_tx, mut close_rx) = oneshot::channel(); let source = Self { sys_handle: sys_at::ffi::new_audio_track_source(options.into()), inner: Arc::new(AsyncMutex::new(AudioSourceInner { buf: vec![0; samples_10ms].into_boxed_slice(), - captured_frames: 0, + last_capture: None, len: 0, read_offset: 0, interval: None, // interval must be created from a tokio runtime context @@ -65,30 +67,34 @@ impl NativeAudioSource { sample_rate, num_channels, samples_10ms, + _close_tx: Arc::new(close_tx), }; tokio::spawn({ let source = source.clone(); async move { let mut interval = interval(Duration::from_millis(10)); + let data = vec![0; samples_10ms]; loop { - // We directly use the sys_handle instead of the capture_frame function - // (We don't want to increase the captured_frames count and no need to buffer) - interval.tick().await; - - let inner = source.inner.lock().await; - if inner.captured_frames > 0 { - break; // User captured something, stop injecting silence + tokio::select! { + _ = &mut close_rx => break, + _ = interval.tick() => { + let inner = source.inner.lock().await; + if let Some(last_capture) = inner.last_capture { + if last_capture.elapsed() < Duration::from_millis(20) { + continue; + } + } + + source.sys_handle.on_captured_frame( + &data, + sample_rate, + num_channels, + sample_rate as usize / 100, + ); + } } - - let data = vec![0; samples_10ms]; - source.sys_handle.on_captured_frame( - &data, - sample_rate, - num_channels, - sample_rate as usize / 100, - ); } } }); @@ -162,8 +168,6 @@ impl NativeAudioSource { } let mut inner = self.inner.lock().await; - inner.captured_frames += 1; - let mut interval = inner.interval.take().unwrap_or_else(|| { let mut interval = interval(Duration::from_millis(10)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -176,7 +180,7 @@ impl NativeAudioSource { break; }; - interval.tick().await; + let last_capture = interval.tick().await; // samples per channel = number of frames let samples_per_channel = data.len() / self.num_channels as usize; @@ -186,6 +190,8 @@ impl NativeAudioSource { self.num_channels, samples_per_channel, ); + + inner.last_capture = Some(last_capture); } Ok(())