From a5bbc4ef9d472450d249de3a996190652bd5a1cb Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 9 Oct 2024 18:07:30 +0200 Subject: [PATCH] implement pagination --- crates/store/re_dataframe/src/query.rs | 197 ++++++++++++++++++++----- 1 file changed, 156 insertions(+), 41 deletions(-) diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index bbea291e4739..2abf8c7227ca 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -1,9 +1,11 @@ -use std::sync::{ - atomic::{AtomicU64, Ordering}, - OnceLock, +use std::{ + collections::BTreeSet, + sync::{ + atomic::{AtomicU64, Ordering}, + OnceLock, + }, }; -use ahash::HashSet; use arrow2::{ array::{ Array as ArrowArray, BooleanArray as ArrowBooleanArray, @@ -41,7 +43,7 @@ use crate::{QueryEngine, RecordBatch}; // * [x] latestat sparse-filling // * [x] sampling support // * [x] clears -// * [ ] pagination (fast) +// * [x] pagination (fast) // * [ ] overlaps (less dumb) // * [ ] selector-based `filtered_index` // * [ ] configurable cache bypass @@ -125,7 +127,18 @@ struct QueryHandleState { /// /// This represents the number of rows that the caller has iterated on: it is completely /// unrelated to the cursors used to track the current position in each individual chunk. + /// + /// The corresponding index value can be obtained using `unique_index_values[cur_row]`. + /// + /// `unique_index_values[cur_row]`: [`QueryHandleState::unique_index_values`] cur_row: AtomicU64, + + /// All unique index values that can possibly be returned by this query. + /// + /// Guaranteed ascendingly sorted and deduped. + /// + /// See also [`QueryHandleState::cur_row`]. + unique_index_values: Vec, } impl<'a> QueryHandle<'a> { @@ -266,6 +279,47 @@ impl QueryHandle<'_> { } } + // 6. Collect all unique index values. + // + // Used to achieve ~O(log(n)) pagination. + let unique_index_values = + if let Some(using_index_values_stack) = using_index_values_stack.as_ref() { + let mut all_unique_index_values = using_index_values_stack.clone(); + all_unique_index_values.sort(); + all_unique_index_values + } else { + re_tracing::profile_scope!("index_values"); + + let mut view_chunks = view_chunks.iter(); + let view_chunks = if let Some(view_pov_chunks_idx) = view_pov_chunks_idx { + Either::Left(view_chunks.nth(view_pov_chunks_idx).into_iter()) + } else { + Either::Right(view_chunks) + }; + + let mut all_unique_index_values: BTreeSet = view_chunks + .flat_map(|chunks| { + chunks.iter().filter_map(|(_cursor, chunk)| { + if chunk.is_static() { + Some(Either::Left(std::iter::once(TimeInt::STATIC))) + } else { + chunk + .timelines() + .get(&self.query.filtered_index) + .map(|time_column| Either::Right(time_column.times())) + } + }) + }) + .flatten() + .collect(); + + if let Some(filtered_index_values) = self.query.filtered_index_values.as_ref() { + all_unique_index_values.retain(|time| filtered_index_values.contains(time)); + } + + all_unique_index_values.into_iter().collect_vec() + }; + QueryHandleState { view_contents, selected_contents, @@ -274,6 +328,7 @@ impl QueryHandle<'_> { view_pov_chunks_idx, using_index_values_stack: Mutex::new(using_index_values_stack), cur_row: AtomicU64::new(0), + unique_index_values, } } @@ -579,45 +634,108 @@ impl QueryHandle<'_> { &self.init().arrow_schema } - /// How many rows of data will be returned? + /// Advance all internal cursors so that the next row yielded will correspond to `row_idx`. /// - /// The number of rows depends and only depends on the _view contents_. - /// The _selected contents_ has no influence on this value. - // - // TODO(cmc): implement this properly, cache the result, etc. - pub fn num_rows(&self) -> u64 { + /// Does nothing if `row_idx` is out of bounds. + /// + /// ## Concurrency + /// + /// Cursors are implemented using atomic variables, which means calling any of the `seek_*` + /// while iteration is concurrently ongoing is memory-safe but logically undefined racy + /// behavior. Be careful. + /// + /// ## Performance + /// + /// This requires going through every chunk once, and for each chunk running a binary search if + /// the chunk's time range contains the `index_value`. + /// + /// I.e.: it's pretty cheap already. + #[inline] + pub fn seek_to_row(&self, row_idx: usize) { + let Some(index_value) = self.init().unique_index_values.get(row_idx) else { + return; + }; + + self.seek_to_index_value(*index_value); + } + + /// Advance all internal cursors so that the next row yielded will correspond to `index_value`. + /// + /// If `index_value` isn't present in the dataset, this seeks to the first index value + /// available past that point, if any. + /// + /// ## Concurrency + /// + /// Cursors are implemented using atomic variables, which means calling any of the `seek_*` + /// while iteration is concurrently ongoing is memory-safe but logically undefined racy + /// behavior. Be careful. + /// + /// ## Performance + /// + /// This requires going through every chunk once, and for each chunk running a binary search if + /// the chunk's time range contains the `index_value`. + /// + /// I.e.: it's pretty cheap already. + pub fn seek_to_index_value(&self, index_value: IndexValue) { re_tracing::profile_function!(); let state = self.init(); - let mut view_chunks = state.view_chunks.iter(); - let view_chunks = if let Some(view_pov_chunks_idx) = state.view_pov_chunks_idx { - Either::Left(view_chunks.nth(view_pov_chunks_idx).into_iter()) - } else { - Either::Right(view_chunks) - }; + // NOTE: If there are explicit samples specified, then we need to make sure to properly reset + // them each time an arbitrary seek happens. + let mut using_index_values_stack = state.using_index_values_stack.lock(); + if using_index_values_stack.is_some() { + let mut index_values = state + .unique_index_values + .iter() + .filter(|&&v| v >= index_value) + .copied() + .collect_vec(); + index_values.reverse(); + *using_index_values_stack = Some(index_values); + } - let mut all_unique_timestamps: HashSet = view_chunks - .flat_map(|chunks| { - chunks.iter().filter_map(|(_cursor, chunk)| { - if chunk.is_static() { - Some(Either::Left(std::iter::once(TimeInt::STATIC))) - } else { - chunk - .timelines() - .get(&self.query.filtered_index) - .map(|time_column| Either::Right(time_column.times())) - } - }) - }) - .flatten() - .collect(); + if index_value.is_static() { + for chunks in &state.view_chunks { + for (cursor, _chunk) in chunks { + cursor.store(0, Ordering::Relaxed); + } + } + return; + } + + for chunks in &state.view_chunks { + for (cursor, chunk) in chunks { + // NOTE: The chunk has been densified already: its global time range is the same as + // the time range for the specific component of interest. + let Some(time_column) = chunk.timelines().get(&self.query.filtered_index) else { + continue; + }; + + let time_range = time_column.time_range(); + + let new_cursor = if index_value < time_range.min() { + 0 + } else if index_value > time_range.max() { + chunk.num_rows() as u64 /* yes, one past the end -- not a mistake */ + } else { + time_column + .times_raw() + .partition_point(|&time| time < index_value.as_i64()) + as u64 + }; - if let Some(filtered_index_values) = self.query.filtered_index_values.as_ref() { - all_unique_timestamps.retain(|time| filtered_index_values.contains(time)); + cursor.store(new_cursor, Ordering::Relaxed); + } } + } - let num_rows = all_unique_timestamps.len() as _; + /// How many rows of data will be returned? + /// + /// The number of rows depends and only depends on the _view contents_. + /// The _selected contents_ has no influence on this value. + pub fn num_rows(&self) -> u64 { + let num_rows = self.init().unique_index_values.len() as _; if cfg!(debug_assertions) { let expected_num_rows = @@ -646,16 +764,13 @@ impl QueryHandle<'_> { /// /// ## Pagination /// - /// This does not offer any kind of native pagination yet. - /// - /// To emulate pagination from user-space, use the `Iterator` API: + /// Use [`Self::seek_to_row`] and [`Self::seek_to_index_value`]: /// ```ignore - /// for row in query_handle.into_iter().skip(offset).take(len) { + /// query_handle.seek_to_row(42); + /// for row in query_handle.into_iter().take(len) { /// // … /// } /// ``` - // - // TODO(cmc): better/actual pagination pub fn next_row(&self) -> Option>> { re_tracing::profile_function!();