From 7b769586d9ad767825833644a80e8118d2f022b8 Mon Sep 17 00:00:00 2001
From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com>
Date: Thu, 12 Dec 2024 11:23:13 +0100
Subject: [PATCH] Enrich ReDap Catalog with RecordingUri on the fly (#8418)

As of right now, only the viewer is fully aware of the Data Platform's
endpoint, so we leverage that to enrich the Catalog view with a
RecordingUri for each recording. We also add a chunk id to make the
received record batches convertible to Chunks.
---
 Cargo.lock                             |  1 +
 crates/store/re_grpc_client/Cargo.toml |  1 +
 crates/store/re_grpc_client/src/lib.rs | 78 +++++++++++++++++++++++---
 3 files changed, 72 insertions(+), 8 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8f3a411f3fb9..bd21dfd45a73 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5931,6 +5931,7 @@ dependencies = [
  "re_log_types",
  "re_protos",
  "re_smart_channel",
+ "re_types",
  "thiserror",
  "tokio",
  "tokio-stream",
diff --git a/crates/store/re_grpc_client/Cargo.toml b/crates/store/re_grpc_client/Cargo.toml
index 665e7fc6faa9..efb330fc306e 100644
--- a/crates/store/re_grpc_client/Cargo.toml
+++ b/crates/store/re_grpc_client/Cargo.toml
@@ -27,6 +27,7 @@ re_log_encoding.workspace = true
 re_log_types.workspace = true
 re_protos.workspace = true
 re_smart_channel.workspace = true
+re_types.workspace = true
 
 thiserror.workspace = true
 tokio-stream.workspace = true
diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs
index 478312bb6aaa..d6e7b3605e07 100644
--- a/crates/store/re_grpc_client/src/lib.rs
+++ b/crates/store/re_grpc_client/src/lib.rs
@@ -3,13 +3,19 @@
 mod address;
 
 pub use address::{InvalidRedapAddress, RedapAddress};
+use re_chunk::external::arrow2;
+use re_log_types::external::re_types_core::ComponentDescriptor;
+use re_types::components::RecordingUri;
+use re_types::Component;
 use url::Url;
 
 // ----------------------------------------------------------------------------
 
 use std::error::Error;
 
-use re_chunk::Chunk;
+use arrow2::array::Utf8Array as Arrow2Utf8Array;
+use arrow2::datatypes::Field as Arrow2Field;
+use re_chunk::{Arrow2Array, Chunk, ChunkId, TransportChunk};
 use re_log_encoding::codec::{wire::decode, CodecError};
 use re_log_types::{
     ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time,
@@ -64,6 +70,9 @@ enum StreamError {
 
     #[error(transparent)]
     ChunkError(#[from] re_chunk::ChunkError),
+
+    #[error("Invalid URI: {0}")]
+    InvalidUri(String),
 }
 
 // ----------------------------------------------------------------------------
@@ -179,7 +188,7 @@ async fn stream_recording_async(
     let store_id = StoreId::from_string(StoreKind::Recording, recording_id.clone());
 
     let store_info = StoreInfo {
-        application_id: ApplicationId::from("rerun_data_platform"),
+        application_id: ApplicationId::from("redap_recording"),
         store_id: store_id.clone(),
         cloned_from: None,
         is_official_example: false,
@@ -221,9 +230,6 @@ async fn stream_recording_async(
     Ok(())
 }
 
-/// TODO(zehiko) - this is a copy of `stream_recording_async` with a different gRPC call,
-/// this will go away as we tackle unification of data and metadata streams REDAP #74, hence
-/// avoiding refactoring right now
 async fn stream_catalog_async(
     tx: re_smart_channel::Sender<LogMsg>,
     redap_endpoint: Url,
@@ -273,7 +279,7 @@
     let store_id = StoreId::from_string(StoreKind::Recording, "catalog".to_owned());
 
     let store_info = StoreInfo {
-        application_id: ApplicationId::from("rerun_data_platform"),
+        application_id: ApplicationId::from("redap_catalog"),
         store_id: store_id.clone(),
         cloned_from: None,
         is_official_example: false,
@@ -295,8 +301,64 @@ async fn stream_catalog_async(
     re_log::info!("Starting to read...");
 
     while let Some(result) = resp.next().await {
-        let tc = result.map_err(TonicStatusError)?;
-        let chunk = Chunk::from_transport(&tc)?;
+        let mut tc = result.map_err(TonicStatusError)?;
+        // The received TransportChunk doesn't have a ChunkId, so we add one before
+        // converting it to a Chunk.
+        tc.schema.metadata.insert(
+            TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(),
+            ChunkId::new().to_string(),
+        );
+
+        let mut chunk = Chunk::from_transport(&tc)?;
+
+        // Enrich the catalog data with a RecordingUri built from the ReDap endpoint (which we
+        // know) and the recording id (which we have in the catalog data).
+        let host = redap_endpoint
+            .host()
+            .ok_or(StreamError::InvalidUri(format!(
+                "couldn't get host from {redap_endpoint}"
+            )))?;
+        let port = redap_endpoint
+            .port()
+            .ok_or(StreamError::InvalidUri(format!(
+                "couldn't get port from {redap_endpoint}"
+            )))?;
+
+        let recording_uri_arrays: Vec<Box<dyn Arrow2Array>> = chunk
+            .iter_component_arrays(&"id".into())
+            .map(|id| {
+                let rec_id = id
+                    .as_any()
+                    .downcast_ref::<Arrow2Utf8Array<i32>>()
+                    .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
+                        reason: format!("id must be a utf8 array: {:?}", tc.schema),
+                    }))?
+                    .value(0); // each component batch has length 1, i.e. a single 'id' value
+
+                let recording_uri = format!("rerun://{host}:{port}/recording/{rec_id}");
+
+                let recording_uri_data = Arrow2Utf8Array::<i32>::from([Some(recording_uri)]);
+
+                Ok::<Box<dyn Arrow2Array>, StreamError>(
+                    Box::new(recording_uri_data) as Box<dyn Arrow2Array>
+                )
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        let recording_id_arrays = recording_uri_arrays
+            .iter()
+            .map(|e| Some(e.as_ref()))
+            .collect::<Vec<_>>();
+
+        let rec_id_field = Arrow2Field::new("item", arrow2::datatypes::DataType::Utf8, true);
+        #[allow(clippy::unwrap_used)] // we know we've given the right field type
+        let uris = re_chunk::util::arrays_to_list_array(
+            rec_id_field.data_type().clone(),
+            &recording_id_arrays,
+        )
+        .unwrap();
+
+        chunk.add_component(ComponentDescriptor::new(RecordingUri::name()), uris)?;
 
         if tx
            .send(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?))
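
For reviewers, a minimal, standalone sketch of the URI-building step in `stream_catalog_async` above, using only the `url` crate that is already a dependency. The `build_recording_uri` helper name, the example endpoint, and the recording id are made up for illustration; only the `rerun://{host}:{port}/recording/{id}` scheme comes from the patch.

use url::Url;

/// Hypothetical helper mirroring the enrichment step: derive a `rerun://`
/// recording URI from the ReDap endpoint and a catalog recording id.
fn build_recording_uri(redap_endpoint: &Url, rec_id: &str) -> Result<String, String> {
    let host = redap_endpoint
        .host()
        .ok_or_else(|| format!("couldn't get host from {redap_endpoint}"))?;
    let port = redap_endpoint
        .port()
        .ok_or_else(|| format!("couldn't get port from {redap_endpoint}"))?;
    Ok(format!("rerun://{host}:{port}/recording/{rec_id}"))
}

fn main() {
    // Placeholder endpoint and recording id.
    let endpoint = Url::parse("http://localhost:51234").expect("valid URL");
    let uri = build_recording_uri(&endpoint, "some-recording-id").expect("host and port");
    assert_eq!(uri, "rerun://localhost:51234/recording/some-recording-id");
}

As in the patch, an endpoint without an explicit, non-default port yields None from Url::port() and is treated as an error rather than silently producing a URI with no port.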