Skip to content

Commit

Permalink
Add FFI events for data streams (#510)
Browse files Browse the repository at this point in the history
* update protocol and streams boilerplate

* wip filestream reader

* text stream reader

* finish data stream receiving

* cleanup

* receiving working

* working with mutex

* working with RWLock

* cleanup

* simplify

* better lock scope

* filestream fixes

* add example

* remove rust client api and replace with raw events

* cleanup imports

* more cleanup

* ffi imports

* implement ffi
  • Loading branch information
lukasIO authored Dec 16, 2024
1 parent a5ef23e commit 3d9168a
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 5 deletions.
54 changes: 54 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ message RoomEvent {
DataPacketReceived data_packet_received = 27;
TranscriptionReceived transcription_received = 28;
ChatMessageReceived chat_message = 29;
DataStream.Header stream_header = 30;
DataStream.Chunk stream_chunk = 31;
}
}

Expand Down Expand Up @@ -526,3 +528,55 @@ message Reconnected {}

message RoomEOS {}

message DataStream {

// enum for operation types (specific to TextHeader)
enum OperationType {
CREATE = 0;
UPDATE = 1;
DELETE = 2;
REACTION = 3;
}

// header properties specific to text streams
message TextHeader {
required OperationType operation_type = 1;
required int32 version = 2; // Optional: Version for updates/edits
required string reply_to_stream_id = 3; // Optional: Reply to specific message
repeated string attached_stream_ids = 4; // file attachments for text streams
required bool generated = 5; // true if the text has been generated by an agent from a participant's audio transcription

}

// header properties specific to file or image streams
message FileHeader {
required string file_name = 1; // name of the file
}

// main DataStream.Header that contains a oneof for specific headers
message Header {
required string stream_id = 1; // unique identifier for this data stream
required int64 timestamp = 2; // using int64 for Unix timestamp
required string topic = 3;
required string mime_type = 4;
optional uint64 total_length = 5; // only populated for finite streams, if it's a stream of unknown size this stays empty
optional uint64 total_chunks = 6; // only populated for finite streams, if it's a stream of unknown size this stays empty
map<string, string> extensions = 7; // user defined extensions map that can carry additional info

// oneof to choose between specific header types
oneof content_header {
TextHeader text_header = 8;
FileHeader file_header = 9;
}
}

message Chunk {
required string stream_id = 1; // unique identifier for this data stream to map it to the correct header
required uint64 chunk_index = 2;
required bytes content = 3; // content as binary (bytes)
required bool complete = 4; // true only if this is the last chunk of this stream - can also be sent with empty content
required int32 version = 5; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced
optional bytes iv = 6; // optional, initialization vector for AES-GCM encryption
}
}

48 changes: 48 additions & 0 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,51 @@ impl From<livekit::ChatMessage> for proto::ChatMessage {
}
}
}

impl From<livekit_protocol::data_stream::Header> for proto::data_stream::Header {
fn from(msg: livekit_protocol::data_stream::Header) -> Self {
let content_header = match msg.content_header {
Some(livekit_protocol::data_stream::header::ContentHeader::TextHeader(text_header)) => {
Some(proto::data_stream::header::ContentHeader::TextHeader(
proto::data_stream::TextHeader {
operation_type: text_header.operation_type,
version: text_header.version,
reply_to_stream_id: text_header.reply_to_stream_id,
attached_stream_ids: text_header.attached_stream_ids,
generated: text_header.generated,
},
))
}
Some(livekit_protocol::data_stream::header::ContentHeader::FileHeader(file_header)) => {
Some(proto::data_stream::header::ContentHeader::FileHeader(
proto::data_stream::FileHeader { file_name: file_header.file_name },
))
}
None => None,
};

proto::data_stream::Header {
stream_id: msg.stream_id,
timestamp: msg.timestamp,
topic: msg.topic,
mime_type: msg.mime_type,
total_chunks: msg.total_chunks,
total_length: msg.total_length,
extensions: msg.extensions,
content_header,
}
}
}

