Skip to content

Commit

Permalink
Merge branch 'main' into release-0.19.0-alpha.2
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc authored Oct 12, 2024
2 parents 731dd5a + 2a81c67 commit d910e34
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 8 deletions.
22 changes: 22 additions & 0 deletions crates/store/re_chunk/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,28 @@ pub fn take_array<A: ArrowArray + Clone, O: arrow2::types::Index>(
"index arrays with validity bits are technically valid, but generally a sign that something went wrong",
);

if indices.len() == array.len() {
let indices = indices.values().as_slice();

let starts_at_zero = || indices[0] == O::zero();
let is_consecutive = || {
indices
.windows(2)
.all(|values| values[1] == values[0] + O::one())
};

if starts_at_zero() && is_consecutive() {
#[allow(clippy::unwrap_used)]
return array
.clone()
.as_any()
.downcast_ref::<A>()
// Unwrap: that's initial type that we got.
.unwrap()
.clone();
}
}

#[allow(clippy::unwrap_used)]
arrow2::compute::take::take(array, indices)
// Unwrap: this literally cannot fail.
Expand Down
130 changes: 129 additions & 1 deletion crates/store/re_chunk/tests/memory_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use arrow2::{
},
offset::Offsets as ArrowOffsets,
};
use itertools::Itertools as _;
use itertools::Itertools;

#[test]
fn filter_does_allocate() {
Expand Down Expand Up @@ -125,6 +125,72 @@ fn filter_does_allocate() {
}
}

#[test]
fn filter_empty_or_full_is_noop() {
    re_log::setup_logging();

    const NUM_SCALARS: i64 = 10_000_000;

    // Build a 10-row `ListArray` backed by one big scalar buffer, then filter it
    // with an all-true mask: since nothing is removed, the kernel should return
    // the input untouched instead of reallocating anything.
    let (((unfiltered, unfiltered_size_bytes), (filtered, filtered_size_bytes)), total_size_bytes) =
        memory_use(|| {
            let unfiltered = memory_use(|| {
                let scalars = ArrowPrimitiveArray::from_vec((0..NUM_SCALARS).collect_vec());
                let offsets = ArrowOffsets::try_from_lengths(
                    std::iter::repeat(NUM_SCALARS as usize / 10).take(10),
                )
                .unwrap();
                ArrowListArray::<i32>::new(
                    ArrowListArray::<i32>::default_datatype(scalars.data_type().clone()),
                    offsets.into(),
                    scalars.to_boxed(),
                    None,
                )
            });

            // A mask that keeps every single row -- the filter is a no-op.
            let keep_everything = ArrowBooleanArray::from_slice(vec![true; unfiltered.0.len()]);
            let filtered =
                memory_use(|| re_chunk::util::filter_array(&unfiltered.0, &keep_everything));

            (unfiltered, filtered)
        });

    eprintln!(
        "unfiltered={} filtered={} total={}",
        re_format::format_bytes(unfiltered_size_bytes as _),
        re_format::format_bytes(filtered_size_bytes as _),
        re_format::format_bytes(total_size_bytes as _),
    );

    assert!(
        filtered_size_bytes < 1_000,
        "filtered array should be the size of a few empty datastructures at most"
    );

    // Beyond the size check: the underlying values buffer must be *shared*,
    // i.e. both arrays point at the exact same memory.
    {
        let unfiltered_values = unfiltered
            .values()
            .as_any()
            .downcast_ref::<ArrowPrimitiveArray<i64>>()
            .unwrap();
        let filtered_values = filtered
            .values()
            .as_any()
            .downcast_ref::<ArrowPrimitiveArray<i64>>()
            .unwrap();

        assert!(
            std::ptr::eq(
                unfiltered_values.values().as_ptr_range().start,
                filtered_values.values().as_ptr_range().start
            ),
            "whole thing should be a noop -- pointers should match"
        );
    }
}

#[test]
// TODO(cmc): That's the end goal, but it is simply impossible with `ListArray`'s encoding.
// See `Chunk::take_array`'s doc-comment for more information.
Expand Down Expand Up @@ -191,3 +257,65 @@ fn take_does_not_allocate() {
);
}
}

