Skip to content

Commit

Permalink
feat: rtc stats (#218)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Nov 2, 2023
1 parent fdb83a7 commit a5ba08e
Show file tree
Hide file tree
Showing 48 changed files with 3,344 additions and 461 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions examples/wgpu_room/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ impl LkApp {
let _ = self.service.send(AsyncCmd::ToggleSine);
}
});

ui.menu_button("Debug", |ui| {
if ui.button("Refresh stats").clicked() {
// TODO
}
});
});
}

Expand Down
2 changes: 2 additions & 0 deletions libwebrtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ repository = "https://github.com/livekit/client-sdk-rust"
[dependencies]
livekit-protocol = { path = "../livekit-protocol", version = "0.2.0" }
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"

[target.'cfg(target_os = "android")'.dependencies]
Expand Down
10 changes: 6 additions & 4 deletions libwebrtc/src/data_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use crate::{imp::data_channel as dc_imp, rtp_parameters::Priority};
use serde::Deserialize;
use std::{fmt::Debug, str::Utf8Error};
use thiserror::Error;

Expand Down Expand Up @@ -49,8 +50,9 @@ pub enum DataChannelError {
Utf8(#[from] Utf8Error),
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DataState {
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DataChannelState {
Connecting,
Open,
Closing,
Expand All @@ -63,7 +65,7 @@ pub struct DataBuffer<'a> {
pub binary: bool,
}

pub type OnStateChange = Box<dyn FnMut(DataState) + Send + Sync>;
pub type OnStateChange = Box<dyn FnMut(DataChannelState) + Send + Sync>;
pub type OnMessage = Box<dyn FnMut(DataBuffer) + Send + Sync>;
pub type OnBufferedAmountChange = Box<dyn FnMut(u64) + Send + Sync>;

Expand All @@ -85,7 +87,7 @@ impl DataChannel {
self.handle.label()
}

pub fn state(&self) -> DataState {
pub fn state(&self) -> DataChannelState {
self.handle.state()
}

Expand Down
1 change: 1 addition & 0 deletions libwebrtc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub mod rtp_receiver;
pub mod rtp_sender;
pub mod rtp_transceiver;
pub mod session_description;
pub mod stats;
pub mod video_frame;
pub mod video_source;
pub mod video_stream;
Expand Down
8 changes: 4 additions & 4 deletions libwebrtc/src/native/data_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
// limitations under the License.

use crate::data_channel::{
DataBuffer, DataChannelError, DataChannelInit, DataState, OnBufferedAmountChange, OnMessage,
OnStateChange,
DataBuffer, DataChannelError, DataChannelInit, DataChannelState, OnBufferedAmountChange,
OnMessage, OnStateChange,
};
use cxx::SharedPtr;
use parking_lot::Mutex;
use std::str;
use std::sync::Arc;
use webrtc_sys::data_channel as sys_dc;

impl From<sys_dc::ffi::DataState> for DataState {
impl From<sys_dc::ffi::DataState> for DataChannelState {
fn from(value: sys_dc::ffi::DataState) -> Self {
match value {
sys_dc::ffi::DataState::Connecting => Self::Connecting,
Expand Down Expand Up @@ -95,7 +95,7 @@ impl DataChannel {
self.sys_handle.label()
}

pub fn state(&self) -> DataState {
pub fn state(&self) -> DataChannelState {
self.sys_handle.state().into()
}

Expand Down
37 changes: 30 additions & 7 deletions libwebrtc/src/native/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::rtp_receiver::RtpReceiver;
use crate::rtp_sender::RtpSender;
use crate::rtp_transceiver::RtpTransceiver;
use crate::rtp_transceiver::RtpTransceiverInit;
use crate::stats::RtcStats;
use crate::MediaType;
use crate::RtcErrorType;
use crate::{session_description::SessionDescription, RtcError};
Expand All @@ -49,6 +50,7 @@ use tokio::sync::oneshot;
use webrtc_sys::data_channel as sys_dc;
use webrtc_sys::jsep as sys_jsep;
use webrtc_sys::peer_connection as sys_pc;
use webrtc_sys::peer_connection_factory as sys_pcf;
use webrtc_sys::rtc_error as sys_err;

impl From<OfferOptions> for sys_pc::ffi::RtcOfferAnswerOptions {
Expand Down Expand Up @@ -203,7 +205,7 @@ impl PeerConnection {
options: OfferOptions,
) -> Result<SessionDescription, RtcError> {
let (tx, mut rx) = mpsc::channel::<Result<SessionDescription, RtcError>>(1);
let ctx = Box::new(sys_pc::AsyncContext(Box::new(tx)));
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));
type CtxType = mpsc::Sender<Result<SessionDescription, RtcError>>;

self.sys_handle.create_offer(
Expand All @@ -229,7 +231,7 @@ impl PeerConnection {
options: AnswerOptions,
) -> Result<SessionDescription, RtcError> {
let (tx, mut rx) = mpsc::channel::<Result<SessionDescription, RtcError>>(1);
let ctx = Box::new(sys_pc::AsyncContext(Box::new(tx)));
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));
type CtxType = mpsc::Sender<Result<SessionDescription, RtcError>>;

self.sys_handle.create_answer(
Expand All @@ -252,7 +254,7 @@ impl PeerConnection {

pub async fn set_local_description(&self, desc: SessionDescription) -> Result<(), RtcError> {
let (tx, rx) = oneshot::channel::<Result<(), RtcError>>();
let ctx = Box::new(sys_pc::AsyncContext(Box::new(tx)));
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));

self.sys_handle
.set_local_description(desc.handle.sys_handle, ctx, |ctx, err| {
Expand All @@ -273,7 +275,7 @@ impl PeerConnection {

pub async fn set_remote_description(&self, desc: SessionDescription) -> Result<(), RtcError> {
let (tx, rx) = oneshot::channel::<Result<(), RtcError>>();
let ctx = Box::new(sys_pc::AsyncContext(Box::new(tx)));
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));

self.sys_handle
.set_remote_description(desc.handle.sys_handle, ctx, |ctx, err| {
Expand All @@ -297,7 +299,7 @@ impl PeerConnection {

pub async fn add_ice_candidate(&self, candidate: IceCandidate) -> Result<(), RtcError> {
let (tx, rx) = oneshot::channel::<Result<(), RtcError>>();
let ctx = Box::new(sys_pc::AsyncContext(Box::new(tx)));
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));

self.sys_handle
.add_ice_candidate(candidate.handle.sys_handle, ctx, |ctx, err| {
Expand Down Expand Up @@ -440,6 +442,27 @@ impl PeerConnection {
.map_err(|e| unsafe { sys_err::ffi::RtcError::from(e.what()).into() })
}

pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
let (tx, rx) = oneshot::channel::<Result<Vec<RtcStats>, RtcError>>();
let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));

self.sys_handle.get_stats(ctx, |ctx, stats| {
let tx = ctx
.0
.downcast::<oneshot::Sender<Result<Vec<RtcStats>, RtcError>>>()
.unwrap();

// Unwrap because it should not happens
let vec = serde_json::from_str(&stats).unwrap();
let _ = tx.send(Ok(vec));
});

rx.await.map_err(|_| RtcError {
error_type: RtcErrorType::Internal,
message: "get_stats cancelled".to_owned(),
})?
}

pub fn senders(&self) -> Vec<RtpSender> {
self.sys_handle
.get_senders()
Expand Down Expand Up @@ -526,7 +549,7 @@ pub struct PeerObserver {
pub track_handler: Mutex<Option<OnTrack>>,
}

impl sys_pc::PeerConnectionObserver for PeerObserver {
impl sys_pcf::PeerConnectionObserver for PeerObserver {
fn on_signaling_change(&self, new_state: sys_pc::ffi::SignalingState) {
if let Some(f) = self.signaling_change_handler.lock().as_mut() {
f(new_state.into());
Expand Down Expand Up @@ -612,7 +635,7 @@ impl sys_pc::PeerConnectionObserver for PeerObserver {

fn on_ice_selected_candidate_pair_changed(
&self,
_event: sys_pc::ffi::CandidatePairChangeEvent,
_event: sys_pcf::ffi::CandidatePairChangeEvent,
) {
}

Expand Down
14 changes: 6 additions & 8 deletions libwebrtc/src/native/peer_connection_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use cxx::UniquePtr;
use lazy_static::lazy_static;
use parking_lot::Mutex;
use std::sync::Arc;
use webrtc_sys::peer_connection as sys_pc;
use webrtc_sys::peer_connection_factory as sys_pcf;
use webrtc_sys::rtc_error as sys_err;
use webrtc_sys::webrtc as sys_rtc;
Expand Down Expand Up @@ -78,13 +77,12 @@ impl PeerConnectionFactory {
config: RtcConfiguration,
) -> Result<PeerConnection, RtcError> {
let observer = Arc::new(imp_pc::PeerObserver::default());
let native_observer = sys_pc::ffi::create_native_peer_connection_observer(Box::new(
sys_pc::PeerConnectionObserverWrapper::new(observer.clone()),
));

let res = self
.sys_handle
.create_peer_connection(config.into(), native_observer);
let res = self.sys_handle.create_peer_connection(
config.into(),
Box::new(sys_pcf::PeerConnectionObserverWrapper::new(
observer.clone(),
)),
);

match res {
Ok(sys_handle) => Ok(PeerConnection {
Expand Down
24 changes: 24 additions & 0 deletions libwebrtc/src/native/rtp_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use crate::imp::media_stream_track::new_media_stream_track;
use crate::media_stream_track::MediaStreamTrack;
use crate::rtp_parameters::RtpParameters;
use crate::stats::RtcStats;
use crate::{RtcError, RtcErrorType};
use cxx::SharedPtr;
use tokio::sync::oneshot;
use webrtc_sys::rtp_receiver as sys_rr;

#[derive(Clone)]
Expand All @@ -33,6 +36,27 @@ impl RtpReceiver {
Some(new_media_stream_track(track_handle))
}

pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
let (tx, rx) = oneshot::channel::<Result<Vec<RtcStats>, RtcError>>();
let ctx = Box::new(sys_rr::ReceiverContext(Box::new(tx)));

self.sys_handle.get_stats(ctx, |ctx, stats| {
let tx = ctx
.0
.downcast::<oneshot::Sender<Result<Vec<RtcStats>, RtcError>>>()
.unwrap();

// Unwrap because it should not happens
let vec = serde_json::from_str(&stats).unwrap();
let _ = tx.send(Ok(vec));
});

rx.await.map_err(|_| RtcError {
error_type: RtcErrorType::Internal,
message: "get_stats cancelled".to_owned(),
})?
}

pub fn parameters(&self) -> RtpParameters {
self.sys_handle.get_parameters().into()
}
Expand Down
23 changes: 23 additions & 0 deletions libwebrtc/src/native/rtp_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

use super::media_stream_track::new_media_stream_track;
use crate::media_stream_track::MediaStreamTrack;
use crate::stats::RtcStats;
use crate::{rtp_parameters::RtpParameters, RtcError, RtcErrorType};
use cxx::SharedPtr;
use tokio::sync::oneshot;
use webrtc_sys::rtc_error as sys_err;
use webrtc_sys::rtp_sender as sys_rs;

Expand All @@ -34,6 +36,27 @@ impl RtpSender {
Some(new_media_stream_track(track_handle))
}

pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
let (tx, rx) = oneshot::channel::<Result<Vec<RtcStats>, RtcError>>();
let ctx = Box::new(sys_rs::SenderContext(Box::new(tx)));

self.sys_handle.get_stats(ctx, |ctx, stats| {
let tx = ctx
.0
.downcast::<oneshot::Sender<Result<Vec<RtcStats>, RtcError>>>()
.unwrap();

// Unwrap because it should not happens
let vec = serde_json::from_str(&stats).unwrap();
let _ = tx.send(Ok(vec));
});

rx.await.map_err(|_| RtcError {
error_type: RtcErrorType::Internal,
message: "get_stats cancelled".to_owned(),
})?
}

pub fn set_track(&self, track: Option<MediaStreamTrack>) -> Result<(), RtcError> {
if !self
.sys_handle
Expand Down
6 changes: 6 additions & 0 deletions libwebrtc/src/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::rtp_receiver::RtpReceiver;
use crate::rtp_sender::RtpSender;
use crate::rtp_transceiver::{RtpTransceiver, RtpTransceiverInit};
use crate::session_description::SessionDescription;
use crate::stats::RtcStats;
use crate::{MediaType, RtcError};
use std::fmt::Debug;

Expand Down Expand Up @@ -157,6 +158,10 @@ impl PeerConnection {
self.handle.remove_track(sender)
}

pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
self.handle.get_stats().await
}

pub fn add_transceiver(
&self,
track: MediaStreamTrack,
Expand All @@ -172,6 +177,7 @@ impl PeerConnection {
) -> Result<RtpTransceiver, RtcError> {
self.handle.add_transceiver_for_media(media_type, init)
}

pub fn close(&self) {
self.handle.close()
}
Expand Down
2 changes: 1 addition & 1 deletion libwebrtc/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use crate::audio_frame::AudioFrame;
pub use crate::audio_source::{AudioSourceOptions, RtcAudioSource};
pub use crate::audio_track::RtcAudioTrack;
pub use crate::data_channel::{
DataBuffer, DataChannel, DataChannelError, DataChannelInit, DataState,
DataBuffer, DataChannel, DataChannelError, DataChannelInit, DataChannelState,
};
pub use crate::ice_candidate::IceCandidate;
pub use crate::media_stream::MediaStream;
Expand Down
6 changes: 5 additions & 1 deletion libwebrtc/src/rtp_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::fmt::Debug;

use crate::{
imp::rtp_receiver as imp_rr, media_stream_track::MediaStreamTrack,
rtp_parameters::RtpParameters,
rtp_parameters::RtpParameters, stats::RtcStats, RtcError,
};

#[derive(Clone)]
Expand All @@ -29,6 +29,10 @@ impl RtpReceiver {
self.handle.track()
}

pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
self.handle.get_stats().await
}

pub fn parameters(&self) -> RtpParameters {
self.handle.parameters()
}
Expand Down
Loading

0 comments on commit a5ba08e

Please sign in to comment.