Skip to content

Commit

Permalink
refactor: remove dedicated partial_row_de
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Jun 17, 2024
1 parent 7b816b1 commit 26fb5cc
Showing 1 changed file with 35 additions and 79 deletions.
114 changes: 35 additions & 79 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{
column_aware_row_encoding, BasicSerde, EitherSerde, ValueRowDeserializer,
};
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_hummock_sdk::key::{
end_bound_of_prefix, next_key, prefixed_range_with_vnode, TableKeyRange,
};
Expand Down Expand Up @@ -83,15 +81,9 @@ pub struct StorageTableInner<S: StateStore, SD: ValueRowSerde> {
/// Mapping from column id to column index for deserializing the row.
mapping: Arc<ColumnMapping>,

/// Row deserializer to deserialize the whole value in storage to a row.
/// Row deserializer to deserialize the value in storage to a row.
/// The row can be either complete or partial, depending on whether the row encoding is versioned.
row_serde: Arc<SD>,
/// The `Deserializer` deserializes the columns specified in `value_output_indices`.
/// The `Deserializer` ensures there are no duplicate columns because `ColumnAwareSerde` disallows them.
/// The `ColumnMapping` restores all columns specified in `value_output_indices`.
partial_row_deserializer: Option<(
Arc<column_aware_row_encoding::Deserializer>,
Arc<ColumnMapping>,
)>,

/// Indices of primary key.
/// Note that the index is based on the all columns of the table, instead of the output ones.
Expand Down Expand Up @@ -255,55 +247,46 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
}
}

let output_row_in_value_indices = value_output_indices
.iter()
.map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let output_row_in_key_indices = key_output_indices
.iter()
.map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let schema = Schema::new(output_columns.iter().map(Into::into).collect());

let mapping = ColumnMapping::new(output_row_in_value_indices);

let pk_data_types = pk_indices
.iter()
.map(|i| table_columns[*i].data_type.clone())
.collect();
let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);

let partial_row_deserializer = if versioned {
let value_output_indices_dedup = value_output_indices
.iter()
.sorted()
.dedup()
.copied()
.collect::<Vec<_>>();
let output_row_in_value_output_indices_dedup = value_output_indices
.iter()
.map(|&di| {
value_output_indices_dedup
.iter()
.position(|&pi| di == pi)
.unwrap()
})
.collect_vec();
let partial_row_serde = ColumnAwareSerde::new(
value_output_indices_dedup.into(),
table_columns.clone().into(),
);
let de = Arc::new(partial_row_serde.deserializer);
let mapping = Arc::new(ColumnMapping::new(output_row_in_value_output_indices_dedup));
Some((de, mapping))
} else {
None
};
let row_serde = {
let (row_serde, mapping) = {
if versioned {
ColumnAwareSerde::new(value_indices.into(), table_columns.into()).into()
let value_output_indices_dedup = value_output_indices
.iter()
.sorted()
.dedup()
.copied()
.collect::<Vec<_>>();
let output_row_in_value_output_indices_dedup = value_output_indices
.iter()
.map(|&di| {
value_output_indices_dedup
.iter()
.position(|&pi| di == pi)
.unwrap()
})
.collect_vec();
let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
let serde =
ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
(serde.into(), mapping)
} else {
BasicSerde::new(value_indices.into(), table_columns.into()).into()
let output_row_in_value_indices = value_output_indices
.iter()
.map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let mapping = ColumnMapping::new(output_row_in_value_indices);
let serde = BasicSerde::new(value_indices.into(), table_columns.into());
(serde.into(), mapping)
}
};

Expand All @@ -326,7 +309,6 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
distribution,
table_option,
read_prefix_len_hint,
partial_row_deserializer,
}
}
}
Expand Down Expand Up @@ -403,16 +385,9 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
// Refer to [`StorageTableInnerIterInner::new`] for necessity of `validate_read_epoch`.
self.store.validate_read_epoch(wait_epoch)?;

let result_row_in_value =
if let Some((ref de, ref mapping)) = self.partial_row_deserializer {
let partial_row = de.deserialize(&value)?;
mapping.project(OwnedRow::new(partial_row)).into_owned_row()
} else {
let full_row = self.row_serde.deserialize(&value)?;
self.mapping
.project(OwnedRow::new(full_row))
.into_owned_row()
};
let row = self.row_serde.deserialize(&value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row)).into_owned_row();

match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key =
Expand Down Expand Up @@ -533,7 +508,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
table_key_range,
read_options,
wait_epoch,
self.partial_row_deserializer.clone(),
)
.await?
.into_stream();
Expand Down Expand Up @@ -757,11 +731,6 @@ struct StorageTableInnerIterInner<S: StateStore, SD: ValueRowSerde> {
mapping: Arc<ColumnMapping>,

row_deserializer: Arc<SD>,
/// Refer to [`StorageTableInner::partial_row_deserializer`].
partial_row_deserializer: Option<(
Arc<column_aware_row_encoding::Deserializer>,
Arc<ColumnMapping>,
)>,

/// Used for serializing and deserializing the primary key.
pk_serializer: Option<Arc<OrderedRowSerde>>,
Expand Down Expand Up @@ -793,10 +762,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
table_key_range: TableKeyRange,
read_options: ReadOptions,
epoch: HummockReadEpoch,
partial_row_deserializer: Option<(
Arc<column_aware_row_encoding::Deserializer>,
Arc<ColumnMapping>,
)>,
) -> StorageResult<Self> {
let raw_epoch = epoch.get_epoch();
store.try_wait_epoch(epoch).await?;
Expand All @@ -810,7 +775,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
iter,
mapping,
row_deserializer,
partial_row_deserializer,
pk_serializer,
output_indices,
key_output_indices,
Expand All @@ -830,16 +794,8 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
.await?
{
let (table_key, value) = (k.user_key.table_key, v);
let result_row_in_value =
if let Some((ref de, ref mapping)) = self.partial_row_deserializer {
let partial_row = de.deserialize(value)?;
mapping.project(OwnedRow::new(partial_row)).into_owned_row()
} else {
let full_row = self.row_deserializer.deserialize(value)?;
self.mapping
.project(OwnedRow::new(full_row))
.into_owned_row()
};
let row = self.row_deserializer.deserialize(value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row)).into_owned_row();
match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key = match self.pk_serializer.clone() {
Expand Down

0 comments on commit 26fb5cc

Please sign in to comment.