From c6d842bb827d0a6363f158f1837e95e109cc9081 Mon Sep 17 00:00:00 2001
From: Clement Rey
Date: Thu, 10 Oct 2024 09:22:48 +0200
Subject: [PATCH] Dataframe v2: support for clear semantics (#7652)

Support clear semantics in the dataframe API. Tombstones are never
visible to end-users, only their effect.

Like every other Dataframe v2 feature PR, and following recommendations
from @jleibs, this prioritizes convenience of implementation over
everything else, for now: all clear chunks are fetched, post-processed,
and re-injected into the view contents during `init()`, and then the
streaming join runs as usual.

Static clear semantics can get pretty unhinged, but that's A) not
specific to the dataframe API and B) so extremely niche that our time
is better spent on real-world problems right now:
- #7650
- #7631

---

- Fixes https://github.com/rerun-io/rerun/issues/7495
- Fixes https://github.com/rerun-io/rerun/issues/7414
- Fixes https://github.com/rerun-io/rerun/issues/7468
- Fixes https://github.com/rerun-io/rerun/issues/7493
- DNM: requires #7649
---
 crates/store/re_chunk_store/src/dataframe.rs |  16 +-
 crates/store/re_dataframe/src/query.rs       | 700 ++++++++++++++-----
 2 files changed, 551 insertions(+), 165 deletions(-)

diff --git a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs
index 4d9161093885..42c9d23c7eac 100644
--- a/crates/store/re_chunk_store/src/dataframe.rs
+++ b/crates/store/re_chunk_store/src/dataframe.rs
@@ -6,10 +6,10 @@ use arrow2::{
     array::ListArray as ArrowListArray,
     datatypes::{DataType as ArrowDatatype, Field as ArrowField},
 };
+use nohash_hasher::IntSet;

 use re_chunk::TimelineName;
-use re_log_types::ResolvedTimeRange;
-use re_log_types::{EntityPath, TimeInt, Timeline};
+use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, Timeline};
 use re_types_core::{ArchetypeName, ComponentName};

 use crate::ChunkStore;
@@ -765,12 +765,22 @@ impl ChunkStore {
             })
         });

+        use re_types_core::Archetype as _;
+        let clear_related_components: IntSet<ComponentName> =
+            re_types_core::archetypes::Clear::all_components()
+                .iter()
+                .copied()
+                .collect();
+
         let components = static_components
             .chain(temporal_components)
             .filter(|col| match col {
                 ColumnDescriptor::Time(_) => true,
                 ColumnDescriptor::Component(descr) => {
-                    !descr.component_name.is_indicator_component()
+                    let is_indicator = descr.component_name.is_indicator_component();
+                    // Tombstones are not exposed to end users -- only their _effect_.
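+                    // (For reference, `Clear::all_components()` should resolve to
+                    // `ClearIsRecursive` plus the archetype's indicator component, so both are
+                    // filtered out of the user-facing schema here.)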
+                    let is_tombstone = clear_related_components.contains(&descr.component_name);
+                    !is_indicator && !is_tombstone
                 }
             })
             .collect::<Vec<_>>();
diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs
index 44b7a840e30d..1b310539443f 100644
--- a/crates/store/re_dataframe/src/query.rs
+++ b/crates/store/re_dataframe/src/query.rs
@@ -5,7 +5,10 @@ use std::sync::{

 use ahash::HashSet;
 use arrow2::{
-    array::{Array as ArrowArray, PrimitiveArray as ArrowPrimitiveArray},
+    array::{
+        Array as ArrowArray, BooleanArray as ArrowBooleanArray,
+        PrimitiveArray as ArrowPrimitiveArray,
+    },
     chunk::Chunk as ArrowChunk,
     datatypes::Schema as ArrowSchema,
     Either,
@@ -13,14 +16,17 @@ use arrow2::{
 use itertools::Itertools;
 use parking_lot::Mutex;

-use nohash_hasher::IntMap;
-use re_chunk::{Chunk, RangeQuery, RowId, TimeInt, Timeline, UnitChunkShared};
+use nohash_hasher::{IntMap, IntSet};
+use re_chunk::{
+    Chunk, ComponentName, EntityPath, RangeQuery, RowId, TimeInt, Timeline, UnitChunkShared,
+};
 use re_chunk_store::{
     ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
     IndexValue, JoinEncoding, QueryExpression, SparseFillStrategy, TimeColumnDescriptor,
     TimeColumnSelector,
 };
 use re_log_types::ResolvedTimeRange;
+use re_types_core::components::ClearIsRecursive;

 use crate::{QueryEngine, RecordBatch};

@@ -34,10 +40,10 @@ use crate::{QueryEngine, RecordBatch};
 // * [x] pov support
 // * [x] latestat sparse-filling
 // * [x] sampling support
-// * [ ] clears
+// * [x] clears
+// * [ ] pagination (fast)
 // * [ ] overlaps (less dumb)
 // * [ ] selector-based `filtered_index`
-// * [ ] pagination (fast)
 // * [ ] configurable cache bypass
 // * [ ] allocate null arrays once
 // * [ ] take kernel duplicates all memory
@@ -157,91 +163,7 @@ impl QueryHandle<'_> {
         // still appear in the results.
         let selected_contents: Vec<(_, _)> = if let Some(selection) = self.query.selection.as_ref()
         {
-            selection
-                .iter()
-                .map(|column| {
-                    match column {
-                        ColumnSelector::Time(selected_column) => {
-                            let TimeColumnSelector {
-                                timeline: selected_timeline,
-                            } = selected_column;
-
-                            view_contents
-                                .iter()
-                                .enumerate()
-                                .filter_map(|(idx, view_column)| match view_column {
-                                    ColumnDescriptor::Time(view_descr) => Some((idx, view_descr)),
-                                    ColumnDescriptor::Component(_) => None,
-                                })
-                                .find(|(_idx, view_descr)| {
-                                    *view_descr.timeline.name() == *selected_timeline
-                                })
-                                .map_or_else(
-                                    || {
-                                        (
-                                            usize::MAX,
-                                            ColumnDescriptor::Time(TimeColumnDescriptor {
-                                                // TODO(cmc): I picked a sequence here because I have to pick something.
-                                                // It doesn't matter, only the name will remain in the Arrow schema anyhow.
-                                                timeline: Timeline::new_sequence(
-                                                    *selected_timeline,
-                                                ),
-                                                datatype: arrow2::datatypes::DataType::Null,
-                                            }),
-                                        )
-                                    },
-                                    |(idx, view_descr)| {
-                                        (idx, ColumnDescriptor::Time(view_descr.clone()))
-                                    },
-                                )
-                        }
-
-                        ColumnSelector::Component(selected_column) => {
-                            let ComponentColumnSelector {
-                                entity_path: selected_entity_path,
-                                component: selected_component_name,
-                                join_encoding: _,
-                            } = selected_column;
-
-                            view_contents
-                                .iter()
-                                .enumerate()
-                                .filter_map(|(idx, view_column)| match view_column {
-                                    ColumnDescriptor::Component(view_descr) => {
-                                        Some((idx, view_descr))
-                                    }
-                                    ColumnDescriptor::Time(_) => None,
-                                })
-                                .find(|(_idx, view_descr)| {
-                                    view_descr.entity_path == *selected_entity_path
-                                        && view_descr.component_name == *selected_component_name
-                                })
-                                .map_or_else(
-                                    || {
-                                        (
-                                            usize::MAX,
-                                            ColumnDescriptor::Component(
-                                                ComponentColumnDescriptor {
-                                                    entity_path: selected_entity_path.clone(),
-                                                    archetype_name: None,
-                                                    archetype_field_name: None,
-                                                    component_name: *selected_component_name,
-                                                    store_datatype: arrow2::datatypes::DataType::Null,
-                                                    join_encoding: JoinEncoding::default(),
-                                                    is_static: false,
-                                                },
-                                            ),
-                                        )
-                                    },
-                                    |(idx, view_descr)| {
-                                        (idx, ColumnDescriptor::Component(view_descr.clone()))
-                                    },
-                                )
-                        }
-                    }
-                })
-                .collect_vec()
+            self.compute_user_selection(&view_contents, selection)
         } else {
             view_contents.clone().into_iter().enumerate().collect()
         };
@@ -264,12 +186,7 @@ impl QueryHandle<'_> {
             .map(|values| values.iter().rev().copied().collect_vec());

         // 4. Perform the query and keep track of all the relevant chunks.
-        let mut view_pov_chunks_idx = self
-            .query
-            .filtered_point_of_view
-            .as_ref()
-            .map(|_| usize::MAX);
-        let view_chunks = {
+        let query = {
             let index_range = if let Some(using_index_values_stack) =
                 using_index_values_stack.as_ref()
             {
                 using_index_values_stack
@@ -284,74 +201,70 @@ impl QueryHandle<'_> {
                     .unwrap_or(ResolvedTimeRange::EVERYTHING)
             };

-            let query = RangeQuery::new(self.query.filtered_index, index_range)
+            RangeQuery::new(self.query.filtered_index, index_range)
                 .keep_extra_timelines(true) // we want all the timelines we can get!
-                .keep_extra_components(false);
-
-            view_contents
-                .iter()
-                .enumerate()
-                .map(|(idx, selected_column)| match selected_column {
-                    ColumnDescriptor::Time(_) => Vec::new(),
-
-                    ColumnDescriptor::Component(column) => {
-                        // NOTE: Keep in mind that the range APIs natively make sure that we will
-                        // either get a bunch of relevant _static_ chunks, or a bunch of relevant
-                        // _temporal_ chunks, but never both.
-                        //
-                        // TODO(cmc): Going through the cache is very useful in a Viewer context, but
-                        // not so much in an SDK context. Make it configurable.
-                        let results = self.engine.cache.range(
-                            self.engine.store,
-                            &query,
-                            &column.entity_path,
-                            [column.component_name],
-                        );
-
-                        debug_assert!(
-                            results.components.len() <= 1,
-                            "cannot possibly get more than one component with this query"
-                        );
-
-                        let chunks = results
-                            .components
-                            .into_iter()
-                            .next()
-                            .map(|(_component_name, chunks)| {
-                                chunks
-                                    .into_iter()
-                                    .map(|chunk| {
-                                        // NOTE: Keep in mind that the range APIs would have already taken care
-                                        // of A) sorting the chunk on the `filtered_index` (and row-id) and
-                                        // B) densifying it according to the current `component_name`.
-                                        // Both of these are mandatory requirements for the deduplication logic to
-                                        // do what we want: keep the latest known value for `component_name` at all
-                                        // remaining unique index values all while taking row-id ordering semantics
-                                        // into account.
-                                        debug_assert!(
-                                            chunk.is_sorted(),
-                                            "the query cache should have already taken care of sorting (and densifying!) the chunk",
-                                        );
-
-                                        let chunk = chunk.deduped_latest_on_index(&self.query.filtered_index);
-
-                                        (AtomicU64::default(), chunk)
-                                    })
-                                    .collect_vec()
-                            })
-                            .unwrap_or_default();
-
-                        if let Some(pov) = self.query.filtered_point_of_view.as_ref() {
-                            if pov.entity_path == column.entity_path && pov.component == column.component_name {
-                                view_pov_chunks_idx = Some(idx);
-                            }
-                        }
-
-                        chunks
-                    },
-                })
-                .collect()
-        };
+                .keep_extra_components(false)
+        };
+
+        let (view_pov_chunks_idx, mut view_chunks) = self.fetch_view_chunks(&query, &view_contents);
+
+        // 5. Collect all relevant clear chunks and update the view accordingly.
+        //
+        // We'll turn the clears into actual empty arrays of the expected component type.
+        {
+            re_tracing::profile_scope!("clear_chunks");
+
+            let clear_chunks = self.fetch_clear_chunks(&query, &view_contents);
+            for (view_idx, chunks) in view_chunks.iter_mut().enumerate() {
+                let Some(ColumnDescriptor::Component(descr)) = view_contents.get(view_idx) else {
+                    continue;
+                };
+
+                // NOTE: It would be tempting to concatenate all these individual clear chunks
+                // into one single big chunk, but that'd be a mistake: 1) it's costly to do so,
+                // but more importantly 2) it would likely lead to very large chunk overlap,
+                // which is very bad for business.
+                if let Some(clear_chunks) = clear_chunks.get(&descr.entity_path) {
+                    chunks.extend(clear_chunks.iter().map(|chunk| {
+                        let child_datatype = match &descr.store_datatype {
+                            arrow2::datatypes::DataType::List(field)
+                            | arrow2::datatypes::DataType::LargeList(field) => {
+                                field.data_type().clone()
+                            }
+                            arrow2::datatypes::DataType::Dictionary(_, datatype, _) => {
+                                (**datatype).clone()
+                            }
+                            datatype => datatype.clone(),
+                        };
+
+                        let mut chunk = chunk.clone();
+                        // Only way this could fail is if the number of rows did not match.
+                        #[allow(clippy::unwrap_used)]
+                        chunk
+                            .add_component(
+                                descr.component_name,
+                                re_chunk::util::new_list_array_of_empties(
+                                    child_datatype,
+                                    chunk.num_rows(),
+                                ),
+                            )
+                            .unwrap();
+
+                        (AtomicU64::new(0), chunk)
+                    }));
+
+                    // The chunks were sorted that way before, and they need to stay that way
+                    // after.
+                    chunks.sort_by_key(|(_cursor, chunk)| {
+                        // NOTE: The chunk has been densified already: its global time range is
+                        // the same as the time range for the specific component of interest.
+                        chunk
+                            .timelines()
+                            .get(&self.query.filtered_index)
+                            .map(|time_column| time_column.time_range())
+                            .map_or(TimeInt::STATIC, |time_range| time_range.min())
+                    });
+                }
+            }
+        }

         QueryHandleState {
             view_contents,
@@ -364,6 +277,280 @@ impl QueryHandle<'_> {
         }
     }

+    #[allow(clippy::unused_self)]
+    fn compute_user_selection(
+        &self,
+        view_contents: &[ColumnDescriptor],
+        selection: &[ColumnSelector],
+    ) -> Vec<(usize, ColumnDescriptor)> {
+        selection
+            .iter()
+            .map(|column| {
+                match column {
+                    ColumnSelector::Time(selected_column) => {
+                        let TimeColumnSelector {
+                            timeline: selected_timeline,
+                        } = selected_column;
+
+                        view_contents
+                            .iter()
+                            .enumerate()
+                            .filter_map(|(idx, view_column)| match view_column {
+                                ColumnDescriptor::Time(view_descr) => Some((idx, view_descr)),
+                                ColumnDescriptor::Component(_) => None,
+                            })
+                            .find(|(_idx, view_descr)| {
+                                *view_descr.timeline.name() == *selected_timeline
+                            })
+                            .map_or_else(
+                                || {
+                                    (
+                                        usize::MAX,
+                                        ColumnDescriptor::Time(TimeColumnDescriptor {
+                                            // TODO(cmc): I picked a sequence here because I have to pick something.
+                                            // It doesn't matter, only the name will remain in the Arrow schema anyhow.
+                                            timeline: Timeline::new_sequence(*selected_timeline),
+                                            datatype: arrow2::datatypes::DataType::Null,
+                                        }),
+                                    )
+                                },
+                                |(idx, view_descr)| {
+                                    (idx, ColumnDescriptor::Time(view_descr.clone()))
+                                },
+                            )
+                    }
+
+                    ColumnSelector::Component(selected_column) => {
+                        let ComponentColumnSelector {
+                            entity_path: selected_entity_path,
+                            component: selected_component_name,
+                            join_encoding: _,
+                        } = selected_column;
+
+                        view_contents
+                            .iter()
+                            .enumerate()
+                            .filter_map(|(idx, view_column)| match view_column {
+                                ColumnDescriptor::Component(view_descr) => Some((idx, view_descr)),
+                                ColumnDescriptor::Time(_) => None,
+                            })
+                            .find(|(_idx, view_descr)| {
+                                view_descr.entity_path == *selected_entity_path
+                                    && view_descr.component_name == *selected_component_name
+                            })
+                            .map_or_else(
+                                || {
+                                    (
+                                        usize::MAX,
+                                        ColumnDescriptor::Component(ComponentColumnDescriptor {
+                                            entity_path: selected_entity_path.clone(),
+                                            archetype_name: None,
+                                            archetype_field_name: None,
+                                            component_name: *selected_component_name,
+                                            store_datatype: arrow2::datatypes::DataType::Null,
+                                            join_encoding: JoinEncoding::default(),
+                                            is_static: false,
+                                        }),
+                                    )
+                                },
+                                |(idx, view_descr)| {
+                                    (idx, ColumnDescriptor::Component(view_descr.clone()))
+                                },
+                            )
+                    }
+                }
+            })
+            .collect_vec()
+    }
+
+    fn fetch_view_chunks(
+        &self,
+        query: &RangeQuery,
+        view_contents: &[ColumnDescriptor],
+    ) -> (Option<usize>, Vec<Vec<(AtomicU64, Chunk)>>) {
+        let mut view_pov_chunks_idx = self
+            .query
+            .filtered_point_of_view
+            .as_ref()
+            .map(|_| usize::MAX);
+
+        let view_chunks = view_contents
+            .iter()
+            .enumerate()
+            .map(|(idx, selected_column)| match selected_column {
+                ColumnDescriptor::Time(_) => Vec::new(),
+
+                ColumnDescriptor::Component(column) => {
+                    let chunks = self
+                        .fetch_chunks(query, &column.entity_path, [column.component_name])
+                        .unwrap_or_default();
+
+                    if let Some(pov) = self.query.filtered_point_of_view.as_ref() {
+                        if pov.entity_path == column.entity_path
+                            && pov.component == column.component_name
+                        {
+                            view_pov_chunks_idx = Some(idx);
+                        }
+                    }
+
+                    chunks
+                }
+            })
+            .collect();
+
+        (view_pov_chunks_idx, view_chunks)
+    }
+
+    /// Returns all potentially relevant clear [`Chunk`]s for each unique entity path in the view contents.
+    ///
+    /// These chunks take recursive clear semantics into account and are guaranteed to be properly densified.
+    /// The component data is stripped out, only the indices are left.
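+    ///
+    /// For example (hypothetical data): a recursive clear logged on `/this` is returned here for
+    /// `/this/that` as well, while a non-recursive clear on `/this` only shows up for `/this`
+    /// itself. `init()` then re-materializes each of these clears as an empty list-array of the
+    /// affected column's datatype, so the streaming join treats them like any other value.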
+    fn fetch_clear_chunks(
+        &self,
+        query: &RangeQuery,
+        view_contents: &[ColumnDescriptor],
+    ) -> IntMap<EntityPath, Vec<Chunk>> {
+        /// Returns all the ancestors of an [`EntityPath`].
+        ///
+        /// Doesn't return `entity_path` itself.
+        fn entity_path_ancestors(entity_path: &EntityPath) -> impl Iterator<Item = EntityPath> {
+            std::iter::from_fn({
+                let mut entity_path = entity_path.parent();
+                move || {
+                    let yielded = entity_path.clone()?;
+                    entity_path = yielded.parent();
+                    Some(yielded)
+                }
+            })
+        }
+
+        /// Given a [`Chunk`] containing a [`ClearIsRecursive`] column, returns a filtered version
+        /// of that chunk where only rows with `ClearIsRecursive=true` are left.
+        ///
+        /// Returns `None` if the chunk either doesn't contain a `ClearIsRecursive` column or if
+        /// the end result is an empty chunk.
+        fn chunk_filter_recursive_only(chunk: &Chunk) -> Option<Chunk> {
+            let list_array = chunk.components().get(&ClearIsRecursive::name())?;
+
+            let values = list_array
+                .values()
+                .as_any()
+                .downcast_ref::<ArrowBooleanArray>()?;
+
+            let indices = ArrowPrimitiveArray::from_vec(
+                values
+                    .iter()
+                    .enumerate()
+                    .filter_map(|(index, is_recursive)| {
+                        (is_recursive == Some(true)).then_some(index as i32)
+                    })
+                    .collect_vec(),
+            );
+
+            let chunk = chunk.taken(&indices);
+
+            (!chunk.is_empty()).then_some(chunk)
+        }
+
+        use re_types_core::Loggable as _;
+        let component_names = [re_types_core::components::ClearIsRecursive::name()];
+
+        // All unique entity paths present in the view contents.
+        let entity_paths: IntSet<EntityPath> = view_contents
+            .iter()
+            .filter_map(|col| match col {
+                ColumnDescriptor::Component(descr) => Some(descr.entity_path.clone()),
+                ColumnDescriptor::Time(_) => None,
+            })
+            .collect();
+
+        entity_paths
+            .iter()
+            .filter_map(|entity_path| {
+                // For the entity itself, any chunk that contains clear data is relevant,
+                // recursive or not. Just fetch everything we find.
+                let flat_chunks = self
+                    .fetch_chunks(query, entity_path, component_names)
+                    .map(|chunks| {
+                        chunks
+                            .into_iter()
+                            .map(|(_cursor, chunk)| chunk)
+                            .collect_vec()
+                    })
+                    .unwrap_or_default();
+
+                let recursive_chunks =
+                    entity_path_ancestors(entity_path).flat_map(|ancestor_path| {
+                        self.fetch_chunks(query, &ancestor_path, component_names)
+                            .into_iter() // option
+                            .flat_map(|chunks| chunks.into_iter().map(|(_cursor, chunk)| chunk))
+                            // NOTE: Ancestors' chunks are only relevant for the rows where
+                            // `ClearIsRecursive=true`.
+                            .filter_map(|chunk| chunk_filter_recursive_only(&chunk))
+                    });
+
+                let chunks = flat_chunks
+                    .into_iter()
+                    .chain(recursive_chunks)
+                    // The component data is irrelevant.
+                    // We do not expose the actual tombstones to end-users, only their _effect_.
+                    .map(|chunk| chunk.components_removed())
+                    .collect_vec();
+
+                (!chunks.is_empty()).then(|| (entity_path.clone(), chunks))
+            })
+            .collect()
+    }
+
+    fn fetch_chunks<const N: usize>(
+        &self,
+        query: &RangeQuery,
+        entity_path: &EntityPath,
+        component_names: [ComponentName; N],
+    ) -> Option<Vec<(AtomicU64, Chunk)>> {
+        // NOTE: Keep in mind that the range APIs natively make sure that we will
+        // either get a bunch of relevant _static_ chunks, or a bunch of relevant
+        // _temporal_ chunks, but never both.
+        //
+        // TODO(cmc): Going through the cache is very useful in a Viewer context, but
+        // not so much in an SDK context. Make it configurable.
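+        // (Both current call sites pass exactly one component name -- `[column.component_name]`
+        // and `[ClearIsRecursive::name()]` -- which is what the `debug_assert` below relies on.)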
+        let results =
+            self.engine
+                .cache
+                .range(self.engine.store, query, entity_path, component_names);
+
+        debug_assert!(
+            results.components.len() <= 1,
+            "cannot possibly get more than one component with this query"
+        );
+
+        results
+            .components
+            .into_iter()
+            .next()
+            .map(|(_component_name, chunks)| {
+                chunks
+                    .into_iter()
+                    .map(|chunk| {
+                        // NOTE: Keep in mind that the range APIs would have already taken care
+                        // of A) sorting the chunk on the `filtered_index` (and row-id) and
+                        // B) densifying it according to the current `component_name`.
+                        // Both of these are mandatory requirements for the deduplication logic to
+                        // do what we want: keep the latest known value for `component_name` at all
+                        // remaining unique index values all while taking row-id ordering semantics
+                        // into account.
+                        debug_assert!(
+                            chunk.is_sorted(),
+                            "the query cache should have already taken care of sorting (and densifying!) the chunk",
+                        );
+
+                        let chunk = chunk.deduped_latest_on_index(&self.query.filtered_index);
+
+                        (AtomicU64::default(), chunk)
+                    })
+                    .collect_vec()
+            })
+    }
+
     /// The query used to instantiate this handle.
     pub fn query(&self) -> &QueryExpression {
         &self.query
@@ -918,6 +1105,7 @@ mod tests {
         example_components::{MyColor, MyLabel, MyPoint},
         EntityPath, Timeline,
     };
+    use re_types::components::ClearIsRecursive;
     use re_types_core::Loggable as _;

     use crate::QueryCache;
@@ -942,9 +1130,10 @@ mod tests {
    // * [x] using_index_values
    //
    // In addition to those, some much needed extras:
+   // * [x] num_rows
+   // * [x] clears
    // * [ ] timelines returned with selection=none
-   // * [ ] clears
-   // * [ ] num_rows
+   // * [ ] pagination

    // TODO(cmc): At some point I'd like to stress multi-entity queries too, but that feels less
    // urgent considering how things are implemented (each entity lives in its own index, so it's
@@ -1694,15 +1883,106 @@ mod tests {
         Ok(())
     }

+    #[test]
+    fn clears() -> anyhow::Result<()> {
+        re_log::setup_logging();
+
+        let mut store = create_nasty_store()?;
+        extend_nasty_store_with_clears(&mut store)?;
+        eprintln!("{store}");
+
+        let query_cache = QueryCache::new(&store);
+        let query_engine = QueryEngine {
+            store: &store,
+            cache: &query_cache,
+        };
+
+        let timeline = Timeline::new_sequence("frame_nr");
+        let entity_path = EntityPath::from("this/that");
+
+        // barebones
+        {
+            let mut query = QueryExpression::new(timeline);
+            query.view_contents = Some([(entity_path.clone(), None)].into_iter().collect());
+            eprintln!("{query:#?}:");
+
+            let query_handle = query_engine.query(query.clone());
+            assert_eq!(
+                query_engine.query(query.clone()).into_iter().count() as u64,
+                query_handle.num_rows()
+            );
+            let dataframe = concatenate_record_batches(
+                query_handle.schema().clone(),
+                &query_handle.into_batch_iter().collect_vec(),
+            );
+            eprintln!("{dataframe}");
+
+            let got = format!("{:#?}", dataframe.data.iter().collect_vec());
+            let expected = unindent::unindent(
+                "\
+                [
+                    Int64[None, 10, 20, 30, 40, 50, 60, 65, 70],
+                    Timestamp(Nanosecond, None)[None, 1970-01-01 00:00:00.000000010, None, None, None, 1970-01-01 00:00:00.000000050, 1970-01-01 00:00:00.000000060, 1970-01-01 00:00:00.000000065, 1970-01-01 00:00:00.000000070],
+                    ListArray[[], None, None, [2], [3], [4], [], [], [6]],
+                    ListArray[[], None, None, None, None, None, [], [], None],
+                    ListArray[[], [{x: 0, y: 0}], [{x: 1, y: 1}], [{x: 2, y: 2}], [{x: 3, y: 3}], [{x: 4, y: 4}], [], [], [{x: 8, y: 8}]],
+                ]\
+                ",
+            );
+
+            similar_asserts::assert_eq!(expected, got);
+        }
+
+        // sparse-filled
+        {
+            let mut query = QueryExpression::new(timeline);
+            query.view_contents = Some([(entity_path.clone(), None)].into_iter().collect());
+            query.sparse_fill_strategy = SparseFillStrategy::LatestAtGlobal;
+            eprintln!("{query:#?}:");
+
+            let query_handle = query_engine.query(query.clone());
+            assert_eq!(
+                query_engine.query(query.clone()).into_iter().count() as u64,
+                query_handle.num_rows()
+            );
+            let dataframe = concatenate_record_batches(
+                query_handle.schema().clone(),
+                &query_handle.into_batch_iter().collect_vec(),
+            );
+            eprintln!("{dataframe}");
+
+            // TODO(#7650): Those null values for `MyColor` on 10 and 20 look completely insane,
+            // but then again static clear semantics in general are pretty unhinged right now,
+            // especially when ranges are involved.
+            // It's extremely niche, our time is better spent somewhere else right now.
+            let got = format!("{:#?}", dataframe.data.iter().collect_vec());
+            let expected = unindent::unindent(
+                "\
+                [
+                    Int64[None, 10, 20, 30, 40, 50, 60, 65, 70],
+                    Timestamp(Nanosecond, None)[None, 1970-01-01 00:00:00.000000010, None, None, None, 1970-01-01 00:00:00.000000050, 1970-01-01 00:00:00.000000060, 1970-01-01 00:00:00.000000065, 1970-01-01 00:00:00.000000070],
+                    ListArray[[], None, None, [2], [3], [4], [], [], [6]],
+                    ListArray[[], [c], [c], [c], [c], [c], [], [], [c]],
+                    ListArray[[], [{x: 0, y: 0}], [{x: 1, y: 1}], [{x: 2, y: 2}], [{x: 3, y: 3}], [{x: 4, y: 4}], [], [], [{x: 8, y: 8}]],
+                ]\
+                ",
+            );
+
+            similar_asserts::assert_eq!(expected, got);
+        }
+
+        Ok(())
+    }
+
     /// Returns a very nasty [`ChunkStore`] with all kinds of partial updates, chunk overlaps,
-    /// repeated timestamps, duplicated chunks, partial multi-timelines, etc.
+    /// repeated timestamps, duplicated chunks, partial multi-timelines, flat and recursive
+    /// clears, etc.
     fn create_nasty_store() -> anyhow::Result<ChunkStore> {
         let mut store = ChunkStore::new(
             re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
             ChunkStoreConfig::COMPACTION_DISABLED,
         );

-        let entity_path = EntityPath::from("this/that");
+        let entity_path = EntityPath::from("/this/that");

         let frame1 = TimeInt::new_temporal(10);
         let frame2 = TimeInt::new_temporal(20);
@@ -1889,6 +2169,102 @@ mod tests {
         Ok(store)
     }

+    fn extend_nasty_store_with_clears(store: &mut ChunkStore) -> anyhow::Result<()> {
+        let entity_path = EntityPath::from("/this/that");
+        let entity_path_parent = EntityPath::from("/this");
+        let entity_path_root = EntityPath::from("/");
+
+        let frame35 = TimeInt::new_temporal(35);
+        let frame55 = TimeInt::new_temporal(55);
+        let frame60 = TimeInt::new_temporal(60);
+        let frame65 = TimeInt::new_temporal(65);
+
+        let clear_flat = ClearIsRecursive(false.into());
+        let clear_recursive = ClearIsRecursive(true.into());
+
+        let row_id1_1 = RowId::new();
+        let chunk1 = Chunk::builder(entity_path.clone())
+            .with_sparse_component_batches(
+                row_id1_1,
+                TimePoint::default(),
+                [(ClearIsRecursive::name(), Some(&clear_flat as _))],
+            )
+            .build()?;
+
+        let chunk1 = Arc::new(chunk1);
+        store.insert_chunk(&chunk1)?;
+
+        // NOTE: This tombstone will never have any visible effect.
+        //
+        // Tombstones still obey the same rules as all other data, specifically: if a component
+        // has been statically logged for an entity, it shadows any temporal data for that same
+        // component on that same entity.
+        //
+        // In this specific case, a static clear has already been logged for `this/that`, so any
+        // further temporal clears will be ignored.
+        //
+        // It's pretty weird, but then again static clear semantics in general are very weird.
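+        // (In other words: since `row_id1_1` above logged `ClearIsRecursive` statically for this
+        // entity, the temporal clear at frame 35 below can never win the latest-at comparison
+        // for that component.)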
+        let row_id2_1 = RowId::new();
+        let chunk2 = Chunk::builder(entity_path.clone())
+            .with_sparse_component_batches(
+                row_id2_1,
+                [build_frame_nr(frame35), build_log_time(frame35.into())],
+                [(ClearIsRecursive::name(), Some(&clear_recursive as _))],
+            )
+            .build()?;
+
+        let chunk2 = Arc::new(chunk2);
+        store.insert_chunk(&chunk2)?;
+
+        let row_id3_1 = RowId::new();
+        let chunk3 = Chunk::builder(entity_path_root.clone())
+            .with_sparse_component_batches(
+                row_id3_1,
+                [build_frame_nr(frame55), build_log_time(frame55.into())],
+                [(ClearIsRecursive::name(), Some(&clear_flat as _))],
+            )
+            .with_sparse_component_batches(
+                row_id3_1,
+                [build_frame_nr(frame60), build_log_time(frame60.into())],
+                [(ClearIsRecursive::name(), Some(&clear_recursive as _))],
+            )
+            .with_sparse_component_batches(
+                row_id3_1,
+                [build_frame_nr(frame65), build_log_time(frame65.into())],
+                [(ClearIsRecursive::name(), Some(&clear_flat as _))],
+            )
+            .build()?;
+
+        let chunk3 = Arc::new(chunk3);
+        store.insert_chunk(&chunk3)?;
+
+        let row_id4_1 = RowId::new();
+        let chunk4 = Chunk::builder(entity_path_parent.clone())
+            .with_sparse_component_batches(
+                row_id4_1,
+                [build_frame_nr(frame60), build_log_time(frame60.into())],
+                [(ClearIsRecursive::name(), Some(&clear_flat as _))],
+            )
+            .build()?;
+
+        let chunk4 = Arc::new(chunk4);
+        store.insert_chunk(&chunk4)?;
+
+        let row_id5_1 = RowId::new();
+        let chunk5 = Chunk::builder(entity_path_parent.clone())
+            .with_sparse_component_batches(
+                row_id5_1,
+                [build_frame_nr(frame65), build_log_time(frame65.into())],
+                [(ClearIsRecursive::name(), Some(&clear_recursive as _))],
+            )
+            .build()?;
+
+        let chunk5 = Arc::new(chunk5);
+        store.insert_chunk(&chunk5)?;
+
+        Ok(())
+    }
+
     fn concatenate_record_batches(schema: ArrowSchema, batches: &[RecordBatch]) -> RecordBatch {
         assert!(batches.iter().map(|batch| &batch.schema).all_equal());