Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FFI events for data streams #510

Merged
merged 19 commits into from
Dec 16, 2024
54 changes: 54 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ message RoomEvent {
DataPacketReceived data_packet_received = 27;
TranscriptionReceived transcription_received = 28;
ChatMessageReceived chat_message = 29;
DataStream.Header stream_header = 30;
DataStream.Chunk stream_chunk = 31;
}
}

Expand Down Expand Up @@ -526,3 +528,55 @@ message Reconnected {}

message RoomEOS {}

message DataStream {

// enum for operation types (specific to TextHeader)
enum OperationType {
CREATE = 0;
UPDATE = 1;
DELETE = 2;
REACTION = 3;
}

// header properties specific to text streams
message TextHeader {
required OperationType operation_type = 1;
required int32 version = 2; // Optional: Version for updates/edits
required string reply_to_stream_id = 3; // Optional: Reply to specific message
repeated string attached_stream_ids = 4; // file attachments for text streams
required bool generated = 5; // true if the text has been generated by an agent from a participant's audio transcription

}

// header properties specific to file or image streams
message FileHeader {
required string file_name = 1; // name of the file
}

// main DataStream.Header that contains a oneof for specific headers
message Header {
required string stream_id = 1; // unique identifier for this data stream
required int64 timestamp = 2; // using int64 for Unix timestamp
required string topic = 3;
required string mime_type = 4;
optional uint64 total_length = 5; // only populated for finite streams, if it's a stream of unknown size this stays empty
optional uint64 total_chunks = 6; // only populated for finite streams, if it's a stream of unknown size this stays empty
map<string, string> extensions = 7; // user defined extensions map that can carry additional info

// oneof to choose between specific header types
oneof content_header {
TextHeader text_header = 8;
FileHeader file_header = 9;
}
}

message Chunk {
required string stream_id = 1; // unique identifier for this data stream to map it to the correct header
required uint64 chunk_index = 2;
required bytes content = 3; // content as binary (bytes)
required bool complete = 4; // true only if this is the last chunk of this stream - can also be sent with empty content
required int32 version = 5; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced
optional bytes iv = 6; // optional, initialization vector for AES-GCM encryption
}
}

48 changes: 48 additions & 0 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,51 @@ impl From<livekit::ChatMessage> for proto::ChatMessage {
}
}
}

impl From<livekit_protocol::data_stream::Header> for proto::data_stream::Header {
fn from(msg: livekit_protocol::data_stream::Header) -> Self {
let content_header = match msg.content_header {
Some(livekit_protocol::data_stream::header::ContentHeader::TextHeader(text_header)) => {
Some(proto::data_stream::header::ContentHeader::TextHeader(
proto::data_stream::TextHeader {
operation_type: text_header.operation_type,
version: text_header.version,
reply_to_stream_id: text_header.reply_to_stream_id,
attached_stream_ids: text_header.attached_stream_ids,
generated: text_header.generated,
},
))
}
Some(livekit_protocol::data_stream::header::ContentHeader::FileHeader(file_header)) => {
Some(proto::data_stream::header::ContentHeader::FileHeader(
proto::data_stream::FileHeader { file_name: file_header.file_name },
))
}
None => None,
};

proto::data_stream::Header {
stream_id: msg.stream_id,
timestamp: msg.timestamp,
topic: msg.topic,
mime_type: msg.mime_type,
total_chunks: msg.total_chunks,
total_length: msg.total_length,
extensions: msg.extensions,
content_header,
}
}
}