#[test]
fn take_empty_or_full_is_noop() {
    re_log::setup_logging();

    const NUM_SCALARS: i64 = 10_000_000;

    // Build a 10-row `ListArray` backed by one big scalar buffer, then `take` it
    // with the identity index sequence [0, 1, .., len): since every row is kept
    // in order, the kernel should hand back the input without copying buffers.
    let (((untaken, untaken_size_bytes), (taken, taken_size_bytes)), total_size_bytes) =
        memory_use(|| {
            let untaken = memory_use(|| {
                let scalars = ArrowPrimitiveArray::from_vec((0..NUM_SCALARS).collect_vec());
                let offsets = ArrowOffsets::try_from_lengths(
                    std::iter::repeat(NUM_SCALARS as usize / 10).take(10),
                )
                .unwrap();
                ArrowListArray::<i32>::new(
                    ArrowListArray::<i32>::default_datatype(scalars.data_type().clone()),
                    offsets.into(),
                    scalars.to_boxed(),
                    None,
                )
            });

            // Identity indices: 0..len, consecutive and starting at zero.
            let identity_indices =
                ArrowPrimitiveArray::from_vec((0..untaken.0.len() as i32).collect_vec());
            let taken =
                memory_use(|| re_chunk::util::take_array(&untaken.0, &identity_indices));

            (untaken, taken)
        });

    eprintln!(
        "untaken={} taken={} total={}",
        re_format::format_bytes(untaken_size_bytes as _),
        re_format::format_bytes(taken_size_bytes as _),
        re_format::format_bytes(total_size_bytes as _),
    );

    assert!(
        taken_size_bytes < 1_000,
        "taken array should be the size of a few empty datastructures at most"
    );

    // Beyond the size check: the underlying values buffer must be *shared*,
    // i.e. both arrays point at the exact same memory.
    {
        let untaken_values = untaken
            .values()
            .as_any()
            .downcast_ref::<ArrowPrimitiveArray<i64>>()
            .unwrap();
        let taken_values = taken
            .values()
            .as_any()
            .downcast_ref::<ArrowPrimitiveArray<i64>>()
            .unwrap();

        assert!(
            std::ptr::eq(
                untaken_values.values().as_ptr_range().start,
                taken_values.values().as_ptr_range().start
            ),
            "whole thing should be a noop -- pointers should match"
        );
    }
}
33 changes: 27 additions & 6 deletions crates/store/re_dataframe/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ use crate::{QueryEngine, RecordBatch};
// * [x] sampling support
// * [x] clears
// * [x] pagination (fast)
// * [x] take kernel duplicates all memory
// * [x] dedupe-latest without allocs/copies
// * [ ] allocate null arrays once
// * [ ] overlaps (less dumb)
// * [ ] selector-based `filtered_index`
// * [ ] configurable cache bypass
// * [ ] allocate null arrays once
// * [ ] take kernel duplicates all memory
// * [ ] dedupe-latest without allocs/copies

/// A handle to a dataframe query, ready to be executed.
///
Expand Down Expand Up @@ -623,7 +623,10 @@ impl QueryHandle<'_> {
"the query cache should have already taken care of sorting (and densifying!) the chunk",
);

let chunk = chunk.deduped_latest_on_index(&query.timeline);
// TODO(cmc): That'd be more elegant, but right now there is no way to
// avoid allocations and copies when using Arrow's `ListArray`.
//
// let chunk = chunk.deduped_latest_on_index(&query.timeline);

(AtomicU64::default(), chunk)
})
Expand Down Expand Up @@ -755,7 +758,10 @@ impl QueryHandle<'_> {
pub fn num_rows(&self) -> u64 {
let num_rows = self.init().unique_index_values.len() as _;

if cfg!(debug_assertions) {
// NOTE: This is too slow to run in practice, even for debug builds.
// Do keep this around though, it does come in handy.
#[allow(clippy::overly_complex_bool_expr)]
if false && cfg!(debug_assertions) {
let expected_num_rows =
self.engine.query(self.query.clone()).into_iter().count() as u64;
assert_eq!(expected_num_rows, num_rows);
Expand Down Expand Up @@ -851,13 +857,28 @@ impl QueryHandle<'_> {
// have an Arrow ListView at our disposal.
let cur_indices = cur_chunk.iter_indices(&state.filtered_index).collect_vec();
let (index_value, cur_row_id) = 'walk: loop {
let Some((index_value, cur_row_id)) =
let Some((mut index_value, mut cur_row_id)) =
cur_indices.get(cur_cursor_value as usize).copied()
else {
continue 'overlaps;
};

if index_value == *cur_index_value {
// TODO(cmc): Because of Arrow's `ListArray` limitations, we inline the
// "deduped_latest_on_index" logic here directly, which prevents a lot of
// unnecessary allocations and copies.
while let Some((next_index_value, next_row_id)) =
cur_indices.get(cur_cursor_value as usize + 1).copied()
{
if next_index_value == *cur_index_value {
index_value = next_index_value;
cur_row_id = next_row_id;
cur_cursor_value = cur_cursor.fetch_add(1, Ordering::Relaxed) + 1;
} else {
break;
}
}

break 'walk (index_value, cur_row_id);
}

Expand Down
3 changes: 2 additions & 1 deletion scripts/ci/check_large_files_allow_list.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Cargo.lock
CHANGELOG.md
Cargo.lock
crates/build/re_types_builder/src/reflection.rs
crates/store/re_dataframe/src/query.rs
crates/store/re_types/src/datatypes/tensor_buffer.rs
crates/viewer/re_ui/data/Inter-Medium.otf
crates/viewer/re_viewer/src/reflection/mod.rs
Expand Down

0 comments on commit d910e34

Please sign in to comment.