From c5f982aaae00233a82e09ae9c46649783b723157 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 22 Feb 2024 19:51:59 +0800 Subject: [PATCH 1/2] feat: optimize for sparse data Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/key_values.rs | 96 ++++++++++++++++++++++++---- src/mito2/src/request.rs | 35 ++++------ 2 files changed, 94 insertions(+), 37 deletions(-) diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 10854d23a8ae..1cc1ef9c3bef 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -28,7 +28,7 @@ pub struct KeyValues { /// must not be `None`. mutation: Mutation, /// Key value read helper. - helper: ReadRowHelper, + helper: SparseReadRowHelper, } impl KeyValues { @@ -37,7 +37,18 @@ impl KeyValues { /// Returns `None` if `rows` of the `mutation` is `None`. pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option { let rows = mutation.rows.as_ref()?; - let helper = ReadRowHelper::new(metadata, rows); + let helper = SparseReadRowHelper::new(metadata, rows); + + Some(KeyValues { mutation, helper }) + } + + /// Creates [KeyValues] from specific `mutation`. This method is used for [Mutation] that + /// has not been filled with all columns. + /// + /// Returns `None` if `rows` of the `mutation` is `None`. + pub fn new_sparse(metadata: &RegionMetadata, mutation: Mutation) -> Option { + let rows = mutation.rows.as_ref()?; + let helper = SparseReadRowHelper::new(metadata, rows); Some(KeyValues { mutation, helper }) } @@ -75,7 +86,7 @@ impl KeyValues { pub struct KeyValue<'a> { row: &'a Row, schema: &'a Vec, - helper: &'a ReadRowHelper, + helper: &'a SparseReadRowHelper, sequence: SequenceNumber, op_type: OpType, } @@ -85,11 +96,12 @@ impl<'a> KeyValue<'a> { pub fn primary_keys(&self) -> impl Iterator { self.helper.indices[..self.helper.num_primary_key_column] .iter() - .map(|idx| { - api::helper::pb_value_to_value_ref( - &self.row.values[*idx], - &self.schema[*idx].datatype_extension, - ) + .map(|idx| match idx { + Some(i) => api::helper::pb_value_to_value_ref( + &self.row.values[*i], + &self.schema[*i].datatype_extension, + ), + None => ValueRef::Null, }) } @@ -97,18 +109,19 @@ impl<'a> KeyValue<'a> { pub fn fields(&self) -> impl Iterator { self.helper.indices[self.helper.num_primary_key_column + 1..] .iter() - .map(|idx| { - api::helper::pb_value_to_value_ref( - &self.row.values[*idx], - &self.schema[*idx].datatype_extension, - ) + .map(|idx| match idx { + Some(i) => api::helper::pb_value_to_value_ref( + &self.row.values[*i], + &self.schema[*i].datatype_extension, + ), + None => ValueRef::Null, }) } /// Get timestamp. pub fn timestamp(&self) -> ValueRef { // Timestamp is primitive, we clone it. - let index = self.helper.indices[self.helper.num_primary_key_column]; + let index = self.helper.indices[self.helper.num_primary_key_column].unwrap(); api::helper::pb_value_to_value_ref( &self.row.values[index], &self.schema[index].datatype_extension, @@ -198,6 +211,61 @@ impl ReadRowHelper { } } +/// Helper to read rows in key, value order for sparse data. +#[derive(Debug)] +struct SparseReadRowHelper { + /// Key and value column indices. + /// + /// `indices[..num_primary_key_column]` are primary key columns, `indices[num_primary_key_column]` + /// is the timestamp column and remainings are field columns. + indices: Vec>, + /// Number of primary key columns. + num_primary_key_column: usize, +} + +impl SparseReadRowHelper { + /// Creates a [SparseReadRowHelper] for specific `rows`. + /// + /// # Panics + /// Time index column must exist. + fn new(metadata: &RegionMetadata, rows: &Rows) -> SparseReadRowHelper { + // Build a name to index mapping for rows. + let name_to_index: HashMap<_, _> = rows + .schema + .iter() + .enumerate() + .map(|(index, col)| (&col.column_name, index)) + .collect(); + let mut indices = Vec::with_capacity(metadata.column_metadatas.len()); + + // Get primary key indices. + for pk_column_id in &metadata.primary_key { + // Safety: Id comes from primary key. + let column = metadata.column_by_id(*pk_column_id).unwrap(); + let index = name_to_index.get(&column.column_schema.name); + indices.push(index.copied()); + } + // Get timestamp index. + // Safety: time index must exist + let ts_index = name_to_index + .get(&metadata.time_index_column().column_schema.name) + .unwrap(); + indices.push(Some(*ts_index)); + + // Iterate columns and find field columns. + for column in metadata.field_columns() { + // Get index in request for each field column. + let index = name_to_index.get(&column.column_schema.name); + indices.push(index.copied()); + } + + SparseReadRowHelper { + indices, + num_primary_key_column: metadata.primary_key.len(), + } + } +} + #[cfg(test)] mod tests { use api::v1::{self, ColumnDataType, SemanticType}; diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 2d440f6451d0..f475db5ad861 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -247,6 +247,10 @@ impl WriteRequest { // Need to add a default value for this column. let proto_value = self.column_default_value(column)?; + if proto_value.value_data.is_none() { + return Ok(()); + } + // Insert default value to each row. for row in &mut self.rows.rows { row.values.push(proto_value.clone()); @@ -989,16 +993,13 @@ mod tests { request.fill_missing_columns(&metadata).unwrap(); let expect_rows = Rows { - schema: vec![ - new_column_schema( - "ts", - ColumnDataType::TimestampMillisecond, - SemanticType::Timestamp, - ), - new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), - ], + schema: vec![new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + )], rows: vec![Row { - values: vec![ts_ms_value(1), Value { value_data: None }], + values: vec![ts_ms_value(1)], }], }; assert_eq!(expect_rows, request.rows); @@ -1104,17 +1105,11 @@ mod tests { ColumnDataType::TimestampMillisecond, SemanticType::Timestamp, ), - new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field), new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field), ], // Column f1 is not nullable and we use 0 for padding. rows: vec![Row { - values: vec![ - i64_value(100), - ts_ms_value(1), - Value { value_data: None }, - i64_value(0), - ], + values: vec![i64_value(100), ts_ms_value(1), i64_value(0)], }], }; assert_eq!(expect_rows, request.rows); @@ -1173,17 +1168,11 @@ mod tests { ColumnDataType::TimestampMillisecond, SemanticType::Timestamp, ), - new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field), new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field), ], // Column f1 is not nullable and we use 0 for padding. rows: vec![Row { - values: vec![ - i64_value(100), - ts_ms_value(1), - Value { value_data: None }, - i64_value(0), - ], + values: vec![i64_value(100), ts_ms_value(1), i64_value(0)], }], }; assert_eq!(expect_rows, request.rows); From 591f0a44d92b5d00aeb85f6c614fa666c1032070 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 22 Feb 2024 19:53:37 +0800 Subject: [PATCH 2/2] remove old structures Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/key_values.rs | 73 ---------------------------- 1 file changed, 73 deletions(-) diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 1cc1ef9c3bef..29e7b0beca12 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -42,17 +42,6 @@ impl KeyValues { Some(KeyValues { mutation, helper }) } - /// Creates [KeyValues] from specific `mutation`. This method is used for [Mutation] that - /// has not been filled with all columns. - /// - /// Returns `None` if `rows` of the `mutation` is `None`. - pub fn new_sparse(metadata: &RegionMetadata, mutation: Mutation) -> Option { - let rows = mutation.rows.as_ref()?; - let helper = SparseReadRowHelper::new(metadata, rows); - - Some(KeyValues { mutation, helper }) - } - /// Returns a key value iterator. pub fn iter(&self) -> impl Iterator { let rows = self.mutation.rows.as_ref().unwrap(); @@ -149,68 +138,6 @@ impl<'a> KeyValue<'a> { } } -/// Helper to read rows in key, value order. -#[derive(Debug)] -struct ReadRowHelper { - /// Key and value column indices. - /// - /// `indices[..num_primary_key_column]` are primary key columns, `indices[num_primary_key_column]` - /// is the timestamp column and remainings are field columns. - indices: Vec, - /// Number of primary key columns. - num_primary_key_column: usize, -} - -impl ReadRowHelper { - /// Creates a [ReadRowHelper] for specific `rows`. - /// - /// # Panics - /// The `rows` must fill their missing columns first and have same columns with `metadata`. - /// Otherwise this method will panic. - fn new(metadata: &RegionMetadata, rows: &Rows) -> ReadRowHelper { - assert_eq!( - metadata.column_metadatas.len(), - rows.schema.len(), - "Length mismatch, column_metas: {:?}, rows_schema: {:?}", - metadata.column_metadatas, - rows.schema - ); - - // Build a name to index mapping for rows. - let name_to_index: HashMap<_, _> = rows - .schema - .iter() - .enumerate() - .map(|(index, col)| (&col.column_name, index)) - .collect(); - let mut indices = Vec::with_capacity(metadata.column_metadatas.len()); - - // Get primary key indices. - for pk_id in &metadata.primary_key { - // Safety: Id comes from primary key. - let column = metadata.column_by_id(*pk_id).unwrap(); - let index = name_to_index.get(&column.column_schema.name).unwrap(); - indices.push(*index); - } - // Get timestamp index. - let ts_index = name_to_index - .get(&metadata.time_index_column().column_schema.name) - .unwrap(); - indices.push(*ts_index); - // Iterate columns and find field columns. - for column in metadata.field_columns() { - // Get index in request for each field column. - let index = name_to_index.get(&column.column_schema.name).unwrap(); - indices.push(*index); - } - - ReadRowHelper { - indices, - num_primary_key_column: metadata.primary_key.len(), - } - } -} - /// Helper to read rows in key, value order for sparse data. #[derive(Debug)] struct SparseReadRowHelper {