impl From<livekit_protocol::data_stream::Chunk> for proto::data_stream::Chunk {
fn from(msg: livekit_protocol::data_stream::Chunk) -> Self {
proto::data_stream::Chunk {
stream_id: msg.stream_id,
content: msg.content,
complete: msg.complete,
chunk_index: msg.chunk_index,
version: msg.version,
iv: msg.iv,
}
}
}
134 changes: 133 additions & 1 deletion livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FrameCryptor {
Expand Down Expand Up @@ -2634,7 +2635,7 @@ pub struct OwnedBuffer {
pub struct RoomEvent {
#[prost(uint64, required, tag="1")]
pub room_handle: u64,
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")]
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31")]
pub message: ::core::option::Option<room_event::Message>,
}
/// Nested message and enum types in `RoomEvent`.
Expand Down Expand Up @@ -2700,6 +2701,10 @@ pub mod room_event {
TranscriptionReceived(super::TranscriptionReceived),
#[prost(message, tag="29")]
ChatMessage(super::ChatMessageReceived),
#[prost(message, tag="30")]
StreamHeader(super::data_stream::Header),
#[prost(message, tag="31")]
StreamChunk(super::data_stream::Chunk),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -2974,6 +2979,133 @@ pub struct Reconnected {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RoomEos {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataStream {
}
/// Nested message and enum types in `DataStream`.
pub mod data_stream {
/// header properties specific to text streams
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TextHeader {
#[prost(enumeration="OperationType", required, tag="1")]
pub operation_type: i32,
/// Optional: Version for updates/edits
#[prost(int32, required, tag="2")]
pub version: i32,
/// Optional: Reply to specific message
#[prost(string, required, tag="3")]
pub reply_to_stream_id: ::prost::alloc::string::String,
/// file attachments for text streams
#[prost(string, repeated, tag="4")]
pub attached_stream_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// true if the text has been generated by an agent from a participant's audio transcription
#[prost(bool, required, tag="5")]
pub generated: bool,
}
/// header properties specific to file or image streams
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileHeader {
/// name of the file
#[prost(string, required, tag="1")]
pub file_name: ::prost::alloc::string::String,
}
/// main DataStream.Header that contains a oneof for specific headers
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Header {
/// unique identifier for this data stream
#[prost(string, required, tag="1")]
pub stream_id: ::prost::alloc::string::String,
/// using int64 for Unix timestamp
#[prost(int64, required, tag="2")]
pub timestamp: i64,
#[prost(string, required, tag="3")]
pub topic: ::prost::alloc::string::String,
#[prost(string, required, tag="4")]
pub mime_type: ::prost::alloc::string::String,
/// only populated for finite streams, if it's a stream of unknown size this stays empty
#[prost(uint64, optional, tag="5")]
pub total_length: ::core::option::Option<u64>,
/// only populated for finite streams, if it's a stream of unknown size this stays empty
#[prost(uint64, optional, tag="6")]
pub total_chunks: ::core::option::Option<u64>,
/// user defined extensions map that can carry additional info
#[prost(map="string, string", tag="7")]
pub extensions: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
/// oneof to choose between specific header types
#[prost(oneof="header::ContentHeader", tags="8, 9")]
pub content_header: ::core::option::Option<header::ContentHeader>,
}
/// Nested message and enum types in `Header`.
pub mod header {
/// oneof to choose between specific header types
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum ContentHeader {
#[prost(message, tag="8")]
TextHeader(super::TextHeader),
#[prost(message, tag="9")]
FileHeader(super::FileHeader),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Chunk {
/// unique identifier for this data stream to map it to the correct header
#[prost(string, required, tag="1")]
pub stream_id: ::prost::alloc::string::String,
#[prost(uint64, required, tag="2")]
pub chunk_index: u64,
/// content as binary (bytes)
#[prost(bytes="vec", required, tag="3")]
pub content: ::prost::alloc::vec::Vec<u8>,
/// true only if this is the last chunk of this stream - can also be sent with empty content
#[prost(bool, required, tag="4")]
pub complete: bool,
/// a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced
#[prost(int32, required, tag="5")]
pub version: i32,
/// optional, initialization vector for AES-GCM encryption
#[prost(bytes="vec", optional, tag="6")]
pub iv: ::core::option::Option<::prost::alloc::vec::Vec<u8>>,
}
/// enum for operation types (specific to TextHeader)
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum OperationType {
Create = 0,
Update = 1,
Delete = 2,
Reaction = 3,
}
impl OperationType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
OperationType::Create => "CREATE",
OperationType::Update => "UPDATE",
OperationType::Delete => "DELETE",
OperationType::Reaction => "REACTION",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"CREATE" => Some(Self::Create),
"UPDATE" => Some(Self::Update),
"DELETE" => Some(Self::Delete),
"REACTION" => Some(Self::Reaction),
_ => None,
}
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum IceTransportType {
Expand Down
6 changes: 6 additions & 0 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,12 @@ async fn forward_event(
state: proto::EncryptionState::from(state).into(),
}));
}
RoomEvent::StreamHeaderReceived { header } => {
let _ = send_event(proto::room_event::Message::StreamHeader(header.into()));
}
RoomEvent::StreamChunkReceived { chunk } => {
let _ = send_event(proto::room_event::Message::StreamChunk(chunk.into()));
}
_ => {
log::warn!("unhandled room event: {:?}", event);
}
Expand Down
27 changes: 23 additions & 4 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ use parking_lot::RwLock;
pub use proto::DisconnectReason;
use proto::{promise::Promise, SignalTarget};
use thiserror::Error;
use tokio::{
signal,
sync::{mpsc, oneshot, Mutex as AsyncMutex},
};
use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex};

pub use self::{
e2ee::{manager::E2eeManager, E2eeOptions},
Expand Down Expand Up @@ -171,6 +168,12 @@ pub enum RoomEvent {
message: ChatMessage,
participant: Option<RemoteParticipant>,
},
StreamHeaderReceived {
header: proto::data_stream::Header,
},
StreamChunkReceived {
chunk: proto::data_stream::Chunk,
},
E2eeStateChanged {
participant: Participant,
state: EncryptionState,
Expand Down Expand Up @@ -720,6 +723,12 @@ impl RoomSession {
EngineEvent::LocalTrackSubscribed { track_sid } => {
self.handle_track_subscribed(track_sid)
}
EngineEvent::DataStreamHeader { header } => {
self.handle_data_stream_header(header);
}
EngineEvent::DataStreamChunk { chunk } => {
self.handle_data_stream_chunk(chunk);
}
_ => {}
}

Expand Down Expand Up @@ -1230,6 +1239,16 @@ impl RoomSession {
});
}

fn handle_data_stream_header(&self, header: proto::data_stream::Header) {
let event = RoomEvent::StreamHeaderReceived { header };
self.dispatcher.dispatch(&event);
}

fn handle_data_stream_chunk(&self, chunk: proto::data_stream::Chunk) {
let event = RoomEvent::StreamChunkReceived { chunk };
self.dispatcher.dispatch(&event);
}

/// Create a new participant
/// Also add it to the participants list
fn create_participant(
Expand Down
12 changes: 12 additions & 0 deletions livekit/src/rtc_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ pub enum EngineEvent {
LocalTrackSubscribed {
track_sid: String,
},
DataStreamHeader {
header: proto::data_stream::Header,
},
DataStreamChunk {
chunk: proto::data_stream::Chunk,
},
}

/// Represents a running RtcSession with the ability to close the session
Expand Down Expand Up @@ -524,6 +530,12 @@ impl EngineInner {
SessionEvent::LocalTrackSubscribed { track_sid } => {
let _ = self.engine_tx.send(EngineEvent::LocalTrackSubscribed { track_sid });
}
SessionEvent::DataStreamHeader { header } => {
let _ = self.engine_tx.send(EngineEvent::DataStreamHeader { header });
}
SessionEvent::DataStreamChunk { chunk } => {
let _ = self.engine_tx.send(EngineEvent::DataStreamChunk { chunk });
}
}
Ok(())
}
Expand Down
Loading
Loading