Skip to content

Commit

Permalink
Introduce JoinEncoding instead of Dictionary column
Browse files Browse the repository at this point in the history
  • Loading branch information
jleibs committed Sep 18, 2024
1 parent 3faa9ac commit abbe65b
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 138 deletions.
120 changes: 91 additions & 29 deletions crates/store/re_chunk_store/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,47 @@ use crate::RowId;

// --- Descriptors ---

/// When selecting secondary component columns, specify how the joined data should be encoded.
///
/// Because range-queries often involve repeating the same joined-in data multiple times,
/// the strategy we choose for joining can have a significant impact on the size and memory
/// overhead of the `RecordBatch`.
//
// NOTE: This is a fieldless enum, so `Copy` is derived too — callers can pass it around
// freely without explicit `.clone()`s.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JoinEncoding {
    /// Slice the `RecordBatch` to minimal overlapping sub-ranges.
    ///
    /// This is the default, and should always be used for the POV component which defines
    /// the optimal size for `RecordBatch`.
    ///
    /// This minimizes the need for allocation, but at the cost of `RecordBatch`es that are
    /// almost always smaller than the optimal size. In the common worst-case, this will result
    /// in single-row `RecordBatch`es.
    #[default]
    OverlappingSlice,

    /// Dictionary-encode the joined column.
    ///
    /// Using dictionary-encoding allows any repeated data to be shared between rows,
    /// but comes with the cost of an extra dictionary-lookup indirection.
    ///
    /// Note that this changes the physical type of the returned column.
    ///
    /// Using this encoding for complex types is incompatible with some arrow libraries.
    DictionaryEncode,
    //
    // TODO(jleibs):
    // RepeatCopy,
    //
    // Repeat the joined column by physically copying the data.
    //
    // This will always allocate a new column in the `RecordBatch`, matching the size of the
    // POV component.
    //
    // This is the most expensive option, but can make working with the data more efficient,
    // especially when the copied column is small.
    //
}

// TODO(#6889): At some point all these descriptors need to be interned and have handles or
// something. And of course they need to be codegen'd. But we'll get there once we're back to
// natively tagged components.
Expand All @@ -37,15 +78,14 @@ pub enum ColumnDescriptor {
Control(ControlColumnDescriptor),
Time(TimeColumnDescriptor),
Component(ComponentColumnDescriptor),
DictionaryEncoded(ComponentColumnDescriptor),
}

impl ColumnDescriptor {
#[inline]
pub fn entity_path(&self) -> Option<&EntityPath> {
match self {
Self::Control(_) | Self::Time(_) => None,
Self::Component(descr) | Self::DictionaryEncoded(descr) => Some(&descr.entity_path),
Self::Component(descr) => Some(&descr.entity_path),
}
}

Expand All @@ -54,12 +94,7 @@ impl ColumnDescriptor {
match self {
Self::Control(descr) => descr.datatype.clone(),
Self::Time(descr) => descr.datatype.clone(),
Self::Component(descr) => descr.datatype.clone(),
Self::DictionaryEncoded(descr) => ArrowDatatype::Dictionary(
arrow2::datatypes::IntegerType::Int32,
std::sync::Arc::new(descr.datatype.clone()),
true,
),
Self::Component(descr) => descr.returned_datatype(),
}
}

Expand All @@ -69,11 +104,6 @@ impl ColumnDescriptor {
Self::Control(descr) => descr.to_arrow_field(),
Self::Time(descr) => descr.to_arrow_field(),
Self::Component(descr) => descr.to_arrow_field(),
Self::DictionaryEncoded(descr) => {
let mut field = descr.to_arrow_field();
field.data_type = self.datatype();
field
}
}
}

