Skip to content

Commit

Permalink
feat: ffi logger (#246)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Nov 17, 2023
1 parent b66d3b1 commit c7a431b
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 9 deletions.
29 changes: 27 additions & 2 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,16 @@ message FfiEvent {
UpdateLocalMetadataCallback update_local_metadata = 12;
UpdateLocalNameCallback update_local_name = 13;
GetStatsCallback get_stats = 14;
LogBatch logs = 15;
}
}

// Setup the callback where the foreign language can receive events
// and responses to asynchronous requests
message InitializeRequest { uint64 event_callback_ptr = 1; }
message InitializeRequest {
uint64 event_callback_ptr = 1;
bool capture_logs = 2; // When true, the FfiServer will forward logs using LogRecord
}
message InitializeResponse {}

// Stop all rooms synchronously (Do we need async here?).
Expand All @@ -172,5 +176,26 @@ message DisposeCallback {
uint64 async_id = 1;
}

// TODO(theomonnom): Debug messages (Print handles, forward logs).
enum LogLevel {
LOG_ERROR = 0;
LOG_WARN = 1;
LOG_INFO = 2;
LOG_DEBUG = 3;
LOG_TRACE = 4;
}

message LogRecord {
LogLevel level = 1;
string target = 2; // e.g "livekit", "libwebrtc", "tokio-tungstenite", etc...
optional string module_path = 3;
optional string file = 4;
optional uint32 line = 5;
string message = 6;
}

message LogBatch {
repeated LogRecord records = 1;
}

// TODO(theomonnom): Debug messages (Print handles).

65 changes: 64 additions & 1 deletion livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3157,7 +3157,7 @@ pub mod ffi_response {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiEvent {
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14")]
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15")]
pub message: ::core::option::Option<ffi_event::Message>,
}
/// Nested message and enum types in `FfiEvent`.
Expand Down Expand Up @@ -3193,6 +3193,8 @@ pub mod ffi_event {
UpdateLocalName(super::UpdateLocalNameCallback),
#[prost(message, tag="14")]
GetStats(super::GetStatsCallback),
#[prost(message, tag="15")]
Logs(super::LogBatch),
}
}
/// Setup the callback where the foreign language can receive events
Expand All @@ -3202,6 +3204,9 @@ pub mod ffi_event {
pub struct InitializeRequest {
#[prost(uint64, tag="1")]
pub event_callback_ptr: u64,
/// When true, the FfiServer will forward logs using LogRecord
#[prost(bool, tag="2")]
pub capture_logs: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -3229,4 +3234,62 @@ pub struct DisposeCallback {
#[prost(uint64, tag="1")]
pub async_id: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LogRecord {
#[prost(enumeration="LogLevel", tag="1")]
pub level: i32,
/// e.g "livekit", "libwebrtc", "tokio-tungstenite", etc...
#[prost(string, tag="2")]
pub target: ::prost::alloc::string::String,
#[prost(string, optional, tag="3")]
pub module_path: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag="4")]
pub file: ::core::option::Option<::prost::alloc::string::String>,
#[prost(uint32, optional, tag="5")]
pub line: ::core::option::Option<u32>,
#[prost(string, tag="6")]
pub message: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LogBatch {
#[prost(message, repeated, tag="1")]
pub records: ::prost::alloc::vec::Vec<LogRecord>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum LogLevel {
LogError = 0,
LogWarn = 1,
LogInfo = 2,
LogDebug = 3,
LogTrace = 4,
}
impl LogLevel {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
LogLevel::LogError => "LOG_ERROR",
LogLevel::LogWarn => "LOG_WARN",
LogLevel::LogInfo => "LOG_INFO",
LogLevel::LogDebug => "LOG_DEBUG",
LogLevel::LogTrace => "LOG_TRACE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"LOG_ERROR" => Some(Self::LogError),
"LOG_WARN" => Some(Self::LogWarn),
"LOG_INFO" => Some(Self::LogInfo),
"LOG_DEBUG" => Some(Self::LogDebug),
"LOG_TRACE" => Some(Self::LogTrace),
_ => None,
}
}
}
// @@protoc_insertion_point(module)
150 changes: 150 additions & 0 deletions livekit-ffi/src/server/logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
use crate::proto;
use crate::FFI_SERVER;
use env_logger;
use log::{self, Log};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

pub const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
pub const BATCH_SIZE: usize = 32;

/// Logger that forward logs to the FfiClient when capture_logs is enabled
/// Otherwise fallback to the env_logger
pub struct FfiLogger {
async_runtime: tokio::runtime::Handle,
log_tx: mpsc::UnboundedSender<LogMsg>,
capture_logs: AtomicBool,
env_logger: env_logger::Logger,
}

enum LogMsg {
Log(proto::LogRecord),
Flush(oneshot::Sender<()>),
}

impl FfiLogger {
pub fn new(async_runtime: tokio::runtime::Handle) -> Self {
let (log_tx, log_rx) = mpsc::unbounded_channel();
async_runtime.spawn(log_forward_task(log_rx));

let env_logger = env_logger::Builder::from_default_env().build();
FfiLogger {
async_runtime,
log_tx,
capture_logs: AtomicBool::new(false), // Always false by default to ensure the server
// is always initialized when using capture_logs
env_logger,
}
}
}

impl FfiLogger {
pub fn capture_logs(&self) -> bool {
self.capture_logs.load(Ordering::Acquire)
}

pub fn set_capture_logs(&self, capture: bool) {
self.capture_logs.store(capture, Ordering::Release);
}
}

impl Log for FfiLogger {
fn enabled(&self, metadata: &log::Metadata) -> bool {
if !self.capture_logs() {
return self.env_logger.enabled(metadata);
}

true // The ffi client decides what to log (FfiLogger is just forwarding)
}

fn log(&self, record: &log::Record) {
if !self.capture_logs() {
return self.env_logger.log(record);
}

self.log_tx.send(LogMsg::Log(record.into())).unwrap();
}

fn flush(&self) {
if !self.capture_logs() {
return self.env_logger.flush();
}

let (tx, rx) = oneshot::channel();
self.log_tx.send(LogMsg::Flush(tx)).unwrap();
let _ = self.async_runtime.block_on(rx); // should we block?
}
}

async fn log_forward_task(mut rx: mpsc::UnboundedReceiver<LogMsg>) {
async fn flush(batch: &mut Vec<proto::LogRecord>) {
if batch.is_empty() {
return;
}
// It is safe to use FFI_SERVER here, if we receive logs when capture_logs is enabled,
// it means the server has already been initialized

let _ = FFI_SERVER
.send_event(proto::ffi_event::Message::Logs(proto::LogBatch {
records: batch.clone(), // Avoid clone here?
}))
.await;
batch.clear();
}

let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut interval = tokio::time::interval(FLUSH_INTERVAL);

loop {
tokio::select! {
msg = rx.recv() => {
if msg.is_none() {
break;
}

match msg.unwrap() {
LogMsg::Log(record) => {
batch.push(record);
}
LogMsg::Flush(tx) => {
flush(&mut batch).await;
let _ = tx.send(());
}
}
},
_ = interval.tick() => {
flush(&mut batch).await;
}
}

flush(&mut batch).await;
}

println!("log forwarding task stopped"); // Shouldn't happen (logger is leaked)
}

impl From<&log::Record<'_>> for proto::LogRecord {
fn from(record: &log::Record) -> Self {
proto::LogRecord {
level: proto::LogLevel::from(record.level()).into(),
target: record.target().to_string(),
module_path: record.module_path().map(|s| s.to_string()),
file: record.file().map(|s| s.to_string()),
line: record.line(),
message: record.args().to_string(), // Display trait
}
}
}

