diff --git a/Cargo.lock b/Cargo.lock
index b73b09d1fbed..4a7923cfdad7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2370,6 +2370,15 @@ dependencies = [
  "simd-adler32",
 ]

+[[package]]
+name = "ffmpeg-sidecar"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4bd1e249e0ceeb0f5c9f84a3c6941c3bde3ebc2815f4b94531a7e806af61c4c0"
+dependencies = [
+ "anyhow",
+]
+
 [[package]]
 name = "filetime"
 version = "0.2.25"
@@ -6318,6 +6327,7 @@ dependencies = [
  "criterion",
  "crossbeam",
  "econtext",
+ "ffmpeg-sidecar",
  "indicatif",
  "itertools 0.13.0",
  "js-sys",
diff --git a/Cargo.toml b/Cargo.toml
index 55e8b6b6b0a7..a34ff2217acd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -178,6 +178,7 @@ econtext = "0.2" # Prints error contexts on crashes
 ehttp = "0.5.0"
 enumset = "1.0.12"
 env_logger = { version = "0.10", default-features = false }
+ffmpeg-sidecar = "1.1.2"
 fixed = { version = "1.28", default-features = false }
 flatbuffers = "23.0"
 futures-channel = "0.3"
diff --git a/crates/store/re_video/Cargo.toml b/crates/store/re_video/Cargo.toml
index f9dd00bf199f..458d68b4403c 100644
--- a/crates/store/re_video/Cargo.toml
+++ b/crates/store/re_video/Cargo.toml
@@ -23,7 +23,7 @@ features = ["all"]

 [features]
-default = ["av1"]
+default = ["av1", "ffmpeg"]

 ## Enable serialization for data structures that support it.
 serde = ["dep:serde"]
@@ -31,6 +31,9 @@ serde = ["dep:serde"]
 ## Native AV1 decoding.
 av1 = ["dep:dav1d"]

+## Decode H.264 using ffmpeg over CLI.
+ffmpeg = ["dep:ffmpeg-sidecar"]
+
 ## Enable faster native video decoding with assembly.
 ## You need to install [nasm](https://nasm.us/) to compile with this feature.
 nasm = [
@@ -49,9 +52,11 @@ econtext.workspace = true
 itertools.workspace = true
 parking_lot.workspace = true
 re_mp4.workspace = true
-serde = { workspace = true, optional = true }
 thiserror.workspace = true

+ffmpeg-sidecar = { workspace = true, optional = true }
+serde = { workspace = true, optional = true }
+
 # We enable re_rav1d on native, UNLESS we're on Linux Arm64
 # See https://github.com/rerun-io/rerun/issues/7755
 [target.'cfg(all(not(target_arch = "wasm32"), not(all(target_os = "linux", target_arch = "aarch64"))))'.dependencies]
@@ -83,6 +88,7 @@ web-sys = { workspace = true, features = [
 ] }

 [dev-dependencies]
+# For the `frames` example:
 indicatif.workspace = true

 criterion.workspace = true
diff --git a/crates/store/re_video/build.rs b/crates/store/re_video/build.rs
index 261ccb770ba3..e9fdfb6aa24f 100644
--- a/crates/store/re_video/build.rs
+++ b/crates/store/re_video/build.rs
@@ -10,5 +10,6 @@ fn main() {
         native: { not(target_arch = "wasm32") },
         linux_arm64: { all(target_os = "linux", target_arch = "aarch64") },
         with_dav1d: { all(feature = "av1", native, not(linux_arm64)) }, // https://github.com/rerun-io/rerun/issues/7755
+        with_ffmpeg: { all(feature = "ffmpeg", native) },
     }
 }
diff --git a/crates/store/re_video/examples/frames.rs b/crates/store/re_video/examples/frames.rs
index 88a2c34213f6..ac73e0234175 100644
--- a/crates/store/re_video/examples/frames.rs
+++ b/crates/store/re_video/examples/frames.rs
@@ -14,6 +14,8 @@ use indicatif::ProgressBar;
 use parking_lot::Mutex;

 fn main() {
+    re_log::setup_logging();
+
     // frames <video.mp4>
     let args: Vec<_> = std::env::args().collect();
     let Some(video_path) = args.get(1) else {
@@ -83,13 +85,20 @@ fn main() {
             .create(true)
             .truncate(true)
             .open(output_dir.join(format!("{i:0width$}.ppm")))
-            .expect("failed to open file");
-        write_binary_ppm(
-            &mut file,
-            frame.content.width,
-            frame.content.height,
-            &frame.content.data,
-        );
+            .expect("failed to open file");
+
+        let frame = &frame.content;
+        match frame.format {
+            re_video::PixelFormat::Rgb8Unorm => {
+                write_ppm_rgb24(&mut file, frame.width, frame.height, &frame.data);
+            }
+            re_video::PixelFormat::Rgba8Unorm => {
+                write_ppm_rgba32(&mut file, frame.width, frame.height, &frame.data);
+            }
+            re_video::PixelFormat::Yuv { .. } => {
+                re_log::error_once!("YUV frame writing is not supported");
+            }
+        }
     }
 }

@@ -98,7 +107,24 @@ fn num_digits(n: usize) -> usize {
     (n as f64).log10().floor() as usize + 1
 }

-fn write_binary_ppm(file: &mut File, width: u32, height: u32, rgba: &[u8]) {
+fn write_ppm_rgb24(file: &mut File, width: u32, height: u32, rgb: &[u8]) {
+    assert_eq!(width as usize * height as usize * 3, rgb.len());
+
+    let header = format!("P6\n{width} {height}\n255\n");
+
+    let mut data = Vec::with_capacity(header.len() + width as usize * height as usize * 3);
+    data.extend_from_slice(header.as_bytes());
+
+    for rgb in rgb.chunks(3) {
+        data.extend_from_slice(&[rgb[0], rgb[1], rgb[2]]);
+    }
+
+    file.write_all(&data).expect("failed to write frame data");
+}
+
+fn write_ppm_rgba32(file: &mut File, width: u32, height: u32, rgba: &[u8]) {
+    assert_eq!(width as usize * height as usize * 4, rgba.len());
+
     let header = format!("P6\n{width} {height}\n255\n");

     let mut data = Vec::with_capacity(header.len() + width as usize * height as usize * 3);
diff --git a/crates/store/re_video/src/decode/async_decoder_wrapper.rs b/crates/store/re_video/src/decode/async_decoder_wrapper.rs
index d572e1cbc0c6..546a5cbffa2a 100644
--- a/crates/store/re_video/src/decode/async_decoder_wrapper.rs
+++ b/crates/store/re_video/src/decode/async_decoder_wrapper.rs
@@ -73,7 +73,7 @@ impl AsyncDecoderWrapper {
         let comms = Comms::default();

         let thread = std::thread::Builder::new()
-            .name("av1_decoder".into())
+            .name(format!("decoder of {debug_name}"))
             .spawn({
                 let comms = comms.clone();
                 move || {
diff --git a/crates/store/re_video/src/decode/av1.rs b/crates/store/re_video/src/decode/av1.rs
index 127f0ae80328..c28215ae0f95 100644
--- a/crates/store/re_video/src/decode/av1.rs
+++ b/crates/store/re_video/src/decode/av1.rs
@@ -76,14 +76,14 @@ impl SyncDav1dDecoder {
         re_tracing::profile_function!();
         econtext::econtext_function_data!(format!(
             "chunk timestamp: {:?}",
-            chunk.composition_timestamp
+            chunk.presentation_timestamp
         ));

         re_tracing::profile_scope!("send_data");
         match self.decoder.send_data(
             chunk.data,
             None,
-            Some(chunk.composition_timestamp.0),
+            Some(chunk.presentation_timestamp.0),
             Some(chunk.duration.0),
         ) {
             Ok(()) => {}
@@ -255,6 +255,7 @@ fn create_frame(debug_name: &str, picture: &dav1d::Picture) -> Result<Frame> {
         info: FrameInfo {
             presentation_timestamp: Time(picture.timestamp().unwrap_or(0)),
             duration: Time(picture.duration()),
+            ..Default::default()
         },
     })
 }
diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
new file mode 100644
index 000000000000..8fdfe466e256
--- /dev/null
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -0,0 +1,779 @@
+//! Send video data to `ffmpeg` over CLI to decode it.
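+//!
+//! In short: a writer thread feeds Annex B H.264 NAL units into the ffmpeg child
+//! process's stdin, while a reader thread consumes decoded raw frames from its
+//! stdout and re-attaches the timestamps we remembered for each submitted chunk.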
+
+use std::{
+    collections::BTreeMap,
+    io::Write,
+    sync::{atomic::AtomicBool, Arc},
+};
+
+use crossbeam::channel::{Receiver, Sender};
+use ffmpeg_sidecar::{
+    child::FfmpegChild,
+    command::FfmpegCommand,
+    event::{FfmpegEvent, LogLevel},
+};
+
+use crate::Time;
+
+use super::{AsyncDecoder, Chunk, Frame, OutputCallback};
+
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("Failed to start ffmpeg: {0}")]
+    FailedToStartFfmpeg(std::io::Error),
+
+    #[error("Failed to get stdin handle")]
+    NoStdin,
+
+    #[error("Failed to get iterator: {0}")]
+    NoIterator(String),
+
+    #[error("No frame info received, this is likely a bug in Rerun")]
+    NoFrameInfo,
+
+    #[error("Failed to write data to ffmpeg: {0}")]
+    FailedToWriteToFfmpeg(std::io::Error),
+
+    #[error("Bad video data: {0}")]
+    BadVideoData(String),
+
+    #[error("FFMPEG error: {0}")]
+    Ffmpeg(String),
+
+    #[error("FFMPEG fatal error: {0}")]
+    FfmpegFatal(String),
+
+    #[error("FFMPEG IPC error: {0}")]
+    FfmpegSidecar(String),
+
+    #[error("FFMPEG exited unexpectedly with code {0:?}")]
+    FfmpegUnexpectedExit(Option<std::process::ExitStatus>),
+
+    #[error("FFMPEG output a non-image chunk when we expected only images.")]
+    UnexpectedFfmpegOutputChunk,
+
+    #[error("Failed to send video frame info to the ffmpeg read thread.")]
+    BrokenFrameInfoChannel,
+}
+
+impl From<Error> for super::Error {
+    fn from(err: Error) -> Self {
+        Self::Ffmpeg(std::sync::Arc::new(err))
+    }
+}
+
+/// ffmpeg does not tell us the timestamp/duration of a given frame, so we need to remember it.
+#[derive(Clone)]
+struct FfmpegFrameInfo {
+    presentation_timestamp: Time,
+    duration: Time,
+    decode_timestamp: Time,
+}
+
+enum FfmpegFrameData {
+    Chunk(Chunk),
+    EndOfStream,
+}
+
+struct FfmpegProcessAndListener {
+    ffmpeg: FfmpegChild,
+
+    /// For sending frame timestamps to the ffmpeg listener thread.
+    frame_info_tx: Sender<FfmpegFrameInfo>,
+
+    /// For sending chunks to the ffmpeg write thread.
+    frame_data_tx: Sender<FfmpegFrameData>,
+
+    listen_thread: Option<std::thread::JoinHandle<()>>,
+    write_thread: Option<std::thread::JoinHandle<()>>,
+
+    /// If true, the write thread will not report errors. Used upon exit, so the write thread won't spam the log about the hung-up stdin.
+    suppress_write_error_reports: Arc<AtomicBool>,
+}
+
+impl FfmpegProcessAndListener {
+    fn new(
+        debug_name: &str,
+        on_output: Arc<OutputCallback>,
+        avcc: re_mp4::Avc1Box,
+    ) -> Result<Self, Error> {
+        re_tracing::profile_function!();
+
+        let mut ffmpeg = FfmpegCommand::new()
+            .hide_banner()
+            // "Reduce the latency introduced by buffering during initial input streams analysis."
+            //.arg("-fflags nobuffer")
+            //
+            // …instead use these more aggressive options found here:
+            // https://stackoverflow.com/a/49273163
+            .args([
+                "-probesize",
+                "32", // 32 bytes is the minimum probe size.
+                "-analyzeduration",
+                "0",
+            ])
+            // Keep in mind that all arguments that are about the input need to go before!
+            .format("h264") // TODO(andreas): should we check ahead of time whether this is available?
+            //.fps_mode("0")
+            .input("-") // stdin is our input!
+            // h264 bitstreams don't have timestamp information. Whatever ffmpeg tries to make up about timing & framerates is wrong!
+            // If we don't tell it to just pass the frames through, variable framerate (VFR) video will just not play at all.
+            .fps_mode("passthrough")
+            // TODO(andreas): at least do `rgba`. But we could also do `yuv420p` for instance if that's what the video is specifying
+            // (should be faster overall at no quality loss if the video is in this format).
+            // Check `ffmpeg -pix_fmts` for the full list.
+            .rawvideo() // Output rgb24 on stdout.
+            .spawn()
+            .map_err(Error::FailedToStartFfmpeg)?;
+
+        let ffmpeg_iterator = ffmpeg
+            .iter()
+            .map_err(|err| Error::NoIterator(err.to_string()))?;
+
+        let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded();
+        let (frame_data_tx, frame_data_rx) = crossbeam::channel::unbounded();
+
+        let listen_thread = std::thread::Builder::new()
+            .name(format!("ffmpeg-reader for {debug_name}"))
+            .spawn({
+                let on_output = on_output.clone();
+                let debug_name = debug_name.to_owned();
+                move || {
+                    read_ffmpeg_output(
+                        &debug_name,
+                        ffmpeg_iterator,
+                        &frame_info_rx,
+                        on_output.as_ref(),
+                    );
+                }
+            })
+            .expect("Failed to spawn ffmpeg listener thread");
+
+        let suppress_write_error_reports = Arc::new(AtomicBool::new(false));
+        let write_thread = std::thread::Builder::new()
+            .name(format!("ffmpeg-writer for {debug_name}"))
+            .spawn({
+                let ffmpeg_stdin = ffmpeg.take_stdin().ok_or(Error::NoStdin)?;
+                let suppress_write_error_reports = suppress_write_error_reports.clone();
+                move || {
+                    write_ffmpeg_input(
+                        ffmpeg_stdin,
+                        &frame_data_rx,
+                        on_output.as_ref(),
+                        &avcc,
+                        &suppress_write_error_reports,
+                    );
+                }
+            })
+            .expect("Failed to spawn ffmpeg writer thread");
+
+        Ok(Self {
+            ffmpeg,
+            frame_info_tx,
+            frame_data_tx,
+            listen_thread: Some(listen_thread),
+            write_thread: Some(write_thread),
+            suppress_write_error_reports,
+        })
+    }
+}
+
+impl Drop for FfmpegProcessAndListener {
+    fn drop(&mut self) {
+        re_tracing::profile_function!();
+        self.suppress_write_error_reports
+            .store(true, std::sync::atomic::Ordering::Relaxed);
+        self.frame_data_tx.send(FfmpegFrameData::EndOfStream).ok();
+        self.ffmpeg.kill().ok();
+
+        if let Some(write_thread) = self.write_thread.take() {
+            if write_thread.join().is_err() {
+                re_log::error!("Failed to join ffmpeg writer thread.");
+            }
+        }
+        if let Some(listen_thread) = self.listen_thread.take() {
+            if listen_thread.join().is_err() {
+                re_log::error!("Failed to join ffmpeg listener thread.");
+            }
+        }
+    }
+}
+
+fn write_ffmpeg_input(
+    mut ffmpeg_stdin: std::process::ChildStdin,
+    frame_data_rx: &Receiver<FfmpegFrameData>,
+    on_output: &OutputCallback,
+    avcc: &re_mp4::Avc1Box,
+    suppress_write_error_reports: &AtomicBool,
+) {
+    let mut state = NaluStreamState::default();
+
+    while let Ok(data) = frame_data_rx.recv() {
+        let chunk = match data {
+            FfmpegFrameData::Chunk(chunk) => chunk,
+            FfmpegFrameData::EndOfStream => break,
+        };
+
+        if let Err(err) =
+            write_avc_chunk_to_nalu_stream(avcc, &mut ffmpeg_stdin, &chunk, &mut state)
+        {
+            let write_error = matches!(err, Error::FailedToWriteToFfmpeg(_));
+            if !write_error
+                || !suppress_write_error_reports.load(std::sync::atomic::Ordering::Relaxed)
+            {
+                (on_output)(Err(err.into()));
+            }
+
+            // This is unlikely to improve! The ffmpeg process likely died.
+            // By exiting here we hang up on the channel, making future attempts to push into it fail, which should cause a reset eventually.
+            if write_error {
+                return;
+            }
+        } else {
+            ffmpeg_stdin.flush().ok();
+        }
+    }
+}
+
+fn read_ffmpeg_output(
+    debug_name: &str,
+    ffmpeg_iterator: ffmpeg_sidecar::iter::FfmpegIterator,
+    frame_info_rx: &Receiver<FfmpegFrameInfo>,
+    on_output: &OutputCallback,
+) {
+    /// Ignore some common output from ffmpeg.
+    fn should_ignore_log_msg(msg: &str) -> bool {
+        let patterns = [
+            "Duration: N/A, bitrate: N/A",
+            "frame=    0 fps=0.0 q=0.0 size=       0kB time=N/A bitrate=N/A speed=N/A",
+            "encoder         : ", // Describes the encoder that was used to encode a video.
+            "Metadata:",
+            // TODO(andreas): we should just handle yuv420p directly!
+ "No accelerated colorspace conversion found from yuv420p to rgb24", + "Stream mapping:", + // We actually don't even want it to estimate a framerate! + "not enough frames to estimate rate", + ]; + + for pattern in patterns { + if msg.contains(pattern) { + return true; + } + } + + false + } + + // Pending frames, sorted by their presentation timestamp. + let mut pending_frame_infos = BTreeMap::new(); + let mut highest_dts = Time::MIN; // Highest dts encountered so far. + + for event in ffmpeg_iterator { + #[allow(clippy::match_same_arms)] + match event { + FfmpegEvent::Log(LogLevel::Info, msg) => { + if !should_ignore_log_msg(&msg) { + re_log::trace!("{debug_name} decoder: {msg}"); + } + } + + FfmpegEvent::Log(LogLevel::Warning, mut msg) => { + if !should_ignore_log_msg(&msg) { + // Make warn_once work on `[swscaler @ 0x148db8000]` style warnings even if the address is different every time. + if let Some(pos) = msg.find("[swscaler @ 0x") { + msg = [ + &msg[..pos], + &msg[(pos + "[swscaler @ 0x148db8000]".len())..], + ] + .join("[swscaler]"); + }; + re_log::warn_once!("{debug_name} decoder: {msg}"); + } + } + + FfmpegEvent::Log(LogLevel::Error, msg) => { + on_output(Err(Error::Ffmpeg(msg).into())); + } + + FfmpegEvent::Log(LogLevel::Fatal, msg) => { + on_output(Err(Error::FfmpegFatal(msg).into())); + return; + } + + FfmpegEvent::Log(LogLevel::Unknown, msg) => { + if msg.contains("system signals, hard exiting") { + // That was probably us, killing the process. + re_log::debug!("ffmpeg process for {debug_name} was killed"); + return; + } + if !should_ignore_log_msg(&msg) { + re_log::warn_once!("{debug_name} decoder: {msg}"); + } + } + + FfmpegEvent::LogEOF => { + // This event proceeds `FfmpegEvent::Done`. + // This happens on `pkill ffmpeg`, for instance. + } + + FfmpegEvent::Error(error) => { + // An error in ffmpeg sidecar itself, rather than ffmpeg. + on_output(Err(Error::FfmpegSidecar(error).into())); + } + + FfmpegEvent::ParsedInput(input) => { + re_log::trace!("{debug_name} decoder: {input:?}"); + } + + FfmpegEvent::ParsedOutput(output) => { + re_log::trace!("{debug_name} decoder: {output:?}"); + } + + FfmpegEvent::ParsedStreamMapping(_) => { + // This reports what input streams ffmpeg maps to which output streams. + // Very unspectecular in our case as know that we map h264 video to raw video. + } + + FfmpegEvent::ParsedInputStream(stream) => { + let ffmpeg_sidecar::event::AVStream { + stream_type, + format, + pix_fmt, // Often 'yuv420p' + width, + height, + fps, + .. + } = stream; + + re_log::trace!( + "{debug_name} decoder input: {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS" + ); + + debug_assert_eq!(stream_type.to_ascii_lowercase(), "video"); + } + + FfmpegEvent::ParsedOutputStream(stream) => { + // This just repeats what we told ffmpeg to output, e.g. "rawvideo rgb24" + let ffmpeg_sidecar::event::AVStream { + stream_type, + format, + pix_fmt, + width, + height, + fps, + .. + } = stream; + re_log::trace!( + "{debug_name} decoder output: {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS" + ); + + debug_assert_eq!(stream_type.to_ascii_lowercase(), "video"); + } + + FfmpegEvent::Progress(_) => { + // We can get out frame number etc here to know how far behind we are. + // By default this triggers every 5s. + } + + FfmpegEvent::OutputFrame(frame) => { + // We input frames into ffmpeg in decode (DTS) order, and so that's + // also the order we will receive the `FrameInfo`s from `frame_info_rx`. 
+ // However, `ffmpeg` will re-order the frames to output them in presentation (PTS) order. + // We want to accurately match the `FrameInfo` with its corresponding output frame. + // To do that, we need to buffer frames that come out of ffmpeg. + // + // How do we know how large this buffer needs to be? + // Whenever the highest known DTS is behind the PTS, we need to wait until the DTS catches up. + // Otherwise, we'd assign the wrong PTS to the frame that just came in. + // + // Example how presentation timestamps and decode timestamps + // can play out in the presence of B-frames to illustrate this: + // PTS: 1 4 2 3 + // DTS: 1 2 3 4 + // Stream: I P B B + let frame_info = loop { + let oldest_pts_in_buffer = + pending_frame_infos.first_key_value().map(|(pts, _)| *pts); + let is_caught_up = oldest_pts_in_buffer.is_some_and(|pts| pts <= highest_dts); + if is_caught_up { + // There must be an element here, otherwise we wouldn't be here. + #[allow(clippy::unwrap_used)] + break pending_frame_infos.pop_first().unwrap().1; + } else { + // We're behind: + let Ok(frame_info) = frame_info_rx.try_recv() else { + re_log::debug!( + "{debug_name} ffmpeg decoder frame info channel disconnected" + ); + return; + }; + + // If the decodetimestamp did not increase, we're probably seeking backwards! + // We'd expect the video player to do a reset prior to that and close the channel as part of that, but we may not have noticed that in here yet! + // In any case, we'll have to just run with this as the new highest timestamp, not much else we can do. + if highest_dts > frame_info.decode_timestamp { + re_log::warn!("Video decode timestamps are expected to monotonically increase unless there was a decoder reset.\n\ + It went from {:?} to {:?} for the decoder of {debug_name}. This is probably a bug in Rerun.", highest_dts, frame_info.decode_timestamp); + } + highest_dts = frame_info.decode_timestamp; + + pending_frame_infos.insert(frame_info.presentation_timestamp, frame_info); + } + }; + + let ffmpeg_sidecar::event::OutputVideoFrame { + frame_num: _, // This is made up by ffmpeg sidecar. + pix_fmt, + width, + height, + data, + output_index: _, // This is the stream index. for all we do it's always 0. + timestamp: _, // This is a timestamp made up by ffmpeg_sidecar based on limited information it has. + } = frame; + + re_log::trace!( + "{debug_name} received frame: dts {:?} cts {:?} fmt {pix_fmt:?} size {width}x{height}", + frame_info.decode_timestamp, + frame_info.presentation_timestamp + ); + + debug_assert_eq!(pix_fmt, "rgb24"); + debug_assert_eq!(width as usize * height as usize * 3, data.len()); + + on_output(Ok(super::Frame { + content: super::FrameContent { + data, + width, + height, + format: crate::PixelFormat::Rgb8Unorm, + }, + info: super::FrameInfo { + presentation_timestamp: frame_info.presentation_timestamp, + duration: frame_info.duration, + latest_decode_timestamp: Some(frame_info.decode_timestamp), + }, + })); + } + + FfmpegEvent::Done => { + // This happens on `pkill ffmpeg`, for instance. + re_log::debug!("{debug_name}'s ffmpeg is Done"); + return; + } + + FfmpegEvent::ParsedVersion(ffmpeg_version) => { + re_log::debug_once!("ffmpeg version is: {}", ffmpeg_version.version); + } + + FfmpegEvent::ParsedConfiguration(ffmpeg_configuration) => { + re_log::debug_once!( + "ffmpeg configuration: {:?}", + ffmpeg_configuration.configuration + ); + } + + FfmpegEvent::ParsedDuration(_) => { + // ffmpeg has no way of knowing the duration of the stream. Whatever it might make up is wrong. 
+            }
+
+            FfmpegEvent::OutputChunk(_) => {
+                // Something went seriously wrong if we end up here.
+                re_log::error!("Unexpected ffmpeg output chunk for {debug_name}");
+                on_output(Err(Error::UnexpectedFfmpegOutputChunk.into()));
+                return;
+            }
+        }
+    }
+}
+
+/// Decode H.264 video via ffmpeg over CLI.
+pub struct FfmpegCliH264Decoder {
+    debug_name: String,
+    ffmpeg: FfmpegProcessAndListener,
+    avcc: re_mp4::Avc1Box,
+    on_output: Arc<OutputCallback>,
+}
+
+impl FfmpegCliH264Decoder {
+    pub fn new(
+        debug_name: String,
+        avcc: re_mp4::Avc1Box,
+        on_output: impl Fn(super::Result<Frame>) + Send + Sync + 'static,
+    ) -> Result<Self, Error> {
+        re_tracing::profile_function!();
+
+        let on_output = Arc::new(on_output);
+        let ffmpeg = FfmpegProcessAndListener::new(&debug_name, on_output.clone(), avcc.clone())?;
+
+        Ok(Self {
+            debug_name,
+            ffmpeg,
+            avcc,
+            on_output,
+        })
+    }
+}
+
+impl AsyncDecoder for FfmpegCliH264Decoder {
+    fn submit_chunk(&mut self, chunk: super::Chunk) -> super::Result<()> {
+        re_tracing::profile_function!();
+
+        // We send the information about this chunk first.
+        // Chunks are defined to always yield a single frame.
+        let frame_info = FfmpegFrameInfo {
+            presentation_timestamp: chunk.presentation_timestamp,
+            decode_timestamp: chunk.decode_timestamp,
+            duration: chunk.duration,
+        };
+        let chunk = FfmpegFrameData::Chunk(chunk);
+
+        if self.ffmpeg.frame_info_tx.send(frame_info).is_err()
+            || self.ffmpeg.frame_data_tx.send(chunk).is_err()
+        {
+            let err = super::Error::Ffmpeg(Arc::new(
+                if let Ok(exit_code) = self.ffmpeg.ffmpeg.as_inner_mut().try_wait() {
+                    Error::FfmpegUnexpectedExit(exit_code)
+                } else {
+                    Error::BrokenFrameInfoChannel
+                },
+            ));
+
+            // Report the error on the decoding stream.
+            (self.on_output)(Err(err.clone()));
+            return Err(err);
+        }
+
+        Ok(())
+    }
+
+    fn reset(&mut self) -> super::Result<()> {
+        re_log::debug!("Resetting ffmpeg decoder {}", self.debug_name);
+        self.ffmpeg = FfmpegProcessAndListener::new(
+            &self.debug_name,
+            self.on_output.clone(),
+            self.avcc.clone(),
+        )?;
+        Ok(())
+    }
+}
+
+/// Before every NAL unit, there is a NAL start code.
+/// Can also be 2 bytes of 0x00 and 1 byte of 0x01.
+///
+/// This is used in byte stream formats such as h264 files.
+/// Packet transport systems (RTP) may omit these.
+pub const NAL_START_CODE: &[u8] = &[0x00, 0x00, 0x00, 0x01];
+
+#[derive(Default)]
+struct NaluStreamState {
+    previous_frame_was_idr: bool,
+}
+
+fn write_bytes(stream: &mut dyn std::io::Write, data: &[u8]) -> Result<(), Error> {
+    stream.write_all(data).map_err(Error::FailedToWriteToFfmpeg)
+}
+
+fn write_avc_chunk_to_nalu_stream(
+    avcc: &re_mp4::Avc1Box,
+    nalu_stream: &mut dyn std::io::Write,
+    chunk: &super::Chunk,
+    state: &mut NaluStreamState,
+) -> Result<(), Error> {
+    re_tracing::profile_function!();
+    let avcc = &avcc.avcc;
+
+    // We expect the stream of chunks to not have any SPS (Sequence Parameter Set) & PPS (Picture Parameter Set)
+    // NAL units, just as is the case with MP4 data.
+    // In order for every IDR frame to be fully re-entrant, we need to prepend the SPS & PPS NAL units.
+    // Otherwise the decoder is not able to get the necessary information about how the video stream is encoded.
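+    //
+    // For illustration: an AVCC sample is a sequence of length-prefixed NAL units
+    // (`[len][NALU][len][NALU]…`), whereas the Annex B stream that ffmpeg reads from
+    // stdin delimits NAL units with start codes (`[00 00 00 01][NALU][00 00 00 01][NALU]…`).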
+    if chunk.is_sync && !state.previous_frame_was_idr {
+        for sps in &avcc.sequence_parameter_sets {
+            nalu_stream
+                .write_all(NAL_START_CODE)
+                .map_err(Error::FailedToWriteToFfmpeg)?;
+            nalu_stream
+                .write_all(&sps.bytes)
+                .map_err(Error::FailedToWriteToFfmpeg)?;
+        }
+        for pps in &avcc.picture_parameter_sets {
+            nalu_stream
+                .write_all(NAL_START_CODE)
+                .map_err(Error::FailedToWriteToFfmpeg)?;
+            nalu_stream
+                .write_all(&pps.bytes)
+                .map_err(Error::FailedToWriteToFfmpeg)?;
+        }
+        state.previous_frame_was_idr = true;
+    } else {
+        state.previous_frame_was_idr = false;
+    }
+
+    // A single chunk may consist of multiple NAL units, each of which needs our special treatment.
+    // (Most of the time it's 1:1, but there might be extra NAL units for info, especially at the start.)
+    let mut buffer_offset: usize = 0;
+    let sample_end = chunk.data.len();
+    while buffer_offset < sample_end {
+        re_tracing::profile_scope!("write_nalu");
+
+        // Each NAL unit in mp4 is prefixed with a length prefix.
+        // In Annex B this doesn't exist.
+        let length_prefix_size = avcc.length_size_minus_one as usize + 1;
+
+        if sample_end < buffer_offset + length_prefix_size {
+            return Err(Error::BadVideoData(
+                "Not enough bytes to fit the length prefix".to_owned(),
+            ));
+        }
+
+        let nal_unit_size = match length_prefix_size {
+            1 => chunk.data[buffer_offset] as usize,
+
+            2 => u16::from_be_bytes(
+                #[allow(clippy::unwrap_used)] // can't fail
+                chunk.data[buffer_offset..(buffer_offset + 2)]
+                    .try_into()
+                    .unwrap(),
+            ) as usize,
+
+            4 => u32::from_be_bytes(
+                #[allow(clippy::unwrap_used)] // can't fail
+                chunk.data[buffer_offset..(buffer_offset + 4)]
+                    .try_into()
+                    .unwrap(),
+            ) as usize,
+
+            _ => {
+                return Err(Error::BadVideoData(format!(
+                    "Bad length prefix size: {length_prefix_size}"
+                )));
+            }
+        };
+
+        let data_start = buffer_offset + length_prefix_size; // Skip the size.
+        let data_end = buffer_offset + nal_unit_size + length_prefix_size;
+
+        if chunk.data.len() < data_end {
+            return Err(Error::BadVideoData(
+                "Not enough bytes to fit the NAL unit".to_owned(),
+            ));
+        }
+
+        let nal_header = NalHeader(chunk.data[data_start]);
+        re_log::trace!(
+            "nal_header: {:?}, {}",
+            nal_header.unit_type(),
+            nal_header.ref_idc()
+        );
+
+        let data = &chunk.data[data_start..data_end];
+
+        write_bytes(nalu_stream, NAL_START_CODE)?;
+
+        // Note that we don't have to insert "emulation prevention bytes" since mp4 NALUs still use them.
+        // (Unlike the NAL start code, the emulation prevention bytes are part of the NAL spec!)
+
+        re_tracing::profile_scope!("write_bytes", data.len().to_string());
+        write_bytes(nalu_stream, data)?;
+
+        buffer_offset = data_end;
+    }
+
+    // Write an Access Unit Delimiter (AUD) NAL unit to the stream to signal the end of an access unit.
+    // This can help with ffmpeg picking up NALs right away before seeing the next chunk.
+    write_bytes(nalu_stream, NAL_START_CODE)?;
+    write_bytes(
+        nalu_stream,
+        &[
+            NalHeader::new(NalUnitType::AccessUnitDelimiter, 3).0,
+            // Two arbitrary bytes? 0000 worked as well, but this is what
+            // https://stackoverflow.com/a/44394025/ uses. Couldn't figure out the rules for this.
+            0xFF,
+            0x80,
+        ],
+    )?;
+
+    Ok(())
+}
+
+/// Possible values for the `nal_unit_type` field in `nal_unit`.
+///
+/// Encodes to 5 bits.
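+///
+/// The full NAL header byte is laid out as:
+/// `forbidden_zero_bit (1 bit) | nal_ref_idc (2 bits) | nal_unit_type (5 bits)`.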
+/// Via:
+/// *
+/// *
+#[derive(PartialEq, Eq)]
+#[non_exhaustive]
+#[repr(u8)]
+#[derive(Copy, Clone, Debug)]
+pub enum NalUnitType {
+    /// Unspecified
+    Unspecified = 0,
+
+    /// Coded slice of a non-IDR picture
+    CodedSliceOfANonIDRPicture = 1,
+
+    /// Coded slice data partition A
+    CodedSliceDataPartitionA = 2,
+
+    /// Coded slice data partition B
+    CodedSliceDataPartitionB = 3,
+
+    /// Coded slice data partition C
+    CodedSliceDataPartitionC = 4,
+
+    /// Coded slice of an IDR picture
+    CodedSliceOfAnIDRPicture = 5,
+
+    /// Supplemental enhancement information (SEI)
+    SupplementalEnhancementInformation = 6,
+
+    /// Sequence parameter set
+    SequenceParameterSet = 7,
+
+    /// Picture parameter set
+    PictureParameterSet = 8,
+
+    /// Signals the boundary between access units (i.e. the end of a frame's NAL units).
+    AccessUnitDelimiter = 9,
+
+    EndSequence = 10,
+    EndStream = 11,
+    FillerData = 12,
+    SequenceParameterSetExt = 13,
+
+    /// Header type not listed here.
+    Other,
+}
+
+/// Header of the "Network Abstraction Layer" unit that is used by H.264/AVC & H.265/HEVC.
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+struct NalHeader(pub u8);
+
+impl NalHeader {
+    pub const fn new(unit_type: NalUnitType, ref_idc: u8) -> Self {
+        Self((unit_type as u8) | (ref_idc << 5))
+    }
+
+    pub fn unit_type(self) -> NalUnitType {
+        match self.0 & 0b11111 {
+            0 => NalUnitType::Unspecified,
+            1 => NalUnitType::CodedSliceOfANonIDRPicture,
+            2 => NalUnitType::CodedSliceDataPartitionA,
+            3 => NalUnitType::CodedSliceDataPartitionB,
+            4 => NalUnitType::CodedSliceDataPartitionC,
+            5 => NalUnitType::CodedSliceOfAnIDRPicture,
+            6 => NalUnitType::SupplementalEnhancementInformation,
+            7 => NalUnitType::SequenceParameterSet,
+            8 => NalUnitType::PictureParameterSet,
+            9 => NalUnitType::AccessUnitDelimiter,
+            10 => NalUnitType::EndSequence,
+            11 => NalUnitType::EndStream,
+            12 => NalUnitType::FillerData,
+            13 => NalUnitType::SequenceParameterSetExt,
+            _ => NalUnitType::Other,
+        }
+    }
+
+    /// Ref idc is a value from 0-3 that tells us how "important" the frame/sample is.
+    pub fn ref_idc(self) -> u8 {
+        (self.0 >> 5) & 0b11
+    }
+}
diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs
index 2649095fe056..6b50ac7a87bb 100644
--- a/crates/store/re_video/src/decode/mod.rs
+++ b/crates/store/re_video/src/decode/mod.rs
@@ -81,13 +81,14 @@ mod async_decoder_wrapper;

 #[cfg(with_dav1d)]
 mod av1;
-
+#[cfg(with_ffmpeg)]
+mod ffmpeg;
 #[cfg(target_arch = "wasm32")]
 mod webcodecs;

 use crate::Time;

-#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
+#[derive(thiserror::Error, Debug, Clone)]
 pub enum Error {
     #[error("Unsupported codec: {0}")]
     UnsupportedCodec(String),
@@ -112,6 +113,10 @@ pub enum Error {
     #[error(transparent)]
     WebDecoder(#[from] webcodecs::Error),

+    #[cfg(with_ffmpeg)]
+    #[error(transparent)]
+    Ffmpeg(std::sync::Arc<ffmpeg::Error>),
+
     #[error("Unsupported bits per component: {0}")]
     BadBitsPerComponent(usize),
 }
@@ -181,22 +186,40 @@ pub fn new_decoder(
             }
         }

+        #[cfg(with_ffmpeg)]
+        re_mp4::StsdBoxContent::Avc1(avc1_box) => {
+            re_log::trace!("Decoding H.264…");
+            Ok(Box::new(ffmpeg::FfmpegCliH264Decoder::new(
+                debug_name.to_owned(),
+                avc1_box.clone(),
+                on_output,
+            )?))
+        }
+
         _ => Err(Error::UnsupportedCodec(video.human_readable_codec_string())),
     }
 }

-/// One chunk of encoded video data; usually one frame.
-///
-/// One loaded [`crate::Sample`].
+/// One chunk of encoded video data, representing a single [`crate::Sample`].
+///
+/// For details on how to interpret the data, see [`crate::Sample`].
 pub struct Chunk {
     /// The start of a new [`crate::demux::GroupOfPictures`]?
     pub is_sync: bool,

     pub data: Vec<u8>,

-    /// Presentation/composition timestamp for the sample in this chunk.
-    /// *not* decode timestamp.
-    pub composition_timestamp: Time,
+    /// Decode timestamp of this sample.
+    /// Chunks are expected to be submitted in order of decode timestamp.
+    ///
+    /// `decode_timestamp <= presentation_timestamp`
+    pub decode_timestamp: Time,
+
+    /// Presentation timestamp for the sample in this chunk.
+    /// Often synonymous with `composition_timestamp`.
+    ///
+    /// `decode_timestamp <= presentation_timestamp`
+    pub presentation_timestamp: Time,

     pub duration: Time,
 }
@@ -229,6 +252,11 @@ pub struct FrameInfo {
     /// A duration of [`Time::MAX`] indicates that the frame is invalid or not yet available.
     // Implementation note: unlike with the presentation timestamp, we may be fine with making this optional.
     pub duration: Time,
+
+    /// The decode timestamp of the last chunk that was needed to decode this frame.
+    ///
+    /// `None` indicates that the information is not available.
+    pub latest_decode_timestamp: Option<Time>,