diff --git a/crates/store/re_chunk/src/util.rs b/crates/store/re_chunk/src/util.rs index 091f06962f6d..7974fe2a33c8 100644 --- a/crates/store/re_chunk/src/util.rs +++ b/crates/store/re_chunk/src/util.rs @@ -82,7 +82,7 @@ pub fn arrays_to_list_array( // TODO(cmc): A possible improvement would be to pick the smallest key datatype possible based // on the cardinality of the input arrays. pub fn arrays_to_dictionary( - array_datatype: ArrowDatatype, + array_datatype: &ArrowDatatype, arrays: &[Option<(Idx, &dyn ArrowArray)>], ) -> Option> { // Dedupe the input arrays based on the given primary key. @@ -115,33 +115,29 @@ pub fn arrays_to_dictionary( }; // Concatenate the underlying data as usual, except only the _unique_ values! + // We still need the underlying data to be a list-array, so the dictionary's keys can index + // into this list-array. let data = if arrays_dense_deduped.is_empty() { arrow2::array::new_empty_array(array_datatype.clone()) } else { - arrow2::compute::concatenate::concatenate(&arrays_dense_deduped) + let values = arrow2::compute::concatenate::concatenate(&arrays_dense_deduped) .map_err(|err| { re_log::warn_once!("failed to concatenate arrays: {err}"); err }) - .ok()? - }; - - // We still need the underlying data to be a list-array, so the dictionary's keys can index - // into this list-array. - let data = { - let datatype = ArrowListArray::::default_datatype(array_datatype); + .ok()?; #[allow(clippy::unwrap_used)] // yes, these are indeed lengths let offsets = ArrowOffsets::try_from_lengths(arrays_dense_deduped.iter().map(|array| array.len())) .unwrap(); - ArrowListArray::::new(datatype, offsets.into(), data, None) + ArrowListArray::::new(array_datatype.clone(), offsets.into(), values, None).to_boxed() }; let datatype = ArrowDatatype::Dictionary( arrow2::datatypes::IntegerType::Int32, - std::sync::Arc::new(data.data_type().clone()), + std::sync::Arc::new(array_datatype.clone()), true, // is_sorted ); diff --git a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs index ba5cd27bdddb..b93dcb96744c 100644 --- a/crates/store/re_chunk_store/src/dataframe.rs +++ b/crates/store/re_chunk_store/src/dataframe.rs @@ -22,6 +22,47 @@ use crate::RowId; // --- Descriptors --- +/// When selecting secondary component columns, specify how the joined data should be encoded. +/// +/// Because range-queries often involve repeating the same joined-in data multiple times, +/// the strategy we choose for joining can have a significant impact on the size and memory +/// overhead of the `RecordBatch`. +#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] +pub enum JoinEncoding { + /// Slice the `RecordBatch` to minimal overlapping sub-ranges. + /// + /// This is the default, and should always be used for the POV component which defines + /// the optimal size for `RecordBatch`. + /// + /// This minimizes the need for allocation, but at the cost of `RecordBatch`es that are + /// almost always smaller than the optimal size. In the common worst-case, this will result + /// in single-row `RecordBatch`es. + #[default] + OverlappingSlice, + + /// Dictionary-encode the joined column. + /// + /// Using dictionary-encoding allows any repeated data to be shared between rows, + /// but comes with the cost of an extra dictionary-lookup indirection. + /// + /// Note that this changes the physical type of the returned column. + /// + /// Using this encoding for complex types is incompatible with some arrow libraries. 
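(Reviewer note, not part of the patch: a minimal arrow2 sketch, with made-up values, of the shape `arrays_to_dictionary` now produces. The deduplicated per-row arrays are concatenated into a single list-array, and `i32` dictionary keys index into it, so repeated joined rows share one copy of the data.)

```rust
use arrow2::array::{DictionaryArray, Int32Array, ListArray, PrimitiveArray};
use arrow2::datatypes::DataType;
use arrow2::offset::Offsets;

fn dictionary_shape_sketch() -> arrow2::error::Result<DictionaryArray<i32>> {
    // Two *unique* rows of f32 data, stored back-to-back as a list-array...
    let values = PrimitiveArray::<f32>::from_slice([1.0, 2.0, 3.0]).boxed();
    let offsets = Offsets::<i32>::try_from_lengths([2usize, 1].into_iter())?;
    let list_datatype = ListArray::<i32>::default_datatype(DataType::Float32);
    let unique_rows =
        ListArray::<i32>::new(list_datatype, offsets.into(), values, None).boxed();

    // ...shared across four logical output rows through the dictionary keys.
    let keys = Int32Array::from_slice([0, 0, 1, 0]);
    DictionaryArray::try_from_keys(keys, unique_rows)
}
```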
+ DictionaryEncode, + // + // TODO(jleibs): + // RepeatCopy, + // + // Repeat the joined column by physically copying the data. + // + // This will always allocate a new column in the `RecordBatch`, matching the size of the + // POV component. + // + // This is the most expensive option, but can make working with the data more efficient, + // especially when the copied column is small. + // +} + // TODO(#6889): At some point all these descriptors needs to be interned and have handles or // something. And of course they need to be codegen. But we'll get there once we're back to // natively tagged components. @@ -43,17 +84,17 @@ impl ColumnDescriptor { #[inline] pub fn entity_path(&self) -> Option<&EntityPath> { match self { - Self::Component(descr) => Some(&descr.entity_path), Self::Control(_) | Self::Time(_) => None, + Self::Component(descr) => Some(&descr.entity_path), } } #[inline] - pub fn datatype(&self) -> &ArrowDatatype { + pub fn datatype(&self) -> ArrowDatatype { match self { - Self::Control(descr) => &descr.datatype, - Self::Component(descr) => &descr.datatype, - Self::Time(descr) => &descr.datatype, + Self::Control(descr) => descr.datatype.clone(), + Self::Time(descr) => descr.datatype.clone(), + Self::Component(descr) => descr.returned_datatype(), } } @@ -193,8 +234,15 @@ pub struct ComponentColumnDescriptor { /// Example: `rerun.components.Position3D`. pub component_name: ComponentName, - /// The Arrow datatype of the column. - pub datatype: ArrowDatatype, + /// The Arrow datatype of the stored column. + /// + /// This is the log-time datatype corresponding to how this data is encoded + /// in a chunk. Currently this will always be an [`ArrowListArray`], but as + /// we introduce mono-type optimization, this might be a native type instead. + pub store_datatype: ArrowDatatype, + + /// How the data will be joined into the resulting `RecordBatch`. + pub join_encoding: JoinEncoding, /// Whether this column represents static data. pub is_static: bool, @@ -215,7 +263,8 @@ impl Ord for ComponentColumnDescriptor { archetype_name, archetype_field_name, component_name, - datatype: _, + join_encoding: _, + store_datatype: _, is_static: _, } = self; @@ -234,7 +283,8 @@ impl std::fmt::Display for ComponentColumnDescriptor { archetype_name, archetype_field_name, component_name, - datatype: _, + join_encoding: _, + store_datatype: _, is_static, } = self; @@ -267,59 +317,84 @@ impl std::fmt::Display for ComponentColumnDescriptor { impl ComponentColumnDescriptor { #[inline] pub fn new(entity_path: EntityPath) -> Self { + let join_encoding = JoinEncoding::default(); + + // NOTE: The data is always a at least a list, whether it's latest-at or range. + // It might be wrapped further in e.g. a dict, but at the very least + // it's a list. + let store_datatype = ArrowListArray::::default_datatype(C::arrow_datatype()); + Self { entity_path, archetype_name: None, archetype_field_name: None, component_name: C::name(), - // NOTE: The data is always a at least a list, whether it's latest-at or range. - // It might be wrapped further in e.g. a dict, but at the very least - // it's a list. - // TODO(#7365): user-specified datatypes have got to go. 
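(Reviewer note, illustrative only: opting a secondary column into dictionary-encoded joins goes through the `with_join_encoding` builder added further down in this file, exactly as the tests below do. The `re_types_core::Component` bound is assumed from how `new` uses `C::name()` and `C::arrow_datatype()`.)

```rust
use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, JoinEncoding};
use re_log_types::EntityPath;
use re_types_core::Component;

/// Select component `C` at `entity_path`, joined in as a dictionary-encoded column.
fn dict_encoded_column<C: Component>(entity_path: EntityPath) -> ColumnDescriptor {
    ColumnDescriptor::Component(
        ComponentColumnDescriptor::new::<C>(entity_path)
            .with_join_encoding(JoinEncoding::DictionaryEncode),
    )
}
```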
- datatype: ArrowListArray::::default_datatype(C::arrow_datatype()), + join_encoding, + store_datatype, is_static: false, } } - #[inline] - pub fn to_arrow_field(&self) -> ArrowField { + fn metadata(&self) -> arrow2::datatypes::Metadata { let Self { entity_path, archetype_name, archetype_field_name, component_name, - datatype, + join_encoding: _, + store_datatype: _, is_static, } = self; + [ + (*is_static).then_some(("sorbet.is_static".to_owned(), "yes".to_owned())), + Some(("sorbet.path".to_owned(), entity_path.to_string())), + Some(( + "sorbet.semantic_type".to_owned(), + component_name.short_name().to_owned(), + )), + archetype_name.map(|name| { + ( + "sorbet.semantic_family".to_owned(), + name.short_name().to_owned(), + ) + }), + archetype_field_name + .as_ref() + .map(|name| ("sorbet.logical_type".to_owned(), name.to_owned())), + ] + .into_iter() + .flatten() + .collect() + } + + #[inline] + pub fn returned_datatype(&self) -> ArrowDatatype { + match self.join_encoding { + JoinEncoding::OverlappingSlice => self.store_datatype.clone(), + JoinEncoding::DictionaryEncode => ArrowDatatype::Dictionary( + arrow2::datatypes::IntegerType::Int32, + std::sync::Arc::new(self.store_datatype.clone()), + true, + ), + } + } + + #[inline] + pub fn to_arrow_field(&self) -> ArrowField { ArrowField::new( - component_name.short_name().to_owned(), - datatype.clone(), - false, /* nullable */ + self.component_name.short_name().to_owned(), + self.returned_datatype(), + true, /* nullable */ ) // TODO(#6889): This needs some proper sorbetization -- I just threw these names randomly. - .with_metadata( - [ - (*is_static).then_some(("sorbet.is_static".to_owned(), "yes".to_owned())), - Some(("sorbet.path".to_owned(), entity_path.to_string())), - Some(( - "sorbet.semantic_type".to_owned(), - component_name.short_name().to_owned(), - )), - archetype_name.map(|name| { - ( - "sorbet.semantic_family".to_owned(), - name.short_name().to_owned(), - ) - }), - archetype_field_name - .as_ref() - .map(|name| ("sorbet.logical_type".to_owned(), name.to_owned())), - ] - .into_iter() - .flatten() - .collect(), - ) + .with_metadata(self.metadata()) + } + + #[inline] + pub fn with_join_encoding(mut self, join_encoding: JoinEncoding) -> Self { + self.join_encoding = join_encoding; + self } } @@ -491,7 +566,10 @@ impl ChunkStore { archetype_name: None, archetype_field_name: None, component_name: *component_name, - datatype: ArrowListArray::::default_datatype(datatype.clone()), + store_datatype: ArrowListArray::::default_datatype( + datatype.clone(), + ), + join_encoding: JoinEncoding::default(), is_static: true, }) }) @@ -520,7 +598,10 @@ impl ChunkStore { // NOTE: The data is always a at least a list, whether it's latest-at or range. // It might be wrapped further in e.g. a dict, but at the very least // it's a list. - datatype: ArrowListArray::::default_datatype(datatype.clone()), + store_datatype: ArrowListArray::::default_datatype( + datatype.clone(), + ), + join_encoding: JoinEncoding::default(), // NOTE: This will make it so shadowed temporal data automatically gets // discarded from the schema. is_static: self @@ -570,6 +651,8 @@ impl ChunkStore { // Then, discard any column descriptor which cannot possibly have data for the given query. // // TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice. + // TODO(jleibs): This filtering actually seems incorrect. 
This operation should be based solely + // on the timeline, let mut filtered_out = HashSet::default(); for column_descr in &schema { let ColumnDescriptor::Component(descr) = column_descr else { diff --git a/crates/store/re_chunk_store/src/lib.rs b/crates/store/re_chunk_store/src/lib.rs index b3deadc40405..ad6ad76cff1a 100644 --- a/crates/store/re_chunk_store/src/lib.rs +++ b/crates/store/re_chunk_store/src/lib.rs @@ -24,8 +24,8 @@ mod subscribers; mod writes; pub use self::dataframe::{ - ColumnDescriptor, ComponentColumnDescriptor, ControlColumnDescriptor, LatestAtQueryExpression, - QueryExpression, RangeQueryExpression, TimeColumnDescriptor, + ColumnDescriptor, ComponentColumnDescriptor, ControlColumnDescriptor, JoinEncoding, + LatestAtQueryExpression, QueryExpression, RangeQueryExpression, TimeColumnDescriptor, }; pub use self::events::{ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent}; pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget}; diff --git a/crates/store/re_dataframe/src/engine.rs b/crates/store/re_dataframe/src/engine.rs index 323788ee5301..a0eea41c8a17 100644 --- a/crates/store/re_dataframe/src/engine.rs +++ b/crates/store/re_dataframe/src/engine.rs @@ -111,7 +111,6 @@ impl QueryEngine<'_> { /// Creating a handle is very cheap as it doesn't perform any kind of querying. /// /// If `columns` is specified, the schema of the result will strictly follow this specification. - /// [`ComponentColumnDescriptor::datatype`] and [`ComponentColumnDescriptor::is_static`] are ignored. /// /// Any provided [`ColumnDescriptor`]s that don't match a column in the result will still be included, but the /// data will be null for the entire column. @@ -144,7 +143,6 @@ impl QueryEngine<'_> { /// Creating a handle is very cheap as it doesn't perform any kind of querying. /// /// If `columns` is specified, the schema of the result will strictly follow this specification. - /// [`ComponentColumnDescriptor::datatype`] and [`ComponentColumnDescriptor::is_static`] are ignored. /// /// Any provided [`ColumnDescriptor`]s that don't match a column in the result will still be included, but the /// data will be null for the entire column. diff --git a/crates/store/re_dataframe/src/latest_at.rs b/crates/store/re_dataframe/src/latest_at.rs index 3dee374aaf54..47d8d96de9fb 100644 --- a/crates/store/re_dataframe/src/latest_at.rs +++ b/crates/store/re_dataframe/src/latest_at.rs @@ -7,7 +7,7 @@ use arrow2::{ use itertools::Itertools; use re_chunk::{LatestAtQuery, TimeInt, Timeline, UnitChunkShared}; -use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, LatestAtQueryExpression}; +use re_chunk_store::{ColumnDescriptor, LatestAtQueryExpression}; use crate::{QueryEngine, RecordBatch}; @@ -102,14 +102,14 @@ impl LatestAtQueryHandle<'_> { let columns = self.schema(); - let all_units: HashMap<&ComponentColumnDescriptor, UnitChunkShared> = { + let all_units: HashMap<&ColumnDescriptor, UnitChunkShared> = { re_tracing::profile_scope!("queries"); // TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice. 
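(Reviewer note, not from the patch: the `engine.rs` docs above keep the guarantee that requested columns with no matching data are still returned, filled with nulls. A tiny sketch of how such a column is materialized with the same arrow2 helper the query handles use; the datatype and length are made up.)

```rust
use arrow2::array::new_null_array;
use arrow2::datatypes::DataType;

fn null_column_sketch() {
    // One null slot per row, using the column's *returned* datatype.
    let num_rows = 3;
    let null_column = new_null_array(DataType::Float32, num_rows);
    assert_eq!(null_column.null_count(), num_rows);
}
```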
let query = LatestAtQuery::new(self.query.timeline, self.query.at); columns .iter() - .filter_map(|descr| match descr { + .filter_map(|col| match col { ColumnDescriptor::Component(descr) => { let results = self.engine.cache.latest_at( self.engine.store, @@ -122,7 +122,7 @@ impl LatestAtQueryHandle<'_> { .components .get(&descr.component_name) .cloned() - .map(|chunk| (descr, chunk)) + .map(|chunk| (col, chunk)) } _ => None, @@ -169,7 +169,7 @@ impl LatestAtQueryHandle<'_> { columns .iter() - .filter_map(|descr| match descr { + .filter_map(|col| match col { ColumnDescriptor::Control(_) => { if cfg!(debug_assertions) { unreachable!("filtered out during schema computation"); @@ -197,12 +197,12 @@ impl LatestAtQueryHandle<'_> { ColumnDescriptor::Component(descr) => Some( all_units - .get(descr) + .get(col) .and_then(|chunk| chunk.components().get(&descr.component_name)) .map_or_else( || { arrow2::array::new_null_array( - descr.datatype.clone(), + descr.returned_datatype(), null_array_length, ) }, @@ -308,7 +308,7 @@ mod tests { .all(|(descr, field)| descr.to_arrow_field() == *field) ); assert!(itertools::izip!(handle.schema(), batch.data.iter()) - .all(|(descr, array)| descr.datatype() == array.data_type())); + .all(|(descr, array)| &descr.datatype() == array.data_type())); } #[test] diff --git a/crates/store/re_dataframe/src/range.rs b/crates/store/re_dataframe/src/range.rs index 0917d5dae6f7..7b51cb83a7d1 100644 --- a/crates/store/re_dataframe/src/range.rs +++ b/crates/store/re_dataframe/src/range.rs @@ -4,13 +4,15 @@ use ahash::HashMap; use arrow2::{ array::{Array as ArrowArray, DictionaryArray as ArrowDictionaryArray}, chunk::Chunk as ArrowChunk, - datatypes::{DataType as ArrowDatatype, Schema as ArrowSchema}, + datatypes::Schema as ArrowSchema, Either, }; use itertools::Itertools; use re_chunk::{Chunk, LatestAtQuery, RangeQuery, RowId, TimeInt}; -use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, RangeQueryExpression}; +use re_chunk_store::{ + ColumnDescriptor, ComponentColumnDescriptor, JoinEncoding, RangeQueryExpression, +}; use crate::{QueryEngine, RecordBatch}; @@ -42,9 +44,15 @@ pub struct RangeQueryHandle<'a> { /// Internal private state. Lazily computed. struct RangeQuerytHandleState { - /// The final schema. + /// The columns that will be used to populate the results columns: Vec, + /// The derived arrow schema from the columns. All returned + /// record batches will have this schema. + /// + /// This may include conversion to dictionary-encoded data. + arrow_schema: ArrowSchema, + /// All the [`Chunk`]s for the active point-of-view. /// /// These are already sorted and vertically sliced according to the query. @@ -86,23 +94,17 @@ impl RangeQueryHandle<'_> { self.engine .store .schema_for_query(&self.query.clone().into()) - .into_iter() - // NOTE: At least for now, range queries always return dictionaries. 
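(Reviewer note: the removed block just below used to wrap every range column in a dictionary unconditionally; after this change the wrapping is decided per column by `returned_datatype()` shown earlier. A small sketch of the two physical types involved, with an illustrative `Float32` inner type.)

```rust
use arrow2::array::ListArray;
use arrow2::datatypes::{DataType, IntegerType};

fn datatype_sketch() {
    // What the store holds for the column (the `store_datatype`)...
    let store_datatype = ListArray::<i32>::default_datatype(DataType::Float32);

    // ...and what a `JoinEncoding::DictionaryEncode` column reports in the schema,
    // mirroring `returned_datatype()`.
    let returned_datatype = DataType::Dictionary(
        IntegerType::Int32,
        std::sync::Arc::new(store_datatype.clone()),
        true, // is_sorted
    );
    assert_ne!(store_datatype, returned_datatype);
}
```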
- .map(|col| match col { - ColumnDescriptor::Component(mut descr) => { - descr.datatype = ArrowDatatype::Dictionary( - arrow2::datatypes::IntegerType::Int32, - descr.datatype.into(), - true, - ); - ColumnDescriptor::Component(descr) - } - _ => col, - }) - .collect() }) }; + let schema = ArrowSchema { + fields: columns + .iter() + .map(|descr| descr.to_arrow_field()) + .collect_vec(), + metadata: Default::default(), + }; + let pov_chunks = { re_tracing::profile_scope!("gather pov timestamps"); @@ -127,6 +129,7 @@ impl RangeQueryHandle<'_> { RangeQuerytHandleState { columns, + arrow_schema: schema, pov_chunks, cur_page: AtomicU64::new(0), } @@ -147,8 +150,16 @@ impl RangeQueryHandle<'_> { /// Partially executes the range query until the next natural page of results. /// - /// Returns a single [`RecordBatch`] containing as many rows as available in the page, or - /// `None` if all the dataset has been returned. + /// Returns a vector of [`RecordBatch`]es that in total contain as many rows as available in the next + /// "natural page" of data from the pof component, or `None` if all the dataset has been returned. + /// + /// At best, this will be a single [`RecordBatch`] containing a "natural page" of data, following the chunk + /// size of the pov-component. This will happen when all queried data either belongs to + /// the same chunk, or is requested using [`JoinEncoding::DictionaryEncode`]. + /// + /// However, in the case of mixed chunks without dictionary encoding, the engine will fall + /// back to a row-by-row approach, which can be less efficient. + /// /// Each cell in the result corresponds to the latest known value at that particular point in time /// for each respective `ColumnDescriptor`. /// @@ -166,7 +177,7 @@ impl RangeQueryHandle<'_> { /// // … /// } /// ``` - pub fn next_page(&mut self) -> Option { + pub fn next_page(&mut self) -> Option> { re_tracing::profile_function!(format!("next_page({})", self.query)); let state = self.init(); @@ -179,18 +190,15 @@ impl RangeQueryHandle<'_> { _ = state .cur_page .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - return Some(RecordBatch { - schema: ArrowSchema { - fields: columns.iter().map(|col| col.to_arrow_field()).collect(), - metadata: Default::default(), - }, + return Some(vec![RecordBatch { + schema: state.arrow_schema.clone(), data: ArrowChunk::new( columns .iter() .map(|descr| arrow2::array::new_null_array(descr.datatype().clone(), 0)) .collect_vec(), ), - }); + }]); } let pov_chunk = state.pov_chunks.as_ref()?.get(cur_page as usize)?; @@ -198,14 +206,22 @@ impl RangeQueryHandle<'_> { .cur_page .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - Some(self.dense_batch_at_pov(pov_chunk)) + Some(self.dense_batch_at_pov(&self.query.pov, pov_chunk, &state.arrow_schema)) } /// Partially executes the range query in order to return the specified range of rows. /// /// Returns a vector of [`RecordBatch`]es: as many as required to fill the specified range. - /// Each [`RecordBatch`] will correspond a "natural page" of data, even the first and last batch, - /// although they might be cut off at the edge. + /// + /// The exact size of the [`RecordBatch`]es is an implementation detail. + /// + /// At best, each [`RecordBatch`] will be a "natural page" of data, following the chunk + /// size of the pov-component. This will happen when all queried data either belongs to + /// the same chunk, or is requested as a [`JoinEncoding::DictionaryEncode`] column. 
+ /// + /// However, in the case of mixed chunks without dictionary encoding, the engine will fall + /// back to a row-by-row approach, which can be less efficient. + /// /// Each cell in the result corresponds to the latest known value at that particular point in time /// for each respective `ColumnDescriptor`. /// @@ -215,8 +231,8 @@ impl RangeQueryHandle<'_> { /// /// "Natural pages" refers to pages of data that match 1:1 to the underlying storage. /// The size of each page cannot be known in advance, as it depends on unspecified - /// implementation details. - /// This is the most performant way to iterate over the dataset. + /// implementation details. This is the most performant way to iterate over the dataset. + /// // // TODO(cmc): This could be turned into an actual lazy iterator at some point. pub fn get(&self, offset: u64, mut len: u64) -> Vec { @@ -229,10 +245,7 @@ impl RangeQueryHandle<'_> { if offset == 0 && (len == 0 || state.pov_chunks.is_none()) { let columns = self.schema(); return vec![RecordBatch { - schema: ArrowSchema { - fields: columns.iter().map(|col| col.to_arrow_field()).collect(), - metadata: Default::default(), - }, + schema: state.arrow_schema.clone(), data: ArrowChunk::new( columns .iter() @@ -276,7 +289,11 @@ impl RangeQueryHandle<'_> { // Repeatedly compute dense ranges until we've returned `len` rows. while len > 0 { cur_pov_chunk = cur_pov_chunk.row_sliced(offset as _, len as _); - results.push(self.dense_batch_at_pov(&cur_pov_chunk)); + results.extend(self.dense_batch_at_pov( + &self.query.pov, + &cur_pov_chunk, + &state.arrow_schema, + )); offset = 0; // always start at the first row after the first chunk len = len.saturating_sub(cur_pov_chunk.num_rows() as u64); @@ -307,7 +324,12 @@ impl RangeQueryHandle<'_> { }) } - fn dense_batch_at_pov(&self, pov_chunk: &Chunk) -> RecordBatch { + fn dense_batch_at_pov( + &self, + pov: &ComponentColumnDescriptor, + pov_chunk: &Chunk, + schema: &ArrowSchema, + ) -> Vec { let pov_time_column = pov_chunk.timelines().get(&self.query.timeline); let columns = self.schema(); @@ -317,12 +339,15 @@ impl RangeQueryHandle<'_> { // // TODO(cmc): Opportunities for parallelization, if it proves to be a net positive in practice. let dict_arrays: HashMap<&ComponentColumnDescriptor, ArrowDictionaryArray> = { - re_tracing::profile_scope!("queries"); + re_tracing::profile_scope!("dict queries"); columns .iter() .filter_map(|descr| match descr { - ColumnDescriptor::Component(descr) => Some(descr), + ColumnDescriptor::Component(descr) => match descr.join_encoding { + JoinEncoding::OverlappingSlice => None, + JoinEncoding::DictionaryEncode => Some(descr), + }, _ => None, }) .filter_map(|descr| { @@ -369,20 +394,8 @@ impl RangeQueryHandle<'_> { }) .collect_vec(); - let dict_array = { - re_tracing::profile_scope!("concat"); - - // Sanitize the input datatype for `arrays_to_dictionary`. 
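(Reviewer note, illustrative call only: how the simplified `arrays_to_dictionary` is fed. Each entry pairs a dedup key with that row's raw component array; identical keys collapse into a single dictionary slot, and `None` marks a missing row. The `usize` keys and f32 payloads are stand-ins for the row identifiers and component data used by the real code path.)

```rust
use arrow2::array::{Array, ListArray, PrimitiveArray};
use arrow2::datatypes::DataType;
use re_chunk::util::arrays_to_dictionary;

fn dictionary_join_sketch() {
    let store_datatype = ListArray::<i32>::default_datatype(DataType::Float32);
    let row_a: Box<dyn Array> = PrimitiveArray::<f32>::from_slice([1.0, 2.0]).boxed();
    let row_b: Box<dyn Array> = PrimitiveArray::<f32>::from_slice([3.0]).boxed();

    // Rows sharing a key end up sharing one slot in the dictionary's value list.
    let dict = arrays_to_dictionary(
        &store_datatype,
        &[
            Some((0usize, &*row_a)),
            Some((0usize, &*row_a)), // repeated join, same dictionary key
            Some((1usize, &*row_b)),
            None, // missing row
        ],
    );
    assert!(dict.is_some());
}
```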
- let datatype = match &descr.datatype { - ArrowDatatype::Dictionary(_, inner, _) => match &**inner { - ArrowDatatype::List(field) => field.data_type().clone(), - datatype => datatype.clone(), - }, - ArrowDatatype::List(field) => field.data_type().clone(), - datatype => datatype.clone(), - }; - re_chunk::util::arrays_to_dictionary(datatype, &arrays) - }; + let dict_array = + { re_chunk::util::arrays_to_dictionary(&descr.store_datatype, &arrays) }; if cfg!(debug_assertions) { #[allow(clippy::unwrap_used)] // want to crash in dev @@ -395,50 +408,193 @@ impl RangeQueryHandle<'_> { .collect() }; - // NOTE: Keep in mind this must match the ordering specified by `Self::schema`. - let packed_arrays = { - re_tracing::profile_scope!("packing"); + let slice_arrays: HashMap<&ComponentColumnDescriptor, Vec>>> = { + re_tracing::profile_scope!("slice queries"); columns .iter() - .map(|descr| match descr { - ColumnDescriptor::Control(_descr) => pov_chunk.row_ids_array().to_boxed(), - - ColumnDescriptor::Time(descr) => { - let time_column = pov_chunk.timelines().get(&descr.timeline).cloned(); - time_column.map_or_else( - || { - arrow2::array::new_null_array( - descr.datatype.clone(), - pov_chunk.num_rows(), - ) - }, - |time_column| time_column.times_array().to_boxed(), + .filter_map(|descr| match descr { + ColumnDescriptor::Component(descr) => match descr.join_encoding { + JoinEncoding::OverlappingSlice => { + if descr != pov { + Some(descr) + } else { + None + } + } + JoinEncoding::DictionaryEncode => None, + }, + _ => None, + }) + .map(|descr| { + let arrays = pov_time_column + .map_or_else( + || Either::Left(std::iter::empty()), + |time_column| Either::Right(time_column.times()), ) - } + .chain(std::iter::repeat(TimeInt::STATIC)) + .take(pov_chunk.num_rows()) + .map(|time| { + let query = LatestAtQuery::new(self.query.timeline, time); - ColumnDescriptor::Component(descr) => dict_arrays.get(descr).map_or_else( - || { - arrow2::array::new_null_array( - descr.datatype.clone(), - pov_chunk.num_rows(), - ) - }, - |dict_array| dict_array.to_boxed(), - ), + let results = self.engine.cache.latest_at( + self.engine.store, + &query, + &descr.entity_path, + [descr.component_name], + ); + + results + .components + .get(&descr.component_name) + .and_then(|unit| { + unit.clone() + .into_chunk() + .components() + .get(&descr.component_name) + .map(|arr| arr.to_boxed()) + }) + }) + .collect_vec(); + + (descr, arrays) }) - .collect_vec() + .collect() }; - RecordBatch { - schema: ArrowSchema { - fields: columns + if slice_arrays.is_empty() { + // NOTE: Keep in mind this must match the ordering specified by `Self::schema`. 
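(Reviewer note, not from the patch: a minimal sketch of the zero-copy, single-row slicing that the row-by-row fallback below performs for every column when mixed chunks rule out a single page-sized batch. The i64 array stands in for any column.)

```rust
use arrow2::array::{Array, PrimitiveArray};

fn single_row_slice_sketch() {
    let times = PrimitiveArray::<i64>::from_slice([10, 20, 30]);

    // Slicing only adjusts offset/length metadata; the underlying buffer is shared.
    let row = 1;
    let single_row: Box<dyn Array> = times.sliced(row, 1).to_boxed();
    assert_eq!(single_row.len(), 1);
}
```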
+ let packed_arrays = { + re_tracing::profile_scope!("packing"); + + columns .iter() - .map(|descr| descr.to_arrow_field()) - .collect_vec(), - metadata: Default::default(), - }, - data: ArrowChunk::new(packed_arrays), + .map(|descr| match descr { + ColumnDescriptor::Control(_descr) => pov_chunk.row_ids_array().to_boxed(), + + ColumnDescriptor::Time(descr) => { + let time_column = pov_chunk.timelines().get(&descr.timeline).cloned(); + time_column.map_or_else( + || { + arrow2::array::new_null_array( + descr.datatype.clone(), + pov_chunk.num_rows(), + ) + }, + |time_column| time_column.times_array().to_boxed(), + ) + } + + ColumnDescriptor::Component(descr) => match descr.join_encoding { + JoinEncoding::OverlappingSlice => { + if descr == pov { + pov_chunk + .components() + .get(&descr.component_name) + .map_or_else( + || { + arrow2::array::new_null_array( + descr.returned_datatype(), + pov_chunk.num_rows(), + ) + }, + |arr| arr.to_boxed(), + ) + } else { + unreachable!() + } + } + JoinEncoding::DictionaryEncode => dict_arrays.get(descr).map_or_else( + || { + arrow2::array::new_null_array( + descr.returned_datatype(), + pov_chunk.num_rows(), + ) + }, + |dict_array| dict_array.to_boxed(), + ), + }, + }) + .collect_vec() + }; + vec![RecordBatch { + schema: schema.clone(), + data: ArrowChunk::new(packed_arrays), + }] + } else { + (0..pov_chunk.num_rows()) + .map(|row| { + // NOTE: Keep in mind this must match the ordering specified by `Self::schema`. + let packed_arrays = columns + .iter() + .map(|descr| match descr { + ColumnDescriptor::Control(_descr) => { + pov_chunk.row_ids_array().sliced(row, 1).to_boxed() + } + + ColumnDescriptor::Time(descr) => { + let time_column = pov_chunk.timelines().get(&descr.timeline); + time_column.map_or_else( + || arrow2::array::new_null_array(descr.datatype.clone(), 1), + |time_column| { + time_column.times_array().sliced(row, 1).to_boxed() + }, + ) + } + + ColumnDescriptor::Component(descr) => match descr.join_encoding { + JoinEncoding::OverlappingSlice => { + if descr == pov { + pov_chunk + .components() + .get(&descr.component_name) + .map_or_else( + || { + arrow2::array::new_null_array( + descr.returned_datatype(), + 1, + ) + }, + |arr| arr.sliced(row, 1).to_boxed(), + ) + } else { + slice_arrays + .get(descr) + .and_then(|col| col.get(row).cloned()) + .flatten() + .map_or_else( + || { + arrow2::array::new_null_array( + descr.returned_datatype(), + 1, + ) + }, + |arr| arr, + ) + } + } + + JoinEncoding::DictionaryEncode => { + dict_arrays.get(descr).map_or_else( + || { + arrow2::array::new_null_array( + descr.returned_datatype(), + 1, + ) + }, + |dict_array| dict_array.sliced(row, 1).to_boxed(), + ) + } + }, + }) + .collect_vec(); + + RecordBatch { + schema: schema.clone(), + data: ArrowChunk::new(packed_arrays), + } + }) + .collect() } } } @@ -446,7 +602,7 @@ impl RangeQueryHandle<'_> { impl<'a> RangeQueryHandle<'a> { #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work pub fn into_iter(mut self) -> impl Iterator + 'a { - std::iter::from_fn(move || self.next_page()) + std::iter::from_fn(move || self.next_page()).flatten() } } @@ -507,9 +663,10 @@ mod tests { ColumnDescriptor::Component(ComponentColumnDescriptor::new::( entity_path.clone(), )), - ColumnDescriptor::Component(ComponentColumnDescriptor::new::( - entity_path.clone(), - )), + ColumnDescriptor::Component( + ComponentColumnDescriptor::new::(entity_path.clone()) + .with_join_encoding(re_chunk_store::JoinEncoding::DictionaryEncode), + ), 
ColumnDescriptor::Component(ComponentColumnDescriptor::new::(entity_path)), ]; @@ -517,16 +674,17 @@ mod tests { // Iterator API { - let batch = handle.next_page().unwrap(); + let batches = handle.next_page().unwrap(); // The output should be an empty recordbatch with the right schema and empty arrays. - assert_eq!(0, batch.num_rows()); - assert!( - itertools::izip!(handle.schema(), batch.schema.fields.iter()) - .all(|(descr, field)| descr.to_arrow_field() == *field) - ); - assert!(itertools::izip!(handle.schema(), batch.data.iter()) - .all(|(descr, array)| descr.datatype() == array.data_type())); - + for batch in batches { + assert_eq!(0, batch.num_rows()); + assert!( + itertools::izip!(handle.schema(), batch.schema.fields.iter()) + .all(|(descr, field)| descr.to_arrow_field() == *field) + ); + assert!(itertools::izip!(handle.schema(), batch.data.iter()) + .all(|(descr, array)| &descr.datatype() == array.data_type())); + } let batch = handle.next_page(); assert!(batch.is_none()); } @@ -541,7 +699,7 @@ mod tests { .all(|(descr, field)| descr.to_arrow_field() == *field) ); assert!(itertools::izip!(handle.schema(), batch.data.iter()) - .all(|(descr, array)| descr.datatype() == array.data_type())); + .all(|(descr, array)| &descr.datatype() == array.data_type())); let _batch = handle.get(0, 1).pop().unwrap(); @@ -563,7 +721,10 @@ mod tests { .with_component_batches( RowId::new(), TimePoint::default(), - [&[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)] as _], + [ + &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)] as _, + &[Radius(3.0.into()), Radius(4.0.into())] as _, + ], ) .build() .unwrap(), @@ -597,9 +758,10 @@ mod tests { ColumnDescriptor::Component(ComponentColumnDescriptor::new::( entity_path.clone(), )), - ColumnDescriptor::Component(ComponentColumnDescriptor::new::( - entity_path.clone(), - )), + ColumnDescriptor::Component( + ComponentColumnDescriptor::new::(entity_path.clone()) + .with_join_encoding(re_chunk_store::JoinEncoding::DictionaryEncode), + ), ColumnDescriptor::Component(ComponentColumnDescriptor::new::(entity_path)), ]; @@ -607,8 +769,12 @@ mod tests { // Iterator API { - let batch = handle.next_page().unwrap(); + let batches = handle.next_page().unwrap(); + let batch = batches.first().unwrap(); + assert_eq!(1, batch.num_rows()); + + // MyPoint should be a ListArray assert_eq!( chunk.components().get(&MyPoint::name()).unwrap().to_boxed(), itertools::izip!(batch.schema.fields.iter(), batch.data.iter()) @@ -616,12 +782,23 @@ mod tests { (field.name == MyPoint::name().short_name()).then_some(array.clone()) }) .unwrap() + ); + + // Radius should be a DictionaryArray + assert_eq!( + chunk.components().get(&Radius::name()).unwrap().to_boxed(), + itertools::izip!(batch.schema.fields.iter(), batch.data.iter()) + .find_map(|(field, array)| { + (field.name == Radius::name().short_name()).then_some(array.clone()) + }) + .unwrap() .as_any() .downcast_ref::>() .unwrap() .values() .clone() ); + assert!( itertools::izip!(handle.schema(), batch.schema.fields.iter()) .all(|(descr, field)| descr.to_arrow_field() == *field) @@ -636,6 +813,8 @@ mod tests { let batch = handle.get(0, 1).pop().unwrap(); // The output should be an empty recordbatch with the right schema and empty arrays. 
assert_eq!(1, batch.num_rows()); + + // MyPoint should be a ListArray assert_eq!( chunk.components().get(&MyPoint::name()).unwrap().to_boxed(), itertools::izip!(batch.schema.fields.iter(), batch.data.iter()) @@ -643,21 +822,32 @@ mod tests { (field.name == MyPoint::name().short_name()).then_some(array.clone()) }) .unwrap() + ); + + // Radius should be a DictionaryArray + assert_eq!( + chunk.components().get(&Radius::name()).unwrap().to_boxed(), + itertools::izip!(batch.schema.fields.iter(), batch.data.iter()) + .find_map(|(field, array)| { + (field.name == Radius::name().short_name()).then_some(array.clone()) + }) + .unwrap() .as_any() .downcast_ref::>() .unwrap() .values() .clone() ); + assert!( itertools::izip!(handle.schema(), batch.schema.fields.iter()) .all(|(descr, field)| descr.to_arrow_field() == *field) ); - let _batch = handle.get(1, 1).pop().unwrap(); - - let batch = handle.get(2, 1).pop(); - assert!(batch.is_none()); + // TODO(jleibs): Out-of-bounds behavior isn't well defined here. + // Should this always include an empty record-batch, or should + // it be an error? + assert!(handle.get(1, 1).is_empty()); } } } diff --git a/crates/viewer/re_space_view_dataframe/src/space_view_class.rs b/crates/viewer/re_space_view_dataframe/src/space_view_class.rs index 5fc62bdafa7d..d07a1bc4a6bb 100644 --- a/crates/viewer/re_space_view_dataframe/src/space_view_class.rs +++ b/crates/viewer/re_space_view_dataframe/src/space_view_class.rs @@ -165,7 +165,8 @@ mode sets the default time range to _everything_. You can override this in the s archetype_field_name: None, component_name: pov_component, // this is actually ignored: - datatype: re_chunk_store::external::arrow2::datatypes::DataType::Null, + store_datatype: re_chunk_store::external::arrow2::datatypes::DataType::Null, + join_encoding: Default::default(), is_static: false, }, };
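(Reviewer note, a closing sketch that is not part of the patch: what consuming the new paged API looks like from the caller's side. `RangeQueryHandle` is assumed to be re-exported from `re_dataframe`; `num_rows()` and the `while let` / `into_iter()` patterns mirror the docs and tests above.)

```rust
use re_dataframe::RangeQueryHandle;

/// Drain a range handle page by page: each page is now a `Vec<RecordBatch>`,
/// a single batch per natural page at best, or one batch per row when the
/// slice fallback kicks in.
fn drain_pages(mut handle: RangeQueryHandle<'_>) {
    while let Some(batches) = handle.next_page() {
        for batch in batches {
            let _ = batch.num_rows(); // ... consume `batch` ...
        }
    }
}

// Alternatively, `into_iter()` flattens the pages into a plain stream of batches,
// hiding the paging detail entirely:
//
//     for batch in handle.into_iter() { /* ... */ }
```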