Skip to content

Commit

Permalink
ffi protocol improvements (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Sep 3, 2023
1 parent c746592 commit a934d50
Show file tree
Hide file tree
Showing 43 changed files with 2,426 additions and 804 deletions.
344 changes: 179 additions & 165 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 17 additions & 6 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ members = [
"basic_room",
"mobile",
"save_to_disk",
"play_from_disk",
"wgpu_room",
"webhooks",
]
1 change: 1 addition & 0 deletions examples/play_from_disk/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.wav filter=lfs diff=lfs merge=lfs -text
13 changes: 13 additions & 0 deletions examples/play_from_disk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "play_from_disk"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
livekit = { path = "../../livekit", version = "0.2.0" }
thiserror = "1.0.47"
log = "0.4.20"
env_logger = "0.10.0"
3 changes: 3 additions & 0 deletions examples/play_from_disk/change-sophie.wav
Git LFS file not shown
191 changes: 191 additions & 0 deletions examples/play_from_disk/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use livekit::{
options::TrackPublishOptions,
track::{LocalAudioTrack, LocalTrack, TrackSource},
webrtc::{
audio_source::native::NativeAudioSource,
prelude::{AudioFrame, AudioSourceOptions, RtcAudioSource},
},
Room, RoomOptions,
};
use std::{env, mem::size_of, sync::Arc, time::Duration};
use std::{error::Error, io};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};

