diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 69202b8cdb757..fe4d6df708867 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -32,9 +32,7 @@ use risingwave_common::row::{self, OwnedRow, Row, RowExt}; use risingwave_common::util::row_serde::*; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; -use risingwave_common::util::value_encoding::{ - column_aware_row_encoding, BasicSerde, EitherSerde, ValueRowDeserializer, -}; +use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde}; use risingwave_hummock_sdk::key::{ end_bound_of_prefix, next_key, prefixed_range_with_vnode, TableKeyRange, }; @@ -83,15 +81,9 @@ pub struct StorageTableInner { /// Mapping from column id to column index for deserializing the row. mapping: Arc, - /// Row deserializer to deserialize the whole value in storage to a row. + /// Row deserializer to deserialize the value in storage to a row. + /// The row can be either complete or partial, depending on whether the row encoding is versioned. row_serde: Arc, - /// The `Deserializer` deserializes the columns specified in `value_output_indices`. - /// The `Deserializer` ensures there are no duplicate columns because `ColumnAwareSerde` disallows them. - /// The `ColumnMapping` restores all columns specified in `value_output_indices`. - partial_row_deserializer: Option<( - Arc, - Arc, - )>, /// Indices of primary key. /// Note that the index is based on the all columns of the table, instead of the output ones. @@ -255,55 +247,46 @@ impl StorageTableInner { } } - let output_row_in_value_indices = value_output_indices - .iter() - .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap()) - .collect_vec(); let output_row_in_key_indices = key_output_indices .iter() .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap()) .collect_vec(); let schema = Schema::new(output_columns.iter().map(Into::into).collect()); - let mapping = ColumnMapping::new(output_row_in_value_indices); - let pk_data_types = pk_indices .iter() .map(|i| table_columns[*i].data_type.clone()) .collect(); let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types); - - let partial_row_deserializer = if versioned { - let value_output_indices_dedup = value_output_indices - .iter() - .sorted() - .dedup() - .copied() - .collect::>(); - let output_row_in_value_output_indices_dedup = value_output_indices - .iter() - .map(|&di| { - value_output_indices_dedup - .iter() - .position(|&pi| di == pi) - .unwrap() - }) - .collect_vec(); - let partial_row_serde = ColumnAwareSerde::new( - value_output_indices_dedup.into(), - table_columns.clone().into(), - ); - let de = Arc::new(partial_row_serde.deserializer); - let mapping = Arc::new(ColumnMapping::new(output_row_in_value_output_indices_dedup)); - Some((de, mapping)) - } else { - None - }; - let row_serde = { + let (row_serde, mapping) = { if versioned { - ColumnAwareSerde::new(value_indices.into(), table_columns.into()).into() + let value_output_indices_dedup = value_output_indices + .iter() + .sorted() + .dedup() + .copied() + .collect::>(); + let output_row_in_value_output_indices_dedup = value_output_indices + .iter() + .map(|&di| { + value_output_indices_dedup + .iter() + .position(|&pi| di == pi) + .unwrap() + }) + .collect_vec(); + let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup); + let serde = + ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into()); + (serde.into(), mapping) } else { - BasicSerde::new(value_indices.into(), table_columns.into()).into() + let output_row_in_value_indices = value_output_indices + .iter() + .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap()) + .collect_vec(); + let mapping = ColumnMapping::new(output_row_in_value_indices); + let serde = BasicSerde::new(value_indices.into(), table_columns.into()); + (serde.into(), mapping) } }; @@ -326,7 +309,6 @@ impl StorageTableInner { distribution, table_option, read_prefix_len_hint, - partial_row_deserializer, } } } @@ -403,16 +385,9 @@ impl StorageTableInner { // Refer to [`StorageTableInnerIterInner::new`] for necessity of `validate_read_epoch`. self.store.validate_read_epoch(wait_epoch)?; - let result_row_in_value = - if let Some((ref de, ref mapping)) = self.partial_row_deserializer { - let partial_row = de.deserialize(&value)?; - mapping.project(OwnedRow::new(partial_row)).into_owned_row() - } else { - let full_row = self.row_serde.deserialize(&value)?; - self.mapping - .project(OwnedRow::new(full_row)) - .into_owned_row() - }; + let row = self.row_serde.deserialize(&value)?; + let result_row_in_value = self.mapping.project(OwnedRow::new(row)).into_owned_row(); + match &self.key_output_indices { Some(key_output_indices) => { let result_row_in_key = @@ -533,7 +508,6 @@ impl StorageTableInner { table_key_range, read_options, wait_epoch, - self.partial_row_deserializer.clone(), ) .await? .into_stream(); @@ -757,11 +731,6 @@ struct StorageTableInnerIterInner { mapping: Arc, row_deserializer: Arc, - /// Refer to [`StorageTableInner::partial_row_deserializer`]. - partial_row_deserializer: Option<( - Arc, - Arc, - )>, /// Used for serializing and deserializing the primary key. pk_serializer: Option>, @@ -793,10 +762,6 @@ impl StorageTableInnerIterInner { table_key_range: TableKeyRange, read_options: ReadOptions, epoch: HummockReadEpoch, - partial_row_deserializer: Option<( - Arc, - Arc, - )>, ) -> StorageResult { let raw_epoch = epoch.get_epoch(); store.try_wait_epoch(epoch).await?; @@ -810,7 +775,6 @@ impl StorageTableInnerIterInner { iter, mapping, row_deserializer, - partial_row_deserializer, pk_serializer, output_indices, key_output_indices, @@ -830,16 +794,8 @@ impl StorageTableInnerIterInner { .await? { let (table_key, value) = (k.user_key.table_key, v); - let result_row_in_value = - if let Some((ref de, ref mapping)) = self.partial_row_deserializer { - let partial_row = de.deserialize(value)?; - mapping.project(OwnedRow::new(partial_row)).into_owned_row() - } else { - let full_row = self.row_deserializer.deserialize(value)?; - self.mapping - .project(OwnedRow::new(full_row)) - .into_owned_row() - }; + let row = self.row_deserializer.deserialize(value)?; + let result_row_in_value = self.mapping.project(OwnedRow::new(row)).into_owned_row(); match &self.key_output_indices { Some(key_output_indices) => { let result_row_in_key = match self.pk_serializer.clone() {