Skip to content

Commit

Permalink
Adding a new rrdp stream data source
Browse files Browse the repository at this point in the history
  • Loading branch information
jleibs committed Oct 25, 2024
1 parent d3dd82f commit 23458eb
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 7 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5293,6 +5293,7 @@ dependencies = [
"re_log_encoding",
"re_log_types",
"re_remote_store_types",
"re_rrdp_comms",
"re_smart_channel",
"re_tracing",
"re_ws_comms",
Expand Down Expand Up @@ -5718,6 +5719,15 @@ dependencies = [
"zip",
]

[[package]]
name = "re_rrdp_comms"
version = "0.20.0-alpha.1+dev"
dependencies = [
"re_log",
"re_log_types",
"re_smart_channel",
]

[[package]]
name = "re_sdk"
version = "0.20.0-alpha.1+dev"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ re_format_arrow = { path = "crates/store/re_format_arrow", version = "=0.20.0-al
re_log_encoding = { path = "crates/store/re_log_encoding", version = "=0.20.0-alpha.1", default-features = false }
re_log_types = { path = "crates/store/re_log_types", version = "=0.20.0-alpha.1", default-features = false }
re_query = { path = "crates/store/re_query", version = "=0.20.0-alpha.1", default-features = false }
re_rrdp_comms = { path = "crates/store/re_rrdp_comms", version = "=0.20.0-alpha.1", default-features = false }
re_remote_store_types = { path = "crates/store/re_remote_store_types", version = "=0.20.0-alpha.1", default-features = false }
re_sdk_comms = { path = "crates/store/re_sdk_comms", version = "=0.20.0-alpha.1", default-features = false }
re_types = { path = "crates/store/re_types", version = "=0.20.0-alpha.1", default-features = false }
Expand Down
3 changes: 2 additions & 1 deletion crates/store/re_data_source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ all-features = true
default = []

## Enable the rrdp data source.
rrdp = ["dep:re_remote_store_types"]
rrdp = ["dep:re_remote_store_types", "dep:re_rrdp_comms"]


[dependencies]
Expand All @@ -44,6 +44,7 @@ rayon.workspace = true

# Optional dependencies:
re_remote_store_types = { workspace = true, optional = true }
re_rrdp_comms = { workspace = true, optional = true }

[build-dependencies]
re_build_tools.workspace = true
10 changes: 10 additions & 0 deletions crates/store/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ impl DataSource {

let path = std::path::Path::new(&uri).to_path_buf();

#[cfg(feature = "rrdp")]
if uri.starts_with("rrdp://") {
return Self::RrdpUrl { url: uri };
}

if uri.starts_with("file://") || path.exists() {
Self::FilePath(file_source, path)
} else if uri.starts_with("http://")
Expand Down Expand Up @@ -131,6 +136,8 @@ impl DataSource {
Self::WebSocketAddr(_) => None,
#[cfg(not(target_arch = "wasm32"))]
Self::Stdin => None,
#[cfg(feature = "rrdp")]
Self::RrdpUrl { .. } => None, // TODO(jleibs): This needs to come from the RRDP server.
}
}

Expand Down Expand Up @@ -235,6 +242,9 @@ impl DataSource {

Ok(rx)
}

#[cfg(feature = "rrdp")]
Self::RrdpUrl { url } => Ok(re_rrdp_comms::stream_recording(url, on_msg)),
}
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/utils/re_smart_channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ workspace = true
[package.metadata.docs.rs]
all-features = true


[dependencies]
re_tracing.workspace = true

Expand Down
19 changes: 16 additions & 3 deletions crates/utils/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,19 @@ pub enum SmartChannelSource {

/// The channel was created in the context of streaming in RRD data from standard input.
Stdin,

/// The data is streaming in directly from an RRDP server.
RrdpStream {
/// Should include `rrdp://` prefix.
url: String,
},
}

impl std::fmt::Display for SmartChannelSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::File(path) => path.display().fmt(f),
Self::RrdHttpStream { url, follow: _ } => url.fmt(f),
Self::RrdHttpStream { url, follow: _ } | Self::RrdpStream { url } => url.fmt(f),
Self::RrdWebEventListener => "Web event listener".fmt(f),
Self::JsChannel { channel_name } => write!(f, "Javascript channel: {channel_name}"),
Self::Sdk => "SDK".fmt(f),
Expand All @@ -86,7 +92,8 @@ impl SmartChannelSource {
Self::RrdHttpStream { .. }
| Self::WsClient { .. }
| Self::JsChannel { .. }
| Self::TcpServer { .. } => true,
| Self::TcpServer { .. }
| Self::RrdpStream { .. } => true,
}
}
}
Expand Down Expand Up @@ -142,14 +149,20 @@ pub enum SmartMessageSource {

/// The data is streaming in from standard input.
Stdin,

/// The data is streaming in directly from an RRDP server.
RrdpStream {
/// Should include `rrdp://` prefix.
url: String,
},
}

