Skip to content

Commit

Permalink
Merge branch 'main' into theo/update-webrtc-1
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Dec 8, 2023
2 parents 415ef2f + d3d7800 commit 4d67021
Show file tree
Hide file tree
Showing 21 changed files with 203 additions and 74 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions examples/basic_room/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use livekit_api::access_token;
use livekit::prelude::*;
use livekit_api::access_token;
use std::env;

// Connect to a room using the specified env variables
Expand All @@ -24,18 +24,17 @@ async fn main() {
.to_jwt()
.unwrap();


let (room, mut rx) = Room::connect(&url, &token, RoomOptions::default())
.await
.unwrap();
log::info!("Connected to room: {} - {}", room.name(), room.sid());

room.local_participant()
.publish_data(
"Hello world".to_owned().into_bytes(),
DataPacketKind::Reliable,
Default::default(),
)
.publish_data(DataPacket {
payload: "Hello world".to_owned().into_bytes(),
kind: DataPacketKind::Reliable,
..Default::default()
})
.await
.unwrap();

Expand Down
2 changes: 2 additions & 0 deletions libwebrtc/src/audio_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ impl RtcAudioSource {
[Native];
fn set_audio_options(self: &Self, options: AudioSourceOptions) -> ();
fn audio_options(self: &Self) -> AudioSourceOptions;
fn sample_rate(self: &Self) -> u32;
fn num_channels(self: &Self) -> u32;
);
}

