refactor(storage): deserialize output columns only (#17260)
(cherry picked from commit ade66c7)
zwang28 committed Aug 6, 2024
1 parent 099679e commit 362e258
Showing 2 changed files with 109 additions and 33 deletions.
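The gist of the refactor: with the column-aware (versioned) row encoding, the deserializer is now built over the output columns only, so a read never decodes columns it does not return. The following is a minimal, self-contained sketch of that idea — a toy encoding with made-up names, not the RisingWave implementation:

use std::collections::HashMap;

// Toy column-aware encoding: every datum is stored with its column id,
// so a reader can decode an arbitrary subset of columns.
fn encode(row: &[(u32, i64)]) -> Vec<u8> {
    let mut buf = Vec::new();
    for (column_id, value) in row {
        buf.extend_from_slice(&column_id.to_le_bytes());
        buf.extend_from_slice(&value.to_le_bytes());
    }
    buf
}

// Decode only the requested output columns, in the requested order;
// a column missing from the encoded row yields None.
fn decode_partial(bytes: &[u8], output_column_ids: &[u32]) -> Vec<Option<i64>> {
    let mut by_id = HashMap::new();
    for chunk in bytes.chunks_exact(12) {
        let column_id = u32::from_le_bytes(chunk[0..4].try_into().unwrap());
        let value = i64::from_le_bytes(chunk[4..12].try_into().unwrap());
        by_id.insert(column_id, value);
    }
    output_column_ids.iter().map(|id| by_id.get(id).copied()).collect()
}

fn main() {
    let bytes = encode(&[(0, 5), (1, 42), (2, 7)]);
    // Ask for columns 2 and 0 only; column 1 is never materialized.
    assert_eq!(decode_partial(&bytes, &[2, 0]), vec![Some(7), Some(5)]);
}
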
58 changes: 57 additions & 1 deletion src/storage/src/row_serde/value_serde.rs
@@ -97,7 +97,8 @@ impl ValueRowSerdeNew for ColumnAwareSerde {
}
}

let column_with_default = table_columns.iter().enumerate().filter_map(|(i, c)| {
let partial_columns = value_indices.iter().map(|idx| &table_columns[*idx]);
let column_with_default = partial_columns.enumerate().filter_map(|(i, c)| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
snapshot_value,
expr,
@@ -306,4 +307,59 @@ mod tests {
// drop all columns is now allowed
assert!(try_drop_invalid_columns(&row_bytes, &HashSet::new()).is_none());
}

#[test]
fn test_deserialize_partial_columns() {
let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
let row1 = OwnedRow::new(vec![
Some(Int16(5)),
Some(Utf8("abc".into())),
Some(Utf8("ABC".into())),
]);
let serializer = column_aware_row_encoding::Serializer::new(&column_ids);
let row_bytes = serializer.serialize(row1);

let deserializer = column_aware_row_encoding::Deserializer::new(
&[ColumnId::new(2), ColumnId::new(0)],
Arc::from(vec![DataType::Varchar, DataType::Int16].into_boxed_slice()),
std::iter::empty(),
);
let decoded = deserializer.deserialize(&row_bytes[..]);
assert_eq!(
decoded.unwrap(),
vec![Some(Utf8("ABC".into())), Some(Int16(5))]
);
}

#[test]
fn test_deserialize_partial_columns_with_default_columns() {
let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
let row1 = OwnedRow::new(vec![
Some(Int16(5)),
Some(Utf8("abc".into())),
Some(Utf8("ABC".into())),
]);
let serializer = column_aware_row_encoding::Serializer::new(&column_ids);
let row_bytes = serializer.serialize(row1);

// default column of ColumnId::new(3)
let default_columns = vec![(1, Some(Utf8("new column".into())))];

let deserializer = column_aware_row_encoding::Deserializer::new(
&[ColumnId::new(2), ColumnId::new(3), ColumnId::new(0)],
Arc::from(
vec![DataType::Varchar, DataType::Varchar, DataType::Int16].into_boxed_slice(),
),
default_columns.into_iter(),
);
let decoded = deserializer.deserialize(&row_bytes[..]);
assert_eq!(
decoded.unwrap(),
vec![
Some(Utf8("ABC".into())),
Some(Utf8("new column".into())),
Some(Int16(5))
]
);
}
}
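
The second new test above exercises default columns: an output column id that is absent from the stored bytes (for example, a column added by a later schema version) is filled from the `default_columns` iterator after decoding. A self-contained sketch of just that fallback step, using plain integers instead of datums (illustrative names and values, not the actual serde code):

// Toy model of the default-column fallback: after partial decoding, output
// positions that are still None are filled from (position, default) pairs.
fn fill_defaults(mut row: Vec<Option<i64>>, defaults: &[(usize, i64)]) -> Vec<Option<i64>> {
    for (pos, value) in defaults {
        if row[*pos].is_none() {
            row[*pos] = Some(*value);
        }
    }
    row
}

fn main() {
    // Output position 1 refers to a column added after this row was written,
    // so decoding left it as None; the default fills it in.
    let decoded = vec![Some(7), None, Some(5)];
    assert_eq!(fill_defaults(decoded, &[(1, 42)]), vec![Some(7), Some(42), Some(5)]);
}
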
84 changes: 52 additions & 32 deletions src/storage/src/table/batch_table/storage_table.rs
@@ -14,7 +14,7 @@

use std::default::Default;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::{Index, RangeBounds};
use std::ops::RangeBounds;
use std::sync::Arc;

use auto_enums::auto_enum;
@@ -30,6 +30,7 @@ use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
@@ -82,7 +83,8 @@ pub struct StorageTableInner<S: StateStore, SD: ValueRowSerde> {
/// Mapping from column id to column index for deserializing the row.
mapping: Arc<ColumnMapping>,

/// Row deserializer to deserialize the whole value in storage to a row.
/// Row deserializer to deserialize the value in storage to a row.
/// The row can be either complete or partial, depending on whether the row encoding is versioned.
row_serde: Arc<SD>,

/// Indices of primary key.
@@ -247,29 +249,45 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
}
}

let output_row_in_value_indices = value_output_indices
.iter()
.map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let output_row_in_key_indices = key_output_indices
.iter()
.map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let schema = Schema::new(output_columns.iter().map(Into::into).collect());

let mapping = ColumnMapping::new(output_row_in_value_indices);

let pk_data_types = pk_indices
.iter()
.map(|i| table_columns[*i].data_type.clone())
.collect();
let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);

let row_serde = {
let (row_serde, mapping) = {
if versioned {
ColumnAwareSerde::new(value_indices.into(), table_columns.into()).into()
let value_output_indices_dedup = value_output_indices
.iter()
.unique()
.copied()
.collect::<Vec<_>>();
let output_row_in_value_output_indices_dedup = value_output_indices
.iter()
.map(|&di| {
value_output_indices_dedup
.iter()
.position(|&pi| di == pi)
.unwrap()
})
.collect_vec();
let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
let serde =
ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
(serde.into(), mapping)
} else {
BasicSerde::new(value_indices.into(), table_columns.into()).into()
let output_row_in_value_indices = value_output_indices
.iter()
.map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let mapping = ColumnMapping::new(output_row_in_value_indices);
let serde = BasicSerde::new(value_indices.into(), table_columns.into());
(serde.into(), mapping)
}
};

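In the versioned branch above, `value_output_indices_dedup` drops repeated output columns before constructing `ColumnAwareSerde`, and the new `ColumnMapping` records, for each output position, where its column lands in the deduplicated row, so repeats can be restored during projection. A small self-contained illustration of that index bookkeeping, assuming the itertools crate (which the diff itself uses); the indices are made up:

use itertools::Itertools;

fn main() {
    // Output value columns, possibly with repeats (the same column selected twice).
    let value_output_indices = vec![4usize, 2, 4, 7];

    // Deduplicate, keeping first-seen order; the serde decodes only these.
    let dedup: Vec<usize> = value_output_indices.iter().unique().copied().collect();
    assert_eq!(dedup, vec![4, 2, 7]);

    // For every output position, record where its column sits in the
    // deduplicated row, so projection can restore repeats afterwards.
    let mapping: Vec<usize> = value_output_indices
        .iter()
        .map(|di| dedup.iter().position(|pi| pi == di).unwrap())
        .collect();
    assert_eq!(mapping, vec![0, 1, 0, 2]);
}
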
@@ -367,11 +385,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
if let Some(value) = self.store.get(serialized_pk, epoch, read_options).await? {
// Refer to [`StorageTableInnerIterInner::new`] for necessity of `validate_read_epoch`.
self.store.validate_read_epoch(wait_epoch)?;
let full_row = self.row_serde.deserialize(&value)?;
let result_row_in_value = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();

let row = self.row_serde.deserialize(&value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row));

match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key =
@@ -386,20 +403,23 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
.unwrap();
result_row_vec.push(
result_row_in_value
.index(*item_position_in_value_indices)
.clone(),
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
} else {
let item_position_in_pk_indices =
key_output_indices.iter().position(|p| idx == p).unwrap();
result_row_vec
.push(result_row_in_key.index(item_position_in_pk_indices).clone());
result_row_vec.push(
result_row_in_key
.datum_at(item_position_in_pk_indices)
.to_owned_datum(),
);
}
}
let result_row = OwnedRow::new(result_row_vec);
Ok(Some(result_row))
}
None => Ok(Some(result_row_in_value)),
None => Ok(Some(result_row_in_value.into_owned_row())),
}
} else {
Ok(None)
@@ -778,11 +798,8 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
.await?
{
let (table_key, value) = (k.user_key.table_key, v);
let full_row = self.row_deserializer.deserialize(value)?;
let result_row_in_value = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();
let row = self.row_deserializer.deserialize(value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row));
match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key = match self.pk_serializer.clone() {
@@ -804,14 +821,17 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
.unwrap();
result_row_vec.push(
result_row_in_value
.index(*item_position_in_value_indices)
.clone(),
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
} else {
let item_position_in_pk_indices =
key_output_indices.iter().position(|p| idx == p).unwrap();
result_row_vec
.push(result_row_in_key.index(item_position_in_pk_indices).clone());
result_row_vec.push(
result_row_in_key
.datum_at(item_position_in_pk_indices)
.to_owned_datum(),
);
}
}
let row = OwnedRow::new(result_row_vec);
@@ -825,7 +845,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
None => {
yield KeyedRow {
vnode_prefixed_key: table_key.copy_into(),
row: result_row_in_value,
row: result_row_in_value.into_owned_row(),
}
}
}
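Both the point-get path and the iterator path above rebuild the final row by pulling each output column either from the projected value row or from the deserialized key, in output order, now via `datum_at(..).to_owned_datum()` instead of `Index`. A simplified, self-contained model of that reassembly, with plain integers standing in for datums (illustrative names, not the actual StorageTable code):

// Simplified model of output-row assembly: each output column is taken
// either from the projected value row or from the deserialized key row.
fn assemble_output_row(
    output_indices: &[usize],
    value_output_indices: &[usize],
    key_output_indices: &[usize],
    row_in_value: &[i64],
    row_in_key: &[i64],
) -> Vec<i64> {
    output_indices
        .iter()
        .map(|idx| {
            if let Some(pos) = value_output_indices.iter().position(|p| p == idx) {
                row_in_value[pos]
            } else {
                let pos = key_output_indices.iter().position(|p| p == idx).unwrap();
                row_in_key[pos]
            }
        })
        .collect()
}

fn main() {
    // Output columns 3, 0, 2: column 0 lives in the key, columns 2 and 3 in the value.
    let row = assemble_output_row(&[3, 0, 2], &[2, 3], &[0], &[20, 30], &[100]);
    assert_eq!(row, vec![30, 100, 20]);
}
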
