diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index 5aab014d6..3f97ff5d6 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -150,12 +150,16 @@ message FfiEvent { UpdateLocalMetadataCallback update_local_metadata = 12; UpdateLocalNameCallback update_local_name = 13; GetStatsCallback get_stats = 14; + LogBatch logs = 15; } } // Setup the callback where the foreign language can receive events // and responses to asynchronous requests -message InitializeRequest { uint64 event_callback_ptr = 1; } +message InitializeRequest { + uint64 event_callback_ptr = 1; + bool capture_logs = 2; // When true, the FfiServer will forward logs using LogRecord +} message InitializeResponse {} // Stop all rooms synchronously (Do we need async here?). @@ -172,5 +176,26 @@ message DisposeCallback { uint64 async_id = 1; } -// TODO(theomonnom): Debug messages (Print handles, forward logs). +enum LogLevel { + LOG_ERROR = 0; + LOG_WARN = 1; + LOG_INFO = 2; + LOG_DEBUG = 3; + LOG_TRACE = 4; +} + +message LogRecord { + LogLevel level = 1; + string target = 2; // e.g "livekit", "libwebrtc", "tokio-tungstenite", etc... + optional string module_path = 3; + optional string file = 4; + optional uint32 line = 5; + string message = 6; +} + +message LogBatch { + repeated LogRecord records = 1; +} + +// TODO(theomonnom): Debug messages (Print handles). diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index a352fb9de..4d318a76e 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -3157,7 +3157,7 @@ pub mod ffi_response { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiEvent { - #[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14")] + #[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiEvent`. @@ -3193,6 +3193,8 @@ pub mod ffi_event { UpdateLocalName(super::UpdateLocalNameCallback), #[prost(message, tag="14")] GetStats(super::GetStatsCallback), + #[prost(message, tag="15")] + Logs(super::LogBatch), } } /// Setup the callback where the foreign language can receive events @@ -3202,6 +3204,9 @@ pub mod ffi_event { pub struct InitializeRequest { #[prost(uint64, tag="1")] pub event_callback_ptr: u64, + /// When true, the FfiServer will forward logs using LogRecord + #[prost(bool, tag="2")] + pub capture_logs: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -3229,4 +3234,62 @@ pub struct DisposeCallback { #[prost(uint64, tag="1")] pub async_id: u64, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LogRecord { + #[prost(enumeration="LogLevel", tag="1")] + pub level: i32, + /// e.g "livekit", "libwebrtc", "tokio-tungstenite", etc... + #[prost(string, tag="2")] + pub target: ::prost::alloc::string::String, + #[prost(string, optional, tag="3")] + pub module_path: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag="4")] + pub file: ::core::option::Option<::prost::alloc::string::String>, + #[prost(uint32, optional, tag="5")] + pub line: ::core::option::Option, + #[prost(string, tag="6")] + pub message: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LogBatch { + #[prost(message, repeated, tag="1")] + pub records: ::prost::alloc::vec::Vec, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum LogLevel { + LogError = 0, + LogWarn = 1, + LogInfo = 2, + LogDebug = 3, + LogTrace = 4, +} +impl LogLevel { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + LogLevel::LogError => "LOG_ERROR", + LogLevel::LogWarn => "LOG_WARN", + LogLevel::LogInfo => "LOG_INFO", + LogLevel::LogDebug => "LOG_DEBUG", + LogLevel::LogTrace => "LOG_TRACE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "LOG_ERROR" => Some(Self::LogError), + "LOG_WARN" => Some(Self::LogWarn), + "LOG_INFO" => Some(Self::LogInfo), + "LOG_DEBUG" => Some(Self::LogDebug), + "LOG_TRACE" => Some(Self::LogTrace), + _ => None, + } + } +} // @@protoc_insertion_point(module) diff --git a/livekit-ffi/src/server/logger.rs b/livekit-ffi/src/server/logger.rs new file mode 100644 index 000000000..b4c777a34 --- /dev/null +++ b/livekit-ffi/src/server/logger.rs @@ -0,0 +1,150 @@ +use crate::proto; +use crate::FFI_SERVER; +use env_logger; +use log::{self, Log}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; + +pub const FLUSH_INTERVAL: Duration = Duration::from_secs(1); +pub const BATCH_SIZE: usize = 32; + +/// Logger that forward logs to the FfiClient when capture_logs is enabled +/// Otherwise fallback to the env_logger +pub struct FfiLogger { + async_runtime: tokio::runtime::Handle, + log_tx: mpsc::UnboundedSender, + capture_logs: AtomicBool, + env_logger: env_logger::Logger, +} + +enum LogMsg { + Log(proto::LogRecord), + Flush(oneshot::Sender<()>), +} + +impl FfiLogger { + pub fn new(async_runtime: tokio::runtime::Handle) -> Self { + let (log_tx, log_rx) = mpsc::unbounded_channel(); + async_runtime.spawn(log_forward_task(log_rx)); + + let env_logger = env_logger::Builder::from_default_env().build(); + FfiLogger { + async_runtime, + log_tx, + capture_logs: AtomicBool::new(false), // Always false by default to ensure the server + // is always initialized when using capture_logs + env_logger, + } + } +} + +impl FfiLogger { + pub fn capture_logs(&self) -> bool { + self.capture_logs.load(Ordering::Acquire) + } + + pub fn set_capture_logs(&self, capture: bool) { + self.capture_logs.store(capture, Ordering::Release); + } +} + +impl Log for FfiLogger { + fn enabled(&self, metadata: &log::Metadata) -> bool { + if !self.capture_logs() { + return self.env_logger.enabled(metadata); + } + + true // The ffi client decides what to log (FfiLogger is just forwarding) + } + + fn log(&self, record: &log::Record) { + if !self.capture_logs() { + return self.env_logger.log(record); + } + + self.log_tx.send(LogMsg::Log(record.into())).unwrap(); + } + + fn flush(&self) { + if !self.capture_logs() { + return self.env_logger.flush(); + } + + let (tx, rx) = oneshot::channel(); + self.log_tx.send(LogMsg::Flush(tx)).unwrap(); + let _ = self.async_runtime.block_on(rx); // should we block? + } +} + +async fn log_forward_task(mut rx: mpsc::UnboundedReceiver) { + async fn flush(batch: &mut Vec) { + if batch.is_empty() { + return; + } + // It is safe to use FFI_SERVER here, if we receive logs when capture_logs is enabled, + // it means the server has already been initialized + + let _ = FFI_SERVER + .send_event(proto::ffi_event::Message::Logs(proto::LogBatch { + records: batch.clone(), // Avoid clone here? + })) + .await; + batch.clear(); + } + + let mut batch = Vec::with_capacity(BATCH_SIZE); + let mut interval = tokio::time::interval(FLUSH_INTERVAL); + + loop { + tokio::select! { + msg = rx.recv() => { + if msg.is_none() { + break; + } + + match msg.unwrap() { + LogMsg::Log(record) => { + batch.push(record); + } + LogMsg::Flush(tx) => { + flush(&mut batch).await; + let _ = tx.send(()); + } + } + }, + _ = interval.tick() => { + flush(&mut batch).await; + } + } + + flush(&mut batch).await; + } + + println!("log forwarding task stopped"); // Shouldn't happen (logger is leaked) +} + +impl From<&log::Record<'_>> for proto::LogRecord { + fn from(record: &log::Record) -> Self { + proto::LogRecord { + level: proto::LogLevel::from(record.level()).into(), + target: record.target().to_string(), + module_path: record.module_path().map(|s| s.to_string()), + file: record.file().map(|s| s.to_string()), + line: record.line(), + message: record.args().to_string(), // Display trait + } + } +} + +impl From for proto::LogLevel { + fn from(level: log::Level) -> Self { + match level { + log::Level::Error => Self::LogError, + log::Level::Warn => Self::LogWarn, + log::Level::Info => Self::LogInfo, + log::Level::Debug => Self::LogDebug, + log::Level::Trace => Self::LogTrace, + } + } +} diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index f133c880c..bf337bb48 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -29,6 +29,7 @@ use std::time::Duration; pub mod audio_source; pub mod audio_stream; +pub mod logger; pub mod requests; pub mod room; pub mod video_source; @@ -65,11 +66,21 @@ pub struct FfiServer { next_id: AtomicU64, config: Mutex>, + logger: &'static logger::FfiLogger, } impl Default for FfiServer { fn default() -> Self { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + let async_runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let logger = Box::leak(Box::new(logger::FfiLogger::new( + async_runtime.handle().clone(), + ))); + log::set_logger(logger).unwrap(); + log::set_max_level(log::LevelFilter::Trace); #[cfg(feature = "tracing")] console_subscriber::init(); @@ -94,11 +105,9 @@ impl Default for FfiServer { Self { ffi_handles: Default::default(), next_id: AtomicU64::new(1), // 0 is invalid - async_runtime: tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(), + async_runtime, config: Default::default(), + logger, } } } @@ -133,6 +142,7 @@ impl FfiServer { .as_ref() .map_or_else(|| Err(FfiError::NotConfigured), |c| Ok(c.callback_fn))?; + // TODO(theomonnom): Don't reallocate let message = proto::FfiEvent { message: Some(message), } diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 538639f30..6534aee4f 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -36,13 +36,15 @@ fn on_initialize( return Err(FfiError::AlreadyInitialized); } - log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION")); + server.logger.set_capture_logs(init.capture_logs); // # SAFETY: The foreign language is responsible for ensuring that the callback function is valid *server.config.lock() = Some(FfiConfig { callback_fn: unsafe { std::mem::transmute(init.event_callback_ptr as usize) }, }); + log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION")); + Ok(proto::InitializeResponse::default()) }