Skip to content

Commit

Permalink
Implement Chunk::iter_indices (#6877)
Browse files Browse the repository at this point in the history
Implements `Chunk::iter_indices`, which makes it possible to efficiently
iterate over all the indices of a `Chunk` without being bound
to the (extremely inconvenient) lifetime of the `self` receiver.
  • Loading branch information
teh-cmc authored Jul 12, 2024
1 parent 7a6437d commit 83762df
Showing 1 changed file with 171 additions and 5 deletions.
176 changes: 171 additions & 5 deletions crates/store/re_chunk/src/iter.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use arrow2::array::Array as ArrowArray;
use std::sync::Arc;

use arrow2::array::Array as ArrowArray;
use itertools::Itertools as _;

use re_log_types::{TimeInt, Timeline};
use re_types_core::ComponentName;

use crate::{Chunk, RowId};
use crate::{Chunk, ChunkTimeline, RowId};

// ---

Expand Down Expand Up @@ -92,9 +94,77 @@ impl Chunk {
}
}

/// Iterator over the `(TimeInt, RowId)` indices of a [`Chunk`].
///
/// It holds the chunk through an `Arc` instead of a borrow, so it is not tied to the
/// lifetime of a `&Chunk`. Instances are created by [`Chunk::iter_indices`].
pub struct ChunkIndicesIter {
/// The chunk whose row IDs are being iterated.
chunk: Arc<Chunk>,

/// Time column of the requested timeline, or `None` when the chunk is static.
time_chunk: Option<ChunkTimeline>,
/// Position of the next row to yield.
index: usize,
}

impl Iterator for ChunkIndicesIter {
    type Item = (TimeInt, RowId);

    /// Yields the `(TimeInt, RowId)` index of the next row.
    ///
    /// Returns `None` as soon as any of the underlying columns (the two row-id columns, or the
    /// time column when there is one) has no value at the current position. When there is no
    /// time column, every row is reported at [`TimeInt::STATIC`].
    fn next(&mut self) -> Option<Self::Item> {
        let i = self.index;

        let row_id = {
            let (times, incs) = self.chunk.row_ids_raw();
            let time = *times.values().as_slice().get(i)?;
            let inc = *incs.values().as_slice().get(i)?;

            // The row id is rebuilt from its two halves: `time` in the upper 64 bits, `inc` in
            // the lower 64 bits.
            RowId::from_u128(((time as u128) << 64) | (inc as u128))
        };

        let time = match &self.time_chunk {
            Some(time_chunk) => TimeInt::new_temporal(*time_chunk.times_raw().get(i)?),
            None => TimeInt::STATIC,
        };

        // Only advance once a row was actually produced, so the cursor never runs past the end
        // of the data no matter how many times `next` is called after exhaustion.
        self.index += 1;

        Some((time, row_id))
    }

    /// Reports the exact number of rows left, which lets `collect` and friends preallocate.
    ///
    /// The bound is the shortest of the columns consulted by [`Self::next`], minus the rows
    /// already yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (times, incs) = self.chunk.row_ids_raw();
        let mut len = times
            .values()
            .as_slice()
            .len()
            .min(incs.values().as_slice().len());

        if let Some(time_chunk) = &self.time_chunk {
            len = len.min(time_chunk.times_raw().len());
        }

        let remaining = len.saturating_sub(self.index);
        (remaining, Some(remaining))
    }
}

impl Chunk {
    /// Builds an owning iterator over this chunk's indices (`(TimeInt, RowId)`) for `timeline`.
    ///
    /// * Static chunk: `timeline` is not consulted and the iterator carries no time column.
    /// * Otherwise: returns `None` when the chunk has no entry for `timeline`.
    ///
    /// The iterator keeps the `Arc` itself, so it can be passed around without borrowing
    /// from the chunk.
    #[inline]
    pub fn iter_indices(self: Arc<Self>, timeline: &Timeline) -> Option<ChunkIndicesIter> {
        // Resolve the time column first; bail out early if a temporal chunk lacks the timeline.
        let time_chunk = if self.is_static() {
            None
        } else {
            Some(self.timelines.get(timeline)?.clone())
        };

        Some(ChunkIndicesIter {
            chunk: self,
            time_chunk,
            index: 0,
        })
    }
}

