Skip to content

Commit

Permalink
sync send event (#306)
Browse files Browse the repository at this point in the history
useful when panicking
  • Loading branch information
theomonnom authored Feb 14, 2024
1 parent bdac429 commit cb14200
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 182 deletions.
14 changes: 6 additions & 8 deletions livekit-ffi/src/server/audio_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,12 @@ impl FfiAudioSource {
};

let res = source.capture_frame(&audio_frame).await;
let _ = server
.send_event(proto::ffi_event::Message::CaptureAudioFrame(
proto::CaptureAudioFrameCallback {
async_id,
error: res.err().map(|e| e.to_string()),
},
))
.await;
let _ = server.send_event(proto::ffi_event::Message::CaptureAudioFrame(
proto::CaptureAudioFrameCallback {
async_id,
error: res.err().map(|e| e.to_string()),
},
));
}
_ => {}
}
Expand Down
12 changes: 6 additions & 6 deletions livekit-ffi/src/server/audio_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,20 @@ impl FfiAudioStream {
},
)),
},
)).await {
)) {
server.drop_handle(handle_id);
log::warn!("failed to send audio frame: {}", err);
}
}
}
}

if let Err(err) = server
.send_event(proto::ffi_event::Message::AudioStreamEvent(proto::AudioStreamEvent {
if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent(
proto::AudioStreamEvent {
stream_handle: stream_handle_id,
message: Some(proto::audio_stream_event::Message::Eos(proto::AudioStreamEos {})),
}))
.await
{
},
)) {
log::warn!("failed to send audio EOS: {}", err);
}
}
Expand Down
8 changes: 3 additions & 5 deletions livekit-ffi/src/server/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,9 @@ async fn log_forward_task(mut rx: mpsc::UnboundedReceiver<LogMsg>) {
// It is safe to use FFI_SERVER here, if we receive logs when capture_logs is enabled,
// it means the server has already been initialized

let _ = FFI_SERVER
.send_event(proto::ffi_event::Message::Logs(proto::LogBatch {
records: batch.clone(), // Avoid clone here?
}))
.await;
let _ = FFI_SERVER.send_event(proto::ffi_event::Message::Logs(proto::LogBatch {
records: batch.clone(), // Avoid clone here?
}));
batch.clear();
}

Expand Down
24 changes: 5 additions & 19 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::{

use dashmap::{mapref::one::MappedRef, DashMap};
use downcast_rs::{impl_downcast, Downcast};
use futures_util::Future;
use livekit::webrtc::{native::audio_resampler::AudioResampler, prelude::*};
use parking_lot::{deadlock, Mutex};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -146,24 +145,14 @@ impl FfiServer {
*self.config.lock() = None; // Invalidate the config
}

pub async fn send_event(&self, message: proto::ffi_event::Message) -> FfiResult<()> {
pub fn send_event(&self, message: proto::ffi_event::Message) -> FfiResult<()> {
let cb = self
.config
.lock()
.as_ref()
.map_or_else(|| Err(FfiError::NotConfigured), |c| Ok(c.callback_fn.clone()))?;

let cb_task = self.async_runtime.spawn_blocking(move || {
cb(proto::FfiEvent { message: Some(message) });
});

tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(2)) => {
log::error!("sending an event to the foreign language took too much time, is your callback function blocking?");
}
_ = cb_task => {}
}

cb(proto::FfiEvent { message: Some(message) });
Ok(())
}

Expand Down Expand Up @@ -207,12 +196,9 @@ impl FfiServer {
}

pub fn send_panic(&self, err: Box<dyn Error>) {
// Ok to block here, we're panicking anyway
// Mb send_event can now be sync since we're more confident about
// the callback function not blocking on Python
let _ = self.async_runtime.block_on(self.send_event(proto::ffi_event::Message::Panic(
proto::Panic { message: err.as_ref().to_string() },
)));
let _ = self.send_event(proto::ffi_event::Message::Panic(proto::Panic {
message: err.as_ref().to_string(),
}));
}

pub fn watch_panic<O>(&'static self, handle: JoinHandle<O>) -> JoinHandle<O>
Expand Down
73 changes: 34 additions & 39 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ fn on_disconnect(

ffi_room.close().await;

let _ = server
.send_event(proto::ffi_event::Message::Disconnect(proto::DisconnectCallback {
let _ =
server.send_event(proto::ffi_event::Message::Disconnect(proto::DisconnectCallback {
async_id,
}))
.await;
}));
});
server.watch_panic(handle);
Ok(proto::DisconnectResponse { async_id })
Expand Down Expand Up @@ -209,22 +208,22 @@ fn on_get_stats(
let handle = server.async_runtime.spawn(async move {
match ffi_track.track.get_stats().await {
Ok(stats) => {
let _ = server
.send_event(proto::ffi_event::Message::GetStats(proto::GetStatsCallback {
let _ = server.send_event(proto::ffi_event::Message::GetStats(
proto::GetStatsCallback {
async_id,
error: None,
stats: stats.into_iter().map(Into::into).collect(),
}))
.await;
},
));
}
Err(err) => {
let _ = server
.send_event(proto::ffi_event::Message::GetStats(proto::GetStatsCallback {
let _ = server.send_event(proto::ffi_event::Message::GetStats(
proto::GetStatsCallback {
async_id,
error: Some(err.to_string()),
stats: Vec::default(),
}))
.await;
},
));
}
}
});
Expand Down Expand Up @@ -506,35 +505,31 @@ fn on_get_session_stats(
let handle = server.async_runtime.spawn(async move {
match ffi_room.inner.room.get_stats().await {
Ok(stats) => {
let _ = server
.send_event(proto::ffi_event::Message::GetSessionStats(
proto::GetSessionStatsCallback {
async_id,
error: None,
publisher_stats: stats
.publisher_stats
.into_iter()
.map(Into::into)
.collect(),
subscriber_stats: stats
.subscriber_stats
.into_iter()
.map(Into::into)
.collect(),
},
))
.await;
let _ = server.send_event(proto::ffi_event::Message::GetSessionStats(
proto::GetSessionStatsCallback {
async_id,
error: None,
publisher_stats: stats
.publisher_stats
.into_iter()
.map(Into::into)
.collect(),
subscriber_stats: stats
.subscriber_stats
.into_iter()
.map(Into::into)
.collect(),
},
));
}
Err(err) => {
let _ = server
.send_event(proto::ffi_event::Message::GetSessionStats(
proto::GetSessionStatsCallback {
async_id,
error: Some(err.to_string()),
..Default::default()
},
))
.await;
let _ = server.send_event(proto::ffi_event::Message::GetSessionStats(
proto::GetSessionStatsCallback {
async_id,
error: Some(err.to_string()),
..Default::default()
},
));
}
}
});
Expand Down
Loading

0 comments on commit cb14200

Please sign in to comment.