diff --git a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs
index 4d9161093885..42c9d23c7eac 100644
--- a/crates/store/re_chunk_store/src/dataframe.rs
+++ b/crates/store/re_chunk_store/src/dataframe.rs
@@ -6,10 +6,10 @@ use arrow2::{
     array::ListArray as ArrowListArray,
     datatypes::{DataType as ArrowDatatype, Field as ArrowField},
 };
+use nohash_hasher::IntSet;
 use re_chunk::TimelineName;
-use re_log_types::ResolvedTimeRange;
-use re_log_types::{EntityPath, TimeInt, Timeline};
+use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, Timeline};
 use re_types_core::{ArchetypeName, ComponentName};
 
 use crate::ChunkStore;
@@ -765,12 +765,22 @@ impl ChunkStore {
                 })
             });
 
+        use re_types_core::Archetype as _;
+        let clear_related_components: IntSet<ComponentName> =
+            re_types_core::archetypes::Clear::all_components()
+                .iter()
+                .copied()
+                .collect();
+
         let components = static_components
             .chain(temporal_components)
             .filter(|col| match col {
                 ColumnDescriptor::Time(_) => true,
                 ColumnDescriptor::Component(descr) => {
-                    !descr.component_name.is_indicator_component()
+                    let is_indicator = descr.component_name.is_indicator_component();
+                    // Tombstones are not exposed to end users -- only their _effect_.
+                    let is_tombstone = clear_related_components.contains(&descr.component_name);
+                    !is_indicator && !is_tombstone
                 }
             })
             .collect::<BTreeSet<_>>();
diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs
index 44b7a840e30d..1b310539443f 100644
--- a/crates/store/re_dataframe/src/query.rs
+++ b/crates/store/re_dataframe/src/query.rs
@@ -5,7 +5,10 @@ use std::sync::{
 
 use ahash::HashSet;
 use arrow2::{
-    array::{Array as ArrowArray, PrimitiveArray as ArrowPrimitiveArray},
+    array::{
+        Array as ArrowArray, BooleanArray as ArrowBooleanArray,
+        PrimitiveArray as ArrowPrimitiveArray,
+    },
     chunk::Chunk as ArrowChunk,
     datatypes::Schema as ArrowSchema,
     Either,
@@ -13,14 +16,17 @@ use arrow2::{
 use itertools::Itertools;
 use parking_lot::Mutex;
 
-use nohash_hasher::IntMap;
-use re_chunk::{Chunk, RangeQuery, RowId, TimeInt, Timeline, UnitChunkShared};
+use nohash_hasher::{IntMap, IntSet};
+use re_chunk::{
+    Chunk, ComponentName, EntityPath, RangeQuery, RowId, TimeInt, Timeline, UnitChunkShared,
+};
 use re_chunk_store::{
     ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
     IndexValue, JoinEncoding, QueryExpression, SparseFillStrategy, TimeColumnDescriptor,
     TimeColumnSelector,
 };
 use re_log_types::ResolvedTimeRange;
+use re_types_core::components::ClearIsRecursive;
 
 use crate::{QueryEngine, RecordBatch};
 
@@ -34,10 +40,10 @@ use crate::{QueryEngine, RecordBatch};
 // * [x] pov support
 // * [x] latestat sparse-filling
 // * [x] sampling support
-// * [ ] clears
+// * [x] clears
+// * [ ] pagination (fast)
 // * [ ] overlaps (less dumb)
 // * [ ] selector-based `filtered_index`
-// * [ ] pagination (fast)
 // * [ ] configurable cache bypass
 // * [ ] allocate null arrays once
 // * [ ] take kernel duplicates all memory
@@ -157,91 +163,7 @@ impl QueryHandle<'_> {
             // still appear in the results.
let selected_contents: Vec<(_, _)> = if let Some(selection) = self.query.selection.as_ref() { - selection - .iter() - .map(|column| { - match column { - ColumnSelector::Time(selected_column) => { - let TimeColumnSelector { - timeline: selected_timeline, - } = selected_column; - - view_contents - .iter() - .enumerate() - .filter_map(|(idx, view_column)| match view_column { - ColumnDescriptor::Time(view_descr) => Some((idx, view_descr)), - ColumnDescriptor::Component(_) => None, - }) - .find(|(_idx, view_descr)| { - *view_descr.timeline.name() == *selected_timeline - }) - .map_or_else( - || { - ( - usize::MAX, - ColumnDescriptor::Time(TimeColumnDescriptor { - // TODO(cmc): I picked a sequence here because I have to pick something. - // It doesn't matter, only the name will remain in the Arrow schema anyhow. - timeline: Timeline::new_sequence( - *selected_timeline, - ), - datatype: arrow2::datatypes::DataType::Null, - }), - ) - }, - |(idx, view_descr)| { - (idx, ColumnDescriptor::Time(view_descr.clone())) - }, - ) - } - - ColumnSelector::Component(selected_column) => { - let ComponentColumnSelector { - entity_path: selected_entity_path, - component: selected_component_name, - join_encoding: _, - } = selected_column; - - view_contents - .iter() - .enumerate() - .filter_map(|(idx, view_column)| match view_column { - ColumnDescriptor::Component(view_descr) => { - Some((idx, view_descr)) - } - ColumnDescriptor::Time(_) => None, - }) - .find(|(_idx, view_descr)| { - view_descr.entity_path == *selected_entity_path - && view_descr.component_name == *selected_component_name - }) - .map_or_else( - || { - ( - usize::MAX, - ColumnDescriptor::Component( - ComponentColumnDescriptor { - entity_path: selected_entity_path.clone(), - archetype_name: None, - archetype_field_name: None, - component_name: *selected_component_name, - store_datatype: - arrow2::datatypes::DataType::Null, - join_encoding: JoinEncoding::default(), - is_static: false, - }, - ), - ) - }, - |(idx, view_descr)| { - (idx, ColumnDescriptor::Component(view_descr.clone())) - }, - ) - } - } - }) - .collect_vec() + self.compute_user_selection(&view_contents, selection) } else { view_contents.clone().into_iter().enumerate().collect() }; @@ -264,12 +186,7 @@ impl QueryHandle<'_> { .map(|values| values.iter().rev().copied().collect_vec()); // 4. Perform the query and keep track of all the relevant chunks. - let mut view_pov_chunks_idx = self - .query - .filtered_point_of_view - .as_ref() - .map(|_| usize::MAX); - let view_chunks = { + let query = { let index_range = if let Some(using_index_values_stack) = using_index_values_stack.as_ref() { using_index_values_stack @@ -284,74 +201,70 @@ impl QueryHandle<'_> { .unwrap_or(ResolvedTimeRange::EVERYTHING) }; - let query = RangeQuery::new(self.query.filtered_index, index_range) + RangeQuery::new(self.query.filtered_index, index_range) .keep_extra_timelines(true) // we want all the timelines we can get! - .keep_extra_components(false); - - view_contents - .iter() - .enumerate() - .map(|(idx, selected_column)| match selected_column { - ColumnDescriptor::Time(_) => Vec::new(), - - ColumnDescriptor::Component(column) => { - // NOTE: Keep in mind that the range APIs natively make sure that we will - // either get a bunch of relevant _static_ chunks, or a bunch of relevant - // _temporal_ chunks, but never both. - // - // TODO(cmc): Going through the cache is very useful in a Viewer context, but - // not so much in an SDK context. Make it configurable. 
- let results = self.engine.cache.range( - self.engine.store, - &query, - &column.entity_path, - [column.component_name], - ); + .keep_extra_components(false) + }; + let (view_pov_chunks_idx, mut view_chunks) = self.fetch_view_chunks(&query, &view_contents); - debug_assert!( - results.components.len() <= 1, - "cannot possibly get more than one component with this query" - ); + // 5. Collect all relevant clear chunks and update the view accordingly. + // + // We'll turn the clears into actual empty arrays of the expected component type. + { + re_tracing::profile_scope!("clear_chunks"); - let chunks = results - .components - .into_iter() - .next() - .map(|(_component_name, chunks)| { - chunks - .into_iter() - .map(|chunk| { - // NOTE: Keep in mind that the range APIs would have already taken care - // of A) sorting the chunk on the `filtered_index` (and row-id) and - // B) densifying it according to the current `component_name`. - // Both of these are mandatory requirements for the deduplication logic to - // do what we want: keep the latest known value for `component_name` at all - // remaining unique index values all while taking row-id ordering semantics - // into account. - debug_assert!( - chunk.is_sorted(), - "the query cache should have already taken care of sorting (and densifying!) the chunk", - ); - - let chunk = chunk.deduped_latest_on_index(&self.query.filtered_index); - - (AtomicU64::default(), chunk) - }) - .collect_vec() - }) - .unwrap_or_default(); + let clear_chunks = self.fetch_clear_chunks(&query, &view_contents); + for (view_idx, chunks) in view_chunks.iter_mut().enumerate() { + let Some(ColumnDescriptor::Component(descr)) = view_contents.get(view_idx) else { + continue; + }; - if let Some(pov) = self.query.filtered_point_of_view.as_ref() { - if pov.entity_path == column.entity_path && pov.component == column.component_name { - view_pov_chunks_idx = Some(idx); - } + // NOTE: It would be tempting to concatenate all these individual clear chunks into one + // single big chunk, but that'd be a mistake: 1) it's costly to do so but more + // importantly 2) that would lead to likely very large chunk overlap, which is very bad + // for business. + if let Some(clear_chunks) = clear_chunks.get(&descr.entity_path) { + chunks.extend(clear_chunks.iter().map(|chunk| { + let child_datatype = match &descr.store_datatype { + arrow2::datatypes::DataType::List(field) + | arrow2::datatypes::DataType::LargeList(field) => { + field.data_type().clone() + } + arrow2::datatypes::DataType::Dictionary(_, datatype, _) => { + (**datatype).clone() } + datatype => datatype.clone(), + }; - chunks - }, - }) - .collect() - }; + let mut chunk = chunk.clone(); + // Only way this could fail is if the number of rows did not match. + #[allow(clippy::unwrap_used)] + chunk + .add_component( + descr.component_name, + re_chunk::util::new_list_array_of_empties( + child_datatype, + chunk.num_rows(), + ), + ) + .unwrap(); + + (AtomicU64::new(0), chunk) + })); + + // The chunks were sorted that way before, and it needs to stay that way after. + chunks.sort_by_key(|(_cursor, chunk)| { + // NOTE: The chunk has been densified already: its global time range is the same as + // the time range for the specific component of interest. 
+                        chunk
+                            .timelines()
+                            .get(&self.query.filtered_index)
+                            .map(|time_column| time_column.time_range())
+                            .map_or(TimeInt::STATIC, |time_range| time_range.min())
+                    });
+                }
+            }
+        }
 
         QueryHandleState {
             view_contents,
@@ -364,6 +277,280 @@
         }
     }
 
+    #[allow(clippy::unused_self)]
+    fn compute_user_selection(
+        &self,
+        view_contents: &[ColumnDescriptor],
+        selection: &[ColumnSelector],
+    ) -> Vec<(usize, ColumnDescriptor)> {
+        selection
+            .iter()
+            .map(|column| {
+                match column {
+                    ColumnSelector::Time(selected_column) => {
+                        let TimeColumnSelector {
+                            timeline: selected_timeline,
+                        } = selected_column;
+
+                        view_contents
+                            .iter()
+                            .enumerate()
+                            .filter_map(|(idx, view_column)| match view_column {
+                                ColumnDescriptor::Time(view_descr) => Some((idx, view_descr)),
+                                ColumnDescriptor::Component(_) => None,
+                            })
+                            .find(|(_idx, view_descr)| {
+                                *view_descr.timeline.name() == *selected_timeline
+                            })
+                            .map_or_else(
+                                || {
+                                    (
+                                        usize::MAX,
+                                        ColumnDescriptor::Time(TimeColumnDescriptor {
+                                            // TODO(cmc): I picked a sequence here because I have to pick something.
+                                            // It doesn't matter, only the name will remain in the Arrow schema anyhow.
+                                            timeline: Timeline::new_sequence(*selected_timeline),
+                                            datatype: arrow2::datatypes::DataType::Null,
+                                        }),
+                                    )
+                                },
+                                |(idx, view_descr)| {
+                                    (idx, ColumnDescriptor::Time(view_descr.clone()))
+                                },
+                            )
+                    }
+
+                    ColumnSelector::Component(selected_column) => {
+                        let ComponentColumnSelector {
+                            entity_path: selected_entity_path,
+                            component: selected_component_name,
+                            join_encoding: _,
+                        } = selected_column;
+
+                        view_contents
+                            .iter()
+                            .enumerate()
+                            .filter_map(|(idx, view_column)| match view_column {
+                                ColumnDescriptor::Component(view_descr) => Some((idx, view_descr)),
+                                ColumnDescriptor::Time(_) => None,
+                            })
+                            .find(|(_idx, view_descr)| {
+                                view_descr.entity_path == *selected_entity_path
+                                    && view_descr.component_name == *selected_component_name
+                            })
+                            .map_or_else(
+                                || {
+                                    (
+                                        usize::MAX,
+                                        ColumnDescriptor::Component(ComponentColumnDescriptor {
+                                            entity_path: selected_entity_path.clone(),
+                                            archetype_name: None,
+                                            archetype_field_name: None,
+                                            component_name: *selected_component_name,
+                                            store_datatype: arrow2::datatypes::DataType::Null,
+                                            join_encoding: JoinEncoding::default(),
+                                            is_static: false,
+                                        }),
+                                    )
+                                },
+                                |(idx, view_descr)| {
+                                    (idx, ColumnDescriptor::Component(view_descr.clone()))
+                                },
+                            )
+                    }
+                }
+            })
+            .collect_vec()
+    }
+
+    fn fetch_view_chunks(
+        &self,
+        query: &RangeQuery,
+        view_contents: &[ColumnDescriptor],
+    ) -> (Option<usize>, Vec<Vec<(AtomicU64, Chunk)>>) {
+        let mut view_pov_chunks_idx = self
+            .query
+            .filtered_point_of_view
+            .as_ref()
+            .map(|_| usize::MAX);
+
+        let view_chunks = view_contents
+            .iter()
+            .enumerate()
+            .map(|(idx, selected_column)| match selected_column {
+                ColumnDescriptor::Time(_) => Vec::new(),
+
+                ColumnDescriptor::Component(column) => {
+                    let chunks = self
+                        .fetch_chunks(query, &column.entity_path, [column.component_name])
+                        .unwrap_or_default();
+
+                    if let Some(pov) = self.query.filtered_point_of_view.as_ref() {
+                        if pov.entity_path == column.entity_path
+                            && pov.component == column.component_name
+                        {
+                            view_pov_chunks_idx = Some(idx);
+                        }
+                    }
+
+                    chunks
+                }
+            })
+            .collect();
+
+        (view_pov_chunks_idx, view_chunks)
+    }
+
+    /// Returns all potentially relevant clear [`Chunk`]s for each unique entity path in the view contents.
+    ///
+    /// These chunks take recursive clear semantics into account and are guaranteed to be properly densified.
+    /// The component data is stripped out; only the indices are left.
+    fn fetch_clear_chunks(
+        &self,
+        query: &RangeQuery,
+        view_contents: &[ColumnDescriptor],
+    ) -> IntMap<EntityPath, Vec<Chunk>> {
+        /// Returns all the ancestors of an [`EntityPath`].
+        ///
+        /// Doesn't return `entity_path` itself.
+        fn entity_path_ancestors(entity_path: &EntityPath) -> impl Iterator<Item = EntityPath> {
+            std::iter::from_fn({
+                let mut entity_path = entity_path.parent();
+                move || {
+                    let yielded = entity_path.clone()?;
+                    entity_path = yielded.parent();
+                    Some(yielded)
+                }
+            })
+        }
+
+        /// Given a [`Chunk`] containing a [`ClearIsRecursive`] column, returns a filtered version
+        /// of that chunk where only rows with `ClearIsRecursive=true` are left.
+        ///
+        /// Returns `None` if the chunk either doesn't contain a `ClearIsRecursive` column or if
+        /// the end result is an empty chunk.
+        fn chunk_filter_recursive_only(chunk: &Chunk) -> Option<Chunk> {
+            let list_array = chunk.components().get(&ClearIsRecursive::name())?;
+
+            let values = list_array
+                .values()
+                .as_any()
+                .downcast_ref::<ArrowBooleanArray>()?;
+
+            let indices = ArrowPrimitiveArray::from_vec(
+                values
+                    .iter()
+                    .enumerate()
+                    .filter_map(|(index, is_recursive)| {
+                        (is_recursive == Some(true)).then_some(index as i32)
+                    })
+                    .collect_vec(),
+            );
+
+            let chunk = chunk.taken(&indices);
+
+            (!chunk.is_empty()).then_some(chunk)
+        }
+
+        use re_types_core::Loggable as _;
+        let component_names = [re_types_core::components::ClearIsRecursive::name()];
+
+        // All unique entity paths present in the view contents.
+        let entity_paths: IntSet<EntityPath> = view_contents
+            .iter()
+            .filter_map(|col| match col {
+                ColumnDescriptor::Component(descr) => Some(descr.entity_path.clone()),
+                ColumnDescriptor::Time(_) => None,
+            })
+            .collect();
+
+        entity_paths
+            .iter()
+            .filter_map(|entity_path| {
+                // For the entity itself, any chunk that contains clear data is relevant, recursive or not.
+                // Just fetch everything we find.
+                let flat_chunks = self
+                    .fetch_chunks(query, entity_path, component_names)
+                    .map(|chunks| {
+                        chunks
+                            .into_iter()
+                            .map(|(_cursor, chunk)| chunk)
+                            .collect_vec()
+                    })
+                    .unwrap_or_default();
+
+                let recursive_chunks =
+                    entity_path_ancestors(entity_path).flat_map(|ancestor_path| {
+                        self.fetch_chunks(query, &ancestor_path, component_names)
+                            .into_iter() // option
+                            .flat_map(|chunks| chunks.into_iter().map(|(_cursor, chunk)| chunk))
+                            // NOTE: Ancestors' chunks are only relevant for the rows where `ClearIsRecursive=true`.
+                            .filter_map(|chunk| chunk_filter_recursive_only(&chunk))
+                    });
+
+                let chunks = flat_chunks
+                    .into_iter()
+                    .chain(recursive_chunks)
+                    // The component data is irrelevant.
+                    // We do not expose the actual tombstones to end-users, only their _effect_.
+                    .map(|chunk| chunk.components_removed())
+                    .collect_vec();
+
+                (!chunks.is_empty()).then(|| (entity_path.clone(), chunks))
+            })
+            .collect()
+    }
+
+    fn fetch_chunks<const N: usize>(
+        &self,
+        query: &RangeQuery,
+        entity_path: &EntityPath,
+        component_names: [ComponentName; N],
+    ) -> Option<Vec<(AtomicU64, Chunk)>> {
+        // NOTE: Keep in mind that the range APIs natively make sure that we will
+        // either get a bunch of relevant _static_ chunks, or a bunch of relevant
+        // _temporal_ chunks, but never both.
+        //
+        // TODO(cmc): Going through the cache is very useful in a Viewer context, but
+        // not so much in an SDK context. Make it configurable.
+ let results = + self.engine + .cache + .range(self.engine.store, query, entity_path, component_names); + + debug_assert!( + results.components.len() <= 1, + "cannot possibly get more than one component with this query" + ); + + results + .components + .into_iter() + .next() + .map(|(_component_name, chunks)| { + chunks + .into_iter() + .map(|chunk| { + // NOTE: Keep in mind that the range APIs would have already taken care + // of A) sorting the chunk on the `filtered_index` (and row-id) and + // B) densifying it according to the current `component_name`. + // Both of these are mandatory requirements for the deduplication logic to + // do what we want: keep the latest known value for `component_name` at all + // remaining unique index values all while taking row-id ordering semantics + // into account. + debug_assert!( + chunk.is_sorted(), + "the query cache should have already taken care of sorting (and densifying!) the chunk", + ); + + let chunk = chunk.deduped_latest_on_index(&self.query.filtered_index); + + (AtomicU64::default(), chunk) + }) + .collect_vec() + }) + } + /// The query used to instantiate this handle. pub fn query(&self) -> &QueryExpression { &self.query @@ -918,6 +1105,7 @@ mod tests { example_components::{MyColor, MyLabel, MyPoint}, EntityPath, Timeline, }; + use re_types::components::ClearIsRecursive; use re_types_core::Loggable as _; use crate::QueryCache; @@ -942,9 +1130,10 @@ mod tests { // * [x] using_index_values // // In addition to those, some much needed extras: + // * [x] num_rows + // * [x] clears // * [ ] timelines returned with selection=none - // * [ ] clears - // * [ ] num_rows + // * [ ] pagination // TODO(cmc): At some point I'd like to stress multi-entity queries too, but that feels less // urgent considering how things are implemented (each entity lives in its own index, so it's @@ -1694,15 +1883,106 @@ mod tests { Ok(()) } + #[test] + fn clears() -> anyhow::Result<()> { + re_log::setup_logging(); + + let mut store = create_nasty_store()?; + extend_nasty_store_with_clears(&mut store)?; + eprintln!("{store}"); + + let query_cache = QueryCache::new(&store); + let query_engine = QueryEngine { + store: &store, + cache: &query_cache, + }; + + let timeline = Timeline::new_sequence("frame_nr"); + let entity_path = EntityPath::from("this/that"); + + // barebones + { + let mut query = QueryExpression::new(timeline); + query.view_contents = Some([(entity_path.clone(), None)].into_iter().collect()); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + assert_eq!( + query_engine.query(query.clone()).into_iter().count() as u64, + query_handle.num_rows() + ); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.into_batch_iter().collect_vec(), + ); + eprintln!("{dataframe}"); + + let got = format!("{:#?}", dataframe.data.iter().collect_vec()); + let expected = unindent::unindent( + "\ + [ + Int64[None, 10, 20, 30, 40, 50, 60, 65, 70], + Timestamp(Nanosecond, None)[None, 1970-01-01 00:00:00.000000010, None, None, None, 1970-01-01 00:00:00.000000050, 1970-01-01 00:00:00.000000060, 1970-01-01 00:00:00.000000065, 1970-01-01 00:00:00.000000070], + ListArray[[], None, None, [2], [3], [4], [], [], [6]], + ListArray[[], None, None, None, None, None, [], [], None], + ListArray[[], [{x: 0, y: 0}], [{x: 1, y: 1}], [{x: 2, y: 2}], [{x: 3, y: 3}], [{x: 4, y: 4}], [], [], [{x: 8, y: 8}]], + ]\ + " + ); + + similar_asserts::assert_eq!(expected, got); + } + + // sparse-filled + { + let mut query = 
QueryExpression::new(timeline);
+            query.view_contents = Some([(entity_path.clone(), None)].into_iter().collect());
+            query.sparse_fill_strategy = SparseFillStrategy::LatestAtGlobal;
+            eprintln!("{query:#?}:");
+
+            let query_handle = query_engine.query(query.clone());
+            assert_eq!(
+                query_engine.query(query.clone()).into_iter().count() as u64,
+                query_handle.num_rows()
+            );
+            let dataframe = concatenate_record_batches(
+                query_handle.schema().clone(),
+                &query_handle.into_batch_iter().collect_vec(),
+            );
+            eprintln!("{dataframe}");
+
+            // TODO(#7650): Those null values for `MyColor` on 10 and 20 look completely insane, but then again
+            // static clear semantics in general are pretty unhinged right now, especially when
+            // ranges are involved.
+            // It's extremely niche; our time is better spent somewhere else right now.
+            let got = format!("{:#?}", dataframe.data.iter().collect_vec());
+            let expected = unindent::unindent(
+                "\
+            [
+                Int64[None, 10, 20, 30, 40, 50, 60, 65, 70],
+                Timestamp(Nanosecond, None)[None, 1970-01-01 00:00:00.000000010, None, None, None, 1970-01-01 00:00:00.000000050, 1970-01-01 00:00:00.000000060, 1970-01-01 00:00:00.000000065, 1970-01-01 00:00:00.000000070],
+                ListArray[[], None, None, [2], [3], [4], [], [], [6]],
+                ListArray[[], [c], [c], [c], [c], [c], [], [], [c]],
+                ListArray[[], [{x: 0, y: 0}], [{x: 1, y: 1}], [{x: 2, y: 2}], [{x: 3, y: 3}], [{x: 4, y: 4}], [], [], [{x: 8, y: 8}]],
+            ]\
+            "
+            );
+
+            similar_asserts::assert_eq!(expected, got);
+        }
+
+        Ok(())
+    }
+
     /// Returns a very nasty [`ChunkStore`] with all kinds of partial updates, chunk overlaps,
-    /// repeated timestamps, duplicated chunks, partial multi-timelines, etc.
+    /// repeated timestamps, duplicated chunks, partial multi-timelines, flat and recursive clears, etc.
     fn create_nasty_store() -> anyhow::Result<ChunkStore> {
         let mut store = ChunkStore::new(
             re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
             ChunkStoreConfig::COMPACTION_DISABLED,
         );
 
-        let entity_path = EntityPath::from("this/that");
+        let entity_path = EntityPath::from("/this/that");
 
         let frame1 = TimeInt::new_temporal(10);
         let frame2 = TimeInt::new_temporal(20);
@@ -1889,6 +2169,102 @@
         Ok(store)
     }
 
+    fn extend_nasty_store_with_clears(store: &mut ChunkStore) -> anyhow::Result<()> {
+        let entity_path = EntityPath::from("/this/that");
+        let entity_path_parent = EntityPath::from("/this");
+        let entity_path_root = EntityPath::from("/");
+
+        let frame35 = TimeInt::new_temporal(35);
+        let frame55 = TimeInt::new_temporal(55);
+        let frame60 = TimeInt::new_temporal(60);
+        let frame65 = TimeInt::new_temporal(65);
+
+        let clear_flat = ClearIsRecursive(false.into());
+        let clear_recursive = ClearIsRecursive(true.into());
+
+        let row_id1_1 = RowId::new();
+        let chunk1 = Chunk::builder(entity_path.clone())
+            .with_sparse_component_batches(
+                row_id1_1,
+                TimePoint::default(),
+                [(ClearIsRecursive::name(), Some(&clear_flat as _))],
+            )
+            .build()?;
+
+        let chunk1 = Arc::new(chunk1);
+        store.insert_chunk(&chunk1)?;
+
+        // NOTE: This tombstone will never have any visible effect.
+        //
+        // Tombstones still obey the same rules as all other data, specifically: if a component
+        // has been statically logged for an entity, it shadows any temporal data for that same
+        // component on that same entity.
+        //
+        // In this specific case, `this/that` has already had a static clear logged, so further
+        // temporal clears will be ignored.
+        //
+        // It's pretty weird, but then again static clear semantics in general are very weird.
+ let row_id2_1 = RowId::new(); + let chunk2 = Chunk::builder(entity_path.clone()) + .with_sparse_component_batches( + row_id2_1, + [build_frame_nr(frame35), build_log_time(frame35.into())], + [(ClearIsRecursive::name(), Some(&clear_recursive as _))], + ) + .build()?; + + let chunk2 = Arc::new(chunk2); + store.insert_chunk(&chunk2)?; + + let row_id3_1 = RowId::new(); + let chunk3 = Chunk::builder(entity_path_root.clone()) + .with_sparse_component_batches( + row_id3_1, + [build_frame_nr(frame55), build_log_time(frame55.into())], + [(ClearIsRecursive::name(), Some(&clear_flat as _))], + ) + .with_sparse_component_batches( + row_id3_1, + [build_frame_nr(frame60), build_log_time(frame60.into())], + [(ClearIsRecursive::name(), Some(&clear_recursive as _))], + ) + .with_sparse_component_batches( + row_id3_1, + [build_frame_nr(frame65), build_log_time(frame65.into())], + [(ClearIsRecursive::name(), Some(&clear_flat as _))], + ) + .build()?; + + let chunk3 = Arc::new(chunk3); + store.insert_chunk(&chunk3)?; + + let row_id4_1 = RowId::new(); + let chunk4 = Chunk::builder(entity_path_parent.clone()) + .with_sparse_component_batches( + row_id4_1, + [build_frame_nr(frame60), build_log_time(frame60.into())], + [(ClearIsRecursive::name(), Some(&clear_flat as _))], + ) + .build()?; + + let chunk4 = Arc::new(chunk4); + store.insert_chunk(&chunk4)?; + + let row_id5_1 = RowId::new(); + let chunk5 = Chunk::builder(entity_path_parent.clone()) + .with_sparse_component_batches( + row_id5_1, + [build_frame_nr(frame65), build_log_time(frame65.into())], + [(ClearIsRecursive::name(), Some(&clear_recursive as _))], + ) + .build()?; + + let chunk5 = Arc::new(chunk5); + store.insert_chunk(&chunk5)?; + + Ok(()) + } + fn concatenate_record_batches(schema: ArrowSchema, batches: &[RecordBatch]) -> RecordBatch { assert!(batches.iter().map(|batch| &batch.schema).all_equal());