diff --git a/Cargo.lock b/Cargo.lock index e946a50a1935..61be28db6ef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5569,7 +5569,7 @@ dependencies = [ [[package]] name = "re_arrow2" version = "0.18.1" -source = "git+https://github.com/rerun-io/re_arrow2.git?branch=main#82095762a42eed76e4043c82833e391893d5821c" +source = "git+https://github.com/rerun-io/re_arrow2.git?branch=main#573b5bafd071d09698353d8f0de8d31f3fa59017" dependencies = [ "ahash", "arrow-array", @@ -5802,6 +5802,7 @@ version = "0.22.0-alpha.1+dev" dependencies = [ "ahash", "anyhow", + "arrow", "crossbeam", "image", "notify", @@ -5879,6 +5880,7 @@ name = "re_dataframe" version = "0.22.0-alpha.1+dev" dependencies = [ "anyhow", + "arrow", "itertools 0.13.0", "nohash-hasher", "rayon", @@ -6330,7 +6332,6 @@ dependencies = [ "once_cell", "parking_lot", "rand", - "re_arrow2", "re_build_info", "re_build_tools", "re_byte_size", @@ -7341,6 +7342,7 @@ name = "rerun_c" version = "0.22.0-alpha.1+dev" dependencies = [ "ahash", + "arrow", "infer", "once_cell", "parking_lot", diff --git a/crates/store/re_chunk/src/batcher.rs b/crates/store/re_chunk/src/batcher.rs index 9143124a9ad4..e5c4a431fe49 100644 --- a/crates/store/re_chunk/src/batcher.rs +++ b/crates/store/re_chunk/src/batcher.rs @@ -5,7 +5,7 @@ use std::{ }; use arrow::array::{Array as ArrowArray, ArrayRef}; -use arrow2::array::PrimitiveArray as Arrow2PrimitiveArray; +use arrow::buffer::ScalarBuffer as ArrowScalarBuffer; use crossbeam::channel::{Receiver, Sender}; use nohash_hasher::IntMap; @@ -724,7 +724,7 @@ impl PendingRow { let timelines = timepoint .into_iter() .map(|(timeline, time)| { - let times = Arrow2PrimitiveArray::::from_vec(vec![time.as_i64()]); + let times = ArrowScalarBuffer::from(vec![time.as_i64()]); let time_column = TimeColumn::new(Some(true), timeline, times); (timeline, time_column) }) @@ -973,7 +973,7 @@ impl PendingTimeColumn { TimeColumn { timeline, - times: Arrow2PrimitiveArray::::from_vec(times).to(timeline.datatype()), + times: 
ArrowScalarBuffer::from(times), is_sorted, time_range, } @@ -1050,11 +1050,7 @@ mod tests { let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id]; let expected_timelines = [( timeline1, - TimeColumn::new( - Some(true), - timeline1, - Arrow2PrimitiveArray::from_vec(vec![42, 43, 44]), - ), + TimeColumn::new(Some(true), timeline1, vec![42, 43, 44].into()), )]; let expected_components = [( MyPoint::descriptor(), @@ -1206,11 +1202,7 @@ mod tests { let expected_row_ids = vec![row1.row_id, row3.row_id]; let expected_timelines = [( timeline1, - TimeColumn::new( - Some(true), - timeline1, - Arrow2PrimitiveArray::from_vec(vec![42, 44]), - ), + TimeColumn::new(Some(true), timeline1, vec![42, 44].into()), )]; let expected_components = [( MyPoint::descriptor(), @@ -1234,11 +1226,7 @@ mod tests { let expected_row_ids = vec![row2.row_id]; let expected_timelines = [( timeline1, - TimeColumn::new( - Some(true), - timeline1, - Arrow2PrimitiveArray::from_vec(vec![43]), - ), + TimeColumn::new(Some(true), timeline1, vec![43].into()), )]; let expected_components = [( MyPoint::descriptor(), @@ -1321,11 +1309,7 @@ mod tests { let expected_row_ids = vec![row1.row_id]; let expected_timelines = [( timeline1, - TimeColumn::new( - Some(true), - timeline1, - Arrow2PrimitiveArray::from_vec(vec![42]), - ), + TimeColumn::new(Some(true), timeline1, vec![42].into()), )]; let expected_components = [( MyPoint::descriptor(), @@ -1350,19 +1334,11 @@ mod tests { let expected_timelines = [ ( timeline1, - TimeColumn::new( - Some(true), - timeline1, - Arrow2PrimitiveArray::from_vec(vec![43, 44]), - ), + TimeColumn::new(Some(true), timeline1, vec![43, 44].into()), ), ( timeline2, - TimeColumn::new( - Some(true), - timeline2, - Arrow2PrimitiveArray::from_vec(vec![1000, 1001]), - ), + TimeColumn::new(Some(true), timeline2, vec![1000, 1001].into()), ), ]; let expected_components = [( @@ -1442,11 +1418,7 @@ mod tests { let expected_row_ids = vec![row1.row_id, row3.row_id]; let expected_timelines = 
[( timeline1, - TimeColumn::new( - Some(true), - timeline1, - Arrow2PrimitiveArray::from_vec(vec![42, 44]), - ), + TimeColumn::new(Some(true), timeline1, vec![42, 44].into()), )]; let expected_components = [( MyPoint::descriptor(), @@ -1470,11 +1442,7 @@ mod tests { let expected_row_ids = vec![row2.row_id]; let expected_timelines = [( timeline1, - TimeColumn::new( - Some(true), - timeline1, - Arrow2PrimitiveArray::from_vec(vec![43]), - ), + TimeColumn::new(Some(true), timeline1, vec![43].into()), )]; let expected_components = [( MyPoint::descriptor(), @@ -1572,19 +1540,11 @@ mod tests { let expected_timelines = [ ( timeline1, - TimeColumn::new( - Some(false), - timeline1, - Arrow2PrimitiveArray::from_vec(vec![45, 42, 43, 44]), - ), + TimeColumn::new(Some(false), timeline1, vec![45, 42, 43, 44].into()), ), ( timeline2, - TimeColumn::new( - Some(false), - timeline2, - Arrow2PrimitiveArray::from_vec(vec![1003, 1000, 1001, 1002]), - ), + TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001, 1002].into()), ), ]; let expected_components = [( @@ -1686,19 +1646,11 @@ mod tests { let expected_timelines = [ ( timeline1, - TimeColumn::new( - Some(false), - timeline1, - Arrow2PrimitiveArray::from_vec(vec![45, 42, 43]), - ), + TimeColumn::new(Some(false), timeline1, vec![45, 42, 43].into()), ), ( timeline2, - TimeColumn::new( - Some(false), - timeline2, - Arrow2PrimitiveArray::from_vec(vec![1003, 1000, 1001]), - ), + TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001].into()), ), ]; let expected_components = [( @@ -1725,19 +1677,11 @@ mod tests { let expected_timelines = [ ( timeline1, - TimeColumn::new( - Some(true), - timeline1, - Arrow2PrimitiveArray::from_vec(vec![44]), - ), + TimeColumn::new(Some(true), timeline1, vec![44].into()), ), ( timeline2, - TimeColumn::new( - Some(true), - timeline2, - Arrow2PrimitiveArray::from_vec(vec![1002]), - ), + TimeColumn::new(Some(true), timeline2, vec![1002].into()), ), ]; let expected_components = [( diff --git 
a/crates/store/re_chunk/src/builder.rs b/crates/store/re_chunk/src/builder.rs index 243ba8d311e5..279aa625a223 100644 --- a/crates/store/re_chunk/src/builder.rs +++ b/crates/store/re_chunk/src/builder.rs @@ -352,8 +352,6 @@ impl TimeColumnBuilder { #[inline] pub fn build(self) -> TimeColumn { let Self { timeline, times } = self; - - let times = arrow2::array::PrimitiveArray::::from_vec(times).to(timeline.datatype()); - TimeColumn::new(None, timeline, times) + TimeColumn::new(None, timeline, times.into()) } } diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index 7380f9ae2810..7d6d6526175d 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -1,7 +1,10 @@ use std::sync::atomic::{AtomicU64, Ordering}; use ahash::HashMap; -use arrow::array::ListArray as ArrowListArray; +use arrow::{ + array::{Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray}, + buffer::ScalarBuffer as ArrowScalarBuffer, +}; use arrow2::{ array::{ Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray, @@ -781,7 +784,11 @@ pub struct TimeColumn { /// * This is guaranteed to always be dense, because chunks are split anytime a timeline is /// added or removed. /// * This cannot ever contain `TimeInt::STATIC`, since static data doesn't even have timelines. - pub(crate) times: Arrow2PrimitiveArray, + /// + /// When this buffer is converted to an arrow array, its datatype will depend + /// on the timeline type, so it will either become a + /// [`arrow::array::Int64Array`] or a [`arrow::array::TimestampNanosecondArray`]. + pub(crate) times: ArrowScalarBuffer, /// Is [`Self::times`] sorted? /// @@ -795,6 +802,16 @@ pub struct TimeColumn { pub(crate) time_range: ResolvedTimeRange, } +/// Errors when deserializing/parsing/reading a column of time data. 
+#[derive(Debug, thiserror::Error)] +pub enum TimeColumnError { + #[error("Time columns had nulls, but should be dense")] + ContainsNulls, + + #[error("Unsupported data type : {0}")] + UnsupportedDataType(arrow::datatypes::DataType), +} + impl Chunk { /// Creates a new [`Chunk`]. /// @@ -968,15 +985,10 @@ impl TimeColumn { /// When left unspecified (`None`), it will be computed in O(n) time. /// /// For a row-oriented constructor, see [`Self::builder`]. - pub fn new( - is_sorted: Option, - timeline: Timeline, - times: Arrow2PrimitiveArray, - ) -> Self { + pub fn new(is_sorted: Option, timeline: Timeline, times: ArrowScalarBuffer) -> Self { re_tracing::profile_function_if!(1000 < times.len(), format!("{} times", times.len())); - let times = times.to(timeline.datatype()); - let time_slice = times.values().as_slice(); + let time_slice = times.as_ref(); let is_sorted = is_sorted.unwrap_or_else(|| time_slice.windows(2).all(|times| times[0] <= times[1])); @@ -1021,7 +1033,7 @@ impl TimeColumn { name: impl Into, times: impl IntoIterator>, ) -> Self { - let time_vec = times.into_iter().map(|t| { + let time_vec: Vec<_> = times.into_iter().map(|t| { let t = t.into(); TimeInt::try_from(t) .unwrap_or_else(|_| { @@ -1038,7 +1050,7 @@ impl TimeColumn { Self::new( None, Timeline::new_sequence(name.into()), - Arrow2PrimitiveArray::::from_vec(time_vec), + ArrowScalarBuffer::from(time_vec), ) } @@ -1060,12 +1072,12 @@ impl TimeColumn { TimeInt::MIN }) .as_i64() - }).collect(); + }).collect_vec(); Self::new( None, Timeline::new_temporal(name.into()), - Arrow2PrimitiveArray::::from_vec(time_vec), + ArrowScalarBuffer::from(time_vec), ) } @@ -1090,14 +1102,49 @@ impl TimeColumn { }) .as_i64() }) - .collect(); + .collect_vec(); Self::new( None, Timeline::new_temporal(name.into()), - Arrow2PrimitiveArray::::from_vec(time_vec), + ArrowScalarBuffer::from(time_vec), ) } + + /// Parse the given [`ArrowArray`] as a time column. 
+ /// + /// Results in an error if the array is of the wrong datatype, or if it contains nulls. + pub fn read_array(array: &dyn ArrowArray) -> Result, TimeColumnError> { + #![allow(clippy::manual_map)] + + if array.null_count() > 0 { + return Err(TimeColumnError::ContainsNulls); + } + + // Sequence timelines are i64, but time columns are nanoseconds (also as i64). + if let Some(times) = array.as_any().downcast_ref::() { + Ok(times.values().clone()) + } else if let Some(times) = array + .as_any() + .downcast_ref::() + { + Ok(times.values().clone()) + } else if let Some(times) = array + .as_any() + .downcast_ref::() + { + Ok(times.values().clone()) + } else if let Some(times) = array + .as_any() + .downcast_ref::() + { + Ok(times.values().clone()) + } else { + Err(TimeColumnError::UnsupportedDataType( + array.data_type().clone(), + )) + } + } } // --- @@ -1188,7 +1235,7 @@ impl Chunk { #[inline] pub fn row_ids(&self) -> impl Iterator + '_ { let (times, counters) = self.row_ids_raw(); - izip!(times.values().as_slice(), counters.values().as_slice()) + izip!(times.values().as_ref(), counters.values().as_slice()) .map(|(&time, &counter)| RowId::from_u128((time as u128) << 64 | (counter as u128))) } @@ -1230,7 +1277,7 @@ impl Chunk { } let (times, counters) = self.row_ids_raw(); - let (times, counters) = (times.values().as_slice(), counters.values().as_slice()); + let (times, counters) = (times.values().as_ref(), counters.values().as_slice()); #[allow(clippy::unwrap_used)] // checked above let (index_min, index_max) = if self.is_sorted() { @@ -1333,23 +1380,24 @@ impl TimeColumn { } #[inline] - pub fn times_array(&self) -> &Arrow2PrimitiveArray { + pub fn times_buffer(&self) -> &ArrowScalarBuffer { &self.times } + /// Returns an array with the appropriate datatype. 
+ #[inline] + pub fn times_array(&self) -> ArrowArrayRef { + self.timeline.typ().make_arrow_array(self.times.clone()) + } + #[inline] pub fn times_raw(&self) -> &[i64] { - self.times.values().as_slice() + self.times.as_ref() } #[inline] pub fn times(&self) -> impl DoubleEndedIterator + '_ { - self.times - .values() - .as_slice() - .iter() - .copied() - .map(TimeInt::new_temporal) + self.times_raw().iter().copied().map(TimeInt::new_temporal) } #[inline] @@ -1603,24 +1651,13 @@ impl TimeColumn { /// Costly checks are only run in debug builds. pub fn sanity_check(&self) -> ChunkResult<()> { let Self { - timeline, + timeline: _, times, is_sorted, time_range, } = self; - if *times.data_type() != timeline.datatype() { - return Err(ChunkError::Malformed { - reason: format!( - "Time data for timeline {} has the wrong datatype: expected {:?} but got {:?} instead", - timeline.name(), - timeline.datatype(), - *times.data_type(), - ), - }); - } - - let times = times.values().as_slice(); + let times = times.as_ref(); #[allow(clippy::collapsible_if)] // readability if cfg!(debug_assertions) { diff --git a/crates/store/re_chunk/src/lib.rs b/crates/store/re_chunk/src/lib.rs index 9ba7b632b86d..b4edc54d1ce6 100644 --- a/crates/store/re_chunk/src/lib.rs +++ b/crates/store/re_chunk/src/lib.rs @@ -26,7 +26,9 @@ mod batcher; mod arrow; pub use self::builder::{ChunkBuilder, TimeColumnBuilder}; -pub use self::chunk::{Chunk, ChunkComponents, ChunkError, ChunkResult, TimeColumn}; +pub use self::chunk::{ + Chunk, ChunkComponents, ChunkError, ChunkResult, TimeColumn, TimeColumnError, +}; pub use self::helpers::{ChunkShared, UnitChunkShared}; pub use self::id::{ChunkId, RowId}; pub use self::iter::{ diff --git a/crates/store/re_chunk/src/merge.rs b/crates/store/re_chunk/src/merge.rs index a804ede244fb..9b4433b1e975 100644 --- a/crates/store/re_chunk/src/merge.rs +++ b/crates/store/re_chunk/src/merge.rs @@ -1,6 +1,6 @@ +use arrow::buffer::ScalarBuffer as ArrowScalarBuffer; use arrow2::array::{ 
- Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray, - StructArray as Arrow2StructArray, + Array as Arrow2Array, ListArray as Arrow2ListArray, StructArray as Arrow2StructArray, }; use itertools::{izip, Itertools}; use nohash_hasher::IntMap; @@ -281,17 +281,20 @@ impl TimeColumn { if self.timeline != rhs.timeline { return None; } + re_tracing::profile_function!(); let is_sorted = self.is_sorted && rhs.is_sorted && self.time_range.max() <= rhs.time_range.min(); let time_range = self.time_range.union(rhs.time_range); - let times = arrow2_util::concat_arrays(&[&self.times, &rhs.times]).ok()?; - let times = times - .as_any() - .downcast_ref::>()? - .clone(); + let times = self + .times_raw() + .iter() + .chain(rhs.times_raw()) + .copied() + .collect_vec(); + let times = ArrowScalarBuffer::from(times); Some(Self { timeline: self.timeline, diff --git a/crates/store/re_chunk/src/shuffle.rs b/crates/store/re_chunk/src/shuffle.rs index ba9639898856..bf5db29e82e8 100644 --- a/crates/store/re_chunk/src/shuffle.rs +++ b/crates/store/re_chunk/src/shuffle.rs @@ -1,3 +1,4 @@ +use arrow::buffer::ScalarBuffer as ArrowScalarBuffer; use arrow2::{ array::{ Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray, @@ -238,19 +239,19 @@ impl Chunk { for info in timelines.values_mut() { let TimeColumn { - timeline, + timeline: _, times, is_sorted, time_range: _, } = info; - let mut sorted = times.values().to_vec(); + let mut sorted = times.to_vec(); for (to, from) in swaps.iter().copied().enumerate() { - sorted[to] = times.values()[from]; + sorted[to] = times[from]; } *is_sorted = sorted.windows(2).all(|times| times[0] <= times[1]); - *times = Arrow2PrimitiveArray::::from_vec(sorted).to(timeline.datatype()); + *times = ArrowScalarBuffer::from(sorted); } } diff --git a/crates/store/re_chunk/src/slice.rs b/crates/store/re_chunk/src/slice.rs index b361254c76a9..3b2f842cef92 100644 --- 
a/crates/store/re_chunk/src/slice.rs +++ b/crates/store/re_chunk/src/slice.rs @@ -1,6 +1,6 @@ use arrow2::array::{ Array as Arrow2Array, BooleanArray as Arrow2BooleanArray, ListArray as Arrow2ListArray, - PrimitiveArray as Arrow2PrimitiveArray, StructArray as Arrow2StructArray, + StructArray as Arrow2StructArray, }; use itertools::Itertools; @@ -9,7 +9,7 @@ use nohash_hasher::IntSet; use re_log_types::Timeline; use re_types_core::{ComponentDescriptor, ComponentName}; -use crate::{arrow2_util, Chunk, RowId, TimeColumn}; +use crate::{arrow2_util, arrow_util, Chunk, RowId, TimeColumn}; // --- @@ -538,7 +538,7 @@ impl Chunk { i.saturating_sub(1) as i32 }) .collect_vec(); - Arrow2PrimitiveArray::::from_vec(indices) + arrow2::array::Int32Array::from_vec(indices) }; let chunk = Self { @@ -671,7 +671,7 @@ impl Chunk { /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`]. #[must_use] #[inline] - pub fn taken(&self, indices: &Arrow2PrimitiveArray) -> Self { + pub fn taken(&self, indices: &arrow2::array::Int32Array) -> Self { let Self { id, entity_path, @@ -792,11 +792,7 @@ impl TimeColumn { // The original chunk is unsorted, but the new sliced one actually ends up being sorted. let is_sorted_opt = is_sorted.then_some(is_sorted); - Self::new( - is_sorted_opt, - *timeline, - Arrow2PrimitiveArray::sliced(times.clone(), index, len), - ) + Self::new(is_sorted_opt, *timeline, times.clone().slice(index, len)) } /// Empties the [`TimeColumn`] vertically. @@ -806,16 +802,12 @@ impl TimeColumn { pub fn emptied(&self) -> Self { let Self { timeline, - times, + times: _, is_sorted: _, time_range: _, } = self; - Self::new( - Some(true), - *timeline, - Arrow2PrimitiveArray::new_empty(times.data_type().clone()), - ) + Self::new(Some(true), *timeline, vec![].into()) } /// Runs a [filter] compute kernel on the time data with the specified `mask`. 
@@ -852,15 +844,20 @@ impl TimeColumn { Self::new( is_sorted_opt, *timeline, - arrow2_util::filter_array(times, filter), + arrow_util::filter_array( + &arrow::array::Int64Array::new(times.clone(), None), + &filter.clone().into(), + ) + .into_parts() + .1, ) } /// Runs a [take] compute kernel on the time data with the specified `indices`. /// - /// [take]: arrow2::compute::take::take + /// [take]: arrow::compute::take #[inline] - pub(crate) fn taken(&self, indices: &Arrow2PrimitiveArray) -> Self { + pub(crate) fn taken(&self, indices: &arrow2::array::Int32Array) -> Self { let Self { timeline, times, @@ -868,11 +865,14 @@ impl TimeColumn { time_range: _, } = self; - Self::new( - Some(*is_sorted), - *timeline, - arrow2_util::take_array(times, indices), + let new_times = arrow_util::take_array( + &arrow::array::Int64Array::new(times.clone(), None), + &arrow::array::Int32Array::from(indices.clone()), ) + .into_parts() + .1; + + Self::new(Some(*is_sorted), *timeline, new_times) } } @@ -880,6 +880,7 @@ impl TimeColumn { #[cfg(test)] mod tests { + use arrow2::array::PrimitiveArray as Arrow2PrimitiveArray; use itertools::Itertools; use re_log_types::{ example_components::{MyColor, MyLabel, MyPoint}, diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index 0a2700d78c3c..e603ad9fd856 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -1,8 +1,6 @@ +use arrow::array::ArrayRef as ArrowArrayRef; use arrow2::{ - array::{ - Array as Arrow2Array, ListArray, PrimitiveArray as Arrow2PrimitiveArray, - StructArray as Arrow2StructArray, - }, + array::{Array as Arrow2Array, ListArray, StructArray as Arrow2StructArray}, chunk::Chunk as Arrow2Chunk, datatypes::{ DataType as Arrow2Datatype, Field as ArrowField, Metadata as Arrow2Metadata, @@ -450,15 +448,16 @@ impl Chunk { .map(|(timeline, info)| { let TimeColumn { timeline: _, - times, + times: _, is_sorted, time_range: _, } = info; + let nullable = 
false; // timelines within a single chunk are always dense let field = ArrowField::new( timeline.name().to_string(), - times.data_type().clone(), - false, // timelines within a single chunk are always dense + timeline.datatype().into(), + nullable, ) .with_metadata({ let mut metadata = TransportChunk::field_metadata_time_column(); @@ -468,7 +467,7 @@ impl Chunk { metadata }); - let times = times.clone().boxed() /* cheap */; + let times = info.times_array(); (field, times) }) @@ -478,7 +477,7 @@ impl Chunk { for (field, times) in timelines { schema.fields.push(field); - columns.push(times); + columns.push(times.into()); } } @@ -590,36 +589,17 @@ impl Chunk { } }; - let times = column - .as_any() - .downcast_ref::>() - .ok_or_else(|| ChunkError::Malformed { - reason: format!( - "time column '{}' is not deserializable ({:?})", - field.name, - column.data_type() - ), - })?; - - if times.validity().is_some() { - return Err(ChunkError::Malformed { - reason: format!( - "time column '{}' must be dense ({:?})", - field.name, - column.data_type() - ), - }); - } + let times = TimeColumn::read_array(&ArrowArrayRef::from(column.clone())).map_err( + |err| ChunkError::Malformed { + reason: format!("Bad time column '{}': {err}", field.name), + }, + )?; let is_sorted = field .metadata .contains_key(TransportChunk::FIELD_METADATA_MARKER_IS_SORTED_BY_TIME); - let time_column = TimeColumn::new( - is_sorted.then_some(true), - timeline, - times.clone(), /* cheap */ - ); + let time_column = TimeColumn::new(is_sorted.then_some(true), timeline, times); if timelines.insert(timeline, time_column).is_some() { return Err(ChunkError::Malformed { reason: format!( @@ -735,11 +715,7 @@ mod tests { let timeline1 = Timeline::new_temporal("log_time"); let timelines1 = std::iter::once(( timeline1, - TimeColumn::new( - Some(true), - timeline1, - Arrow2PrimitiveArray::::from_vec(vec![42, 43, 44, 45]), - ), + TimeColumn::new(Some(true), timeline1, vec![42, 43, 44, 45].into()), )) .collect(); diff --git 
a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs index 01c9f243d2a6..86e50a4c2e7d 100644 --- a/crates/store/re_chunk_store/src/dataframe.rs +++ b/crates/store/re_chunk_store/src/dataframe.rs @@ -708,7 +708,7 @@ impl ChunkStore { let timelines = self.all_timelines_sorted().into_iter().map(|timeline| { ColumnDescriptor::Time(TimeColumnDescriptor { timeline, - datatype: timeline.datatype(), + datatype: timeline.datatype().into(), }) }); @@ -782,7 +782,7 @@ impl ChunkStore { TimeColumnDescriptor { timeline, - datatype: timeline.datatype(), + datatype: timeline.datatype().into(), } } diff --git a/crates/store/re_chunk_store/tests/memory_test.rs b/crates/store/re_chunk_store/tests/memory_test.rs index 9af9d5c04ca6..96abe0c5f2b7 100644 --- a/crates/store/re_chunk_store/tests/memory_test.rs +++ b/crates/store/re_chunk_store/tests/memory_test.rs @@ -135,8 +135,8 @@ fn scalar_memory_overhead() { [ format!("{NUM_SCALARS} scalars"), format!( - "{} in total", - re_format::format_bytes(total_mem_use_global as _) + "{} MiB in total", + (total_mem_use_global as f64 / 1024.0 / 1024.0).round() // Round to nearest megabyte - we get fluctuations on the kB depending on platform ), format!( "{} per row", diff --git a/crates/store/re_chunk_store/tests/snapshots/memory_test__scalars_on_one_timeline_new.snap b/crates/store/re_chunk_store/tests/snapshots/memory_test__scalars_on_one_timeline_new.snap index 6d6b6432b176..6d2e21bbc212 100644 --- a/crates/store/re_chunk_store/tests/snapshots/memory_test__scalars_on_one_timeline_new.snap +++ b/crates/store/re_chunk_store/tests/snapshots/memory_test__scalars_on_one_timeline_new.snap @@ -1,9 +1,11 @@ --- source: crates/store/re_chunk_store/tests/memory_test.rs -expression: "[format!(\"{NUM_SCALARS} scalars\"),\n format!(\"{} in total\",\n re_format::format_bytes(total_mem_use_global as _)),\n format!(\"{} per row\",\n re_format::format_bytes(total_mem_use_global as f64 / NUM_SCALARS\n as f64))]" 
+assertion_line: 133 +expression: "[format!(\"{NUM_SCALARS} scalars\"),\nformat!(\"{} MiB in total\",\n(total_mem_use_global as f64 / 1024.0 / 1024.0).round()),\nformat!(\"{} per row\",\nre_format::format_bytes(total_mem_use_global as f64 / NUM_SCALARS as f64)),]" +snapshot_kind: text --- [ "1048576 scalars", - "37.1 MiB in total", + "37 MiB in total", "37 B per row", ] diff --git a/crates/store/re_data_loader/Cargo.toml b/crates/store/re_data_loader/Cargo.toml index 80ba230cc383..6374634ae26c 100644 --- a/crates/store/re_data_loader/Cargo.toml +++ b/crates/store/re_data_loader/Cargo.toml @@ -35,6 +35,7 @@ re_types = { workspace = true, features = ["image", "video"] } ahash.workspace = true anyhow.workspace = true +arrow.workspace = true arrow2.workspace = true crossbeam.workspace = true image.workspace = true diff --git a/crates/store/re_data_loader/src/loader_archetype.rs b/crates/store/re_data_loader/src/loader_archetype.rs index 7ed62c2d1f28..3e830c8ab776 100644 --- a/crates/store/re_data_loader/src/loader_archetype.rs +++ b/crates/store/re_data_loader/src/loader_archetype.rs @@ -5,7 +5,6 @@ use re_types::components::VideoTimestamp; use re_types::Archetype; use re_types::{components::MediaType, ComponentBatch}; -use arrow2::array::PrimitiveArray as Arrow2PrimitiveArray; use arrow2::Either; use crate::{DataLoader, DataLoaderError, LoadedData}; @@ -193,13 +192,14 @@ fn load_video( Ok(frame_timestamps_ns) => { // Time column. let is_sorted = Some(true); - let time_column_times = Arrow2PrimitiveArray::from_slice(&frame_timestamps_ns); + let frame_timestamps_ns: arrow::buffer::ScalarBuffer = frame_timestamps_ns.into(); let time_column = - re_chunk::TimeColumn::new(is_sorted, video_timeline, time_column_times); + re_chunk::TimeColumn::new(is_sorted, video_timeline, frame_timestamps_ns.clone()); // VideoTimestamp component column. 
let video_timestamps = frame_timestamps_ns - .into_iter() + .iter() + .copied() .map(VideoTimestamp::from_nanoseconds) .collect::>(); let video_timestamp_batch = &video_timestamps as &dyn ComponentBatch; diff --git a/crates/store/re_dataframe/Cargo.toml b/crates/store/re_dataframe/Cargo.toml index eaaca1a0a58d..185614733cc8 100644 --- a/crates/store/re_dataframe/Cargo.toml +++ b/crates/store/re_dataframe/Cargo.toml @@ -37,6 +37,7 @@ re_types_core.workspace = true # External dependencies: anyhow.workspace = true +arrow.workspace = true arrow2.workspace = true itertools.workspace = true nohash-hasher.workspace = true diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index b3ba686d7705..e998efc7aad0 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -6,6 +6,7 @@ use std::{ }, }; +use arrow::buffer::ScalarBuffer as ArrowScalarBuffer; use arrow2::{ array::{ Array as Arrow2Array, BooleanArray as Arrow2BooleanArray, @@ -1133,7 +1134,8 @@ impl QueryHandle { // * return the minimum value instead of the max // * return the exact value for each component (that would be a _lot_ of columns!) 
 // * etc - let mut max_value_per_index = IntMap::default(); + let mut max_value_per_index: IntMap<Timeline, (TimeInt, ArrowScalarBuffer<i64>)> = + IntMap::default(); { view_streaming_state .iter() @@ -1158,7 +1160,7 @@ impl QueryHandle { .map(|time| { ( *time_column.timeline(), - (time, time_column.times_array().sliced(cursor, 1)), + (time, time_column.times_buffer().slice(cursor, 1)), ) }) }) @@ -1182,9 +1184,7 @@ impl QueryHandle { state.filtered_index, ( *cur_index_value, - Arrow2PrimitiveArray::<i64>::from_vec(vec![cur_index_value.as_i64()]) - .to(state.filtered_index.datatype()) - .to_boxed(), + ArrowScalarBuffer::from(vec![cur_index_value.as_i64()]), ), ); } @@ -1250,7 +1250,13 @@ impl QueryHandle { ColumnDescriptor::Time(descr) => { max_value_per_index.get(&descr.timeline).map_or_else( || arrow2::array::new_null_array(column.datatype(), 1), - |(_time, time_sliced)| time_sliced.clone(), + |(_time, time_sliced)| { + descr + .timeline + .typ() + .make_arrow_array(time_sliced.clone()) + .into() + }, ) } diff --git a/crates/store/re_log_types/src/time_point/mod.rs b/crates/store/re_log_types/src/time_point/mod.rs index 696da3d8cc42..2626fb0b4a88 100644 --- a/crates/store/re_log_types/src/time_point/mod.rs +++ b/crates/store/re_log_types/src/time_point/mod.rs @@ -1,4 +1,7 @@ -use std::collections::{btree_map, BTreeMap}; +use std::{ + collections::{btree_map, BTreeMap}, + sync::Arc, +}; mod non_min_i64; mod time_int; @@ -182,6 +185,29 @@ impl TimeType { pub fn format_range_utc(&self, time_range: ResolvedTimeRange) -> String { self.format_range(time_range, TimeZone::Utc) } + + /// Returns the appropriate arrow datatype to represent this timeline. + #[inline] + pub fn datatype(self) -> arrow::datatypes::DataType { + match self { + Self::Time => { + arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None) + } + Self::Sequence => arrow::datatypes::DataType::Int64, + } + } + + /// Returns an array with the appropriate datatype. 
+ pub fn make_arrow_array( + self, + times: impl Into<arrow::buffer::ScalarBuffer<i64>>, + ) -> arrow::array::ArrayRef { + let times = times.into(); + match self { + Self::Time => Arc::new(arrow::array::TimestampNanosecondArray::new(times, None)), + Self::Sequence => Arc::new(arrow::array::Int64Array::new(times, None)), + } + } } // ---------------------------------------------------------------------------- diff --git a/crates/store/re_log_types/src/time_point/timeline.rs b/crates/store/re_log_types/src/time_point/timeline.rs index d86463f57cb4..5f3f509966e9 100644 --- a/crates/store/re_log_types/src/time_point/timeline.rs +++ b/crates/store/re_log_types/src/time_point/timeline.rs @@ -1,5 +1,3 @@ -use arrow2::datatypes::{DataType, TimeUnit}; - use crate::{time::TimeZone, ResolvedTimeRange, TimeType}; re_string_interner::declare_new_type!( @@ -100,11 +98,8 @@ impl Timeline { /// Returns the appropriate arrow datatype to represent this timeline. #[inline] - pub fn datatype(&self) -> DataType { - match self.typ { - TimeType::Time => DataType::Timestamp(TimeUnit::Nanosecond, None), - TimeType::Sequence => DataType::Int64, - } + pub fn datatype(&self) -> arrow::datatypes::DataType { + self.typ.datatype() } } diff --git a/crates/top/re_sdk/Cargo.toml b/crates/top/re_sdk/Cargo.toml index 35f9b2f7cc7a..b825292f6f85 100644 --- a/crates/top/re_sdk/Cargo.toml +++ b/crates/top/re_sdk/Cargo.toml @@ -60,7 +60,6 @@ re_sdk_comms = { workspace = true, features = ["client"] } re_types_core.workspace = true ahash.workspace = true -arrow2.workspace = true crossbeam.workspace = true document-features.workspace = true itertools.workspace = true diff --git a/crates/top/re_sdk/src/recording_stream.rs b/crates/top/re_sdk/src/recording_stream.rs index bfe2fb0fc3de..8771691ac47c 100644 --- a/crates/top/re_sdk/src/recording_stream.rs +++ b/crates/top/re_sdk/src/recording_stream.rs @@ -9,12 +9,10 @@ use itertools::Either; use nohash_hasher::IntMap; use parking_lot::Mutex; -use arrow2::array::PrimitiveArray as 
Arrow2PrimitiveArray; use re_chunk::{ - Chunk, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError, ChunkComponents, PendingRow, RowId, + Chunk, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError, ChunkComponents, ChunkError, + ChunkId, PendingRow, RowId, TimeColumn, }; - -use re_chunk::{ChunkError, ChunkId, TimeColumn}; use re_log_types::{ ApplicationId, ArrowChunkReleaseCallback, BlueprintActivationCommand, EntityPath, LogMsg, StoreId, StoreInfo, StoreKind, StoreSource, Time, TimeInt, TimePoint, TimeType, Timeline, @@ -1547,10 +1545,9 @@ impl RecordingStream { let time_timeline = Timeline::log_time(); let time = TimeInt::new_temporal(Time::now().nanos_since_epoch()); - let repeated_time = Arrow2PrimitiveArray::<i64>::from_values( - std::iter::repeat(time.as_i64()).take(chunk.num_rows()), - ) - .to(time_timeline.datatype()); + let repeated_time = std::iter::repeat(time.as_i64()) - .take(chunk.num_rows()) - .collect(); + .take(chunk.num_rows()) + .collect(); let time_column = TimeColumn::new(Some(true), time_timeline, repeated_time); @@ -1571,10 +1568,7 @@ impl RecordingStream { .tick .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let repeated_tick = Arrow2PrimitiveArray::<i64>::from_values( - std::iter::repeat(tick).take(chunk.num_rows()), - ) - .to(tick_timeline.datatype()); + let repeated_tick = std::iter::repeat(tick).take(chunk.num_rows()).collect(); let tick_chunk = TimeColumn::new(Some(true), tick_timeline, repeated_tick); diff --git a/crates/top/rerun_c/Cargo.toml b/crates/top/rerun_c/Cargo.toml index 40dfae4fd7cf..978d6cde38c5 100644 --- a/crates/top/rerun_c/Cargo.toml +++ b/crates/top/rerun_c/Cargo.toml @@ -40,6 +40,7 @@ re_sdk = { workspace = true, features = ["data_loaders"] } re_video.workspace = true ahash.workspace = true +arrow.workspace = true arrow2.workspace = true infer.workspace = true once_cell.workspace = true diff --git a/crates/top/rerun_c/src/lib.rs b/crates/top/rerun_c/src/lib.rs index e90100d2a0b6..2da8b11d97f4 100644 --- a/crates/top/rerun_c/src/lib.rs +++ 
b/crates/top/rerun_c/src/lib.rs @@ -17,6 +17,7 @@ use std::ffi::{c_char, c_uchar, CString}; use component_type_registry::COMPONENT_TYPES; use once_cell::sync::Lazy; +use arrow::array::ArrayRef as ArrowArrayRef; use arrow_utils::arrow_array_from_c_ffi; use re_sdk::{ external::nohash_hasher::IntMap, @@ -962,15 +963,12 @@ fn rr_recording_stream_send_columns_impl( let timeline: Timeline = time_column.timeline.clone().try_into()?; let datatype = arrow2::datatypes::DataType::Int64; let time_values_untyped = unsafe { arrow_array_from_c_ffi(&time_column.times, datatype) }?; - let time_values = time_values_untyped - .as_any() - .downcast_ref::<arrow2::array::PrimitiveArray<i64>>() - .ok_or_else(|| { - CError::new( - CErrorCode::ArrowFfiArrayImportError, - "Arrow C FFI import did not produce a Int64 time array - please file an issue at https://github.com/rerun-io/rerun/issues if you see this! This shouldn't be possible since conversion from C was successful with this datatype." - ) - })?; + let time_values = TimeColumn::read_array(&ArrowArrayRef::from(time_values_untyped)).map_err(|err| { + CError::new( + CErrorCode::ArrowFfiArrayImportError, + &format!("Arrow C FFI import did not produce a Int64 time array - please file an issue at https://github.com/rerun-io/rerun/issues if you see this! This shouldn't be possible since conversion from C was successful with this datatype. 
Details: {err}") + ) + })?; Ok(( timeline, diff --git a/crates/utils/re_byte_size/src/arrow_sizes.rs b/crates/utils/re_byte_size/src/arrow_sizes.rs index 7b9ca72d0fd0..7f8cba0f76dc 100644 --- a/crates/utils/re_byte_size/src/arrow_sizes.rs +++ b/crates/utils/re_byte_size/src/arrow_sizes.rs @@ -1,4 +1,8 @@ -use arrow::array::{Array, ArrayRef}; +use arrow::{ + array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray}, + buffer::ScalarBuffer, + datatypes::ArrowNativeType, +}; use super::SizeBytes; @@ -15,3 +19,17 @@ impl SizeBytes for ArrayRef { self.get_array_memory_size() as u64 } } + +impl<T: ArrowPrimitiveType> SizeBytes for PrimitiveArray<T> { + #[inline] + fn heap_size_bytes(&self) -> u64 { + Array::get_array_memory_size(self) as u64 + } +} + +impl<T: ArrowNativeType> SizeBytes for ScalarBuffer<T> { + #[inline] + fn heap_size_bytes(&self) -> u64 { + self.inner().capacity() as _ + } +} diff --git a/crates/viewer/re_view_dataframe/src/display_record_batch.rs b/crates/viewer/re_view_dataframe/src/display_record_batch.rs index 671326e0a412..a1eddc1c3dd5 100644 --- a/crates/viewer/re_view_dataframe/src/display_record_batch.rs +++ b/crates/viewer/re_view_dataframe/src/display_record_batch.rs @@ -1,17 +1,18 @@ //! Intermediate data structures to make `re_datastore`'s row data more amenable to displaying in a //! table. 
+use arrow::buffer::ScalarBuffer as ArrowScalarBuffer; use arrow::{ array::{ Array as ArrowArray, ArrayRef as ArrowArrayRef, - Int32DictionaryArray as ArrowInt32DictionaryArray, Int64Array as ArrowInt64Array, - ListArray as ArrowListArray, TimestampNanosecondArray as ArrowTimestampNanosecondArray, + Int32DictionaryArray as ArrowInt32DictionaryArray, ListArray as ArrowListArray, }, datatypes::DataType as ArrowDataType, }; use thiserror::Error; use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, LatestAtQuery}; +use re_dataframe::external::re_chunk::{TimeColumn, TimeColumnError}; use re_log_types::{EntityPath, TimeInt, Timeline}; use re_types_core::ComponentName; use re_ui::UiExt; @@ -19,8 +20,11 @@ use re_viewer_context::{UiLayout, ViewerContext}; #[derive(Error, Debug)] pub(crate) enum DisplayRecordBatchError { - #[error("Unexpected column data type for timeline '{0}': {1:?}")] - UnexpectedTimeColumnDataType(String, ArrowDataType), + #[error("Bad column for timeline '{timeline}': {error}")] + BadTimeColumn { + timeline: String, + error: TimeColumnError, + }, #[error("Unexpected column data type for component '{0}': {1:?}")] UnexpectedComponentColumnDataType(String, ArrowDataType), @@ -165,7 +169,7 @@ impl ComponentData { pub(crate) enum DisplayColumn { Timeline { timeline: Timeline, - time_data: ArrowInt64Array, + time_data: ArrowScalarBuffer<i64>, }, Component { entity_path: EntityPath, @@ -179,39 +183,15 @@ impl DisplayColumn { column_descriptor: &ColumnDescriptor, column_data: &ArrowArrayRef, ) -> Result<Self, DisplayRecordBatchError> { - fn int64_from_nanoseconds( - duration_array: &ArrowTimestampNanosecondArray, - ) -> Option<ArrowInt64Array> { - let data = duration_array.to_data(); - let buffer = data.buffers().first()?.clone(); - arrow::array::ArrayData::builder(arrow::datatypes::DataType::Int64) - .len(duration_array.len()) - .add_buffer(buffer) - .build() - .ok() - .map(ArrowInt64Array::from) - } - match column_descriptor { ColumnDescriptor::Time(desc) => { let timeline = desc.timeline; - // 
Sequence timelines are i64, but time columns are nanoseconds (also as i64) - let time_data_result = column_data - .as_any() - .downcast_ref::<ArrowInt64Array>() - .cloned() - .or_else(|| { - column_data - .as_any() - .downcast_ref::<ArrowTimestampNanosecondArray>() - .and_then(int64_from_nanoseconds) - }); - let time_data = time_data_result.ok_or_else(|| { - DisplayRecordBatchError::UnexpectedTimeColumnDataType( - timeline.name().as_str().to_owned(), - column_data.data_type().to_owned(), - ) + let time_data = TimeColumn::read_array(column_data).map_err(|err| { + DisplayRecordBatchError::BadTimeColumn { + timeline: timeline.name().as_str().to_owned(), + error: err, + } })?; Ok(Self::Timeline { @@ -266,9 +246,8 @@ impl DisplayColumn { return; } - if time_data.is_valid(row_index) { - let timestamp = TimeInt::try_from(time_data.value(row_index)); - match timestamp { + if let Some(value) = time_data.get(row_index) { + match TimeInt::try_from(*value) { Ok(timestamp) => { ui.label(timeline.typ().format(timestamp, ctx.app_options.time_zone)); } @@ -304,8 +283,8 @@ impl DisplayColumn { pub(crate) fn try_decode_time(&self, row_index: usize) -> Option<TimeInt> { match self { Self::Timeline { time_data, .. } => { - let timestamp = time_data.value(row_index); - TimeInt::try_from(timestamp).ok() + let timestamp = time_data.get(row_index)?; + TimeInt::try_from(*timestamp).ok() } Self::Component { .. 
} => None, } diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 04c6ea735e03..e150be7fc33b 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -3,11 +3,11 @@ use std::borrow::Cow; use arrow::{ - array::{make_array, ArrayData}, + array::{make_array, ArrayData, ArrayRef as ArrowArrayRef}, pyarrow::PyArrowType, }; use arrow2::{ - array::{Array, ListArray, PrimitiveArray}, + array::{Array, ListArray}, datatypes::Field, offset::Offsets, }; @@ -121,7 +121,13 @@ pub fn build_chunk_from_components( let timelines: Result<Vec<_>, ChunkError> = arrays .into_iter() .zip(fields) - .map(|(value, field)| { + .map(|(array, field)| { + let timeline_data = + TimeColumn::read_array(&ArrowArrayRef::from(array)).map_err(|err| { + ChunkError::Malformed { + reason: format!("Invalid timeline {}: {err}", field.name), + } + })?; let timeline = match field.data_type() { arrow2::datatypes::DataType::Int64 => { Ok(Timeline::new_sequence(field.name.clone())) @@ -133,13 +139,6 @@ pub fn build_chunk_from_components( reason: format!("Invalid data_type for timeline: {}", field.name), }), }?; - let timeline_data = value - .as_any() - .downcast_ref::<PrimitiveArray<i64>>() - .ok_or_else(|| ChunkError::Malformed { - reason: format!("Invalid primitive array for timeline: {}", field.name), - })? - .clone(); Ok((timeline, timeline_data)) }) .collect();