Skip to content

Commit

Permalink
refactor(storage): deserialize output columns only
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Jun 14, 2024
1 parent 5c24685 commit 28472ba
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 13 deletions.
73 changes: 72 additions & 1 deletion src/storage/src/row_serde/value_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ impl ValueRowSerdeNew for ColumnAwareSerde {
}
}

let column_with_default = table_columns.iter().enumerate().filter_map(|(i, c)| {
let partial_columns = value_indices.iter().map(|idx| &table_columns[*idx]);
let column_with_default = partial_columns.enumerate().filter_map(|(i, c)| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
snapshot_value,
expr,
Expand Down Expand Up @@ -140,6 +141,21 @@ impl ValueRowSerde for ColumnAwareSerde {
}
}

/// A thin, cloneable wrapper around a column-aware [`Deserializer`] that
/// decodes an encoded row directly into the requested output columns,
/// avoiding a full-row decode followed by projection.
#[derive(Clone)]
pub struct PartialRowDeserializer(Deserializer);

impl PartialRowDeserializer {
    /// Wraps an already-configured column-aware `Deserializer`.
    pub fn new(inner: Deserializer) -> Self {
        PartialRowDeserializer(inner)
    }
}

impl ValueRowDeserializer for PartialRowDeserializer {
    /// Delegates to the wrapped deserializer; the returned datums cover only
    /// the columns the inner deserializer was constructed with.
    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
        let Self(inner) = self;
        inner.deserialize(encoded_bytes)
    }
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand Down Expand Up @@ -305,4 +321,59 @@ mod tests {
// drop all columns is now allowed
assert!(try_drop_invalid_columns(&row_bytes, &HashSet::new()).is_none());
}

#[test]
fn test_deserialize_partial_columns() {
    // Encode a full three-column row, then decode only columns 2 and 0
    // (in that order) and check the projected result.
    let all_ids = [ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
    let full_row = OwnedRow::new(vec![
        Some(Int16(5)),
        Some(Utf8("abc".into())),
        Some(Utf8("ABC".into())),
    ]);
    let row_bytes = column_aware_row_encoding::Serializer::new(&all_ids).serialize(full_row);

    let wanted_ids = [ColumnId::new(2), ColumnId::new(0)];
    let wanted_types = vec![DataType::Varchar, DataType::Int16];
    let deserializer = column_aware_row_encoding::Deserializer::new(
        &wanted_ids,
        Arc::from(wanted_types.into_boxed_slice()),
        std::iter::empty(),
    );

    let decoded = deserializer
        .deserialize(&row_bytes[..])
        .expect("deserializing a subset of encoded columns should succeed");
    assert_eq!(decoded, vec![Some(Utf8("ABC".into())), Some(Int16(5))]);
}

#[test]
fn test_deserialize_partial_columns_with_default_columns() {
    // Encode three columns; decode columns 2, 3, and 0, where column 3 is
    // absent from the encoded row and must be filled from a default value.
    let all_ids = [ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)];
    let full_row = OwnedRow::new(vec![
        Some(Int16(5)),
        Some(Utf8("abc".into())),
        Some(Utf8("ABC".into())),
    ]);
    let row_bytes = column_aware_row_encoding::Serializer::new(&all_ids).serialize(full_row);

    // `ColumnId::new(3)` is not stored in the row: output position 1 (its
    // place in the requested column list below) gets this default instead.
    let default_columns = vec![(1, Some(Utf8("new column".into())))];

    let requested_ids = [ColumnId::new(2), ColumnId::new(3), ColumnId::new(0)];
    let requested_types = vec![DataType::Varchar, DataType::Varchar, DataType::Int16];
    let deserializer = column_aware_row_encoding::Deserializer::new(
        &requested_ids,
        Arc::from(requested_types.into_boxed_slice()),
        default_columns.into_iter(),
    );

    let decoded = deserializer
        .deserialize(&row_bytes[..])
        .expect("deserializing with a default column should succeed");
    assert_eq!(
        decoded,
        vec![
            Some(Utf8("ABC".into())),
            Some(Utf8("new column".into())),
            Some(Int16(5))
        ]
    );
}
}
50 changes: 38 additions & 12 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_hummock_sdk::key::{
end_bound_of_prefix, next_key, prefixed_range_with_vnode, TableKeyRange,
};
Expand All @@ -43,7 +43,7 @@ use tracing::trace;
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::value_serde::{PartialRowDeserializer, ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{find_columns_by_ids, ColumnMapping};
use crate::store::{ChangeLogValue, PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter};
use crate::table::merge_sort::merge_sort;
Expand Down Expand Up @@ -83,6 +83,7 @@ pub struct StorageTableInner<S: StateStore, SD: ValueRowSerde> {

/// Row deserializer to deserialize the whole value in storage to a row.
row_serde: Arc<SD>,
partial_row_deserializer: Option<Arc<PartialRowDeserializer>>,

/// Indices of primary key.
/// Note that the index is based on the all columns of the table, instead of the output ones.
Expand Down Expand Up @@ -264,6 +265,17 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
.collect();
let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);

let partial_row_deserializer = if versioned {
let partial_row_serde = ColumnAwareSerde::new(
value_output_indices.clone().into(),
table_columns.clone().into(),
);
Some(Arc::new(PartialRowDeserializer::new(
partial_row_serde.deserializer.clone(),
)))
} else {
None
};
let row_serde = {
if versioned {
ColumnAwareSerde::new(value_indices.into(), table_columns.into()).into()
Expand Down Expand Up @@ -291,6 +303,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
distribution,
table_option,
read_prefix_len_hint,
partial_row_deserializer,
}
}
}
Expand Down Expand Up @@ -366,11 +379,16 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
if let Some(value) = self.store.get(serialized_pk, epoch, read_options).await? {
// Refer to [`StorageTableInnerIterInner::new`] for necessity of `validate_read_epoch`.
self.store.validate_read_epoch(wait_epoch)?;
let full_row = self.row_serde.deserialize(&value)?;
let result_row_in_value = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();

let result_row_in_value =
if let Some(ref partial_row_deserializer) = self.partial_row_deserializer {
OwnedRow::new(partial_row_deserializer.deserialize(&value)?)
} else {
let full_row = self.row_serde.deserialize(&value)?;
self.mapping
.project(OwnedRow::new(full_row))
.into_owned_row()
};
match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key =
Expand Down Expand Up @@ -491,6 +509,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
table_key_range,
read_options,
wait_epoch,
self.partial_row_deserializer.clone(),
)
.await?
.into_stream();
Expand Down Expand Up @@ -714,6 +733,7 @@ struct StorageTableInnerIterInner<S: StateStore, SD: ValueRowSerde> {
mapping: Arc<ColumnMapping>,

row_deserializer: Arc<SD>,
partial_row_deserializer: Option<Arc<PartialRowDeserializer>>,

/// Used for serializing and deserializing the primary key.
pk_serializer: Option<Arc<OrderedRowSerde>>,
Expand Down Expand Up @@ -745,6 +765,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
table_key_range: TableKeyRange,
read_options: ReadOptions,
epoch: HummockReadEpoch,
partial_row_deserializer: Option<Arc<PartialRowDeserializer>>,
) -> StorageResult<Self> {
let raw_epoch = epoch.get_epoch();
store.try_wait_epoch(epoch).await?;
Expand All @@ -758,6 +779,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
iter,
mapping,
row_deserializer,
partial_row_deserializer,
pk_serializer,
output_indices,
key_output_indices,
Expand All @@ -777,11 +799,15 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
.await?
{
let (table_key, value) = (k.user_key.table_key, v);
let full_row = self.row_deserializer.deserialize(value)?;
let result_row_in_value = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();
let result_row_in_value =
if let Some(ref partial_row_deserializer) = self.partial_row_deserializer {
OwnedRow::new(partial_row_deserializer.deserialize(value)?)
} else {
let full_row = self.row_deserializer.deserialize(value)?;
self.mapping
.project(OwnedRow::new(full_row))
.into_owned_row()
};
match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key = match self.pk_serializer.clone() {
Expand Down

0 comments on commit 28472ba

Please sign in to comment.