impl From<livekit_protocol::data_stream::Chunk> for proto::data_stream::Chunk {
fn from(msg: livekit_protocol::data_stream::Chunk) -> Self {
proto::data_stream::Chunk {
stream_id: msg.stream_id,
content: msg.content,
complete: msg.complete,
chunk_index: msg.chunk_index,
version: msg.version,
iv: msg.iv,
}
}
}
134 changes: 133 additions & 1 deletion livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FrameCryptor {
Expand Down Expand Up @@ -2634,7 +2635,7 @@ pub struct OwnedBuffer {
pub struct RoomEvent {
#[prost(uint64, required, tag="1")]
pub room_handle: u64,
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")]
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31")]
pub message: ::core::option::Option<room_event::Message>,
}
/// Nested message and enum types in `RoomEvent`.
Expand Down Expand Up @@ -2700,6 +2701,10 @@ pub mod room_event {
TranscriptionReceived(super::TranscriptionReceived),
#[prost(message, tag="29")]
ChatMessage(super::ChatMessageReceived),
#[prost(message, tag="30")]
StreamHeader(super::data_stream::Header),
#[prost(message, tag="31")]
StreamChunk(super::data_stream::Chunk),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -2974,6 +2979,133 @@ pub struct Reconnected {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RoomEos {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataStream {
}
/// Nested message and enum types in `DataStream`.
pub mod data_stream {
/// header properties specific to text streams
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TextHeader {
#[prost(enumeration="OperationType", required, tag="1")]
pub operation_type: i32,
/// Optional: Version for updates/edits
#[prost(int32, required, tag="2")]
pub version: i32,
/// Optional: Reply to specific message
#[prost(string, required, tag="3")]
pub reply_to_stream_id: ::prost::alloc::string::String,
/// file attachments for text streams
#[prost(string, repeated, tag="4")]
pub attached_stream_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// true if the text has been generated by an agent from a participant's audio transcription
#[prost(bool, required, tag="5")]
pub generated: bool,
}
/// header properties specific to file or image streams
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileHeader {
/// name of the file
#[prost(string, required, tag="1")]
pub file_name: ::prost::alloc::string::String,
}
/// main DataStream.Header that contains a oneof for specific headers
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Header {
/// unique identifier for this data stream
#[prost(string, required, tag="1")]
pub stream_id: ::prost::alloc::string::String,
/// using int64 for Unix timestamp
#[prost(int64, required, tag="2")]
pub timestamp: i64,
#[prost(string, required, tag="3")]
pub topic: ::prost::alloc::string::String,
#[prost(string, required, tag="4")]
pub mime_type: ::prost::alloc::string::String,
/// only populated for finite streams, if it's a stream of unknown size this stays empty
#[prost(uint64, optional, tag="5")]
pub total_length: ::core::option::Option<u64>,
/// only populated for finite streams, if it's a stream of unknown size this stays empty
#[prost(uint64, optional, tag="6")]
pub total_chunks: ::core::option::Option<u64>,
/// user defined extensions map that can carry additional info
#[prost(map="string, string", tag="7")]
pub extensions: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
/// oneof to choose between specific header types
#[prost(oneof="header::ContentHeader", tags="8, 9")]
pub content_header: ::core::option::Option<header::ContentHeader>,
}
/// Nested message and enum types in `Header`.
pub mod header {
/// oneof to choose between specific header types
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum ContentHeader {
#[prost(message, tag="8")]
TextHeader(super::TextHeader),
#[prost(message, tag="9")]
FileHeader(super::FileHeader),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Chunk {
/// unique identifier for this data stream to map it to the correct header
#[prost(string, required, tag="1")]
pub stream_id: ::prost::alloc::string::String,
#[prost(uint64, required, tag="2")]
pub chunk_index: u64,
/// content as binary (bytes)
#[prost(bytes="vec", required, tag="3")]
pub content: ::prost::alloc::vec::Vec<u8>,
/// true only if this is the last chunk of this stream - can also be sent with empty content
#[prost(bool, required, tag="4")]
pub complete: bool,
/// a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced
#[prost(int32, required, tag="5")]
pub version: i32,
/// optional, initialization vector for AES-GCM encryption
#[prost(bytes="vec", optional, tag="6")]
pub iv: ::core::option::Option<::prost::alloc::vec::Vec<u8>>,
}
/// enum for operation types (specific to TextHeader)
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum OperationType {
Create = 0,
Update = 1,
Delete = 2,
Reaction = 3,
}
impl OperationType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
OperationType::Create => "CREATE",
OperationType::Update => "UPDATE",
OperationType::Delete => "DELETE",
OperationType::Reaction => "REACTION",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"CREATE" => Some(Self::Create),
"UPDATE" => Some(Self::Update),
"DELETE" => Some(Self::Delete),
"REACTION" => Some(Self::Reaction),
_ => None,
}
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum IceTransportType {
Expand Down
6 changes: 6 additions & 0 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,12 @@ async fn forward_event(
state: proto::EncryptionState::from(state).into(),
}));
}
RoomEvent::StreamHeaderReceived { header } => {
let _ = send_event(proto::room_event::Message::StreamHeader(header.into()));
}
RoomEvent::StreamChunkReceived { chunk } => {
let _ = send_event(proto::room_event::Message::StreamChunk(chunk.into()));
}
_ => {
log::warn!("unhandled room event: {:?}", event);
}
Expand Down
27 changes: 23 additions & 4 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ use parking_lot::RwLock;
pub use proto::DisconnectReason;
use proto::{promise::Promise, SignalTarget};
use thiserror::Error;
use tokio::{
signal,
sync::{mpsc, oneshot, Mutex as AsyncMutex},
};
use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex};

pub use self::{
e2ee::{manager::E2eeManager, E2eeOptions},
Expand Down Expand Up @@ -171,6 +168,12 @@ pub enum RoomEvent {
message: ChatMessage,
participant: Option<RemoteParticipant>,
},
StreamHeaderReceived {
header: proto::data_stream::Header,
},
StreamChunkReceived {
chunk: proto::data_stream::Chunk,
},
E2eeStateChanged {
participant: Participant,
state: EncryptionState,
Expand Down Expand Up @@ -720,6 +723,12 @@ impl RoomSession {
EngineEvent::LocalTrackSubscribed { track_sid } => {
self.handle_track_subscribed(track_sid)
}
EngineEvent::DataStreamHeader { header } => {
self.handle_data_stream_header(header);
}
EngineEvent::DataStreamChunk { chunk } => {
self.handle_data_stream_chunk(chunk);
}
_ => {}
}

Expand Down Expand Up @@ -1230,6 +1239,16 @@ impl RoomSession {
});
}

fn handle_data_stream_header(&self, header: proto::data_stream::Header) {
let event = RoomEvent::StreamHeaderReceived { header };
self.dispatcher.dispatch(&event);
}

fn handle_data_stream_chunk(&self, chunk: proto::data_stream::Chunk) {
let event = RoomEvent::StreamChunkReceived { chunk };
self.dispatcher.dispatch(&event);
}

/// Create a new participant
/// Also add it to the participants list
fn create_participant(
Expand Down
12 changes: 12 additions & 0 deletions livekit/src/rtc_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ pub enum EngineEvent {
LocalTrackSubscribed {
track_sid: String,
},
DataStreamHeader {
header: proto::data_stream::Header,
},
DataStreamChunk {
chunk: proto::data_stream::Chunk,
},
}

/// Represents a running RtcSession with the ability to close the session
Expand Down Expand Up @@ -524,6 +530,12 @@ impl EngineInner {
SessionEvent::LocalTrackSubscribed { track_sid } => {
let _ = self.engine_tx.send(EngineEvent::LocalTrackSubscribed { track_sid });
}
SessionEvent::DataStreamHeader { header } => {
let _ = self.engine_tx.send(EngineEvent::DataStreamHeader { header });
}
SessionEvent::DataStreamChunk { chunk } => {
let _ = self.engine_tx.send(EngineEvent::DataStreamChunk { chunk });
}
}
Ok(())
}
Expand Down
Loading

0 comments on commit 3d9168a

Please sign in to comment.