#[cfg(test)]
mod tests {
use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
use std::sync::Arc;

use itertools::{izip, Itertools};
use re_log_types::{
example_components::{MyColor, MyLabel, MyPoint},
EntityPath, TimeInt, TimePoint,
};
use re_types_core::{ComponentBatch, Loggable};

use crate::{Chunk, RowId, Timeline};
Expand Down Expand Up @@ -205,7 +275,7 @@ mod tests {
let expected =
expected.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
eprintln!("{component_name} @ {row_id}");
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, &component_name));
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name));
}

chunk.sort_if_unsorted();
Expand All @@ -215,7 +285,103 @@ mod tests {
let expected =
expected.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
eprintln!("{component_name} @ {row_id}");
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, &component_name));
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name));
}

Ok(())
}

/// Checks that `Chunk::iter_indices` on a temporal chunk yields the same `(time, row_id)`
/// pairs as zipping the timeline's times with the chunk's row ids.
#[test]
fn iter_indices_temporal() -> anyhow::Result<()> {
    // Row ids are allocated up front, in the same order the rows are inserted below.
    let [rid_a, rid_b, rid_c, rid_d, rid_e]: [RowId; 5] = std::array::from_fn(|_| RowId::new());

    let timeline_frame = Timeline::new_sequence("frame");

    let at_frame_1 = [(timeline_frame, 1)];
    let at_frame_3 = [(timeline_frame, 3)];
    let at_frame_5 = [(timeline_frame, 5)];
    let at_frame_7 = [(timeline_frame, 7)];
    let at_frame_9 = [(timeline_frame, 9)];

    let pts_a = &[MyPoint::new(1.0, 1.0)];
    let pts_b = &[MyPoint::new(2.0, 2.0)];
    let pts_c = &[MyPoint::new(3.0, 3.0)];
    let pts_d = &[MyPoint::new(4.0, 4.0)];
    let pts_e = &[MyPoint::new(5.0, 5.0)];

    let builder = Chunk::builder(EntityPath::from("this/that"));
    let builder = builder.with_component_batches(rid_a, at_frame_1, [pts_a as _]);
    let builder = builder.with_component_batches(rid_b, at_frame_3, [pts_b as _]);
    let builder = builder.with_component_batches(rid_c, at_frame_5, [pts_c as _]);
    let builder = builder.with_component_batches(rid_d, at_frame_7, [pts_d as _]);
    let builder = builder.with_component_batches(rid_e, at_frame_9, [pts_e as _]);
    let chunk = Arc::new(builder.build()?);

    // What the iterator under test produces (empty if the timeline is unknown to the chunk).
    let got = match Arc::clone(&chunk).iter_indices(&timeline_frame) {
        Some(indices) => indices.collect_vec(),
        None => Vec::new(),
    };

    // Reference result: the timeline's times paired positionally with the chunk's row ids.
    let expected_times = match chunk.timelines.get(&timeline_frame) {
        Some(time_chunk) => time_chunk.times().collect_vec(),
        None => Vec::new(),
    };
    let expected = expected_times
        .into_iter()
        .zip(chunk.row_ids())
        .collect_vec();

    similar_asserts::assert_eq!(expected, got);

    Ok(())
}

#[test]
fn iter_indices_static() -> anyhow::Result<()> {
let entity_path = EntityPath::from("this/that");

let row_id1 = RowId::new();
let row_id2 = RowId::new();
let row_id3 = RowId::new();
let row_id4 = RowId::new();
let row_id5 = RowId::new();

let timeline_frame = Timeline::new_sequence("frame");

let points1 = &[MyPoint::new(1.0, 1.0)];
let points2 = &[MyPoint::new(2.0, 2.0)];
let points3 = &[MyPoint::new(3.0, 3.0)];
let points4 = &[MyPoint::new(4.0, 4.0)];
let points5 = &[MyPoint::new(5.0, 5.0)];

let chunk = Arc::new(
Chunk::builder(entity_path.clone())
.with_component_batches(row_id1, TimePoint::default(), [points1 as _])
.with_component_batches(row_id2, TimePoint::default(), [points2 as _])
.with_component_batches(row_id3, TimePoint::default(), [points3 as _])
.with_component_batches(row_id4, TimePoint::default(), [points4 as _])
.with_component_batches(row_id5, TimePoint::default(), [points5 as _])
.build()?,
);

{
let got = Arc::clone(&chunk)
.iter_indices(&timeline_frame)
.map(|it| it.collect_vec())
.unwrap_or_default();
let expected = izip!(std::iter::repeat(TimeInt::STATIC), chunk.row_ids()).collect_vec();

similar_asserts::assert_eq!(expected, got);
}

Ok(())
Expand Down

0 comments on commit 83762df

Please sign in to comment.