From bb58101f690516c58b43bf10467b6eb7f0b5363c Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 10 Oct 2024 18:25:41 +0200 Subject: [PATCH] Dataframe v2: "constant-time" pagination (#7665) This implements support for "constant-time" pagination. Obviously it's not constant-time, but it scales in a sane fashion. This PR is still _not_ about general performance optimizations. It is the last step before those can start though. * Fixes #7657 * DNM: requires #7652 --- Cargo.lock | 1 - crates/store/re_dataframe/Cargo.toml | 1 - crates/store/re_dataframe/src/query.rs | 378 ++++++++++++++++-- .../src/dataframe_ui.rs | 4 +- 4 files changed, 338 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3949173b595c..cafc4d1da18f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5131,7 +5131,6 @@ dependencies = [ name = "re_dataframe" version = "0.19.0-alpha.1+dev" dependencies = [ - "ahash", "anyhow", "itertools 0.13.0", "nohash-hasher", diff --git a/crates/store/re_dataframe/Cargo.toml b/crates/store/re_dataframe/Cargo.toml index 0b67d4b0e9b9..144168d505a1 100644 --- a/crates/store/re_dataframe/Cargo.toml +++ b/crates/store/re_dataframe/Cargo.toml @@ -34,7 +34,6 @@ re_tracing.workspace = true re_types_core.workspace = true # External dependencies: -ahash.workspace = true anyhow.workspace = true arrow2.workspace = true itertools.workspace = true diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index 1b310539443f..2abf8c7227ca 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -1,9 +1,11 @@ -use std::sync::{ - atomic::{AtomicU64, Ordering}, - OnceLock, +use std::{ + collections::BTreeSet, + sync::{ + atomic::{AtomicU64, Ordering}, + OnceLock, + }, }; -use ahash::HashSet; use arrow2::{ array::{ Array as ArrowArray, BooleanArray as ArrowBooleanArray, @@ -41,7 +43,7 @@ use crate::{QueryEngine, RecordBatch}; // * [x] latestat sparse-filling // * [x] sampling support // * [x] clears -// * [ ] pagination (fast) +// * [x] pagination (fast) // * [ ] overlaps (less dumb) // * [ ] selector-based `filtered_index` // * [ ] configurable cache bypass @@ -125,7 +127,18 @@ struct QueryHandleState { /// /// This represents the number of rows that the caller has iterated on: it is completely /// unrelated to the cursors used to track the current position in each individual chunk. + /// + /// The corresponding index value can be obtained using `unique_index_values[cur_row]`. + /// + /// `unique_index_values[cur_row]`: [`QueryHandleState::unique_index_values`] cur_row: AtomicU64, + + /// All unique index values that can possibly be returned by this query. + /// + /// Guaranteed ascendingly sorted and deduped. + /// + /// See also [`QueryHandleState::cur_row`]. + unique_index_values: Vec, } impl<'a> QueryHandle<'a> { @@ -266,6 +279,47 @@ impl QueryHandle<'_> { } } + // 6. Collect all unique index values. + // + // Used to achieve ~O(log(n)) pagination. 
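+        //
+        // Keeping these ascendingly sorted and deduped means `seek_to_row` can map a row index
+        // straight to an index value, which `seek_to_index_value` then locates in each chunk
+        // with a binary search over the chunk's (sorted) time column.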
+ let unique_index_values = + if let Some(using_index_values_stack) = using_index_values_stack.as_ref() { + let mut all_unique_index_values = using_index_values_stack.clone(); + all_unique_index_values.sort(); + all_unique_index_values + } else { + re_tracing::profile_scope!("index_values"); + + let mut view_chunks = view_chunks.iter(); + let view_chunks = if let Some(view_pov_chunks_idx) = view_pov_chunks_idx { + Either::Left(view_chunks.nth(view_pov_chunks_idx).into_iter()) + } else { + Either::Right(view_chunks) + }; + + let mut all_unique_index_values: BTreeSet = view_chunks + .flat_map(|chunks| { + chunks.iter().filter_map(|(_cursor, chunk)| { + if chunk.is_static() { + Some(Either::Left(std::iter::once(TimeInt::STATIC))) + } else { + chunk + .timelines() + .get(&self.query.filtered_index) + .map(|time_column| Either::Right(time_column.times())) + } + }) + }) + .flatten() + .collect(); + + if let Some(filtered_index_values) = self.query.filtered_index_values.as_ref() { + all_unique_index_values.retain(|time| filtered_index_values.contains(time)); + } + + all_unique_index_values.into_iter().collect_vec() + }; + QueryHandleState { view_contents, selected_contents, @@ -274,6 +328,7 @@ impl QueryHandle<'_> { view_pov_chunks_idx, using_index_values_stack: Mutex::new(using_index_values_stack), cur_row: AtomicU64::new(0), + unique_index_values, } } @@ -579,45 +634,108 @@ impl QueryHandle<'_> { &self.init().arrow_schema } - /// How many rows of data will be returned? + /// Advance all internal cursors so that the next row yielded will correspond to `row_idx`. /// - /// The number of rows depends and only depends on the _view contents_. - /// The _selected contents_ has no influence on this value. - // - // TODO(cmc): implement this properly, cache the result, etc. - pub fn num_rows(&self) -> u64 { + /// Does nothing if `row_idx` is out of bounds. + /// + /// ## Concurrency + /// + /// Cursors are implemented using atomic variables, which means calling any of the `seek_*` + /// while iteration is concurrently ongoing is memory-safe but logically undefined racy + /// behavior. Be careful. + /// + /// ## Performance + /// + /// This requires going through every chunk once, and for each chunk running a binary search if + /// the chunk's time range contains the `index_value`. + /// + /// I.e.: it's pretty cheap already. + #[inline] + pub fn seek_to_row(&self, row_idx: usize) { + let Some(index_value) = self.init().unique_index_values.get(row_idx) else { + return; + }; + + self.seek_to_index_value(*index_value); + } + + /// Advance all internal cursors so that the next row yielded will correspond to `index_value`. + /// + /// If `index_value` isn't present in the dataset, this seeks to the first index value + /// available past that point, if any. + /// + /// ## Concurrency + /// + /// Cursors are implemented using atomic variables, which means calling any of the `seek_*` + /// while iteration is concurrently ongoing is memory-safe but logically undefined racy + /// behavior. Be careful. + /// + /// ## Performance + /// + /// This requires going through every chunk once, and for each chunk running a binary search if + /// the chunk's time range contains the `index_value`. + /// + /// I.e.: it's pretty cheap already. 
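+    ///
+    /// For example, a minimal sketch (using `TimeInt::new_temporal`, as the tests below do):
+    ///
+    /// ```ignore
+    /// // Jump to the first row whose index value is >= 42, then stream from there.
+    /// query_handle.seek_to_index_value(TimeInt::new_temporal(42));
+    /// while let Some(row) = query_handle.next_row() {
+    ///     // …
+    /// }
+    /// ```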
+ pub fn seek_to_index_value(&self, index_value: IndexValue) { re_tracing::profile_function!(); let state = self.init(); - let mut view_chunks = state.view_chunks.iter(); - let view_chunks = if let Some(view_pov_chunks_idx) = state.view_pov_chunks_idx { - Either::Left(view_chunks.nth(view_pov_chunks_idx).into_iter()) - } else { - Either::Right(view_chunks) - }; + // NOTE: If there are explicit samples specified, then we need to make sure to properly reset + // them each time an arbitrary seek happens. + let mut using_index_values_stack = state.using_index_values_stack.lock(); + if using_index_values_stack.is_some() { + let mut index_values = state + .unique_index_values + .iter() + .filter(|&&v| v >= index_value) + .copied() + .collect_vec(); + index_values.reverse(); + *using_index_values_stack = Some(index_values); + } - let mut all_unique_timestamps: HashSet = view_chunks - .flat_map(|chunks| { - chunks.iter().filter_map(|(_cursor, chunk)| { - if chunk.is_static() { - Some(Either::Left(std::iter::once(TimeInt::STATIC))) - } else { - chunk - .timelines() - .get(&self.query.filtered_index) - .map(|time_column| Either::Right(time_column.times())) - } - }) - }) - .flatten() - .collect(); + if index_value.is_static() { + for chunks in &state.view_chunks { + for (cursor, _chunk) in chunks { + cursor.store(0, Ordering::Relaxed); + } + } + return; + } + + for chunks in &state.view_chunks { + for (cursor, chunk) in chunks { + // NOTE: The chunk has been densified already: its global time range is the same as + // the time range for the specific component of interest. + let Some(time_column) = chunk.timelines().get(&self.query.filtered_index) else { + continue; + }; - if let Some(filtered_index_values) = self.query.filtered_index_values.as_ref() { - all_unique_timestamps.retain(|time| filtered_index_values.contains(time)); + let time_range = time_column.time_range(); + + let new_cursor = if index_value < time_range.min() { + 0 + } else if index_value > time_range.max() { + chunk.num_rows() as u64 /* yes, one past the end -- not a mistake */ + } else { + time_column + .times_raw() + .partition_point(|&time| time < index_value.as_i64()) + as u64 + }; + + cursor.store(new_cursor, Ordering::Relaxed); + } } + } - let num_rows = all_unique_timestamps.len() as _; + /// How many rows of data will be returned? + /// + /// The number of rows depends and only depends on the _view contents_. + /// The _selected contents_ has no influence on this value. + pub fn num_rows(&self) -> u64 { + let num_rows = self.init().unique_index_values.len() as _; if cfg!(debug_assertions) { let expected_num_rows = @@ -646,16 +764,13 @@ impl QueryHandle<'_> { /// /// ## Pagination /// - /// This does not offer any kind of native pagination yet. - /// - /// To emulate pagination from user-space, use the `Iterator` API: + /// Use [`Self::seek_to_row`] and [`Self::seek_to_index_value`]: /// ```ignore - /// for row in query_handle.into_iter().skip(offset).take(len) { + /// query_handle.seek_to_row(42); + /// for row in query_handle.into_iter().take(len) { /// // … /// } /// ``` - // - // TODO(cmc): better/actual pagination pub fn next_row(&self) -> Option>> { re_tracing::profile_function!(); @@ -1078,12 +1193,24 @@ impl QueryHandle<'_> { } impl<'a> QueryHandle<'a> { + /// Returns an iterator backed by [`Self::next_row`]. 
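+    ///
+    /// Combined with [`Self::seek_to_row`], this gives paginated iteration without consuming
+    /// the handle (a sketch; `offset` and `len` are stand-ins for whatever page bounds you use):
+    ///
+    /// ```ignore
+    /// query_handle.seek_to_row(offset);
+    /// for row in query_handle.iter().take(len) {
+    ///     // …
+    /// }
+    /// ```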
+ #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work + pub fn iter(&'a self) -> impl Iterator>> + 'a { + std::iter::from_fn(move || self.next_row()) + } + /// Returns an iterator backed by [`Self::next_row`]. #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work pub fn into_iter(self) -> impl Iterator>> + 'a { std::iter::from_fn(move || self.next_row()) } + /// Returns an iterator backed by [`Self::next_row_batch`]. + #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work + pub fn batch_iter(&'a self) -> impl Iterator + 'a { + std::iter::from_fn(move || self.next_row_batch()) + } + /// Returns an iterator backed by [`Self::next_row_batch`]. #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work pub fn into_batch_iter(self) -> impl Iterator + 'a { @@ -1133,7 +1260,7 @@ mod tests { // * [x] num_rows // * [x] clears // * [ ] timelines returned with selection=none - // * [ ] pagination + // * [x] pagination // TODO(cmc): At some point I'd like to stress multi-entity queries too, but that feels less // urgent considering how things are implemented (each entity lives in its own index, so it's @@ -1974,6 +2101,173 @@ mod tests { Ok(()) } + #[test] + fn pagination() -> anyhow::Result<()> { + re_log::setup_logging(); + + let store = create_nasty_store()?; + eprintln!("{store}"); + let query_cache = QueryCache::new(&store); + let query_engine = QueryEngine { + store: &store, + cache: &query_cache, + }; + + let timeline = Timeline::new_sequence("frame_nr"); + let entity_path = EntityPath::from("this/that"); + + // basic + { + let query = QueryExpression::new(timeline); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + assert_eq!( + query_engine.query(query.clone()).into_iter().count() as u64, + query_handle.num_rows(), + ); + + let expected_rows = query_handle.batch_iter().collect_vec(); + + for _ in 0..3 { + for i in 0..expected_rows.len() { + query_handle.seek_to_row(i); + + let expected = concatenate_record_batches( + query_handle.schema().clone(), + &expected_rows.iter().skip(i).take(3).cloned().collect_vec(), + ); + let got = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.batch_iter().take(3).collect_vec(), + ); + + let expected = format!("{:#?}", expected.data.iter().collect_vec()); + let got = format!("{:#?}", got.data.iter().collect_vec()); + + similar_asserts::assert_eq!(expected, got); + } + } + } + + // with pov + { + let mut query = QueryExpression::new(timeline); + query.filtered_point_of_view = Some(ComponentColumnSelector { + entity_path: entity_path.clone(), + component: MyPoint::name(), + join_encoding: Default::default(), + }); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + assert_eq!( + query_engine.query(query.clone()).into_iter().count() as u64, + query_handle.num_rows(), + ); + + let expected_rows = query_handle.batch_iter().collect_vec(); + + for _ in 0..3 { + for i in 0..expected_rows.len() { + query_handle.seek_to_row(i); + + let expected = concatenate_record_batches( + query_handle.schema().clone(), + &expected_rows.iter().skip(i).take(3).cloned().collect_vec(), + ); + let got = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.batch_iter().take(3).collect_vec(), + ); + + let expected = format!("{:#?}", expected.data.iter().collect_vec()); + let got = format!("{:#?}", got.data.iter().collect_vec()); 
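+                    // `expected` is rows i..i+3 of the full, unseeked scan; `got` is what the
+                    // handle yields after seeking back to row `i`: the two must match exactly.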
+ + similar_asserts::assert_eq!(expected, got); + } + } + } + + // with sampling + { + let mut query = QueryExpression::new(timeline); + query.using_index_values = Some( + [0, 15, 30, 30, 45, 60, 75, 90] + .into_iter() + .map(TimeInt::new_temporal) + .chain(std::iter::once(TimeInt::STATIC)) + .collect(), + ); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + assert_eq!( + query_engine.query(query.clone()).into_iter().count() as u64, + query_handle.num_rows(), + ); + + let expected_rows = query_handle.batch_iter().collect_vec(); + + for _ in 0..3 { + for i in 0..expected_rows.len() { + query_handle.seek_to_row(i); + + let expected = concatenate_record_batches( + query_handle.schema().clone(), + &expected_rows.iter().skip(i).take(3).cloned().collect_vec(), + ); + let got = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.batch_iter().take(3).collect_vec(), + ); + + let expected = format!("{:#?}", expected.data.iter().collect_vec()); + let got = format!("{:#?}", got.data.iter().collect_vec()); + + similar_asserts::assert_eq!(expected, got); + } + } + } + + // with sparse-fill + { + let mut query = QueryExpression::new(timeline); + query.sparse_fill_strategy = SparseFillStrategy::LatestAtGlobal; + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + assert_eq!( + query_engine.query(query.clone()).into_iter().count() as u64, + query_handle.num_rows(), + ); + + let expected_rows = query_handle.batch_iter().collect_vec(); + + for _ in 0..3 { + for i in 0..expected_rows.len() { + query_handle.seek_to_row(i); + + let expected = concatenate_record_batches( + query_handle.schema().clone(), + &expected_rows.iter().skip(i).take(3).cloned().collect_vec(), + ); + let got = concatenate_record_batches( + query_handle.schema().clone(), + &query_handle.batch_iter().take(3).collect_vec(), + ); + + let expected = format!("{:#?}", expected.data.iter().collect_vec()); + let got = format!("{:#?}", got.data.iter().collect_vec()); + + similar_asserts::assert_eq!(expected, got); + } + } + } + + Ok(()) + } + /// Returns a very nasty [`ChunkStore`] with all kinds of partial updates, chunk overlaps, /// repeated timestamps, duplicated chunks, partial multi-timelines, flat and recursive clears, etc. fn create_nasty_store() -> anyhow::Result { diff --git a/crates/viewer/re_space_view_dataframe/src/dataframe_ui.rs b/crates/viewer/re_space_view_dataframe/src/dataframe_ui.rs index ab151588835b..0fb0cba0ff94 100644 --- a/crates/viewer/re_space_view_dataframe/src/dataframe_ui.rs +++ b/crates/viewer/re_space_view_dataframe/src/dataframe_ui.rs @@ -202,9 +202,9 @@ impl<'a> egui_table::TableDelegate for DataframeTableDelegate<'a> { let timeline = self.query_handle.query().filtered_index; - //TODO(ab, cmc): we probably need a better way to run a paginated query. + self.query_handle + .seek_to_row(info.visible_rows.start as usize); let data = std::iter::from_fn(|| self.query_handle.next_row()) - .skip(info.visible_rows.start as usize) .take((info.visible_rows.end - info.visible_rows.start) as usize) .collect();