Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stalls on ffmpeg based decoder reset #7998

Merged
merged 2 commits into from
Nov 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 116 additions & 56 deletions crates/store/re_video/src/decode/ffmpeg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{
collections::BTreeMap,
io::Write,
process::ChildStdin,
sync::{atomic::AtomicBool, Arc},
};

Expand All @@ -12,6 +12,7 @@ use ffmpeg_sidecar::{
command::FfmpegCommand,
event::{FfmpegEvent, LogLevel},
};
use parking_lot::Mutex;

use crate::Time;

Expand Down Expand Up @@ -87,6 +88,36 @@ enum FfmpegFrameData {
EndOfStream,
}

/// Wraps an stdin with a shared shutdown boolean.
struct StdinWithShutdown {
shutdown: Arc<AtomicBool>,
stdin: ChildStdin,
}

impl StdinWithShutdown {
// Don't use `std::io::ErrorKind::Interrupted` because it has special meaning for default implementations of the `Write` trait,
// causing it to continue.
const SHUTDOWN_ERROR_KIND: std::io::ErrorKind = std::io::ErrorKind::Other;
}

impl std::io::Write for StdinWithShutdown {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if self.shutdown.load(std::sync::atomic::Ordering::Acquire) {
Err(std::io::Error::new(Self::SHUTDOWN_ERROR_KIND, "shutdown"))
} else {
self.stdin.write(buf)
}
}

fn flush(&mut self) -> std::io::Result<()> {
if self.shutdown.load(std::sync::atomic::Ordering::Acquire) {
Err(std::io::Error::new(Self::SHUTDOWN_ERROR_KIND, "shutdown"))
} else {
self.stdin.flush()
}
}
}

struct FfmpegProcessAndListener {
ffmpeg: FfmpegChild,

Expand All @@ -100,7 +131,10 @@ struct FfmpegProcessAndListener {
write_thread: Option<std::thread::JoinHandle<()>>,

/// If true, the write thread will not report errors. Used upon exit, so the write thread won't log spam on the hung up stdin.
suppress_write_error_reports: Arc<AtomicBool>,
stdin_shutdown: Arc<AtomicBool>,

/// On output instance used by the threads.
on_output: Arc<Mutex<Option<Arc<OutputCallback>>>>,
}

impl FfmpegProcessAndListener {
Expand Down Expand Up @@ -151,6 +185,12 @@ impl FfmpegProcessAndListener {
let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded();
let (frame_data_tx, frame_data_rx) = crossbeam::channel::unbounded();

let stdin_shutdown = Arc::new(AtomicBool::new(false));

// Mutex protect `on_output` so that we can shut down the threads at a defined point in time at which we
// no longer receive any new frames or errors from this process.
let on_output = Arc::new(Mutex::new(Some(on_output)));

let listen_thread = std::thread::Builder::new()
.name(format!("ffmpeg-reader for {debug_name}"))
.spawn({
Expand All @@ -166,20 +206,21 @@ impl FfmpegProcessAndListener {
}
})
.expect("Failed to spawn ffmpeg listener thread");

let suppress_write_error_reports = Arc::new(AtomicBool::new(false));
let write_thread = std::thread::Builder::new()
.name(format!("ffmpeg-writer for {debug_name}"))
.spawn({
let on_output = on_output.clone();
let ffmpeg_stdin = ffmpeg.take_stdin().ok_or(Error::NoStdin)?;
let suppress_write_error_reports = suppress_write_error_reports.clone();
let mut ffmpeg_stdin = StdinWithShutdown {
stdin: ffmpeg_stdin,
shutdown: stdin_shutdown.clone(),
};
move || {
write_ffmpeg_input(
ffmpeg_stdin,
&mut ffmpeg_stdin,
&frame_data_rx,
on_output.as_ref(),
&avcc,
&suppress_write_error_reports,
);
}
})
Expand All @@ -191,38 +232,65 @@ impl FfmpegProcessAndListener {
frame_data_tx,
listen_thread: Some(listen_thread),
write_thread: Some(write_thread),
suppress_write_error_reports,
stdin_shutdown,
on_output,
})
}
}

