diff --git a/Cargo.lock b/Cargo.lock index 43602ada5aa1..ed522826e9b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4765,6 +4765,7 @@ dependencies = [ "rand", "re_build_info", "re_build_tools", + "re_chunk", "re_data_loader", "re_data_store", "re_log", @@ -5525,6 +5526,7 @@ dependencies = [ "re_arrow2", "re_build_info", "re_build_tools", + "re_chunk", "re_log", "re_log_types", "re_memory", diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index 334f106336d5..87a40d7adcf4 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -272,67 +272,6 @@ re_types_core::delegate_arrow_tuid!(TableId as "rerun.controls.TableId"); /// │ 2 ┆ 2023-04-05 09:36:47.188855872 ┆ 1753004ACBF5D6E651F2983C3DAF260C ┆ c ┆ [hey] ┆ - ┆ [4294967295] │ /// └──────────┴───────────────────────────────┴──────────────────────────────────┴───────────────────┴─────────────┴──────────────────────────────────┴─────────────────┘ /// ``` -/// -/// ## Example -/// -/// ```rust -/// # use re_log_types::{ -/// # example_components::{MyColor, MyLabel, MyPoint}, -/// # DataRow, DataTable, RowId, TableId, Timeline, TimePoint, -/// # }; -/// # -/// # let table_id = TableId::new(); -/// # -/// # let timepoint = |frame_nr: i64, clock: i64| { -/// # TimePoint::from([ -/// # (Timeline::new_sequence("frame_nr"), frame_nr), -/// # (Timeline::new_sequence("clock"), clock), -/// # ]) -/// # }; -/// # -/// let row0 = { -/// let points: &[MyPoint] = &[MyPoint { x: 10.0, y: 10.0 }, MyPoint { x: 20.0, y: 20.0 }]; -/// let colors: &[_] = &[MyColor(0xff7f7f7f)]; -/// let labels: &[MyLabel] = &[]; -/// -/// DataRow::from_cells3( -/// RowId::new(), -/// "a", -/// timepoint(1, 1), -/// (points, colors, labels), -/// ).unwrap() -/// }; -/// -/// let row1 = { -/// let colors: &[MyColor] = &[]; -/// -/// DataRow::from_cells1(RowId::new(), "b", timepoint(1, 2), colors).unwrap() -/// }; -/// -/// let row2 = { -/// let colors: &[_] = &[MyColor(0xff7f7f7f)]; -/// let labels: &[_] = &[MyLabel("hey".into())]; -/// -/// DataRow::from_cells2( -/// RowId::new(), -/// "c", -/// timepoint(2, 1), -/// (colors, labels), -/// ).unwrap() -/// }; -/// -/// let table_in = DataTable::from_rows(table_id, [row0, row1, row2]); -/// eprintln!("Table in:\n{table_in}"); -/// -/// let (schema, columns) = table_in.serialize().unwrap(); -/// // eprintln!("{schema:#?}"); -/// eprintln!("Wired chunk:\n{columns:#?}"); -/// -/// let table_out = DataTable::deserialize(table_id, &schema, &columns).unwrap(); -/// eprintln!("Table out:\n{table_out}"); -/// # -/// # assert_eq!(table_in, table_out); -/// ``` #[derive(Debug, Clone, PartialEq)] pub struct DataTable { /// Auto-generated `TUID`, uniquely identifying this batch of data and keeping track of the @@ -582,6 +521,20 @@ impl DataTable { let mut schema = Schema::default(); let mut columns = Vec::new(); + // Temporary compatibility layer with Chunks. + if let Some(entity_path) = self.col_entity_path.front() { + /// The key used to identify a Rerun [`EntityPath`] in chunk-level [`ArrowSchema`] metadata. + // + // NOTE: Temporarily copied from `re_chunk` while we're transitioning away to the new data + // model. + const CHUNK_METADATA_KEY_ENTITY_PATH: &str = "rerun.entity_path"; + + schema.metadata.insert( + CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(), + entity_path.to_string(), + ); + } + { let (control_schema, control_columns) = self.serialize_time_columns(); schema.fields.extend(control_schema.fields); @@ -873,6 +826,18 @@ impl DataTable { ) -> DataTableResult { re_tracing::profile_function!(); + /// The key used to identify a Rerun [`EntityPath`] in chunk-level [`ArrowSchema`] metadata. + // + // NOTE: Temporarily copied from `re_chunk` while we're transitioning away to the new data + // model. + const CHUNK_METADATA_KEY_ENTITY_PATH: &str = "rerun.entity_path"; + + let entity_path = schema + .metadata + .get(CHUNK_METADATA_KEY_ENTITY_PATH) + .ok_or_else(|| DataTableError::MissingColumn("metadata:entity_path".to_owned()))?; + let entity_path = EntityPath::parse_forgiving(entity_path); + // --- Time --- let col_timelines: DataTableResult<_> = schema @@ -920,13 +885,9 @@ impl DataTable { .unwrap() .as_ref(), )?; - #[allow(clippy::unwrap_used)] - let col_entity_path = EntityPath::from_arrow( - chunk - .get(control_index(EntityPath::name().as_str())?) - .unwrap() - .as_ref(), - )?; + let col_entity_path = std::iter::repeat_with(|| entity_path.clone()) + .take(col_row_id.len()) + .collect_vec(); // --- Components --- diff --git a/crates/re_log_types/src/data_table_batcher.rs b/crates/re_log_types/src/data_table_batcher.rs deleted file mode 100644 index f45de80f89cc..000000000000 --- a/crates/re_log_types/src/data_table_batcher.rs +++ /dev/null @@ -1,548 +0,0 @@ -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; - -use crossbeam::channel::{Receiver, Sender}; - -use re_types_core::SizeBytes as _; - -use crate::{DataRow, DataTable, TableId}; - -// --- - -/// Errors that can occur when creating/manipulating a [`DataTableBatcher`]. -#[derive(thiserror::Error, Debug)] -pub enum DataTableBatcherError { - /// Error when parsing configuration from environment. - #[error("Failed to parse config: '{name}={value}': {err}")] - ParseConfig { - name: &'static str, - value: String, - err: Box, - }, - - /// Error spawning one of the background threads. - #[error("Failed to spawn background thread '{name}': {err}")] - SpawnThread { - name: &'static str, - err: Box, - }, -} - -pub type DataTableBatcherResult = Result; - -/// Callbacks you can install on the [`DataTableBatcher`]. -#[derive(Clone, Default)] -pub struct BatcherHooks { - /// Called when a new row arrives. - /// - /// The callback is given the slice of all rows not yet batched, - /// including the new one. - /// - /// Used for testing. - #[allow(clippy::type_complexity)] - pub on_insert: Option>, - - /// Callback to be run when an Arrow Chunk goes out of scope. - /// - /// See [`crate::ArrowChunkReleaseCallback`] for more information. - pub on_release: Option, -} - -impl BatcherHooks { - pub const NONE: Self = Self { - on_insert: None, - on_release: None, - }; -} - -impl PartialEq for BatcherHooks { - fn eq(&self, other: &Self) -> bool { - let Self { - on_insert, - on_release, - } = self; - - let on_insert_eq = match (on_insert, &other.on_insert) { - (Some(a), Some(b)) => Arc::ptr_eq(a, b), - (None, None) => true, - _ => false, - }; - - on_insert_eq && on_release == &other.on_release - } -} - -impl std::fmt::Debug for BatcherHooks { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Self { - on_insert, - on_release, - } = self; - f.debug_struct("BatcherHooks") - .field("on_insert", &on_insert.as_ref().map(|_| "…")) - .field("on_release", &on_release) - .finish() - } -} - -// --- - -/// Defines the different thresholds of the associated [`DataTableBatcher`]. -/// -/// See [`Self::default`] and [`Self::from_env`]. -#[derive(Clone, Debug, PartialEq)] -pub struct DataTableBatcherConfig { - /// Duration of the periodic tick. - // - // NOTE: We use `std::time` directly because this library has to deal with `crossbeam` as well - // as std threads, which both expect standard types anyway. - // - // TODO(cmc): Add support for burst debouncing. - pub flush_tick: Duration, - - /// Flush if the accumulated payload has a size in bytes equal or greater than this. - /// - /// The resulting [`DataTable`] might be larger than `flush_num_bytes`! - pub flush_num_bytes: u64, - - /// Flush if the accumulated payload has a number of rows equal or greater than this. - pub flush_num_rows: u64, - - /// Size of the internal channel of commands. - /// - /// Unbounded if left unspecified. - pub max_commands_in_flight: Option, - - /// Size of the internal channel of [`DataTable`]s. - /// - /// Unbounded if left unspecified. - pub max_tables_in_flight: Option, - - /// Callbacks you can install on the [`DataTableBatcher`]. - pub hooks: BatcherHooks, -} - -impl Default for DataTableBatcherConfig { - fn default() -> Self { - Self::DEFAULT - } -} - -impl DataTableBatcherConfig { - /// Default configuration, applicable to most use cases. - pub const DEFAULT: Self = Self { - flush_tick: Duration::from_millis(8), // We want it fast enough for 60 Hz for real time camera feel - flush_num_bytes: 1024 * 1024, // 1 MiB - flush_num_rows: u64::MAX, - max_commands_in_flight: None, - max_tables_in_flight: None, - hooks: BatcherHooks::NONE, - }; - - /// Always flushes ASAP. - pub const ALWAYS: Self = Self { - flush_tick: Duration::MAX, - flush_num_bytes: 0, - flush_num_rows: 0, - max_commands_in_flight: None, - max_tables_in_flight: None, - hooks: BatcherHooks::NONE, - }; - - /// Never flushes unless manually told to. - pub const NEVER: Self = Self { - flush_tick: Duration::MAX, - flush_num_bytes: u64::MAX, - flush_num_rows: u64::MAX, - max_commands_in_flight: None, - max_tables_in_flight: None, - hooks: BatcherHooks::NONE, - }; - - /// Environment variable to configure [`Self::flush_tick`]. - pub const ENV_FLUSH_TICK: &'static str = "RERUN_FLUSH_TICK_SECS"; - - /// Environment variable to configure [`Self::flush_num_bytes`]. - pub const ENV_FLUSH_NUM_BYTES: &'static str = "RERUN_FLUSH_NUM_BYTES"; - - /// Environment variable to configure [`Self::flush_num_rows`]. - pub const ENV_FLUSH_NUM_ROWS: &'static str = "RERUN_FLUSH_NUM_ROWS"; - - /// Creates a new `DataTableBatcherConfig` using the default values, optionally overridden - /// through the environment. - /// - /// See [`Self::apply_env`]. - #[inline] - pub fn from_env() -> DataTableBatcherResult { - Self::default().apply_env() - } - - /// Returns a copy of `self`, overriding existing fields with values from the environment if - /// they are present. - /// - /// See [`Self::ENV_FLUSH_TICK`], [`Self::ENV_FLUSH_NUM_BYTES`], [`Self::ENV_FLUSH_NUM_BYTES`]. - pub fn apply_env(&self) -> DataTableBatcherResult { - let mut new = self.clone(); - - if let Ok(s) = std::env::var(Self::ENV_FLUSH_TICK) { - let flush_duration_secs: f64 = - s.parse() - .map_err(|err| DataTableBatcherError::ParseConfig { - name: Self::ENV_FLUSH_TICK, - value: s.clone(), - err: Box::new(err), - })?; - - new.flush_tick = Duration::from_secs_f64(flush_duration_secs); - } - - if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) { - if let Some(num_bytes) = re_format::parse_bytes(&s) { - // e.g. "10MB" - new.flush_num_bytes = num_bytes.unsigned_abs(); - } else { - // Assume it's just an integer - new.flush_num_bytes = - s.parse() - .map_err(|err| DataTableBatcherError::ParseConfig { - name: Self::ENV_FLUSH_NUM_BYTES, - value: s.clone(), - err: Box::new(err), - })?; - } - } - - if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_ROWS) { - new.flush_num_rows = s - .parse() - .map_err(|err| DataTableBatcherError::ParseConfig { - name: Self::ENV_FLUSH_NUM_ROWS, - value: s.clone(), - err: Box::new(err), - })?; - } - - Ok(new) - } -} - -#[test] -fn data_table_batcher_config() { - // Detect breaking changes in our environment variables. - std::env::set_var("RERUN_FLUSH_TICK_SECS", "0.3"); - std::env::set_var("RERUN_FLUSH_NUM_BYTES", "42"); - std::env::set_var("RERUN_FLUSH_NUM_ROWS", "666"); - - let config = DataTableBatcherConfig::from_env().unwrap(); - - let expected = DataTableBatcherConfig { - flush_tick: Duration::from_millis(300), - flush_num_bytes: 42, - flush_num_rows: 666, - ..Default::default() - }; - - assert_eq!(expected, config); -} - -// --- - -/// Implements an asynchronous batcher that coalesces [`DataRow`]s into [`DataTable`]s based upon -/// the thresholds defined in the associated [`DataTableBatcherConfig`]. -/// -/// ## Multithreading and ordering -/// -/// [`DataTableBatcher`] can be cheaply clone and used freely across any number of threads. -/// -/// Internally, all operations are linearized into a pipeline: -/// - All operations sent by a given thread will take effect in the same exact order as that -/// thread originally sent them in, from its point of view. -/// - There isn't any well defined global order across multiple threads. -/// -/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all -/// previous data sent by the calling thread has been batched and sent down the channel returned -/// by [`DataTableBatcher::tables`]; no more, no less. -/// -/// ## Shutdown -/// -/// The batcher can only be shutdown by dropping all instances of it, at which point it will -/// automatically take care of flushing any pending data that might remain in the pipeline. -/// -/// Shutting down cannot ever block. -#[derive(Clone)] -pub struct DataTableBatcher { - inner: Arc, -} - -// NOTE: The receiving end of the command stream as well as the sending end of the table stream are -// owned solely by the batching thread. -struct DataTableBatcherInner { - /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed, - /// therefore the `Drop` implementation is guaranteed that no more data can come in while it's - /// running. - tx_cmds: Sender, - // NOTE: Option so we can make shutdown non-blocking even with bounded channels. - rx_tables: Option>, - cmds_to_tables_handle: Option>, -} - -impl Drop for DataTableBatcherInner { - fn drop(&mut self) { - // Drop the receiving end of the table stream first and foremost, so that we don't block - // even if the output channel is bounded and currently full. - if let Some(rx_tables) = self.rx_tables.take() { - if !rx_tables.is_empty() { - re_log::warn!("Dropping data"); - } - } - - // NOTE: The command channel is private, if we're here, nothing is currently capable of - // sending data down the pipeline. - self.tx_cmds.send(Command::Shutdown).ok(); - if let Some(handle) = self.cmds_to_tables_handle.take() { - handle.join().ok(); - } - } -} - -enum Command { - // TODO(cmc): support for appending full tables - AppendRow(DataRow), - Flush(Sender<()>), - Shutdown, -} - -impl Command { - fn flush() -> (Self, Receiver<()>) { - let (tx, rx) = crossbeam::channel::bounded(0); // oneshot - (Self::Flush(tx), rx) - } -} - -impl DataTableBatcher { - /// Creates a new [`DataTableBatcher`] using the passed in `config`. - /// - /// The returned object must be kept in scope: dropping it will trigger a clean shutdown of the - /// batcher. - #[must_use = "Batching threads will automatically shutdown when this object is dropped"] - #[allow(clippy::needless_pass_by_value)] - pub fn new(config: DataTableBatcherConfig) -> DataTableBatcherResult { - let (tx_cmds, rx_cmd) = match config.max_commands_in_flight { - Some(cap) => crossbeam::channel::bounded(cap as _), - None => crossbeam::channel::unbounded(), - }; - - let (tx_table, rx_tables) = match config.max_tables_in_flight { - Some(cap) => crossbeam::channel::bounded(cap as _), - None => crossbeam::channel::unbounded(), - }; - - let cmds_to_tables_handle = { - const NAME: &str = "DataTableBatcher::cmds_to_tables"; - std::thread::Builder::new() - .name(NAME.into()) - .spawn({ - let config = config.clone(); - move || batching_thread(config, rx_cmd, tx_table) - }) - .map_err(|err| DataTableBatcherError::SpawnThread { - name: NAME, - err: Box::new(err), - })? - }; - - re_log::debug!(?config, "creating new table batcher"); - - let inner = DataTableBatcherInner { - tx_cmds, - rx_tables: Some(rx_tables), - cmds_to_tables_handle: Some(cmds_to_tables_handle), - }; - - Ok(Self { - inner: Arc::new(inner), - }) - } - - // --- Send commands --- - - /// Pushes a [`DataRow`] down the batching pipeline. - /// - /// This will call [`DataRow::compute_all_size_bytes`] from the batching thread! - /// - /// See [`DataTableBatcher`] docs for ordering semantics and multithreading guarantees. - #[inline] - pub fn push_row(&self, row: DataRow) { - self.inner.push_row(row); - } - - /// Initiates a flush of the pipeline and returns immediately. - /// - /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]). - /// See [`DataTableBatcher`] docs for ordering semantics and multithreading guarantees. - #[inline] - pub fn flush_async(&self) { - self.inner.flush_async(); - } - - /// Initiates a flush the batching pipeline and waits for it to propagate. - /// - /// See [`DataTableBatcher`] docs for ordering semantics and multithreading guarantees. - #[inline] - pub fn flush_blocking(&self) { - self.inner.flush_blocking(); - } - - // --- Subscribe to tables --- - - /// Returns a _shared_ channel in which are sent the batched [`DataTable`]s. - /// - /// Shutting down the batcher will close this channel. - /// - /// See [`DataTableBatcher`] docs for ordering semantics and multithreading guarantees. - pub fn tables(&self) -> Receiver { - // NOTE: `rx_tables` is only ever taken when the batcher as a whole is dropped, at which - // point it is impossible to call this method. - #![allow(clippy::unwrap_used)] - self.inner.rx_tables.clone().unwrap() - } -} - -impl DataTableBatcherInner { - fn push_row(&self, row: DataRow) { - self.send_cmd(Command::AppendRow(row)); - } - - fn flush_async(&self) { - let (flush_cmd, _) = Command::flush(); - self.send_cmd(flush_cmd); - } - - fn flush_blocking(&self) { - let (flush_cmd, oneshot) = Command::flush(); - self.send_cmd(flush_cmd); - oneshot.recv().ok(); - } - - fn send_cmd(&self, cmd: Command) { - // NOTE: Internal channels can never be closed outside of the `Drop` impl, this cannot - // fail. - self.tx_cmds.send(cmd).ok(); - } -} - -#[allow(clippy::needless_pass_by_value)] -fn batching_thread( - config: DataTableBatcherConfig, - rx_cmd: Receiver, - tx_table: Sender, -) { - let rx_tick = crossbeam::channel::tick(config.flush_tick); - - struct Accumulator { - latest: Instant, - pending_rows: Vec, - pending_num_bytes: u64, - } - - impl Accumulator { - fn reset(&mut self) { - self.latest = Instant::now(); - self.pending_rows.clear(); - self.pending_num_bytes = 0; - } - } - - let mut acc = Accumulator { - latest: Instant::now(), - pending_rows: Default::default(), - pending_num_bytes: Default::default(), - }; - - fn do_push_row(acc: &mut Accumulator, mut row: DataRow) { - // TODO(#1760): now that we're re doing this here, it really is a massive waste not to send - // it over the wire… - row.compute_all_size_bytes(); - - acc.pending_num_bytes += row.total_size_bytes(); - acc.pending_rows.push(row); - } - - fn do_flush_all(acc: &mut Accumulator, tx_table: &Sender, reason: &str) { - let rows = &mut acc.pending_rows; - - if rows.is_empty() { - return; - } - - re_log::trace!( - "Flushing {} rows and {} bytes. Reason: {reason}", - rows.len(), - re_format::format_bytes(acc.pending_num_bytes as _) - ); - - let table = DataTable::from_rows(TableId::new(), rows.drain(..)); - // TODO(#1981): efficient table sorting here, following the same rules as the store's. - // table.sort(); - - // NOTE: This can only fail if all receivers have been dropped, which simply cannot happen - // as long the batching thread is alive… which is where we currently are. - tx_table.send(table).ok(); - - acc.reset(); - } - - re_log::trace!( - "Flushing every: {:.2}s, {} rows, {}", - config.flush_tick.as_secs_f64(), - config.flush_num_rows, - re_format::format_bytes(config.flush_num_bytes as _), - ); - - use crossbeam::select; - loop { - select! { - recv(rx_cmd) -> cmd => { - let Ok(cmd) = cmd else { - // All command senders are gone, which can only happen if the - // `DataTableBatcher` itself has been dropped. - break; - }; - - match cmd { - Command::AppendRow(row) => { - do_push_row(&mut acc, row); - - if let Some(config) = config.hooks.on_insert.as_ref() { - config(&acc.pending_rows); - } - - if acc.pending_rows.len() as u64 >= config.flush_num_rows { - do_flush_all(&mut acc, &tx_table, "rows"); - } else if acc.pending_num_bytes >= config.flush_num_bytes { - do_flush_all(&mut acc, &tx_table, "bytes"); - } - }, - Command::Flush(oneshot) => { - do_flush_all(&mut acc, &tx_table, "manual"); - drop(oneshot); // signals the oneshot - }, - Command::Shutdown => break, - }; - }, - recv(rx_tick) -> _ => { - do_flush_all(&mut acc, &tx_table, "tick"); - }, - }; - } - - drop(rx_cmd); - do_flush_all(&mut acc, &tx_table, "shutdown"); - drop(tx_table); - - // NOTE: The receiving end of the command stream as well as the sending end of the table - // stream are owned solely by this thread. - // Past this point, all command writes and all table reads will return `ErrDisconnected`. -} diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index 57ecc4b6037a..5475389f50e4 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -32,9 +32,6 @@ mod time; mod time_real; mod vec_deque_ext; -#[cfg(not(target_arch = "wasm32"))] -mod data_table_batcher; - use std::sync::Arc; use re_build_info::CrateVersion; @@ -60,11 +57,6 @@ pub use self::time_point::{ pub use self::time_real::TimeReal; pub use self::vec_deque_ext::{VecDequeInsertionExt, VecDequeRemovalExt, VecDequeSortingExt}; -#[cfg(not(target_arch = "wasm32"))] -pub use self::data_table_batcher::{ - DataTableBatcher, DataTableBatcherConfig, DataTableBatcherError, -}; - pub mod external { pub use arrow2; diff --git a/crates/re_sdk/Cargo.toml b/crates/re_sdk/Cargo.toml index 2cf286ed48e8..ce8adf3014e8 100644 --- a/crates/re_sdk/Cargo.toml +++ b/crates/re_sdk/Cargo.toml @@ -51,6 +51,7 @@ web_viewer = [ [dependencies] re_build_info.workspace = true +re_chunk.workspace = true re_log_encoding = { workspace = true, features = ["encoder"] } re_log_types.workspace = true re_log.workspace = true diff --git a/crates/re_sdk/src/lib.rs b/crates/re_sdk/src/lib.rs index 456ef8c92113..df1c8f9b3d21 100644 --- a/crates/re_sdk/src/lib.rs +++ b/crates/re_sdk/src/lib.rs @@ -74,10 +74,11 @@ pub mod sink { /// Things directly related to logging. pub mod log { - pub use re_log_types::{ - DataCell, DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig, LogMsg, RowId, - TableId, + pub use re_chunk::{ + Chunk, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError, ChunkBatcherResult, ChunkError, + ChunkResult, PendingRow, TransportChunk, }; + pub use re_log_types::{DataCell, DataRow, DataTable, LogMsg, RowId, TableId}; } /// Time-related types. diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index e7f38e92df37..b5ae4a388bfb 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::fmt; use std::io::IsTerminal; use std::sync::Weak; @@ -5,13 +6,13 @@ use std::sync::{atomic::AtomicI64, Arc}; use ahash::HashMap; use crossbeam::channel::{Receiver, Sender}; - use itertools::Either; use parking_lot::Mutex; + +use re_chunk::{Chunk, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError, PendingRow}; use re_log_types::{ - ApplicationId, ArrowChunkReleaseCallback, BlueprintActivationCommand, DataCell, DataCellError, - DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig, DataTableBatcherError, - EntityPath, LogMsg, RowId, StoreId, StoreInfo, StoreKind, StoreSource, Time, TimeInt, + ApplicationId, ArrowChunkReleaseCallback, ArrowMsg, BlueprintActivationCommand, DataCellError, + EntityPath, LogMsg, RowId, StoreId, StoreInfo, StoreKind, StoreSource, TableId, Time, TimeInt, TimePoint, TimeType, Timeline, TimelineName, }; use re_types_core::{AsComponents, ComponentBatch, SerializationError}; @@ -49,9 +50,9 @@ pub enum RecordingStreamError { #[error("Failed to create the underlying file sink: {0}")] FileSink(#[from] re_log_encoding::FileSinkError), - /// Error within the underlying table batcher. + /// Error within the underlying chunk batcher. #[error("Failed to spawn the underlying batcher: {0}")] - DataTableBatcher(#[from] DataTableBatcherError), + ChunkBatcher(#[from] ChunkBatcherError), /// Error within the underlying data cell. #[error("Failed to instantiate data cell: {0}")] @@ -112,7 +113,7 @@ pub struct RecordingStreamBuilder { default_enabled: bool, enabled: Option, - batcher_config: Option, + batcher_config: Option, is_official_example: bool, } @@ -206,9 +207,9 @@ impl RecordingStreamBuilder { /// Specifies the configuration of the internal data batching mechanism. /// - /// See [`DataTableBatcher`] & [`DataTableBatcherConfig`] for more information. + /// See [`ChunkBatcher`] & [`ChunkBatcherConfig`] for more information. #[inline] - pub fn batcher_config(mut self, config: DataTableBatcherConfig) -> Self { + pub fn batcher_config(mut self, config: ChunkBatcherConfig) -> Self { self.batcher_config = Some(config); self } @@ -528,7 +529,7 @@ impl RecordingStreamBuilder { /// /// This can be used to then construct a [`RecordingStream`] manually using /// [`RecordingStream::new`]. - pub fn into_args(self) -> (bool, StoreInfo, DataTableBatcherConfig) { + pub fn into_args(self) -> (bool, StoreInfo, ChunkBatcherConfig) { let enabled = self.is_enabled(); let Self { @@ -559,11 +560,11 @@ impl RecordingStreamBuilder { }; let batcher_config = - batcher_config.unwrap_or_else(|| match DataTableBatcherConfig::from_env() { + batcher_config.unwrap_or_else(|| match ChunkBatcherConfig::from_env() { Ok(config) => config, Err(err) => { - re_log::error!("Failed to parse DataTableBatcherConfig from env: {}", err); - DataTableBatcherConfig::default() + re_log::error!("Failed to parse ChunkBatcherConfig from env: {}", err); + ChunkBatcherConfig::default() } }); @@ -681,7 +682,7 @@ struct RecordingStreamInner { /// running. cmds_tx: Sender, - batcher: DataTableBatcher, + batcher: ChunkBatcher, batcher_to_sink_handle: Option>, /// Keeps track of the top-level threads that were spawned in order to execute the `DataLoader` @@ -714,7 +715,7 @@ impl Drop for RecordingStreamInner { // NOTE: The command channel is private, if we're here, nothing is currently capable of // sending data down the pipeline. self.batcher.flush_blocking(); - self.cmds_tx.send(Command::PopPendingTables).ok(); + self.cmds_tx.send(Command::PopPendingChunks).ok(); self.cmds_tx.send(Command::Shutdown).ok(); if let Some(handle) = self.batcher_to_sink_handle.take() { handle.join().ok(); @@ -725,11 +726,11 @@ impl Drop for RecordingStreamInner { impl RecordingStreamInner { fn new( info: StoreInfo, - batcher_config: DataTableBatcherConfig, + batcher_config: ChunkBatcherConfig, sink: Box, ) -> RecordingStreamResult { let on_release = batcher_config.hooks.on_release.clone(); - let batcher = DataTableBatcher::new(batcher_config)?; + let batcher = ChunkBatcher::new(batcher_config)?; { re_log::debug!( @@ -755,7 +756,7 @@ impl RecordingStreamInner { .spawn({ let info = info.clone(); let batcher = batcher.clone(); - move || forwarding_thread(info, sink, cmds_rx, batcher.tables(), on_release) + move || forwarding_thread(info, sink, cmds_rx, batcher.chunks(), on_release) }) .map_err(|err| RecordingStreamError::SpawnThread { name: NAME.into(), @@ -796,7 +797,7 @@ enum Command { RecordMsg(LogMsg), SwapSink(Box), Flush(Sender<()>), - PopPendingTables, + PopPendingChunks, Shutdown, } @@ -821,7 +822,7 @@ impl RecordingStream { #[must_use = "Recording will get closed automatically once all instances of this object have been dropped"] pub fn new( info: StoreInfo, - batcher_config: DataTableBatcherConfig, + batcher_config: ChunkBatcherConfig, sink: Box, ) -> RecordingStreamResult { let sink = (info.store_id.kind == StoreKind::Recording) @@ -1018,7 +1019,7 @@ impl RecordingStream { fn log_component_batches_impl<'a>( &self, row_id: RowId, - ent_path: impl Into, + entity_path: impl Into, static_: bool, comp_batches: impl IntoIterator, ) -> RecordingStreamResult<()> { @@ -1026,39 +1027,29 @@ impl RecordingStream { return Ok(()); // silently drop the message } - let ent_path = ent_path.into(); + let entity_path = entity_path.into(); let comp_batches: Result, _> = comp_batches .into_iter() .map(|comp_batch| { comp_batch .to_arrow() - .map(|array| (comp_batch.arrow_field(), array)) + .map(|array| (comp_batch.name(), array)) }) .collect(); - let comp_batches = comp_batches?; - - let cells: Result, _> = comp_batches - .into_iter() - .map(|(field, array)| { - // NOTE: Unreachable, a top-level Field will always be a component, and thus an - // extension. - use re_log_types::external::arrow2::datatypes::DataType; - let DataType::Extension(fqname, _, _) = field.data_type else { - return Err(SerializationError::missing_extension_metadata(field.name).into()); - }; - DataCell::try_from_arrow(fqname.into(), array) - }) - .collect(); - let cells = cells?; + let components: BTreeMap<_, _> = comp_batches?.into_iter().collect(); // NOTE: The timepoint is irrelevant, the `RecordingStream` will overwrite it using its // internal clock. let timepoint = TimePoint::default(); - if !cells.is_empty() { - let row = DataRow::from_cells(row_id, timepoint.clone(), ent_path.clone(), cells)?; - self.record_row(row, !static_); + if !components.is_empty() { + let row = PendingRow { + row_id, + timepoint, + components, + }; + self.record_row(entity_path, row, !static_); } Ok(()) @@ -1197,7 +1188,7 @@ fn forwarding_thread( info: StoreInfo, mut sink: Box, cmds_rx: Receiver, - tables: Receiver, + chunks: Receiver, on_release: Option, ) { /// Returns `true` to indicate that processing can continue; i.e. `false` means immediate @@ -1246,8 +1237,8 @@ fn forwarding_thread( sink.flush_blocking(); drop(oneshot); // signals the oneshot } - Command::PopPendingTables => { - // Wake up and skip the current iteration so that we can drain all pending tables + Command::PopPendingChunks => { + // Wake up and skip the current iteration so that we can drain all pending chunks // before handling the next command. } Command::Shutdown => return false, @@ -1258,40 +1249,60 @@ fn forwarding_thread( use crossbeam::select; loop { - // NOTE: Always pop tables first, this is what makes `Command::PopPendingTables` possible, + // NOTE: Always pop chunks first, this is what makes `Command::PopPendingChunks` possible, // which in turns makes `RecordingStream::flush_blocking` well defined. - while let Ok(table) = tables.try_recv() { - let mut arrow_msg = match table.to_arrow_msg() { - Ok(table) => table, + while let Ok(chunk) = chunks.try_recv() { + let timepoint_max = chunk.timepoint_max(); + let chunk = match chunk.to_transport() { + Ok(chunk) => chunk, Err(err) => { - re_log::error!(%err, - "couldn't serialize table; data dropped (this is a bug in Rerun!)"); + re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)"); continue; } }; - arrow_msg.on_release = on_release.clone(); - sink.send(LogMsg::ArrowMsg(info.store_id.clone(), arrow_msg)); + + sink.send(LogMsg::ArrowMsg( + info.store_id.clone(), + ArrowMsg { + table_id: TableId::new(), + timepoint_max, + schema: chunk.schema, + chunk: chunk.data, + on_release: on_release.clone(), + }, + )); } select! { - recv(tables) -> res => { - let Ok(table) = res else { + recv(chunks) -> res => { + let Ok(chunk) = res else { // The batcher is gone, which can only happen if the `RecordingStream` itself // has been dropped. re_log::trace!("Shutting down forwarding_thread: batcher is gone"); break; }; - let mut arrow_msg = match table.to_arrow_msg() { - Ok(table) => table, + + let timepoint_max = chunk.timepoint_max(); + let chunk = match chunk.to_transport() { + Ok(chunk) => chunk, Err(err) => { - re_log::error!(%err, - "couldn't serialize table; data dropped (this is a bug in Rerun!)"); + re_log::error!(%err, "couldn't serialize chunk; data dropped (this is a bug in Rerun!)"); continue; } }; - arrow_msg.on_release = on_release.clone(); - sink.send(LogMsg::ArrowMsg(info.store_id.clone(), arrow_msg)); + + sink.send(LogMsg::ArrowMsg( + info.store_id.clone(), + ArrowMsg { + table_id: TableId::new(), + timepoint_max, + schema: chunk.schema, + chunk: chunk.data, + on_release: on_release.clone(), + }, + )); } + recv(cmds_rx) -> res => { let Ok(cmd) = res else { // All command senders are gone, which can only happen if the @@ -1354,15 +1365,15 @@ impl RecordingStream { } } - /// Records a single [`DataRow`]. + /// Records a single [`PendingRow`]. /// /// If `inject_time` is set to `true`, the row's timestamp data will be overridden using the /// [`RecordingStream`]'s internal clock. /// - /// Internally, incoming [`DataRow`]s are automatically coalesced into larger [`DataTable`]s to + /// Internally, incoming [`PendingRow`]s are automatically coalesced into larger [`Chunk`]s to /// optimize for transport. #[inline] - pub fn record_row(&self, mut row: DataRow, inject_time: bool) { + pub fn record_row(&self, entity_path: EntityPath, mut row: PendingRow, inject_time: bool) { let f = move |inner: &RecordingStreamInner| { // NOTE: We're incrementing the current tick still. let tick = inner @@ -1381,7 +1392,7 @@ impl RecordingStream { } } - inner.batcher.push_row(row); + inner.batcher.push_row(entity_path, row); }; if self.with(f).is_none() { @@ -1392,7 +1403,7 @@ impl RecordingStream { /// Swaps the underlying sink for a new one. /// /// This guarantees that: - /// 1. all pending rows and tables are batched, collected and sent down the current sink, + /// 1. all pending rows and chunks are batched, collected and sent down the current sink, /// 2. the current sink is flushed if it has pending data in its buffers, /// 3. the current sink's backlog, if there's any, is forwarded to the new sink. /// @@ -1413,11 +1424,11 @@ impl RecordingStream { // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends // are safe. - // 1. Flush the batcher down the table channel + // 1. Flush the batcher down the chunk channel inner.batcher.flush_blocking(); - // 2. Receive pending tables from the batcher's channel - inner.cmds_tx.send(Command::PopPendingTables).ok(); + // 2. Receive pending chunks from the batcher's channel + inner.cmds_tx.send(Command::PopPendingChunks).ok(); // 3. Swap the sink, which will internally make sure to re-ingest the backlog if needed inner.cmds_tx.send(Command::SwapSink(sink)).ok(); @@ -1450,15 +1461,15 @@ impl RecordingStream { // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends // are safe. - // 1. Synchronously flush the batcher down the table channel + // 1. Synchronously flush the batcher down the chunk channel // - // NOTE: This _has_ to be done synchronously as we need to be guaranteed that all tables + // NOTE: This _has_ to be done synchronously as we need to be guaranteed that all chunks // are ready to be drained by the time this call returns. // It cannot block indefinitely and is fairly fast as it only requires compute (no I/O). inner.batcher.flush_blocking(); - // 2. Drain all pending tables from the batcher's channel _before_ any other future command - inner.cmds_tx.send(Command::PopPendingTables).ok(); + // 2. Drain all pending chunks from the batcher's channel _before_ any other future command + inner.cmds_tx.send(Command::PopPendingChunks).ok(); // 3. Asynchronously flush everything down the sink let (cmd, _) = Command::flush(); @@ -1483,13 +1494,13 @@ impl RecordingStream { // NOTE: Internal channels can never be closed outside of the `Drop` impl, all these sends // are safe. - // 1. Flush the batcher down the table channel + // 1. Flush the batcher down the chunk channel inner.batcher.flush_blocking(); - // 2. Drain all pending tables from the batcher's channel _before_ any other future command - inner.cmds_tx.send(Command::PopPendingTables).ok(); + // 2. Drain all pending chunks from the batcher's channel _before_ any other future command + inner.cmds_tx.send(Command::PopPendingChunks).ok(); - // 3. Wait for all tables to have been forwarded down the sink + // 3. Wait for all chunks to have been forwarded down the sink let (cmd, oneshot) = Command::flush(); inner.cmds_tx.send(cmd).ok(); oneshot.recv().ok(); @@ -2063,7 +2074,8 @@ impl RecordingStream { #[cfg(test)] mod tests { - use re_log_types::{DataTable, RowId}; + use re_chunk::TransportChunk; + use re_log_types::RowId; use super::*; @@ -2077,16 +2089,15 @@ mod tests { fn never_flush() { let rec = RecordingStreamBuilder::new("rerun_example_never_flush") .enabled(true) - .batcher_config(DataTableBatcherConfig::NEVER) + .batcher_config(ChunkBatcherConfig::NEVER) .buffered() .unwrap(); let store_info = rec.store_info().unwrap(); - let mut table = DataTable::example(false); - table.compute_all_size_bytes(); - for row in table.to_rows() { - rec.record_row(row.unwrap(), false); + let rows = example_rows(false); + for row in rows.clone() { + rec.record_row("a".into(), row, false); } let storage = rec.memory(); @@ -2117,19 +2128,19 @@ mod tests { _ => panic!("expected SetStoreInfo"), } - // Third message is the batched table itself, which was sent as a result of the implicit + // Third message is the batched chunk itself, which was sent as a result of the implicit // flush when swapping the underlying sink from buffered to in-memory. match msgs.pop().unwrap() { LogMsg::ArrowMsg(rid, msg) => { assert_eq!(store_info.store_id, rid); - let mut got = DataTable::from_arrow_msg(&msg).unwrap(); - // TODO(#1760): we shouldn't have to (re)do this! - got.compute_all_size_bytes(); - // NOTE: Override the resulting table's ID so they can be compared. - got.table_id = table.table_id; + let chunk = Chunk::from_transport(&TransportChunk { + schema: msg.schema.clone(), + data: msg.chunk.clone(), + }) + .unwrap(); - similar_asserts::assert_eq!(table, got); + chunk.sanity_check().unwrap(); } _ => panic!("expected ArrowMsg"), } @@ -2140,20 +2151,17 @@ mod tests { #[test] fn always_flush() { - use itertools::Itertools as _; - let rec = RecordingStreamBuilder::new("rerun_example_always_flush") .enabled(true) - .batcher_config(DataTableBatcherConfig::ALWAYS) + .batcher_config(ChunkBatcherConfig::ALWAYS) .buffered() .unwrap(); let store_info = rec.store_info().unwrap(); - let mut table = DataTable::example(false); - table.compute_all_size_bytes(); - for row in table.to_rows() { - rec.record_row(row.unwrap(), false); + let rows = example_rows(false); + for row in rows.clone() { + rec.record_row("a".into(), row, false); } let storage = rec.memory(); @@ -2184,32 +2192,22 @@ mod tests { _ => panic!("expected SetStoreInfo"), } - let mut rows = { - let mut rows: Vec<_> = table.to_rows().try_collect().unwrap(); - rows.reverse(); - rows - }; - - let mut assert_next_row = || { - match msgs.pop().unwrap() { - LogMsg::ArrowMsg(rid, msg) => { - assert_eq!(store_info.store_id, rid); - - let mut got = DataTable::from_arrow_msg(&msg).unwrap(); - // TODO(#1760): we shouldn't have to (re)do this! - got.compute_all_size_bytes(); - // NOTE: Override the resulting table's ID so they can be compared. - got.table_id = table.table_id; + let mut assert_next_row = || match msgs.pop().unwrap() { + LogMsg::ArrowMsg(rid, msg) => { + assert_eq!(store_info.store_id, rid); - let expected = DataTable::from_rows(got.table_id, [rows.pop().unwrap()]); + let chunk = Chunk::from_transport(&TransportChunk { + schema: msg.schema.clone(), + data: msg.chunk.clone(), + }) + .unwrap(); - similar_asserts::assert_eq!(expected, got); - } - _ => panic!("expected ArrowMsg"), + chunk.sanity_check().unwrap(); } + _ => panic!("expected ArrowMsg"), }; - // 3rd, 4th and 5th messages are all the single-row batched tables themselves, which were + // 3rd, 4th and 5th messages are all the single-row batched chunks themselves, which were // sent as a result of the implicit flush when swapping the underlying sink from buffered // to in-memory. assert_next_row(); @@ -2224,16 +2222,15 @@ mod tests { fn flush_hierarchy() { let (rec, storage) = RecordingStreamBuilder::new("rerun_example_flush_hierarchy") .enabled(true) - .batcher_config(DataTableBatcherConfig::NEVER) + .batcher_config(ChunkBatcherConfig::NEVER) .memory() .unwrap(); let store_info = rec.store_info().unwrap(); - let mut table = DataTable::example(false); - table.compute_all_size_bytes(); - for row in table.to_rows() { - rec.record_row(row.unwrap(), false); + let rows = example_rows(false); + for row in rows.clone() { + rec.record_row("a".into(), row, false); } { @@ -2265,18 +2262,18 @@ mod tests { // MemorySinkStorage transparently handles flushing during `take()`! - // The batched table itself, which was sent as a result of the explicit flush above. + // The batched chunk itself, which was sent as a result of the explicit flush above. match msgs.pop().unwrap() { LogMsg::ArrowMsg(rid, msg) => { assert_eq!(store_info.store_id, rid); - let mut got = DataTable::from_arrow_msg(&msg).unwrap(); - // TODO(#1760): we shouldn't have to (re)do this! - got.compute_all_size_bytes(); - // NOTE: Override the resulting table's ID so they can be compared. - got.table_id = table.table_id; + let chunk = Chunk::from_transport(&TransportChunk { + schema: msg.schema.clone(), + data: msg.chunk.clone(), + }) + .unwrap(); - similar_asserts::assert_eq!(table, got); + chunk.sanity_check().unwrap(); } _ => panic!("expected ArrowMsg"), } @@ -2290,14 +2287,13 @@ mod tests { fn disabled() { let (rec, storage) = RecordingStreamBuilder::new("rerun_example_disabled") .enabled(false) - .batcher_config(DataTableBatcherConfig::ALWAYS) + .batcher_config(ChunkBatcherConfig::ALWAYS) .memory() .unwrap(); - let mut table = DataTable::example(false); - table.compute_all_size_bytes(); - for row in table.to_rows() { - rec.record_row(row.unwrap(), false); + let rows = example_rows(false); + for row in rows.clone() { + rec.record_row("a".into(), row, false); } let mut msgs = { @@ -2325,4 +2321,93 @@ mod tests { .join() .unwrap(); } + + fn example_rows(timeless: bool) -> Vec { + use re_log_types::example_components::{MyColor, MyLabel, MyPoint}; + use re_types_core::Loggable as _; + + let mut tick = 0i64; + let mut timepoint = |frame_nr: i64| { + let mut tp = TimePoint::default(); + if !timeless { + tp.insert(Timeline::log_time(), Time::now()); + tp.insert(Timeline::log_tick(), tick); + tp.insert(Timeline::new_sequence("frame_nr"), frame_nr); + } + tick += 1; + tp + }; + + let row0 = { + PendingRow { + row_id: RowId::new(), + timepoint: timepoint(1), + components: [ + ( + MyPoint::name(), + MyPoint::to_arrow([MyPoint::new(10.0, 10.0), MyPoint::new(20.0, 20.0)]) + .unwrap(), + ), // + ( + MyColor::name(), + MyColor::to_arrow([MyColor(0x8080_80FF)]).unwrap(), + ), // + ( + MyLabel::name(), + MyLabel::to_arrow([] as [MyLabel; 0]).unwrap(), + ), // + ] + .into_iter() + .collect(), + } + }; + + let row1 = { + PendingRow { + row_id: RowId::new(), + timepoint: timepoint(1), + components: [ + ( + MyPoint::name(), + MyPoint::to_arrow([] as [MyPoint; 0]).unwrap(), + ), // + ( + MyColor::name(), + MyColor::to_arrow([] as [MyColor; 0]).unwrap(), + ), // + ( + MyLabel::name(), + MyLabel::to_arrow([] as [MyLabel; 0]).unwrap(), + ), // + ] + .into_iter() + .collect(), + } + }; + + let row2 = { + PendingRow { + row_id: RowId::new(), + timepoint: timepoint(1), + components: [ + ( + MyPoint::name(), + MyPoint::to_arrow([] as [MyPoint; 0]).unwrap(), + ), // + ( + MyColor::name(), + MyColor::to_arrow([MyColor(0xFFFF_FFFF)]).unwrap(), + ), // + ( + MyLabel::name(), + MyLabel::to_arrow([MyLabel("hey".into())]).unwrap(), + ), // + ] + .into_iter() + .collect(), + } + }; + + vec![row0, row1, row2] + } } diff --git a/crates/rerun/tests/rerun_tests.rs b/crates/rerun/tests/rerun_tests.rs index 9f6793ed1630..e59446d44a01 100644 --- a/crates/rerun/tests/rerun_tests.rs +++ b/crates/rerun/tests/rerun_tests.rs @@ -5,11 +5,11 @@ /// See for instance . #[test] fn test_row_id_order() { - let mut batcher_config = rerun::log::DataTableBatcherConfig::NEVER; + let mut batcher_config = rerun::log::ChunkBatcherConfig::NEVER; batcher_config.hooks.on_insert = Some(std::sync::Arc::new(|rows| { if let [.., penultimate, ultimate] = rows { assert!( - penultimate.row_id() <= ultimate.row_id(), + penultimate.row_id <= ultimate.row_id, "Rows coming to batcher out-of-order" ); } diff --git a/crates/rerun_c/src/lib.rs b/crates/rerun_c/src/lib.rs index 9ea7f9dbd241..acd664b6e7bb 100644 --- a/crates/rerun_c/src/lib.rs +++ b/crates/rerun_c/src/lib.rs @@ -10,15 +10,17 @@ mod error; mod ptr; mod recording_streams; -use std::ffi::{c_char, c_uchar, CString}; +use std::{ + collections::BTreeMap, + ffi::{c_char, c_uchar, CString}, +}; use component_type_registry::COMPONENT_TYPES; use once_cell::sync::Lazy; use re_sdk::{ - external::re_log_types::{self}, - log::{DataCell, DataRow}, - ComponentName, EntityPath, RecordingStream, RecordingStreamBuilder, StoreKind, TimePoint, + log::PendingRow, ComponentName, EntityPath, RecordingStream, RecordingStreamBuilder, StoreKind, + TimePoint, }; use recording_streams::{recording_stream, RECORDING_STREAMS}; @@ -195,7 +197,6 @@ pub enum CErrorCode { _CategoryArrow = 0x0000_1000, ArrowFfiSchemaImportError, ArrowFfiArrayImportError, - ArrowDataCellError, Unknown = 0xFFFF_FFFF, } @@ -301,7 +302,7 @@ fn rr_recording_stream_new_impl( let mut rec_builder = RecordingStreamBuilder::new(application_id) //.is_official_example(is_official_example) // TODO(andreas): Is there a meaningful way to expose this? //.store_id(recording_id.clone()) // TODO(andreas): Expose store id. - .store_source(re_log_types::StoreSource::CSdk) + .store_source(re_sdk::external::re_log_types::StoreSource::CSdk) .default_enabled(default_enabled); if !(recording_id.is_null() || recording_id.is_empty()) { @@ -644,11 +645,9 @@ fn rr_log_impl( let num_data_cells = num_data_cells as usize; re_log::debug!("rerun_log {entity_path:?}, num_data_cells: {num_data_cells}"); - let mut cells = re_log_types::DataCellVec::default(); - cells.reserve(num_data_cells); - let data_cells = unsafe { std::slice::from_raw_parts_mut(data_cells, num_data_cells) }; + let mut components = BTreeMap::default(); { let component_type_registry = COMPONENT_TYPES.read(); @@ -685,31 +684,17 @@ fn rr_log_impl( ) })?; - cells.push( - DataCell::try_from_arrow(component_type.name, values).map_err(|err| { - CError::new( - CErrorCode::ArrowDataCellError, - &format!("Failed to create arrow datacell: {err}"), - ) - })?, - ); + components.insert(component_type.name, values); } } - let data_row = DataRow::from_cells( + let row = PendingRow { row_id, - TimePoint::default(), // we use the one in the recording stream for now - entity_path, - cells, - ) - .map_err(|err| { - CError::new( - CErrorCode::ArrowDataCellError, - &format!("Failed to create DataRow from CDataRow: {err}"), - ) - })?; + timepoint: TimePoint::default(), // we use the one in the recording stream for now + components, + }; - stream.record_row(data_row, inject_time); + stream.record_row(entity_path, row, inject_time); Ok(()) } diff --git a/crates/rerun_c/src/rerun.h b/crates/rerun_c/src/rerun.h index 40ea536043d4..94811154be7c 100644 --- a/crates/rerun_c/src/rerun.h +++ b/crates/rerun_c/src/rerun.h @@ -239,7 +239,6 @@ enum { _RR_ERROR_CODE_CATEGORY_ARROW = 0x000001000, RR_ERROR_CODE_ARROW_FFI_SCHEMA_IMPORT_ERROR, RR_ERROR_CODE_ARROW_FFI_ARRAY_IMPORT_ERROR, - RR_ERROR_CODE_ARROW_DATA_CELL_ERROR, // Generic errors. RR_ERROR_CODE_UNKNOWN, diff --git a/rerun_cpp/src/rerun/c/rerun.h b/rerun_cpp/src/rerun/c/rerun.h index 81c3835e598b..992799ff024c 100644 --- a/rerun_cpp/src/rerun/c/rerun.h +++ b/rerun_cpp/src/rerun/c/rerun.h @@ -239,7 +239,6 @@ enum { _RR_ERROR_CODE_CATEGORY_ARROW = 0x000001000, RR_ERROR_CODE_ARROW_FFI_SCHEMA_IMPORT_ERROR, RR_ERROR_CODE_ARROW_FFI_ARRAY_IMPORT_ERROR, - RR_ERROR_CODE_ARROW_DATA_CELL_ERROR, // Generic errors. RR_ERROR_CODE_UNKNOWN, @@ -267,7 +266,7 @@ typedef struct rr_error { /// /// This should match the string returned by `rr_version_string`. /// If not, the SDK's binary and the C header are out of sync. -#define RERUN_SDK_HEADER_VERSION "0.16.0" +#define RERUN_SDK_HEADER_VERSION "0.17.0-alpha.2" /// Returns a human-readable version string of the Rerun C SDK. /// diff --git a/rerun_cpp/src/rerun/error.hpp b/rerun_cpp/src/rerun/error.hpp index d49e4ced01d2..d669423d8142 100644 --- a/rerun_cpp/src/rerun/error.hpp +++ b/rerun_cpp/src/rerun/error.hpp @@ -53,7 +53,6 @@ namespace rerun { _CategoryArrow = 0x0000'1000, ArrowFfiSchemaImportError, ArrowFfiArrayImportError, - ArrowDataCellError, // Errors relating to file IO. _CategoryFileIO = 0x0001'0000, diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index 696448d467ea..b3edcb8b710b 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -41,6 +41,7 @@ web_viewer = [ [dependencies] re_build_info.workspace = true +re_chunk.workspace = true re_log = { workspace = true, features = ["setup"] } re_log_types.workspace = true re_memory.workspace = true diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 23b0e549a1c7..88a805db9908 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -1,11 +1,12 @@ //! Methods for handling Arrow datamodel log ingest use arrow2::{array::Array, datatypes::Field, ffi}; -use itertools::Itertools as _; use pyo3::{ exceptions::PyValueError, ffi::Py_uintptr_t, types::PyDict, types::PyString, PyAny, PyResult, }; -use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimePoint}; + +use re_chunk::PendingRow; +use re_log_types::{RowId, TimePoint}; /// Perform conversion between a pyarrow array to arrow2 types. /// @@ -44,12 +45,11 @@ fn array_to_rust(arrow_array: &PyAny, name: Option<&str>) -> PyResult<(Box PyResult { +) -> PyResult { // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency. // TODO(emilk): move to before we arrow-serialize the data let row_id = RowId::new(); @@ -62,14 +62,15 @@ pub fn build_data_row_from_components( |iter| iter.unzip(), )?; - let cells = arrays + let components = arrays .into_iter() .zip(fields) - .map(|(value, field)| DataCell::from_arrow(field.name.into(), value)) - .collect_vec(); - - let row = DataRow::from_cells(row_id, time_point.clone(), entity_path.clone(), cells) - .map_err(|err| PyValueError::new_err(err.to_string()))?; + .map(|(value, field)| (field.name.into(), value)) + .collect(); - Ok(row) + Ok(PendingRow { + row_id, + timepoint: time_point.clone(), + components, + }) } diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index a08850e34a90..e0630e7f6f0e 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -208,7 +208,7 @@ fn new_recording( default_store_id(py, StoreKind::Recording, &application_id) }; - let mut batcher_config = re_log_types::DataTableBatcherConfig::from_env().unwrap_or_default(); + let mut batcher_config = re_chunk::ChunkBatcherConfig::from_env().unwrap_or_default(); let on_release = |chunk| { GARBAGE_QUEUE.0.send(chunk).ok(); }; @@ -261,7 +261,7 @@ fn new_blueprint( // blueprint id to avoid collisions. let blueprint_id = StoreId::random(StoreKind::Blueprint); - let mut batcher_config = re_log_types::DataTableBatcherConfig::from_env().unwrap_or_default(); + let mut batcher_config = re_chunk::ChunkBatcherConfig::from_env().unwrap_or_default(); let on_release = |chunk| { GARBAGE_QUEUE.0.send(chunk).ok(); }; @@ -1047,13 +1047,9 @@ fn log_arrow_msg( // It's important that we don't hold the session lock while building our arrow component. // the API we call to back through pyarrow temporarily releases the GIL, which can cause // a deadlock. - let row = crate::arrow::build_data_row_from_components( - &entity_path, - components, - &TimePoint::default(), - )?; + let row = crate::arrow::build_row_from_components(components, &TimePoint::default())?; - recording.record_row(row, !static_); + recording.record_row(entity_path, row, !static_); py.allow_threads(flush_garbage_queue);