From 3a50bc23ccb9f90083546b69efcb51b063bd830f Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Thu, 9 Jan 2025 13:22:45 +0100 Subject: [PATCH] Port `PendingRow` to `arrow-rs` (#8617) * Part of https://github.com/rerun-io/rerun/issues/3741 * Requires https://github.com/rerun-io/re_arrow2/pull/15 NOTE: this will regress on memory use, because we lack `.shrink_to_fit`, which will arrive in * https://github.com/rerun-io/rerun/pull/8618 --- Cargo.lock | 7 +- Cargo.toml | 2 + clippy.toml | 16 +- crates/store/re_chunk/Cargo.toml | 1 + .../re_chunk/src/{util.rs => arrow2_util.rs} | 0 crates/store/re_chunk/src/arrow_util.rs | 319 ++++++++++++++++++ crates/store/re_chunk/src/batcher.rs | 90 +++-- crates/store/re_chunk/src/builder.rs | 12 +- crates/store/re_chunk/src/chunk.rs | 39 ++- crates/store/re_chunk/src/lib.rs | 3 +- crates/store/re_chunk/src/merge.rs | 18 +- crates/store/re_chunk/src/migration.rs | 5 +- crates/store/re_chunk/src/shuffle.rs | 2 +- crates/store/re_chunk/src/slice.rs | 22 +- crates/store/re_chunk/src/transport.rs | 8 +- crates/store/re_chunk/tests/memory_test.rs | 14 +- crates/store/re_chunk_store/src/writes.rs | 2 +- .../store/re_chunk_store/tests/memory_test.rs | 2 +- crates/store/re_dataframe/src/lib.rs | 2 +- crates/store/re_dataframe/src/query.rs | 4 +- crates/store/re_grpc_client/src/lib.rs | 4 +- crates/top/re_sdk/src/recording_stream.rs | 20 +- crates/top/rerun_c/src/lib.rs | 2 +- rerun_py/src/arrow.rs | 2 +- 24 files changed, 477 insertions(+), 119 deletions(-) rename crates/store/re_chunk/src/{util.rs => arrow2_util.rs} (100%) create mode 100644 crates/store/re_chunk/src/arrow_util.rs diff --git a/Cargo.lock b/Cargo.lock index d02046f7e1d7..e83ae8048e34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5203,7 +5203,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", - "heck 0.5.0", + "heck 0.4.1", "itertools 0.13.0", "log", "multimap", @@ -5568,9 +5568,8 @@ dependencies = [ [[package]] name = "re_arrow2" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f046c5679b0f305d610f80d93fd51ad702cfc077bbe16d9553a1660a2505160" +version = "0.18.1" +source = "git+https://github.com/rerun-io/re_arrow2.git?branch=main#c762f392ded92c5978fcdc2195988587a4976417" dependencies = [ "ahash", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index 155e5fb499cd..5e6e52eaa238 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -586,3 +586,5 @@ egui_commonmark = { git = "https://github.com/rerun-io/egui_commonmark.git", bra # walkers = { git = "https://github.com/rerun-io/walkers", rev = "8939cceb3fa49ca8648ee16fe1d8432f5ab0bdcc" } # https://github.com/podusowski/walkers/pull/222 # dav1d = { path = "/home/cmc/dev/rerun-io/rav1d", package = "re_rav1d", version = "0.1.1" } + +re_arrow2 = { git = "https://github.com/rerun-io/re_arrow2.git", branch = "main" } diff --git a/clippy.toml b/clippy.toml index ec39e9871d72..bec822eb6e8f 100644 --- a/clippy.toml +++ b/clippy.toml @@ -50,13 +50,17 @@ disallowed-methods = [ { path = "std::panic::catch_unwind", reason = "We compile with `panic = 'abort'`" }, { path = "std::thread::spawn", reason = "Use `std::thread::Builder` and name the thread" }, + { path = "arrow::compute::concat", reason = "Use `re_chunk::arrow_util::concat_arrays` instead, which has better memory management" }, + { path = "arrow::compute::filter", reason = "Use `re_chunk::arrow_util::filter_array` instead" }, + { path = 
"arrow::compute::take", reason = "Use `re_chunk::arrow_util::take_array` instead" }, + # Specify both `arrow2` and `re_arrow2` -- clippy gets lost in all the package renaming happening. - { path = "arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::util::concat_arrays` instead, which has proper early outs" }, - { path = "arrow2::compute::filter::filter", reason = "Use `re_chunk::util::filter_array` instead, which has proper early outs" }, - { path = "arrow2::compute::take::take", reason = "Use `re_chunk::util::take_array` instead, which has proper early outs" }, - { path = "re_arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::util::concat_arrays` instead, which has proper early outs" }, - { path = "re_arrow2::compute::filter::filter", reason = "Use `re_chunk::util::filter_array` instead, which has proper early outs" }, - { path = "re_arrow2::compute::take::take", reason = "Use `re_chunk::util::take_array` instead, which has proper early outs" }, + { path = "arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::arrow2_util::concat_arrays` instead, which has proper early outs" }, + { path = "arrow2::compute::filter::filter", reason = "Use `re_chunk::arrow2_util::filter_array` instead, which has proper early outs" }, + { path = "arrow2::compute::take::take", reason = "Use `re_chunk::arrow2_util::take_array` instead, which has proper early outs" }, + { path = "re_arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::arrow2_util::concat_arrays` instead, which has proper early outs" }, + { path = "re_arrow2::compute::filter::filter", reason = "Use `re_chunk::arrow2_util::filter_array` instead, which has proper early outs" }, + { path = "re_arrow2::compute::take::take", reason = "Use `re_chunk::arrow2_util::take_array` instead, which has proper early outs" }, # There are many things that aren't allowed on wasm, # but we cannot disable them all here (because of e.g. https://github.com/rust-lang/rust-clippy/issues/10406) diff --git a/crates/store/re_chunk/Cargo.toml b/crates/store/re_chunk/Cargo.toml index 90e253be49dc..4e091da8a29e 100644 --- a/crates/store/re_chunk/Cargo.toml +++ b/crates/store/re_chunk/Cargo.toml @@ -52,6 +52,7 @@ ahash.workspace = true anyhow.workspace = true arrow.workspace = true arrow2 = { workspace = true, features = [ + "arrow", "compute_concatenate", "compute_filter", "compute_take", diff --git a/crates/store/re_chunk/src/util.rs b/crates/store/re_chunk/src/arrow2_util.rs similarity index 100% rename from crates/store/re_chunk/src/util.rs rename to crates/store/re_chunk/src/arrow2_util.rs diff --git a/crates/store/re_chunk/src/arrow_util.rs b/crates/store/re_chunk/src/arrow_util.rs new file mode 100644 index 000000000000..15ed476e7a51 --- /dev/null +++ b/crates/store/re_chunk/src/arrow_util.rs @@ -0,0 +1,319 @@ +use arrow::{ + array::{Array, ArrayRef, ArrowPrimitiveType, BooleanArray, ListArray, PrimitiveArray}, + buffer::{NullBuffer, OffsetBuffer}, + datatypes::{DataType, Field}, +}; +use itertools::Itertools; + +// --- + +/// Returns true if the given `list_array` is semantically empty. +/// +/// Semantic emptiness is defined as either one of these: +/// * The list is physically empty (literally no data). +/// * The list only contains null entries, or empty arrays, or a mix of both. +#[inline] +pub fn is_list_array_semantically_empty(list_array: &ListArray) -> bool { + list_array.values().is_empty() +} + +/// Create a sparse list-array out of an array of arrays. 
+///
+/// All arrays must have the same datatype.
+///
+/// Returns `None` if `arrays` is empty.
+#[inline]
+pub fn arrays_to_list_array_opt(arrays: &[Option<&dyn Array>]) -> Option<ListArray> {
+    let datatype = arrays
+        .iter()
+        .flatten()
+        .map(|array| array.data_type().clone())
+        .next()?;
+    arrays_to_list_array(datatype, arrays)
+}
+
+/// An empty array of the given datatype.
+pub fn new_empty_array(datatype: &DataType) -> ArrayRef {
+    let capacity = 0;
+    arrow::array::make_builder(datatype, capacity).finish()
+}
+
+/// Create a sparse list-array out of an array of arrays.
+///
+/// Returns `None` if any of the specified `arrays` doesn't match the given `array_datatype`.
+///
+/// Returns an empty list if `arrays` is empty.
+pub fn arrays_to_list_array(
+    array_datatype: DataType,
+    arrays: &[Option<&dyn Array>],
+) -> Option<ListArray> {
+    let arrays_dense = arrays.iter().flatten().copied().collect_vec();
+
+    let data = if arrays_dense.is_empty() {
+        new_empty_array(&array_datatype)
+    } else {
+        re_tracing::profile_scope!("concatenate", arrays_dense.len().to_string());
+        concat_arrays(&arrays_dense)
+            .map_err(|err| {
+                re_log::warn_once!("failed to concatenate arrays: {err}");
+                err
+            })
+            .ok()?
+    };
+
+    let nullable = true;
+    let field = Field::new_list_field(array_datatype, nullable);
+
+    let offsets = OffsetBuffer::from_lengths(
+        arrays
+            .iter()
+            .map(|array| array.map_or(0, |array| array.len())),
+    );
+
+    #[allow(clippy::from_iter_instead_of_collect)]
+    let nulls = NullBuffer::from_iter(arrays.iter().map(Option::is_some));
+
+    Some(ListArray::new(field.into(), offsets, data, nulls.into()))
+}
+
+/// Given a sparse [`ListArray`] (i.e. an array with a nulls bitmap that contains at least
+/// one falsy value), returns a dense [`ListArray`] that only contains the non-null values from
+/// the original list.
+///
+/// This is a no-op if the original array is already dense.
+pub fn sparse_list_array_to_dense_list_array(list_array: &ListArray) -> ListArray {
+    if list_array.is_empty() {
+        return list_array.clone();
+    }
+
+    let is_empty = list_array.nulls().map_or(false, |nulls| nulls.is_empty());
+    if is_empty {
+        return list_array.clone();
+    }
+
+    let offsets = OffsetBuffer::from_lengths(list_array.iter().flatten().map(|array| array.len()));
+
+    let fields = list_array_fields(list_array);
+
+    ListArray::new(fields, offsets, list_array.values().clone(), None)
+}
+
+fn list_array_fields<O: arrow::array::OffsetSizeTrait>(
+    list_array: &arrow::array::GenericListArray<O>,
+) -> std::sync::Arc<Field> {
+    match list_array.data_type() {
+        DataType::List(fields) | DataType::LargeList(fields) => fields,
+        _ => unreachable!("The GenericListArray constructor guaranteed we can't get here"),
+    }
+    .clone()
+}
+
+/// Create a new [`ListArray`] of target length by appending null values to its back.
+///
+/// This will share the same child data array buffer, but will create new offset and nulls buffers.
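+///
+/// A hedged usage sketch (`list_array` is assumed to be a 2-row `ListArray`):
+///
+/// ```ignore
+/// // Pad a 2-row list array out to 4 rows: the two appended rows are null,
+/// // and the child data buffer is shared rather than copied.
+/// let padded = pad_list_array_back(&list_array, 4);
+/// assert_eq!(padded.len(), 4);
+/// assert!(padded.is_null(2) && padded.is_null(3));
+/// ```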
+pub fn pad_list_array_back(list_array: &ListArray, target_len: usize) -> ListArray {
+    let missing_len = target_len.saturating_sub(list_array.len());
+    if missing_len == 0 {
+        return list_array.clone();
+    }
+
+    let fields = list_array_fields(list_array);
+
+    let offsets = {
+        OffsetBuffer::from_lengths(
+            list_array
+                .iter()
+                .map(|array| array.map_or(0, |array| array.len()))
+                .chain(std::iter::repeat(0).take(missing_len)),
+        )
+    };
+
+    let values = list_array.values().clone();
+
+    let nulls = {
+        if let Some(nulls) = list_array.nulls() {
+            #[allow(clippy::from_iter_instead_of_collect)]
+            NullBuffer::from_iter(
+                nulls
+                    .iter()
+                    .chain(std::iter::repeat(false).take(missing_len)),
+            )
+        } else {
+            #[allow(clippy::from_iter_instead_of_collect)]
+            NullBuffer::from_iter(
+                std::iter::repeat(true)
+                    .take(list_array.len())
+                    .chain(std::iter::repeat(false).take(missing_len)),
+            )
+        }
+    };
+
+    ListArray::new(fields, offsets, values, Some(nulls))
+}
+
+/// Create a new [`ListArray`] of target length by appending null values to its front.
+///
+/// This will share the same child data array buffer, but will create new offset and nulls buffers.
+pub fn pad_list_array_front(list_array: &ListArray, target_len: usize) -> ListArray {
+    let missing_len = target_len.saturating_sub(list_array.len());
+    if missing_len == 0 {
+        return list_array.clone();
+    }
+
+    let fields = list_array_fields(list_array);
+
+    let offsets = {
+        OffsetBuffer::from_lengths(
+            std::iter::repeat(0).take(missing_len).chain(
+                list_array
+                    .iter()
+                    .map(|array| array.map_or(0, |array| array.len())),
+            ),
+        )
+    };
+
+    let values = list_array.values().clone();
+
+    let nulls = {
+        if let Some(nulls) = list_array.nulls() {
+            #[allow(clippy::from_iter_instead_of_collect)]
+            NullBuffer::from_iter(
+                std::iter::repeat(false)
+                    .take(missing_len)
+                    .chain(nulls.iter()),
+            )
+        } else {
+            #[allow(clippy::from_iter_instead_of_collect)]
+            NullBuffer::from_iter(
+                std::iter::repeat(false)
+                    .take(missing_len)
+                    .chain(std::iter::repeat(true).take(list_array.len())),
+            )
+        }
+    };
+
+    ListArray::new(fields, offsets, values, Some(nulls))
+}
+
+/// Returns a new [`ListArray`] with `len` entries.
+///
+/// Each entry will be an empty array of the given `child_datatype`.
+pub fn new_list_array_of_empties(child_datatype: &DataType, len: usize) -> ListArray {
+    let empty_array = new_empty_array(child_datatype);
+
+    let offsets = OffsetBuffer::from_lengths(std::iter::repeat(0).take(len));
+
+    let nullable = true;
+    ListArray::new(
+        Field::new_list_field(empty_array.data_type().clone(), nullable).into(),
+        offsets,
+        empty_array,
+        None,
+    )
+}
+
+/// Applies a [`arrow::compute::concat`] kernel to the given `arrays`.
+///
+/// Early outs where it makes sense (e.g. `arrays.len() == 1`).
+///
+/// Returns an error if the arrays don't share the exact same datatype.
+pub fn concat_arrays(arrays: &[&dyn Array]) -> arrow::error::Result<ArrayRef> {
+    #[allow(clippy::disallowed_methods)] // that's the whole point
+    arrow::compute::concat(arrays)
+    // TODO(#3741): call .shrink_to_fit on the result
+}
+
+/// Applies a [filter] kernel to the given `array`.
+///
+/// Panics iff the length of the filter doesn't match the length of the array.
+///
+/// In release builds, filters are allowed to have null entries (they will be interpreted as `false`).
+/// In debug builds, null entries will panic.
+///
+/// Note: a `filter` kernel _copies_ the data in order to make the resulting arrays contiguous in memory.
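+///
+/// For instance (a hedged sketch; the values are made up):
+///
+/// ```ignore
+/// use arrow::array::{BooleanArray, Int32Array};
+///
+/// let array = Int32Array::from(vec![1, 2, 3, 4]);
+/// let mask = BooleanArray::from(vec![true, false, true, false]);
+/// // The result is a concrete `Int32Array` again, not a type-erased `ArrayRef`.
+/// let filtered: Int32Array = filter_array(&array, &mask);
+/// assert_eq!(filtered.len(), 2);
+/// assert_eq!(filtered.value(0), 1);
+/// assert_eq!(filtered.value(1), 3);
+/// ```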
+///
+/// Takes care of up- and down-casting the data back and forth on behalf of the caller.
+///
+/// [filter]: arrow::compute::filter
+pub fn filter_array<A: Array + Clone + 'static>(array: &A, filter: &BooleanArray) -> A {
+    assert_eq!(
+        array.len(), filter.len(),
+        "the length of the filter must match the length of the array (the underlying kernel will panic otherwise)",
+    );
+    debug_assert!(
+        filter.nulls().is_none(),
+        "filter masks with nulls bits are technically valid, but generally a sign that something went wrong",
+    );
+
+    #[allow(clippy::disallowed_methods)] // that's the whole point
+    #[allow(clippy::unwrap_used)]
+    arrow::compute::filter(array, filter)
+        // Unwrap: this literally cannot fail.
+        .unwrap()
+        .as_any()
+        .downcast_ref::<A>()
+        // Unwrap: that's the initial type we got.
+        .unwrap()
+        .clone()
+}
+
+/// Applies a [take] kernel to the given `array`.
+///
+/// In release builds, indices are allowed to have null entries (they will be taken as `null`s).
+/// In debug builds, null entries will panic.
+///
+/// Note: a `take` kernel _copies_ the data in order to make the resulting arrays contiguous in memory.
+///
+/// Takes care of up- and down-casting the data back and forth on behalf of the caller.
+///
+/// [take]: arrow::compute::take
+//
+// TODO(cmc): in an ideal world, a `take` kernel should merely _slice_ the data and avoid any allocations/copies
+// where possible (e.g. list-arrays).
+// That is not possible with vanilla [`ListArray`]s since they don't expose any way to encode optional lengths,
+// in addition to offsets.
+// For internal stuff, we could perhaps provide a custom implementation that returns a `DictionaryArray` instead?
+pub fn take_array<A, O>(array: &A, indices: &PrimitiveArray<O>) -> A
+where
+    A: Array + Clone + 'static,
+    O: ArrowPrimitiveType,
+    O::Native: std::ops::Add<Output = O::Native>,
+{
+    use arrow::datatypes::ArrowNativeTypeOp as _;
+
+    debug_assert!(
+        indices.nulls().is_none(),
+        "index arrays with nulls bits are technically valid, but generally a sign that something went wrong",
+    );
+
+    if indices.len() == array.len() {
+        let indices = indices.values().as_ref();
+
+        let starts_at_zero = || indices[0] == O::Native::ZERO;
+        let is_consecutive = || {
+            indices
+                .windows(2)
+                .all(|values| values[1] == values[0] + O::Native::ONE)
+        };
+
+        if starts_at_zero() && is_consecutive() {
+            #[allow(clippy::unwrap_used)]
+            return array
+                .clone()
+                .as_any()
+                .downcast_ref::<A>()
+                // Unwrap: that's the initial type we got.
+                .unwrap()
+                .clone();
+        }
+    }
+
+    #[allow(clippy::disallowed_methods)] // that's the whole point
+    #[allow(clippy::unwrap_used)]
+    arrow::compute::take(array, indices, Default::default())
+        // Unwrap: this literally cannot fail.
+        .unwrap()
+        .as_any()
+        .downcast_ref::<A>()
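+        // (A hedged usage sketch: `take_array(&array, &UInt32Array::from(vec![2u32, 0]))`
+        // reorders rows through the kernel call above, whereas an identity index array such
+        // as `[0, 1, 2]` takes the buffer-sharing fast path and never copies any data.)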
+        // Unwrap: that's the initial type we got.
+        .unwrap()
+        .clone()
+}
diff --git a/crates/store/re_chunk/src/batcher.rs b/crates/store/re_chunk/src/batcher.rs
index 44a9ffe6be70..9143124a9ad4 100644
--- a/crates/store/re_chunk/src/batcher.rs
+++ b/crates/store/re_chunk/src/batcher.rs
@@ -4,7 +4,8 @@ use std::{
     time::{Duration, Instant},
 };
 
-use arrow2::array::{Array as Arrow2Array, PrimitiveArray as Arrow2PrimitiveArray};
+use arrow::array::{Array as ArrowArray, ArrayRef};
+use arrow2::array::PrimitiveArray as Arrow2PrimitiveArray;
 use crossbeam::channel::{Receiver, Sender};
 use nohash_hasher::IntMap;
 
@@ -12,7 +13,7 @@ use re_byte_size::SizeBytes as _;
 use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline};
 use re_types_core::ComponentDescriptor;
 
-use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
+use crate::{arrow_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
 
 // ---
 
@@ -679,15 +680,12 @@ pub struct PendingRow {
     /// The component data.
     ///
     /// Each array is a single component, i.e. _not_ a list array.
-    pub components: IntMap<ComponentDescriptor, Box<dyn Arrow2Array>>,
+    pub components: IntMap<ComponentDescriptor, ArrayRef>,
 }
 
 impl PendingRow {
     #[inline]
-    pub fn new(
-        timepoint: TimePoint,
-        components: IntMap<ComponentDescriptor, Box<dyn Arrow2Array>>,
-    ) -> Self {
+    pub fn new(timepoint: TimePoint, components: IntMap<ComponentDescriptor, ArrayRef>) -> Self {
         Self {
             row_id: RowId::new(),
             timepoint,
@@ -734,7 +732,7 @@ impl PendingRow {
 
             let mut per_name = ChunkComponents::default();
             for (component_desc, array) in components {
-                let list_array = crate::util::arrays_to_list_array_opt(&[Some(&*array as _)]);
+                let list_array = arrow_util::arrays_to_list_array_opt(&[Some(&*array as _)]);
                 if let Some(list_array) = list_array {
                     per_name.insert_descriptor(component_desc, list_array);
                 }
@@ -826,7 +824,7 @@ impl PendingRow {
 
         // Create all the logical list arrays that we're going to need, accounting for the
         // possibility of sparse components in the data.
- let mut all_components: IntMap>> = + let mut all_components: IntMap>> = IntMap::default(); for row in &rows { for component_desc in row.components.keys() { @@ -870,7 +868,7 @@ impl PendingRow { for (component_desc, arrays) in std::mem::take(&mut components) { let list_array = - crate::util::arrays_to_list_array_opt(&arrays); + arrow_util::arrays_to_list_array_opt(&arrays); if let Some(list_array) = list_array { per_name.insert_descriptor(component_desc, list_array); } @@ -898,7 +896,7 @@ impl PendingRow { arrays.push( row_components .get(component_desc) - .map(|array| &**array as &dyn Arrow2Array), + .map(|array| &**array as &dyn ArrowArray), ); } } @@ -915,7 +913,7 @@ impl PendingRow { { let mut per_name = ChunkComponents::default(); for (component_desc, arrays) in components { - let list_array = crate::util::arrays_to_list_array_opt(&arrays); + let list_array = arrow_util::arrays_to_list_array_opt(&arrays); if let Some(list_array) = list_array { per_name.insert_descriptor(component_desc, list_array); } @@ -1008,9 +1006,9 @@ mod tests { let timepoint2 = TimePoint::default().with(timeline1, 43); let timepoint3 = TimePoint::default().with(timeline1, 44); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1060,7 +1058,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) + arrow_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) .unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( @@ -1087,9 +1085,9 @@ mod tests { let timeless = TimePoint::default(); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1132,7 +1130,7 @@ mod tests { let expected_timelines = []; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) + arrow_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) .unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( @@ -1163,9 +1161,9 @@ mod tests { let timepoint2 = TimePoint::default().with(timeline1, 43); let timepoint3 = TimePoint::default().with(timeline1, 44); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), 
MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1216,7 +1214,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[0].id, @@ -1244,7 +1242,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, @@ -1279,9 +1277,9 @@ mod tests { .with(timeline1, 44) .with(timeline2, 1001); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1331,7 +1329,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[0].id, @@ -1369,7 +1367,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, @@ -1399,10 +1397,10 @@ mod tests { let timepoint2 = TimePoint::default().with(timeline1, 43); let timepoint3 = TimePoint::default().with(timeline1, 44); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; let points2 = - MyPoint64::to_arrow2([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + MyPoint64::to_arrow([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; // same name, different datatype @@ -1452,7 +1450,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), + 
arrow_util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[0].id, @@ -1480,7 +1478,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, @@ -1524,11 +1522,11 @@ mod tests { .with(timeline2, 1003) .with(timeline1, 45); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let points4 = - MyPoint::to_arrow2([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?; + MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1591,7 +1589,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt( + arrow_util::arrays_to_list_array_opt( &[&*points1, &*points2, &*points3, &*points4].map(Some), ) .unwrap(), @@ -1638,11 +1636,11 @@ mod tests { .with(timeline2, 1003) .with(timeline1, 45); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let points4 = - MyPoint::to_arrow2([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?; + MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1705,7 +1703,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) + arrow_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) .unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( @@ -1744,7 +1742,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, diff --git a/crates/store/re_chunk/src/builder.rs b/crates/store/re_chunk/src/builder.rs index debfa8297e45..2c7ae1dde8d9 100644 --- a/crates/store/re_chunk/src/builder.rs +++ b/crates/store/re_chunk/src/builder.rs @@ -9,7 +9,7 @@ use nohash_hasher::IntMap; use re_log_types::{EntityPath, TimeInt, TimePoint, Timeline}; use re_types_core::{AsComponents, ComponentBatch, 
ComponentDescriptor};
 
-use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
+use crate::{arrow2_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
 
 // ---
 
@@ -236,11 +236,11 @@ impl ChunkBuilder {
             .into_iter()
             .filter_map(|(component_desc, arrays)| {
                 let arrays = arrays.iter().map(|array| array.as_deref()).collect_vec();
-                crate::util::arrays_to_list_array_opt(&arrays)
+                arrow2_util::arrays_to_list_array_opt(&arrays)
                     .map(|list_array| (component_desc, list_array))
             })
         {
-            per_name.insert_descriptor(component_desc, list_array);
+            per_name.insert_descriptor(component_desc, list_array.into());
         }
         per_name
     };
@@ -295,15 +295,15 @@ impl ChunkBuilder {
                 // If we know the datatype in advance, we're able to keep even fully sparse
                 // columns around.
                 if let Some(datatype) = datatypes.get(&component_desc) {
-                    crate::util::arrays_to_list_array(datatype.clone(), &arrays)
+                    arrow2_util::arrays_to_list_array(datatype.clone(), &arrays)
                         .map(|list_array| (component_desc, list_array))
                 } else {
-                    crate::util::arrays_to_list_array_opt(&arrays)
+                    arrow2_util::arrays_to_list_array_opt(&arrays)
                         .map(|list_array| (component_desc, list_array))
                 }
             })
            {
-                per_name.insert_descriptor(component_desc, list_array);
+                per_name.insert_descriptor(component_desc, list_array.into());
            }
            per_name
        },
diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs
index ca2605d97dd4..7380f9ae2810 100644
--- a/crates/store/re_chunk/src/chunk.rs
+++ b/crates/store/re_chunk/src/chunk.rs
@@ -1,6 +1,7 @@
 use std::sync::atomic::{AtomicU64, Ordering};
 
 use ahash::HashMap;
+use arrow::array::ListArray as ArrowListArray;
 use arrow2::{
     array::{
         Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray,
@@ -65,14 +66,15 @@ impl ChunkComponents {
     pub fn insert_descriptor(
         &mut self,
         component_desc: ComponentDescriptor,
-        list_array: Arrow2ListArray<i32>,
-    ) -> Option<Arrow2ListArray<i32>> {
+        list_array: ArrowListArray,
+    ) -> Option<ArrowListArray> {
         // TODO(cmc): revert me
         let component_desc = component_desc.untagged();
         self.0
             .entry(component_desc.component_name)
             .or_default()
-            .insert(component_desc, list_array)
+            .insert(component_desc, list_array.into())
+            .map(|la| la.into())
     }
 
     /// Returns all list arrays for the given component name.
@@ -138,6 +140,33 @@ impl std::ops::DerefMut for ChunkComponents {
     }
 }
 
+impl FromIterator<(ComponentDescriptor, ArrowListArray)> for ChunkComponents {
+    #[inline]
+    fn from_iter<T: IntoIterator<Item = (ComponentDescriptor, ArrowListArray)>>(iter: T) -> Self {
+        let mut this = Self::default();
+        {
+            for (component_desc, list_array) in iter {
+                this.insert_descriptor(component_desc, list_array);
+            }
+        }
+        this
+    }
+}
+
+// TODO(cmc): Kinda disgusting but it makes our lives easier during the interim, as long as we're
+// in this weird halfway in-between state where we still have a bunch of things indexed by name only.
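+//
+// A hedged usage sketch (`MyPoint` and `some_list_array` are stand-ins): name-keyed columns
+// can be collected directly, with each name wrapped in an untagged `ComponentDescriptor`:
+//
+//     let components: ChunkComponents =
+//         [(MyPoint::name(), some_list_array)].into_iter().collect();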
+impl FromIterator<(ComponentName, ArrowListArray)> for ChunkComponents {
+    #[inline]
+    fn from_iter<T: IntoIterator<Item = (ComponentName, ArrowListArray)>>(iter: T) -> Self {
+        iter.into_iter()
+            .map(|(component_name, list_array)| {
+                let component_desc = ComponentDescriptor::new(component_name);
+                (component_desc, list_array)
+            })
+            .collect()
+    }
+}
+
 impl FromIterator<(ComponentDescriptor, Arrow2ListArray<i32>)> for ChunkComponents {
     #[inline]
     fn from_iter<T: IntoIterator<Item = (ComponentDescriptor, Arrow2ListArray<i32>)>>(
@@ -146,7 +175,7 @@ impl FromIterator<(ComponentDescriptor, Arrow2ListArray<i32>)> for ChunkComponen
         let mut this = Self::default();
         {
             for (component_desc, list_array) in iter {
-                this.insert_descriptor(component_desc, list_array);
+                this.insert_descriptor(component_desc, list_array.into());
             }
         }
         this
@@ -915,7 +944,7 @@ impl Chunk {
         list_array: Arrow2ListArray<i32>,
     ) -> ChunkResult<()> {
         self.components
-            .insert_descriptor(component_desc, list_array);
+            .insert_descriptor(component_desc, list_array.into());
 
         self.sanity_check()
     }
diff --git a/crates/store/re_chunk/src/lib.rs b/crates/store/re_chunk/src/lib.rs
index 6f24bbe8d371..9ba7b632b86d 100644
--- a/crates/store/re_chunk/src/lib.rs
+++ b/crates/store/re_chunk/src/lib.rs
@@ -4,6 +4,8 @@
 #![doc = document_features::document_features!()]
 //!
 
+pub mod arrow2_util;
+pub mod arrow_util;
 mod builder;
 mod chunk;
 mod helpers;
@@ -16,7 +18,6 @@ mod range;
 mod shuffle;
 mod slice;
 mod transport;
-pub mod util;
 
 #[cfg(not(target_arch = "wasm32"))]
 mod batcher;
diff --git a/crates/store/re_chunk/src/merge.rs b/crates/store/re_chunk/src/merge.rs
index 056672a1ef2f..a804ede244fb 100644
--- a/crates/store/re_chunk/src/merge.rs
+++ b/crates/store/re_chunk/src/merge.rs
@@ -5,7 +5,9 @@ use arrow2::array::{
 use itertools::{izip, Itertools};
 use nohash_hasher::IntMap;
 
-use crate::{chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, TimeColumn};
+use crate::{
+    arrow2_util, chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, TimeColumn,
+};
 
 // ---
 
@@ -46,7 +48,7 @@ impl Chunk {
         let row_ids = {
             re_tracing::profile_scope!("row_ids");
 
-            let row_ids = crate::util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?;
+            let row_ids = arrow2_util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?;
             #[allow(clippy::unwrap_used)]
             // concatenating 2 RowId arrays must yield another RowId array
             row_ids
@@ -105,7 +107,7 @@ impl Chunk {
                     ));
 
                     let list_array =
-                        crate::util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?;
+                        arrow2_util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?;
                     let list_array = list_array
                         .as_any()
                         .downcast_ref::<Arrow2ListArray<i32>>()?
@@ -116,7 +118,7 @@ impl Chunk {
                     re_tracing::profile_scope!("pad");
                     Some((
                         component_desc.clone(),
-                        crate::util::pad_list_array_back(
+                        arrow2_util::pad_list_array_back(
                             lhs_list_array,
                             self.num_rows() + rhs.num_rows(),
                         ),
@@ -147,7 +149,7 @@ impl Chunk {
                     ));
 
                     let list_array =
-                        crate::util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?;
+                        arrow2_util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?;
                     let list_array = list_array
                         .as_any()
                         .downcast_ref::<Arrow2ListArray<i32>>()?
@@ -158,7 +160,7 @@ impl Chunk { re_tracing::profile_scope!("pad"); Some(( component_desc.clone(), - crate::util::pad_list_array_front( + arrow2_util::pad_list_array_front( rhs_list_array, self.num_rows() + rhs.num_rows(), ), @@ -171,7 +173,7 @@ impl Chunk { let components = { let mut per_name = ChunkComponents::default(); for (component_desc, list_array) in components { - per_name.insert_descriptor(component_desc.clone(), list_array); + per_name.insert_descriptor(component_desc.clone(), list_array.into()); } per_name }; @@ -285,7 +287,7 @@ impl TimeColumn { let time_range = self.time_range.union(rhs.time_range); - let times = crate::util::concat_arrays(&[&self.times, &rhs.times]).ok()?; + let times = arrow2_util::concat_arrays(&[&self.times, &rhs.times]).ok()?; let times = times .as_any() .downcast_ref::>()? diff --git a/crates/store/re_chunk/src/migration.rs b/crates/store/re_chunk/src/migration.rs index 0728b859edf8..d641fc5f3713 100644 --- a/crates/store/re_chunk/src/migration.rs +++ b/crates/store/re_chunk/src/migration.rs @@ -78,7 +78,8 @@ impl Chunk { .map(|a| a.as_deref() as Option<&dyn Array>) .collect_vec(); - if let Some(list_array_patched) = crate::util::arrays_to_list_array_opt(&arrays) + if let Some(list_array_patched) = + crate::arrow2_util::arrays_to_list_array_opt(&arrays) { *list_array = list_array_patched; } @@ -87,7 +88,7 @@ impl Chunk { } for (desc, list_array) in components_patched { - chunk.components.insert_descriptor(desc, list_array); + chunk.components.insert_descriptor(desc, list_array.into()); } chunk diff --git a/crates/store/re_chunk/src/shuffle.rs b/crates/store/re_chunk/src/shuffle.rs index 3cc07a346f35..ba9639898856 100644 --- a/crates/store/re_chunk/src/shuffle.rs +++ b/crates/store/re_chunk/src/shuffle.rs @@ -279,7 +279,7 @@ impl Chunk { ArrowOffsets::try_from_lengths(sorted_arrays.iter().map(|array| array.len())) .unwrap(); #[allow(clippy::unwrap_used)] // these are slices of the same outer array - let values = crate::util::concat_arrays(&sorted_arrays).unwrap(); + let values = crate::arrow2_util::concat_arrays(&sorted_arrays).unwrap(); let validity = original .validity() .map(|validity| swaps.iter().map(|&from| validity.get_bit(from)).collect()); diff --git a/crates/store/re_chunk/src/slice.rs b/crates/store/re_chunk/src/slice.rs index 00b77a91a34d..b361254c76a9 100644 --- a/crates/store/re_chunk/src/slice.rs +++ b/crates/store/re_chunk/src/slice.rs @@ -9,7 +9,7 @@ use nohash_hasher::IntSet; use re_log_types::Timeline; use re_types_core::{ComponentDescriptor, ComponentName}; -use crate::{Chunk, RowId, TimeColumn}; +use crate::{arrow2_util, Chunk, RowId, TimeColumn}; // --- @@ -369,7 +369,7 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted, - row_ids: crate::util::filter_array(row_ids, &validity_filter), + row_ids: arrow2_util::filter_array(row_ids, &validity_filter), timelines: timelines .iter() .map(|(&timeline, time_column)| (timeline, time_column.filtered(&validity_filter))) @@ -377,7 +377,7 @@ impl Chunk { components: components .iter_flattened() .map(|(component_desc, list_array)| { - let filtered = crate::util::filter_array(list_array, &validity_filter); + let filtered = arrow2_util::filter_array(list_array, &validity_filter); let filtered = if component_desc.component_name == component_name_pov { // Make sure we fully remove the validity bitmap for the densified // component. 
@@ -546,7 +546,7 @@ impl Chunk { entity_path: self.entity_path.clone(), heap_size_bytes: Default::default(), is_sorted: self.is_sorted, - row_ids: crate::util::take_array(&self.row_ids, &indices), + row_ids: arrow2_util::take_array(&self.row_ids, &indices), timelines: self .timelines .iter() @@ -556,7 +556,7 @@ impl Chunk { .components .iter_flattened() .map(|(component_desc, list_array)| { - let filtered = crate::util::take_array(list_array, &indices); + let filtered = arrow2_util::take_array(list_array, &indices); (component_desc.clone(), filtered) }) .collect(), @@ -619,7 +619,7 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted, - row_ids: crate::util::filter_array(row_ids, filter), + row_ids: arrow2_util::filter_array(row_ids, filter), timelines: timelines .iter() .map(|(&timeline, time_column)| (timeline, time_column.filtered(filter))) @@ -627,7 +627,7 @@ impl Chunk { components: components .iter_flattened() .map(|(component_desc, list_array)| { - let filtered = crate::util::filter_array(list_array, filter); + let filtered = arrow2_util::filter_array(list_array, filter); (component_desc.clone(), filtered) }) .collect(), @@ -699,7 +699,7 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted, - row_ids: crate::util::take_array(row_ids, indices), + row_ids: arrow2_util::take_array(row_ids, indices), timelines: timelines .iter() .map(|(&timeline, time_column)| (timeline, time_column.taken(indices))) @@ -707,7 +707,7 @@ impl Chunk { components: components .iter_flattened() .map(|(component_desc, list_array)| { - let taken = crate::util::take_array(list_array, indices); + let taken = arrow2_util::take_array(list_array, indices); (component_desc.clone(), taken) }) .collect(), @@ -852,7 +852,7 @@ impl TimeColumn { Self::new( is_sorted_opt, *timeline, - crate::util::filter_array(times, filter), + arrow2_util::filter_array(times, filter), ) } @@ -871,7 +871,7 @@ impl TimeColumn { Self::new( Some(*is_sorted), *timeline, - crate::util::take_array(times, indices), + arrow2_util::take_array(times, indices), ) } } diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index c7bf9d52559e..0a2700d78c3c 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -651,7 +651,7 @@ impl Chunk { let component_desc = TransportChunk::component_descriptor_from_field(field); if components - .insert_descriptor(component_desc, column.clone()) + .insert_descriptor(component_desc, column.clone().into()) .is_some() { return Err(ChunkError::Malformed { @@ -724,6 +724,8 @@ mod tests { Timeline, }; + use crate::arrow2_util; + use super::*; #[test] @@ -763,7 +765,7 @@ mod tests { let components = [ (MyPoint::descriptor(), { - let list_array = crate::util::arrays_to_list_array_opt(&[ + let list_array = arrow2_util::arrays_to_list_array_opt(&[ Some(&*points1), points2, Some(&*points3), @@ -774,7 +776,7 @@ mod tests { list_array }), (MyPoint::descriptor(), { - let list_array = crate::util::arrays_to_list_array_opt(&[ + let list_array = arrow2_util::arrays_to_list_array_opt(&[ Some(&*colors1), Some(&*colors2), colors3, diff --git a/crates/store/re_chunk/tests/memory_test.rs b/crates/store/re_chunk/tests/memory_test.rs index a6c0e7c6df7c..84ae71fd25a3 100644 --- a/crates/store/re_chunk/tests/memory_test.rs +++ b/crates/store/re_chunk/tests/memory_test.rs @@ -62,6 +62,7 @@ use arrow2::{ offset::Offsets as Arrow2Offsets, }; use itertools::Itertools; +use 
re_chunk::arrow2_util; // --- concat --- @@ -89,8 +90,7 @@ fn concat_does_allocate() { .map(|a| &**a as &dyn Arrow2Array) .collect_vec(); - let concatenated = - memory_use(|| re_chunk::util::concat_arrays(&unconcatenated_refs).unwrap()); + let concatenated = memory_use(|| arrow2_util::concat_arrays(&unconcatenated_refs).unwrap()); (unconcatenated, concatenated) }); @@ -122,7 +122,7 @@ fn concat_single_is_noop() { }); let concatenated = - memory_use(|| re_chunk::util::concat_arrays(&[&*unconcatenated.0]).unwrap()); + memory_use(|| arrow2_util::concat_arrays(&[&*unconcatenated.0]).unwrap()); (unconcatenated, concatenated) }); @@ -185,7 +185,7 @@ fn filter_does_allocate() { let filter = Arrow2BooleanArray::from_slice( (0..unfiltered.0.len()).map(|i| i % 2 == 0).collect_vec(), ); - let filtered = memory_use(|| re_chunk::util::filter_array(&unfiltered.0, &filter)); + let filtered = memory_use(|| arrow2_util::filter_array(&unfiltered.0, &filter)); (unfiltered, filtered) }); @@ -249,7 +249,7 @@ fn filter_empty_or_full_is_noop() { .take(unfiltered.0.len()) .collect_vec(), ); - let filtered = memory_use(|| re_chunk::util::filter_array(&unfiltered.0, &filter)); + let filtered = memory_use(|| arrow2_util::filter_array(&unfiltered.0, &filter)); (unfiltered, filtered) }); @@ -320,7 +320,7 @@ fn take_does_not_allocate() { .filter(|i| i % 2 == 0) .collect_vec(), ); - let taken = memory_use(|| re_chunk::util::take_array(&untaken.0, &indices)); + let taken = memory_use(|| arrow2_util::take_array(&untaken.0, &indices)); (untaken, taken) }); @@ -380,7 +380,7 @@ fn take_empty_or_full_is_noop() { }); let indices = Arrow2PrimitiveArray::from_vec((0..untaken.0.len() as i32).collect_vec()); - let taken = memory_use(|| re_chunk::util::take_array(&untaken.0, &indices)); + let taken = memory_use(|| arrow2_util::take_array(&untaken.0, &indices)); (untaken, taken) }); diff --git a/crates/store/re_chunk_store/src/writes.rs b/crates/store/re_chunk_store/src/writes.rs index 19d14ad72b85..204ad39abc8d 100644 --- a/crates/store/re_chunk_store/src/writes.rs +++ b/crates/store/re_chunk_store/src/writes.rs @@ -387,7 +387,7 @@ impl ChunkStore { }); { let is_semantically_empty = - re_chunk::util::is_list_array_semantically_empty(list_array); + re_chunk::arrow2_util::is_list_array_semantically_empty(list_array); column_metadata_state.is_semantically_empty &= is_semantically_empty; } diff --git a/crates/store/re_chunk_store/tests/memory_test.rs b/crates/store/re_chunk_store/tests/memory_test.rs index cbbcdfa1b0f9..9af9d5c04ca6 100644 --- a/crates/store/re_chunk_store/tests/memory_test.rs +++ b/crates/store/re_chunk_store/tests/memory_test.rs @@ -100,7 +100,7 @@ fn scalar_memory_overhead() { let entity_path = re_log_types::entity_path!("scalar"); let timepoint = TimePoint::default().with(Timeline::new("log_time", TimeType::Time), i as i64); - let scalars = Scalar::to_arrow2([Scalar::from(i as f64)]).unwrap(); + let scalars = Scalar::to_arrow([Scalar::from(i as f64)]).unwrap(); let row = PendingRow::new( timepoint, diff --git a/crates/store/re_dataframe/src/lib.rs b/crates/store/re_dataframe/src/lib.rs index 5713c3413ad2..964ec466c789 100644 --- a/crates/store/re_dataframe/src/lib.rs +++ b/crates/store/re_dataframe/src/lib.rs @@ -9,7 +9,7 @@ pub use self::query::QueryHandle; #[doc(no_inline)] pub use self::external::arrow2::chunk::Chunk as Arrow2Chunk; #[doc(no_inline)] -pub use self::external::re_chunk::{util::concatenate_record_batches, TransportChunk}; +pub use self::external::re_chunk::{arrow2_util::concatenate_record_batches, 
TransportChunk}; #[doc(no_inline)] pub use self::external::re_chunk_store::{ ChunkStoreConfig, ChunkStoreHandle, ColumnSelector, ComponentColumnSelector, Index, IndexRange, diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index b5c8e9e2d2a2..b3ba686d7705 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -260,7 +260,7 @@ impl QueryHandle { archetype_name: descr.archetype_name, archetype_field_name: descr.archetype_field_name, }, - re_chunk::util::new_list_array_of_empties( + re_chunk::arrow2_util::new_list_array_of_empties( child_datatype, chunk.num_rows(), ), @@ -1334,7 +1334,7 @@ mod tests { use std::sync::Arc; use re_chunk::{ - util::concatenate_record_batches, Chunk, ChunkId, RowId, TimePoint, TransportChunk, + arrow2_util::concatenate_record_batches, Chunk, ChunkId, RowId, TimePoint, TransportChunk, }; use re_chunk_store::{ ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ResolvedTimeRange, TimeInt, diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index 723569fcf01b..731f8cbcedbc 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -439,7 +439,7 @@ async fn stream_catalog_async( let data_arrays = sliced.iter().map(|e| Some(e.as_ref())).collect::>(); #[allow(clippy::unwrap_used)] // we know we've given the right field type let data_field_array: arrow2::array::ListArray = - re_chunk::util::arrays_to_list_array( + re_chunk::arrow2_util::arrays_to_list_array( data_field_inner.data_type().clone(), &data_arrays, ) @@ -502,7 +502,7 @@ async fn stream_catalog_async( let rec_id_field = Arrow2Field::new("item", arrow2::datatypes::DataType::Utf8, true); #[allow(clippy::unwrap_used)] // we know we've given the right field type - let uris = re_chunk::util::arrays_to_list_array( + let uris = re_chunk::arrow2_util::arrays_to_list_array( rec_id_field.data_type().clone(), &recording_id_arrays, ) diff --git a/crates/top/re_sdk/src/recording_stream.rs b/crates/top/re_sdk/src/recording_stream.rs index 7551dc9cca40..bfe2fb0fc3de 100644 --- a/crates/top/re_sdk/src/recording_stream.rs +++ b/crates/top/re_sdk/src/recording_stream.rs @@ -1176,7 +1176,7 @@ impl RecordingStream { .into_iter() .map(|comp_batch| { comp_batch - .to_arrow2() + .to_arrow() .map(|array| (comp_batch.descriptor().into_owned(), array)) }) .collect(); @@ -2556,7 +2556,7 @@ mod tests { components: [ ( MyPoint::descriptor(), - ::to_arrow2([ + ::to_arrow([ MyPoint::new(10.0, 10.0), MyPoint::new(20.0, 20.0), ]) @@ -2564,11 +2564,11 @@ mod tests { ), // ( MyColor::descriptor(), - ::to_arrow2([MyColor(0x8080_80FF)]).unwrap(), + ::to_arrow([MyColor(0x8080_80FF)]).unwrap(), ), // ( MyLabel::descriptor(), - ::to_arrow2([] as [MyLabel; 0]).unwrap(), + ::to_arrow([] as [MyLabel; 0]).unwrap(), ), // ] .into_iter() @@ -2583,15 +2583,15 @@ mod tests { components: [ ( MyPoint::descriptor(), - ::to_arrow2([] as [MyPoint; 0]).unwrap(), + ::to_arrow([] as [MyPoint; 0]).unwrap(), ), // ( MyColor::descriptor(), - ::to_arrow2([] as [MyColor; 0]).unwrap(), + ::to_arrow([] as [MyColor; 0]).unwrap(), ), // ( MyLabel::descriptor(), - ::to_arrow2([] as [MyLabel; 0]).unwrap(), + ::to_arrow([] as [MyLabel; 0]).unwrap(), ), // ] .into_iter() @@ -2606,15 +2606,15 @@ mod tests { components: [ ( MyPoint::descriptor(), - ::to_arrow2([] as [MyPoint; 0]).unwrap(), + ::to_arrow([] as [MyPoint; 0]).unwrap(), ), // ( MyColor::descriptor(), - ::to_arrow2([MyColor(0xFFFF_FFFF)]).unwrap(), + 
::to_arrow([MyColor(0xFFFF_FFFF)]).unwrap(), ), // ( MyLabel::descriptor(), - ::to_arrow2([MyLabel("hey".into())]).unwrap(), + ::to_arrow([MyLabel("hey".into())]).unwrap(), ), // ] .into_iter() diff --git a/crates/top/rerun_c/src/lib.rs b/crates/top/rerun_c/src/lib.rs index d091d9322904..e90100d2a0b6 100644 --- a/crates/top/rerun_c/src/lib.rs +++ b/crates/top/rerun_c/src/lib.rs @@ -820,7 +820,7 @@ fn rr_recording_stream_log_impl( let component_type = component_type_registry.get(*component_type)?; let datatype = component_type.datatype.clone(); let values = unsafe { arrow_array_from_c_ffi(array, datatype) }?; - components.insert(component_type.descriptor.clone(), values); + components.insert(component_type.descriptor.clone(), values.into()); } } diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index f75e7a04ee30..04c6ea735e03 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -89,7 +89,7 @@ pub fn build_row_from_components( let component_descr = descriptor_to_rust(&component_descr)?; let (list_array, _field) = array_to_rust(&array, &component_descr)?; - components.insert(component_descr, list_array); + components.insert(component_descr, list_array.into()); } Ok(PendingRow {