Skip to content

Commit

Permalink
implement pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Oct 10, 2024
1 parent ced6ffd commit a5bbc4e
Showing 1 changed file with 156 additions and 41 deletions.
197 changes: 156 additions & 41 deletions crates/store/re_dataframe/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::{
atomic::{AtomicU64, Ordering},
OnceLock,
use std::{
collections::BTreeSet,
sync::{
atomic::{AtomicU64, Ordering},
OnceLock,
},
};

use ahash::HashSet;
use arrow2::{
array::{
Array as ArrowArray, BooleanArray as ArrowBooleanArray,
Expand Down Expand Up @@ -41,7 +43,7 @@ use crate::{QueryEngine, RecordBatch};
// * [x] latestat sparse-filling
// * [x] sampling support
// * [x] clears
// * [ ] pagination (fast)
// * [x] pagination (fast)
// * [ ] overlaps (less dumb)
// * [ ] selector-based `filtered_index`
// * [ ] configurable cache bypass
Expand Down Expand Up @@ -125,7 +127,18 @@ struct QueryHandleState {
///
/// This represents the number of rows that the caller has iterated on: it is completely
/// unrelated to the cursors used to track the current position in each individual chunk.
///
/// The corresponding index value can be obtained using `unique_index_values[cur_row]`.
///
/// `unique_index_values[cur_row]`: [`QueryHandleState::unique_index_values`]
cur_row: AtomicU64,

/// All unique index values that can possibly be returned by this query.
///
/// Guaranteed ascendingly sorted and deduped.
///
/// See also [`QueryHandleState::cur_row`].
unique_index_values: Vec<IndexValue>,
}

impl<'a> QueryHandle<'a> {
Expand Down Expand Up @@ -266,6 +279,47 @@ impl QueryHandle<'_> {
}
}

// 6. Collect all unique index values.
//
// Used to achieve ~O(log(n)) pagination.
let unique_index_values =
if let Some(using_index_values_stack) = using_index_values_stack.as_ref() {
let mut all_unique_index_values = using_index_values_stack.clone();
all_unique_index_values.sort();
all_unique_index_values
} else {
re_tracing::profile_scope!("index_values");

let mut view_chunks = view_chunks.iter();
let view_chunks = if let Some(view_pov_chunks_idx) = view_pov_chunks_idx {
Either::Left(view_chunks.nth(view_pov_chunks_idx).into_iter())
} else {
Either::Right(view_chunks)
};

let mut all_unique_index_values: BTreeSet<TimeInt> = view_chunks
.flat_map(|chunks| {
chunks.iter().filter_map(|(_cursor, chunk)| {
if chunk.is_static() {
Some(Either::Left(std::iter::once(TimeInt::STATIC)))
} else {
chunk
.timelines()
.get(&self.query.filtered_index)
.map(|time_column| Either::Right(time_column.times()))
}
})
})
.flatten()
.collect();

if let Some(filtered_index_values) = self.query.filtered_index_values.as_ref() {
all_unique_index_values.retain(|time| filtered_index_values.contains(time));
}

all_unique_index_values.into_iter().collect_vec()
};

QueryHandleState {
view_contents,
selected_contents,
Expand All @@ -274,6 +328,7 @@ impl QueryHandle<'_> {
view_pov_chunks_idx,
using_index_values_stack: Mutex::new(using_index_values_stack),
cur_row: AtomicU64::new(0),
unique_index_values,
}
}

Expand Down Expand Up @@ -579,45 +634,108 @@ impl QueryHandle<'_> {
&self.init().arrow_schema
}

