From bdac4291cb3666d0c52ea1c5d2ed7082f7e9b21f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9o=20Monnom?= Date: Wed, 14 Feb 2024 13:17:01 +0100 Subject: [PATCH] watch panic (#305) --- livekit-ffi/protocol/ffi.proto | 7 +++ livekit-ffi/src/cabi.rs | 62 +++++++++++++++----------- livekit-ffi/src/livekit.proto.rs | 10 ++++- livekit-ffi/src/server/audio_source.rs | 3 +- livekit-ffi/src/server/audio_stream.rs | 3 +- livekit-ffi/src/server/mod.rs | 31 +++++++++++++ livekit-ffi/src/server/requests.rs | 15 +++---- livekit-ffi/src/server/room.rs | 39 ++++++++++------ livekit-ffi/src/server/video_stream.rs | 3 +- 9 files changed, 122 insertions(+), 51 deletions(-) diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index bec1f593..b3d4f6f6 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -146,6 +146,7 @@ message FfiEvent { GetStatsCallback get_stats = 14; LogBatch logs = 15; GetSessionStatsCallback get_session_stats = 16; + Panic panic = 17; } } @@ -184,5 +185,11 @@ message LogBatch { repeated LogRecord records = 1; } +message Panic { + string message = 1; +} + // TODO(theomonnom): Debug messages (Print handles). + + diff --git a/livekit-ffi/src/cabi.rs b/livekit-ffi/src/cabi.rs index a3d15924..517ce992 100644 --- a/livekit-ffi/src/cabi.rs +++ b/livekit-ffi/src/cabi.rs @@ -1,12 +1,11 @@ -use std::sync::Arc; - use prost::Message; use server::FfiDataBuffer; +use std::{panic, sync::Arc}; use crate::{ proto, server::{self, FfiConfig}, - FfiHandleId, FFI_SERVER, INVALID_HANDLE, + FfiError, FfiHandleId, FFI_SERVER, INVALID_HANDLE, }; /// # SAFTEY: The "C" callback must be threadsafe and not block @@ -38,34 +37,47 @@ pub unsafe extern "C" fn livekit_ffi_request( res_ptr: *mut *const u8, res_len: *mut usize, ) -> FfiHandleId { - let data = unsafe { std::slice::from_raw_parts(data, len) }; - let req = match proto::FfiRequest::decode(data) { - Ok(req) => req, - Err(err) => { - log::error!("failed to decode request: {:?}", err); - return INVALID_HANDLE; + let res = panic::catch_unwind(|| { + let data = unsafe { std::slice::from_raw_parts(data, len) }; + let req = match proto::FfiRequest::decode(data) { + Ok(req) => req, + Err(err) => { + log::error!("failed to decode request: {:?}", err); + return INVALID_HANDLE; + } + }; + + let res = match server::requests::handle_request(&FFI_SERVER, req.clone()) { + Ok(res) => res, + Err(err) => { + log::error!("failed to handle request {:?}: {:?}", req, err); + return INVALID_HANDLE; + } } - }; + .encode_to_vec(); - let res = match server::requests::handle_request(&FFI_SERVER, req.clone()) { - Ok(res) => res, - Err(err) => { - log::error!("failed to handle request {:?}: {:?}", req, err); - return INVALID_HANDLE; + unsafe { + *res_ptr = res.as_ptr(); + *res_len = res.len(); } - } - .encode_to_vec(); - unsafe { - *res_ptr = res.as_ptr(); - *res_len = res.len(); - } + let handle_id = FFI_SERVER.next_id(); + let ffi_data = FfiDataBuffer { handle: handle_id, data: Arc::new(res) }; - let handle_id = FFI_SERVER.next_id(); - let ffi_data = FfiDataBuffer { handle: handle_id, data: Arc::new(res) }; + FFI_SERVER.store_handle(handle_id, ffi_data); + handle_id + }); - FFI_SERVER.store_handle(handle_id, ffi_data); - handle_id + match res { + Ok(handle_id) => handle_id, + Err(err) => { + log::error!("panic while handling request: {:?}", err); + FFI_SERVER.send_panic(Box::new(FfiError::InvalidRequest( + "panic while handling request".into(), + ))); + INVALID_HANDLE + } + } } #[no_mangle] diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 289061e5..b217c92b 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -3023,7 +3023,7 @@ pub mod ffi_response { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiEvent { - #[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16")] + #[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiEvent`. @@ -3063,6 +3063,8 @@ pub mod ffi_event { Logs(super::LogBatch), #[prost(message, tag="16")] GetSessionStats(super::GetSessionStatsCallback), + #[prost(message, tag="17")] + Panic(super::Panic), } } /// Stop all rooms synchronously (Do we need async here?). @@ -3110,6 +3112,12 @@ pub struct LogBatch { #[prost(message, repeated, tag="1")] pub records: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Panic { + #[prost(string, tag="1")] + pub message: ::prost::alloc::string::String, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum LogLevel { diff --git a/livekit-ffi/src/server/audio_source.rs b/livekit-ffi/src/server/audio_source.rs index 8a9174c6..932ea1c6 100644 --- a/livekit-ffi/src/server/audio_source.rs +++ b/livekit-ffi/src/server/audio_source.rs @@ -79,7 +79,7 @@ impl FfiAudioSource { } .to_vec(); - server.async_runtime.spawn(async move { + let handle = server.async_runtime.spawn(async move { // The data must be available as long as the client receive the callback. match source { #[cfg(not(target_arch = "wasm32"))] @@ -104,6 +104,7 @@ impl FfiAudioSource { _ => {} } }); + server.watch_panic(handle); Ok(proto::CaptureAudioFrameResponse { async_id }) } diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 5fcd4488..b76e5c5a 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -58,12 +58,13 @@ impl FfiAudioStream { let audio_stream = Self { handle_id, stream_type, close_tx }; let native_stream = NativeAudioStream::new(rtc_track); - server.async_runtime.spawn(Self::native_audio_stream_task( + let handle = server.async_runtime.spawn(Self::native_audio_stream_task( server, handle_id, native_stream, close_rx, )); + server.watch_panic(handle); Ok::(audio_stream) } _ => return Err(FfiError::InvalidRequest("unsupported audio stream type".into())), diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index d5e14b20..306e5240 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::{ + error::Error, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -23,8 +24,10 @@ use std::{ use dashmap::{mapref::one::MappedRef, DashMap}; use downcast_rs::{impl_downcast, Downcast}; +use futures_util::Future; use livekit::webrtc::{native::audio_resampler::AudioResampler, prelude::*}; use parking_lot::{deadlock, Mutex}; +use tokio::task::JoinHandle; use crate::{proto, proto::FfiEvent, FfiError, FfiHandleId, FfiResult, INVALID_HANDLE}; @@ -202,4 +205,32 @@ impl FfiServer { pub fn drop_handle(&self, id: FfiHandleId) -> bool { self.ffi_handles.remove(&id).is_some() } + + pub fn send_panic(&self, err: Box) { + // Ok to block here, we're panicking anyway + // Mb send_event can now be sync since we're more confident about + // the callback function not blocking on Python + let _ = self.async_runtime.block_on(self.send_event(proto::ffi_event::Message::Panic( + proto::Panic { message: err.as_ref().to_string() }, + ))); + } + + pub fn watch_panic(&'static self, handle: JoinHandle) -> JoinHandle + where + O: Send + 'static, + { + let handle = self.async_runtime.spawn(async move { + match handle.await { + Ok(r) => r, + Err(e) => { + // Forward the panic to the client + // Recommended behaviour is to exit the process + log::error!("task panicked: {:?}", e); + self.send_panic(Box::new(e)); + panic!("watch_panic: task panicked"); + } + } + }); + handle + } } diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index e0f549bd..cf63a1ba 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -61,7 +61,7 @@ fn on_disconnect( disconnect: proto::DisconnectRequest, ) -> FfiResult { let async_id = server.next_id(); - server.async_runtime.spawn(async move { + let handle = server.async_runtime.spawn(async move { let ffi_room = server.retrieve_handle::(disconnect.room_handle).unwrap().clone(); @@ -73,7 +73,7 @@ fn on_disconnect( })) .await; }); - + server.watch_panic(handle); Ok(proto::DisconnectResponse { async_id }) } @@ -205,10 +205,8 @@ fn on_get_stats( get_stats: proto::GetStatsRequest, ) -> FfiResult { let ffi_track = server.retrieve_handle::(get_stats.track_handle)?.clone(); - let async_id = server.next_id(); - - server.async_runtime.spawn(async move { + let handle = server.async_runtime.spawn(async move { match ffi_track.track.get_stats().await { Ok(stats) => { let _ = server @@ -230,7 +228,7 @@ fn on_get_stats( } } }); - + server.watch_panic(handle); Ok(proto::GetStatsResponse { async_id }) } @@ -339,6 +337,7 @@ fn new_audio_resampler( } /// Remix and resample an audio frame +/// TODO: Deprecate this function fn remix_and_resample( server: &'static FfiServer, remix: proto::RemixAndResampleRequest, @@ -504,7 +503,7 @@ fn on_get_session_stats( let ffi_room = server.retrieve_handle::(get_session_stats.room_handle)?.clone(); let async_id = server.next_id(); - server.async_runtime.spawn(async move { + let handle = server.async_runtime.spawn(async move { match ffi_room.inner.room.get_stats().await { Ok(stats) => { let _ = server @@ -539,7 +538,7 @@ fn on_get_session_stats( } } }); - + server.watch_panic(handle); Ok(proto::GetSessionStatsResponse { async_id }) } diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 0705d238..8317406f 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -148,12 +148,22 @@ impl FfiRoom { .await; // Forward events - let event_handle = { + let event_handle = server.watch_panic({ let close_rx = close_rx.resubscribe(); - tokio::spawn(room_task(server, inner.clone(), events, close_rx)) - }; - let data_handle = - tokio::spawn(data_task(server, inner.clone(), data_rx, close_rx)); // Publish data + server.async_runtime.spawn(room_task( + server, + inner.clone(), + events, + close_rx, + )) + }); + + let data_handle = server.watch_panic(server.async_runtime.spawn(data_task( + server, + inner.clone(), + data_rx, + close_rx, + ))); // Publish data *handle = Some(Handle { event_handle, data_handle, close_tx }); } @@ -172,7 +182,7 @@ impl FfiRoom { }; }; - server.async_runtime.spawn(connect); + server.watch_panic(server.async_runtime.spawn(connect)); proto::ConnectResponse { async_id } } @@ -216,7 +226,7 @@ impl RoomInner { }, async_id, }) { - server.async_runtime.spawn(async move { + let handle = server.async_runtime.spawn(async move { let cb = proto::PublishDataCallback { async_id, error: Some(format!("failed to send data, room closed: {}", err)), @@ -224,6 +234,7 @@ impl RoomInner { let _ = server.send_event(proto::ffi_event::Message::PublishData(cb)).await; }); + server.watch_panic(handle); } Ok(proto::PublishDataResponse { async_id }) @@ -311,7 +322,7 @@ impl RoomInner { ) -> proto::UnpublishTrackResponse { let async_id = server.next_id(); let inner = self.clone(); - server.async_runtime.spawn(async move { + let handle = server.async_runtime.spawn(async move { let sid = unpublish.track_sid.try_into().unwrap(); let unpublish_res = inner.room.local_participant().unpublish_track(&sid).await; @@ -336,7 +347,7 @@ impl RoomInner { )) .await; }); - + server.watch_panic(handle); proto::UnpublishTrackResponse { async_id } } @@ -347,7 +358,7 @@ impl RoomInner { ) -> proto::UpdateLocalMetadataResponse { let async_id = server.next_id(); let inner = self.clone(); - server.async_runtime.spawn(async move { + let handle = server.async_runtime.spawn(async move { let _ = inner .room .local_participant() @@ -360,7 +371,7 @@ impl RoomInner { )) .await; }); - + server.watch_panic(handle); proto::UpdateLocalMetadataResponse { async_id } } @@ -371,7 +382,7 @@ impl RoomInner { ) -> proto::UpdateLocalNameResponse { let async_id = server.next_id(); let inner = self.clone(); - server.async_runtime.spawn(async move { + let handle = server.async_runtime.spawn(async move { let _ = inner.room.local_participant().update_name(update_local_name.name).await; let _ = server @@ -380,7 +391,7 @@ impl RoomInner { )) .await; }); - + server.watch_panic(handle); proto::UpdateLocalNameResponse { async_id } } } @@ -448,7 +459,7 @@ async fn room_task( } } - task.await.unwrap(); + let _ = server.watch_panic(task).await; }, _ = close_rx.recv() => { break; diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index e013fa04..cc4d3234 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -56,7 +56,7 @@ impl FfiVideoStream { #[cfg(not(target_arch = "wasm32"))] proto::VideoStreamType::VideoStreamNative => { let video_stream = Self { handle_id, close_tx, stream_type }; - server.async_runtime.spawn(Self::native_video_stream_task( + let handle = server.async_runtime.spawn(Self::native_video_stream_task( server, handle_id, new_stream.format.and_then(|_| Some(new_stream.format())), @@ -64,6 +64,7 @@ impl FfiVideoStream { NativeVideoStream::new(rtc_track), close_rx, )); + server.watch_panic(handle); Ok::(video_stream) } _ => return Err(FfiError::InvalidRequest("unsupported video stream type".into())),