Skip to content

Commit

Permalink
Enrich ReDap catalog with RecordingUri on the fly
Browse files Browse the repository at this point in the history
  • Loading branch information
zehiko committed Dec 11, 2024
1 parent 3aaeb46 commit 3607ab5
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5932,6 +5932,7 @@ dependencies = [
"re_log_types",
"re_protos",
"re_smart_channel",
"re_types",
"thiserror",
"tokio",
"tokio-stream",
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_grpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ re_log_encoding.workspace = true
re_log_types.workspace = true
re_protos.workspace = true
re_smart_channel.workspace = true
re_types.workspace = true

thiserror.workspace = true
tokio-stream.workspace = true
Expand Down
69 changes: 62 additions & 7 deletions crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@
mod address;

pub use address::{InvalidRedapAddress, RedapAddress};
use re_chunk::external::arrow2;
use re_log_types::external::re_types_core::ComponentDescriptor;
use re_types::components::RecordingUri;
use re_types::Component;
use url::Url;

// ----------------------------------------------------------------------------

use std::error::Error;

use re_chunk::Chunk;
use arrow2::array::Utf8Array as Arrow2Utf8Array;
use arrow2::datatypes::Field as Arrow2Field;
use re_chunk::{Arrow2Array, Chunk};
use re_log_encoding::codec::{wire::decode, CodecError};
use re_log_types::{
ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time,
Expand Down Expand Up @@ -64,6 +70,9 @@ enum StreamError {

#[error(transparent)]
ChunkError(#[from] re_chunk::ChunkError),

#[error("Invalid URI: {0}")]
InvalidUri(String),
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -179,7 +188,7 @@ async fn stream_recording_async(
let store_id = StoreId::from_string(StoreKind::Recording, recording_id.clone());

let store_info = StoreInfo {
application_id: ApplicationId::from("rerun_data_platform"),
application_id: ApplicationId::from("redap_recording"),
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
Expand Down Expand Up @@ -221,9 +230,6 @@ async fn stream_recording_async(
Ok(())
}

/// TODO(zehiko): this is a copy of `stream_recording_async` with a different gRPC call.
/// It will go away once we tackle the unification of data and metadata streams (REDAP #74),
/// which is why we are deliberately not refactoring it right now.
async fn stream_catalog_async(
tx: re_smart_channel::Sender<LogMsg>,
redap_endpoint: Url,
Expand Down Expand Up @@ -273,7 +279,7 @@ async fn stream_catalog_async(
let store_id = StoreId::from_string(StoreKind::Recording, "catalog".to_owned());

let store_info = StoreInfo {
application_id: ApplicationId::from("rerun_data_platform"),
application_id: ApplicationId::from("redap_catalog"),
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
Expand All @@ -296,7 +302,56 @@ async fn stream_catalog_async(
re_log::info!("Starting to read...");
while let Some(result) = resp.next().await {
let tc = result.map_err(TonicStatusError)?;
let chunk = Chunk::from_transport(&tc)?;
let mut chunk = Chunk::from_transport(&tc)?;

// Enrich the catalog data with a `RecordingUri` built from the ReDap endpoint (which we
// already know) and the recording id (which is present in the catalog data).
let host = redap_endpoint
.host()
.ok_or(StreamError::InvalidUri(format!(
"couldn't get host from {redap_endpoint}"
)))?;
let port = redap_endpoint
.port()
.ok_or(StreamError::InvalidUri(format!(
"couldn't get port from {redap_endpoint}"
)))?;

let recording_uri_arrays: Vec<Box<dyn Arrow2Array>> = chunk
.iter_component_arrays(&"id".into())
.map(|id| {
let rec_id = id
.as_any()
.downcast_ref::<Arrow2Utf8Array<i32>>()
.ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
reason: format!("id must be a utf8 array: {:?}", tc.schema),
}))?
.value(0); // each component batch is of length 1 i.e. single 'id' value

let recording_uri = format!("rerun://{host}:{port}/recording/{rec_id}");

let recording_uri_data = Arrow2Utf8Array::<i32>::from([Some(recording_uri)]);

Ok::<Box<_>, StreamError>(
Box::new(recording_uri_data) as Box<dyn arrow2::array::Array>
)
})
.collect::<Result<Vec<_>, _>>()?;

let recording_id_arrays = recording_uri_arrays
.iter()
.map(|e| Some(e.as_ref()))
.collect::<Vec<_>>();

let rec_id_field = Arrow2Field::new("item", arrow2::datatypes::DataType::Utf8, true);
#[allow(clippy::unwrap_used)] // we know we've given the right field type
let uris = re_chunk::util::arrays_to_list_array(
rec_id_field.data_type().clone(),
&recording_id_arrays,
)
.unwrap();

chunk.add_component(ComponentDescriptor::new(RecordingUri::name()), uris)?;

if tx
.send(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?))
Expand Down

0 comments on commit 3607ab5

Please sign in to comment.