Skip to content

Commit

Permalink
watch panic (#305)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Feb 14, 2024
1 parent 4450c6c commit bdac429
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 51 deletions.
7 changes: 7 additions & 0 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ message FfiEvent {
GetStatsCallback get_stats = 14;
LogBatch logs = 15;
GetSessionStatsCallback get_session_stats = 16;
Panic panic = 17;
}
}

Expand Down Expand Up @@ -184,5 +185,11 @@ message LogBatch {
repeated LogRecord records = 1;
}

// Payload of the `panic` case of the FfiEvent `message` oneof (field 17).
message Panic {
// Human-readable description of what went wrong.
string message = 1;
}

// TODO(theomonnom): Debug messages (Print handles).



62 changes: 37 additions & 25 deletions livekit-ffi/src/cabi.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::sync::Arc;

use prost::Message;
use server::FfiDataBuffer;
use std::{panic, sync::Arc};

use crate::{
proto,
server::{self, FfiConfig},
FfiHandleId, FFI_SERVER, INVALID_HANDLE,
FfiError, FfiHandleId, FFI_SERVER, INVALID_HANDLE,
};

/// # SAFETY: The "C" callback must be threadsafe and not block
Expand Down Expand Up @@ -38,34 +37,47 @@ pub unsafe extern "C" fn livekit_ffi_request(
res_ptr: *mut *const u8,
res_len: *mut usize,
) -> FfiHandleId {
let data = unsafe { std::slice::from_raw_parts(data, len) };
let req = match proto::FfiRequest::decode(data) {
Ok(req) => req,
Err(err) => {
log::error!("failed to decode request: {:?}", err);
return INVALID_HANDLE;
let res = panic::catch_unwind(|| {
let data = unsafe { std::slice::from_raw_parts(data, len) };
let req = match proto::FfiRequest::decode(data) {
Ok(req) => req,
Err(err) => {
log::error!("failed to decode request: {:?}", err);
return INVALID_HANDLE;
}
};

let res = match server::requests::handle_request(&FFI_SERVER, req.clone()) {
Ok(res) => res,
Err(err) => {
log::error!("failed to handle request {:?}: {:?}", req, err);
return INVALID_HANDLE;
}
}
};
.encode_to_vec();

let res = match server::requests::handle_request(&FFI_SERVER, req.clone()) {
Ok(res) => res,
Err(err) => {
log::error!("failed to handle request {:?}: {:?}", req, err);
return INVALID_HANDLE;
unsafe {
*res_ptr = res.as_ptr();
*res_len = res.len();
}
}
.encode_to_vec();

unsafe {
*res_ptr = res.as_ptr();
*res_len = res.len();
}
let handle_id = FFI_SERVER.next_id();
let ffi_data = FfiDataBuffer { handle: handle_id, data: Arc::new(res) };

let handle_id = FFI_SERVER.next_id();
let ffi_data = FfiDataBuffer { handle: handle_id, data: Arc::new(res) };
FFI_SERVER.store_handle(handle_id, ffi_data);
handle_id
});

FFI_SERVER.store_handle(handle_id, ffi_data);
handle_id
match res {
Ok(handle_id) => handle_id,
Err(err) => {
log::error!("panic while handling request: {:?}", err);
FFI_SERVER.send_panic(Box::new(FfiError::InvalidRequest(
"panic while handling request".into(),
)));
INVALID_HANDLE
}
}
}

#[no_mangle]
Expand Down
10 changes: 9 additions & 1 deletion livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3023,7 +3023,7 @@ pub mod ffi_response {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiEvent {
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16")]
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17")]
pub message: ::core::option::Option<ffi_event::Message>,
}
/// Nested message and enum types in `FfiEvent`.
Expand Down Expand Up @@ -3063,6 +3063,8 @@ pub mod ffi_event {
Logs(super::LogBatch),
#[prost(message, tag="16")]
GetSessionStats(super::GetSessionStatsCallback),
#[prost(message, tag="17")]
Panic(super::Panic),
}
}
/// Stop all rooms synchronously (Do we need async here?).
Expand Down Expand Up @@ -3110,6 +3112,12 @@ pub struct LogBatch {
#[prost(message, repeated, tag="1")]
pub records: ::prost::alloc::vec::Vec<LogRecord>,
}
/// Payload of the `Panic` variant of `ffi_event::Message` (tag 17).
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Panic {
/// Human-readable description of what went wrong.
#[prost(string, tag="1")]
pub message: ::prost::alloc::string::String,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum LogLevel {
Expand Down
3 changes: 2 additions & 1 deletion livekit-ffi/src/server/audio_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl FfiAudioSource {
}
.to_vec();

server.async_runtime.spawn(async move {
let handle = server.async_runtime.spawn(async move {
// The data must be available as long as the client receive the callback.
match source {
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -104,6 +104,7 @@ impl FfiAudioSource {
_ => {}
}
});
server.watch_panic(handle);

Ok(proto::CaptureAudioFrameResponse { async_id })
}
Expand Down
3 changes: 2 additions & 1 deletion livekit-ffi/src/server/audio_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ impl FfiAudioStream {
let audio_stream = Self { handle_id, stream_type, close_tx };

let native_stream = NativeAudioStream::new(rtc_track);
server.async_runtime.spawn(Self::native_audio_stream_task(
let handle = server.async_runtime.spawn(Self::native_audio_stream_task(
server,
handle_id,
native_stream,
close_rx,
));
server.watch_panic(handle);
Ok::<FfiAudioStream, FfiError>(audio_stream)
}
_ => return Err(FfiError::InvalidRequest("unsupported audio stream type".into())),
Expand Down
31 changes: 31 additions & 0 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::{
error::Error,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand All @@ -23,8 +24,10 @@ use std::{

use dashmap::{mapref::one::MappedRef, DashMap};
use downcast_rs::{impl_downcast, Downcast};
use futures_util::Future;
use livekit::webrtc::{native::audio_resampler::AudioResampler, prelude::*};
use parking_lot::{deadlock, Mutex};
use tokio::task::JoinHandle;

use crate::{proto, proto::FfiEvent, FfiError, FfiHandleId, FfiResult, INVALID_HANDLE};

Expand Down Expand Up @@ -202,4 +205,32 @@ impl FfiServer {
pub fn drop_handle(&self, id: FfiHandleId) -> bool {
self.ffi_handles.remove(&id).is_some()
}

/// Sends a `Panic` event to the client, using the `Display` text of `err`
/// as the event message.
///
/// The call blocks the current thread on `async_runtime` until `send_event`
/// completes; the result of `send_event` is deliberately discarded.
///
/// NOTE(review): `async_runtime` looks like a tokio runtime (its `spawn`
/// yields a `tokio::task::JoinHandle`). Tokio's `block_on` panics when it is
/// called from inside an async task, so this presumably must only be called
/// from threads that are not driving the runtime -- confirm against callers.
pub fn send_panic(&self, err: Box<dyn Error>) {
// Ok to block here, we're panicking anyway
// Mb send_event can now be sync since we're more confident about
// the callback function not blocking on Python
let _ = self.async_runtime.block_on(self.send_event(proto::ffi_event::Message::Panic(
proto::Panic { message: err.as_ref().to_string() },
)));
}

pub fn watch_panic<O>(&'static self, handle: JoinHandle<O>) -> JoinHandle<O>
where
O: Send + 'static,
{
let handle = self.async_runtime.spawn(async move {
match handle.await {
Ok(r) => r,
Err(e) => {
// Forward the panic to the client
// Recommended behaviour is to exit the process
log::error!("task panicked: {:?}", e);
self.send_panic(Box::new(e));
panic!("watch_panic: task panicked");
}
}
});
handle
}
}
15 changes: 7 additions & 8 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn on_disconnect(
disconnect: proto::DisconnectRequest,
) -> FfiResult<proto::DisconnectResponse> {
let async_id = server.next_id();
server.async_runtime.spawn(async move {
let handle = server.async_runtime.spawn(async move {
let ffi_room =
server.retrieve_handle::<room::FfiRoom>(disconnect.room_handle).unwrap().clone();

Expand All @@ -73,7 +73,7 @@ fn on_disconnect(
}))
.await;
});

server.watch_panic(handle);
Ok(proto::DisconnectResponse { async_id })
}

Expand Down Expand Up @@ -205,10 +205,8 @@ fn on_get_stats(
get_stats: proto::GetStatsRequest,
) -> FfiResult<proto::GetStatsResponse> {
let ffi_track = server.retrieve_handle::<FfiTrack>(get_stats.track_handle)?.clone();

let async_id = server.next_id();

server.async_runtime.spawn(async move {
let handle = server.async_runtime.spawn(async move {
match ffi_track.track.get_stats().await {
Ok(stats) => {
let _ = server
Expand All @@ -230,7 +228,7 @@ fn on_get_stats(
}
}
});

server.watch_panic(handle);
Ok(proto::GetStatsResponse { async_id })
}

Expand Down Expand Up @@ -339,6 +337,7 @@ fn new_audio_resampler(
}

/// Remix and resample an audio frame
/// TODO: Deprecate this function
fn remix_and_resample(
server: &'static FfiServer,
remix: proto::RemixAndResampleRequest,
Expand Down Expand Up @@ -504,7 +503,7 @@ fn on_get_session_stats(
let ffi_room = server.retrieve_handle::<room::FfiRoom>(get_session_stats.room_handle)?.clone();
let async_id = server.next_id();

server.async_runtime.spawn(async move {
let handle = server.async_runtime.spawn(async move {
match ffi_room.inner.room.get_stats().await {
Ok(stats) => {
let _ = server
Expand Down Expand Up @@ -539,7 +538,7 @@ fn on_get_session_stats(
}
}
});

server.watch_panic(handle);
Ok(proto::GetSessionStatsResponse { async_id })
}

Expand Down
Loading

0 comments on commit bdac429

Please sign in to comment.