diff --git a/crates/re_arrow_store/benches/data_store.rs b/crates/re_arrow_store/benches/data_store.rs index 6e9aeda7a922c..3f20e2b9a7de7 100644 --- a/crates/re_arrow_store/benches/data_store.rs +++ b/crates/re_arrow_store/benches/data_store.rs @@ -322,7 +322,7 @@ fn build_table(n: usize, packed: bool) -> DataTable { // Do a serialization roundtrip to pack everything in contiguous memory. if packed { let (schema, columns) = table.serialize().unwrap(); - table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap(); + table = DataTable::deserialize(TableId::ZERO, &schema, columns, None).unwrap(); } table diff --git a/crates/re_arrow_store/src/arrow_util.rs b/crates/re_arrow_store/src/arrow_util.rs index cb84665a1354c..3555545089aba 100644 --- a/crates/re_arrow_store/src/arrow_util.rs +++ b/crates/re_arrow_store/src/arrow_util.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use ahash::HashSet; use arrow2::{ array::{ growable::make_growable, Array, FixedSizeListArray, ListArray, StructArray, UnionArray, diff --git a/crates/re_data_store/examples/memory_usage.rs b/crates/re_data_store/examples/memory_usage.rs index 3e62a48367bb0..d87337fc5ccec 100644 --- a/crates/re_data_store/examples/memory_usage.rs +++ b/crates/re_data_store/examples/memory_usage.rs @@ -1,5 +1,11 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; +use re_data_store::log_db::collect_datatypes; +use re_log_types::{ + datagen::{build_frame_nr, build_some_point2d}, + DataCell, LogMsg, TimeInt, TimePoint, Timeline, +}; + thread_local! { static LIVE_BYTES_IN_THREAD: AtomicUsize = AtomicUsize::new(0); } @@ -48,18 +54,13 @@ fn live_bytes() -> usize { // ---------------------------------------------------------------------------- -use re_log_types::{entity_path, DataRow, RecordingId, RowId}; +use re_log_types::{entity_path, DataRow, DataTable, RecordingId, RowId, TableId}; fn main() { log_messages(); } fn log_messages() { - use re_log_types::{ - datagen::{build_frame_nr, build_some_point2d}, - LogMsg, TimeInt, TimePoint, Timeline, - }; - // Note: we use Box in this function so that we also count the "static" // part of all the data, i.e. its `std::mem::size_of`. @@ -89,6 +90,7 @@ fn log_messages() { bytes_used } + const NUM_ROWS: usize = 100_000; const NUM_POINTS: usize = 1_000; let recording_id = RecordingId::random(); @@ -104,55 +106,67 @@ fn log_messages() { drop(entity_path); } - { + fn arrow_payload(recording_id: RecordingId, num_rows: usize, num_points: usize, packed: bool) { + println!("--- {num_rows} rows each containing {num_points} points (packed={packed}) ---"); let used_bytes_start = live_bytes(); - let table = Box::new( - DataRow::from_cells1( - RowId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - 1, - build_some_point2d(1), - ) - .into_table(), - ); + let table = Box::new(create_table(num_rows, num_points, packed)); let table_bytes = live_bytes() - used_bytes_start; let log_msg = Box::new(LogMsg::ArrowMsg( recording_id, table.to_arrow_msg().unwrap(), )); let log_msg_bytes = live_bytes() - used_bytes_start; - println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); + println!( + "Arrow payload containing {num_points}x Pos2 uses {} bytes in RAM", + re_format::format_bytes(table_bytes as _) + ); let encoded = encode_log_msg(&log_msg); println!( - "Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded", - size_decoded(&encoded), encoded.len() + "Arrow LogMsg containing {num_points}x Pos2 uses {}-{} bytes in RAM, and {} bytes encoded", + re_format::format_bytes(size_decoded(&encoded) as _), + re_format::format_bytes(log_msg_bytes as _), + re_format::format_bytes(encoded.len() as _), ); + println!(); } + let num_rows = [1, NUM_ROWS]; + let num_points = [1, NUM_POINTS]; + let packed = [false, true]; + + for (num_rows, num_points, packed) in num_rows + .into_iter() + .flat_map(|num_row| std::iter::repeat(num_row).zip(num_points)) + .flat_map(|num_row| std::iter::repeat(num_row).zip(packed)) + .map(|((a, b), c)| (a, b, c)) { - let used_bytes_start = live_bytes(); - let table = Box::new( - DataRow::from_cells1( - RowId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - NUM_POINTS as _, - build_some_point2d(NUM_POINTS), - ) - .into_table(), - ); - let table_bytes = live_bytes() - used_bytes_start; - let log_msg = Box::new(LogMsg::ArrowMsg( - recording_id, - table.to_arrow_msg().unwrap(), - )); - let log_msg_bytes = live_bytes() - used_bytes_start; - println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); - let encoded = encode_log_msg(&log_msg); - println!( - "Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded", - size_decoded(&encoded), encoded.len() - ); + arrow_payload(recording_id, num_rows, num_points, packed); } } + +fn create_table(num_rows: usize, num_points: usize, packed: bool) -> DataTable { + let rows = (0..num_rows).map(|i| { + DataRow::from_cells1( + RowId::random(), + entity_path!("points"), + [build_frame_nr((i as i64).into())], + num_points as _, + build_some_point2d(num_points), + ) + }); + let mut table = DataTable::from_rows(TableId::random(), rows); + + // Do a serialization roundtrip to pack everything in contiguous memory. + if packed { + let (schema, columns) = table.serialize().unwrap(); + + let mut datatypes = Default::default(); + for column in columns.arrays() { + collect_datatypes(&mut datatypes, &**column); + } + + table = DataTable::deserialize(TableId::ZERO, &schema, columns, Some(&datatypes)).unwrap(); + } + + table +} diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index b520262aa4f9b..a4d9e6de3c929 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -1,12 +1,15 @@ use std::collections::BTreeMap; +use ahash::HashSet; use nohash_hasher::IntMap; use re_arrow_store::{DataStoreConfig, TimeInt}; use re_log_types::{ - component_types::InstanceKey, ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, - DataCell, DataRow, DataTable, EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, PathOp, - RecordingId, RecordingInfo, RowId, TimePoint, Timeline, + component_types::InstanceKey, + external::arrow2::{chunk::Chunk, datatypes::DataType}, + ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, + EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, PathOp, RecordingId, RecordingInfo, RowId, + TimePoint, Timeline, }; use crate::{Error, TimesPerTimeline}; @@ -26,6 +29,8 @@ pub struct EntityDb { /// Stores all components for all entities for all timelines. pub data_store: re_arrow_store::DataStore, + + pub datatypes: HashSet, } impl Default for EntityDb { @@ -38,6 +43,7 @@ impl Default for EntityDb { InstanceKey::name(), DataStoreConfig::default(), ), + datatypes: Default::default(), } } } @@ -54,11 +60,34 @@ impl EntityDb { .or_insert_with(|| entity_path.clone()); } - fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> { + fn try_add_arrow_msg(&mut self, msg: ArrowMsg) -> Result<(), Error> { crate::profile_function!(); + let ArrowMsg { + table_id, + timepoint_max, + schema, + chunk, + } = msg; + + // TODO: move everything in datatable? + // TODO: this obviously cannot be doing all these crazy clones + { + crate::profile_scope!("collect_datatypes"); + for column in chunk.arrays() { + collect_datatypes(&mut self.datatypes, &**column); + } + } + + let msg = ArrowMsg { + table_id, + timepoint_max, + schema, + chunk, + }; + // TODO(#1760): Compute the size of the datacells in the batching threads on the clients. - let mut table = DataTable::from_arrow_msg(msg)?; + let mut table = DataTable::from_arrow_msg(msg, Some(&self.datatypes))?; table.compute_all_size_bytes(); // TODO(#1619): batch all of this @@ -142,6 +171,7 @@ impl EntityDb { times_per_timeline, tree, data_store: _, // purged before this function is called + datatypes: _, } = self; { @@ -212,17 +242,17 @@ impl LogDb { self.num_rows() == 0 } - pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> { + pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> { crate::profile_function!(); - match &msg { + match msg { LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg), LogMsg::EntityPathOpMsg(_, msg) => { let EntityPathOpMsg { row_id, time_point, path_op, - } = msg; + } = &msg; self.entity_op_msgs.insert(*row_id, msg.clone()); self.entity_db.add_path_op(*row_id, time_point, path_op); } @@ -233,8 +263,8 @@ impl LogDb { Ok(()) } - fn add_begin_recording_msg(&mut self, msg: &BeginRecordingMsg) { - self.recording_msg = Some(msg.clone()); + fn add_begin_recording_msg(&mut self, msg: BeginRecordingMsg) { + self.recording_msg = Some(msg); } /// Returns an iterator over all [`EntityPathOpMsg`]s that have been written to this `LogDb`. @@ -278,3 +308,140 @@ impl LogDb { entity_db.purge(&cutoff_times, &drop_row_ids); } } + +// --- + +use itertools::Itertools; +use re_log_types::external::arrow2::{ + array::{ + growable::make_growable, Array, FixedSizeListArray, ListArray, StructArray, UnionArray, + }, + bitmap::Bitmap, + datatypes::{Field, UnionMode}, + offset::Offsets, +}; +use std::sync::Arc; + +pub fn collect_datatypes(datatypes: &mut HashSet, array: &dyn Array) { + crate::profile_function!(); + + fn fill(datatypes: &mut HashSet, array: &dyn Array) { + let datatype = array.data_type().clone(); + + if !datatypes.insert(datatype.clone()) { + return; + } + + match &datatype { + DataType::List(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + fill(datatypes, array.values().as_ref()); + } + DataType::LargeList(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + fill(datatypes, array.values().as_ref()); + } + DataType::FixedSizeList(_, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + fill(datatypes, array.values().as_ref()); + } + DataType::Struct(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + array + .values() + .iter() + .map(|v| fill(datatypes, v.as_ref())) + .for_each(|_| {}); + } + DataType::Union(_, _, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + array + .fields() + .iter() + .map(|v| fill(datatypes, v.as_ref())) + .for_each(|_| {}); + } + _ => {} + } + } + + fill(datatypes, array); +} + +// TODO +// - we shouldnt have to pay for crazy expensive virtual clones... datatype should be overridable +fn dedupe_datatypes(datatypes: &mut HashSet, array: &dyn Array) -> Box { + crate::profile_function!(); + + fn fill(datatypes: &mut HashSet, mut array: &dyn Array) -> Box { + let datatype = if let Some(datatype) = datatypes.get(array.data_type()) { + datatype.clone() + } else { + let datatype = array.data_type().clone(); + datatypes.insert(datatype.clone()); + datatype + }; + + match array.data_type() { + DataType::List(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + ListArray::::new( + datatype, + array.offsets().clone(), + fill(datatypes, array.values().as_ref()), + array.validity().cloned(), + ) + .boxed() + } + DataType::LargeList(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + ListArray::::new( + datatype, + array.offsets().clone(), + fill(datatypes, array.values().as_ref()), + array.validity().cloned(), + ) + .boxed() + } + DataType::FixedSizeList(_, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + FixedSizeListArray::new( + datatype, + fill(datatypes, array.values().as_ref()), + array.validity().cloned(), + ) + .boxed() + } + DataType::Struct(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + StructArray::new( + datatype, + array + .values() + .iter() + .map(|v| fill(datatypes, v.as_ref())) + .collect_vec(), + array.validity().cloned(), + ) + .boxed() + } + DataType::Union(_, _, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + UnionArray::new( + datatype, + array.types().clone(), + array + .fields() + .iter() + .map(|v| fill(datatypes, v.as_ref())) + .collect_vec(), + array.offsets().cloned(), + ) + .boxed() + } + _ => array.to_boxed(), + } + } + + fill(datatypes, array) +} diff --git a/crates/re_log_encoding/benches/msg_encode_benchmark.rs b/crates/re_log_encoding/benches/msg_encode_benchmark.rs index ddc7fc9740d96..abddad67b4996 100644 --- a/crates/re_log_encoding/benches/msg_encode_benchmark.rs +++ b/crates/re_log_encoding/benches/msg_encode_benchmark.rs @@ -54,7 +54,7 @@ fn decode_tables(messages: &[LogMsg]) -> Vec { .iter() .map(|log_msg| { if let LogMsg::ArrowMsg(_, arrow_msg) = log_msg { - DataTable::from_arrow_msg(arrow_msg).unwrap() + DataTable::from_arrow_msg(arrow_msg.clone(), None).unwrap() // TODO } else { unreachable!() } diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index 54e5c01b68e5a..8de641aa114f5 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -155,7 +155,7 @@ mod tests { let buf = rmp_serde::to_vec(&msg_in).unwrap(); let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap(); let table_out = { - let mut table = DataTable::from_arrow_msg(&msg_out).unwrap(); + let mut table = DataTable::from_arrow_msg(msg_out.clone(), None).unwrap(); table.compute_all_size_bytes(); table }; diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index c7f2776b0d80e..3168a89591c0a 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeMap, sync::Arc}; -use ahash::HashMap; +use ahash::{HashMap, HashSet}; use itertools::Itertools as _; use nohash_hasher::{IntMap, IntSet}; use smallvec::SmallVec; @@ -879,7 +879,8 @@ impl DataTable { pub fn deserialize( table_id: TableId, schema: &Schema, - chunk: &Chunk>, + chunk: Chunk>, + datatypes: Option<&HashSet>, ) -> DataTableResult { crate::profile_function!(); @@ -933,22 +934,25 @@ impl DataTable { // --- Components --- + let mut columns = chunk.into_arrays(); + let columns: DataTableResult<_> = schema .fields .iter() .enumerate() .filter_map(|(i, field)| { - field.metadata.get(METADATA_KIND).and_then(|kind| { - (kind == METADATA_KIND_DATA).then_some((field.name.as_str(), i)) - }) + field + .metadata + .get(METADATA_KIND) + .and_then(|kind| (kind == METADATA_KIND_DATA).then_some((field, i))) }) - .map(|(name, index)| { - let component: ComponentName = name.into(); - chunk - .get(index) - .ok_or(DataTableError::MissingColumn(name.to_owned())) + .map(|(field, index)| { + let component: ComponentName = field.name.as_str().into(); + columns + .get_mut(index) + .ok_or(DataTableError::MissingColumn(field.name.clone())) .and_then(|column| { - Self::deserialize_data_column(component, &**column) + Self::deserialize_data_column(component, &**column, datatypes) .map(|data| (component, data)) }) }) @@ -998,17 +1002,43 @@ impl DataTable { fn deserialize_data_column( component: ComponentName, column: &dyn Array, + datatypes: Option<&HashSet>, ) -> DataTableResult { crate::profile_function!(); + + let column = column + .as_any() + .downcast_ref::>() + .ok_or(DataTableError::NotAColumn(component.to_string()))?; + + // let datatype = datatypes.and_then(|datatypes| { + // datatypes.get(ListArray::::get_child_type(column.data_type())) + // }); + + // let chunk = { + // crate::profile_scope!("dedupe_datatypes"); + // let mut columns = chunk.into_arrays(); + // for column in &mut columns { + // *column = dedupe_datatypes(&mut self.datatypes, column.as_ref()); + // } + // Chunk::new(columns) + // }; + Ok(DataCellColumn( column - .as_any() - .downcast_ref::>() - .ok_or(DataTableError::NotAColumn(component.to_string()))? .iter() // TODO(#1805): Schema metadata gets cloned in every single array. // This'll become a problem as soon as we enable batching. - .map(|array| array.map(|values| DataCell::from_arrow(component, values))) + .map(|array| { + array.map(|values| { + if let Some(datatypes) = datatypes { + let values = swap_datatypes(datatypes, values); + DataCell::from_arrow(component, values) + } else { + DataCell::from_arrow(component, values) + } + }) + }) .collect(), )) } @@ -1019,7 +1049,10 @@ impl DataTable { impl DataTable { /// Deserializes the contents of an [`ArrowMsg`] into a `DataTable`. #[inline] - pub fn from_arrow_msg(msg: &ArrowMsg) -> DataTableResult { + pub fn from_arrow_msg( + msg: ArrowMsg, + datatypes: Option<&HashSet>, + ) -> DataTableResult { let ArrowMsg { table_id, timepoint_max: _, @@ -1027,7 +1060,7 @@ impl DataTable { chunk, } = msg; - Self::deserialize(*table_id, schema, chunk) + Self::deserialize(table_id, &schema, chunk, datatypes) } /// Serializes the contents of a `DataTable` into an [`ArrowMsg`]. @@ -1130,3 +1163,83 @@ impl DataTable { table } } + +// --- Deduplication --- + +use arrow2::array::{FixedSizeListArray, StructArray, UnionArray}; +use itertools::Itertools; + +// TODO +// - we shouldnt have to pay for crazy expensive virtual clones... datatype should be overridable +fn swap_datatypes(datatypes: &HashSet, array: Box) -> Box { + crate::profile_function!(); + + fn fill(datatypes: &HashSet, array: &Box) -> Box { + let datatype = datatypes + .get(array.data_type()) + .cloned() + .unwrap_or(array.data_type().clone()); + + match array.data_type() { + DataType::List(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + ListArray::::new( + datatype, + array.offsets().clone(), + fill(datatypes, array.values()), + array.validity().cloned(), + ) + .boxed() + } + DataType::LargeList(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + ListArray::::new( + datatype, + array.offsets().clone(), + fill(datatypes, array.values()), + array.validity().cloned(), + ) + .boxed() + } + DataType::FixedSizeList(_, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + FixedSizeListArray::new( + datatype, + fill(datatypes, array.values()), + array.validity().cloned(), + ) + .boxed() + } + DataType::Struct(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + StructArray::new( + datatype, + array + .values() + .iter() + .map(|v| fill(datatypes, v)) + .collect_vec(), + array.validity().cloned(), + ) + .boxed() + } + DataType::Union(_, _, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + UnionArray::new( + datatype, + array.types().clone(), + array + .fields() + .iter() + .map(|v| fill(datatypes, v)) + .collect_vec(), + array.offsets().cloned(), + ) + .boxed() + } + _ => array.to_boxed(), + } + } + + fill(datatypes, &array) +} diff --git a/crates/re_viewer/src/app.rs b/crates/re_viewer/src/app.rs index b001c12fbbefb..fe665ef4a9fe1 100644 --- a/crates/re_viewer/src/app.rs +++ b/crates/re_viewer/src/app.rs @@ -704,7 +704,7 @@ impl App { log_db.data_source = Some(self.rx.source().clone()); } - if let Err(err) = log_db.add(&msg) { + if let Err(err) = log_db.add(msg) { re_log::error!("Failed to add incoming msg: {err}"); }; @@ -1810,7 +1810,7 @@ fn load_rrd_to_log_db(mut read: impl std::io::Read) -> anyhow::Result { let mut log_db = LogDb::default(); for msg in decoder { - log_db.add(&msg?)?; + log_db.add(msg?)?; } Ok(log_db) } diff --git a/crates/re_viewer/src/ui/data_ui/log_msg.rs b/crates/re_viewer/src/ui/data_ui/log_msg.rs index c4c982c2d066a..4c886d5a779ca 100644 --- a/crates/re_viewer/src/ui/data_ui/log_msg.rs +++ b/crates/re_viewer/src/ui/data_ui/log_msg.rs @@ -101,7 +101,7 @@ impl DataUi for ArrowMsg { verbosity: UiVerbosity, query: &re_arrow_store::LatestAtQuery, ) { - let table = match DataTable::from_arrow_msg(self) { + let table = match DataTable::from_arrow_msg(self.clone() /* TODO */, None) { Ok(table) => table, Err(err) => { ui.label( diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs index 0cbebd6ed75bb..8e05289a61af3 100644 --- a/crates/rerun/src/run.rs +++ b/crates/rerun/src/run.rs @@ -441,7 +441,7 @@ fn receive_into_log_db(rx: &Receiver) -> anyhow::Result { re_log::info_once!("Received first message."); let is_goodbye = matches!(msg, re_log_types::LogMsg::Goodbye(_)); - db.add(&msg)?; + db.add(msg)?; num_messages += 1; if is_goodbye { db.entity_db.data_store.sanity_check()?;