Implement basic remote query handling
jleibs committed Dec 18, 2024
1 parent 659e675 commit dd635ef
Showing 2 changed files with 82 additions and 16 deletions.
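The heart of the change is in how `PyRecordingView` dispatches a query: the `Remote` match arm, which previously returned a "not implemented" error, now forwards the query to the storage node through a new `exec_query` method and returns the same kind of record-batch reader as the `Local` arm. Before the diffs, here is a minimal, self-contained sketch of that dispatch pattern; the types, method signatures, and return values are illustrative stand-ins, not the actual rerun_py definitions.

    // Hypothetical model of the dispatch pattern this commit introduces.
    type Batch = Vec<u64>; // stand-in for an Arrow record batch

    struct LocalStore;
    struct RemoteClient;

    impl LocalStore {
        fn query(&self, _expr: &str) -> Result<Vec<Batch>, String> {
            Ok(vec![vec![1, 2, 3]]) // batches served from the in-memory store
        }
    }

    impl RemoteClient {
        // Mirrors `exec_query`: forwards the expression to the storage node.
        fn exec_query(&self, _store_id: &str, _expr: &str) -> Result<Vec<Batch>, String> {
            Ok(vec![vec![4, 5, 6]]) // batches streamed back over gRPC
        }
    }

    enum RecordingHandle {
        Local(LocalStore),
        Remote { client: RemoteClient, store_id: String },
    }

    fn run_query(handle: &RecordingHandle, expr: &str) -> Result<Vec<Batch>, String> {
        match handle {
            RecordingHandle::Local(store) => store.query(expr),
            // Previously: Err("not implemented for remote recordings yet")
            RecordingHandle::Remote { client, store_id } => client.exec_query(store_id, expr),
        }
    }

    fn main() {
        let local = RecordingHandle::Local(LocalStore);
        let remote = RecordingHandle::Remote {
            client: RemoteClient,
            store_id: "rec-1".to_owned(),
        };
        println!("{:?}", run_query(&local, "select *"));
        println!("{:?}", run_query(&remote, "select *"));
    }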
32 changes: 20 additions & 12 deletions rerun_py/src/dataframe.rs
@@ -764,13 +764,17 @@ impl PyRecordingView {
                         .map(|batch| batch.try_to_arrow_record_batch()),
                     std::sync::Arc::new(schema),
                 );
 
                 Ok(PyArrowType(Box::new(reader)))
             }
             #[cfg(feature = "remote")]
-            PyRecordingHandle::Remote(_) => Err::<_, PyErr>(PyRuntimeError::new_err(
-                "Schema is not implemented for for remote recordings yet.",
-            )),
+            PyRecordingHandle::Remote(recording) => {
+                let borrowed_recording = recording.borrow(py);
+                let mut borrowed_client = borrowed_recording.client.borrow_mut(py);
+                borrowed_client.exec_query(
+                    borrowed_recording.store_info.store_id.clone(),
+                    self.query_expression.clone(),
+                )
+            }
         }
     }

@@ -807,16 +811,15 @@ impl PyRecordingView {
         args: &Bound<'_, PyTuple>,
         columns: Option<Vec<AnyColumn>>,
     ) -> PyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
+        let mut query_expression = self.query_expression.clone();
+        // This is a static selection, so we clear the filtered index
+        query_expression.filtered_index = None;
+
         match &self.recording {
             PyRecordingHandle::Local(recording) => {
                 let borrowed = recording.borrow(py);
                 let engine = borrowed.engine();
 
-                let mut query_expression = self.query_expression.clone();
-
-                // This is a static selection, so we clear the filtered index
-                query_expression.filtered_index = None;
-
                 // If no columns provided, select all static columns
                 let static_columns = Self::select_args(args, columns)
                     .transpose()
@@ -862,9 +865,14 @@ impl PyRecordingView {
                 Ok(PyArrowType(Box::new(reader)))
             }
             #[cfg(feature = "remote")]
-            PyRecordingHandle::Remote(_) => Err::<_, PyErr>(PyRuntimeError::new_err(
-                "Schema is not implemented for for remote recordings yet.",
-            )),
+            PyRecordingHandle::Remote(recording) => {
+                let borrowed_recording = recording.borrow(py);
+                let mut borrowed_client = borrowed_recording.client.borrow_mut(py);
+                borrowed_client.exec_query(
+                    borrowed_recording.store_info.store_id.clone(),
+                    query_expression,
+                )
+            }
         }
     }
 
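One detail worth noting in the second and third hunks above: building the mutated query expression was hoisted out of the `Local` arm so that the new `Remote` arm can reuse it. A tiny sketch of the hoist, with a stand-in `QueryExpression` type (only the one field that matters here, not the real re_dataframe definition):

    #[derive(Clone, Debug)]
    struct QueryExpression {
        filtered_index: Option<String>,
    }

    // Mirrors the hoist in `select_static`: mutate one clone up front so every
    // backend (local engine or remote storage node) sees the same expression.
    fn static_query(original: &QueryExpression) -> QueryExpression {
        let mut query_expression = original.clone();
        // This is a static selection, so we clear the filtered index
        query_expression.filtered_index = None;
        query_expression
    }

    fn main() {
        let q = QueryExpression {
            filtered_index: Some("frame_nr".to_owned()),
        };
        assert!(static_query(&q).filtered_index.is_none());
        println!("cleared: {:?}", static_query(&q));
    }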
66 changes: 62 additions & 4 deletions rerun_py/src/remote.rs
@@ -18,14 +18,15 @@ use pyo3::{
 use re_chunk::{Chunk, TransportChunk};
 use re_chunk_store::ChunkStore;
 use re_dataframe::{ChunkStoreHandle, QueryExpression, SparseFillStrategy, ViewContentsSelector};
+use re_grpc_client::TonicStatusError;
 use re_log_encoding::codec::wire::{decode, encode};
 use re_log_types::{EntityPathFilter, StoreInfo, StoreSource};
 use re_protos::{
     common::v0::{EncoderVersion, RecordingId},
     remote_store::v0::{
         storage_node_client::StorageNodeClient, CatalogFilter, DataframePart,
-        FetchRecordingRequest, QueryCatalogRequest, RecordingType, RegisterRecordingRequest,
-        UpdateCatalogRequest,
+        FetchRecordingRequest, QueryCatalogRequest, QueryRequest, RecordingType,
+        RegisterRecordingRequest, UpdateCatalogRequest,
     },
 };
 use re_sdk::{ApplicationId, ComponentName, StoreId, StoreKind, Time, Timeline};
@@ -90,6 +91,7 @@ pub struct PyStorageNodeClient {
 }
 
 impl PyStorageNodeClient {
+    /// Get the [`StoreInfo`] for a single recording in the storage node.
     fn get_store_info(&mut self, id: &str) -> PyResult<StoreInfo> {
         let store_info = self
             .runtime
@@ -133,6 +135,59 @@ impl PyStorageNodeClient {

         Ok(store_info)
     }
+
+    /// Execute a [`QueryExpression`] for a single recording in the storage node.
+    pub(crate) fn exec_query(
+        &mut self,
+        id: StoreId,
+        query: QueryExpression,
+    ) -> PyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
+        let query: re_protos::common::v0::Query = query.into();
+
+        let batches = self.runtime.block_on(async {
+            // TODO(#8536): Avoid the need to collect here.
+            // This means we shouldn't be blocking on
+            let batches = self
+                .client
+                .query(QueryRequest {
+                    recording_id: Some(id.into()),
+                    query: Some(query.clone()),
+                })
+                .await
+                .map_err(TonicStatusError)?
+                .into_inner()
+                .filter_map(|resp| {
+                    resp.and_then(|r| {
+                        decode(r.encoder_version(), &r.payload)
+                            .map_err(|err| tonic::Status::internal(err.to_string()))
+                    })
+                    .transpose()
+                })
+                .collect::<Result<Vec<_>, tonic::Status>>()
+                .await
+                .map_err(TonicStatusError)?;
+
+            let schema = batches
+                .first()
+                .map(|batch| batch.schema.clone())
+                .unwrap_or_else(|| arrow2::datatypes::Schema::from(vec![]));
+
+            let fields: Vec<arrow::datatypes::Field> =
+                schema.fields.iter().map(|f| f.clone().into()).collect();
+            let metadata = schema.metadata.clone().into_iter().collect();
+            let schema = arrow::datatypes::Schema::new(fields).with_metadata(metadata);
+
+            Ok(RecordBatchIterator::new(
+                batches.into_iter().map(|tc| tc.try_to_arrow_record_batch()),
+                std::sync::Arc::new(schema),
+            ))
+        });
+
+        let result =
+            batches.map_err(|err: TonicStatusError| PyRuntimeError::new_err(err.to_string()))?;
+
+        Ok(PyArrowType(Box::new(result)))
+    }
 }
 
 #[pymethods]
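The body of `exec_query` above packs several steps into one expression: it blocks on the async gRPC response stream, decodes each response, drops responses that decode to no payload, fails on the first error, and finally derives an Arrow schema from the first batch (falling back to an empty schema so the reader is still well-formed). A simplified, synchronous model of that collect-and-decode pipeline, with a plain iterator and `Vec`s standing in for the tonic stream and the transport chunks:

    type Payload = Vec<u8>;
    type Chunk = Vec<i64>;

    // Stand-in decoder: empty payloads decode to `None` (no chunk), like the
    // `decode(...)` + `.transpose()` dance in the diff, which skips empties.
    fn decode(payload: &Payload) -> Result<Option<Chunk>, String> {
        if payload.is_empty() {
            return Ok(None);
        }
        Ok(Some(payload.iter().map(|b| *b as i64).collect()))
    }

    fn collect_batches(
        responses: impl Iterator<Item = Result<Payload, String>>,
    ) -> Result<Vec<Chunk>, String> {
        responses
            // Drop responses that decode to nothing, propagate errors: the
            // role of `filter_map` + `transpose` over the response stream.
            .filter_map(|resp| resp.and_then(|r| decode(&r)).transpose())
            .collect()
    }

    fn main() -> Result<(), String> {
        let responses = vec![Ok(vec![1u8, 2]), Ok(vec![]), Ok(vec![3])];
        let batches = collect_batches(responses.into_iter())?;

        // Schema handling mirrors the diff: taken from the first batch if
        // any, otherwise an empty fallback.
        let schema_len = batches.first().map_or(0, |b| b.len());
        println!("{} batches, first-batch width {}", batches.len(), schema_len);
        Ok(())
    }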
@@ -303,9 +358,10 @@ impl PyStorageNodeClient {
         })
     }
 
-    /// Open a [`RemoteRecording`][rerun.dataframe.RemoteRecording] by id to use with the dataframe APIs.
+    /// Open a [`Recording`][rerun.dataframe.Recording] by id to use with the dataframe APIs.
     ///
-    /// This currently downloads the full recording to the local machine.
+    /// This will run queries against the remote storage node and stream the results. Faster for small
+    /// numbers of queries with small results.
     ///
     /// Parameters
     /// ----------
@@ -334,6 +390,8 @@ impl PyStorageNodeClient {

     /// Download a [`Recording`][rerun.dataframe.Recording] by id to use with the dataframe APIs.
     ///
+    /// This will download the full recording to memory and run queries against a local chunk store.
+    ///
     /// Parameters
     /// ----------
     /// id : str
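The two docstring updates in the last hunks spell out the trade-off the commit enables: opening a recording keeps the data on the storage node and streams each query's results, while downloading pays one up-front transfer and then queries a local chunk store. A toy model of that trade-off; the method names and numbers are illustrative, not the rerun API:

    struct Server {
        rows: Vec<i64>,
    }

    impl Server {
        // Remote-backed handle: each query goes over the wire. Cheap to open,
        // per-query latency; matches the "open" docstring.
        fn query_remote(&self, min: i64) -> Vec<i64> {
            self.rows.iter().copied().filter(|r| *r >= min).collect()
        }

        // Local copy: one full transfer up front, then in-memory queries;
        // matches the "download" docstring.
        fn download(&self) -> Vec<i64> {
            self.rows.clone()
        }
    }

    fn main() {
        let server = Server { rows: (0..1000).collect() };

        // Few small queries: streaming them remotely avoids the full download.
        let small = server.query_remote(995);
        assert_eq!(small.len(), 5);

        // Many queries over the same data: downloading once amortizes transfer.
        let local = server.download();
        let hits = local.iter().filter(|r| **r % 2 == 0).count();
        assert_eq!(hits, 500);
    }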
