diff --git a/Cargo.lock b/Cargo.lock index f6e10990c1b1f..113df09af5298 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6149,10 +6149,15 @@ dependencies = [ "re_build_info", "re_build_tools", "re_chunk", + "re_chunk_store", + "re_dataframe", + "re_entity_db", "re_log", + "re_log_encoding", "re_log_types", "re_memory", "re_sdk", + "re_types", "re_web_viewer_server", "re_ws_comms", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 7a12442f8d522..4071f86ef66a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ re_chunk = { path = "crates/store/re_chunk", version = "=0.19.0-alpha.1", defaul re_chunk_store = { path = "crates/store/re_chunk_store", version = "=0.19.0-alpha.1", default-features = false } re_data_loader = { path = "crates/store/re_data_loader", version = "=0.19.0-alpha.1", default-features = false } re_data_source = { path = "crates/store/re_data_source", version = "=0.19.0-alpha.1", default-features = false } +re_dataframe = { path = "crates/store/re_dataframe", version = "=0.19.0-alpha.1", default-features = false } re_entity_db = { path = "crates/store/re_entity_db", version = "=0.19.0-alpha.1", default-features = false } re_format_arrow = { path = "crates/store/re_format_arrow", version = "=0.19.0-alpha.1", default-features = false } re_log_encoding = { path = "crates/store/re_log_encoding", version = "=0.19.0-alpha.1", default-features = false } diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index e43151c47f2af..45f3ab79cc076 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -41,13 +41,19 @@ web_viewer = [ [dependencies] re_build_info.workspace = true -re_chunk.workspace = true +re_chunk_store = { workspace = true } +re_chunk = { workspace = true, features = ["arrow"] } +re_entity_db = { workspace = true } re_log = { workspace = true, features = ["setup"] } +re_log_encoding = { workspace = true } re_log_types.workspace = true re_memory.workspace = true re_sdk = { workspace = true, features = ["data_loaders"] } +re_types = { workspace = true } re_web_viewer_server = { workspace = true, optional = true } re_ws_comms = { workspace = true, optional = true } +re_dataframe = { workspace = true } + arrow = { workspace = true, features = ["pyarrow"] } arrow2 = { workspace = true, features = ["io_ipc", "io_print", "arrow"] } diff --git a/rerun_py/rerun_sdk/rerun/__init__.py b/rerun_py/rerun_sdk/rerun/__init__.py index c40e195a7faeb..01eb115fe2157 100644 --- a/rerun_py/rerun_sdk/rerun/__init__.py +++ b/rerun_py/rerun_sdk/rerun/__init__.py @@ -19,6 +19,7 @@ from . import ( blueprint as blueprint, + dataframe as dataframe, experimental as experimental, notebook as notebook, ) diff --git a/rerun_py/rerun_sdk/rerun/dataframe.py b/rerun_py/rerun_sdk/rerun/dataframe.py new file mode 100644 index 0000000000000..8729563d0c084 --- /dev/null +++ b/rerun_py/rerun_sdk/rerun/dataframe.py @@ -0,0 +1,88 @@ +from __future__ import annotations +from typing import Optional, Type + +from rerun import bindings +from rerun._baseclasses import ComponentMixin + + +class Schema: + """The schema representing all columns in a [`Dataset`][].""" + + def __init__(self, storage: bindings.PySchema) -> None: + self.storage = storage + + def control_columns(self) -> list[bindings.PyControlColumn]: + return self.storage.control_columns() + + def time_columns(self) -> list[bindings.PyTimeColumn]: + return self.storage.time_columns() + + def component_columns(self) -> list[bindings.PyComponentColumn]: + return self.storage.component_columns() + + def column_for(self, entity_path: str, component: str | Type[ComponentMixin]) -> Optional[bindings.PyColumn]: + if not isinstance(component, str): + component = component._BATCH_TYPE._ARROW_TYPE._TYPE_NAME + + for col in self.component_columns(): + if col.matches(entity_path, component): + return col + + +class Dataset: + """A single dataset from an RRD, representing a Recording or a Blueprint.""" + + def __init__(self, storage: bindings.PyChunkStore) -> None: + self.storage = storage + + def schema(self) -> bindings.PySchema: + """The schema of the dataset.""" + return Schema(self.storage.schema()) + + def range_query(self, entity_path_expr: str, pov: bindings.PyControlColumn) -> list[pa.RecordBatch]: + """Execute a range query on the dataset.""" + return self.storage.range_query(entity_path_expr, pov) + + +class Archive: + """An archive containing all the data stores in an RRD file.""" + + def __init__(self, storage: bindings.PyRRDArchive) -> None: + self.storage = storage + + def num_recordings(self) -> int: + """The number of recordings in the archive.""" + return self.storage.num_recordings() + + def all_recordings(self) -> list[Dataset]: + """The recordings in the archive.""" + return [Dataset(r) for r in self.storage.all_recordings()] + + +def load_recording(filename: str) -> Dataset: + """ + Load a rerun data file from disk. + + :param filename: The path to the file to load. + :return: A dictionary of stores in the file. + """ + archive = load_archive(filename) + + if archive.num_recordings() != 1: + raise ValueError(f"Expected exactly one recording in the archive, got {archive.num_recordings()}") + + recordings = archive.all_recordings() + + return Dataset(recordings[0]) + + +def load_archive(filename: str) -> Archive: + """ + Load a rerun archive file from disk. + + :param filename: The path to the file to load. + :return: A dictionary of stores in the file. + """ + stores = bindings.load_rrd(filename) + + return Archive(stores) diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs new file mode 100644 index 0000000000000..f00c0aab21d8e --- /dev/null +++ b/rerun_py/src/dataframe.rs @@ -0,0 +1,206 @@ +#![allow(clippy::needless_pass_by_value)] // A lot of arguments to #[pyfunction] need to be by value +#![allow(clippy::borrow_deref_ref)] // False positive due to #[pyfunction] macro +#![allow(unsafe_op_in_unsafe_fn)] // False positive due to #[pyfunction] macro + +use std::collections::BTreeMap; + +use arrow::{array::RecordBatch, pyarrow::PyArrowType}; +use pyo3::{exceptions::PyRuntimeError, prelude::*}; +use re_chunk_store::{ + ChunkStore, ChunkStoreConfig, ColumnDescriptor, ComponentColumnDescriptor, + ControlColumnDescriptor, RangeQueryExpression, TimeColumnDescriptor, VersionPolicy, +}; +use re_dataframe::QueryEngine; +use re_log_types::ResolvedTimeRange; +use re_sdk::{StoreId, StoreKind, Timeline}; + +#[pyclass(frozen)] +#[derive(Clone)] +pub struct PyControlColumn { + pub column: ControlColumnDescriptor, +} + +#[pymethods] +impl PyControlColumn { + fn __repr__(&self) -> String { + format!("Ctrl({})", self.column.component_name.short_name()) + } +} + +#[pyclass(frozen)] +#[derive(Clone)] +pub struct PyTimeColumn { + pub column: TimeColumnDescriptor, +} + +#[pymethods] +impl PyTimeColumn { + fn __repr__(&self) -> String { + format!("Time({})", self.column.timeline.name()) + } +} + +#[pyclass(frozen)] +#[derive(Clone)] +pub struct PyComponentColumn { + pub column: ComponentColumnDescriptor, +} + +#[pymethods] +impl PyComponentColumn { + fn __repr__(&self) -> String { + format!( + "Component({}:{})", + self.column.entity_path, + self.column.component_name.short_name() + ) + } + + fn matches(&self, entity_path: &str, component_name: &str) -> bool { + self.column.entity_path == entity_path.into() + && self.column.component_name == component_name + } +} + +#[pyclass(frozen)] +#[derive(Clone)] +pub struct PySchema { + // TODO(jleibs): This gets replaced with the new schema object + pub schema: Vec, +} + +#[pymethods] +impl PySchema { + fn control_columns(&self) -> Vec { + self.schema + .iter() + .filter_map(|column| { + if let ColumnDescriptor::Control(col) = column { + Some(PyControlColumn { + column: col.clone(), + }) + } else { + None + } + }) + .collect() + } + + fn time_columns(&self) -> Vec { + self.schema + .iter() + .filter_map(|column| { + if let ColumnDescriptor::Time(col) = column { + Some(PyTimeColumn { + column: col.clone(), + }) + } else { + None + } + }) + .collect() + } + + fn component_columns(&self) -> Vec { + self.schema + .iter() + .filter_map(|column| { + if let ColumnDescriptor::Component(col) = column { + Some(PyComponentColumn { + column: col.clone(), + }) + } else { + None + } + }) + .collect() + } +} + +#[pyclass(frozen)] +#[derive(Clone)] +pub struct PyDataset { + pub store: ChunkStore, +} + +#[pymethods] +impl PyDataset { + fn schema(&self) -> PySchema { + PySchema { + schema: self.store.schema(), + } + } + + fn range_query( + &self, + expr: &str, + pov: PyComponentColumn, + ) -> PyResult>> { + // TODO(jleibs): Move this ctx into PyChunkStore? + let cache = re_dataframe::external::re_query::Caches::new(&self.store); + let engine = QueryEngine { + store: &self.store, + cache: &cache, + }; + + // TODO(jleibs): Move to arguments + let timeline = Timeline::log_tick(); + let time_range = ResolvedTimeRange::EVERYTHING; + + let query = RangeQueryExpression { + entity_path_expr: expr.into(), + timeline, + time_range, + pov: pov.column, + }; + + let query_handle = engine.range(&query, None /* columns */); + + let batches: Result, _> = query_handle + .into_iter() + .map(|batch| batch.try_as_arrow_record_batch()) + .collect(); + + let batches = batches.map_err(|err| PyRuntimeError::new_err(err.to_string()))?; + + Ok(PyArrowType(batches)) + } +} + +#[pyclass(frozen)] +#[derive(Clone)] +pub struct PyRRDArchive { + pub datasets: BTreeMap, +} + +#[pymethods] +impl PyRRDArchive { + fn num_recordings(&self) -> usize { + self.datasets + .iter() + .filter(|(id, _)| matches!(id.kind, StoreKind::Recording)) + .count() + } + + // TODO(jleibs): This could probably return an iterator + fn all_recordings(&self) -> Vec { + self.datasets + .iter() + .filter(|(id, _)| matches!(id.kind, StoreKind::Recording)) + .map(|(_, store)| PyDataset { + store: store.clone(), + }) + .collect() + } +} + +#[pyfunction] +pub fn load_rrd(path_to_rrd: String) -> PyResult { + let stores = + ChunkStore::from_rrd_filepath(&ChunkStoreConfig::DEFAULT, path_to_rrd, VersionPolicy::Warn) + .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; + + let archive = PyRRDArchive { datasets: stores }; + + Ok(archive) +} diff --git a/rerun_py/src/lib.rs b/rerun_py/src/lib.rs index 73c8e4a368a53..fadc56440c294 100644 --- a/rerun_py/src/lib.rs +++ b/rerun_py/src/lib.rs @@ -14,4 +14,5 @@ static GLOBAL: AccountingAllocator = AccountingAllocator::new(mimalloc::MiMalloc); mod arrow; +mod dataframe; mod python_bridge; diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 15b472e01dce6..1e2eca00099aa 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -33,6 +33,8 @@ use re_ws_comms::RerunServerPort; use once_cell::sync::{Lazy, OnceCell}; +use crate::dataframe::{PyDataset, PyRRDArchive}; + // The bridge needs to have complete control over the lifetimes of the individual recordings, // otherwise all the recording shutdown machinery (which includes deallocating C, Rust and Python // data and joining a bunch of threads) can end up running at any time depending on what the @@ -105,6 +107,8 @@ fn rerun_bindings(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { // These two components are necessary for imports to work m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; // If this is a special RERUN_APP_ONLY context (launched via .spawn), we // can bypass everything else, which keeps us from preparing an SDK session @@ -169,6 +173,9 @@ fn rerun_bindings(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(escape_entity_path_part, m)?)?; m.add_function(wrap_pyfunction!(new_entity_path, m)?)?; + // dataframes + m.add_function(wrap_pyfunction!(crate::dataframe::load_rrd, m)?)?; + Ok(()) }