From 0bd35e6b73bb8f822749234f44cd5cdaf7ab78b8 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Fri, 6 Dec 2024 08:53:10 +0100 Subject: [PATCH] gRPC spec update: make metadata APIs more like the data API (#8292) As part of simplifying metadata APIs and as pre-req for creating gRPC catalog data source, we: * expose only single API for querying metadata (that supports filtering and projection) * this API now returns a stream of chunks, same as the data API --- .../proto/rerun/v0/remote_store.proto | 54 +++-- .../re_protos/src/v0/rerun.remote_store.v0.rs | 200 ++++++------------ examples/python/remote/metadata.py | 4 +- rerun_py/rerun_bindings/rerun_bindings.pyi | 4 +- rerun_py/src/remote.rs | 39 ++-- 5 files changed, 116 insertions(+), 185 deletions(-) diff --git a/crates/store/re_protos/proto/rerun/v0/remote_store.proto b/crates/store/re_protos/proto/rerun/v0/remote_store.proto index 6b156a4b3eda..e98a40958d0d 100644 --- a/crates/store/re_protos/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_protos/proto/rerun/v0/remote_store.proto @@ -10,9 +10,8 @@ service StorageNode { rpc FetchRecording(FetchRecordingRequest) returns (stream FetchRecordingResponse) {} // metadata API calls - rpc ListRecordings(ListRecordingsRequest) returns (ListRecordingsResponse) {} - rpc GetRecordingMetadata(GetRecordingMetadataRequest) returns (GetRecordingMetadataResponse) {} - rpc UpdateRecordingMetadata(UpdateRecordingMetadataRequest) returns (UpdateRecordingMetadataResponse) {} + rpc QueryCatalog(QueryCatalogRequest) returns (stream QueryCatalogResponse) {} + rpc UpdateCatalog(UpdateCatalogRequest) returns (UpdateCatalogResponse) {} rpc RegisterRecording(RegisterRecordingRequest) returns (RegisterRecordingResponse) {} } @@ -45,30 +44,14 @@ message RegisterRecordingResponse { RecordingMetadata metadata = 2; } -// ---------------- GetRecordingMetadata ----------------- +// ---------------- UpdateCatalog ----------------- -message GetRecordingMetadataRequest { +message UpdateCatalogRequest { RecordingId recording_id = 1; -} - -message GetRecordingMetadataResponse { - RecordingId id = 1; RecordingMetadata metadata = 2; } -message TimeMetadata { - Timeline timeline = 1; - TimeRange time_range = 2; -} - -// ---------------- UpdateRecordingMetadata ----------------- - -message UpdateRecordingMetadataRequest { - RecordingId recording_id = 1; - RecordingMetadata metadata = 2; -} - -message UpdateRecordingMetadataResponse {} +message UpdateCatalogResponse {} // ---------------- Query ----------------- @@ -94,21 +77,30 @@ enum EncoderVersion { } -// ----------------- ListRecordings ----------------- +// ----------------- QueryCatalog ----------------- -message ListRecordingsRequest { - // define which columns should be returned / projected - // we define a separate message to make it optional. - // If not provided, all columns should be returned +message QueryCatalogRequest { + // Column projection - define which columns should be returned. + // Providing it is optional, if not provided, all columns should be returned ColumnProjection column_projection = 1; + // Filter specific recordings that match the criteria (selection) + CatalogFilter filter = 2; } message ColumnProjection { repeated string columns = 1; } -message ListRecordingsResponse { - repeated RecordingMetadata recordings = 1; +message CatalogFilter { + // Filtering is very simple right now, we can only select + // recordings by their ids. + repeated RecordingId recording_ids = 1; +} + +message QueryCatalogResponse { + EncoderVersion encoder_version = 1; + // raw bytes are TransportChunks (i.e. RecordBatches) encoded with the relevant codec + bytes payload = 2; } enum RecordingType { @@ -132,7 +124,9 @@ message FetchRecordingResponse { bytes payload = 2; } -// Application level error - use as `details` in the `google.rpc.Status` message +// ----------------- Error handling ----------------- + +// Application level error - used as `details` in the `google.rpc.Status` message message RemoteStoreError { // error code ErrorCode code = 1; diff --git a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs index bb0c4bd235f3..e956a1082d00 100644 --- a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs @@ -245,33 +245,14 @@ pub struct RegisterRecordingResponse { pub metadata: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetRecordingMetadataRequest { - #[prost(message, optional, tag = "1")] - pub recording_id: ::core::option::Option, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetRecordingMetadataResponse { - #[prost(message, optional, tag = "1")] - pub id: ::core::option::Option, - #[prost(message, optional, tag = "2")] - pub metadata: ::core::option::Option, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct TimeMetadata { - #[prost(message, optional, tag = "1")] - pub timeline: ::core::option::Option, - #[prost(message, optional, tag = "2")] - pub time_range: ::core::option::Option, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct UpdateRecordingMetadataRequest { +pub struct UpdateCatalogRequest { #[prost(message, optional, tag = "1")] pub recording_id: ::core::option::Option, #[prost(message, optional, tag = "2")] pub metadata: ::core::option::Option, } #[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct UpdateRecordingMetadataResponse {} +pub struct UpdateCatalogResponse {} #[derive(Clone, PartialEq, ::prost::Message)] pub struct QueryRequest { /// unique identifier of the recording @@ -293,12 +274,14 @@ pub struct QueryResponse { pub payload: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ListRecordingsRequest { - /// define which columns should be returned / projected - /// we define a separate message to make it optional. - /// If not provided, all columns should be returned +pub struct QueryCatalogRequest { + /// Column projection - define which columns should be returned. + /// Providing it is optional, if not provided, all columns should be returned #[prost(message, optional, tag = "1")] pub column_projection: ::core::option::Option, + /// Filter specific recordings that match the criteria (selection) + #[prost(message, optional, tag = "2")] + pub filter: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ColumnProjection { @@ -306,9 +289,19 @@ pub struct ColumnProjection { pub columns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ListRecordingsResponse { +pub struct CatalogFilter { + /// Filtering is very simple right now, we can only select + /// recordings by their ids. #[prost(message, repeated, tag = "1")] - pub recordings: ::prost::alloc::vec::Vec, + pub recording_ids: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueryCatalogResponse { + #[prost(enumeration = "EncoderVersion", tag = "1")] + pub encoder_version: i32, + /// raw bytes are TransportChunks (i.e. RecordBatches) encoded with the relevant codec + #[prost(bytes = "vec", tag = "2")] + pub payload: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct FetchRecordingRequest { @@ -328,7 +321,7 @@ pub struct FetchRecordingResponse { #[prost(bytes = "vec", tag = "2")] pub payload: ::prost::alloc::vec::Vec, } -/// Application level error - use as `details` in the `google.rpc.Status` message +/// Application level error - used as `details` in the `google.rpc.Status` message #[derive(Clone, PartialEq, ::prost::Message)] pub struct RemoteStoreError { /// error code @@ -546,62 +539,43 @@ pub mod storage_node_client { self.inner.server_streaming(req, path, codec).await } /// metadata API calls - pub async fn list_recordings( + pub async fn query_catalog( &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { self.inner.ready().await.map_err(|e| { tonic::Status::unknown(format!("Service was not ready: {}", e.into())) })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/rerun.remote_store.v0.StorageNode/ListRecordings", + "/rerun.remote_store.v0.StorageNode/QueryCatalog", ); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new( "rerun.remote_store.v0.StorageNode", - "ListRecordings", + "QueryCatalog", )); - self.inner.unary(req, path, codec).await + self.inner.server_streaming(req, path, codec).await } - pub async fn get_recording_metadata( + pub async fn update_catalog( &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::unknown(format!("Service was not ready: {}", e.into())) })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/rerun.remote_store.v0.StorageNode/GetRecordingMetadata", - ); - let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "rerun.remote_store.v0.StorageNode", - "GetRecordingMetadata", - )); - self.inner.unary(req, path, codec).await - } - pub async fn update_recording_metadata( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::unknown(format!("Service was not ready: {}", e.into())) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/rerun.remote_store.v0.StorageNode/UpdateRecordingMetadata", + "/rerun.remote_store.v0.StorageNode/UpdateCatalog", ); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new( "rerun.remote_store.v0.StorageNode", - "UpdateRecordingMetadata", + "UpdateCatalog", )); self.inner.unary(req, path, codec).await } @@ -658,22 +632,20 @@ pub mod storage_node_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the QueryCatalog method. + type QueryCatalogStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + std::marker::Send + + 'static; /// metadata API calls - async fn list_recordings( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - async fn get_recording_metadata( + async fn query_catalog( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - async fn update_recording_metadata( + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn update_catalog( &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; async fn register_recording( &self, request: tonic::Request, @@ -836,63 +808,24 @@ pub mod storage_node_server { }; Box::pin(fut) } - "/rerun.remote_store.v0.StorageNode/ListRecordings" => { - #[allow(non_camel_case_types)] - struct ListRecordingsSvc(pub Arc); - impl tonic::server::UnaryService - for ListRecordingsSvc - { - type Response = super::ListRecordingsResponse; - type Future = BoxFuture, tonic::Status>; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::list_recordings(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = ListRecordingsSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/rerun.remote_store.v0.StorageNode/GetRecordingMetadata" => { + "/rerun.remote_store.v0.StorageNode/QueryCatalog" => { #[allow(non_camel_case_types)] - struct GetRecordingMetadataSvc(pub Arc); + struct QueryCatalogSvc(pub Arc); impl - tonic::server::UnaryService - for GetRecordingMetadataSvc + tonic::server::ServerStreamingService + for QueryCatalogSvc { - type Response = super::GetRecordingMetadataResponse; - type Future = BoxFuture, tonic::Status>; + type Response = super::QueryCatalogResponse; + type ResponseStream = T::QueryCatalogStream; + type Future = + BoxFuture, tonic::Status>; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_recording_metadata(&inner, request).await + ::query_catalog(&inner, request).await }; Box::pin(fut) } @@ -903,7 +836,7 @@ pub mod storage_node_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = GetRecordingMetadataSvc(inner); + let method = QueryCatalogSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -914,27 +847,26 @@ pub mod storage_node_server { max_decoding_message_size, max_encoding_message_size, ); - let res = grpc.unary(method, req).await; + let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) } - "/rerun.remote_store.v0.StorageNode/UpdateRecordingMetadata" => { + "/rerun.remote_store.v0.StorageNode/UpdateCatalog" => { #[allow(non_camel_case_types)] - struct UpdateRecordingMetadataSvc(pub Arc); - impl - tonic::server::UnaryService - for UpdateRecordingMetadataSvc + struct UpdateCatalogSvc(pub Arc); + impl tonic::server::UnaryService + for UpdateCatalogSvc { - type Response = super::UpdateRecordingMetadataResponse; + type Response = super::UpdateCatalogResponse; type Future = BoxFuture, tonic::Status>; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::update_recording_metadata(&inner, request).await + ::update_catalog(&inner, request).await }; Box::pin(fut) } @@ -945,7 +877,7 @@ pub mod storage_node_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = UpdateRecordingMetadataSvc(inner); + let method = UpdateCatalogSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( diff --git a/examples/python/remote/metadata.py b/examples/python/remote/metadata.py index 6affe479e413..e199499d8f2a 100644 --- a/examples/python/remote/metadata.py +++ b/examples/python/remote/metadata.py @@ -25,7 +25,7 @@ # Register the new rrd conn = rr.remote.connect("http://0.0.0.0:51234") - catalog = pl.from_arrow(conn.list_recordings()) + catalog = pl.from_arrow(conn.query_catalog()) if args.subcommand == "print": print(catalog) @@ -38,4 +38,4 @@ exit(1) print(f"Updating metadata for {id}") - conn.update_metadata(id, {args.key: pa.array([args.value])}) + conn.update_catalog(id, {args.key: pa.array([args.value])}) diff --git a/rerun_py/rerun_bindings/rerun_bindings.pyi b/rerun_py/rerun_bindings/rerun_bindings.pyi index dbe45bb9e285..47866fe13399 100644 --- a/rerun_py/rerun_bindings/rerun_bindings.pyi +++ b/rerun_py/rerun_bindings/rerun_bindings.pyi @@ -575,7 +575,7 @@ class StorageNodeClient: Required-feature: `remote` """ - def list_recordings(self) -> pa.RecordBatchReader: + def query_catalog(self) -> pa.RecordBatchReader: """Get the metadata for all recordings in the storage node.""" ... @@ -593,7 +593,7 @@ class StorageNodeClient: """ ... - def update_metadata(self, id: str, metadata: dict[str, MetadataLike]) -> None: + def update_catalog(self, id: str, metadata: dict[str, MetadataLike]) -> None: """ Update the metadata for the recording with the given id. diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index eeaf24c652a7..abb95fc12e1f 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -14,11 +14,12 @@ use re_protos::{ codec::decode, v0::{ storage_node_client::StorageNodeClient, EncoderVersion, FetchRecordingRequest, - ListRecordingsRequest, RecordingId, RecordingMetadata, RecordingType, - RegisterRecordingRequest, UpdateRecordingMetadataRequest, + QueryCatalogRequest, RecordingId, RecordingMetadata, RecordingType, + RegisterRecordingRequest, UpdateCatalogRequest, }, }; use re_sdk::{ApplicationId, StoreId, StoreKind, Time}; +use tokio_stream::StreamExt; use crate::dataframe::PyRecording; @@ -80,26 +81,30 @@ pub struct PyStorageNodeClient { #[pymethods] impl PyStorageNodeClient { - /// Get the metadata for all recordings in the storage node. - fn list_recordings(&mut self) -> PyResult>> { + /// Query the recordings metadata catalog. + fn query_catalog(&mut self) -> PyResult>> { let reader = self.runtime.block_on(async { - // TODO(jleibs): Support column projection - let request = ListRecordingsRequest { + // TODO(jleibs): Support column projection and filtering + let request = QueryCatalogRequest { column_projection: None, + filter: None, }; - let resp = self + let transport_chunks = self .client - .list_recordings(request) + .query_catalog(request) .await - .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - - let transport_chunks = resp + .map_err(|err| PyRuntimeError::new_err(err.to_string()))? .into_inner() - .recordings - .into_iter() - .map(|recording| recording.data()) + .filter_map(|resp| { + resp.and_then(|r| { + decode(r.encoder_version(), &r.payload) + .map_err(|err| tonic::Status::internal(err.to_string())) + }) + .transpose() + }) .collect::, _>>() + .await .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; let record_batches: Vec> = @@ -216,7 +221,7 @@ impl PyStorageNodeClient { id, metadata ))] - fn update_metadata(&mut self, id: &str, metadata: &Bound<'_, PyDict>) -> PyResult<()> { + fn update_catalog(&mut self, id: &str, metadata: &Bound<'_, PyDict>) -> PyResult<()> { self.runtime.block_on(async { let (schema, data): ( Vec, @@ -247,13 +252,13 @@ impl PyStorageNodeClient { let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &metadata_tc) .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - let request = UpdateRecordingMetadataRequest { + let request = UpdateCatalogRequest { recording_id: Some(RecordingId { id: id.to_owned() }), metadata: Some(metadata), }; self.client - .update_recording_metadata(request) + .update_catalog(request) .await .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;