diff --git a/crates/store/re_chunk/Cargo.toml b/crates/store/re_chunk/Cargo.toml
index 9681a27ae093..de978a9eaeed 100644
--- a/crates/store/re_chunk/Cargo.toml
+++ b/crates/store/re_chunk/Cargo.toml
@@ -52,6 +52,7 @@ anyhow.workspace = true
 arrow2 = { workspace = true, features = [
   "compute_concatenate",
   "compute_filter",
+  "compute_take",
 ] }
 bytemuck.workspace = true
 document-features.workspace = true
diff --git a/crates/store/re_chunk/src/slice.rs b/crates/store/re_chunk/src/slice.rs
index efb07390f341..ccb577afe768 100644
--- a/crates/store/re_chunk/src/slice.rs
+++ b/crates/store/re_chunk/src/slice.rs
@@ -435,6 +435,80 @@ impl Chunk {
                 .collect(),
         }
     }
+
+    /// Removes duplicate rows from sections of consecutive identical indices.
+    ///
+    /// * If the [`Chunk`] is sorted on that index, the remaining values in the index column will be unique.
+    /// * If the [`Chunk`] has been densified on a specific column, the resulting chunk will
+    ///   effectively contain the latest value of that column for each given index value.
+    ///
+    /// If this is a temporal chunk and `index` isn't present in it, this method is a no-op.
+    ///
+    /// This does _not_ obey `RowId`-ordering semantics (or any other kind of semantics for that
+    /// matter) -- it merely respects how the chunk is currently laid out: no more, no less.
+    /// Sort the chunk according to the semantics you're looking for before calling this method.
+    //
+    // TODO(cmc): `Timeline` should really be `Index`.
+    #[inline]
+    pub fn deduped_latest_on_index(&self, index: &Timeline) -> Self {
+        re_tracing::profile_function!();
+
+        if self.is_empty() {
+            return self.clone();
+        }
+
+        if self.is_static() {
+            return self.row_sliced(self.num_rows().saturating_sub(1), 1);
+        }
+
+        let Some(time_column) = self.timelines.get(index) else {
+            return self.clone();
+        };
+
+        let indices = {
+            let mut i = 0;
+            let indices = time_column
+                .times_raw()
+                .iter()
+                .copied()
+                .dedup_with_count()
+                .map(|(count, _time)| {
+                    i += count;
+                    i.saturating_sub(1) as i32
+                })
+                .collect_vec();
+            ArrowPrimitiveArray::<i32>::from_vec(indices)
+        };
+
+        let chunk = Self {
+            id: self.id,
+            entity_path: self.entity_path.clone(),
+            heap_size_bytes: Default::default(),
+            is_sorted: self.is_sorted,
+            row_ids: crate::util::take_array(&self.row_ids, &indices),
+            timelines: self
+                .timelines
+                .iter()
+                .map(|(&timeline, time_column)| (timeline, time_column.taken(&indices)))
+                .collect(),
+            components: self
+                .components
+                .iter()
+                .map(|(&component_name, list_array)| {
+                    let taken = crate::util::take_array(list_array, &indices);
+                    (component_name, taken)
+                })
+                .collect(),
+        };
+
+        #[cfg(debug_assertions)]
+        #[allow(clippy::unwrap_used)] // debug-only
+        {
+            chunk.sanity_check().unwrap();
+        }
+
+        chunk
+    }
 }

 impl TimeColumn {
@@ -517,7 +591,9 @@ impl TimeColumn {
         )
     }

-    /// Runs a filter compute kernel on the time data with the specified `mask`.
+    /// Runs a [filter] compute kernel on the time data with the specified `mask`.
+    ///
+    /// [filter]: arrow2::compute::filter::filter
     #[inline]
     pub(crate) fn filtered(&self, filter: &ArrowBooleanArray) -> Self {
         let Self {
@@ -552,13 +628,35 @@ impl TimeColumn {
             crate::util::filter_array(times, filter),
         )
     }
+
+    /// Runs a [take] compute kernel on the time data with the specified `indices`.
+    ///
+    /// [take]: arrow2::compute::take::take
+    #[inline]
+    pub(crate) fn taken(&self, indices: &ArrowPrimitiveArray<i32>) -> Self {
+        let Self {
+            timeline,
+            times,
+            is_sorted,
+            time_range: _,
+        } = self;
+
+        Self::new(
+            Some(*is_sorted),
+            *timeline,
+            crate::util::take_array(times, indices),
+        )
+    }
 }

 // ---

 #[cfg(test)]
 mod tests {
-    use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
+    use re_log_types::{
+        example_components::{MyColor, MyLabel, MyPoint},
+        TimePoint,
+    };
     use re_types_core::{ComponentBatch, Loggable};

     use crate::{Chunk, RowId, Timeline};
@@ -684,4 +782,269 @@ mod tests {

         Ok(())
     }
+
+    #[test]
+    fn dedupe_temporal() -> anyhow::Result<()> {
+        let entity_path = "my/entity";
+
+        let row_id1 = RowId::new();
+        let row_id2 = RowId::new();
+        let row_id3 = RowId::new();
+        let row_id4 = RowId::new();
+        let row_id5 = RowId::new();
+
+        let timepoint1 = [
+            (Timeline::log_time(), 1000),
+            (Timeline::new_sequence("frame"), 1),
+        ];
+        let timepoint2 = [
+            (Timeline::log_time(), 1032),
+            (Timeline::new_sequence("frame"), 1),
+        ];
+        let timepoint3 = [
+            (Timeline::log_time(), 1064),
+            (Timeline::new_sequence("frame"), 1),
+        ];
+        let timepoint4 = [
+            (Timeline::log_time(), 1096),
+            (Timeline::new_sequence("frame"), 2),
+        ];
+        let timepoint5 = [
+            (Timeline::log_time(), 1128),
+            (Timeline::new_sequence("frame"), 2),
+        ];
+
+        let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
+        let points3 = &[MyPoint::new(6.0, 7.0)];
+
+        let colors4 = &[MyColor::from_rgb(1, 1, 1)];
+        let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];
+
+        let labels1 = &[MyLabel("a".into())];
+        let labels2 = &[MyLabel("b".into())];
+        let labels3 = &[MyLabel("c".into())];
+        let labels4 = &[MyLabel("d".into())];
+        let labels5 = &[MyLabel("e".into())];
+
+        let chunk = Chunk::builder(entity_path.into())
+            .with_sparse_component_batches(
+                row_id1,
+                timepoint1,
+                [
+                    (MyPoint::name(), Some(points1 as _)),
+                    (MyColor::name(), None),
+                    (MyLabel::name(), Some(labels1 as _)),
+                ],
+            )
+            .with_sparse_component_batches(
+                row_id2,
+                timepoint2,
+                [
+                    (MyPoint::name(), None),
+                    (MyColor::name(), None),
+                    (MyLabel::name(), Some(labels2 as _)),
+                ],
+            )
+            .with_sparse_component_batches(
+                row_id3,
+                timepoint3,
+                [
+                    (MyPoint::name(), Some(points3 as _)),
+                    (MyColor::name(), None),
+                    (MyLabel::name(), Some(labels3 as _)),
+                ],
+            )
+            .with_sparse_component_batches(
+                row_id4,
+                timepoint4,
+                [
+                    (MyPoint::name(), None),
+                    (MyColor::name(), Some(colors4 as _)),
+                    (MyLabel::name(), Some(labels4 as _)),
+                ],
+            )
+            .with_sparse_component_batches(
+                row_id5,
+                timepoint5,
+                [
+                    (MyPoint::name(), None),
+                    (MyColor::name(), Some(colors5 as _)),
+                    (MyLabel::name(), Some(labels5 as _)),
+                ],
+            )
+            .build()?;
+
+        eprintln!("chunk:\n{chunk}");
+
+        {
+            let got = chunk.deduped_latest_on_index(&Timeline::new_sequence("frame"));
+            eprintln!("got:\n{got}");
+            assert_eq!(2, got.num_rows());
+
+            let expectations: &[(_, _, Option<&dyn ComponentBatch>)] = &[
+                (row_id3, MyPoint::name(), Some(points3 as _)),
+                (row_id3, MyColor::name(), None),
+                (row_id3, MyLabel::name(), Some(labels3 as _)),
+                //
+                (row_id5, MyPoint::name(), None),
+                (row_id5, MyColor::name(), Some(colors5 as _)),
+                (row_id5, MyLabel::name(), Some(labels5 as _)),
+            ];
+
+            for (row_id, component_name, expected) in expectations {
+                let expected = expected
+                    .and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
+                eprintln!("{component_name} @ {row_id}");
+                similar_asserts::assert_eq!(expected, got.cell(*row_id, component_name));
+            }
+        }
+
+        {
+            let got = chunk.deduped_latest_on_index(&Timeline::log_time());
+            eprintln!("got:\n{got}");
+            assert_eq!(5, got.num_rows());
+
+            let expectations: &[(_, _, Option<&dyn ComponentBatch>)] = &[
+                (row_id1, MyPoint::name(), Some(points1 as _)),
+                (row_id1, MyColor::name(), None),
+                (row_id1, MyLabel::name(), Some(labels1 as _)),
+                (row_id2, MyPoint::name(), None),
+                (row_id2, MyColor::name(), None),
+                (row_id2, MyLabel::name(), Some(labels2 as _)),
+                (row_id3, MyPoint::name(), Some(points3 as _)),
+                (row_id3, MyColor::name(), None),
+                (row_id3, MyLabel::name(), Some(labels3 as _)),
+                (row_id4, MyPoint::name(), None),
+                (row_id4, MyColor::name(), Some(colors4 as _)),
+                (row_id4, MyLabel::name(), Some(labels4 as _)),
+                (row_id5, MyPoint::name(), None),
+                (row_id5, MyColor::name(), Some(colors5 as _)),
+                (row_id5, MyLabel::name(), Some(labels5 as _)),
+            ];
+
+            for (row_id, component_name, expected) in expectations {
+                let expected = expected
+                    .and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
+                eprintln!("{component_name} @ {row_id}");
+                similar_asserts::assert_eq!(expected, got.cell(*row_id, component_name));
+            }
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn dedupe_static() -> anyhow::Result<()> {
+        let entity_path = "my/entity";
+
+        let row_id1 = RowId::new();
+        let row_id2 = RowId::new();
+        let row_id3 = RowId::new();
+        let row_id4 = RowId::new();
+        let row_id5 = RowId::new();
+
+        let timepoint_static = TimePoint::default();
+
+        let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
+        let points3 = &[MyPoint::new(6.0, 7.0)];
+
+        let colors4 = &[MyColor::from_rgb(1, 1, 1)];
+        let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];
+
+        let labels1 = &[MyLabel("a".into())];
+        let labels2 = &[MyLabel("b".into())];
+        let labels3 = &[MyLabel("c".into())];
+        let labels4 = &[MyLabel("d".into())];
+        let labels5 = &[MyLabel("e".into())];
+
+        let chunk = Chunk::builder(entity_path.into())
+            .with_sparse_component_batches(
+                row_id1,
+                timepoint_static.clone(),
+                [
+                    (MyPoint::name(), Some(points1 as _)),
+                    (MyColor::name(), None),
+                    (MyLabel::name(), Some(labels1 as _)),
+                ],
+            )
+            .with_sparse_component_batches(
+                row_id2,
+                timepoint_static.clone(),
+                [
+                    (MyPoint::name(), None),
+                    (MyColor::name(), None),
+                    (MyLabel::name(), Some(labels2 as _)),
+                ],
+            )
+            .with_sparse_component_batches(
+                row_id3,
+                timepoint_static.clone(),
+                [
+                    (MyPoint::name(), Some(points3 as _)),
+                    (MyColor::name(), None),
+                    (MyLabel::name(), Some(labels3 as _)),
+                ],
+            )
+            .with_sparse_component_batches(
+                row_id4,
+                timepoint_static.clone(),
+                [
+                    (MyPoint::name(), None),
+                    (MyColor::name(), Some(colors4 as _)),
+                    (MyLabel::name(), Some(labels4 as _)),
+                ],
+            )
+            .with_sparse_component_batches(
+                row_id5,
+                timepoint_static.clone(),
+                [
+                    (MyPoint::name(), None),
+                    (MyColor::name(), Some(colors5 as _)),
+                    (MyLabel::name(), Some(labels5 as _)),
+                ],
+            )
+            .build()?;
+
+        eprintln!("chunk:\n{chunk}");
+
+        {
+            let got = chunk.deduped_latest_on_index(&Timeline::new_sequence("frame"));
+            eprintln!("got:\n{got}");
+            assert_eq!(1, got.num_rows());
+
+            let expectations: &[(_, _, Option<&dyn ComponentBatch>)] = &[
+                (row_id5, MyPoint::name(), None),
+                (row_id5, MyColor::name(), Some(colors5 as _)),
+                (row_id5, MyLabel::name(), Some(labels5 as _)),
+            ];
+
+            for (row_id, component_name, expected) in expectations {
+                let expected = expected
+                    .and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
+                eprintln!("{component_name} @ {row_id}");
{row_id}"); + similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name)); + } + } + + { + let got = chunk.deduped_latest_on_index(&Timeline::log_time()); + eprintln!("got:\n{got}"); + assert_eq!(1, got.num_rows()); + + let expectations: &[(_, _, Option<&dyn ComponentBatch>)] = &[ + (row_id5, MyPoint::name(), None), + (row_id5, MyColor::name(), Some(colors5 as _)), + (row_id5, MyLabel::name(), Some(labels5 as _)), + ]; + + for (row_id, component_name, expected) in expectations { + let expected = expected + .and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok()); + eprintln!("{component_name} @ {row_id}"); + similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name)); + } + } + + Ok(()) + } } diff --git a/crates/store/re_chunk/src/util.rs b/crates/store/re_chunk/src/util.rs index 7974fe2a33c8..33b394ec2698 100644 --- a/crates/store/re_chunk/src/util.rs +++ b/crates/store/re_chunk/src/util.rs @@ -280,9 +280,13 @@ pub fn pad_list_array_front( ArrowListArray::new(datatype, offsets.into(), values, Some(validity)) } -/// Applies a filter kernel to the given `array`. +/// Applies a [filter] kernel to the given `array`. +/// +/// Note: a `filter` kernel _copies_ the data in order to make the resulting arrays contiguous in memory. /// /// Takes care of up- and down-casting the data back and forth on behalf of the caller. +/// +/// [filter]: arrow2::compute::filter::filter pub fn filter_array(array: &A, filter: &ArrowBooleanArray) -> A { debug_assert!(filter.validity().is_none()); // just for good measure @@ -296,3 +300,31 @@ pub fn filter_array(array: &A, filter: &ArrowBooleanArray .unwrap() .clone() } + +/// Applies a [take] kernel to the given `array`. +/// +/// Note: a `take` kernel _copies_ the data in order to make the resulting arrays contiguous in memory. +/// +/// Takes care of up- and down-casting the data back and forth on behalf of the caller. +/// +/// [take]: arrow2::compute::take::take +// +// TODO(cmc): in an ideal world, a `take` kernel should merely _slice_ the data and avoid any allocations/copies +// where possible (e.g. list-arrays). +// That is not possible with vanilla `ListArray`s since they don't expose any way to encode optional lengths, +// in addition to offsets. +// For internal stuff, we could perhaps provide a custom implementation that returns a `DictionaryArray` instead? +pub fn take_array( + array: &A, + indices: &ArrowPrimitiveArray, +) -> A { + #[allow(clippy::unwrap_used)] + arrow2::compute::take::take(array, indices) + // Unwrap: this literally cannot fail. + .unwrap() + .as_any() + .downcast_ref::() + // Unwrap: that's initial type that we got. + .unwrap() + .clone() +} diff --git a/crates/store/re_chunk/tests/memory_test.rs b/crates/store/re_chunk/tests/memory_test.rs new file mode 100644 index 000000000000..037cea3a0cdb --- /dev/null +++ b/crates/store/re_chunk/tests/memory_test.rs @@ -0,0 +1,193 @@ +//! Measures the memory overhead of the chunk store. + +// https://github.com/rust-lang/rust-clippy/issues/10011 +#![cfg(test)] + +use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; + +thread_local! { + static LIVE_BYTES_IN_THREAD: AtomicUsize = AtomicUsize::new(0); +} + +pub struct TrackingAllocator { + allocator: std::alloc::System, +} + +#[global_allocator] +pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator { + allocator: std::alloc::System, +}; + +#[allow(unsafe_code)] +// SAFETY: +// We just do book-keeping and then let another allocator do all the actual work. 
+unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
+    #[allow(clippy::let_and_return)]
+    unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
+        LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed));
+
+        // SAFETY:
+        // Just deferring
+        unsafe { self.allocator.alloc(layout) }
+    }
+
+    unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
+        LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed));
+
+        // SAFETY:
+        // Just deferring
+        unsafe { self.allocator.dealloc(ptr, layout) };
+    }
+}
+
+fn live_bytes_local() -> usize {
+    LIVE_BYTES_IN_THREAD.with(|bytes| bytes.load(Relaxed))
+}
+
+/// Returns `(ret, num_bytes_allocated_by_this_thread)`.
+fn memory_use<R>(run: impl Fn() -> R) -> (R, usize) {
+    let used_bytes_start_local = live_bytes_local();
+    let ret = run();
+    let bytes_used_local = live_bytes_local() - used_bytes_start_local;
+    (ret, bytes_used_local)
+}
+
+// ----------------------------------------------------------------------------
+
+use arrow2::{
+    array::{
+        Array as ArrowArray, BooleanArray as ArrowBooleanArray, ListArray as ArrowListArray,
+        PrimitiveArray as ArrowPrimitiveArray,
+    },
+    offset::Offsets as ArrowOffsets,
+};
+use itertools::Itertools as _;
+
+#[test]
+fn filter_does_allocate() {
+    re_log::setup_logging();
+
+    const NUM_SCALARS: i64 = 10_000_000;
+
+    let (((unfiltered, unfiltered_size_bytes), (filtered, filtered_size_bytes)), total_size_bytes) =
+        memory_use(|| {
+            let unfiltered = memory_use(|| {
+                let scalars = ArrowPrimitiveArray::from_vec((0..NUM_SCALARS).collect_vec());
+                ArrowListArray::<i32>::new(
+                    ArrowListArray::<i32>::default_datatype(scalars.data_type().clone()),
+                    ArrowOffsets::try_from_lengths(
+                        std::iter::repeat(NUM_SCALARS as usize / 10).take(10),
+                    )
+                    .unwrap()
+                    .into(),
+                    scalars.to_boxed(),
+                    None,
+                )
+            });
+
+            let filter = ArrowBooleanArray::from_slice(
+                (0..unfiltered.0.len()).map(|i| i % 2 == 0).collect_vec(),
+            );
+            let filtered = memory_use(|| re_chunk::util::filter_array(&unfiltered.0, &filter));
+
+            (unfiltered, filtered)
+        });
+
+    eprintln!(
+        "unfiltered={} filtered={} total={}",
+        re_format::format_bytes(unfiltered_size_bytes as _),
+        re_format::format_bytes(filtered_size_bytes as _),
+        re_format::format_bytes(total_size_bytes as _),
+    );
+
+    assert!(unfiltered_size_bytes + filtered_size_bytes <= total_size_bytes);
+    assert!(unfiltered_size_bytes <= filtered_size_bytes * 2);
+
+    {
+        let unfiltered = unfiltered
+            .values()
+            .as_any()
+            .downcast_ref::<ArrowPrimitiveArray<i64>>()
+            .unwrap();
+        let filtered = filtered
+            .values()
+            .as_any()
+            .downcast_ref::<ArrowPrimitiveArray<i64>>()
+            .unwrap();
+
+        assert!(
+            !std::ptr::eq(
+                unfiltered.values().as_ptr_range().start,
+                filtered.values().as_ptr_range().start
+            ),
+            "data should be copied -- pointers shouldn't match"
+        );
+    }
+}
+
+#[test]
+// TODO(cmc): That's the end goal, but it is simply impossible with `ListArray`'s encoding.
+// See `re_chunk::util::take_array`'s doc-comment for more information.
+#[should_panic = "assertion failed: untaken_size_bytes > taken_size_bytes * 10"]
+fn take_does_not_allocate() {
+    re_log::setup_logging();
+
+    const NUM_SCALARS: i64 = 10_000_000;
+
+    let (((untaken, untaken_size_bytes), (taken, taken_size_bytes)), total_size_bytes) =
+        memory_use(|| {
+            let untaken = memory_use(|| {
+                let scalars = ArrowPrimitiveArray::from_vec((0..NUM_SCALARS).collect_vec());
+                ArrowListArray::<i32>::new(
+                    ArrowListArray::<i32>::default_datatype(scalars.data_type().clone()),
+                    ArrowOffsets::try_from_lengths(
+                        std::iter::repeat(NUM_SCALARS as usize / 10).take(10),
+                    )
+                    .unwrap()
+                    .into(),
+                    scalars.to_boxed(),
+                    None,
+                )
+            });
+
+            let indices = ArrowPrimitiveArray::from_vec(
+                (0..untaken.0.len() as i32)
+                    .filter(|i| i % 2 == 0)
+                    .collect_vec(),
+            );
+            let taken = memory_use(|| re_chunk::util::take_array(&untaken.0, &indices));
+
+            (untaken, taken)
+        });
+
+    eprintln!(
+        "untaken={} taken={} total={}",
+        re_format::format_bytes(untaken_size_bytes as _),
+        re_format::format_bytes(taken_size_bytes as _),
+        re_format::format_bytes(total_size_bytes as _),
+    );
+
+    assert!(untaken_size_bytes + taken_size_bytes <= total_size_bytes);
+    assert!(untaken_size_bytes > taken_size_bytes * 10);
+
+    {
+        let untaken = untaken
+            .values()
+            .as_any()
+            .downcast_ref::<ArrowPrimitiveArray<i64>>()
+            .unwrap();
+        let taken = taken
+            .values()
+            .as_any()
+            .downcast_ref::<ArrowPrimitiveArray<i64>>()
+            .unwrap();
+
+        assert!(
+            std::ptr::eq(
+                untaken.values().as_ptr_range().start,
+                taken.values().as_ptr_range().start
+            ),
+            "data shouldn't be duplicated -- pointers should match"
+        );
+    }
+}
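
---

For reviewers skimming the patch: the heart of `deduped_latest_on_index` is the run-length pass that, for each run of consecutive identical index values, keeps only the *last* row of the run. Here is a minimal standalone sketch of that logic, outside of `Chunk`; the helper name `keep_last_of_each_run` and the sample times are made up for illustration, but the `dedup_with_count` trick is exactly what the patch uses.

```rust
use itertools::Itertools as _;

/// For each run of consecutive identical values, returns the row index of the
/// last element of that run -- the "latest" row for that index value.
fn keep_last_of_each_run(times: &[i64]) -> Vec<i32> {
    let mut i = 0;
    times
        .iter()
        .copied()
        .dedup_with_count() // yields (run_length, value) per run
        .map(|(count, _time)| {
            i += count;
            // `i` now points one past the run; the last row of the run is `i - 1`.
            i.saturating_sub(1) as i32
        })
        .collect()
}

fn main() {
    // Three rows at frame 1, two rows at frame 2 (mirrors `dedupe_temporal`).
    let times = [1, 1, 1, 2, 2];
    // Row 2 is the last "frame 1" row, row 4 is the last "frame 2" row.
    assert_eq!(keep_last_of_each_run(&times), vec![2, 4]);
}
```

These indices are then fed to the `take` kernel, so the method's output is entirely determined by the chunk's current row order, which is why the doc comment insists on sorting first.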
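And a sketch of what the new `take_array` helper boils down to, calling arrow2's `take` kernel directly (this requires the `compute_take` feature the patch enables; the array contents are made up). It also demonstrates the copy semantics that the `#[should_panic]` annotation on `take_does_not_allocate` documents: the result is a fresh, contiguous array rather than a view into the input.

```rust
use arrow2::array::{Int64Array, PrimitiveArray};

fn main() {
    let values = Int64Array::from_vec(vec![10, 20, 30, 40]);
    // Gather rows 0 and 2.
    let indices = PrimitiveArray::<i32>::from_vec(vec![0, 2]);

    // `take` returns a boxed `dyn Array`; downcast back to the concrete type,
    // exactly as `take_array` does on behalf of its callers.
    let taken: Int64Array = arrow2::compute::take::take(&values, &indices)
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap()
        .clone();

    // The values were copied into a new contiguous buffer.
    assert_eq!(taken.values().as_slice(), &[10, 30][..]);
}
```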