Skip to content

Commit

Permalink
feat: optimize for sparse data
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Feb 22, 2024
1 parent 8289b0d commit c5f982a
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 37 deletions.
96 changes: 82 additions & 14 deletions src/mito2/src/memtable/key_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct KeyValues {
/// must not be `None`.
mutation: Mutation,
/// Key value read helper.
helper: ReadRowHelper,
helper: SparseReadRowHelper,
}

impl KeyValues {
Expand All @@ -37,7 +37,18 @@ impl KeyValues {
/// Returns `None` if `rows` of the `mutation` is `None`.
pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
let rows = mutation.rows.as_ref()?;
let helper = ReadRowHelper::new(metadata, rows);
let helper = SparseReadRowHelper::new(metadata, rows);

Some(KeyValues { mutation, helper })
}

/// Creates [KeyValues] from specific `mutation`. This method is used for [Mutation] that
/// has not been filled with all columns.
///
/// Returns `None` if `rows` of the `mutation` is `None`.
pub fn new_sparse(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
let rows = mutation.rows.as_ref()?;
let helper = SparseReadRowHelper::new(metadata, rows);

Some(KeyValues { mutation, helper })
}
Expand Down Expand Up @@ -75,7 +86,7 @@ impl KeyValues {
pub struct KeyValue<'a> {
row: &'a Row,
schema: &'a Vec<ColumnSchema>,
helper: &'a ReadRowHelper,
helper: &'a SparseReadRowHelper,
sequence: SequenceNumber,
op_type: OpType,
}
Expand All @@ -85,30 +96,32 @@ impl<'a> KeyValue<'a> {
pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
self.helper.indices[..self.helper.num_primary_key_column]
.iter()
.map(|idx| {
api::helper::pb_value_to_value_ref(
&self.row.values[*idx],
&self.schema[*idx].datatype_extension,
)
.map(|idx| match idx {
Some(i) => api::helper::pb_value_to_value_ref(
&self.row.values[*i],
&self.schema[*i].datatype_extension,
),
None => ValueRef::Null,
})
}

/// Get field columns.
pub fn fields(&self) -> impl Iterator<Item = ValueRef> {
self.helper.indices[self.helper.num_primary_key_column + 1..]
.iter()
.map(|idx| {
api::helper::pb_value_to_value_ref(
&self.row.values[*idx],
&self.schema[*idx].datatype_extension,
)
.map(|idx| match idx {
Some(i) => api::helper::pb_value_to_value_ref(
&self.row.values[*i],
&self.schema[*i].datatype_extension,
),
None => ValueRef::Null,
})
}

/// Get timestamp.
pub fn timestamp(&self) -> ValueRef {
// Timestamp is primitive, we clone it.
let index = self.helper.indices[self.helper.num_primary_key_column];
let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
api::helper::pb_value_to_value_ref(
&self.row.values[index],
&self.schema[index].datatype_extension,
Expand Down Expand Up @@ -198,6 +211,61 @@ impl ReadRowHelper {
}
}

/// Helper to read rows in key, value order for sparse data.
#[derive(Debug)]
struct SparseReadRowHelper {
/// Key and value column indices.
///
/// `indices[..num_primary_key_column]` are primary key columns, `indices[num_primary_key_column]`
/// is the timestamp column and remainings are field columns.
indices: Vec<Option<usize>>,
/// Number of primary key columns.
num_primary_key_column: usize,
}

impl SparseReadRowHelper {
/// Creates a [SparseReadRowHelper] for specific `rows`.
///
/// # Panics
/// Time index column must exist.
fn new(metadata: &RegionMetadata, rows: &Rows) -> SparseReadRowHelper {
// Build a name to index mapping for rows.
let name_to_index: HashMap<_, _> = rows
.schema
.iter()
.enumerate()
.map(|(index, col)| (&col.column_name, index))
.collect();
let mut indices = Vec::with_capacity(metadata.column_metadatas.len());

// Get primary key indices.
for pk_column_id in &metadata.primary_key {
// Safety: Id comes from primary key.
let column = metadata.column_by_id(*pk_column_id).unwrap();
let index = name_to_index.get(&column.column_schema.name);
indices.push(index.copied());
}
// Get timestamp index.
// Safety: time index must exist
let ts_index = name_to_index
.get(&metadata.time_index_column().column_schema.name)
.unwrap();
indices.push(Some(*ts_index));

// Iterate columns and find field columns.
for column in metadata.field_columns() {
// Get index in request for each field column.
let index = name_to_index.get(&column.column_schema.name);
indices.push(index.copied());
}

SparseReadRowHelper {
indices,
num_primary_key_column: metadata.primary_key.len(),
}
}
}

#[cfg(test)]
mod tests {
use api::v1::{self, ColumnDataType, SemanticType};
Expand Down
35 changes: 12 additions & 23 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ impl WriteRequest {
// Need to add a default value for this column.
let proto_value = self.column_default_value(column)?;

if proto_value.value_data.is_none() {
return Ok(());
}

// Insert default value to each row.
for row in &mut self.rows.rows {
row.values.push(proto_value.clone());
Expand Down Expand Up @@ -989,16 +993,13 @@ mod tests {
request.fill_missing_columns(&metadata).unwrap();

let expect_rows = Rows {
schema: vec![
new_column_schema(
"ts",
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
),
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
],
schema: vec![new_column_schema(
"ts",
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
)],
rows: vec![Row {
values: vec![ts_ms_value(1), Value { value_data: None }],
values: vec![ts_ms_value(1)],
}],
};
assert_eq!(expect_rows, request.rows);
Expand Down Expand Up @@ -1104,17 +1105,11 @@ mod tests {
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
),
new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field),
new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
],
// Column f1 is not nullable and we use 0 for padding.
rows: vec![Row {
values: vec![
i64_value(100),
ts_ms_value(1),
Value { value_data: None },
i64_value(0),
],
values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
}],
};
assert_eq!(expect_rows, request.rows);
Expand Down Expand Up @@ -1173,17 +1168,11 @@ mod tests {
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
),
new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field),
new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
],
// Column f1 is not nullable and we use 0 for padding.
rows: vec![Row {
values: vec![
i64_value(100),
ts_ms_value(1),
Value { value_data: None },
i64_value(0),
],
values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
}],
};
assert_eq!(expect_rows, request.rows);
Expand Down

0 comments on commit c5f982a

Please sign in to comment.