impl Drop for FfmpegProcessAndListener {
fn drop(&mut self) {
re_tracing::profile_function!();
self.suppress_write_error_reports
.store(true, std::sync::atomic::Ordering::Relaxed);

// Stop all outputs from being written to - any attempt from here on out will fail and cause thread shutdown.
// This way, we ensure all ongoing writes are finished and won't get any more on_output callbacks from this process
// before we take any other action on the shutdown sequence.
{
self.on_output.lock().take();
}

// Notify (potentially wake up) the stdin write thread to stop it (it might be sleeping).
self.frame_data_tx.send(FfmpegFrameData::EndOfStream).ok();
// Kill stdin for the write thread. This helps cancelling ongoing stream write operations.
self.stdin_shutdown
.store(true, std::sync::atomic::Ordering::Release);

// Kill the ffmpeg process itself.
// This should wake up the listen thread if it is sleeping, but that may take a while.
self.ffmpeg.kill().ok();

if let Some(write_thread) = self.write_thread.take() {
if write_thread.join().is_err() {
re_log::error!("Failed to join ffmpeg listener thread.");
// Unfortunately, even with the above measures, it can still happen that the listen threads take occasionally 100ms and more to shut down.
// (very much depending on the system & OS, typical times may be low with large outliers)
// It is crucial that the threads come down eventually and rather timely so to avoid leaking resources.
// However, in order to avoid stalls, we'll let them finish in parallel.
//
// Since we disconnected the `on_output` callback from them, they won't influence any new instances.
if false {
{
re_tracing::profile_scope!("shutdown write thread");
if let Some(write_thread) = self.write_thread.take() {
if write_thread.join().is_err() {
re_log::error!("Failed to join ffmpeg listener thread.");
}
}
}
}
if let Some(listen_thread) = self.listen_thread.take() {
if listen_thread.join().is_err() {
re_log::error!("Failed to join ffmpeg listener thread.");
{
re_tracing::profile_scope!("shutdown listen thread");
if let Some(listen_thread) = self.listen_thread.take() {
if listen_thread.join().is_err() {
re_log::error!("Failed to join ffmpeg listener thread.");
}
}
}
}
}
}

fn write_ffmpeg_input(
mut ffmpeg_stdin: std::process::ChildStdin,
ffmpeg_stdin: &mut dyn std::io::Write,
frame_data_rx: &Receiver<FfmpegFrameData>,
on_output: &OutputCallback,
on_output: &Mutex<Option<Arc<OutputCallback>>>,
avcc: &re_mp4::Avc1Box,
suppress_write_error_reports: &AtomicBool,
) {
let mut state = NaluStreamState::default();

Expand All @@ -232,19 +300,18 @@ fn write_ffmpeg_input(
FfmpegFrameData::EndOfStream => break,
};

if let Err(err) =
write_avc_chunk_to_nalu_stream(avcc, &mut ffmpeg_stdin, &chunk, &mut state)
{
let write_error = matches!(err, Error::FailedToWriteToFfmpeg(_));
if !write_error
|| !suppress_write_error_reports.load(std::sync::atomic::Ordering::Relaxed)
{
(on_output)(Err(err.into()));
}
if let Err(err) = write_avc_chunk_to_nalu_stream(avcc, ffmpeg_stdin, &chunk, &mut state) {
let on_output = on_output.lock();
if let Some(on_output) = on_output.as_ref() {
let write_error = matches!(err, Error::FailedToWriteToFfmpeg(_));
on_output(Err(err.into()));

// This is unlikely to improve! Ffmpeg process likely died.
// By exiting here we hang up on the channel, making future attempts to push into it fail which should cause a reset eventually.
if write_error {
if write_error {
// This is unlikely to improve! Ffmpeg process likely died.
// By exiting here we hang up on the channel, making future attempts to push into it fail which should cause a reset eventually.
return;
}
} else {
return;
}
} else {
Expand All @@ -257,8 +324,8 @@ fn read_ffmpeg_output(
debug_name: &str,
ffmpeg_iterator: ffmpeg_sidecar::iter::FfmpegIterator,
frame_info_rx: &Receiver<FfmpegFrameInfo>,
on_output: &OutputCallback,
) {
on_output: &Mutex<Option<Arc<OutputCallback>>>,
) -> Option<()> {
/// Ignore some common output from ffmpeg:
fn should_ignore_log_msg(msg: &str) -> bool {
let patterns = [
Expand Down Expand Up @@ -310,19 +377,18 @@ fn read_ffmpeg_output(
}

FfmpegEvent::Log(LogLevel::Error, msg) => {
on_output(Err(Error::Ffmpeg(msg).into()));
(on_output.lock().as_ref()?)(Err(Error::Ffmpeg(msg).into()));
}

FfmpegEvent::Log(LogLevel::Fatal, msg) => {
on_output(Err(Error::FfmpegFatal(msg).into()));
return;
(on_output.lock().as_ref()?)(Err(Error::FfmpegFatal(msg).into()));
}

FfmpegEvent::Log(LogLevel::Unknown, msg) => {
if msg.contains("system signals, hard exiting") {
// That was probably us, killing the process.
re_log::debug!("FFmpeg process for {debug_name} was killed");
return;
return None;
}
if !should_ignore_log_msg(&msg) {
re_log::warn_once!("{debug_name} decoder: {msg}");
Expand All @@ -336,7 +402,7 @@ fn read_ffmpeg_output(

FfmpegEvent::Error(error) => {
// An error in ffmpeg sidecar itself, rather than ffmpeg.
on_output(Err(Error::FfmpegSidecar(error).into()));
(on_output.lock().as_ref()?)(Err(Error::FfmpegSidecar(error).into()));
}

FfmpegEvent::ParsedInput(input) => {
Expand Down Expand Up @@ -423,7 +489,7 @@ fn read_ffmpeg_output(
re_log::debug!(
"{debug_name} ffmpeg decoder frame info channel disconnected"
);
return;
return None;
};

// If the decodetimestamp did not increase, we're probably seeking backwards!
Expand Down Expand Up @@ -458,7 +524,7 @@ fn read_ffmpeg_output(
debug_assert_eq!(pix_fmt, "rgb24");
debug_assert_eq!(width as usize * height as usize * 3, data.len());

on_output(Ok(super::Frame {
(on_output.lock().as_ref()?)(Ok(super::Frame {
content: super::FrameContent {
data,
width,
Expand All @@ -476,7 +542,7 @@ fn read_ffmpeg_output(
FfmpegEvent::Done => {
// This happens on `pkill ffmpeg`, for instance.
re_log::debug!("{debug_name}'s ffmpeg is Done");
return;
return None;
}

FfmpegEvent::ParsedVersion(ffmpeg_version) => {
Expand All @@ -497,11 +563,13 @@ fn read_ffmpeg_output(
FfmpegEvent::OutputChunk(_) => {
// Something went seriously wrong if we end up here.
re_log::error!("Unexpected ffmpeg output chunk for {debug_name}");
on_output(Err(Error::UnexpectedFfmpegOutputChunk.into()));
return;
(on_output.lock().as_ref()?)(Err(Error::UnexpectedFfmpegOutputChunk.into()));
return None;
}
}
}

Some(())
}

/// Decode H.264 video via ffmpeg over CLI
Expand Down Expand Up @@ -606,20 +674,12 @@ fn write_avc_chunk_to_nalu_stream(
// Otherwise the decoder is not able to get the necessary information about how the video stream is encoded.
if chunk.is_sync && !state.previous_frame_was_idr {
for sps in &avcc.sequence_parameter_sets {
nalu_stream
.write_all(NAL_START_CODE)
.map_err(Error::FailedToWriteToFfmpeg)?;
nalu_stream
.write_all(&sps.bytes)
.map_err(Error::FailedToWriteToFfmpeg)?;
write_bytes(nalu_stream, NAL_START_CODE)?;
write_bytes(nalu_stream, &sps.bytes)?;
}
for pps in &avcc.picture_parameter_sets {
nalu_stream
.write_all(NAL_START_CODE)
.map_err(Error::FailedToWriteToFfmpeg)?;
nalu_stream
.write_all(&pps.bytes)
.map_err(Error::FailedToWriteToFfmpeg)?;
write_bytes(nalu_stream, NAL_START_CODE)?;
write_bytes(nalu_stream, &pps.bytes)?;
}
state.previous_frame_was_idr = true;
} else {
Expand Down
Loading