feat: skip filling NULL for put and delete requests #3364

Merged · 3 commits · Feb 22, 2024
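
This PR stops padding put and delete requests with NULL values for missing columns: WriteRequest::fill_missing_columns now skips a column whose default value is NULL, and the memtable's ReadRowHelper is replaced by a SparseReadRowHelper that keeps Option<usize> column indices and yields ValueRef::Null for columns a request did not carry.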
71 changes: 33 additions & 38 deletions src/mito2/src/memtable/key_values.rs
@@ -28,7 +28,7 @@ pub struct KeyValues {
     /// must not be `None`.
     mutation: Mutation,
     /// Key value read helper.
-    helper: ReadRowHelper,
+    helper: SparseReadRowHelper,
 }
 
 impl KeyValues {
@@ -37,7 +37,7 @@ impl KeyValues {
     /// Returns `None` if `rows` of the `mutation` is `None`.
     pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
         let rows = mutation.rows.as_ref()?;
-        let helper = ReadRowHelper::new(metadata, rows);
+        let helper = SparseReadRowHelper::new(metadata, rows);
 
         Some(KeyValues { mutation, helper })
     }
@@ -75,7 +75,7 @@
 pub struct KeyValue<'a> {
     row: &'a Row,
     schema: &'a Vec<ColumnSchema>,
-    helper: &'a ReadRowHelper,
+    helper: &'a SparseReadRowHelper,
     sequence: SequenceNumber,
     op_type: OpType,
 }
@@ -85,30 +85,32 @@ impl<'a> KeyValue<'a> {
     pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
         self.helper.indices[..self.helper.num_primary_key_column]
             .iter()
-            .map(|idx| {
-                api::helper::pb_value_to_value_ref(
-                    &self.row.values[*idx],
-                    &self.schema[*idx].datatype_extension,
-                )
+            .map(|idx| match idx {
+                Some(i) => api::helper::pb_value_to_value_ref(
+                    &self.row.values[*i],
+                    &self.schema[*i].datatype_extension,
+                ),
+                None => ValueRef::Null,
             })
     }
 
     /// Get field columns.
     pub fn fields(&self) -> impl Iterator<Item = ValueRef> {
         self.helper.indices[self.helper.num_primary_key_column + 1..]
             .iter()
-            .map(|idx| {
-                api::helper::pb_value_to_value_ref(
-                    &self.row.values[*idx],
-                    &self.schema[*idx].datatype_extension,
-                )
+            .map(|idx| match idx {
+                Some(i) => api::helper::pb_value_to_value_ref(
+                    &self.row.values[*i],
+                    &self.schema[*i].datatype_extension,
+                ),
+                None => ValueRef::Null,
             })
     }
 
     /// Get timestamp.
     pub fn timestamp(&self) -> ValueRef {
         // Timestamp is primitive, we clone it.
-        let index = self.helper.indices[self.helper.num_primary_key_column];
+        let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
         api::helper::pb_value_to_value_ref(
             &self.row.values[index],
             &self.schema[index].datatype_extension,
@@ -136,33 +138,24 @@ impl<'a> KeyValue<'a> {
     }
 }
 
-/// Helper to read rows in key, value order.
+/// Helper to read rows in key, value order for sparse data.
 #[derive(Debug)]
-struct ReadRowHelper {
+struct SparseReadRowHelper {
     /// Key and value column indices.
     ///
     /// `indices[..num_primary_key_column]` are primary key columns, `indices[num_primary_key_column]`
     /// is the timestamp column and remainings are field columns.
-    indices: Vec<usize>,
+    indices: Vec<Option<usize>>,
     /// Number of primary key columns.
     num_primary_key_column: usize,
 }
 
-impl ReadRowHelper {
-    /// Creates a [ReadRowHelper] for specific `rows`.
+impl SparseReadRowHelper {
+    /// Creates a [SparseReadRowHelper] for specific `rows`.
     ///
     /// # Panics
-    /// The `rows` must fill their missing columns first and have same columns with `metadata`.
-    /// Otherwise this method will panic.
-    fn new(metadata: &RegionMetadata, rows: &Rows) -> ReadRowHelper {
-        assert_eq!(
-            metadata.column_metadatas.len(),
-            rows.schema.len(),
-            "Length mismatch, column_metas: {:?}, rows_schema: {:?}",
-            metadata.column_metadatas,
-            rows.schema
-        );
-
+    /// Time index column must exist.
+    fn new(metadata: &RegionMetadata, rows: &Rows) -> SparseReadRowHelper {
         // Build a name to index mapping for rows.
         let name_to_index: HashMap<_, _> = rows
             .schema
@@ -173,25 +166,27 @@ impl ReadRowHelper {
         let mut indices = Vec::with_capacity(metadata.column_metadatas.len());
 
         // Get primary key indices.
-        for pk_id in &metadata.primary_key {
+        for pk_column_id in &metadata.primary_key {
             // Safety: Id comes from primary key.
-            let column = metadata.column_by_id(*pk_id).unwrap();
-            let index = name_to_index.get(&column.column_schema.name).unwrap();
-            indices.push(*index);
+            let column = metadata.column_by_id(*pk_column_id).unwrap();
+            let index = name_to_index.get(&column.column_schema.name);
+            indices.push(index.copied());
         }
         // Get timestamp index.
+        // Safety: time index must exist
         let ts_index = name_to_index
             .get(&metadata.time_index_column().column_schema.name)
             .unwrap();
-        indices.push(*ts_index);
+        indices.push(Some(*ts_index));
 
         // Iterate columns and find field columns.
         for column in metadata.field_columns() {
             // Get index in request for each field column.
-            let index = name_to_index.get(&column.column_schema.name).unwrap();
-            indices.push(*index);
+            let index = name_to_index.get(&column.column_schema.name);
+            indices.push(index.copied());
        }
 
-        ReadRowHelper {
+        SparseReadRowHelper {
            indices,
            num_primary_key_column: metadata.primary_key.len(),
        }
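
The core of the change on the read side: the helper now keeps an Option<usize> index per expected column (primary keys, then the time index, then fields) and resolves a missing column to NULL when the key/value is read, instead of requiring the write path to pad every row. Below is a minimal, self-contained sketch of that lookup; SparseIndices and the toy Value enum are illustrative stand-ins, not the real mito2 or greptime-proto types, and the sketch clones values where the real code returns borrowed ValueRefs.

use std::collections::HashMap;

// Toy stand-in for a proto row value / ValueRef.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int64(i64),
    Null,
}

/// Maps each expected column (primary keys, time index, fields, in that order)
/// to its position in the incoming row, if the request carried that column.
struct SparseIndices {
    indices: Vec<Option<usize>>,
}

impl SparseIndices {
    /// Builds the mapping from the expected column order and the request schema.
    fn new(expected: &[&str], request_columns: &[&str]) -> SparseIndices {
        let name_to_index: HashMap<&str, usize> = request_columns
            .iter()
            .enumerate()
            .map(|(i, name)| (*name, i))
            .collect();
        SparseIndices {
            indices: expected
                .iter()
                .map(|name| name_to_index.get(name).copied())
                .collect(),
        }
    }

    /// Resolves the value of the `pos`-th expected column in `row`,
    /// falling back to NULL when the request did not carry the column.
    fn value(&self, row: &[Value], pos: usize) -> Value {
        match self.indices[pos] {
            Some(i) => row[i].clone(),
            None => Value::Null,
        }
    }
}

fn main() {
    // The region expects k0, ts, f0, f1; the request only carried ts and f1.
    let helper = SparseIndices::new(&["k0", "ts", "f0", "f1"], &["ts", "f1"]);
    let row = vec![Value::Int64(1), Value::Int64(42)];

    assert_eq!(helper.value(&row, 0), Value::Null);      // k0 missing -> NULL
    assert_eq!(helper.value(&row, 1), Value::Int64(1));  // ts
    assert_eq!(helper.value(&row, 2), Value::Null);      // f0 missing -> NULL
    assert_eq!(helper.value(&row, 3), Value::Int64(42)); // f1
}
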
35 changes: 12 additions & 23 deletions src/mito2/src/request.rs
@@ -247,6 +247,10 @@ impl WriteRequest {
         // Need to add a default value for this column.
         let proto_value = self.column_default_value(column)?;
 
+        if proto_value.value_data.is_none() {
+            return Ok(());
+        }
+
         // Insert default value to each row.
         for row in &mut self.rows.rows {
             row.values.push(proto_value.clone());
@@ -989,16 +993,13 @@ mod tests {
         request.fill_missing_columns(&metadata).unwrap();
 
         let expect_rows = Rows {
-            schema: vec![
-                new_column_schema(
-                    "ts",
-                    ColumnDataType::TimestampMillisecond,
-                    SemanticType::Timestamp,
-                ),
-                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
-            ],
+            schema: vec![new_column_schema(
+                "ts",
+                ColumnDataType::TimestampMillisecond,
+                SemanticType::Timestamp,
+            )],
             rows: vec![Row {
-                values: vec![ts_ms_value(1), Value { value_data: None }],
+                values: vec![ts_ms_value(1)],
             }],
         };
         assert_eq!(expect_rows, request.rows);
@@ -1104,17 +1105,11 @@ mod tests {
                     ColumnDataType::TimestampMillisecond,
                     SemanticType::Timestamp,
                 ),
-                new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field),
                 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
             ],
             // Column f1 is not nullable and we use 0 for padding.
             rows: vec![Row {
-                values: vec![
-                    i64_value(100),
-                    ts_ms_value(1),
-                    Value { value_data: None },
-                    i64_value(0),
-                ],
+                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
             }],
         };
         assert_eq!(expect_rows, request.rows);
@@ -1173,17 +1168,11 @@ mod tests {
                     ColumnDataType::TimestampMillisecond,
                     SemanticType::Timestamp,
                 ),
-                new_column_schema("f0", ColumnDataType::Int64, SemanticType::Field),
                 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
             ],
             // Column f1 is not nullable and we use 0 for padding.
             rows: vec![Row {
-                values: vec![
-                    i64_value(100),
-                    ts_ms_value(1),
-                    Value { value_data: None },
-                    i64_value(0),
-                ],
+                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
             }],
         };
         assert_eq!(expect_rows, request.rows);
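
On the write path, the fill step now skips any missing column whose default value is NULL, which is why the expected rows above no longer contain f0 or a Value { value_data: None } placeholder, while the non-nullable f1 is still padded with 0. A minimal sketch of that behavior, using simplified stand-in types rather than the real WriteRequest and greptime-proto structs:

// Simplified stand-ins for the proto Value and Rows types used by WriteRequest.
#[derive(Debug, Clone, PartialEq)]
struct Value {
    value_data: Option<i64>, // None models a NULL protobuf value
}

#[derive(Debug)]
struct Rows {
    schema: Vec<String>,    // column names carried by the request
    rows: Vec<Vec<Value>>,  // one value vector per row, aligned with `schema`
}

/// Fills one missing column. With this change, a NULL default is skipped
/// entirely: the column is appended neither to the schema nor to the rows,
/// and the reader later resolves it to NULL through the sparse helper.
fn fill_missing_column(rows: &mut Rows, column_name: &str, default: Value) {
    if default.value_data.is_none() {
        // Default is NULL: leave the request untouched.
        return;
    }
    rows.schema.push(column_name.to_string());
    for row in &mut rows.rows {
        row.push(default.clone());
    }
}

fn main() {
    let mut rows = Rows {
        schema: vec!["ts".to_string()],
        rows: vec![vec![Value { value_data: Some(1) }]],
    };

    // Nullable field whose default is NULL: no longer padded into the request.
    fill_missing_column(&mut rows, "f0", Value { value_data: None });
    // Non-nullable field padded with 0, as in the updated tests: still filled.
    fill_missing_column(&mut rows, "f1", Value { value_data: Some(0) });

    assert_eq!(rows.schema, vec!["ts".to_string(), "f1".to_string()]);
    assert_eq!(rows.rows[0].len(), 2); // ts + f1; no NULL placeholder for f0
}

The effect visible in the updated tests is smaller mutations: requests carry only columns with concrete values, and the sparse read helper supplies NULL for the rest.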