Expand Down
42 changes: 38 additions & 4 deletions libwebrtc/src/native/audio_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct NativeAudioSource {
struct AudioSourceInner {
buf: Box<[i16]>,

captured_frames: usize,

// Amount of data from the previous frame that hasn't been sent to the libwebrtc source
// (because it requires 10ms of data)
len: usize,
Expand All @@ -51,18 +53,47 @@ impl NativeAudioSource {
) -> NativeAudioSource {
let samples_10ms = (sample_rate / 100 * num_channels) as usize;

Self {
let source = Self {
sys_handle: sys_at::ffi::new_audio_track_source(options.into()),
inner: Arc::new(AsyncMutex::new(AudioSourceInner {
buf: vec![0; samples_10ms].into_boxed_slice(),
captured_frames: 0,
len: 0,
read_offset: 0,
interval: None, // interval must be created from a tokio runtime context
})),
sample_rate,
num_channels,
samples_10ms,
}
};

tokio::spawn({
let source = source.clone();
async move {
let mut interval = interval(Duration::from_millis(10));

loop {
// We directly use the sys_handle instead of the capture_frame function
// (We don't want to increase the captured_frames count and no need to buffer)
interval.tick().await;

let inner = source.inner.lock().await;
if inner.captured_frames > 0 {
break; // User captured something, stop injecting silence
}

let data = vec![0; samples_10ms];
source.sys_handle.on_captured_frame(
&data,
sample_rate,
num_channels,
sample_rate as usize / 100,
);
}
}
});

source
}

pub fn sys_handle(&self) -> SharedPtr<sys_at::ffi::AudioTrackSource> {
Expand Down Expand Up @@ -131,6 +162,8 @@ impl NativeAudioSource {
}

let mut inner = self.inner.lock().await;
inner.captured_frames += 1;

let mut interval = inner.interval.take().unwrap_or_else(|| {
let mut interval = interval(Duration::from_millis(10));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
Expand All @@ -145,11 +178,12 @@ impl NativeAudioSource {

interval.tick().await;

// samples per channel = number of frames
let samples_per_channel = data.len() / self.num_channels as usize;
self.sys_handle.on_captured_frame(
data,
self.sample_rate as i32,
self.num_channels as usize,
self.sample_rate,
self.num_channels,
samples_per_channel,
);
}
Expand Down
56 changes: 51 additions & 5 deletions libwebrtc/src/native/video_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::video_frame::{VideoBuffer, VideoFrame};
use crate::video_frame::{I420Buffer, VideoBuffer, VideoFrame};
use crate::video_source::VideoResolution;
use cxx::SharedPtr;
use std::time::{SystemTime, UNIX_EPOCH};
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use webrtc_sys::video_frame as vf_sys;
use webrtc_sys::video_frame::ffi::VideoRotation;
use webrtc_sys::video_track as vt_sys;

impl From<vt_sys::ffi::VideoResolution> for VideoResolution {
Expand All @@ -40,22 +43,65 @@ impl From<VideoResolution> for vt_sys::ffi::VideoResolution {
#[derive(Clone)]
pub struct NativeVideoSource {
sys_handle: SharedPtr<vt_sys::ffi::VideoTrackSource>,
inner: Arc<Mutex<VideoSourceInner>>,
}

struct VideoSourceInner {
captured_frames: usize,
}

impl NativeVideoSource {
pub fn new(resolution: VideoResolution) -> NativeVideoSource {
Self {
let source = Self {
sys_handle: vt_sys::ffi::new_video_track_source(&vt_sys::ffi::VideoResolution::from(
resolution,
resolution.clone(),
)),
}
inner: Arc::new(Mutex::new(VideoSourceInner { captured_frames: 0 })),
};

tokio::spawn({
let source = source.clone();
let i420 = I420Buffer::new(resolution.width, resolution.height);
async move {
let mut interval = tokio::time::interval(Duration::from_millis(100)); // 10 fps

loop {
interval.tick().await;

let inner = source.inner.lock();
if inner.captured_frames > 0 {
break;
}

let mut builder = vf_sys::ffi::new_video_frame_builder();
builder
.pin_mut()
.set_rotation(VideoRotation::VideoRotation0);
builder
.pin_mut()
.set_video_frame_buffer(i420.as_ref().sys_handle());

let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
builder.pin_mut().set_timestamp_us(now.as_micros() as i64);

source
.sys_handle
.on_captured_frame(&builder.pin_mut().build());
}
}
});

source
}

pub fn sys_handle(&self) -> SharedPtr<vt_sys::ffi::VideoTrackSource> {
self.sys_handle.clone()
}

pub fn capture_frame<T: AsRef<dyn VideoBuffer>>(&self, frame: &VideoFrame<T>) {
let mut inner = self.inner.lock();
inner.captured_frames += 1;

let mut builder = vf_sys::ffi::new_video_frame_builder();
builder.pin_mut().set_rotation(frame.rotation.into());
builder
Expand Down
2 changes: 1 addition & 1 deletion libwebrtc/src/video_source.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 LiveKit, Inc.
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion livekit-ffi/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "livekit-ffi"
version = "0.3.13"
version = "0.3.15"
edition = "2021"
license = "Apache-2.0"
description = "FFI interface for bindings in other languages"
Expand Down
2 changes: 2 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ message PublishDataRequest {
uint64 data_len = 3;
DataPacketKind kind = 4;
repeated string destination_sids = 5; // destination
optional string topic = 6;
}
message PublishDataResponse {
uint64 async_id = 1;
Expand Down Expand Up @@ -343,6 +344,7 @@ message DataReceived {
OwnedBuffer data = 1;
optional string participant_sid = 2; // Can be empty if the data is sent by a server SDK
DataPacketKind kind = 3;
optional string topic = 4;
}

message ConnectionStateChanged { ConnectionState state = 1; }
Expand Down
4 changes: 4 additions & 0 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2157,6 +2157,8 @@ pub struct PublishDataRequest {
/// destination
#[prost(string, repeated, tag="5")]
pub destination_sids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, optional, tag="6")]
pub topic: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -2541,6 +2543,8 @@ pub struct DataReceived {
pub participant_sid: ::core::option::Option<::prost::alloc::string::String>,
#[prost(enumeration="DataPacketKind", tag="3")]
pub kind: i32,
#[prost(string, optional, tag="4")]
pub topic: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
4 changes: 3 additions & 1 deletion livekit-ffi/src/server/audio_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl FfiAudioStream {
server: &'static server::FfiServer,
new_stream: proto::NewAudioStreamRequest,
) -> FfiResult<proto::OwnedAudioStream> {
let ffi_track = server.retrieve_handle::<FfiTrack>(new_stream.track_handle)?;
let ffi_track = server
.retrieve_handle::<FfiTrack>(new_stream.track_handle)?
.clone();
let rtc_track = ffi_track.track.rtc_track();

let MediaStreamTrack::Audio(rtc_track) = rtc_track else {
Expand Down
16 changes: 9 additions & 7 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ fn on_set_subscribed(
server.retrieve_handle::<FfiPublication>(set_subscribed.publication_handle)?;

let TrackPublication::Remote(publication) = &ffi_publication.publication else {
return Err(FfiError::InvalidRequest("publication is not a RemotePublication".into()));
};
return Err(FfiError::InvalidRequest(
"publication is not a RemotePublication".into(),
));
};

let _guard = server.async_runtime.enter();
publication.set_subscribed(set_subscribed.subscribe);
Expand Down Expand Up @@ -457,7 +459,7 @@ unsafe fn on_to_argb(
let Some(proto::video_frame_buffer_info::Buffer::Yuv(yuv)) = &buffer.buffer else {
return Err(FfiError::InvalidRequest(
"invalid i420 buffer description".into(),
))
));
};

#[rustfmt::skip]
Expand All @@ -470,19 +472,19 @@ unsafe fn on_to_argb(
match rgba_format {
proto::VideoFormatType::FormatArgb => {
#[rustfmt::skip]
yuv_helper::i420_to_argb(src_y, yuv.stride_y, src_u, yuv.stride_u, src_v, yuv.stride_v, argb, stride, w, h);
yuv_helper::i420_to_bgra(src_y, yuv.stride_y, src_u, yuv.stride_u, src_v, yuv.stride_v, argb, stride, w, h);
}
proto::VideoFormatType::FormatBgra => {
#[rustfmt::skip]
yuv_helper::i420_to_bgra(src_y, yuv.stride_y, src_u, yuv.stride_u, src_v, yuv.stride_v, argb, stride, w, h);
yuv_helper::i420_to_argb(src_y, yuv.stride_y, src_u, yuv.stride_u, src_v, yuv.stride_v, argb, stride, w, h);
}
proto::VideoFormatType::FormatRgba => {
#[rustfmt::skip]
yuv_helper::i420_to_rgba(src_y, yuv.stride_y, src_u, yuv.stride_u, src_v, yuv.stride_v, argb, stride, w, h);
yuv_helper::i420_to_abgr(src_y, yuv.stride_y, src_u, yuv.stride_u, src_v, yuv.stride_v, argb, stride, w, h);
}
proto::VideoFormatType::FormatAbgr => {
#[rustfmt::skip]
yuv_helper::i420_to_abgr(src_y, yuv.stride_y, src_u, yuv.stride_u, src_v, yuv.stride_v, argb, stride, w, h);
yuv_helper::i420_to_rgba(src_y, yuv.stride_y, src_u, yuv.stride_u, src_v, yuv.stride_v, argb, stride, w, h);
}
}
}
Expand Down
Loading

0 comments on commit 4d67021

Please sign in to comment.