Skip to content

Commit

Permalink
add pagination test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Oct 10, 2024
1 parent c6d842b commit ced6ffd
Showing 1 changed file with 180 additions and 1 deletion.
181 changes: 180 additions & 1 deletion crates/store/re_dataframe/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1078,12 +1078,24 @@ impl QueryHandle<'_> {
}

impl<'a> QueryHandle<'a> {
/// Returns an iterator backed by [`Self::next_row`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn iter(&'a self) -> impl Iterator<Item = Vec<Box<dyn ArrowArray>>> + 'a {
std::iter::from_fn(move || self.next_row())
}

/// Consumes the handle, returning an owning iterator backed by [`Self::next_row`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn into_iter(self) -> impl Iterator<Item = Vec<Box<dyn ArrowArray>>> + 'a {
    // Move the handle into the closure so the iterator owns it outright.
    let handle = self;
    std::iter::from_fn(move || handle.next_row())
}

/// Returns an iterator backed by [`Self::next_row_batch`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn batch_iter(&'a self) -> impl Iterator<Item = RecordBatch> + 'a {
std::iter::from_fn(move || self.next_row_batch())
}

/// Returns an iterator backed by [`Self::next_row_batch`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn into_batch_iter(self) -> impl Iterator<Item = RecordBatch> + 'a {
Expand Down Expand Up @@ -1133,7 +1145,7 @@ mod tests {
// * [x] num_rows
// * [x] clears
// * [ ] timelines returned with selection=none
// * [x] pagination

// TODO(cmc): At some point I'd like to stress multi-entity queries too, but that feels less
// urgent considering how things are implemented (each entity lives in its own index, so it's
Expand Down Expand Up @@ -1974,6 +1986,173 @@ mod tests {
Ok(())
}

#[test]
fn pagination() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = create_nasty_store()?;
    eprintln!("{store}");
    let query_cache = QueryCache::new(&store);
    let query_engine = QueryEngine {
        store: &store,
        cache: &query_cache,
    };

    let timeline = Timeline::new_sequence("frame_nr");
    let entity_path = EntityPath::from("this/that");

    // Shared harness: for the given query, seek to every possible row index and check that
    // iterating from there yields exactly the same batches as skipping that many rows of
    // the full, pre-collected result set. The outer `0..3` loop makes sure that seeking is
    // idempotent (seeking to the same row repeatedly always yields the same data).
    let check_pagination = |query: QueryExpression| {
        eprintln!("{query:#?}:");

        let query_handle = query_engine.query(query.clone());
        // Sanity check: `num_rows` agrees with actually draining the row iterator.
        assert_eq!(
            query_engine.query(query).into_iter().count() as u64,
            query_handle.num_rows(),
        );

        let expected_rows = query_handle.batch_iter().collect_vec();

        for _ in 0..3 {
            for i in 0..expected_rows.len() {
                query_handle.seek_to_row(i);

                let expected = concatenate_record_batches(
                    query_handle.schema().clone(),
                    &expected_rows.iter().skip(i).take(3).cloned().collect_vec(),
                );
                let got = concatenate_record_batches(
                    query_handle.schema().clone(),
                    &query_handle.batch_iter().take(3).collect_vec(),
                );

                // Compare `Debug` renderings so mismatches produce a readable diff.
                let expected = format!("{:#?}", expected.data.iter().collect_vec());
                let got = format!("{:#?}", got.data.iter().collect_vec());

                similar_asserts::assert_eq!(expected, got);
            }
        }
    };

    // basic
    check_pagination(QueryExpression::new(timeline));

    // with pov
    {
        let mut query = QueryExpression::new(timeline);
        query.filtered_point_of_view = Some(ComponentColumnSelector {
            entity_path: entity_path.clone(),
            component: MyPoint::name(),
            join_encoding: Default::default(),
        });
        check_pagination(query);
    }

    // with sampling
    {
        let mut query = QueryExpression::new(timeline);
        query.using_index_values = Some(
            [0, 15, 30, 30, 45, 60, 75, 90]
                .into_iter()
                .map(TimeInt::new_temporal)
                .chain(std::iter::once(TimeInt::STATIC))
                .collect(),
        );
        check_pagination(query);
    }

    // with sparse-fill
    {
        let mut query = QueryExpression::new(timeline);
        query.sparse_fill_strategy = SparseFillStrategy::LatestAtGlobal;
        check_pagination(query);
    }

    Ok(())
}

/// Returns a very nasty [`ChunkStore`] with all kinds of partial updates, chunk overlaps,
/// repeated timestamps, duplicated chunks, partial multi-timelines, flat and recursive clears, etc.
fn create_nasty_store() -> anyhow::Result<ChunkStore> {
Expand Down

0 comments on commit ced6ffd

Please sign in to comment.