diff --git a/livekit-ffi/src/server/audio_source.rs b/livekit-ffi/src/server/audio_source.rs index 932ea1c6..845e15bc 100644 --- a/livekit-ffi/src/server/audio_source.rs +++ b/livekit-ffi/src/server/audio_source.rs @@ -92,14 +92,12 @@ impl FfiAudioSource { }; let res = source.capture_frame(&audio_frame).await; - let _ = server - .send_event(proto::ffi_event::Message::CaptureAudioFrame( - proto::CaptureAudioFrameCallback { - async_id, - error: res.err().map(|e| e.to_string()), - }, - )) - .await; + let _ = server.send_event(proto::ffi_event::Message::CaptureAudioFrame( + proto::CaptureAudioFrameCallback { + async_id, + error: res.err().map(|e| e.to_string()), + }, + )); } _ => {} } diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index b76e5c5a..b5edcad0 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -112,20 +112,20 @@ impl FfiAudioStream { }, )), }, - )).await { + )) { + server.drop_handle(handle_id); log::warn!("failed to send audio frame: {}", err); } } } } - if let Err(err) = server - .send_event(proto::ffi_event::Message::AudioStreamEvent(proto::AudioStreamEvent { + if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent( + proto::AudioStreamEvent { stream_handle: stream_handle_id, message: Some(proto::audio_stream_event::Message::Eos(proto::AudioStreamEos {})), - })) - .await - { + }, + )) { log::warn!("failed to send audio EOS: {}", err); } } diff --git a/livekit-ffi/src/server/logger.rs b/livekit-ffi/src/server/logger.rs index 6262bd0f..bed451ec 100644 --- a/livekit-ffi/src/server/logger.rs +++ b/livekit-ffi/src/server/logger.rs @@ -88,11 +88,9 @@ async fn log_forward_task(mut rx: mpsc::UnboundedReceiver) { // It is safe to use FFI_SERVER here, if we receive logs when capture_logs is enabled, // it means the server has already been initialized - let _ = FFI_SERVER - .send_event(proto::ffi_event::Message::Logs(proto::LogBatch { - records: batch.clone(), // Avoid clone here? - })) - .await; + let _ = FFI_SERVER.send_event(proto::ffi_event::Message::Logs(proto::LogBatch { + records: batch.clone(), // Avoid clone here? + })); batch.clear(); } diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index 306e5240..2d3cf608 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -24,7 +24,6 @@ use std::{ use dashmap::{mapref::one::MappedRef, DashMap}; use downcast_rs::{impl_downcast, Downcast}; -use futures_util::Future; use livekit::webrtc::{native::audio_resampler::AudioResampler, prelude::*}; use parking_lot::{deadlock, Mutex}; use tokio::task::JoinHandle; @@ -146,24 +145,14 @@ impl FfiServer { *self.config.lock() = None; // Invalidate the config } - pub async fn send_event(&self, message: proto::ffi_event::Message) -> FfiResult<()> { + pub fn send_event(&self, message: proto::ffi_event::Message) -> FfiResult<()> { let cb = self .config .lock() .as_ref() .map_or_else(|| Err(FfiError::NotConfigured), |c| Ok(c.callback_fn.clone()))?; - let cb_task = self.async_runtime.spawn_blocking(move || { - cb(proto::FfiEvent { message: Some(message) }); - }); - - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(2)) => { - log::error!("sending an event to the foreign language took too much time, is your callback function blocking?"); - } - _ = cb_task => {} - } - + cb(proto::FfiEvent { message: Some(message) }); Ok(()) } @@ -207,12 +196,9 @@ impl FfiServer { } pub fn send_panic(&self, err: Box) { - // Ok to block here, we're panicking anyway - // Mb send_event can now be sync since we're more confident about - // the callback function not blocking on Python - let _ = self.async_runtime.block_on(self.send_event(proto::ffi_event::Message::Panic( - proto::Panic { message: err.as_ref().to_string() }, - ))); + let _ = self.send_event(proto::ffi_event::Message::Panic(proto::Panic { + message: err.as_ref().to_string(), + })); } pub fn watch_panic(&'static self, handle: JoinHandle) -> JoinHandle diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index cf63a1ba..0cd8f25c 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -67,11 +67,10 @@ fn on_disconnect( ffi_room.close().await; - let _ = server - .send_event(proto::ffi_event::Message::Disconnect(proto::DisconnectCallback { + let _ = + server.send_event(proto::ffi_event::Message::Disconnect(proto::DisconnectCallback { async_id, - })) - .await; + })); }); server.watch_panic(handle); Ok(proto::DisconnectResponse { async_id }) @@ -209,22 +208,22 @@ fn on_get_stats( let handle = server.async_runtime.spawn(async move { match ffi_track.track.get_stats().await { Ok(stats) => { - let _ = server - .send_event(proto::ffi_event::Message::GetStats(proto::GetStatsCallback { + let _ = server.send_event(proto::ffi_event::Message::GetStats( + proto::GetStatsCallback { async_id, error: None, stats: stats.into_iter().map(Into::into).collect(), - })) - .await; + }, + )); } Err(err) => { - let _ = server - .send_event(proto::ffi_event::Message::GetStats(proto::GetStatsCallback { + let _ = server.send_event(proto::ffi_event::Message::GetStats( + proto::GetStatsCallback { async_id, error: Some(err.to_string()), stats: Vec::default(), - })) - .await; + }, + )); } } }); @@ -506,35 +505,31 @@ fn on_get_session_stats( let handle = server.async_runtime.spawn(async move { match ffi_room.inner.room.get_stats().await { Ok(stats) => { - let _ = server - .send_event(proto::ffi_event::Message::GetSessionStats( - proto::GetSessionStatsCallback { - async_id, - error: None, - publisher_stats: stats - .publisher_stats - .into_iter() - .map(Into::into) - .collect(), - subscriber_stats: stats - .subscriber_stats - .into_iter() - .map(Into::into) - .collect(), - }, - )) - .await; + let _ = server.send_event(proto::ffi_event::Message::GetSessionStats( + proto::GetSessionStatsCallback { + async_id, + error: None, + publisher_stats: stats + .publisher_stats + .into_iter() + .map(Into::into) + .collect(), + subscriber_stats: stats + .subscriber_stats + .into_iter() + .map(Into::into) + .collect(), + }, + )); } Err(err) => { - let _ = server - .send_event(proto::ffi_event::Message::GetSessionStats( - proto::GetSessionStatsCallback { - async_id, - error: Some(err.to_string()), - ..Default::default() - }, - )) - .await; + let _ = server.send_event(proto::ffi_event::Message::GetSessionStats( + proto::GetSessionStatsCallback { + async_id, + error: Some(err.to_string()), + ..Default::default() + }, + )); } } }); diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 8317406f..e3dcc7fc 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -132,10 +132,12 @@ impl FfiRoom { let mut handle = ffi_room.handle.lock().await; let room_info = proto::RoomInfo::from(&ffi_room); + panic!("room_info: {:?}", room_info); + // Send the async response to the FfiClient *before* starting the tasks. // Ensure no events are sent before the callback - let _ = server - .send_event(proto::ffi_event::Message::Connect(proto::ConnectCallback { + let _ = server.send_event(proto::ffi_event::Message::Connect( + proto::ConnectCallback { async_id, error: None, room: Some(proto::OwnedRoom { @@ -144,8 +146,8 @@ impl FfiRoom { }), local_participant: Some(local_info), participants: remote_infos, - })) - .await; + }, + )); // Forward events let event_handle = server.watch_panic({ @@ -171,13 +173,13 @@ impl FfiRoom { // Failed to connect to the room, send an error message to the FfiClient // TODO(theomonnom): Typed errors? log::error!("error while connecting to a room: {}", e); - let _ = server - .send_event(proto::ffi_event::Message::Connect(proto::ConnectCallback { + let _ = server.send_event(proto::ffi_event::Message::Connect( + proto::ConnectCallback { async_id, error: Some(e.to_string()), ..Default::default() - })) - .await; + }, + )); } }; }; @@ -232,7 +234,7 @@ impl RoomInner { error: Some(format!("failed to send data, room closed: {}", err)), }; - let _ = server.send_event(proto::ffi_event::Message::PublishData(cb)).await; + let _ = server.send_event(proto::ffi_event::Message::PublishData(cb)); }); server.watch_panic(handle); } @@ -278,32 +280,28 @@ impl RoomInner { let publication_info = proto::TrackPublicationInfo::from(&ffi_publication); server.store_handle(ffi_publication.handle, ffi_publication); - let _ = server - .send_event(proto::ffi_event::Message::PublishTrack( - proto::PublishTrackCallback { - async_id, - publication: Some(proto::OwnedTrackPublication { - handle: Some(proto::FfiOwnedHandle { id: handle_id }), - info: Some(publication_info), - }), - ..Default::default() - }, - )) - .await; + let _ = server.send_event(proto::ffi_event::Message::PublishTrack( + proto::PublishTrackCallback { + async_id, + publication: Some(proto::OwnedTrackPublication { + handle: Some(proto::FfiOwnedHandle { id: handle_id }), + info: Some(publication_info), + }), + ..Default::default() + }, + )); inner.pending_published_tracks.lock().insert(publication.sid()); } Err(err) => { // Failed to publish the track - let _ = server - .send_event(proto::ffi_event::Message::PublishTrack( - proto::PublishTrackCallback { - async_id, - error: Some(err.to_string()), - ..Default::default() - }, - )) - .await; + let _ = server.send_event(proto::ffi_event::Message::PublishTrack( + proto::PublishTrackCallback { + async_id, + error: Some(err.to_string()), + ..Default::default() + }, + )); } } }); @@ -338,14 +336,12 @@ impl RoomInner { } } - let _ = server - .send_event(proto::ffi_event::Message::UnpublishTrack( - proto::UnpublishTrackCallback { - async_id, - error: unpublish_res.err().map(|e| e.to_string()), - }, - )) - .await; + let _ = server.send_event(proto::ffi_event::Message::UnpublishTrack( + proto::UnpublishTrackCallback { + async_id, + error: unpublish_res.err().map(|e| e.to_string()), + }, + )); }); server.watch_panic(handle); proto::UnpublishTrackResponse { async_id } @@ -365,11 +361,9 @@ impl RoomInner { .update_metadata(update_local_metadata.metadata) .await; - let _ = server - .send_event(proto::ffi_event::Message::UpdateLocalMetadata( - proto::UpdateLocalMetadataCallback { async_id }, - )) - .await; + let _ = server.send_event(proto::ffi_event::Message::UpdateLocalMetadata( + proto::UpdateLocalMetadataCallback { async_id }, + )); }); server.watch_panic(handle); proto::UpdateLocalMetadataResponse { async_id } @@ -385,11 +379,9 @@ impl RoomInner { let handle = server.async_runtime.spawn(async move { let _ = inner.room.local_participant().update_name(update_local_name.name).await; - let _ = server - .send_event(proto::ffi_event::Message::UpdateLocalName( - proto::UpdateLocalNameCallback { async_id }, - )) - .await; + let _ = server.send_event(proto::ffi_event::Message::UpdateLocalName( + proto::UpdateLocalNameCallback { async_id }, + )); }); server.watch_panic(handle); proto::UpdateLocalNameResponse { async_id } @@ -413,7 +405,7 @@ async fn data_task( error: res.err().map(|e| e.to_string()), }; - let _ = server.send_event(proto::ffi_event::Message::PublishData(cb)).await; + let _ = server.send_event(proto::ffi_event::Message::PublishData(cb)); }, _ = close_rx.recv() => { break; @@ -467,12 +459,10 @@ async fn room_task( }; } - let _ = server - .send_event(proto::ffi_event::Message::RoomEvent(proto::RoomEvent { - room_handle: inner.handle_id, - message: Some(proto::room_event::Message::Eos(proto::RoomEos {})), - })) - .await; + let _ = server.send_event(proto::ffi_event::Message::RoomEvent(proto::RoomEvent { + room_handle: inner.handle_id, + message: Some(proto::room_event::Message::Eos(proto::RoomEos {})), + })); } async fn forward_event( @@ -504,14 +494,12 @@ async fn forward_event( info: Some(proto::ParticipantInfo::from(&ffi_participant)), }), }, - )) - .await; + )); } RoomEvent::ParticipantDisconnected(participant) => { let _ = send_event(proto::room_event::Message::ParticipantDisconnected( proto::ParticipantDisconnected { participant_sid: participant.sid().into() }, - )) - .await; + )); } RoomEvent::LocalTrackPublished { publication, track: _, participant: _ } => { let sid = publication.sid(); @@ -539,14 +527,12 @@ async fn forward_event( let _ = send_event(proto::room_event::Message::LocalTrackPublished( proto::LocalTrackPublished { track_sid: sid.to_string() }, - )) - .await; + )); } RoomEvent::LocalTrackUnpublished { publication, participant: _ } => { let _ = send_event(proto::room_event::Message::LocalTrackUnpublished( proto::LocalTrackUnpublished { publication_sid: publication.sid().into() }, - )) - .await; + )); inner.pending_unpublished_tracks.lock().insert(publication.sid()); } @@ -566,16 +552,14 @@ async fn forward_event( handle: Some(proto::FfiOwnedHandle { id: handle_id }), info: Some(publication_info), }), - })) - .await; + })); } RoomEvent::TrackUnpublished { publication, participant } => { let _ = send_event(proto::room_event::Message::TrackUnpublished(proto::TrackUnpublished { participant_sid: participant.sid().to_string(), publication_sid: publication.sid().into(), - })) - .await; + })); } RoomEvent::TrackSubscribed { track, publication: _, participant } => { let handle_id = server.next_id(); @@ -591,8 +575,7 @@ async fn forward_event( handle: Some(proto::FfiOwnedHandle { id: handle_id }), info: Some(track_info), }), - })) - .await; + })); } RoomEvent::TrackUnsubscribed { track, publication: _, participant } => { let _ = send_event(proto::room_event::Message::TrackUnsubscribed( @@ -600,8 +583,7 @@ async fn forward_event( participant_sid: participant.sid().to_string(), track_sid: track.sid().to_string(), }, - )) - .await; + )); } RoomEvent::TrackSubscriptionFailed { participant, error, track_sid } => { let _ = send_event(proto::room_event::Message::TrackSubscriptionFailed( @@ -610,28 +592,24 @@ async fn forward_event( error: error.to_string(), track_sid: track_sid.into(), }, - )) - .await; + )); } RoomEvent::TrackMuted { participant, publication } => { let _ = send_event(proto::room_event::Message::TrackMuted(proto::TrackMuted { participant_sid: participant.sid().to_string(), track_sid: publication.sid().into(), - })) - .await; + })); } RoomEvent::TrackUnmuted { participant, publication } => { let _ = send_event(proto::room_event::Message::TrackUnmuted(proto::TrackUnmuted { participant_sid: participant.sid().to_string(), track_sid: publication.sid().into(), - })) - .await; + })); } RoomEvent::RoomMetadataChanged { old_metadata: _, metadata } => { let _ = send_event(proto::room_event::Message::RoomMetadataChanged( proto::RoomMetadataChanged { metadata }, - )) - .await; + )); } RoomEvent::ParticipantMetadataChanged { participant, old_metadata: _, metadata } => { let _ = send_event(proto::room_event::Message::ParticipantMetadataChanged( @@ -639,8 +617,7 @@ async fn forward_event( participant_sid: participant.sid().to_string(), metadata, }, - )) - .await; + )); } RoomEvent::ParticipantNameChanged { participant, old_name: _, name } => { let _ = send_event(proto::room_event::Message::ParticipantNameChanged( @@ -648,16 +625,14 @@ async fn forward_event( participant_sid: participant.sid().to_string(), name, }, - )) - .await; + )); } RoomEvent::ActiveSpeakersChanged { speakers } => { let participant_sids = speakers.iter().map(|p| p.sid().to_string()).collect::>(); let _ = send_event(proto::room_event::Message::ActiveSpeakersChanged( proto::ActiveSpeakersChanged { participant_sids }, - )) - .await; + )); } RoomEvent::ConnectionQualityChanged { quality, participant } => { let _ = send_event(proto::room_event::Message::ConnectionQualityChanged( @@ -665,8 +640,7 @@ async fn forward_event( participant_sid: participant.sid().to_string(), quality: proto::ConnectionQuality::from(quality).into(), }, - )) - .await; + )); } RoomEvent::DataReceived { payload, kind, participant, topic } => { let handle_id = server.next_id(); @@ -684,39 +658,33 @@ async fn forward_event( participant_sid: participant.map(|p| p.sid().to_string()), kind: proto::DataPacketKind::from(kind).into(), topic, - })) - .await; + })); } RoomEvent::ConnectionStateChanged(state) => { let _ = send_event(proto::room_event::Message::ConnectionStateChanged( proto::ConnectionStateChanged { state: proto::ConnectionState::from(state).into() }, - )) - .await; + )); } RoomEvent::Connected { .. } => { // Ignore here, we're already sent the event on connect (see above) } RoomEvent::Disconnected { reason: _ } => { - let _ = - send_event(proto::room_event::Message::Disconnected(proto::Disconnected {})).await; + let _ = send_event(proto::room_event::Message::Disconnected(proto::Disconnected {})); } RoomEvent::Reconnecting => { present_state.lock().reconnecting = true; - let _ = - send_event(proto::room_event::Message::Reconnecting(proto::Reconnecting {})).await; + let _ = send_event(proto::room_event::Message::Reconnecting(proto::Reconnecting {})); } RoomEvent::Reconnected => { present_state.lock().reconnecting = false; - let _ = - send_event(proto::room_event::Message::Reconnected(proto::Reconnected {})).await; + let _ = send_event(proto::room_event::Message::Reconnected(proto::Reconnected {})); } RoomEvent::E2eeStateChanged { participant, state } => { let _ = send_event(proto::room_event::Message::E2eeStateChanged(proto::E2eeStateChanged { participant_sid: participant.sid().to_string(), state: proto::EncryptionState::from(state).into(), - })) - .await; + })); } _ => { log::warn!("unhandled room event: {:?}", event); diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index cc4d3234..83fc79b5 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -124,20 +124,20 @@ impl FfiVideoStream { } )), } - )).await{ + )) { + server.drop_handle(handle_id); log::warn!("failed to send video frame: {}", err); } } } } - if let Err(err) = server - .send_event(proto::ffi_event::Message::VideoStreamEvent(proto::VideoStreamEvent { + if let Err(err) = server.send_event(proto::ffi_event::Message::VideoStreamEvent( + proto::VideoStreamEvent { stream_handle, message: Some(proto::video_stream_event::Message::Eos(proto::VideoStreamEos {})), - })) - .await - { + }, + )) { log::warn!("failed to send video EOS: {}", err); } }