Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): deserialize output columns only #17260

Merged
merged 4 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion src/storage/src/row_serde/value_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ impl ValueRowSerdeNew for ColumnAwareSerde {
}
}

let column_with_default = table_columns.iter().enumerate().filter_map(|(i, c)| {
let partial_columns = value_indices.iter().map(|idx| &table_columns[*idx]);
let column_with_default = partial_columns.enumerate().filter_map(|(i, c)| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
snapshot_value,
expr,
Expand Down Expand Up @@ -305,4 +306,59 @@ mod tests {
// drop all columns is now allowed
assert!(try_drop_invalid_columns(&row_bytes, &HashSet::new()).is_none());
}

/// Decoding a column-aware encoded row with a deserializer that only asks for
/// a subset of the stored columns (in a different order than they were
/// written) must yield exactly the requested columns, in the requested order.
#[test]
fn test_deserialize_partial_columns() {
    // Encode a full row made of three columns with ids 0, 1 and 2.
    let stored_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
    let full_row = OwnedRow::new(vec![
        Some(Int16(5)),
        Some(Utf8("abc".into())),
        Some(Utf8("ABC".into())),
    ]);
    let encoded = column_aware_row_encoding::Serializer::new(&stored_ids).serialize(full_row);

    // Request only columns 2 and 0 (column 1 is left out), with no default columns.
    let wanted_ids = [ColumnId::new(2), ColumnId::new(0)];
    let partial_deserializer = column_aware_row_encoding::Deserializer::new(
        &wanted_ids,
        Arc::from(vec![DataType::Varchar, DataType::Int16].into_boxed_slice()),
        std::iter::empty(),
    );

    // The decoded row follows the order of the requested column ids.
    let expected = vec![Some(Utf8("ABC".into())), Some(Int16(5))];
    let actual = partial_deserializer.deserialize(&encoded[..]);
    assert_eq!(actual.unwrap(), expected);
}

/// Same as the partial-column case, but one of the requested columns
/// (id 3) was never written into the encoded row, so its value has to come
/// from the default supplied to the deserializer.
#[test]
fn test_deserialize_partial_columns_with_default_columns() {
    // Encode a full row made of three columns with ids 0, 1 and 2.
    let stored_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
    let full_row = OwnedRow::new(vec![
        Some(Int16(5)),
        Some(Utf8("abc".into())),
        Some(Utf8("ABC".into())),
    ]);
    let encoded = column_aware_row_encoding::Serializer::new(&stored_ids).serialize(full_row);

    // Requested columns: 2, 3 and 0. Column 3 is absent from the encoded row.
    let wanted_ids = [ColumnId::new(2), ColumnId::new(3), ColumnId::new(0)];
    let wanted_types = vec![DataType::Varchar, DataType::Varchar, DataType::Int16];

    // Default for ColumnId::new(3); the `1` is its position among the
    // requested columns above, not its column id.
    let defaults = vec![(1, Some(Utf8("new column".into())))];

    let partial_deserializer = column_aware_row_encoding::Deserializer::new(
        &wanted_ids,
        Arc::from(wanted_types.into_boxed_slice()),
        defaults.into_iter(),
    );

    // The missing column is filled with its default at the requested position.
    let expected = vec![
        Some(Utf8("ABC".into())),
        Some(Utf8("new column".into())),
        Some(Int16(5)),
    ];
    let actual = partial_deserializer.deserialize(&encoded[..]);
    assert_eq!(actual.unwrap(), expected);
}
}
84 changes: 52 additions & 32 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::{Index, RangeBounds};
use std::ops::RangeBounds;
use std::sync::Arc;

use auto_enums::auto_enum;
Expand All @@ -29,6 +29,7 @@ use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
Expand Down Expand Up @@ -81,7 +82,8 @@ pub struct StorageTableInner<S: StateStore, SD: ValueRowSerde> {
/// Mapping from column id to column index for deserializing the row.
mapping: Arc<ColumnMapping>,

/// Row deserializer to deserialize the whole value in storage to a row.
/// Row deserializer to deserialize the value in storage to a row.
/// The row can be either complete or partial, depending on whether the row encoding is versioned.
row_serde: Arc<SD>,

/// Indices of primary key.
Expand Down Expand Up @@ -246,29 +248,45 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
}
}

let output_row_in_value_indices = value_output_indices
.iter()
.map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let output_row_in_key_indices = key_output_indices
.iter()
.map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let schema = Schema::new(output_columns.iter().map(Into::into).collect());

let mapping = ColumnMapping::new(output_row_in_value_indices);

let pk_data_types = pk_indices
.iter()
.map(|i| table_columns[*i].data_type.clone())
.collect();
let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);

