From 3d9168a840d4f4b4e8b15ec7c70a077860574513 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 16 Dec 2024 13:59:54 +0100 Subject: [PATCH] Add FFI events for data streams (#510) * update protocol and streams boilerplate * wip filestream reader * text stream reader * finish data stream receiving * cleanup * receiving working * working with mutex * working with RWLock * cleanup * simplify * better lock scope * filestream fixes * add example * remove rust client api and replace with raw events * cleanup imports * more cleanup * ffi imports * implement ffi --- livekit-ffi/protocol/room.proto | 54 +++++++++++ livekit-ffi/src/conversion/room.rs | 48 +++++++++ livekit-ffi/src/livekit.proto.rs | 134 +++++++++++++++++++++++++- livekit-ffi/src/server/room.rs | 6 ++ livekit/src/room/mod.rs | 27 +++++- livekit/src/rtc_engine/mod.rs | 12 +++ livekit/src/rtc_engine/rtc_session.rs | 16 +++ 7 files changed, 292 insertions(+), 5 deletions(-) diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index 4d0f3d92..c6f59569 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -367,6 +367,8 @@ message RoomEvent { DataPacketReceived data_packet_received = 27; TranscriptionReceived transcription_received = 28; ChatMessageReceived chat_message = 29; + DataStream.Header stream_header = 30; + DataStream.Chunk stream_chunk = 31; } } @@ -526,3 +528,55 @@ message Reconnected {} message RoomEOS {} +message DataStream { + + // enum for operation types (specific to TextHeader) + enum OperationType { + CREATE = 0; + UPDATE = 1; + DELETE = 2; + REACTION = 3; + } + + // header properties specific to text streams + message TextHeader { + required OperationType operation_type = 1; + required int32 version = 2; // Optional: Version for updates/edits + required string reply_to_stream_id = 3; // Optional: Reply to specific message + repeated string attached_stream_ids = 4; // file attachments for text streams + required bool generated = 5; // true if the text has been generated by an agent from a participant's audio transcription + + } + + // header properties specific to file or image streams + message FileHeader { + required string file_name = 1; // name of the file + } + + // main DataStream.Header that contains a oneof for specific headers + message Header { + required string stream_id = 1; // unique identifier for this data stream + required int64 timestamp = 2; // using int64 for Unix timestamp + required string topic = 3; + required string mime_type = 4; + optional uint64 total_length = 5; // only populated for finite streams, if it's a stream of unknown size this stays empty + optional uint64 total_chunks = 6; // only populated for finite streams, if it's a stream of unknown size this stays empty + map extensions = 7; // user defined extensions map that can carry additional info + + // oneof to choose between specific header types + oneof content_header { + TextHeader text_header = 8; + FileHeader file_header = 9; + } + } + + message Chunk { + required string stream_id = 1; // unique identifier for this data stream to map it to the correct header + required uint64 chunk_index = 2; + required bytes content = 3; // content as binary (bytes) + required bool complete = 4; // true only if this is the last chunk of this stream - can also be sent with empty content + required int32 version = 5; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced + optional bytes iv = 6; // optional, initialization vector for AES-GCM encryption + } +} + diff --git a/livekit-ffi/src/conversion/room.rs b/livekit-ffi/src/conversion/room.rs index dac00067..521c5c8c 100644 --- a/livekit-ffi/src/conversion/room.rs +++ b/livekit-ffi/src/conversion/room.rs @@ -281,3 +281,51 @@ impl From for proto::ChatMessage { } } } + +impl From for proto::data_stream::Header { + fn from(msg: livekit_protocol::data_stream::Header) -> Self { + let content_header = match msg.content_header { + Some(livekit_protocol::data_stream::header::ContentHeader::TextHeader(text_header)) => { + Some(proto::data_stream::header::ContentHeader::TextHeader( + proto::data_stream::TextHeader { + operation_type: text_header.operation_type, + version: text_header.version, + reply_to_stream_id: text_header.reply_to_stream_id, + attached_stream_ids: text_header.attached_stream_ids, + generated: text_header.generated, + }, + )) + } + Some(livekit_protocol::data_stream::header::ContentHeader::FileHeader(file_header)) => { + Some(proto::data_stream::header::ContentHeader::FileHeader( + proto::data_stream::FileHeader { file_name: file_header.file_name }, + )) + } + None => None, + }; + + proto::data_stream::Header { + stream_id: msg.stream_id, + timestamp: msg.timestamp, + topic: msg.topic, + mime_type: msg.mime_type, + total_chunks: msg.total_chunks, + total_length: msg.total_length, + extensions: msg.extensions, + content_header, + } + } +} + +impl From for proto::data_stream::Chunk { + fn from(msg: livekit_protocol::data_stream::Chunk) -> Self { + proto::data_stream::Chunk { + stream_id: msg.stream_id, + content: msg.content, + complete: msg.complete, + chunk_index: msg.chunk_index, + version: msg.version, + iv: msg.iv, + } + } +} diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 70066690..c45f678c 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -1,4 +1,5 @@ // @generated +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FrameCryptor { @@ -2634,7 +2635,7 @@ pub struct OwnedBuffer { pub struct RoomEvent { #[prost(uint64, required, tag="1")] pub room_handle: u64, - #[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")] + #[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31")] pub message: ::core::option::Option, } /// Nested message and enum types in `RoomEvent`. @@ -2700,6 +2701,10 @@ pub mod room_event { TranscriptionReceived(super::TranscriptionReceived), #[prost(message, tag="29")] ChatMessage(super::ChatMessageReceived), + #[prost(message, tag="30")] + StreamHeader(super::data_stream::Header), + #[prost(message, tag="31")] + StreamChunk(super::data_stream::Chunk), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -2974,6 +2979,133 @@ pub struct Reconnected { #[derive(Clone, PartialEq, ::prost::Message)] pub struct RoomEos { } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DataStream { +} +/// Nested message and enum types in `DataStream`. +pub mod data_stream { + /// header properties specific to text streams + #[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] + pub struct TextHeader { + #[prost(enumeration="OperationType", required, tag="1")] + pub operation_type: i32, + /// Optional: Version for updates/edits + #[prost(int32, required, tag="2")] + pub version: i32, + /// Optional: Reply to specific message + #[prost(string, required, tag="3")] + pub reply_to_stream_id: ::prost::alloc::string::String, + /// file attachments for text streams + #[prost(string, repeated, tag="4")] + pub attached_stream_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// true if the text has been generated by an agent from a participant's audio transcription + #[prost(bool, required, tag="5")] + pub generated: bool, + } + /// header properties specific to file or image streams + #[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] + pub struct FileHeader { + /// name of the file + #[prost(string, required, tag="1")] + pub file_name: ::prost::alloc::string::String, + } + /// main DataStream.Header that contains a oneof for specific headers + #[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] + pub struct Header { + /// unique identifier for this data stream + #[prost(string, required, tag="1")] + pub stream_id: ::prost::alloc::string::String, + /// using int64 for Unix timestamp + #[prost(int64, required, tag="2")] + pub timestamp: i64, + #[prost(string, required, tag="3")] + pub topic: ::prost::alloc::string::String, + #[prost(string, required, tag="4")] + pub mime_type: ::prost::alloc::string::String, + /// only populated for finite streams, if it's a stream of unknown size this stays empty + #[prost(uint64, optional, tag="5")] + pub total_length: ::core::option::Option, + /// only populated for finite streams, if it's a stream of unknown size this stays empty + #[prost(uint64, optional, tag="6")] + pub total_chunks: ::core::option::Option, + /// user defined extensions map that can carry additional info + #[prost(map="string, string", tag="7")] + pub extensions: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, + /// oneof to choose between specific header types + #[prost(oneof="header::ContentHeader", tags="8, 9")] + pub content_header: ::core::option::Option, + } + /// Nested message and enum types in `Header`. + pub mod header { + /// oneof to choose between specific header types + #[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum ContentHeader { + #[prost(message, tag="8")] + TextHeader(super::TextHeader), + #[prost(message, tag="9")] + FileHeader(super::FileHeader), + } + } + #[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] + pub struct Chunk { + /// unique identifier for this data stream to map it to the correct header + #[prost(string, required, tag="1")] + pub stream_id: ::prost::alloc::string::String, + #[prost(uint64, required, tag="2")] + pub chunk_index: u64, + /// content as binary (bytes) + #[prost(bytes="vec", required, tag="3")] + pub content: ::prost::alloc::vec::Vec, + /// true only if this is the last chunk of this stream - can also be sent with empty content + #[prost(bool, required, tag="4")] + pub complete: bool, + /// a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced + #[prost(int32, required, tag="5")] + pub version: i32, + /// optional, initialization vector for AES-GCM encryption + #[prost(bytes="vec", optional, tag="6")] + pub iv: ::core::option::Option<::prost::alloc::vec::Vec>, + } + /// enum for operation types (specific to TextHeader) + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] + #[repr(i32)] + pub enum OperationType { + Create = 0, + Update = 1, + Delete = 2, + Reaction = 3, + } + impl OperationType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + OperationType::Create => "CREATE", + OperationType::Update => "UPDATE", + OperationType::Delete => "DELETE", + OperationType::Reaction => "REACTION", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "CREATE" => Some(Self::Create), + "UPDATE" => Some(Self::Update), + "DELETE" => Some(Self::Delete), + "REACTION" => Some(Self::Reaction), + _ => None, + } + } + } +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum IceTransportType { diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 1367897e..ca0d26ac 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -1137,6 +1137,12 @@ async fn forward_event( state: proto::EncryptionState::from(state).into(), })); } + RoomEvent::StreamHeaderReceived { header } => { + let _ = send_event(proto::room_event::Message::StreamHeader(header.into())); + } + RoomEvent::StreamChunkReceived { chunk } => { + let _ = send_event(proto::room_event::Message::StreamChunk(chunk.into())); + } _ => { log::warn!("unhandled room event: {:?}", event); } diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 21e28877..502efb2c 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -31,10 +31,7 @@ use parking_lot::RwLock; pub use proto::DisconnectReason; use proto::{promise::Promise, SignalTarget}; use thiserror::Error; -use tokio::{ - signal, - sync::{mpsc, oneshot, Mutex as AsyncMutex}, -}; +use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex}; pub use self::{ e2ee::{manager::E2eeManager, E2eeOptions}, @@ -171,6 +168,12 @@ pub enum RoomEvent { message: ChatMessage, participant: Option, }, + StreamHeaderReceived { + header: proto::data_stream::Header, + }, + StreamChunkReceived { + chunk: proto::data_stream::Chunk, + }, E2eeStateChanged { participant: Participant, state: EncryptionState, @@ -720,6 +723,12 @@ impl RoomSession { EngineEvent::LocalTrackSubscribed { track_sid } => { self.handle_track_subscribed(track_sid) } + EngineEvent::DataStreamHeader { header } => { + self.handle_data_stream_header(header); + } + EngineEvent::DataStreamChunk { chunk } => { + self.handle_data_stream_chunk(chunk); + } _ => {} } @@ -1230,6 +1239,16 @@ impl RoomSession { }); } + fn handle_data_stream_header(&self, header: proto::data_stream::Header) { + let event = RoomEvent::StreamHeaderReceived { header }; + self.dispatcher.dispatch(&event); + } + + fn handle_data_stream_chunk(&self, chunk: proto::data_stream::Chunk) { + let event = RoomEvent::StreamChunkReceived { chunk }; + self.dispatcher.dispatch(&event); + } + /// Create a new participant /// Also add it to the participants list fn create_participant( diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 4b3c4493..d4f5dec9 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -159,6 +159,12 @@ pub enum EngineEvent { LocalTrackSubscribed { track_sid: String, }, + DataStreamHeader { + header: proto::data_stream::Header, + }, + DataStreamChunk { + chunk: proto::data_stream::Chunk, + }, } /// Represents a running RtcSession with the ability to close the session @@ -524,6 +530,12 @@ impl EngineInner { SessionEvent::LocalTrackSubscribed { track_sid } => { let _ = self.engine_tx.send(EngineEvent::LocalTrackSubscribed { track_sid }); } + SessionEvent::DataStreamHeader { header } => { + let _ = self.engine_tx.send(EngineEvent::DataStreamHeader { header }); + } + SessionEvent::DataStreamChunk { chunk } => { + let _ = self.engine_tx.send(EngineEvent::DataStreamChunk { chunk }); + } } Ok(()) } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 3b6febb0..b3cb4f09 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -135,6 +135,12 @@ pub enum SessionEvent { action: proto::leave_request::Action, retry_now: bool, }, + DataStreamHeader { + header: proto::data_stream::Header, + }, + DataStreamChunk { + chunk: proto::data_stream::Chunk, + }, } #[derive(Serialize, Deserialize)] @@ -723,6 +729,16 @@ impl SessionInner { message: ChatMessage::from(message.clone()), }); } + proto::data_packet::Value::StreamHeader(message) => { + let _ = self + .emitter + .send(SessionEvent::DataStreamHeader { header: message.clone() }); + } + proto::data_packet::Value::StreamChunk(message) => { + let _ = self + .emitter + .send(SessionEvent::DataStreamChunk { chunk: message.clone() }); + } _ => {} } }