Expand All @@ -82,9 +112,7 @@ impl ColumnDescriptor {
match self {
Self::Control(descr) => descr.component_name.short_name().to_owned(),
Self::Time(descr) => descr.timeline.name().to_string(),
Self::Component(descr) | Self::DictionaryEncoded(descr) => {
descr.component_name.short_name().to_owned()
}
Self::Component(descr) => descr.component_name.short_name().to_owned(),
}
}
}
Expand Down Expand Up @@ -206,12 +234,15 @@ pub struct ComponentColumnDescriptor {
/// Example: `rerun.components.Position3D`.
pub component_name: ComponentName,

/// The Arrow datatype of the column.
/// The Arrow datatype of the stored column.
///
/// This is the log-time datatype corresponding to how this data is encoded
/// in a chunk. Currently this will always be an [`ArrowListArray`], but as
/// we introduce mono-type optimization, this might be a native type instead.
pub datatype: ArrowDatatype,
pub store_datatype: ArrowDatatype,

/// How the data will be joined into the resulting `RecordBatch`.
pub join_encoding: JoinEncoding,

/// Whether this column represents static data.
pub is_static: bool,
Expand All @@ -232,7 +263,8 @@ impl Ord for ComponentColumnDescriptor {
archetype_name,
archetype_field_name,
component_name,
datatype: _,
join_encoding: _,
store_datatype: _,
is_static: _,
} = self;

Expand All @@ -251,7 +283,8 @@ impl std::fmt::Display for ComponentColumnDescriptor {
archetype_name,
archetype_field_name,
component_name,
datatype: _,
join_encoding: _,
store_datatype: _,
is_static,
} = self;

Expand Down Expand Up @@ -284,16 +317,20 @@ impl std::fmt::Display for ComponentColumnDescriptor {
impl ComponentColumnDescriptor {
#[inline]
pub fn new<C: re_types_core::Component>(entity_path: EntityPath) -> Self {
let join_encoding = JoinEncoding::default();

// NOTE: The data is always at least a list, whether it's latest-at or range.
// It might be wrapped further in e.g. a dict, but at the very least
// it's a list.
let store_datatype = ArrowListArray::<i32>::default_datatype(C::arrow_datatype());

Self {
entity_path,
archetype_name: None,
archetype_field_name: None,
component_name: C::name(),
// NOTE: The data is always a at least a list, whether it's latest-at or range.
// It might be wrapped further in e.g. a dict, but at the very least
// it's a list.
// TODO(#7365): user-specified datatypes have got to go.
datatype: ArrowListArray::<i32>::default_datatype(C::arrow_datatype()),
join_encoding,
store_datatype,
is_static: false,
}
}
Expand All @@ -304,7 +341,8 @@ impl ComponentColumnDescriptor {
archetype_name,
archetype_field_name,
component_name,
datatype: _,
join_encoding: _,
store_datatype: _,
is_static,
} = self;

Expand All @@ -330,16 +368,34 @@ impl ComponentColumnDescriptor {
.collect()
}

/// The Arrow datatype of this column as it will appear in the returned `RecordBatch`.
///
/// For [`JoinEncoding::DictionaryEncode`] the stored datatype is wrapped in an
/// `Int32`-keyed Arrow dictionary (a different physical type than the stored one);
/// otherwise the stored datatype is returned unchanged.
#[inline]
pub fn returned_datatype(&self) -> ArrowDatatype {
    let stored = self.store_datatype.clone();
    if self.join_encoding == JoinEncoding::DictionaryEncode {
        ArrowDatatype::Dictionary(
            arrow2::datatypes::IntegerType::Int32,
            std::sync::Arc::new(stored),
            true,
        )
    } else {
        // `OverlappingSlice`: data is returned with its stored physical type.
        stored
    }
}

/// Builds the Arrow schema field describing this column in the output `RecordBatch`.
///
/// The field uses the component's short name, the *returned* (possibly
/// dictionary-encoded) datatype, and carries this descriptor's metadata.
#[inline]
pub fn to_arrow_field(&self) -> ArrowField {
    let name = self.component_name.short_name().to_owned();
    let datatype = self.returned_datatype();
    let nullable = true;
    let field = ArrowField::new(name, datatype, nullable);
    // TODO(#6889): This needs some proper sorbetization -- I just threw these names randomly.
    field.with_metadata(self.metadata())
}

#[inline]
pub fn with_join_encoding(mut self, join_encoding: JoinEncoding) -> Self {
self.join_encoding = join_encoding;
self
}
}

// --- Queries ---
Expand Down Expand Up @@ -510,7 +566,10 @@ impl ChunkStore {
archetype_name: None,
archetype_field_name: None,
component_name: *component_name,
datatype: ArrowListArray::<i32>::default_datatype(datatype.clone()),
store_datatype: ArrowListArray::<i32>::default_datatype(
datatype.clone(),
),
join_encoding: JoinEncoding::default(),
is_static: true,
})
})
Expand Down Expand Up @@ -539,7 +598,10 @@ impl ChunkStore {
// NOTE: The data is always at least a list, whether it's latest-at or range.
// It might be wrapped further in e.g. a dict, but at the very least
// it's a list.
datatype: ArrowListArray::<i32>::default_datatype(datatype.clone()),
store_datatype: ArrowListArray::<i32>::default_datatype(
datatype.clone(),
),
join_encoding: JoinEncoding::default(),
// NOTE: This will make it so shadowed temporal data automatically gets
// discarded from the schema.
is_static: self
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_chunk_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ mod subscribers;
mod writes;

pub use self::dataframe::{
ColumnDescriptor, ComponentColumnDescriptor, ControlColumnDescriptor, LatestAtQueryExpression,
QueryExpression, RangeQueryExpression, TimeColumnDescriptor,
ColumnDescriptor, ComponentColumnDescriptor, ControlColumnDescriptor, JoinEncoding,
LatestAtQueryExpression, QueryExpression, RangeQueryExpression, TimeColumnDescriptor,
};
pub use self::events::{ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent};
pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget};
Expand Down
28 changes: 2 additions & 26 deletions crates/store/re_dataframe/src/latest_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ impl LatestAtQueryHandle<'_> {
columns
.iter()
.filter_map(|col| match col {
ColumnDescriptor::Component(descr)
| ColumnDescriptor::DictionaryEncoded(descr) => {
ColumnDescriptor::Component(descr) => {
let results = self.engine.cache.latest_at(
self.engine.store,
&query,
Expand Down Expand Up @@ -203,30 +202,7 @@ impl LatestAtQueryHandle<'_> {
.map_or_else(
|| {
arrow2::array::new_null_array(
descr.datatype.clone(),
null_array_length,
)
},
|list_array| list_array.to_boxed(),
),
),

ColumnDescriptor::DictionaryEncoded(descr) => Some(
all_units
.get(col)
.and_then(|chunk| {
let indexed = chunk.index(&self.query.timeline).and_then(|index| {
chunk
.components()
.get(&descr.component_name)
.map(|array| (index, array as &dyn ArrowArray))
});
re_chunk::util::arrays_to_dictionary(&descr.datatype, &[indexed])
})
.map_or_else(
|| {
arrow2::array::new_null_array(
descr.datatype.clone(),
descr.returned_datatype(),
null_array_length,
)
},
Expand Down
Loading

0 comments on commit abbe65b

Please sign in to comment.