let row_serde = {
let (row_serde, mapping) = {
if versioned {
ColumnAwareSerde::new(value_indices.into(), table_columns.into()).into()
let value_output_indices_dedup = value_output_indices
.iter()
.unique()
.copied()
.collect::<Vec<_>>();
let output_row_in_value_output_indices_dedup = value_output_indices
.iter()
.map(|&di| {
value_output_indices_dedup
.iter()
.position(|&pi| di == pi)
.unwrap()
})
.collect_vec();
let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
let serde =
ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
(serde.into(), mapping)
} else {
BasicSerde::new(value_indices.into(), table_columns.into()).into()
let output_row_in_value_indices = value_output_indices
.iter()
.map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let mapping = ColumnMapping::new(output_row_in_value_indices);
let serde = BasicSerde::new(value_indices.into(), table_columns.into());
(serde.into(), mapping)
}
};

Expand Down Expand Up @@ -366,11 +384,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
if let Some(value) = self.store.get(serialized_pk, epoch, read_options).await? {
// Refer to [`StorageTableInnerIterInner::new`] for necessity of `validate_read_epoch`.
self.store.validate_read_epoch(wait_epoch)?;
let full_row = self.row_serde.deserialize(&value)?;
let result_row_in_value = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();

let row = self.row_serde.deserialize(&value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row));

match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key =
Expand All @@ -385,20 +402,23 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
.unwrap();
result_row_vec.push(
result_row_in_value
.index(*item_position_in_value_indices)
.clone(),
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
} else {
let item_position_in_pk_indices =
key_output_indices.iter().position(|p| idx == p).unwrap();
result_row_vec
.push(result_row_in_key.index(item_position_in_pk_indices).clone());
result_row_vec.push(
result_row_in_key
.datum_at(item_position_in_pk_indices)
.to_owned_datum(),
);
}
}
let result_row = OwnedRow::new(result_row_vec);
Ok(Some(result_row))
}
None => Ok(Some(result_row_in_value)),
None => Ok(Some(result_row_in_value.into_owned_row())),
}
} else {
Ok(None)
Expand Down Expand Up @@ -777,11 +797,8 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
.await?
{
let (table_key, value) = (k.user_key.table_key, v);
let full_row = self.row_deserializer.deserialize(value)?;
let result_row_in_value = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();
let row = self.row_deserializer.deserialize(value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row));
match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key = match self.pk_serializer.clone() {
Expand All @@ -803,14 +820,17 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
.unwrap();
result_row_vec.push(
result_row_in_value
.index(*item_position_in_value_indices)
.clone(),
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
} else {
let item_position_in_pk_indices =
key_output_indices.iter().position(|p| idx == p).unwrap();
result_row_vec
.push(result_row_in_key.index(item_position_in_pk_indices).clone());
result_row_vec.push(
result_row_in_key
.datum_at(item_position_in_pk_indices)
.to_owned_datum(),
);
}
}
let row = OwnedRow::new(result_row_vec);
Expand All @@ -824,7 +844,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
None => {
yield KeyedRow {
vnode_prefixed_key: table_key.copy_into(),
row: result_row_in_value,
row: result_row_in_value.into_owned_row(),
}
}
}
Expand Down
Loading