Port parts of dataframe viewer to arrow1
emilk committed Dec 18, 2024
1 parent 052ef89 commit 8d4066e
Showing 6 changed files with 71 additions and 39 deletions.
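
At a glance (inferred from the hunks below), the port swaps these arrow2 types in the dataframe/viewer path for their arrow-rs ("arrow1") counterparts:

Box<dyn arrow2::array::Array> → arrow::array::ArrayRef
arrow2::array::ListArray<i32> → arrow::array::ListArray
arrow2::array::DictionaryArray<i32> → arrow::array::Int32DictionaryArray
arrow2::array::PrimitiveArray<i64> → arrow::array::Int64Array
arrow2::datatypes::DataType → arrow::datatypes::DataType

The previous arrow2-based `QueryHandle::next_row` is kept as `next_row_arrow2`; `next_row_batch` and the row iterators still use it internally.
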
1 change: 1 addition & 0 deletions Cargo.lock
@@ -5656,6 +5656,7 @@ version = "0.21.0-alpha.1+dev"
dependencies = [
"ahash",
"anyhow",
"arrow",
"criterion",
"document-features",
"indent",
1 change: 1 addition & 0 deletions crates/store/re_chunk_store/Cargo.toml
@@ -40,6 +40,7 @@ re_types_core.workspace = true
# External dependencies:
ahash.workspace = true
anyhow.workspace = true
arrow.workspace = true
arrow2 = { workspace = true, features = ["compute_concatenate"] }
document-features.workspace = true
indent.workspace = true
1 change: 1 addition & 0 deletions crates/store/re_chunk_store/src/lib.rs
@@ -56,6 +56,7 @@ pub use re_chunk::{
pub use re_log_types::{ResolvedTimeRange, TimeInt, TimeType, Timeline};

pub mod external {
pub use arrow;
pub use arrow2;

pub use re_chunk;
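
With `arrow` now re-exported next to `arrow2` from `re_chunk_store::external`, downstream crates can reach either flavor without taking their own dependency. A minimal hypothetical sketch (the helper and its names are illustrative, not part of the commit):

```rust
use re_chunk_store::external::arrow::array::{Array as _, ArrayRef};
use re_chunk_store::external::arrow2::array::Array as Arrow2Array;

// Both arrow flavors stay reachable through `external` while the port is in flight.
fn lengths(new_col: &ArrayRef, old_col: &dyn Arrow2Array) -> (usize, usize) {
    (new_col.len(), old_col.len())
}
```
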
43 changes: 38 additions & 5 deletions crates/store/re_dataframe/src/query.rs
@@ -19,7 +19,8 @@ use itertools::Itertools;

use nohash_hasher::{IntMap, IntSet};
use re_chunk::{
Chunk, ComponentName, EntityPath, RangeQuery, RowId, TimeInt, Timeline, UnitChunkShared,
external::arrow::array::ArrayRef, Chunk, ComponentName, EntityPath, RangeQuery, RowId, TimeInt,
Timeline, UnitChunkShared,
};
use re_chunk_store::{
ChunkStore, ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor,
@@ -794,7 +795,39 @@ impl<E: StorageEngineLike> QueryHandle<E> {
/// }
/// ```
#[inline]
pub fn next_row(&self) -> Option<Vec<Box<dyn Arrow2Array>>> {
pub fn next_row(&self) -> Option<Vec<ArrayRef>> {
self.engine
.with(|store, cache| self._next_row(store, cache))
.map(|vec| vec.into_iter().map(|a| a.into()).collect())
}

/// Returns the next row's worth of data.
///
/// The returned vector of Arrow arrays strictly follows the schema specified by [`Self::schema`].
/// Columns that do not yield any data will still be present in the results, filled with null values.
///
/// Each cell in the result corresponds to the latest _locally_ known value at that particular point in
/// the index, for each respective `ColumnDescriptor`.
/// See [`QueryExpression::sparse_fill_strategy`] to go beyond local resolution.
///
/// Example:
/// ```ignore
/// while let Some(row) = query_handle.next_row() {
/// // …
/// }
/// ```
///
/// ## Pagination
///
/// Use [`Self::seek_to_row`]:
/// ```ignore
/// query_handle.seek_to_row(42);
/// for row in query_handle.into_iter().take(len) {
/// // …
/// }
/// ```
#[inline]
pub fn next_row_arrow2(&self) -> Option<Vec<Box<dyn Arrow2Array>>> {
self.engine
.with(|store, cache| self._next_row(store, cache))
}
@@ -1239,7 +1272,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {
pub fn next_row_batch(&self) -> Option<RecordBatch> {
Some(RecordBatch {
schema: self.schema().clone(),
data: Arrow2Chunk::new(self.next_row()?),
data: Arrow2Chunk::new(self.next_row_arrow2()?),
})
}

@@ -1266,13 +1299,13 @@ impl<E: StorageEngineLike> QueryHandle<E> {
/// Returns an iterator backed by [`Self::next_row`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn iter(&self) -> impl Iterator<Item = Vec<Box<dyn Arrow2Array>>> + '_ {
std::iter::from_fn(move || self.next_row())
std::iter::from_fn(move || self.next_row_arrow2())
}

/// Returns an iterator backed by [`Self::next_row`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn into_iter(self) -> impl Iterator<Item = Vec<Box<dyn Arrow2Array>>> {
std::iter::from_fn(move || self.next_row())
std::iter::from_fn(move || self.next_row_arrow2())
}

/// Returns an iterator backed by [`Self::next_row_batch`].
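
For callers of `QueryHandle`, the visible change is the return type of `next_row`: a `Vec` of arrow-rs `ArrayRef`s instead of boxed arrow2 arrays, with the previous shape preserved under `next_row_arrow2` (which still backs `iter`, `into_iter`, and `next_row_batch`). A self-contained sketch of what one returned row now looks like; the row here is fabricated for illustration rather than produced by a real query:

```rust
use std::sync::Arc;

use arrow::array::{Array as _, ArrayRef, Int64Array};

// What `next_row` now yields per row: one arrow-rs array per selected column,
// in schema order, all-null where a column has no data.
fn describe_row(row: &[ArrayRef]) {
    for column in row {
        println!("{:?}: {} values", column.data_type(), column.len());
    }
}

fn main() {
    // Stand-in row; a real one would come from `query_handle.next_row()`.
    let row: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![42_i64])) as ArrayRef];
    describe_row(&row);
}
```
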
4 changes: 2 additions & 2 deletions crates/viewer/re_view_dataframe/src/dataframe_ui.rs
@@ -2,10 +2,10 @@ use std::collections::BTreeMap;
use std::ops::Range;

use anyhow::Context;
use arrow::array::ArrayRef;
use egui::NumExt as _;
use itertools::Itertools;

use re_chunk_store::external::re_chunk::Arrow2Array;
use re_chunk_store::{ColumnDescriptor, LatestAtQuery};
use re_dataframe::external::re_query::StorageEngineArcReadGuard;
use re_dataframe::QueryHandle;
@@ -142,7 +142,7 @@ struct RowsDisplayData {
impl RowsDisplayData {
fn try_new(
row_indices: &Range<u64>,
row_data: Vec<Vec<Box<dyn Arrow2Array>>>,
row_data: Vec<Vec<ArrayRef>>,
selected_columns: &[ColumnDescriptor],
query_timeline: &Timeline,
) -> Result<Self, DisplayRecordBatchError> {
60 changes: 28 additions & 32 deletions crates/viewer/re_view_dataframe/src/display_record_batch.rs
@@ -1,30 +1,28 @@
//! Intermediate data structures to make `re_datastore`'s row data more amenable to displaying in a
//! table.
use thiserror::Error;

use re_chunk_store::external::arrow2::{
use arrow::{
array::{
Array as Arrow2Array, DictionaryArray as Arrow2DictionaryArray,
ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray,
Array as ArrowArray, ArrayRef as ArrowArrayRef,
Int32DictionaryArray as ArrowInt32DictionaryArray, Int64Array, ListArray as ArrowListArray,
},
datatypes::DataType,
datatypes::DataType as Arrow2DataType,
datatypes::DataType as ArrowDataType,
};
use thiserror::Error;

use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, LatestAtQuery};
use re_log_types::{EntityPath, TimeInt, Timeline};
use re_types::external::arrow2::datatypes::IntegerType;
use re_types_core::ComponentName;
use re_ui::UiExt;
use re_viewer_context::{UiLayout, ViewerContext};

#[derive(Error, Debug)]
pub(crate) enum DisplayRecordBatchError {
#[error("Unexpected column data type for timeline '{0}': {1:?}")]
UnexpectedTimeColumnDataType(String, Arrow2DataType),
UnexpectedTimeColumnDataType(String, ArrowDataType),

#[error("Unexpected column data type for component '{0}': {1:?}")]
UnexpectedComponentColumnDataType(String, Arrow2DataType),
UnexpectedComponentColumnDataType(String, ArrowDataType),
}

/// A single column of component data.
@@ -33,38 +31,38 @@ pub(crate) enum DisplayRecordBatchError {
#[derive(Debug)]
pub(crate) enum ComponentData {
Null,
ListArray(Arrow2ListArray<i32>),
ListArray(ArrowListArray),
DictionaryArray {
dict: Arrow2DictionaryArray<i32>,
values: Arrow2ListArray<i32>,
dict: ArrowInt32DictionaryArray,
values: ArrowListArray,
},
}

impl ComponentData {
#[allow(clippy::borrowed_box)] // https://github.com/rust-lang/rust-clippy/issues/11940
fn try_new(
descriptor: &ComponentColumnDescriptor,
column_data: &Box<dyn Arrow2Array>,
column_data: &ArrowArrayRef,
) -> Result<Self, DisplayRecordBatchError> {
match column_data.data_type() {
DataType::Null => Ok(Self::Null),
DataType::List(_) => Ok(Self::ListArray(
ArrowDataType::Null => Ok(Self::Null),
ArrowDataType::List(_) => Ok(Self::ListArray(
column_data
.as_any()
.downcast_ref::<Arrow2ListArray<i32>>()
.downcast_ref::<ArrowListArray>()
.expect("`data_type` checked, failure is a bug in re_dataframe")
.clone(),
)),
DataType::Dictionary(IntegerType::Int32, _, _) => {
ArrowDataType::Dictionary(_, _) => {
let dict = column_data
.as_any()
.downcast_ref::<Arrow2DictionaryArray<i32>>()
.downcast_ref::<ArrowInt32DictionaryArray>()
.expect("`data_type` checked, failure is a bug in re_dataframe")
.clone();
let values = dict
.values()
.as_any()
.downcast_ref::<Arrow2ListArray<i32>>()
.downcast_ref::<ArrowListArray>()
.expect("`data_type` checked, failure is a bug in re_dataframe")
.clone();
Ok(Self::DictionaryArray { dict, values })
@@ -90,8 +88,8 @@ impl ComponentData {
}
}
Self::DictionaryArray { dict, values } => {
if dict.is_valid(row_index) {
values.value(dict.key_value(row_index)).len() as u64
if let Some(key) = dict.key(row_index) {
values.value(key).len() as u64
} else {
0
}
@@ -131,22 +129,20 @@ impl ComponentData {
Self::ListArray(list_array) => list_array
.is_valid(row_index)
.then(|| list_array.value(row_index)),
Self::DictionaryArray { dict, values } => dict
.is_valid(row_index)
.then(|| values.value(dict.key_value(row_index))),
Self::DictionaryArray { dict, values } => {
dict.key(row_index).map(|key| values.value(key))
}
};

if let Some(data) = data {
let data_to_display = if let Some(instance_index) = instance_index {
// Panics if the instance index is out of bound. This is checked in
// `DisplayColumn::data_ui`.
data.sliced(instance_index as usize, 1)
data.slice(instance_index as usize, 1)
} else {
data
};

let data_to_display: arrow::array::ArrayRef = data_to_display.into();

ctx.component_ui_registry.ui_raw(
ctx,
ui,
@@ -169,7 +165,7 @@ pub(crate) enum DisplayColumn {
pub(crate) enum DisplayColumn {
Timeline {
timeline: Timeline,
time_data: Arrow2PrimitiveArray<i64>,
time_data: Int64Array,
},
Component {
entity_path: EntityPath,
@@ -182,13 +178,13 @@ impl DisplayColumn {
#[allow(clippy::borrowed_box)] // https://github.com/rust-lang/rust-clippy/issues/11940
fn try_new(
column_descriptor: &ColumnDescriptor,
column_data: &Box<dyn Arrow2Array>,
column_data: &ArrowArrayRef,
) -> Result<Self, DisplayRecordBatchError> {
match column_descriptor {
ColumnDescriptor::Time(desc) => {
let time_data = column_data
.as_any()
.downcast_ref::<Arrow2PrimitiveArray<i64>>()
.downcast_ref::<Int64Array>()
.ok_or_else(|| {
DisplayRecordBatchError::UnexpectedTimeColumnDataType(
desc.timeline.name().as_str().to_owned(),
@@ -307,7 +303,7 @@ impl DisplayRecordBatch {
/// The columns in the record batch must match the selected columns. This is guaranteed by
/// `re_datastore`.
pub(crate) fn try_new(
row_data: &Vec<Box<dyn Arrow2Array>>,
row_data: &Vec<ArrowArrayRef>,
selected_columns: &[ColumnDescriptor],
) -> Result<Self, DisplayRecordBatchError> {
let num_rows = row_data.first().map(|arr| arr.len()).unwrap_or(0);
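
One behavioral detail from the hunk above: arrow2's `is_valid(row)` + `key_value(row)` two-step on dictionary arrays becomes a single arrow-rs `DictionaryArray::key(row)` call returning `Option<usize>`. A small self-contained sketch of that access pattern, using a string dictionary instead of the viewer's list-array values for brevity:

```rust
use arrow::array::{Array as _, Int32DictionaryArray, StringArray};

fn main() {
    // Dictionary encoding ["a", null, "b"]; the null slot has no key.
    let dict: Int32DictionaryArray = vec![Some("a"), None, Some("b")].into_iter().collect();
    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("string values");

    for row in 0..dict.len() {
        // `key(row)` is None for null slots, folding the old validity check into one call.
        match dict.key(row) {
            Some(key) => println!("row {row}: {:?}", values.value(key)),
            None => println!("row {row}: null"),
        }
    }
}
```
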
