Skip to content

Commit

Permalink
Introduce retryable rrd file reader #4056
Browse files Browse the repository at this point in the history
We stop streaming data from an .rrd file that is still being written
once we reach EOF. As a first simple remediation we introduce a
retryable reader that will try to read more data from the file
indefinitely.
  • Loading branch information
zehiko committed Sep 20, 2024
1 parent e12e809 commit d68346a
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4944,6 +4944,7 @@ dependencies = [
"ahash",
"anyhow",
"image",
"notify",
"once_cell",
"parking_lot",
"rayon",
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ ndarray = "0.16"
ndarray-rand = "0.15"
never = "0.1"
nohash-hasher = "0.2"
notify = "6.0"
notify = {version = "6.1.1", features = ["macos_kqueue"]}
num-derive = "0.4"
num-traits = "0.2"
once_cell = "1.17" # No lazy_static - use `std::sync::OnceLock` or `once_cell` instead
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_data_loader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ anyhow.workspace = true
arrow2.workspace = true
image.workspace = true
once_cell.workspace = true
notify.workspace = true
parking_lot.workspace = true
rayon.workspace = true
thiserror.workspace = true
Expand Down
131 changes: 116 additions & 15 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
use std::{
io::Read,
path::Path,
sync::mpsc::{channel, Receiver},
};

use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use re_log_encoding::decoder::Decoder;

use crate::DataLoaderError;

// ---

/// Loads data from any `rrd` file or in-memory contents.
Expand Down Expand Up @@ -36,22 +45,48 @@ impl crate::DataLoader for RrdLoader {
);

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let file = std::fs::File::open(&filepath)
.with_context(|| format!("Failed to open file {filepath:?}"))?;
let file = std::io::BufReader::new(file);

let decoder = re_log_encoding::decoder::Decoder::new(version_policy, file)?;

// NOTE: This is IO bound, it must run on a dedicated thread, not the shared rayon thread pool.
std::thread::Builder::new()
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
}
})
.with_context(|| format!("Failed to open spawn IO thread for {filepath:?}"))?;
match extension.as_str() {
"rbl" => {
let file = std::fs::File::open(&filepath)
.with_context(|| format!("Failed to open file {filepath:?}"))?;
let file = std::io::BufReader::new(file);

let decoder = Decoder::new(version_policy, file)?;

// NOTE: This is IO bound, it must run on a dedicated thread, not the shared rayon thread pool.
std::thread::Builder::new()
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
}
})
.with_context(|| format!("Failed to open spawn IO thread for {filepath:?}"))?;
}
"rrd" => {
// for .rrd files we retry reading despite reaching EOF to support live (writer) streaming
// TODO (#4056) instead of indefinitely retrying and keeping the file open we should introduce
// a new "eof marker" message header in the encoding and handle it accordingly in the Decoder.
let retryable_reader = RetryableFileReader::new(&filepath).with_context(|| {
format!("failed to create retryable file reader for {filepath:?}")
})?;
let decoder = Decoder::new(version_policy, retryable_reader)?;

// NOTE: This is IO bound, it must run on a dedicated thread, not the shared rayon thread pool.
std::thread::Builder::new()
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
}
})
.with_context(|| format!("Failed to open spawn IO thread for {filepath:?}"))?;
}
_ => unreachable!(),
}

Ok(())
}
Expand Down Expand Up @@ -110,3 +145,69 @@ fn decode_and_stream<R: std::io::Read>(
}
}
}

/// Retryable file reader that keeps retrying to read more data despite
/// reading zero bytes or reaching EOF.
///
/// Instead of reporting end-of-file to the caller, the reader waits for a
/// file-system notification on the watched file and then tries to read again.
struct RetryableFileReader {
/// Buffered handle to the file that is being read.
reader: std::io::BufReader<std::fs::File>,
/// Receiving end of the channel on which the file watcher delivers its events.
rx: Receiver<notify::Result<Event>>,
// The watcher is stored but never read, hence the lint suppression.
// NOTE(review): presumably it must stay alive for as long as the reader so
// that events keep arriving on `rx` — confirm against the `notify` docs.
#[allow(dead_code)]
watcher: RecommendedWatcher,
}

impl RetryableFileReader {
    /// Opens `filepath` for buffered reading and starts watching it for changes.
    ///
    /// The watcher is registered before the reader is handed out, so any
    /// notification emitted from that point on is queued on the internal channel.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened, if the file watcher cannot
    /// be created, or if the watch on `filepath` cannot be registered.
    fn new(filepath: &Path) -> Result<Self, DataLoaderError> {
        use anyhow::Context as _;

        let file = std::fs::File::open(filepath)
            .with_context(|| format!("Failed to open file {filepath:?}"))?;
        let reader = std::io::BufReader::new(file);

        // The watcher pushes its events into `tx`; `rx` is what `read` blocks on.
        let (tx, rx) = channel();
        let mut watcher = notify::recommended_watcher(tx)
            .with_context(|| format!("failed to create file watcher for {filepath:?}"))?;

        // Only the file itself is of interest, so no recursion is requested.
        watcher
            .watch(filepath, RecursiveMode::NonRecursive)
            .with_context(|| format!("failed to watch file changes on {filepath:?}"))?;

        Ok(Self {
            reader,
            rx,
            watcher,
        })
    }
}

impl Read for RetryableFileReader {
    /// Reads bytes into `buf`, waiting for the file to change instead of
    /// reporting end-of-file.
    ///
    /// When the underlying reader yields zero bytes, this call blocks until the
    /// file watcher delivers an event and then tries to read again. It therefore
    /// only returns `Ok(0)` when `buf` itself is empty.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::NotFound` if the watcher reports that the file was removed.
    /// * `ErrorKind::Other` if the watcher reports an error or its channel is
    ///   disconnected.
    /// * Any error of the underlying reader other than `ErrorKind::Interrupted`,
    ///   which is retried.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // With an empty destination every inner read yields `Ok(0)`, which cannot
        // be told apart from EOF and would park the caller on the watcher channel
        // even though nothing is wrong. Honour the `Read` contract instead.
        if buf.is_empty() {
            return Ok(0);
        }

        loop {
            match self.reader.read(buf) {
                // Nothing available right now: fall through and wait for a change.
                Ok(0) => {}
                Ok(n) => return Ok(n),
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(err) => return Err(err),
            }

            // Block until the watcher reports something about the file.
            match self.rx.recv() {
                Ok(Ok(event)) => {
                    if matches!(event.kind, EventKind::Remove(_)) {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::NotFound,
                            "file removed",
                        ));
                    }
                    // Any other event kind: simply try reading again.
                }
                // The watcher itself reported a failure.
                Ok(Err(err)) => {
                    return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
                }
                // The sending side of the channel is gone.
                Err(err) => {
                    return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
                }
            }
        }
    }
}

0 comments on commit d68346a

Please sign in to comment.