impl std::fmt::Display for SmartMessageSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&match self {
Self::Unknown => "unknown".into(),
Self::File(path) => format!("file://{}", path.to_string_lossy()),
Self::RrdHttpStream { url } => url.clone(),
Self::RrdHttpStream { url } | Self::RrdpStream { url } => url.clone(),
Self::RrdWebEventCallback => "web_callback".into(),
Self::JsChannelPush => "javascript".into(),
Self::Sdk => "sdk".into(),
Expand Down
5 changes: 4 additions & 1 deletion crates/viewer/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,9 @@ impl App {
// That's the case of `SmartChannelSource::RrdHttpStream`.
// TODO(emilk): exactly what things get kept and what gets cleared?
self.rx.retain(|r| match r.source() {
SmartChannelSource::File(_) | SmartChannelSource::RrdHttpStream { .. } => false,
SmartChannelSource::File(_)
| SmartChannelSource::RrdHttpStream { .. }
| SmartChannelSource::RrdpStream { .. } => false,

SmartChannelSource::WsClient { .. }
| SmartChannelSource::JsChannel { .. }
Expand Down Expand Up @@ -1425,6 +1427,7 @@ impl App {
match &*source {
SmartChannelSource::File(_)
| SmartChannelSource::RrdHttpStream { .. }
| SmartChannelSource::RrdpStream { .. }
| SmartChannelSource::Stdin
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::Sdk
Expand Down
1 change: 1 addition & 0 deletions crates/viewer/re_viewer/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ fn recording_config_entry<'cfgs>(
// We assume the `RrdHttpStream` is a done recording.
re_smart_channel::SmartChannelSource::File(_)
| re_smart_channel::SmartChannelSource::RrdHttpStream { follow: false, .. }
| re_smart_channel::SmartChannelSource::RrdpStream { .. }
| re_smart_channel::SmartChannelSource::RrdWebEventListener => PlayState::Playing,

// Live data - follow it!
Expand Down
1 change: 1 addition & 0 deletions crates/viewer/re_viewer/src/ui/recordings_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ fn loading_receivers_ui(ctx: &ViewerContext<'_>, rx: &ReceiveSet<LogMsg>, ui: &m
// We only show things we know are very-soon-to-be recordings:
SmartChannelSource::File(path) => format!("Loading {}…", path.display()),
SmartChannelSource::RrdHttpStream { url, .. } => format!("Loading {url}…"),
SmartChannelSource::RrdpStream { url } => format!("Loading {url}…"),

SmartChannelSource::RrdWebEventListener
| SmartChannelSource::JsChannel { .. }
Expand Down
5 changes: 4 additions & 1 deletion crates/viewer/re_viewer/src/ui/top_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
match source.as_ref() {
SmartChannelSource::File(_)
| SmartChannelSource::RrdHttpStream { .. }
| SmartChannelSource::RrdpStream { .. }
| SmartChannelSource::Stdin => {
false // These show up in the recordings panel as a "Loading…" in `recordings_panel.rs`
}
Expand Down Expand Up @@ -198,6 +199,7 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
SmartChannelSource::File(_)
| SmartChannelSource::Stdin
| SmartChannelSource::RrdHttpStream { .. }
| SmartChannelSource::RrdpStream { .. }
| SmartChannelSource::RrdWebEventListener
| SmartChannelSource::JsChannel { .. }
| SmartChannelSource::Sdk
Expand All @@ -221,7 +223,8 @@ fn connection_status_ui(ui: &mut egui::Ui, rx: &ReceiveSet<re_log_types::LogMsg>
format!("Loading {}…", path.display())
}
re_smart_channel::SmartChannelSource::Stdin => "Loading stdin…".to_owned(),
re_smart_channel::SmartChannelSource::RrdHttpStream { url, .. } => {
re_smart_channel::SmartChannelSource::RrdHttpStream { url, .. }
| re_smart_channel::SmartChannelSource::RrdpStream { url } => {
format!("Loading {url}…")
}
re_smart_channel::SmartChannelSource::RrdWebEventListener
Expand Down
1 change: 1 addition & 0 deletions crates/viewer/re_viewer/src/viewer_analytics/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pub fn open_recording(
let data_source = entity_db.data_source.as_ref().map(|v| match v {
re_smart_channel::SmartChannelSource::File(_) => "file", // .rrd, .png, .glb, …
re_smart_channel::SmartChannelSource::RrdHttpStream { .. } => "http",
re_smart_channel::SmartChannelSource::RrdpStream { .. } => "rrdp",
re_smart_channel::SmartChannelSource::RrdWebEventListener { .. } => "web_event",
re_smart_channel::SmartChannelSource::JsChannel { .. } => "javascript", // mediated via rerun-js
re_smart_channel::SmartChannelSource::Sdk => "sdk", // show()
Expand Down

0 comments on commit 23458eb

Please sign in to comment.