diff --git a/src/storage/src/row_serde/value_serde.rs b/src/storage/src/row_serde/value_serde.rs index c4d4ef8b808f..3a9156ca16d1 100644 --- a/src/storage/src/row_serde/value_serde.rs +++ b/src/storage/src/row_serde/value_serde.rs @@ -97,7 +97,8 @@ impl ValueRowSerdeNew for ColumnAwareSerde { } } - let column_with_default = table_columns.iter().enumerate().filter_map(|(i, c)| { + let partial_columns = value_indices.iter().map(|idx| &table_columns[*idx]); + let column_with_default = partial_columns.enumerate().filter_map(|(i, c)| { if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { snapshot_value, expr, @@ -305,4 +306,59 @@ mod tests { // drop all columns is now allowed assert!(try_drop_invalid_columns(&row_bytes, &HashSet::new()).is_none()); } + + #[test] + fn test_deserialize_partial_columns() { + let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)]; + let row1 = OwnedRow::new(vec![ + Some(Int16(5)), + Some(Utf8("abc".into())), + Some(Utf8("ABC".into())), + ]); + let serializer = column_aware_row_encoding::Serializer::new(&column_ids); + let row_bytes = serializer.serialize(row1); + + let deserializer = column_aware_row_encoding::Deserializer::new( + &[ColumnId::new(2), ColumnId::new(0)], + Arc::from(vec![DataType::Varchar, DataType::Int16].into_boxed_slice()), + std::iter::empty(), + ); + let decoded = deserializer.deserialize(&row_bytes[..]); + assert_eq!( + decoded.unwrap(), + vec![Some(Utf8("ABC".into())), Some(Int16(5))] + ); + } + + #[test] + fn test_deserialize_partial_columns_with_default_columns() { + let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)]; + let row1 = OwnedRow::new(vec![ + Some(Int16(5)), + Some(Utf8("abc".into())), + Some(Utf8("ABC".into())), + ]); + let serializer = column_aware_row_encoding::Serializer::new(&column_ids); + let row_bytes = serializer.serialize(row1); + + // default column of ColumnId::new(3) + let default_columns = vec![(1, Some(Utf8("new column".into())))]; + + let deserializer = column_aware_row_encoding::Deserializer::new( + &[ColumnId::new(2), ColumnId::new(3), ColumnId::new(0)], + Arc::from( + vec![DataType::Varchar, DataType::Varchar, DataType::Int16].into_boxed_slice(), + ), + default_columns.into_iter(), + ); + let decoded = deserializer.deserialize(&row_bytes[..]); + assert_eq!( + decoded.unwrap(), + vec![ + Some(Utf8("ABC".into())), + Some(Utf8("new column".into())), + Some(Int16(5)) + ] + ); + } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 795668dfe8b1..9ba7eaad9dc8 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::ops::Bound::{self, Excluded, Included, Unbounded}; -use std::ops::{Index, RangeBounds}; +use std::ops::RangeBounds; use std::sync::Arc; use auto_enums::auto_enum; @@ -29,6 +29,7 @@ use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption}; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{self, OwnedRow, Row, RowExt}; +use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::row_serde::*; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; @@ -81,7 +82,8 @@ pub struct StorageTableInner { /// Mapping from column id to column index for deserializing the row. mapping: Arc, - /// Row deserializer to deserialize the whole value in storage to a row. + /// Row deserializer to deserialize the value in storage to a row. + /// The row can be either complete or partial, depending on whether the row encoding is versioned. row_serde: Arc, /// Indices of primary key. @@ -246,29 +248,45 @@ impl StorageTableInner { } } - let output_row_in_value_indices = value_output_indices - .iter() - .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap()) - .collect_vec(); let output_row_in_key_indices = key_output_indices .iter() .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap()) .collect_vec(); let schema = Schema::new(output_columns.iter().map(Into::into).collect()); - let mapping = ColumnMapping::new(output_row_in_value_indices); - let pk_data_types = pk_indices .iter() .map(|i| table_columns[*i].data_type.clone()) .collect(); let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types); - - let row_serde = { + let (row_serde, mapping) = { if versioned { - ColumnAwareSerde::new(value_indices.into(), table_columns.into()).into() + let value_output_indices_dedup = value_output_indices + .iter() + .unique() + .copied() + .collect::>(); + let output_row_in_value_output_indices_dedup = value_output_indices + .iter() + .map(|&di| { + value_output_indices_dedup + .iter() + .position(|&pi| di == pi) + .unwrap() + }) + .collect_vec(); + let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup); + let serde = + ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into()); + (serde.into(), mapping) } else { - BasicSerde::new(value_indices.into(), table_columns.into()).into() + let output_row_in_value_indices = value_output_indices + .iter() + .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap()) + .collect_vec(); + let mapping = ColumnMapping::new(output_row_in_value_indices); + let serde = BasicSerde::new(value_indices.into(), table_columns.into()); + (serde.into(), mapping) } }; @@ -366,11 +384,10 @@ impl StorageTableInner { if let Some(value) = self.store.get(serialized_pk, epoch, read_options).await? { // Refer to [`StorageTableInnerIterInner::new`] for necessity of `validate_read_epoch`. self.store.validate_read_epoch(wait_epoch)?; - let full_row = self.row_serde.deserialize(&value)?; - let result_row_in_value = self - .mapping - .project(OwnedRow::new(full_row)) - .into_owned_row(); + + let row = self.row_serde.deserialize(&value)?; + let result_row_in_value = self.mapping.project(OwnedRow::new(row)); + match &self.key_output_indices { Some(key_output_indices) => { let result_row_in_key = @@ -385,20 +402,23 @@ impl StorageTableInner { .unwrap(); result_row_vec.push( result_row_in_value - .index(*item_position_in_value_indices) - .clone(), + .datum_at(*item_position_in_value_indices) + .to_owned_datum(), ); } else { let item_position_in_pk_indices = key_output_indices.iter().position(|p| idx == p).unwrap(); - result_row_vec - .push(result_row_in_key.index(item_position_in_pk_indices).clone()); + result_row_vec.push( + result_row_in_key + .datum_at(item_position_in_pk_indices) + .to_owned_datum(), + ); } } let result_row = OwnedRow::new(result_row_vec); Ok(Some(result_row)) } - None => Ok(Some(result_row_in_value)), + None => Ok(Some(result_row_in_value.into_owned_row())), } } else { Ok(None) @@ -777,11 +797,8 @@ impl StorageTableInnerIterInner { .await? { let (table_key, value) = (k.user_key.table_key, v); - let full_row = self.row_deserializer.deserialize(value)?; - let result_row_in_value = self - .mapping - .project(OwnedRow::new(full_row)) - .into_owned_row(); + let row = self.row_deserializer.deserialize(value)?; + let result_row_in_value = self.mapping.project(OwnedRow::new(row)); match &self.key_output_indices { Some(key_output_indices) => { let result_row_in_key = match self.pk_serializer.clone() { @@ -803,14 +820,17 @@ impl StorageTableInnerIterInner { .unwrap(); result_row_vec.push( result_row_in_value - .index(*item_position_in_value_indices) - .clone(), + .datum_at(*item_position_in_value_indices) + .to_owned_datum(), ); } else { let item_position_in_pk_indices = key_output_indices.iter().position(|p| idx == p).unwrap(); - result_row_vec - .push(result_row_in_key.index(item_position_in_pk_indices).clone()); + result_row_vec.push( + result_row_in_key + .datum_at(item_position_in_pk_indices) + .to_owned_datum(), + ); } } let row = OwnedRow::new(result_row_vec); @@ -824,7 +844,7 @@ impl StorageTableInnerIterInner { None => { yield KeyedRow { vnode_prefixed_key: table_key.copy_into(), - row: result_row_in_value, + row: result_row_in_value.into_owned_row(), } } }