Skip to content

Commit

Permalink
Better error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Oct 9, 2024
1 parent 1f93c03 commit fa091c5
Showing 1 changed file with 92 additions and 35 deletions.
127 changes: 92 additions & 35 deletions crates/store/re_video/src/decode/ffmpeg.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Send video data to `ffmpeg` over CLI to decode it.
use std::sync::atomic::Ordering;

use crossbeam::channel::{Receiver, Sender, TryRecvError};
use ffmpeg_sidecar::{
command::FfmpegCommand,
Expand All @@ -8,7 +10,7 @@ use ffmpeg_sidecar::{

use crate::Time;

use super::{Frame, Result, SyncDecoder};
use super::{Frame, SyncDecoder};

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand All @@ -23,6 +25,12 @@ pub enum Error {

#[error("There's a bug in Rerun")]
NoFrameInfo,

#[error("Failed to write data to ffmpeg: {0}")]
FailedToWriteToFfmpeg(std::io::Error),

#[error("Bad video data: {0}")]
BadVideoData(String),
}

/// ffmpeg does not tell us the timestamp/duration of a given frame, so we need to remember it.
Expand Down Expand Up @@ -53,7 +61,7 @@ pub struct FfmpegCliH264Decoder {
}

impl FfmpegCliH264Decoder {
pub fn new(avcc: re_mp4::Avc1Box) -> Result<Self> {
pub fn new(avcc: re_mp4::Avc1Box) -> Result<Self, Error> {
re_tracing::profile_function!();

let mut ffmpeg = {
Expand Down Expand Up @@ -236,29 +244,53 @@ impl SyncDecoder for FfmpegCliH264Decoder {
) {
re_tracing::profile_function!();

// NOTE: this assumes each sample/chunk will result in exactly one frame.
self.frame_info_tx.send(FrameInfo {
// First read any outstanding messages (e.g. error reports),
// so they get orderer correctly.
while let Ok(frame_result) = self.frame_rx.try_recv() {
if should_stop.load(Ordering::Relaxed) {
return;
}
on_output(frame_result);
}

// We send the information about this chunk first.
// This assumes each sample/chunk will result in exactly one frame.
// If this assumption is not held, we will get weird errors, like videos playing to slowly.
let frame_info = FrameInfo {
frame_num: self.frame_num,
timestamp: chunk.timestamp,
duration: chunk.duration,
});
};

// NOTE: a 60 FPS video can go for two years before wrapping a u32.
self.frame_num = self.frame_num.wrapping_add(1);

let mut state = NaluStreamState::default();
write_avc_chunk_to_nalu_stream(&self.avcc, &mut self.ffmpeg_stdin, &chunk, &mut state)
.unwrap();
// consider writing samples while at the same time reading frames, for even lower latency
// and maybe reuse the same ffmpeg process.
if self.frame_info_tx.send(frame_info).is_err() {
// The other thread must be down, e.g. because `ffmpeg` crashed.
// It should already have reported that as an error - no need to repeat it here.
} else {
// Write chunk to ffmpeg:
let mut state = NaluStreamState::default(); // TODO: remove state?
if let Err(err) = write_avc_chunk_to_nalu_stream(
should_stop,
&self.avcc,
&mut self.ffmpeg_stdin,
&chunk,
&mut state,
) {
on_output(Err(err.into()));
}
}

// TODO: handle errors
// Read results and/or errors:
while let Ok(frame_result) = self.frame_rx.try_recv() {
if should_stop.load(std::sync::atomic::Ordering::Relaxed) {
if should_stop.load(Ordering::Relaxed) {
return;
}
on_output(frame_result);
}

// TODO: block until we have processed the frame!
}

fn reset(&mut self) {
Expand All @@ -279,11 +311,12 @@ struct NaluStreamState {
}

fn write_avc_chunk_to_nalu_stream(
should_stop: &std::sync::atomic::AtomicBool,
avcc: &re_mp4::Avc1Box,
nalu_stream: &mut dyn std::io::Write,
chunk: &super::Chunk,
state: &mut NaluStreamState,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Error> {
re_tracing::profile_function!();
let avcc = &avcc.avcc;

Expand All @@ -292,12 +325,20 @@ fn write_avc_chunk_to_nalu_stream(
// TODO(andreas): Should we detect this rather from the NALU stream rather than the samples?
if chunk.is_sync && !state.previous_frame_was_idr {
for sps in &avcc.sequence_parameter_sets {
nalu_stream.write_all(NAL_START_CODE)?;
nalu_stream.write_all(&sps.bytes)?;
nalu_stream
.write_all(NAL_START_CODE)
.map_err(Error::FailedToWriteToFfmpeg)?;
nalu_stream
.write_all(&sps.bytes)
.map_err(Error::FailedToWriteToFfmpeg)?;
}
for pps in &avcc.picture_parameter_sets {
nalu_stream.write_all(NAL_START_CODE)?;
nalu_stream.write_all(&pps.bytes)?;
nalu_stream
.write_all(NAL_START_CODE)
.map_err(Error::FailedToWriteToFfmpeg)?;
nalu_stream
.write_all(&pps.bytes)
.map_err(Error::FailedToWriteToFfmpeg)?;
}
state.previous_frame_was_idr = true;
} else {
Expand All @@ -308,46 +349,62 @@ fn write_avc_chunk_to_nalu_stream(
// (most of the time it's 1:1, but there might be extra NAL units for info, especially at the start).
let mut buffer_offset: usize = 0;
let sample_end = chunk.data.len();
while buffer_offset < sample_end {
while buffer_offset < sample_end && !should_stop.load(Ordering::Relaxed) {
re_tracing::profile_scope!("nalu");

// Each NAL unit in mp4 is prefixed with a length prefix.
// In Annex B this doesn't exist.
let length_prefix_size = avcc.length_size_minus_one as usize + 1;

// TODO: improve the error handling here.
if sample_end < buffer_offset + length_prefix_size {
return Err(Error::BadVideoData(
"Not enough bytes to fit the length prefix".to_owned(),
));
}

let nal_unit_size = match length_prefix_size {
4 => u32::from_be_bytes(
chunk.data[buffer_offset..(buffer_offset + 4)]
1 => chunk.data[buffer_offset] as usize,

2 => u16::from_be_bytes(
#[allow(clippy::unwrap_used)] // can't fail
chunk.data[buffer_offset..(buffer_offset + 2)]
.try_into()
.unwrap(),
) as usize,
2 => u16::from_be_bytes(
chunk.data[buffer_offset..(buffer_offset + 2)]

4 => u32::from_be_bytes(
#[allow(clippy::unwrap_used)] // can't fail
chunk.data[buffer_offset..(buffer_offset + 4)]
.try_into()
.unwrap(),
) as usize,
1 => chunk.data[buffer_offset] as usize,
_ => panic!("invalid length prefix size"),
};
//re_log::debug!("nal unit size: {}", nal_unit_size);

if chunk.data.len() < nal_unit_size {
panic!(
"sample size {} is smaller than nal unit size {nal_unit_size}",
chunk.data.len()
);
}
_ => {
return Err(Error::BadVideoData(format!(
"Bad length prefix size: {length_prefix_size}"
)));
}
};

nalu_stream.write_all(NAL_START_CODE)?;
let data_start = buffer_offset + length_prefix_size; // Skip the size.
let data_end = buffer_offset + nal_unit_size + length_prefix_size;

if chunk.data.len() < data_end {
return Err(Error::BadVideoData("Not enough bytes to".to_owned()));
}

let data = &chunk.data[data_start..data_end];

nalu_stream
.write_all(NAL_START_CODE)
.map_err(Error::FailedToWriteToFfmpeg)?;

// Note that we don't have to insert "emulation prevention bytes" since mp4 NALU still use them.
// (unlike the NAL start code, the presentation bytes are part of the NAL spec!)

nalu_stream.write_all(data)?;
nalu_stream
.write_all(data)
.map_err(Error::FailedToWriteToFfmpeg)?;

buffer_offset = data_end;
}
Expand Down

0 comments on commit fa091c5

Please sign in to comment.