/// How many rows of data will be returned?
/// Advance all internal cursors so that the next row yielded will correspond to `row_idx`.
///
/// The number of rows depends and only depends on the _view contents_.
/// The _selected contents_ has no influence on this value.
//
// TODO(cmc): implement this properly, cache the result, etc.
pub fn num_rows(&self) -> u64 {
/// Does nothing if `row_idx` is out of bounds.
///
/// ## Concurrency
///
/// Cursors are implemented using atomic variables, which means that calling any of the
/// `seek_*` methods while iteration is concurrently ongoing is memory-safe, but racy: the
/// resulting cursor positions are logically undefined. Be careful.
///
/// ## Performance
///
/// This requires going through every chunk once, and for each chunk running a binary search if
/// the chunk's time range contains the `index_value`.
///
/// I.e.: it's pretty cheap already.
#[inline]
pub fn seek_to_row(&self, row_idx: usize) {
let Some(index_value) = self.init().unique_index_values.get(row_idx) else {
return;
};

self.seek_to_index_value(*index_value);
}

/// Advance all internal cursors so that the next row yielded will correspond to `index_value`.
///
/// If `index_value` isn't present in the dataset, this seeks to the first index value
/// available past that point, if any.
///
/// ## Concurrency
///
/// Cursors are implemented using atomic variables, which means that calling any of the
/// `seek_*` methods while iteration is concurrently ongoing is memory-safe, but racy: the
/// resulting cursor positions are logically undefined. Be careful.
///
/// ## Performance
///
/// This requires going through every chunk once, and for each chunk running a binary search if
/// the chunk's time range contains the `index_value`.
///
/// I.e.: it's pretty cheap already.
pub fn seek_to_index_value(&self, index_value: IndexValue) {
re_tracing::profile_function!();

let state = self.init();

let mut view_chunks = state.view_chunks.iter();
let view_chunks = if let Some(view_pov_chunks_idx) = state.view_pov_chunks_idx {
Either::Left(view_chunks.nth(view_pov_chunks_idx).into_iter())
} else {
Either::Right(view_chunks)
};
// NOTE: If there are explicit samples specified, then we need to make sure to properly reset
// them each time an arbitrary seek happens.
let mut using_index_values_stack = state.using_index_values_stack.lock();
if using_index_values_stack.is_some() {
let mut index_values = state
.unique_index_values
.iter()
.filter(|&&v| v >= index_value)
.copied()
.collect_vec();
index_values.reverse();
*using_index_values_stack = Some(index_values);
}

let mut all_unique_timestamps: HashSet<TimeInt> = view_chunks
.flat_map(|chunks| {
chunks.iter().filter_map(|(_cursor, chunk)| {
if chunk.is_static() {
Some(Either::Left(std::iter::once(TimeInt::STATIC)))
} else {
chunk
.timelines()
.get(&self.query.filtered_index)
.map(|time_column| Either::Right(time_column.times()))
}
})
})
.flatten()
.collect();
if index_value.is_static() {
for chunks in &state.view_chunks {
for (cursor, _chunk) in chunks {
cursor.store(0, Ordering::Relaxed);
}
}
return;
}

for chunks in &state.view_chunks {
for (cursor, chunk) in chunks {
// NOTE: The chunk has been densified already: its global time range is the same as
// the time range for the specific component of interest.
let Some(time_column) = chunk.timelines().get(&self.query.filtered_index) else {
continue;
};

let time_range = time_column.time_range();

let new_cursor = if index_value < time_range.min() {
0
} else if index_value > time_range.max() {
chunk.num_rows() as u64 /* yes, one past the end -- not a mistake */
} else {
time_column
.times_raw()
.partition_point(|&time| time < index_value.as_i64())
as u64
};

if let Some(filtered_index_values) = self.query.filtered_index_values.as_ref() {
all_unique_timestamps.retain(|time| filtered_index_values.contains(time));
cursor.store(new_cursor, Ordering::Relaxed);
}
}
}

let num_rows = all_unique_timestamps.len() as _;
/// How many rows of data will be returned?
///
/// The number of rows depends and only depends on the _view contents_.
/// The _selected contents_ has no influence on this value.
pub fn num_rows(&self) -> u64 {
let num_rows = self.init().unique_index_values.len() as _;

if cfg!(debug_assertions) {
let expected_num_rows =
Expand Down Expand Up @@ -646,16 +764,13 @@ impl QueryHandle<'_> {
///
/// ## Pagination
///
/// This does not offer any kind of native pagination yet.
///
/// To emulate pagination from user-space, use the `Iterator` API:
/// Use [`Self::seek_to_row`] and [`Self::seek_to_index_value`]:
/// ```ignore
/// for row in query_handle.into_iter().skip(offset).take(len) {
/// query_handle.seek_to_row(42);
/// for row in query_handle.into_iter().take(len) {
/// // …
/// }
/// ```
//
// TODO(cmc): better/actual pagination
pub fn next_row(&self) -> Option<Vec<Box<dyn ArrowArray>>> {
re_tracing::profile_function!();

Expand Down

0 comments on commit a5bbc4e

Please sign in to comment.