From c5b3b3a002503bed11571c8698faf117c73c60dd Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 8 Oct 2024 21:40:35 +0200 Subject: [PATCH 01/11] Initial ffmpeg H.264 support in frames example --- Cargo.lock | 10 + Cargo.toml | 1 + crates/store/re_video/Cargo.toml | 11 +- crates/store/re_video/examples/frames.rs | 61 +++++- crates/store/re_video/src/decode/ffmpeg.rs | 213 +++++++++++++++++++++ crates/store/re_video/src/decode/mod.rs | 4 + crates/store/re_video/src/demux/mod.rs | 4 + 7 files changed, 293 insertions(+), 11 deletions(-) create mode 100644 crates/store/re_video/src/decode/ffmpeg.rs diff --git a/Cargo.lock b/Cargo.lock index 6aff8cbde065..a054870e8b59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2327,6 +2327,15 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "ffmpeg-sidecar" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd1e249e0ceeb0f5c9f84a3c6941c3bde3ebc2815f4b94531a7e806af61c4c0" +dependencies = [ + "anyhow", +] + [[package]] name = "filetime" version = "0.2.23" @@ -5910,6 +5919,7 @@ version = "0.19.0-alpha.1+dev" dependencies = [ "crossbeam", "econtext", + "ffmpeg-sidecar", "indicatif", "itertools 0.13.0", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 59262497c974..b3791c8b6375 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -167,6 +167,7 @@ econtext = "0.2" # Prints erro ehttp = "0.5.0" enumset = "1.0.12" env_logger = { version = "0.10", default-features = false } +ffmpeg-sidecar = "1.1.2" fixed = { version = "<1.28", default-features = false } # 1.28+ is MSRV 1.79+ flatbuffers = "23.0" futures-channel = "0.3" diff --git a/crates/store/re_video/Cargo.toml b/crates/store/re_video/Cargo.toml index 2dcad53f341a..2251a5d59cfd 100644 --- a/crates/store/re_video/Cargo.toml +++ b/crates/store/re_video/Cargo.toml @@ -23,12 +23,16 @@ all-features = false no-default-features = true features = ["all"] + [features] -default = ["av1"] +default = ["av1", "ffmpeg"] ## Native AV1 decoding. av1 = ["dep:dav1d"] +## Decode H.264 using ffmpeg over CLI. +ffmpeg = ["dep:ffmpeg-sidecar"] + ## Enable faster native video decoding with assembly. ## You need to install [nasm](https://nasm.us/) to compile with this feature. nasm = ["dav1d?/default"] # The default feature set of dav1d has asm enabled @@ -57,9 +61,12 @@ dav1d = { git = "https://github.com/rerun-io/rav1d", branch = "emilk/dav1d-inter # dav1d = { version = "0.10.3", optional = true } # Requires separate install of `dav1d` library. Fast in debug builds. Useful for development. 
+ffmpeg-sidecar = { workspace = true, optional = true } + [dev-dependencies] +# For the `frames` example: indicatif.workspace = true -re_video = { workspace = true, features = ["av1"] } # For the `frames` example +re_video = { workspace = true, features = ["av1", "ffmpeg"] } [[example]] diff --git a/crates/store/re_video/examples/frames.rs b/crates/store/re_video/examples/frames.rs index 877c20bdff1d..61affc0c48b7 100644 --- a/crates/store/re_video/examples/frames.rs +++ b/crates/store/re_video/examples/frames.rs @@ -16,6 +16,8 @@ use parking_lot::Mutex; use re_video::{decode::SyncDecoder, VideoData}; fn main() { + re_log::setup_logging(); + // frames let args: Vec<_> = std::env::args().collect(); let Some(video_path) = args.get(1) else { @@ -42,12 +44,29 @@ fn main() { } fn create_decoder(video: &VideoData) -> Box<dyn SyncDecoder> { - if video.config.is_av1() { - Box::new( - re_video::decode::av1::SyncDav1dDecoder::new().expect("Failed to start AV1 decoder"), - ) - } else { - panic!("Unsupported codec: {}", video.human_readable_codec_string()); + match &video.config.stsd.contents { + re_mp4::StsdBoxContent::Av01(av01_box) => { + println!("Decoding AV1…"); + Box::new( + re_video::decode::av1::SyncDav1dDecoder::new() + .expect("Failed to start AV1 decoder"), + ) + } + + re_mp4::StsdBoxContent::Avc1(avc1_box) => { + println!("Decoding H.264…"); + Box::new( + re_video::decode::ffmpeg::FfmpegCliH264Decoder::new( + avc1_box.clone(), + video.timescale, + ) + .expect("Failed to start H.264 decoder"), + ) + } + + _ => { + panic!("Unsupported codec: {}", video.human_readable_codec_string()); + } } } fn write_video_frames( video: &re_video::VideoData, decoder: &mut dyn re_video::decode::SyncDecoder, @@ -98,8 +117,15 @@ fn write_video_frames( .create(true) .truncate(true) .open(output_dir.join(format!("{i:0width$}.ppm"))) - .expect("failed to open file"); - write_binary_ppm(&mut file, frame.width, frame.height, &frame.data); + .expect("failed to open file"); + match frame.format { + re_video::PixelFormat::Rgb8Unorm => { + write_ppm_rgb24(&mut file, frame.width, frame.height, &frame.data); + } + re_video::PixelFormat::Rgba8Unorm => { + write_ppm_rgba32(&mut file, frame.width, frame.height, &frame.data); + } + } } } } @@ -108,7 +134,24 @@ fn num_digits(n: usize) -> usize { (n as f64).log10().floor() as usize + 1 } -fn write_binary_ppm(file: &mut File, width: u32, height: u32, rgba: &[u8]) { +fn write_ppm_rgb24(file: &mut File, width: u32, height: u32, rgb: &[u8]) { + assert_eq!(width as usize * height as usize * 3, rgb.len()); + + let header = format!("P6\n{width} {height}\n255\n"); + + let mut data = Vec::with_capacity(header.len() + width as usize * height as usize * 3); + data.extend_from_slice(header.as_bytes()); + + for rgb in rgb.chunks(3) { + data.extend_from_slice(&[rgb[0], rgb[1], rgb[2]]); + } + + file.write_all(&data).expect("failed to write frame data"); +} + +fn write_ppm_rgba32(file: &mut File, width: u32, height: u32, rgba: &[u8]) { + assert_eq!(width as usize * height as usize * 4, rgba.len()); + let header = format!("P6\n{width} {height}\n255\n"); let mut data = Vec::with_capacity(header.len() + width as usize * height as usize * 3); diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs new file mode 100644 index 000000000000..398682874776 --- /dev/null +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -0,0 +1,213 @@ +//! Send video data to `ffmpeg` over CLI to decode it.
+ +use crossbeam::channel::Receiver; +use ffmpeg_sidecar::{ + child::FfmpegChild, + command::FfmpegCommand, + event::{FfmpegEvent, LogLevel}, +}; + +use crate::{Time, Timescale}; + +use super::{Frame, Result, SyncDecoder}; + +/// Decode H.264 video via ffmpeg over CLI + +pub struct FfmpegCliH264Decoder { + /// How we send more data to the ffmpeg process + ffmpeg_stdin: std::process::ChildStdin, + + /// How we receive new frames back from ffmpeg + frame_rx: Receiver>, + + avcc: re_mp4::Avc1Box, + timescale: Timescale, +} + +impl FfmpegCliH264Decoder { + pub fn new(avcc: re_mp4::Avc1Box, timescale: Timescale) -> Result { + re_tracing::profile_function!(); + + let mut ffmpeg = { + re_tracing::profile_scope!("spawn-ffmpeg"); + + FfmpegCommand::new() + .hide_banner() + // Keep in mind that all arguments that are about the input need to go before! + .format("h264") // High risk here: What's available? + .input("-") // stdin is our input! + .rawvideo() // Output rgb24 on stdout. (TODO(emilk) for later: any format we can read directly on re_renderer would be better!) + .spawn() + .expect("Failed to spawn ffmpeg") + }; + + let mut ffmpeg_stdin = ffmpeg.take_stdin().unwrap(); + let ffmpeg_iterator = ffmpeg.iter().unwrap(); + + let (frame_tx, frame_rx) = crossbeam::channel::unbounded(); + + let thread_handle = std::thread::Builder::new() + .name("ffmpeg-reader".to_owned()) + .spawn(move || { + for event in ffmpeg_iterator { + match event { + FfmpegEvent::Log(LogLevel::Warning, msg) => re_log::warn_once!("{msg}"), + FfmpegEvent::Log(LogLevel::Error, msg) => re_log::error_once!("{msg}"), // TODO: report errors + FfmpegEvent::Progress(p) => { + re_log::debug!("Progress: {}", p.time) + } + FfmpegEvent::OutputFrame(frame) => { + re_log::trace!( + "Received frame: d[0] {} time {:?} fmt {:?} size {}x{}", + frame.data[0], + frame.timestamp, + frame.pix_fmt, + frame.width, + frame.height + ); + + debug_assert_eq!(frame.pix_fmt, "rgb24"); + debug_assert_eq!( + frame.width as usize * frame.height as usize * 3, + frame.data.len() + ); + + frame_tx.send(Ok(super::Frame { + width: frame.width, + height: frame.height, + data: frame.data, + format: crate::PixelFormat::Rgb8Unorm, + timestamp: Time::from_secs(frame.timestamp as f64, timescale), + duration: Time::from_secs(0.1, timescale), // TODO + })); // TODO: handle disconnect + } + // TODO: handle all events + event => re_log::debug!("Event: {event:?}"), + } + } + re_log::debug!("Shutting down ffmpeg"); + }); + + Ok(Self { + ffmpeg_stdin, + frame_rx, + avcc, + timescale, + }) + } +} + +impl SyncDecoder for FfmpegCliH264Decoder { + fn submit_chunk( + &mut self, + should_stop: &std::sync::atomic::AtomicBool, + chunk: super::Chunk, + on_output: &super::OutputCallback, + ) { + re_tracing::profile_function!(); + + let mut state = NaluStreamState::default(); + write_avc_chunk_to_nalu_stream(&self.avcc, &mut self.ffmpeg_stdin, &chunk, &mut state) + .unwrap(); + // consider writing samples while at the same time reading frames, for even lower latency + // and maybe reuse the same ffmpeg process. + + // TODO: handle errors + while let Ok(frame_result) = self.frame_rx.try_recv() { + on_output(frame_result); + } + } + + fn reset(&mut self) { + // TODO: restart ffmpeg process + } +} + +/// Before every NAL unit, there is a NAL start code. +/// Can also be 2 bytes of 0x00 and 1 byte of 0x01. +/// +/// This is used in byte stream formats such as h264 files. +/// Packet transform systems (RTP) may omit these.
+pub const NAL_START_CODE: &[u8] = &[0x00, 0x00, 0x00, 0x01]; + +#[derive(Default)] +struct NaluStreamState { + previous_frame_was_idr: bool, +} + +fn write_avc_chunk_to_nalu_stream( + avcc: &re_mp4::Avc1Box, + nalu_stream: &mut dyn std::io::Write, + chunk: &super::Chunk, + state: &mut NaluStreamState, +) -> Result<(), Box> { + re_tracing::profile_function!(); + let avcc = &avcc.avcc; + + // Append SPS (Sequence Parameter Set) & PPS (Picture Parameter Set) NAL unit whenever encountering + // an IDR frame unless the previous frame was an IDR frame. + // TODO(andreas): Should we detect this rather from the NALU stream rather than the samples? + if chunk.is_sync && !state.previous_frame_was_idr { + for sps in (&avcc.sequence_parameter_sets).iter() { + nalu_stream.write_all(&NAL_START_CODE)?; + nalu_stream.write_all(&sps.bytes)?; + } + for pps in (&avcc.picture_parameter_sets).iter() { + nalu_stream.write_all(&NAL_START_CODE)?; + nalu_stream.write_all(&pps.bytes)?; + } + state.previous_frame_was_idr = true; + } else { + state.previous_frame_was_idr = false; + } + + // A single cjhunk may consist of multiple NAL units, each of which need our special treatment. + // (most of the time it's 1:1, but there might be extra NAL units for info, especially at the start). + let mut buffer_offset: usize = 0; + let sample_end = chunk.data.len(); + while buffer_offset < sample_end { + re_tracing::profile_scope!("nalu"); + + // Each NAL unit in mp4 is prefixed with a length prefix. + // In Annex B this doesn't exist. + let length_prefix_size = avcc.length_size_minus_one as usize + 1; + + // TODO: improve the error handling here. + let nal_unit_size = match length_prefix_size { + 4 => u32::from_be_bytes( + chunk.data[buffer_offset..(buffer_offset + 4)] + .try_into() + .unwrap(), + ) as usize, + 2 => u16::from_be_bytes( + chunk.data[buffer_offset..(buffer_offset + 2)] + .try_into() + .unwrap(), + ) as usize, + 1 => chunk.data[buffer_offset] as usize, + _ => panic!("invalid length prefix size"), + }; + //re_log::debug!("nal unit size: {}", nal_unit_size); + + if chunk.data.len() < nal_unit_size { + panic!( + "sample size {} is smaller than nal unit size {nal_unit_size}", + chunk.data.len() + ); + } + + nalu_stream.write_all(&NAL_START_CODE)?; + let data_start = buffer_offset + length_prefix_size; // Skip the size. + let data_end = buffer_offset + nal_unit_size + length_prefix_size; + let data = &chunk.data[data_start..data_end]; + + // Note that we don't have to insert "emulation prevention bytes" since mp4 NALU still use them. + // (unlike the NAL start code, the presentation bytes are part of the NAL spec!) + + nalu_stream.write_all(data)?; + + buffer_offset = data_end; + } + + Ok(()) +} diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs index 7fd77960f6de..5dc4c8fee1d1 100644 --- a/crates/store/re_video/src/decode/mod.rs +++ b/crates/store/re_video/src/decode/mod.rs @@ -10,6 +10,10 @@ pub mod async_decoder; #[cfg(not(target_arch = "wasm32"))] pub use async_decoder::AsyncDecoder; +#[cfg(feature = "ffmpeg")] +#[cfg(not(target_arch = "wasm32"))] +pub mod ffmpeg; + use std::sync::atomic::AtomicBool; use crate::Time; diff --git a/crates/store/re_video/src/demux/mod.rs b/crates/store/re_video/src/demux/mod.rs index f32e17d2ce17..c4307cc6d3d5 100644 --- a/crates/store/re_video/src/demux/mod.rs +++ b/crates/store/re_video/src/demux/mod.rs @@ -222,6 +222,10 @@ impl Config { pub fn is_av1(&self) -> bool { matches!(self.stsd.contents, re_mp4::StsdBoxContent::Av01 { .. 
}) } + + pub fn is_h264(&self) -> bool { + matches!(self.stsd.contents, re_mp4::StsdBoxContent::Avc1 { .. }) + } } /// Errors that can occur when loading a video. From d4da02aec7aa1f4533e19a5cf5a54a8728481f45 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 8 Oct 2024 22:01:20 +0200 Subject: [PATCH 02/11] Actual H.264 video playback inside of Rerun viewer --- crates/store/re_video/Cargo.toml | 3 -- crates/store/re_video/examples/frames.rs | 29 +---------------- crates/store/re_video/src/decode/mod.rs | 31 +++++++++++++++++++ crates/viewer/re_renderer/Cargo.toml | 7 +++++ .../re_renderer/src/video/decoder/mod.rs | 30 +++++++++--------- crates/viewer/re_renderer/src/video/mod.rs | 4 +-- 6 files changed, 56 insertions(+), 48 deletions(-) diff --git a/crates/store/re_video/Cargo.toml b/crates/store/re_video/Cargo.toml index 2251a5d59cfd..c7e7e7038957 100644 --- a/crates/store/re_video/Cargo.toml +++ b/crates/store/re_video/Cargo.toml @@ -27,9 +27,6 @@ features = ["all"] [features] default = ["av1", "ffmpeg"] -## Native AV1 decoding. -av1 = ["dep:dav1d"] - ## Decode H.264 using ffmpeg over CLI. ffmpeg = ["dep:ffmpeg-sidecar"] diff --git a/crates/store/re_video/examples/frames.rs b/crates/store/re_video/examples/frames.rs index 61affc0c48b7..124a12bef724 100644 --- a/crates/store/re_video/examples/frames.rs +++ b/crates/store/re_video/examples/frames.rs @@ -38,38 +38,11 @@ fn main() { video.config.coded_height ); - let mut decoder = create_decoder(&video); + let mut decoder = re_video::decode::new_decoder(&video).expect("Failed to create decoder"); write_video_frames(&video, decoder.as_mut(), &output_dir); } -fn create_decoder(video: &VideoData) -> Box<dyn SyncDecoder> { - match &video.config.stsd.contents { - re_mp4::StsdBoxContent::Av01(av01_box) => { - println!("Decoding AV1…"); - Box::new( - re_video::decode::av1::SyncDav1dDecoder::new() - .expect("Failed to start AV1 decoder"), - ) - } - - re_mp4::StsdBoxContent::Avc1(avc1_box) => { - println!("Decoding H.264…"); - Box::new( - re_video::decode::ffmpeg::FfmpegCliH264Decoder::new( - avc1_box.clone(), - video.timescale, - ) - .expect("Failed to start H.264 decoder"), - ) - } - - _ => { - panic!("Unsupported codec: {}", video.human_readable_codec_string()); - } - } -} - fn write_video_frames( video: &re_video::VideoData, decoder: &mut dyn re_video::decode::SyncDecoder, diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs index 5dc4c8fee1d1..6b430593bf59 100644 --- a/crates/store/re_video/src/decode/mod.rs +++ b/crates/store/re_video/src/decode/mod.rs @@ -20,6 +20,9 @@ use crate::Time; #[derive(thiserror::Error, Debug)] pub enum Error { + #[error("Unsupported codec: {0}")] + UnsupportedCodec(String), + #[cfg(feature = "av1")] #[cfg(not(target_arch = "wasm32"))] #[error("dav1d: {0}")] @@ -41,6 +44,34 @@ pub trait SyncDecoder { fn reset(&mut self) {} } +#[cfg(not(target_arch = "wasm32"))] +pub fn new_decoder(video: &crate::VideoData) -> Result<Box<dyn SyncDecoder>> { + re_log::trace!( + "Looking for decoder for {}", + video.human_readable_codec_string() + ); + + match &video.config.stsd.contents { + #[cfg(feature = "av1")] + re_mp4::StsdBoxContent::Av01(_av01_box) => { + re_log::trace!("Decoding AV1…"); + Ok(Box::new(av1::SyncDav1dDecoder::new()?)) + } + + #[cfg(feature = "ffmpeg")] + re_mp4::StsdBoxContent::Avc1(avc1_box) => { + // TODO: check if we have ffmpeg ONCE, and remember + re_log::trace!("Decoding H.264…"); + Ok(Box::new(ffmpeg::FfmpegCliH264Decoder::new( + avc1_box.clone(), + video.timescale, + )?)) + } + + _ =>
Err(Error::UnsupportedCodec(video.human_readable_codec_string())), } } /// One chunk of encoded video data; usually one frame. /// /// One loaded [`crate::Sample`]. diff --git a/crates/viewer/re_renderer/Cargo.toml b/crates/viewer/re_renderer/Cargo.toml index 690e7e3eb35a..31accb7af83d 100644 --- a/crates/viewer/re_renderer/Cargo.toml +++ b/crates/viewer/re_renderer/Cargo.toml @@ -35,6 +35,13 @@ default = ["import-obj", "import-gltf", "import-stl"] ## Support for Arrow datatypes for end-to-end zero-copy. arrow = ["dep:arrow2"] +<<<<<<< HEAD +======= +## Support for native AV1 video decoding. +## You need to install [nasm](https://nasm.us/) to compile with this feature. +video_av1 = ["re_video/av1"] # TODO: remove + +>>>>>>> 4ced5f84a4 (Actual H.264 video playback inside of Rerun viewer) ## Support importing .obj files import-obj = ["dep:tobj"] diff --git a/crates/viewer/re_renderer/src/video/decoder/mod.rs b/crates/viewer/re_renderer/src/video/decoder/mod.rs index 97ecd40dfcbd..4277eb9184b3 100644 --- a/crates/viewer/re_renderer/src/video/decoder/mod.rs +++ b/crates/viewer/re_renderer/src/video/decoder/mod.rs @@ -128,23 +128,23 @@ impl VideoDecoder { let decoder = web::WebVideoDecoder::new(data.clone(), hw_acceleration)?; return Ok(Self::from_chunk_decoder(render_ctx, data, decoder)); } else { - // Native AV1 video decoding: - - if !data.config.is_av1() { - return Err(DecodingError::UnsupportedCodec { - codec: data.human_readable_codec_string(), - }); + if data.config.is_av1() && cfg!(debug_assertions) { + // TODO: move to re_video + return Err(DecodingError::NoNativeDebug); // because debug builds of rav1d are so slow } - if cfg!(debug_assertions) { - return Err(DecodingError::NoNativeDebug); // because debug builds of rav1d are EXTREMELY slow - } else { - let av1_decoder = re_video::decode::av1::SyncDav1dDecoder::new() - .map_err(|err| DecodingError::StartDecoder(err.to_string()))?; - - let decoder = native_decoder::NativeDecoder::new(debug_name, Box::new(av1_decoder))?; - return Ok(Self::from_chunk_decoder(render_ctx, data, decoder)); - }; + match re_video::decode::new_decoder(&data) { + Ok(sync_decoder) => { + let decoder = native_decoder::NativeDecoder::new(debug_name, sync_decoder)?; + return Ok(Self::from_chunk_decoder(render_ctx, data, decoder)); + } + Err(re_video::decode::Error::UnsupportedCodec(codec)) => { + return Err(DecodingError::UnsupportedCodec { codec }); // TODO: better error message with feature flags etc + } + Err(err) => { + return Err(DecodingError::DecoderSetupFailure(err.to_string())); + } + } } } } diff --git a/crates/viewer/re_renderer/src/video/mod.rs b/crates/viewer/re_renderer/src/video/mod.rs index 517c6ecc08c2..e412b90fecae 100644 --- a/crates/viewer/re_renderer/src/video/mod.rs +++ b/crates/viewer/re_renderer/src/video/mod.rs @@ -58,8 +58,8 @@ pub enum DecodingError { UnsupportedCodec { codec: String }, #[cfg(not(target_arch = "wasm32"))] - #[error("Native video decoding not supported in native debug builds.")] - NoNativeDebug, + #[error("Video decoding not supported in native debug builds.")] + NoNativeDebug, // TODO: move to re_video?
} pub type FrameDecodingResult = Result; From f2fc70b273c3fa2212af75e843dd962884dec63d Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 9 Oct 2024 06:00:16 +0200 Subject: [PATCH 03/11] Fix timestamps and seeking --- crates/store/re_video/src/decode/ffmpeg.rs | 108 +++++++++++++++------ crates/store/re_video/src/decode/mod.rs | 10 ++ 2 files changed, 90 insertions(+), 28 deletions(-) diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs index 398682874776..9265d26c5705 100644 --- a/crates/store/re_video/src/decode/ffmpeg.rs +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -1,6 +1,6 @@ //! Send video data to `ffmpeg` over CLI to decode it. -use crossbeam::channel::Receiver; +use crossbeam::channel::{Receiver, Sender}; use ffmpeg_sidecar::{ child::FfmpegChild, command::FfmpegCommand, @@ -9,14 +9,29 @@ use ffmpeg_sidecar::{ use crate::{Time, Timescale}; -use super::{Frame, Result, SyncDecoder}; +use super::{Error, Frame, Result, SyncDecoder}; + +/// ffmpeg does not tell us the timestamp/duration of a given frame, so we need to remember it. +struct FrameInfo { + /// Monotonic index, from start + frame_num: u32, + + timestamp: Time, + duration: Time, +} /// Decode H.264 video via ffmpeg over CLI pub struct FfmpegCliH264Decoder { + /// Monotonically increasing + frame_num: u32, + /// How we send more data to the ffmpeg process ffmpeg_stdin: std::process::ChildStdin, + /// For sending frame timestamps to the decoder thread + frame_info_tx: Sender, + /// How we receive new frames back from ffmpeg frame_rx: Receiver>, @@ -38,58 +53,82 @@ impl FfmpegCliH264Decoder { .input("-") // stdin is our input! .rawvideo() // Output rgb24 on stdout. (TODO(emilk) for later: any format we can read directly on re_renderer would be better!) .spawn() - .expect("Failed to spawn ffmpeg") + .map_err(Error::FailedToStartFfmpeg)? }; - let mut ffmpeg_stdin = ffmpeg.take_stdin().unwrap(); + let ffmpeg_stdin = ffmpeg.take_stdin().unwrap(); let ffmpeg_iterator = ffmpeg.iter().unwrap(); + let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded(); let (frame_tx, frame_rx) = crossbeam::channel::unbounded(); - let thread_handle = std::thread::Builder::new() + std::thread::Builder::new() .name("ffmpeg-reader".to_owned()) .spawn(move || { for event in ffmpeg_iterator { match event { - FfmpegEvent::Log(LogLevel::Warning, msg) => re_log::warn_once!("{msg}"), + FfmpegEvent::Log(LogLevel::Warning, msg) => { + if !msg.contains( + "No accelerated colorspace conversion found from yuv420p to rgb24", + ) { + re_log::warn_once!("{msg}"); + } + } FfmpegEvent::Log(LogLevel::Error, msg) => re_log::error_once!("{msg}"), // TODO: report errors FfmpegEvent::Progress(p) => { - re_log::debug!("Progress: {}", p.time) + re_log::debug!("Progress: {}", p.time); } FfmpegEvent::OutputFrame(frame) => { - re_log::trace!( - "Received frame: d[0] {} time {:?} fmt {:?} size {}x{}", - frame.data[0], - frame.timestamp, - frame.pix_fmt, - frame.width, - frame.height - ); + // The `frame.timestamp` is monotonically increasing, + // so it is not the actual timestamp in the stream. + + let frame_info: FrameInfo = frame_info_rx.recv().unwrap(); + + let ffmpeg_sidecar::event::OutputVideoFrame { + frame_num, + pix_fmt, + width, + height, + data, + .. 
+ } = frame; + + debug_assert_eq!(frame_info.frame_num, frame_num, "We are out-of-sync"); // TODO: fix somehow - debug_assert_eq!(frame.pix_fmt, "rgb24"); - debug_assert_eq!( - frame.width as usize * frame.height as usize * 3, - frame.data.len() + re_log::trace!( + "Received frame {frame_num}: fmt {pix_fmt:?} size {width}x{height}" ); - frame_tx.send(Ok(super::Frame { - width: frame.width, - height: frame.height, - data: frame.data, - format: crate::PixelFormat::Rgb8Unorm, - timestamp: Time::from_secs(frame.timestamp as f64, timescale), - duration: Time::from_secs(0.1, timescale), // TODO - })); // TODO: handle disconnect + debug_assert_eq!(pix_fmt, "rgb24"); + debug_assert_eq!(width as usize * height as usize * 3, data.len()); + + if frame_tx + .send(Ok(super::Frame { + width, + height, + data, + format: crate::PixelFormat::Rgb8Unorm, + timestamp: frame_info.timestamp, + duration: frame_info.duration, + })) + .is_err() + { + re_log::debug!("Receiver disconnected"); + break; + } } // TODO: handle all events event => re_log::debug!("Event: {event:?}"), } } re_log::debug!("Shutting down ffmpeg"); - }); + }) + .expect("Failed to spawn ffmpeg thread"); Ok(Self { + frame_num: 0, ffmpeg_stdin, + frame_info_tx, frame_rx, avcc, timescale, @@ -106,6 +145,16 @@ impl SyncDecoder for FfmpegCliH264Decoder { ) { re_tracing::profile_function!(); + // NOTE: this assumes each sample/chunk will result in exactly one frame. + self.frame_info_tx.send(FrameInfo { + frame_num: self.frame_num, + timestamp: chunk.timestamp, + duration: chunk.duration, + }); + + // NOTE: a 60 FPS video can go for two years before wrapping a u32. + self.frame_num = self.frame_num.wrapping_add(1); + let mut state = NaluStreamState::default(); write_avc_chunk_to_nalu_stream(&self.avcc, &mut self.ffmpeg_stdin, &chunk, &mut state) .unwrap(); @@ -114,6 +163,9 @@ impl SyncDecoder for FfmpegCliH264Decoder { // TODO: handle errors while let Ok(frame_result) = self.frame_rx.try_recv() { + if should_stop.load(std::sync::atomic::Ordering::Relaxed) { + return; + } on_output(frame_result); } } diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs index 6b430593bf59..ce8069e39031 100644 --- a/crates/store/re_video/src/decode/mod.rs +++ b/crates/store/re_video/src/decode/mod.rs @@ -27,6 +27,16 @@ pub enum Error { #[cfg(not(target_arch = "wasm32"))] #[error("dav1d: {0}")] Dav1d(#[from] dav1d::Error), + + #[cfg(feature = "ffmpeg")] + #[cfg(not(target_arch = "wasm32"))] + #[error("Failed to start ffmppeg: {0}")] + FailedToStartFfmpeg(std::io::Error), + + #[cfg(feature = "ffmpeg")] + #[cfg(not(target_arch = "wasm32"))] + #[error("Failed to start ffmppeg: {0}")] + FailedToSpawnThread(std::io::Error), } pub type Result = std::result::Result; From c07b015f9f69c0a53069457de2fad4934986e85e Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 9 Oct 2024 06:31:05 +0200 Subject: [PATCH 04/11] Code cleanup and better error handling --- crates/store/re_video/src/decode/ffmpeg.rs | 233 ++++++++++++++------- crates/store/re_video/src/decode/mod.rs | 10 +- 2 files changed, 164 insertions(+), 79 deletions(-) diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs index 9265d26c5705..6ace131ac3b7 100644 --- a/crates/store/re_video/src/decode/ffmpeg.rs +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -1,15 +1,29 @@ //! Send video data to `ffmpeg` over CLI to decode it. 
-use crossbeam::channel::{Receiver, Sender}; +use crossbeam::channel::{Receiver, Sender, TryRecvError}; use ffmpeg_sidecar::{ - child::FfmpegChild, command::FfmpegCommand, event::{FfmpegEvent, LogLevel}, }; -use crate::{Time, Timescale}; +use crate::Time; -use super::{Error, Frame, Result, SyncDecoder}; +use super::{Frame, Result, SyncDecoder}; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Failed to start ffmppeg: {0}")] + FailedToStartFfmpeg(std::io::Error), + + #[error("Failed to get stdin handle")] + NoStdin, + + #[error("Failed to get iterator: {0}")] + NoIterator(String), + + #[error("There's a bug in Rerun")] + NoFrameInfo, +} /// ffmpeg does not tell us the timestamp/duration of a given frame, so we need to remember it. struct FrameInfo { @@ -33,14 +47,13 @@ pub struct FfmpegCliH264Decoder { frame_info_tx: Sender, /// How we receive new frames back from ffmpeg - frame_rx: Receiver>, + frame_rx: Receiver>, avcc: re_mp4::Avc1Box, - timescale: Timescale, } impl FfmpegCliH264Decoder { - pub fn new(avcc: re_mp4::Avc1Box, timescale: Timescale) -> Result { + pub fn new(avcc: re_mp4::Avc1Box) -> Result { re_tracing::profile_function!(); let mut ffmpeg = { @@ -56,8 +69,10 @@ impl FfmpegCliH264Decoder { .map_err(Error::FailedToStartFfmpeg)? }; - let ffmpeg_stdin = ffmpeg.take_stdin().unwrap(); - let ffmpeg_iterator = ffmpeg.iter().unwrap(); + let ffmpeg_stdin = ffmpeg.take_stdin().ok_or(Error::NoStdin)?; + let ffmpeg_iterator = ffmpeg + .iter() + .map_err(|err| Error::NoIterator(err.to_string()))?; let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded(); let (frame_tx, frame_rx) = crossbeam::channel::unbounded(); @@ -65,62 +80,7 @@ impl FfmpegCliH264Decoder { std::thread::Builder::new() .name("ffmpeg-reader".to_owned()) .spawn(move || { - for event in ffmpeg_iterator { - match event { - FfmpegEvent::Log(LogLevel::Warning, msg) => { - if !msg.contains( - "No accelerated colorspace conversion found from yuv420p to rgb24", - ) { - re_log::warn_once!("{msg}"); - } - } - FfmpegEvent::Log(LogLevel::Error, msg) => re_log::error_once!("{msg}"), // TODO: report errors - FfmpegEvent::Progress(p) => { - re_log::debug!("Progress: {}", p.time); - } - FfmpegEvent::OutputFrame(frame) => { - // The `frame.timestamp` is monotonically increasing, - // so it is not the actual timestamp in the stream. - - let frame_info: FrameInfo = frame_info_rx.recv().unwrap(); - - let ffmpeg_sidecar::event::OutputVideoFrame { - frame_num, - pix_fmt, - width, - height, - data, - .. 
- } = frame; - - debug_assert_eq!(frame_info.frame_num, frame_num, "We are out-of-sync"); // TODO: fix somehow - - re_log::trace!( - "Received frame {frame_num}: fmt {pix_fmt:?} size {width}x{height}" - ); - - debug_assert_eq!(pix_fmt, "rgb24"); - debug_assert_eq!(width as usize * height as usize * 3, data.len()); - - if frame_tx - .send(Ok(super::Frame { - width, - height, - data, - format: crate::PixelFormat::Rgb8Unorm, - timestamp: frame_info.timestamp, - duration: frame_info.duration, - })) - .is_err() - { - re_log::debug!("Receiver disconnected"); - break; - } - } - // TODO: handle all events - event => re_log::debug!("Event: {event:?}"), - } - } + read_ffmpeg_output(ffmpeg_iterator, &frame_info_rx, &frame_tx); re_log::debug!("Shutting down ffmpeg"); }) .expect("Failed to spawn ffmpeg thread"); @@ -131,11 +91,142 @@ impl FfmpegCliH264Decoder { frame_info_tx, frame_rx, avcc, - timescale, }) } } +fn read_ffmpeg_output( + ffmpeg_iterator: ffmpeg_sidecar::iter::FfmpegIterator, + frame_info_rx: &Receiver, + frame_tx: &Sender>, +) { + for event in ffmpeg_iterator { + #[allow(clippy::match_same_arms)] + match event { + FfmpegEvent::Log(LogLevel::Info, msg) => { + re_log::debug!("{msg}"); + } + + FfmpegEvent::Log(LogLevel::Warning, msg) => { + if !msg.contains("No accelerated colorspace conversion found from yuv420p to rgb24") + { + re_log::warn_once!("{msg}"); + } + } + + FfmpegEvent::Log(LogLevel::Error, msg) => { + // TODO: report errors + re_log::error_once!("{msg}"); + } + + // Usefuless info in these: + FfmpegEvent::ParsedInput(_) => {} + FfmpegEvent::ParsedOutput(_) => {} + FfmpegEvent::ParsedStreamMapping(_) => {} + + FfmpegEvent::ParsedInputStream(stream) => { + let ffmpeg_sidecar::event::AVStream { + stream_type, + format, + pix_fmt, // Often 'yuv420p' + width, + height, + fps, + .. + } = stream; + + re_log::debug!("ParsedInputStream {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS"); + + debug_assert_eq!(stream_type.to_ascii_lowercase(), "video"); + } + + FfmpegEvent::ParsedOutputStream(stream) => { + // This just repeats what we told ffmpeg to output, e.g. "rawvideo rgb24" + let ffmpeg_sidecar::event::AVStream { + stream_type, + format, + pix_fmt, + width, + height, + fps, + .. + } = stream; + + re_log::debug!("ParsedOutputStream {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS"); + + debug_assert_eq!(stream_type.to_ascii_lowercase(), "video"); + } + + FfmpegEvent::Progress(_) => { + // We can get out frame number etc here to know how far behind we are. + } + + FfmpegEvent::OutputFrame(frame) => { + // NOTE: `frame.timestamp` is monotonically increasing, + // and is not the actual timestamp in the stream. + + let frame_info: FrameInfo = match frame_info_rx.try_recv() { + Ok(frame_info) => frame_info, + + Err(TryRecvError::Disconnected) => { + re_log::debug!("Receiver disconnected"); + return; + } + + Err(TryRecvError::Empty) => { + // This shouldn't happen + if frame_tx.send(Err(Error::NoFrameInfo.into())).is_err() { + re_log::warn!("Got no frame-info, and failed to send error"); + } + return; + } + }; + + let ffmpeg_sidecar::event::OutputVideoFrame { + frame_num, + pix_fmt, + width, + height, + data, + .. 
+ } = frame; + + debug_assert_eq!( + frame_info.frame_num, frame_num, + "We are out-of-sync with ffmpeg" + ); // TODO: fix somehow + + re_log::trace!("Received frame {frame_num}: fmt {pix_fmt:?} size {width}x{height}"); + + debug_assert_eq!(pix_fmt, "rgb24"); + debug_assert_eq!(width as usize * height as usize * 3, data.len()); + + if frame_tx + .send(Ok(super::Frame { + width, + height, + data, + format: crate::PixelFormat::Rgb8Unorm, + timestamp: frame_info.timestamp, + duration: frame_info.duration, + })) + .is_err() + { + re_log::debug!("Receiver disconnected"); + return; + } + } + + FfmpegEvent::Done => { + re_log::debug!("ffmpeg is Done"); + return; + } + // TODO: handle all events + event => re_log::debug!("Event: {event:?}"), + } + } +} + impl SyncDecoder for FfmpegCliH264Decoder { fn submit_chunk( &mut self, @@ -200,12 +291,12 @@ fn write_avc_chunk_to_nalu_stream( // an IDR frame unless the previous frame was an IDR frame. // TODO(andreas): Should we detect this rather from the NALU stream rather than the samples? if chunk.is_sync && !state.previous_frame_was_idr { - for sps in (&avcc.sequence_parameter_sets).iter() { - nalu_stream.write_all(&NAL_START_CODE)?; + for sps in &avcc.sequence_parameter_sets { + nalu_stream.write_all(NAL_START_CODE)?; nalu_stream.write_all(&sps.bytes)?; } - for pps in (&avcc.picture_parameter_sets).iter() { - nalu_stream.write_all(&NAL_START_CODE)?; + for pps in &avcc.picture_parameter_sets { + nalu_stream.write_all(NAL_START_CODE)?; nalu_stream.write_all(&pps.bytes)?; } state.previous_frame_was_idr = true; @@ -248,7 +339,7 @@ fn write_avc_chunk_to_nalu_stream( ); } - nalu_stream.write_all(&NAL_START_CODE)?; + nalu_stream.write_all(NAL_START_CODE)?; let data_start = buffer_offset + length_prefix_size; // Skip the size. let data_end = buffer_offset + nal_unit_size + length_prefix_size; let data = &chunk.data[data_start..data_end]; diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs index ce8069e39031..c618dcc4bf75 100644 --- a/crates/store/re_video/src/decode/mod.rs +++ b/crates/store/re_video/src/decode/mod.rs @@ -30,13 +30,8 @@ pub enum Error { #[cfg(feature = "ffmpeg")] #[cfg(not(target_arch = "wasm32"))] - #[error("Failed to start ffmppeg: {0}")] - FailedToStartFfmpeg(std::io::Error), - - #[cfg(feature = "ffmpeg")] - #[cfg(not(target_arch = "wasm32"))] - #[error("Failed to start ffmppeg: {0}")] - FailedToSpawnThread(std::io::Error), + #[error("ffmppeg: {0}")] + Ffmpeg(#[from] ffmpeg::Error), } pub type Result = std::result::Result; @@ -74,7 +69,6 @@ pub fn new_decoder(video: &crate::VideoData) -> Result Date: Wed, 9 Oct 2024 12:43:33 +0200 Subject: [PATCH 05/11] Better error handling --- crates/store/re_video/src/decode/ffmpeg.rs | 127 +++++++++++++++------ 1 file changed, 92 insertions(+), 35 deletions(-) diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs index 6ace131ac3b7..90b99bc9c5cc 100644 --- a/crates/store/re_video/src/decode/ffmpeg.rs +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -1,5 +1,7 @@ //! Send video data to `ffmpeg` over CLI to decode it. 
+use std::sync::atomic::Ordering; + use crossbeam::channel::{Receiver, Sender, TryRecvError}; use ffmpeg_sidecar::{ command::FfmpegCommand, @@ -8,7 +10,7 @@ use ffmpeg_sidecar::{ use crate::Time; -use super::{Frame, Result, SyncDecoder}; +use super::{Frame, SyncDecoder}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -23,6 +25,12 @@ pub enum Error { #[error("There's a bug in Rerun")] NoFrameInfo, + + #[error("Failed to write data to ffmpeg: {0}")] + FailedToWriteToFfmpeg(std::io::Error), + + #[error("Bad video data: {0}")] + BadVideoData(String), } /// ffmpeg does not tell us the timestamp/duration of a given frame, so we need to remember it. @@ -53,7 +61,7 @@ pub struct FfmpegCliH264Decoder { } impl FfmpegCliH264Decoder { - pub fn new(avcc: re_mp4::Avc1Box) -> Result { + pub fn new(avcc: re_mp4::Avc1Box) -> Result { re_tracing::profile_function!(); let mut ffmpeg = { @@ -236,29 +244,53 @@ impl SyncDecoder for FfmpegCliH264Decoder { ) { re_tracing::profile_function!(); - // NOTE: this assumes each sample/chunk will result in exactly one frame. - self.frame_info_tx.send(FrameInfo { + // First read any outstanding messages (e.g. error reports), + // so they get orderer correctly. + while let Ok(frame_result) = self.frame_rx.try_recv() { + if should_stop.load(Ordering::Relaxed) { + return; + } + on_output(frame_result); + } + + // We send the information about this chunk first. + // This assumes each sample/chunk will result in exactly one frame. + // If this assumption is not held, we will get weird errors, like videos playing to slowly. + let frame_info = FrameInfo { frame_num: self.frame_num, timestamp: chunk.timestamp, duration: chunk.duration, - }); + }; // NOTE: a 60 FPS video can go for two years before wrapping a u32. self.frame_num = self.frame_num.wrapping_add(1); - let mut state = NaluStreamState::default(); - write_avc_chunk_to_nalu_stream(&self.avcc, &mut self.ffmpeg_stdin, &chunk, &mut state) - .unwrap(); - // consider writing samples while at the same time reading frames, for even lower latency - // and maybe reuse the same ffmpeg process. + if self.frame_info_tx.send(frame_info).is_err() { + // The other thread must be down, e.g. because `ffmpeg` crashed. + // It should already have reported that as an error - no need to repeat it here. + } else { + // Write chunk to ffmpeg: + let mut state = NaluStreamState::default(); // TODO: remove state? + if let Err(err) = write_avc_chunk_to_nalu_stream( + should_stop, + &self.avcc, + &mut self.ffmpeg_stdin, + &chunk, + &mut state, + ) { + on_output(Err(err.into())); + } + } - // TODO: handle errors + // Read results and/or errors: while let Ok(frame_result) = self.frame_rx.try_recv() { - if should_stop.load(std::sync::atomic::Ordering::Relaxed) { + if should_stop.load(Ordering::Relaxed) { return; } on_output(frame_result); } + + // TODO: block until we have processed the frame! } fn reset(&mut self) { @@ -279,11 +311,12 @@ struct NaluStreamState { } fn write_avc_chunk_to_nalu_stream( + should_stop: &std::sync::atomic::AtomicBool, avcc: &re_mp4::Avc1Box, nalu_stream: &mut dyn std::io::Write, chunk: &super::Chunk, state: &mut NaluStreamState, -) -> Result<(), Box> { +) -> Result<(), Error> { re_tracing::profile_function!(); let avcc = &avcc.avcc; @@ -292,12 +325,20 @@ fn write_avc_chunk_to_nalu_stream( // TODO(andreas): Should we detect this rather from the NALU stream rather than the samples? 
if chunk.is_sync && !state.previous_frame_was_idr { for sps in &avcc.sequence_parameter_sets { - nalu_stream.write_all(NAL_START_CODE)?; - nalu_stream.write_all(&sps.bytes)?; + nalu_stream + .write_all(NAL_START_CODE) + .map_err(Error::FailedToWriteToFfmpeg)?; + nalu_stream + .write_all(&sps.bytes) + .map_err(Error::FailedToWriteToFfmpeg)?; } for pps in &avcc.picture_parameter_sets { - nalu_stream.write_all(NAL_START_CODE)?; - nalu_stream.write_all(&pps.bytes)?; + nalu_stream + .write_all(NAL_START_CODE) + .map_err(Error::FailedToWriteToFfmpeg)?; + nalu_stream + .write_all(&pps.bytes) + .map_err(Error::FailedToWriteToFfmpeg)?; } state.previous_frame_was_idr = true; } else { @@ -308,46 +349,62 @@ fn write_avc_chunk_to_nalu_stream( // (most of the time it's 1:1, but there might be extra NAL units for info, especially at the start). let mut buffer_offset: usize = 0; let sample_end = chunk.data.len(); - while buffer_offset < sample_end { + while buffer_offset < sample_end && !should_stop.load(Ordering::Relaxed) { re_tracing::profile_scope!("nalu"); // Each NAL unit in mp4 is prefixed with a length prefix. // In Annex B this doesn't exist. let length_prefix_size = avcc.length_size_minus_one as usize + 1; - // TODO: improve the error handling here. + if sample_end < buffer_offset + length_prefix_size { + return Err(Error::BadVideoData( + "Not enough bytes to fit the length prefix".to_owned(), + )); + } + let nal_unit_size = match length_prefix_size { - 4 => u32::from_be_bytes( - chunk.data[buffer_offset..(buffer_offset + 4)] + 1 => chunk.data[buffer_offset] as usize, + + 2 => u16::from_be_bytes( + #[allow(clippy::unwrap_used)] // can't fail + chunk.data[buffer_offset..(buffer_offset + 2)] .try_into() .unwrap(), ) as usize, - 2 => u16::from_be_bytes( - chunk.data[buffer_offset..(buffer_offset + 2)] + + 4 => u32::from_be_bytes( + #[allow(clippy::unwrap_used)] // can't fail + chunk.data[buffer_offset..(buffer_offset + 4)] .try_into() .unwrap(), ) as usize, - 1 => chunk.data[buffer_offset] as usize, - _ => panic!("invalid length prefix size"), - }; - //re_log::debug!("nal unit size: {}", nal_unit_size); - if chunk.data.len() < nal_unit_size { - panic!( - "sample size {} is smaller than nal unit size {nal_unit_size}", - chunk.data.len() - ); - } + _ => { + return Err(Error::BadVideoData(format!( + "Bad length prefix size: {length_prefix_size}" + ))); + } + }; - nalu_stream.write_all(NAL_START_CODE)?; let data_start = buffer_offset + length_prefix_size; // Skip the size. let data_end = buffer_offset + nal_unit_size + length_prefix_size; + + if chunk.data.len() < data_end { + return Err(Error::BadVideoData("Not enough bytes to".to_owned())); + } + let data = &chunk.data[data_start..data_end]; + nalu_stream + .write_all(NAL_START_CODE) + .map_err(Error::FailedToWriteToFfmpeg)?; + // Note that we don't have to insert "emulation prevention bytes" since mp4 NALU still use them. // (unlike the NAL start code, the presentation bytes are part of the NAL spec!) 
nalu_stream .write_all(data) .map_err(Error::FailedToWriteToFfmpeg)?; buffer_offset = data_end; } Ok(()) } From b2b842b7ed9e530bdd9679b0a08465169fb2678d Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 9 Oct 2024 13:24:58 +0200 Subject: [PATCH 06/11] Improve log output and thread names --- .../re_video/src/decode/async_decoder.rs | 2 +- crates/store/re_video/src/decode/ffmpeg.rs | 60 ++++++++++++++++--- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/crates/store/re_video/src/decode/async_decoder.rs b/crates/store/re_video/src/decode/async_decoder.rs index bdabde53a2c6..c9165245d8ba 100644 --- a/crates/store/re_video/src/decode/async_decoder.rs +++ b/crates/store/re_video/src/decode/async_decoder.rs @@ -57,7 +57,7 @@ impl AsyncDecoder { let comms = Comms::default(); let thread = std::thread::Builder::new() - .name("av1_decoder".into()) + .name(format!("decoder thread for {debug_name}")) .spawn({ let comms = comms.clone(); move || { diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs index 90b99bc9c5cc..61ec07e581be 100644 --- a/crates/store/re_video/src/decode/ffmpeg.rs +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -31,6 +31,12 @@ pub enum Error { #[error("Bad video data: {0}")] BadVideoData(String), + + #[error("FFMPEG error: {0}")] + Ffmpeg(String), + + #[error("FFMPEG IPC error: {0}")] + FfmpegSidecar(String), } /// ffmpeg does not tell us the timestamp/duration of a given frame, so we need to remember it. struct FrameInfo { @@ -43,7 +49,6 @@ struct FrameInfo { } /// Decode H.264 video via ffmpeg over CLI - pub struct FfmpegCliH264Decoder { /// Monotonically increasing frame_num: u32, @@ -61,6 +66,8 @@ pub struct FfmpegCliH264Decoder { } impl FfmpegCliH264Decoder { + // TODO: make this robust against `pkill ffmpeg` somehow. + // Maybe `AsyncDecoder` can auto-restart us, or we wrap ourselves in a new struct that restarts us on certain errors? pub fn new(avcc: re_mp4::Avc1Box) -> Result { re_tracing::profile_function!(); @@ -108,23 +115,51 @@ fn read_ffmpeg_output( frame_info_rx: &Receiver, frame_tx: &Sender>, ) { + /// Ignore some common output from ffmpeg: + fn should_ignore_log_msg(msg: &str) -> bool { + let patterns = [ + "Duration: N/A, bitrate: N/A", + "frame= 0 fps=0.0 q=0.0 size= 0kB time=N/A bitrate=N/A speed=N/A", + "Metadata:", + "No accelerated colorspace conversion found from yuv420p to rgb24", + "Stream mapping:", + ]; + + for pattern in patterns { + if msg.contains(pattern) { + return true; + } + } + + false + } + for event in ffmpeg_iterator { #[allow(clippy::match_same_arms)] match event { FfmpegEvent::Log(LogLevel::Info, msg) => { - re_log::debug!("{msg}"); + if !should_ignore_log_msg(&msg) { + re_log::debug!("{msg}"); + } } FfmpegEvent::Log(LogLevel::Warning, msg) => { - if !msg.contains("No accelerated colorspace conversion found from yuv420p to rgb24") - { + if !should_ignore_log_msg(&msg) { re_log::warn_once!("{msg}"); } } FfmpegEvent::Log(LogLevel::Error, msg) => { - // TODO: report errors - re_log::error_once!("{msg}"); + frame_tx.send(Err(Error::Ffmpeg(msg).into())).ok(); } + FfmpegEvent::LogEOF => { + // This event precedes `FfmpegEvent::Done`. + // This happens on `pkill ffmpeg`, for instance. + } + + FfmpegEvent::Error(error) => { + frame_tx.send(Err(Error::FfmpegSidecar(error).into())).ok(); } // Usefuless info in these: FfmpegEvent::ParsedInput(_) => {} FfmpegEvent::ParsedOutput(_) => {} FfmpegEvent::ParsedStreamMapping(_) => {} FfmpegEvent::ParsedInputStream(stream) => { let ffmpeg_sidecar::event::AVStream { stream_type, format, pix_fmt, // Often 'yuv420p' width, height, fps, ..
} = stream; - re_log::debug!("ParsedInputStream {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS"); + re_log::debug!( + "Input: {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS" + ); debug_assert_eq!(stream_type.to_ascii_lowercase(), "video"); } @@ -160,7 +197,9 @@ fn read_ffmpeg_output( .. } = stream; - re_log::debug!("ParsedOutputStream {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS"); + re_log::debug!( + "Output: {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS" + ); debug_assert_eq!(stream_type.to_ascii_lowercase(), "video"); } @@ -226,9 +265,11 @@ fn read_ffmpeg_output( } FfmpegEvent::Done => { + // This happens on `pkill ffmpeg`, for instance. re_log::debug!("ffmpeg is Done"); return; } + // TODO: handle all events event => re_log::debug!("Event: {event:?}"), } @@ -350,7 +391,7 @@ fn write_avc_chunk_to_nalu_stream( let mut buffer_offset: usize = 0; let sample_end = chunk.data.len(); while buffer_offset < sample_end && !should_stop.load(Ordering::Relaxed) { - re_tracing::profile_scope!("nalu"); + re_tracing::profile_scope!("write_nalu"); // Each NAL unit in mp4 is prefixed with a length prefix. // In Annex B this doesn't exist. @@ -402,6 +443,7 @@ fn write_avc_chunk_to_nalu_stream( // Note that we don't have to insert "emulation prevention bytes" since mp4 NALU still use them. // (unlike the NAL start code, the presentation bytes are part of the NAL spec!) + re_tracing::profile_scope!("write_bytes", data.len().to_string()); nalu_stream .write_all(data) .map_err(Error::FailedToWriteToFfmpeg)?; From 26f4834ac4b88656ef8d7568ecd2fb9e0c5200c8 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 9 Oct 2024 16:20:41 +0200 Subject: [PATCH 07/11] Fix ffmpeg feature flag --- crates/store/re_video/Cargo.toml | 3 +++ crates/viewer/re_renderer/Cargo.toml | 7 ------- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/crates/store/re_video/Cargo.toml b/crates/store/re_video/Cargo.toml index c7e7e7038957..2251a5d59cfd 100644 --- a/crates/store/re_video/Cargo.toml +++ b/crates/store/re_video/Cargo.toml @@ -27,6 +27,9 @@ features = ["all"] [features] default = ["av1", "ffmpeg"] +## Native AV1 decoding. +av1 = ["dep:dav1d"] + ## Decode H.264 using ffmpeg over CLI. ffmpeg = ["dep:ffmpeg-sidecar"] diff --git a/crates/viewer/re_renderer/Cargo.toml b/crates/viewer/re_renderer/Cargo.toml index 31accb7af83d..690e7e3eb35a 100644 --- a/crates/viewer/re_renderer/Cargo.toml +++ b/crates/viewer/re_renderer/Cargo.toml @@ -35,13 +35,6 @@ default = ["import-obj", "import-gltf", "import-stl"] ## Support for Arrow datatypes for end-to-end zero-copy. arrow = ["dep:arrow2"] -<<<<<<< HEAD -======= -## Support for native AV1 video decoding. -## You need to install [nasm](https://nasm.us/) to compile with this feature. 
video_av1 = ["re_video/av1"] # TODO: remove - ->>>>>>> 4ced5f84a4 (Actual H.264 video playback inside of Rerun viewer) ## Support importing .obj files import-obj = ["dep:tobj"] From 19f01adff6e436a703ca566cefdce1243c45eac4 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Fri, 25 Oct 2024 17:14:46 +0200 Subject: [PATCH 08/11] reduce ffmpeg decode delay --- crates/store/re_video/src/decode/ffmpeg.rs | 23 +++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs index 2d715298133d..2e7d339ff3f5 100644 --- a/crates/store/re_video/src/decode/ffmpeg.rs +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -77,10 +77,31 @@ impl FfmpegCliH264Decoder { FfmpegCommand::new() .hide_banner() + // "Reduce the latency introduced by buffering during initial input streams analysis." + //.arg("-fflags nobuffer") + // + // .. instead use these more aggressive options found here + // https://stackoverflow.com/a/49273163 + .args([ + "-probesize", + "32", // 32 bytes is the minimum probe size. + "-analyzeduration", + "0", + ]) // Keep in mind that all arguments that are about the input need to go before! .format("h264") // High risk here: What's available? .input("-") // stdin is our input! - .rawvideo() // Output rgb24 on stdout. (TODO(emilk) for later: any format we can read directly on re_renderer would be better!) + // TODO: Do we have to do this instead? + // Set constant frame rate. + // We can't properly handle variable frame rate since `rawvideo` output won't report timestamps. + // To work around this we'd first need to establish a mapping of frame numbers to timestamps. + // This isn't entirely trivial since individual chunks may have arbitrary composition & decode timestamps. + //.fps_mode(1) + // + // TODO(andreas): at least do `rgba`. But we could also do `yuv420p` for instance if that's what the video is specifying + // (should be faster overall at no quality loss if the video is in this format). + // Check `ffmpeg -pix_fmts` for full list. + .rawvideo() // Output rgb24 on stdout. .spawn() .map_err(Error::FailedToStartFfmpeg)? }; From d0c9e6bba9b3c874a72de0cd3df78c4849d00d99 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Fri, 25 Oct 2024 17:14:58 +0200 Subject: [PATCH 09/11] fix re_video example build --- crates/store/re_video/examples/frames.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/store/re_video/examples/frames.rs b/crates/store/re_video/examples/frames.rs index 90da6304d5e5..0a707beb26ab 100644 --- a/crates/store/re_video/examples/frames.rs +++ b/crates/store/re_video/examples/frames.rs @@ -98,6 +98,9 @@ fn write_video_frames( re_video::PixelFormat::Rgba8Unorm => { write_ppm_rgba32(&mut file, frame.width, frame.height, &frame.data); } + re_video::PixelFormat::Yuv { ..
} => { + re_log::error_once!("YUV frame writing is not supported"); + } } } } From cb6d4d1f30ced7466c6a0b39e7adba327c25aa37 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Fri, 25 Oct 2024 17:31:39 +0200 Subject: [PATCH 10/11] add decode timestamp, nal header parsing, various comments, todo notes --- crates/store/re_video/src/decode/av1.rs | 2 +- crates/store/re_video/src/decode/ffmpeg.rs | 99 ++++++++++++++--- crates/store/re_video/src/decode/mod.rs | 12 ++- crates/store/re_video/src/demux/mod.rs | 1 + .../re_renderer/src/video/decoder/mod.rs | 3 +- .../src/video/decoder/native_decoder.rs | 13 ++- 6 files changed, 110 insertions(+), 20 deletions(-) diff --git a/crates/store/re_video/src/decode/av1.rs b/crates/store/re_video/src/decode/av1.rs index c3259a672c6c..d892cf4fc6b0 100644 --- a/crates/store/re_video/src/decode/av1.rs +++ b/crates/store/re_video/src/decode/av1.rs @@ -232,7 +232,7 @@ fn output_picture( width: picture.width(), height: picture.height(), format, - timestamp: Time(picture.timestamp().unwrap_or(0)), + composition_timestamp: Time(picture.timestamp().unwrap_or(0)), duration: Time(picture.duration()), }; on_output(Ok(frame)); diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs index 2e7d339ff3f5..ffcb495d4f64 100644 --- a/crates/store/re_video/src/decode/ffmpeg.rs +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -1,6 +1,6 @@ //! Send video data to `ffmpeg` over CLI to decode it. -use std::sync::atomic::Ordering; +use std::{io::Write, sync::atomic::Ordering}; use crossbeam::channel::{Receiver, Sender, TryRecvError}; use ffmpeg_sidecar::{ command::FfmpegCommand, event::{FfmpegEvent, LogLevel}, }; @@ -23,7 +23,7 @@ pub enum Error { #[error("Failed to get iterator: {0}")] NoIterator(String), - #[error("There's a bug in Rerun")] + #[error("No frame info received, this is likely a bug in Rerun")] NoFrameInfo, #[error("Failed to write data to ffmpeg: {0}")] FailedToWriteToFfmpeg(std::io::Error), @@ -231,9 +231,6 @@ fn read_ffmpeg_output( } FfmpegEvent::OutputFrame(frame) => { - // NOTE: `frame.timestamp` is monotonically increasing, - // and is not the actual timestamp in the stream. - let frame_info: FrameInfo = match frame_info_rx.try_recv() { Ok(frame_info) => frame_info, @@ -257,7 +254,8 @@ fn read_ffmpeg_output( width, height, data, - .. + output_index: _, // This is the stream index. For all we do it's always 0. + timestamp: _, // This is a timestamp made up by ffmpeg_sidecar based on limited information it has. } = frame; debug_assert_eq!( @@ -308,7 +306,7 @@ impl SyncDecoder for FfmpegCliH264Decoder { re_tracing::profile_function!(); // First read any outstanding messages (e.g. error reports), - // so they get orderer correctly. + // so they get ordered correctly. while let Ok(frame_result) = self.frame_rx.try_recv() { if should_stop.load(Ordering::Relaxed) { return; @@ -319,6 +317,8 @@ impl SyncDecoder for FfmpegCliH264Decoder { // We send the information about this chunk first. // This assumes each sample/chunk will result in exactly one frame. // If this assumption is not held, we will get weird errors, like videos playing to slowly. + // TODO: this also assumes that the frame comes back in this order. + // Which is definitely wrong, as we know that frames are not necessarily in composition time stamp order!
let frame_info = FrameInfo { frame_num: self.frame_num, composition_timestamp: chunk.composition_timestamp, @@ -343,17 +343,16 @@ impl SyncDecoder for FfmpegCliH264Decoder { ) { on_output(Err(err.into())); } + + self.ffmpeg_stdin.flush().ok(); } - // Read results and/or errors: while let Ok(frame_result) = self.frame_rx.try_recv() { if should_stop.load(Ordering::Relaxed) { return; } on_output(frame_result); } - - // TODO: block until we have processed the frame! } fn reset(&mut self) { @@ -408,7 +407,7 @@ fn write_avc_chunk_to_nalu_stream( state.previous_frame_was_idr = false; } - // A single cjhunk may consist of multiple NAL units, each of which need our special treatment. + // A single chunk may consist of multiple NAL units, each of which need our special treatment. // (most of the time it's 1:1, but there might be extra NAL units for info, especially at the start). let mut buffer_offset: usize = 0; let sample_end = chunk.data.len(); @@ -456,6 +455,13 @@ fn write_avc_chunk_to_nalu_stream( return Err(Error::BadVideoData("Not enough bytes to".to_owned())); } + let nal_header = NalHeader(chunk.data[data_start]); + re_log::trace!( + "nal_header: {:?}, {}", + nal_header.unit_type(), + nal_header.ref_idc() + ); + let data = &chunk.data[data_start..data_end]; nalu_stream @@ -475,3 +481,72 @@ fn write_avc_chunk_to_nalu_stream( Ok(()) } + +/// Possible values for `nal_unit_type` field in `nal_unit`. +/// +/// Encodes to 5 bits. +/// Via: https://docs.rs/less-avc/0.1.5/src/less_avc/nal_unit.rs.html#232 +#[derive(PartialEq, Eq)] +#[non_exhaustive] +#[repr(u8)] +#[derive(Copy, Clone, Debug)] +pub enum NalUnitType { + /// Unspecified + Unspecified = 0, + + /// Coded slice of a non-IDR picture + CodedSliceOfANonIDRPicture = 1, + + /// Coded slice data partition A + CodedSliceDataPartitionA = 2, + + /// Coded slice data partition B + CodedSliceDataPartitionB = 3, + + /// Coded slice data partition C + CodedSliceDataPartitionC = 4, + + /// Coded slice of an IDR picture + CodedSliceOfAnIDRPicture = 5, + + /// Supplemental enhancement information (SEI) + SupplementalEnhancementInformation = 6, + + /// Sequence parameter set + SequenceParameterSet = 7, + + /// Picture parameter set + PictureParameterSet = 8, + + /// Header type not listed here. + Other, +} + +/// Header of the "Network Abstraction Layer" unit that is used by H.264/AVC & H.265/HEVC. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct NalHeader(pub u8); + +impl NalHeader { + pub fn unit_type(self) -> NalUnitType { + match self.0 & 0b111 { + 0 => NalUnitType::Unspecified, + 1 => NalUnitType::CodedSliceOfANonIDRPicture, + 2 => NalUnitType::CodedSliceDataPartitionA, + 3 => NalUnitType::CodedSliceDataPartitionB, + 4 => NalUnitType::CodedSliceDataPartitionC, + 5 => NalUnitType::CodedSliceOfAnIDRPicture, + 6 => NalUnitType::SupplementalEnhancementInformation, + 7 => NalUnitType::SequenceParameterSet, + 8 => NalUnitType::PictureParameterSet, + _ => NalUnitType::Other, + } + } + + /// Ref idc is a value from 0-3 that tells us how "important" the frame/sample is. + /// + /// For details see: + /// + fn ref_idc(self) -> u8 { + (self.0 >> 5) & 0b11 + } +} diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs index f9bdc7a13a8c..80c293c6f573 100644 --- a/crates/store/re_video/src/decode/mod.rs +++ b/crates/store/re_video/src/decode/mod.rs @@ -131,6 +131,8 @@ pub trait SyncDecoder { /// Submit some work and read the results. /// /// Stop early if `should_stop` is `true` or turns `true`. 
+    ///
+    /// TODO: spec out how this blocks. Is this viable for ffmpeg? Probably not, since we don't know when to block for sure.
     fn submit_chunk(&mut self, should_stop: &AtomicBool, chunk: Chunk, on_output: &OutputCallback);

     /// Clear and reset everything
@@ -189,8 +191,16 @@ pub struct Chunk {

     pub data: Vec<u8>,

+    /// Decode timestamp of this sample.
+    /// Chunks are expected to be submitted in the order of decode timestamp.
+    ///
+    /// `decode_timestamp <= composition_timestamp`
+    pub decode_timestamp: Time,
+
     /// Presentation/composition timestamp for the sample in this chunk.
     /// *not* decode timestamp.
+    ///
+    /// `decode_timestamp <= composition_timestamp`
     pub composition_timestamp: Time,

     pub duration: Time,
@@ -202,7 +212,7 @@ pub struct Frame {
     pub width: u32,
     pub height: u32,
     pub format: PixelFormat,
-    pub timestamp: Time,
+    pub composition_timestamp: Time,
     pub duration: Time,
 }
diff --git a/crates/store/re_video/src/demux/mod.rs b/crates/store/re_video/src/demux/mod.rs
index 4a642113e39b..672d024d1b05 100644
--- a/crates/store/re_video/src/demux/mod.rs
+++ b/crates/store/re_video/src/demux/mod.rs
@@ -310,6 +310,7 @@ impl Sample {
             .to_vec();
         Some(Chunk {
             data,
+            decode_timestamp: self.decode_timestamp,
             composition_timestamp: self.composition_timestamp,
             duration: self.duration,
             is_sync: self.is_sync,
diff --git a/crates/viewer/re_renderer/src/video/decoder/mod.rs b/crates/viewer/re_renderer/src/video/decoder/mod.rs
index a0fc1413d1ff..63969bf050ae 100644
--- a/crates/viewer/re_renderer/src/video/decoder/mod.rs
+++ b/crates/viewer/re_renderer/src/video/decoder/mod.rs
@@ -260,8 +260,9 @@ impl VideoDecoder {
         //   = determines the decoding order of samples
         //
         // Note: `decode <= composition` for any given sample.
-        //       For some codecs, the two timestamps are the same.
+        //       For some codecs & videos, the two timestamps are the same.
         // We must enqueue samples in decode order, but show them in composition order.
+        // In the presence of B-frames this order may be different!

         // 1. Find the latest sample where `decode_timestamp <= presentation_timestamp`.
         //    Because `decode <= composition`, we never have to look further ahead in the
diff --git a/crates/viewer/re_renderer/src/video/decoder/native_decoder.rs b/crates/viewer/re_renderer/src/video/decoder/native_decoder.rs
index 266a77b66712..d50620db59a6 100644
--- a/crates/viewer/re_renderer/src/video/decoder/native_decoder.rs
+++ b/crates/viewer/re_renderer/src/video/decoder/native_decoder.rs
@@ -46,7 +46,7 @@ impl NativeDecoder {
             let debug_name = debug_name.clone();
             move |frame: re_video::decode::Result| match frame {
                 Ok(frame) => {
-                    re_log::trace!("Decoded frame at {:?}", frame.timestamp);
+                    re_log::trace!("Decoded frame at {:?}", frame.composition_timestamp);
                     let mut output = decoder_output.lock();
                     output.frames.push(frame);
                     output.error = None; // We successfully decoded a frame, reset the error state.
@@ -89,9 +89,11 @@ impl VideoChunkDecoder for NativeDecoder {
         let mut decoder_output = self.decoder_output.lock();
         let frames = &mut decoder_output.frames;

-        let Some(frame_idx) =
-            latest_at_idx(frames, |frame| frame.timestamp, &presentation_timestamp)
-        else {
+        let Some(frame_idx) = latest_at_idx(
+            frames,
+            |frame| frame.composition_timestamp,
+            &presentation_timestamp,
+        ) else {
             return Err(DecodingError::EmptyBuffer);
         };

@@ -103,7 +105,8 @@ impl VideoChunkDecoder for NativeDecoder {
         let frame_idx = 0;
         let frame = &frames[frame_idx];

-        let frame_time_range = frame.timestamp..frame.timestamp + frame.duration;
+        let frame_time_range =
+            frame.composition_timestamp..frame.composition_timestamp + frame.duration;

         if frame_time_range.contains(&presentation_timestamp)
             && video_texture.time_range != frame_time_range

From 6391b850b8f2eb8320cc8d082e3cd444224da45f Mon Sep 17 00:00:00 2001
From: Andreas Reich
Date: Fri, 25 Oct 2024 18:07:04 +0200
Subject: [PATCH 11/11] better timestamp syncing strategy?

---
 crates/store/re_video/src/decode/ffmpeg.rs | 72 ++++++++++++----------
 1 file changed, 40 insertions(+), 32 deletions(-)

diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index ffcb495d4f64..0d3c5e523156 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -1,8 +1,8 @@
 //! Send video data to `ffmpeg` over CLI to decode it.

-use std::{io::Write, sync::atomic::Ordering};
+use std::{collections::BTreeMap, io::Write, sync::atomic::Ordering};

-use crossbeam::channel::{Receiver, Sender, TryRecvError};
+use crossbeam::channel::{Receiver, Sender};
 use ffmpeg_sidecar::{
     command::FfmpegCommand,
     event::{FfmpegEvent, LogLevel},
@@ -41,11 +41,8 @@ pub enum Error {

 /// ffmpeg does not tell us the timestamp/duration of a given frame, so we need to remember it.
 struct FrameInfo {
-    /// Monotonic index, from start
-    frame_num: u32,
-
-    composition_timestamp: Time,
-
+    decode_timestamp: Time,
+    presentation_timestamp: Time,
     duration: Time,
 }

@@ -156,6 +153,8 @@ fn read_ffmpeg_output(
         false
     }

+    let mut pending_frames = BTreeMap::new();
+
     for event in ffmpeg_iterator {
         #[allow(clippy::match_same_arms)]
         match event {
@@ -231,25 +230,37 @@ fn read_ffmpeg_output(
             }

             FfmpegEvent::OutputFrame(frame) => {
-                let frame_info: FrameInfo = match frame_info_rx.try_recv() {
-                    Ok(frame_info) => frame_info,
-
-                    Err(TryRecvError::Disconnected) => {
-                        re_log::debug!("Receiver disconnected");
-                        return;
-                    }
-
-                    Err(TryRecvError::Empty) => {
-                        // This shouldn't happen
-                        if frame_tx.send(Err(Error::NoFrameInfo.into())).is_err() {
-                            re_log::warn!("Got no frame-info, and failed to send error");
+                let frame_info = match pending_frames.pop_first() {
+                    Some((_, frame_info)) => frame_info,
+                    None => {
+                        // Retrieve frame infos until the decode timestamp is no longer behind the composition timestamp.
+                        // This is important because frame infos arrive in decode order, not in composition order,
+                        // but ffmpeg will report frames in composition order!
+                        loop {
+                            let Ok(frame_info) = frame_info_rx.try_recv() else {
+                                re_log::debug!("Receiver disconnected");
+                                return;
+                            };
+
+                            // Example of how presentation timestamps and decode timestamps can play out:
+                            // PTS:    1 4 2 3
+                            // DTS:    1 2 3 4
+                            // Stream: I P B B
+                            //
+                            // Essentially we need to wait until the dts has "caught up" with the pts!
+                            let highest_pts = pending_frames
+                                .last_key_value()
+                                .map_or(frame_info.presentation_timestamp, |(pts, _)| *pts);
+                            if frame_info.decode_timestamp <= highest_pts {
+                                break frame_info;
+                            }
+                            pending_frames.insert(frame_info.presentation_timestamp, frame_info);
                         }
-                        return;
                     }
                 };

                 let ffmpeg_sidecar::event::OutputVideoFrame {
-                    frame_num,
+                    frame_num: _, // This is made up by ffmpeg_sidecar.
                     pix_fmt,
                     width,
                     height,
@@ -258,12 +269,11 @@ fn read_ffmpeg_output(
                     timestamp: _, // This is a timestamp made up by ffmpeg_sidecar based on the limited information it has.
                 } = frame;

-                debug_assert_eq!(
-                    frame_info.frame_num, frame_num,
-                    "We are out-of-sync with ffmpeg"
-                ); // TODO: fix somehow
-
-                re_log::trace!("Received frame {frame_num}: fmt {pix_fmt:?} size {width}x{height}");
+                re_log::trace!(
+                    "Received frame: dts {:?} cts {:?} fmt {pix_fmt:?} size {width}x{height}",
+                    frame_info.decode_timestamp,
+                    frame_info.presentation_timestamp
+                );

                 debug_assert_eq!(pix_fmt, "rgb24");
                 debug_assert_eq!(width as usize * height as usize * 3, data.len());

@@ -274,7 +284,7 @@ fn read_ffmpeg_output(
                         height,
                         data,
                         format: crate::PixelFormat::Rgb8Unorm,
-                        composition_timestamp: frame_info.composition_timestamp,
+                        composition_timestamp: frame_info.presentation_timestamp,
                         duration: frame_info.duration,
                     }))
                     .is_err()
@@ -317,11 +327,9 @@ impl SyncDecoder for FfmpegCliH264Decoder {
         // We send the information about this chunk first.
         // This assumes each sample/chunk will result in exactly one frame.
         // If this assumption does not hold, we will get weird errors, like videos playing too slowly.
-        // TODO: this also assumes that frames come back in this order,
-        // which is definitely wrong: frames are not necessarily in composition-timestamp order!
         let frame_info = FrameInfo {
-            frame_num: self.frame_num,
-            composition_timestamp: chunk.composition_timestamp,
+            presentation_timestamp: chunk.composition_timestamp,
+            decode_timestamp: chunk.decode_timestamp,
             duration: chunk.duration,
         };
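
An aside on PATCH 10's NalHeader: the H.264 NAL header is a single byte laid out as forbidden_zero_bit (1 bit), nal_ref_idc (2 bits), and nal_unit_type (5 bits), which is why unit_type() masks with 0b1_1111. The following standalone Rust sketch is not part of the patches; the example byte 0x65 is hand-picked (a typical header for a coded slice of an IDR picture):

// Sketch only: decomposing one NAL header byte.
// 0x65 = 0b0_11_00101: forbidden_zero_bit=0, nal_ref_idc=3, nal_unit_type=5.
fn main() {
    let byte: u8 = 0x65;

    let forbidden_zero_bit = (byte >> 7) & 0b1; // Must be 0 in a conforming stream.
    let nal_ref_idc = (byte >> 5) & 0b11; // 0-3: importance for reference/reconstruction.
    let nal_unit_type = byte & 0b1_1111; // 5 bits: the payload type.

    assert_eq!(forbidden_zero_bit, 0);
    assert_eq!(nal_ref_idc, 3); // IDR slices are maximally important.
    assert_eq!(nal_unit_type, 5); // Coded slice of an IDR picture.
    println!("ref_idc={nal_ref_idc} unit_type={nal_unit_type}");
}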
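
The BTreeMap strategy in PATCH 11 is easiest to sanity-check in isolation. Below is a minimal, self-contained sketch of the same reordering problem; plain i64 values stand in for Time, a hard-coded info list stands in for the channel, and the pop rule is a slight variant of the patch's highest_pts check: the smallest pending pts is emitted once the last received dts has caught up with it. That is safe because decode_timestamp <= composition_timestamp always holds, so no later info can undercut a pending pts once the dts has passed it:

use std::collections::BTreeMap;

// Sketch only: (dts, pts) pairs in decode (submission) order, matching the
// example in the patch comments: Stream: I P B B / DTS: 1 2 3 4 / PTS: 1 4 2 3.
fn main() {
    let infos_in_decode_order = [(1_i64, 1_i64), (2, 4), (3, 2), (4, 3)];
    let num_frames = infos_in_decode_order.len();
    let mut infos = infos_in_decode_order.into_iter();

    let mut pending: BTreeMap<i64, i64> = BTreeMap::new(); // pts -> dts
    let mut last_dts = i64::MIN;
    let mut output_pts = Vec::new();

    for _output_frame in 0..num_frames {
        // Pull infos until the smallest pending pts is final: every future info
        // has a larger dts, and any info's pts >= its own dts, so once
        // `min pending pts <= last_dts`, nothing smaller can still arrive.
        while pending
            .first_key_value()
            .map_or(true, |(&min_pts, _)| last_dts < min_pts)
        {
            let (dts, pts) = infos.next().expect("one info per output frame");
            last_dts = dts;
            pending.insert(pts, dts);
        }
        let (pts, _dts) = pending.pop_first().expect("loop guarantees an entry");
        output_pts.push(pts);
    }

    // ffmpeg emits frames in presentation order; the popped infos match it.
    assert_eq!(output_pts, [1, 2, 3, 4]);
}

With the I/P/B/B example from the comments above, this reproduces the decoder's presentation order exactly, which is the property the one-info-per-frame pairing in submit_chunk relies on.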