refactor(storage): deserialize output columns only #17260

Merged Jun 18, 2024 · 4 commits · changes shown from 2 commits
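This PR changes the storage table read path so that, for column-aware (versioned) tables, only the requested output columns are deserialized from the encoded value, instead of decoding the whole row and projecting it afterwards. The subtle part is the index bookkeeping: the partial deserializer is built over a sorted, deduplicated set of value indices, and a small mapping restores the caller's requested output order. The sketch below illustrates that bookkeeping with plain `std` only; `partial_decode_plan` is a hypothetical helper named for illustration, not code from this PR.

```rust
// Hypothetical helper, not part of this PR: given the requested output indices
// into the value row, compute (1) the sorted, deduplicated indices that actually
// need decoding and (2) for each output position, where that column lands in the
// deduplicated set.
fn partial_decode_plan(value_output_indices: &[usize]) -> (Vec<usize>, Vec<usize>) {
    let mut dedup = value_output_indices.to_vec();
    dedup.sort_unstable();
    dedup.dedup();
    let remap = value_output_indices
        .iter()
        .map(|&di| dedup.iter().position(|&pi| pi == di).unwrap())
        .collect();
    (dedup, remap)
}

fn main() {
    // Output columns requested in order (2, 0): decode columns {0, 2} once,
    // then read positions [1, 0] of the decoded row to restore that order.
    let (dedup, remap) = partial_decode_plan(&[2, 0]);
    assert_eq!(dedup, vec![0, 2]);
    assert_eq!(remap, vec![1, 0]);
}
```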
58 changes: 57 additions & 1 deletion src/storage/src/row_serde/value_serde.rs
@@ -97,7 +97,8 @@ impl ValueRowSerdeNew for ColumnAwareSerde {
}
}

let column_with_default = table_columns.iter().enumerate().filter_map(|(i, c)| {
let partial_columns = value_indices.iter().map(|idx| &table_columns[*idx]);
let column_with_default = partial_columns.enumerate().filter_map(|(i, c)| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
snapshot_value,
expr,
@@ -305,4 +306,59 @@ mod tests {
// drop all columns is now allowed
assert!(try_drop_invalid_columns(&row_bytes, &HashSet::new()).is_none());
}

#[test]
fn test_deserialize_partial_columns() {
let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
let row1 = OwnedRow::new(vec![
Some(Int16(5)),
Some(Utf8("abc".into())),
Some(Utf8("ABC".into())),
]);
let serializer = column_aware_row_encoding::Serializer::new(&column_ids);
let row_bytes = serializer.serialize(row1);

let deserializer = column_aware_row_encoding::Deserializer::new(
&[ColumnId::new(2), ColumnId::new(0)],
Arc::from(vec![DataType::Varchar, DataType::Int16].into_boxed_slice()),
std::iter::empty(),
);
let decoded = deserializer.deserialize(&row_bytes[..]);
assert_eq!(
decoded.unwrap(),
vec![Some(Utf8("ABC".into())), Some(Int16(5))]
);
}

#[test]
fn test_deserialize_partial_columns_with_default_columns() {
let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
let row1 = OwnedRow::new(vec![
Some(Int16(5)),
Some(Utf8("abc".into())),
Some(Utf8("ABC".into())),
]);
let serializer = column_aware_row_encoding::Serializer::new(&column_ids);
let row_bytes = serializer.serialize(row1);

// default column of ColumnId::new(3)
let default_columns = vec![(1, Some(Utf8("new column".into())))];

let deserializer = column_aware_row_encoding::Deserializer::new(
&[ColumnId::new(2), ColumnId::new(3), ColumnId::new(0)],
Arc::from(
vec![DataType::Varchar, DataType::Varchar, DataType::Int16].into_boxed_slice(),
),
default_columns.into_iter(),
);
let decoded = deserializer.deserialize(&row_bytes[..]);
assert_eq!(
decoded.unwrap(),
vec![
Some(Utf8("ABC".into())),
Some(Utf8("new column".into())),
Some(Int16(5))
]
);
}
}
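The `storage_table.rs` changes below compose the pieces exercised by these tests: decode only the deduplicated output columns with a column-aware `Deserializer`, then restore the requested output order with a `ColumnMapping`. Here is a minimal sketch of that composition; it reuses the API exactly as the tests above call it, but the `use` paths are inferred from this diff and should be treated as assumptions rather than the exact crate layout.

```rust
use std::sync::Arc;

use risingwave_common::catalog::ColumnId;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::DataType;
use risingwave_common::types::ScalarImpl::{Int16, Utf8};
use risingwave_common::util::row_serde::ColumnMapping;
use risingwave_common::util::value_encoding::column_aware_row_encoding;

fn main() {
    // Encode a three-column row, exactly as in the tests above.
    let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
    let serializer = column_aware_row_encoding::Serializer::new(&column_ids);
    let row_bytes = serializer.serialize(OwnedRow::new(vec![
        Some(Int16(5)),
        Some(Utf8("abc".into())),
        Some(Utf8("ABC".into())),
    ]));

    // Decode only the deduplicated output columns {0, 2}, in sorted order.
    let deserializer = column_aware_row_encoding::Deserializer::new(
        &[ColumnId::new(0), ColumnId::new(2)],
        Arc::from(vec![DataType::Int16, DataType::Varchar].into_boxed_slice()),
        std::iter::empty(),
    );
    let partial_row = deserializer.deserialize(&row_bytes[..]).unwrap();

    // Restore the requested output order (2, 0) via a ColumnMapping over the
    // positions of each output column within the deduplicated set.
    let mapping = ColumnMapping::new(vec![1, 0]);
    let output = mapping.project(OwnedRow::new(partial_row)).into_owned_row();
    assert_eq!(
        output,
        OwnedRow::new(vec![Some(Utf8("ABC".into())), Some(Int16(5))])
    );
}
```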
80 changes: 69 additions & 11 deletions src/storage/src/table/batch_table/storage_table.rs
@@ -32,7 +32,9 @@ use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_common::util::value_encoding::{
column_aware_row_encoding, BasicSerde, EitherSerde, ValueRowDeserializer,
};
use risingwave_hummock_sdk::key::{
end_bound_of_prefix, next_key, prefixed_range_with_vnode, TableKeyRange,
};
@@ -83,6 +85,13 @@ pub struct StorageTableInner<S: StateStore, SD: ValueRowSerde> {

/// Row deserializer to deserialize the whole value in storage to a row.
row_serde: Arc<SD>,
    /// The `Deserializer` decodes only the (deduplicated) columns referenced by `value_output_indices`,
    /// since `ColumnAwareSerde` does not allow duplicate columns.
    /// The `ColumnMapping` then restores the columns to the order specified by `value_output_indices`.
partial_row_deserializer: Option<(
Arc<column_aware_row_encoding::Deserializer>,
Arc<ColumnMapping>,
)>,

/// Indices of primary key.
/// Note that the index is based on the all columns of the table, instead of the output ones.
@@ -264,6 +273,32 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
.collect();
let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);

let partial_row_deserializer = if versioned {
let value_output_indices_dedup = value_output_indices
.iter()
.sorted()
.dedup()
.copied()
.collect::<Vec<_>>();
let output_row_in_value_output_indices_dedup = value_output_indices
.iter()
.map(|&di| {
value_output_indices_dedup
.iter()
.position(|&pi| di == pi)
.unwrap()
})
.collect_vec();
let partial_row_serde = ColumnAwareSerde::new(
value_output_indices_dedup.into(),
table_columns.clone().into(),
);
let de = Arc::new(partial_row_serde.deserializer);
let mapping = Arc::new(ColumnMapping::new(output_row_in_value_output_indices_dedup));
Some((de, mapping))
} else {
None
};
let row_serde = {
if versioned {
ColumnAwareSerde::new(value_indices.into(), table_columns.into()).into()
@@ -291,6 +326,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
distribution,
table_option,
read_prefix_len_hint,
partial_row_deserializer,
}
}
}
@@ -366,11 +402,17 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
if let Some(value) = self.store.get(serialized_pk, epoch, read_options).await? {
// Refer to [`StorageTableInnerIterInner::new`] for necessity of `validate_read_epoch`.
self.store.validate_read_epoch(wait_epoch)?;
let full_row = self.row_serde.deserialize(&value)?;
let result_row_in_value = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();

let result_row_in_value =
if let Some((ref de, ref mapping)) = self.partial_row_deserializer {
let partial_row = de.deserialize(&value)?;
mapping.project(OwnedRow::new(partial_row)).into_owned_row()
} else {
let full_row = self.row_serde.deserialize(&value)?;
self.mapping
.project(OwnedRow::new(full_row))
.into_owned_row()
};
match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key =
@@ -491,6 +533,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
table_key_range,
read_options,
wait_epoch,
self.partial_row_deserializer.clone(),
)
.await?
.into_stream();
@@ -714,6 +757,11 @@ struct StorageTableInnerIterInner<S: StateStore, SD: ValueRowSerde> {
mapping: Arc<ColumnMapping>,

row_deserializer: Arc<SD>,
/// Refer to [`StorageTableInner::partial_row_deserializer`].
partial_row_deserializer: Option<(
Arc<column_aware_row_encoding::Deserializer>,
Arc<ColumnMapping>,
)>,

/// Used for serializing and deserializing the primary key.
pk_serializer: Option<Arc<OrderedRowSerde>>,
@@ -745,6 +793,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
table_key_range: TableKeyRange,
read_options: ReadOptions,
epoch: HummockReadEpoch,
partial_row_deserializer: Option<(
Arc<column_aware_row_encoding::Deserializer>,
Arc<ColumnMapping>,
)>,
) -> StorageResult<Self> {
let raw_epoch = epoch.get_epoch();
store.try_wait_epoch(epoch).await?;
@@ -758,6 +810,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
iter,
mapping,
row_deserializer,
partial_row_deserializer,
pk_serializer,
output_indices,
key_output_indices,
@@ -777,11 +830,16 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
.await?
{
let (table_key, value) = (k.user_key.table_key, v);
let full_row = self.row_deserializer.deserialize(value)?;
let result_row_in_value = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();
let result_row_in_value =
if let Some((ref de, ref mapping)) = self.partial_row_deserializer {
let partial_row = de.deserialize(value)?;
mapping.project(OwnedRow::new(partial_row)).into_owned_row()
} else {
let full_row = self.row_deserializer.deserialize(value)?;
self.mapping
.project(OwnedRow::new(full_row))
.into_owned_row()
};
match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key = match self.pk_serializer.clone() {