impl From<log::Level> for proto::LogLevel {
fn from(level: log::Level) -> Self {
match level {
log::Level::Error => Self::LogError,
log::Level::Warn => Self::LogWarn,
log::Level::Info => Self::LogInfo,
log::Level::Debug => Self::LogDebug,
log::Level::Trace => Self::LogTrace,
}
}
}
20 changes: 15 additions & 5 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::time::Duration;

pub mod audio_source;
pub mod audio_stream;
pub mod logger;
pub mod requests;
pub mod room;
pub mod video_source;
Expand Down Expand Up @@ -65,11 +66,21 @@ pub struct FfiServer {

next_id: AtomicU64,
config: Mutex<Option<FfiConfig>>,
logger: &'static logger::FfiLogger,
}

impl Default for FfiServer {
fn default() -> Self {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let async_runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let logger = Box::leak(Box::new(logger::FfiLogger::new(
async_runtime.handle().clone(),
)));
log::set_logger(logger).unwrap();
log::set_max_level(log::LevelFilter::Trace);

#[cfg(feature = "tracing")]
console_subscriber::init();
Expand All @@ -94,11 +105,9 @@ impl Default for FfiServer {
Self {
ffi_handles: Default::default(),
next_id: AtomicU64::new(1), // 0 is invalid
async_runtime: tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
async_runtime,
config: Default::default(),
logger,
}
}
}
Expand Down Expand Up @@ -133,6 +142,7 @@ impl FfiServer {
.as_ref()
.map_or_else(|| Err(FfiError::NotConfigured), |c| Ok(c.callback_fn))?;

// TODO(theomonnom): Don't reallocate
let message = proto::FfiEvent {
message: Some(message),
}
Expand Down
4 changes: 3 additions & 1 deletion livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ fn on_initialize(
return Err(FfiError::AlreadyInitialized);
}

log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION"));
server.logger.set_capture_logs(init.capture_logs);

// # SAFETY: The foreign language is responsible for ensuring that the callback function is valid
*server.config.lock() = Some(FfiConfig {
callback_fn: unsafe { std::mem::transmute(init.event_callback_ptr as usize) },
});

log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION"));

Ok(proto::InitializeResponse::default())
}

Expand Down

0 comments on commit c7a431b

Please sign in to comment.