Skip to content

Commit

Permalink
implement interior mutability and sort-on-reads for timeless tables
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Nov 30, 2023
1 parent 28b7478 commit b4b3cf9
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 69 deletions.
2 changes: 1 addition & 1 deletion crates/re_arrow_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub use self::store_write::{WriteError, WriteResult};

pub(crate) use self::store::{
ClusterCellCache, DataTypeRegistry, IndexedBucket, IndexedBucketInner, IndexedTable,
MetadataRegistry, PersistentIndexedTable,
MetadataRegistry, PersistentIndexedTable, PersistentIndexedTableInner,
};

// Re-exports
Expand Down
51 changes: 45 additions & 6 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ impl Default for IndexedBucketInner {
/// ```
//
// TODO(#1807): timeless should be row-id ordered too then
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PersistentIndexedTable {
/// The entity this table is related to, for debugging purposes.
pub ent_path: EntityPath,
Expand All @@ -572,6 +572,32 @@ pub struct PersistentIndexedTable {
/// place.
pub cluster_key: ComponentName,

// To simplify interior mutability.
pub inner: RwLock<PersistentIndexedTableInner>,
}

impl Clone for PersistentIndexedTable {
fn clone(&self) -> Self {
Self {
ent_path: self.ent_path.clone(),
cluster_key: self.cluster_key,
inner: RwLock::new(self.inner.read().clone()),
}
}
}

impl PersistentIndexedTable {
pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
Self {
cluster_key,
ent_path,
inner: RwLock::new(PersistentIndexedTableInner::default()),
}
}
}

#[derive(Debug, Clone)]
pub struct PersistentIndexedTableInner {
/// The entire column of insertion IDs, if enabled in [`DataStoreConfig`].
///
/// Keeps track of insertion order from the point-of-view of the [`DataStore`].
Expand All @@ -592,21 +618,34 @@ pub struct PersistentIndexedTable {
/// The cells are optional since not all rows will have data for every single component
/// (i.e. the table is sparse).
pub columns: IntMap<ComponentName, DataCellColumn>,

/// Are the rows in this table sorted?
///
/// Querying a [`PersistentIndexedTable`] will always trigger a sort if the rows within
/// aren't already sorted.
pub is_sorted: bool,
}

impl PersistentIndexedTable {
pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
impl Default for PersistentIndexedTableInner {
fn default() -> Self {
Self {
cluster_key,
ent_path,
col_insert_id: Default::default(),
col_row_id: Default::default(),
col_num_instances: Default::default(),
columns: Default::default(),
is_sorted: true,
}
}
}

impl PersistentIndexedTableInner {
#[inline]
pub fn is_empty(&self) -> bool {
self.col_num_instances.is_empty()
self.num_rows() == 0
}

#[inline]
pub fn num_rows(&self) -> u64 {
self.col_row_id.len() as u64
}
}
11 changes: 9 additions & 2 deletions crates/re_arrow_store/src/store_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use nohash_hasher::IntMap;
use re_log_types::{DataCellColumn, DataTable, DataTableResult, NumInstances, RowId, Timeline};
use re_types_core::ComponentName;

use crate::store::{IndexedBucket, IndexedBucketInner, PersistentIndexedTable};
use crate::store::{
IndexedBucket, IndexedBucketInner, PersistentIndexedTable, PersistentIndexedTableInner,
};

// ---

Expand Down Expand Up @@ -66,11 +68,16 @@ impl PersistentIndexedTable {
let Self {
ent_path: _,
cluster_key,
inner,
} = self;

let PersistentIndexedTableInner {
col_insert_id,
col_row_id,
col_num_instances,
columns,
} = self;
is_sorted: _,
} = &*inner.read();

serialize(
cluster_key,
Expand Down
13 changes: 10 additions & 3 deletions crates/re_arrow_store/src/store_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use re_log_types::{
};

use crate::{
store::{IndexedBucketInner, PersistentIndexedTable},
store::{IndexedBucketInner, PersistentIndexedTable, PersistentIndexedTableInner},
DataStore, IndexedBucket,
};

Expand Down Expand Up @@ -71,18 +71,25 @@ impl DataStore {
let PersistentIndexedTable {
ent_path,
cluster_key: _,
inner,
} = table;

let inner = &*inner.read();
let PersistentIndexedTableInner {
col_insert_id: _,
col_row_id,
col_num_instances,
columns,
} = table;
is_sorted,
} = inner;
debug_assert!(is_sorted);

DataTable {
table_id: TableId::random(),
col_row_id: col_row_id.clone(),
col_timelines: Default::default(),
col_entity_path: std::iter::repeat_with(|| ent_path.clone())
.take(table.num_rows() as _)
.take(inner.num_rows() as _)
.collect(),
col_num_instances: col_num_instances.clone(),
columns: columns.clone().into_iter().collect(), // shallow
Expand Down
13 changes: 2 additions & 11 deletions crates/re_arrow_store/src/store_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,12 @@ impl std::fmt::Display for IndexedBucket {
impl std::fmt::Display for PersistentIndexedTable {
#[allow(clippy::string_add)]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
ent_path,
cluster_key: _,
col_insert_id: _,
col_row_id: _,
col_num_instances: _,
columns: _,
} = self;

f.write_fmt(format_args!("entity: {ent_path}\n"))?;
f.write_fmt(format_args!("entity: {}\n", self.ent_path))?;

f.write_fmt(format_args!(
"size: {} across {} rows\n",
format_bytes(self.total_size_bytes() as _),
format_number(self.num_rows() as _),
format_number(self.inner.read().num_rows() as _),
))?;

let (schema, columns) = self.serialize().map_err(|err| {
Expand Down
20 changes: 16 additions & 4 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use re_log_types::{EntityPath, RowId, TimeInt, TimeRange, Timeline};
use re_types_core::{ComponentName, SizeBytes as _};

use crate::{
store::{ClusterCellCache, IndexedBucketInner, IndexedTable, PersistentIndexedTable},
store::{
ClusterCellCache, IndexedBucketInner, IndexedTable, PersistentIndexedTable,
PersistentIndexedTableInner,
},
DataStore, DataStoreStats, StoreDiff, StoreDiffKind, StoreEvent,
};

Expand Down Expand Up @@ -365,10 +368,12 @@ impl DataStore {
// Find all protected rows in timeless tables
// TODO(#1807): this is still based on insertion order.
for table in self.timeless_tables.values() {
let cluster_key = table.cluster_key;
let table = table.inner.read();
let mut components_to_find: HashMap<ComponentName, usize> = table
.columns
.keys()
.filter(|c| **c != table.cluster_key)
.filter(|c| **c != cluster_key)
.filter(|c| !dont_protect.contains(*c))
.map(|c| (*c, target_count))
.collect();
Expand Down Expand Up @@ -409,6 +414,9 @@ impl DataStore {

// Drop any empty timeless tables
self.timeless_tables.retain(|_, table| {
let entity_path = &table.ent_path;
let mut table = table.inner.write();

// If any column is non-empty, we need to keep this table…
for num in &table.col_num_instances {
if num.get() != 0 {
Expand All @@ -418,7 +426,7 @@ impl DataStore {

// …otherwise we can drop it.

let entity_path = table.ent_path.clone();
let entity_path = entity_path.clone();

for i in 0..table.col_row_id.len() {
let row_id = table.col_row_id[i];
Expand Down Expand Up @@ -683,11 +691,15 @@ impl PersistentIndexedTable {
let PersistentIndexedTable {
ent_path,
cluster_key: _,
inner,
} = self;
let PersistentIndexedTableInner {
col_insert_id,
col_row_id,
col_num_instances,
columns,
} = self;
is_sorted: _,
} = &mut *inner.write();

let mut diff: Option<StoreDiff> = None;

Expand Down
12 changes: 9 additions & 3 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use re_types_core::ComponentName;

use crate::{
store::InsertIdVec, ArrayExt, DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner,
PersistentIndexedTable,
PersistentIndexedTable, PersistentIndexedTableInner,
};

// TODO(#1692): all of this stuff should be defined by Data{Cell,Row,Table}, not the store.
Expand Down Expand Up @@ -173,13 +173,19 @@ impl PersistentIndexedTable {
let Self {
ent_path: _,
cluster_key: _,
inner,
} = self;

let inner = &*inner.read();
let PersistentIndexedTableInner {
col_insert_id,
col_row_id,
col_num_instances,
columns,
} = self;
is_sorted,
} = inner;

let num_rows = self.num_rows() as usize;
let num_rows = inner.num_rows() as usize;

let insert_ids = config
.store_insert_ids
Expand Down
Loading

0 comments on commit b4b3cf9

Please sign in to comment.