Work-in-progress initial scaffolding
jleibs committed Sep 5, 2024
1 parent 5a5fee1 commit ae4f3f5
Showing 8 changed files with 316 additions and 1 deletion.
5 changes: 5 additions & 0 deletions Cargo.lock
@@ -6149,10 +6149,15 @@ dependencies = [
"re_build_info",
"re_build_tools",
"re_chunk",
"re_chunk_store",
"re_dataframe",
"re_entity_db",
"re_log",
"re_log_encoding",
"re_log_types",
"re_memory",
"re_sdk",
"re_types",
"re_web_viewer_server",
"re_ws_comms",
"uuid",
1 change: 1 addition & 0 deletions Cargo.toml
@@ -44,6 +44,7 @@ re_chunk = { path = "crates/store/re_chunk", version = "=0.19.0-alpha.1", default-features = false }
re_chunk_store = { path = "crates/store/re_chunk_store", version = "=0.19.0-alpha.1", default-features = false }
re_data_loader = { path = "crates/store/re_data_loader", version = "=0.19.0-alpha.1", default-features = false }
re_data_source = { path = "crates/store/re_data_source", version = "=0.19.0-alpha.1", default-features = false }
re_dataframe = { path = "crates/store/re_dataframe", version = "=0.19.0-alpha.1", default-features = false }
re_entity_db = { path = "crates/store/re_entity_db", version = "=0.19.0-alpha.1", default-features = false }
re_format_arrow = { path = "crates/store/re_format_arrow", version = "=0.19.0-alpha.1", default-features = false }
re_log_encoding = { path = "crates/store/re_log_encoding", version = "=0.19.0-alpha.1", default-features = false }
8 changes: 7 additions & 1 deletion rerun_py/Cargo.toml
@@ -41,13 +41,19 @@ web_viewer = [

[dependencies]
re_build_info.workspace = true
re_chunk.workspace = true
re_chunk_store = { workspace = true }
re_chunk = { workspace = true, features = ["arrow"] }
re_entity_db = { workspace = true }
re_log = { workspace = true, features = ["setup"] }
re_log_encoding = { workspace = true }
re_log_types.workspace = true
re_memory.workspace = true
re_sdk = { workspace = true, features = ["data_loaders"] }
re_types = { workspace = true }
re_web_viewer_server = { workspace = true, optional = true }
re_ws_comms = { workspace = true, optional = true }
re_dataframe = { workspace = true }


arrow = { workspace = true, features = ["pyarrow"] }
arrow2 = { workspace = true, features = ["io_ipc", "io_print", "arrow"] }
1 change: 1 addition & 0 deletions rerun_py/rerun_sdk/rerun/__init__.py
@@ -19,6 +19,7 @@

from . import (
    blueprint as blueprint,
    dataframe as dataframe,
    experimental as experimental,
    notebook as notebook,
)
88 changes: 88 additions & 0 deletions rerun_py/rerun_sdk/rerun/dataframe.py
@@ -0,0 +1,88 @@
from __future__ import annotations

from typing import Optional, Type

import pyarrow as pa

from rerun import bindings
from rerun._baseclasses import ComponentMixin


class Schema:
    """The schema representing all columns in a [`Dataset`][]."""

    def __init__(self, storage: bindings.PySchema) -> None:
        self.storage = storage

    def control_columns(self) -> list[bindings.PyControlColumn]:
        return self.storage.control_columns()

    def time_columns(self) -> list[bindings.PyTimeColumn]:
        return self.storage.time_columns()

    def component_columns(self) -> list[bindings.PyComponentColumn]:
        return self.storage.component_columns()

    def column_for(self, entity_path: str, component: str | Type[ComponentMixin]) -> Optional[bindings.PyComponentColumn]:
        if not isinstance(component, str):
            component = component._BATCH_TYPE._ARROW_TYPE._TYPE_NAME

        for col in self.component_columns():
            if col.matches(entity_path, component):
                return col

        return None


class Dataset:
    """A single dataset from an RRD, representing a Recording or a Blueprint."""

    def __init__(self, storage: bindings.PyDataset) -> None:
        self.storage = storage

    def schema(self) -> Schema:
        """The schema of the dataset."""
        return Schema(self.storage.schema())

    def range_query(self, entity_path_expr: str, pov: bindings.PyComponentColumn) -> list[pa.RecordBatch]:
        """Execute a range query on the dataset."""
        return self.storage.range_query(entity_path_expr, pov)


class Archive:
    """An archive containing all the data stores in an RRD file."""

    def __init__(self, storage: bindings.PyRRDArchive) -> None:
        self.storage = storage

    def num_recordings(self) -> int:
        """The number of recordings in the archive."""
        return self.storage.num_recordings()

    def all_recordings(self) -> list[Dataset]:
        """The recordings in the archive."""
        return [Dataset(r) for r in self.storage.all_recordings()]


def load_recording(filename: str) -> Dataset:
    """
    Load a Rerun data file from disk.

    The file must contain exactly one recording.

    :param filename: The path to the file to load.
    :return: The single recording in the file, as a `Dataset`.
    """
    archive = load_archive(filename)

    if archive.num_recordings() != 1:
        raise ValueError(f"Expected exactly one recording in the archive, got {archive.num_recordings()}")

    recordings = archive.all_recordings()

    return recordings[0]


def load_archive(filename: str) -> Archive:
    """
    Load a Rerun archive file from disk.

    :param filename: The path to the file to load.
    :return: An `Archive` wrapping all the stores in the file.
    """
    stores = bindings.load_rrd(filename)

    return Archive(stores)
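A minimal usage sketch of this new module, assuming an example.rrd file that contains exactly one recording with a /points entity carrying Position3D components (the file name, entity path, and component type are illustrative, not part of the commit):

import rerun as rr

# Load the single recording contained in an RRD file (path is illustrative).
dataset = rr.dataframe.load_recording("example.rrd")

# Inspect the schema: control, time, and component columns.
schema = dataset.schema()
for col in schema.component_columns():
    print(col)

# Pick a component column as the point-of-view for a range query
# (the entity path and component type are illustrative).
pov = schema.column_for("/points", rr.components.Position3D)
if pov is not None:
    batches = dataset.range_query("/points/**", pov)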
206 changes: 206 additions & 0 deletions rerun_py/src/dataframe.rs
@@ -0,0 +1,206 @@
#![allow(clippy::needless_pass_by_value)] // A lot of arguments to #[pyfunction] need to be by value
#![allow(clippy::borrow_deref_ref)] // False positive due to #[pyfunction] macro
#![allow(unsafe_op_in_unsafe_fn)] // False positive due to #[pyfunction] macro

use std::collections::BTreeMap;

use arrow::{array::RecordBatch, pyarrow::PyArrowType};
use pyo3::{exceptions::PyRuntimeError, prelude::*};
use re_chunk_store::{
    ChunkStore, ChunkStoreConfig, ColumnDescriptor, ComponentColumnDescriptor,
    ControlColumnDescriptor, RangeQueryExpression, TimeColumnDescriptor, VersionPolicy,
};
use re_dataframe::QueryEngine;
use re_log_types::ResolvedTimeRange;
use re_sdk::{StoreId, StoreKind, Timeline};

#[pyclass(frozen)]
#[derive(Clone)]
pub struct PyControlColumn {
    pub column: ControlColumnDescriptor,
}

#[pymethods]
impl PyControlColumn {
    fn __repr__(&self) -> String {
        format!("Ctrl({})", self.column.component_name.short_name())
    }
}

#[pyclass(frozen)]
#[derive(Clone)]
pub struct PyTimeColumn {
    pub column: TimeColumnDescriptor,
}

#[pymethods]
impl PyTimeColumn {
    fn __repr__(&self) -> String {
        format!("Time({})", self.column.timeline.name())
    }
}

#[pyclass(frozen)]
#[derive(Clone)]
pub struct PyComponentColumn {
    pub column: ComponentColumnDescriptor,
}

#[pymethods]
impl PyComponentColumn {
    fn __repr__(&self) -> String {
        format!(
            "Component({}:{})",
            self.column.entity_path,
            self.column.component_name.short_name()
        )
    }

    fn matches(&self, entity_path: &str, component_name: &str) -> bool {
        self.column.entity_path == entity_path.into()
            && self.column.component_name == component_name
    }
}

#[pyclass(frozen)]
#[derive(Clone)]
pub struct PySchema {
    // TODO(jleibs): This gets replaced with the new schema object
    pub schema: Vec<ColumnDescriptor>,
}

#[pymethods]
impl PySchema {
    fn control_columns(&self) -> Vec<PyControlColumn> {
        self.schema
            .iter()
            .filter_map(|column| {
                if let ColumnDescriptor::Control(col) = column {
                    Some(PyControlColumn {
                        column: col.clone(),
                    })
                } else {
                    None
                }
            })
            .collect()
    }

    fn time_columns(&self) -> Vec<PyTimeColumn> {
        self.schema
            .iter()
            .filter_map(|column| {
                if let ColumnDescriptor::Time(col) = column {
                    Some(PyTimeColumn {
                        column: col.clone(),
                    })
                } else {
                    None
                }
            })
            .collect()
    }

    fn component_columns(&self) -> Vec<PyComponentColumn> {
        self.schema
            .iter()
            .filter_map(|column| {
                if let ColumnDescriptor::Component(col) = column {
                    Some(PyComponentColumn {
                        column: col.clone(),
                    })
                } else {
                    None
                }
            })
            .collect()
    }
}

#[pyclass(frozen)]
#[derive(Clone)]
pub struct PyDataset {
    pub store: ChunkStore,
}

#[pymethods]
impl PyDataset {
    fn schema(&self) -> PySchema {
        PySchema {
            schema: self.store.schema(),
        }
    }

    fn range_query(
        &self,
        expr: &str,
        pov: PyComponentColumn,
    ) -> PyResult<PyArrowType<Vec<RecordBatch>>> {
        // TODO(jleibs): Move this ctx into PyChunkStore?
        let cache = re_dataframe::external::re_query::Caches::new(&self.store);
        let engine = QueryEngine {
            store: &self.store,
            cache: &cache,
        };

        // TODO(jleibs): Move to arguments
        let timeline = Timeline::log_tick();
        let time_range = ResolvedTimeRange::EVERYTHING;

        let query = RangeQueryExpression {
            entity_path_expr: expr.into(),
            timeline,
            time_range,
            pov: pov.column,
        };

        let query_handle = engine.range(&query, None /* columns */);

        let batches: Result<Vec<_>, _> = query_handle
            .into_iter()
            .map(|batch| batch.try_as_arrow_record_batch())
            .collect();

        let batches = batches.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

        Ok(PyArrowType(batches))
    }
}

#[pyclass(frozen)]
#[derive(Clone)]
pub struct PyRRDArchive {
    pub datasets: BTreeMap<StoreId, ChunkStore>,
}

#[pymethods]
impl PyRRDArchive {
    fn num_recordings(&self) -> usize {
        self.datasets
            .iter()
            .filter(|(id, _)| matches!(id.kind, StoreKind::Recording))
            .count()
    }

    // TODO(jleibs): This could probably return an iterator
    fn all_recordings(&self) -> Vec<PyDataset> {
        self.datasets
            .iter()
            .filter(|(id, _)| matches!(id.kind, StoreKind::Recording))
            .map(|(_, store)| PyDataset {
                store: store.clone(),
            })
            .collect()
    }
}

#[pyfunction]
pub fn load_rrd(path_to_rrd: String) -> PyResult<PyRRDArchive> {
    let stores =
        ChunkStore::from_rrd_filepath(&ChunkStoreConfig::DEFAULT, path_to_rrd, VersionPolicy::Warn)
            .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

    let archive = PyRRDArchive { datasets: stores };

    Ok(archive)
}
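On the Python side, the record batches produced by range_query arrive as ordinary pyarrow.RecordBatch values (via the pyarrow feature of the arrow crate), so they can be inspected or combined with standard pyarrow tooling. A minimal sketch, continuing from the usage example above (names are illustrative):

import pyarrow as pa

# batches is the list of pyarrow.RecordBatch values returned by range_query.
for batch in batches:
    # Each batch carries its own Arrow schema; inspect it before combining.
    print(batch.num_rows, batch.schema.names)

# Batches that share a common schema can be combined into a single table.
if batches and all(b.schema.equals(batches[0].schema) for b in batches):
    table = pa.Table.from_batches(batches)
    print(table.num_rows)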
1 change: 1 addition & 0 deletions rerun_py/src/lib.rs
@@ -14,4 +14,5 @@ static GLOBAL: AccountingAllocator<mimalloc::MiMalloc> =
    AccountingAllocator::new(mimalloc::MiMalloc);

mod arrow;
mod dataframe;
mod python_bridge;
7 changes: 7 additions & 0 deletions rerun_py/src/python_bridge.rs
@@ -33,6 +33,8 @@ use re_ws_comms::RerunServerPort;

use once_cell::sync::{Lazy, OnceCell};

use crate::dataframe::{PyDataset, PyRRDArchive};

// The bridge needs to have complete control over the lifetimes of the individual recordings,
// otherwise all the recording shutdown machinery (which includes deallocating C, Rust and Python
// data and joining a bunch of threads) can end up running at any time depending on what the
@@ -105,6 +107,8 @@ fn rerun_bindings(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
    // These two components are necessary for imports to work
    m.add_class::<PyMemorySinkStorage>()?;
    m.add_class::<PyRecordingStream>()?;
    m.add_class::<PyRRDArchive>()?;
    m.add_class::<PyDataset>()?;

    // If this is a special RERUN_APP_ONLY context (launched via .spawn), we
    // can bypass everything else, which keeps us from preparing an SDK session
@@ -169,6 +173,9 @@ fn rerun_bindings(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(escape_entity_path_part, m)?)?;
    m.add_function(wrap_pyfunction!(new_entity_path, m)?)?;

    // dataframes
    m.add_function(wrap_pyfunction!(crate::dataframe::load_rrd, m)?)?;

    Ok(())
}
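Once registered, the new classes and the load_rrd function are also reachable directly on the low-level bindings module that rerun.dataframe wraps. A short sketch (the file path is illustrative):

from rerun import bindings

# Returns a PyRRDArchive wrapping every store found in the RRD file.
archive = bindings.load_rrd("example.rrd")
print(archive.num_recordings())

for dataset in archive.all_recordings():  # each item is a PyDataset
    print(dataset.schema().component_columns())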