#[derive(Debug, Error)]
pub enum WavError {
#[error("Invalid header: {0}")]
InvalidHeader(&'static str),
#[error("IO error: {0}")]
Io(#[from] io::Error),
}

pub struct WavReader<R: AsyncRead + Unpin> {
reader: R,
}

#[allow(dead_code)]
#[derive(Debug)]
pub struct WavHeader {
file_size: u32,
data_size: u32,
format: String,
format_length: u32,
format_type: u16,
num_channels: u16,
sample_rate: u32,
byte_rate: u32,
block_align: u16,
bits_per_sample: u16,
}

impl<R: AsyncRead + Unpin> WavReader<R> {
pub fn new(reader: R) -> Self {
Self { reader }
}

pub async fn read_header(&mut self) -> Result<WavHeader, WavError> {
let mut header = [0u8; 4];
let mut format = [0u8; 4];
let mut chunk_marker = [0u8; 4];
let mut data_chunk = [0u8; 4];

self.reader.read_exact(&mut header).await?;

if &header != b"RIFF" {
return Err(WavError::InvalidHeader("Invalid RIFF header"));
}

let file_size = self.reader.read_u32_le().await?;
self.reader.read_exact(&mut format).await?;

if &format != b"WAVE" {
return Err(WavError::InvalidHeader("Invalid WAVE header"));
}

self.reader.read_exact(&mut chunk_marker).await?;

if &chunk_marker != b"fmt " {
return Err(WavError::InvalidHeader("Invalid fmt chunk"));
}

let format_length = self.reader.read_u32_le().await?;
let format_type = self.reader.read_u16_le().await?;
let num_channels = self.reader.read_u16_le().await?;
let sample_rate = self.reader.read_u32_le().await?;
let byte_rate = self.reader.read_u32_le().await?;
let block_align = self.reader.read_u16_le().await?;
let bits_per_sample = self.reader.read_u16_le().await?;
self.reader.read_exact(&mut data_chunk).await?;
let data_size = self.reader.read_u32_le().await?;

if &data_chunk != b"data" {
return Err(WavError::InvalidHeader("Invalid data chunk"));
}

Ok(WavHeader {
file_size,
data_size,
format: String::from_utf8_lossy(&format).to_string(),
format_length,
format_type,
num_channels,
sample_rate,
byte_rate,
block_align,
bits_per_sample,
})
}

pub async fn read_i16(&mut self) -> Result<i16, WavError> {
let i = self.reader.read_i16_le().await?;
Ok(i)
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();

let url = env::var("LIVEKIT_URL").expect("LIVEKIT_URL is not set");
let token = env::var("LIVEKIT_TOKEN").expect("LIVEKIT_TOKEN is not set");

let file = tokio::fs::File::open("change-sophie.wav").await?;
let mut reader = WavReader::new(BufReader::new(file));
let header = reader.read_header().await?;
log::debug!("{:?}", header);

if header.bits_per_sample != 16 {
return Err("only 16-bit samples supported for this demo".into());
}

let (room, mut rx) = Room::connect(&url, &token, RoomOptions::default())
.await
.unwrap();
let room = Arc::new(room);
log::info!("Connected to room: {} - {}", room.name(), room.sid());

let source = NativeAudioSource::new(
AudioSourceOptions::default(),
header.sample_rate,
header.num_channels as u32,
);

let track = LocalAudioTrack::create_audio_track("file", RtcAudioSource::Native(source.clone()));

room.local_participant()
.publish_track(
LocalTrack::Audio(track),
TrackPublishOptions {
source: TrackSource::Microphone,
..Default::default()
},
)
.await?;

// Play the wav file and disconnect
tokio::spawn({
let room = room.clone();
async move {
const FRAME_DURATION: Duration = Duration::from_millis(1000); // Write 1s of audio at a time

let max_samples = header.data_size as usize / size_of::<i16>();
let ms = FRAME_DURATION.as_millis() as u32;
let num_samples = (header.sample_rate / 1000 * ms) as usize;

log::info!("sample_rate: {}", header.sample_rate);
log::info!("num_channels: {}", header.num_channels);
log::info!("max samples: {}", max_samples);
log::info!("chunk size: {}ms - {} samples", ms, num_samples);

let mut written_samples = 0;
while written_samples < max_samples {
let available_samples = max_samples - written_samples;
let frame_size = num_samples.min(available_samples);

let mut audio_frame = AudioFrame {
data: vec![0i16; frame_size].into(),
num_channels: header.num_channels as u32,
sample_rate: header.sample_rate,
samples_per_channel: (frame_size / header.num_channels as usize) as u32,
};

for i in 0..frame_size {
let sample = reader.read_i16().await.unwrap();
audio_frame.data.to_mut()[i] = sample;
}

source.capture_frame(&audio_frame).await.unwrap();
written_samples += frame_size;
}

room.close().await.unwrap();
}
});

while let Some(msg) = rx.recv().await {
log::info!("Event: {:?}", msg);
}

Ok(())
}
15 changes: 7 additions & 8 deletions examples/wgpu_room/src/logo_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct FrameData {
image: Arc<RgbaImage>,
framebuffer: Arc<Mutex<Vec<u8>>>,
video_frame: Arc<Mutex<VideoFrame<I420Buffer>>>,
pos: (u32, u32),
pos: (i32, i32),
direction: (i32, i32),
}

Expand Down Expand Up @@ -119,7 +119,7 @@ impl LogoTrack {

let mut data = FrameData {
image: Arc::new(image),
framebuffer: Arc::new(Mutex::new(vec![0u8; (FB_WIDTH * FB_HEIGHT * 4) as usize])),
framebuffer: Arc::new(Mutex::new(vec![0u8; FB_WIDTH * FB_HEIGHT * 4])),
video_frame: Arc::new(Mutex::new(VideoFrame {
rotation: VideoRotation::VideoRotation0,
buffer: I420Buffer::new(FB_WIDTH as u32, FB_HEIGHT as u32),
Expand All @@ -137,16 +137,16 @@ impl LogoTrack {
_ = interval.tick() => {}
}

data.pos.0 = (data.pos.0 as i32 + data.direction.0 * MOVE_SPEED) as u32;
data.pos.1 = (data.pos.1 as i32 + data.direction.1 * MOVE_SPEED) as u32;
data.pos.0 += data.direction.0 * MOVE_SPEED;
data.pos.1 += data.direction.1 * MOVE_SPEED;

if data.pos.0 >= (FB_WIDTH - data.image.width() as usize) as u32 {
if data.pos.0 >= (FB_WIDTH - data.image.width() as usize) as i32 {
data.direction.0 = -1;
} else if data.pos.0 <= 0 {
data.direction.0 = 1;
}

if data.pos.1 >= (FB_HEIGHT - data.image.height() as usize) as u32 {
if data.pos.1 >= (FB_HEIGHT - data.image.height() as usize) as i32 {
data.direction.1 = -1;
} else if data.pos.1 <= 0 {
data.direction.1 = 1;
Expand Down Expand Up @@ -189,8 +189,7 @@ impl LogoTrack {
stride_v,
FB_WIDTH as i32,
FB_HEIGHT as i32,
)
.unwrap();
);

source.capture_frame(&*video_frame);
}
Expand Down
7 changes: 5 additions & 2 deletions examples/wgpu_room/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{logo_track::LogoTrack, sine_track::SineTrack};
use crate::{
logo_track::LogoTrack,
sine_track::{SineParameters, SineTrack},
};
use livekit::{prelude::*, SimulateScenario};
use parking_lot::Mutex;
use std::sync::Arc;
Expand Down Expand Up @@ -120,7 +123,7 @@ async fn service_task(inner: Arc<ServiceInner>, mut cmd_rx: mpsc::UnboundedRecei
running_state = Some(RunningState {
room: new_room.clone(),
logo_track: LogoTrack::new(new_room.clone()),
sine_track: SineTrack::new(new_room.clone()),
sine_track: SineTrack::new(new_room.clone(), SineParameters::default()),
});

// Allow direct access to the room from the UI (Used for sync access)
Expand Down
Loading

0 comments on commit a934d50

Please sign in to comment.