Skip to content

Commit

Permalink
refactor: avoid unnecessary into_owned_row
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Jun 18, 2024
1 parent 26fb5cc commit 8f3d37d
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::{Index, RangeBounds};
use std::ops::RangeBounds;
use std::sync::Arc;

use auto_enums::auto_enum;
Expand All @@ -29,6 +29,7 @@ use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
Expand Down Expand Up @@ -262,8 +263,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
if versioned {
let value_output_indices_dedup = value_output_indices
.iter()
.sorted()
.dedup()
.unique()
.copied()
.collect::<Vec<_>>();
let output_row_in_value_output_indices_dedup = value_output_indices
Expand Down Expand Up @@ -386,7 +386,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
self.store.validate_read_epoch(wait_epoch)?;

let row = self.row_serde.deserialize(&value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row)).into_owned_row();
let result_row_in_value = self.mapping.project(OwnedRow::new(row));

match &self.key_output_indices {
Some(key_output_indices) => {
Expand All @@ -402,20 +402,23 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
.unwrap();
result_row_vec.push(
result_row_in_value
.index(*item_position_in_value_indices)
.clone(),
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
} else {
let item_position_in_pk_indices =
key_output_indices.iter().position(|p| idx == p).unwrap();
result_row_vec
.push(result_row_in_key.index(item_position_in_pk_indices).clone());
result_row_vec.push(
result_row_in_key
.datum_at(item_position_in_pk_indices)
.to_owned_datum(),
);
}
}
let result_row = OwnedRow::new(result_row_vec);
Ok(Some(result_row))
}
None => Ok(Some(result_row_in_value)),
None => Ok(Some(result_row_in_value.into_owned_row())),
}
} else {
Ok(None)
Expand Down Expand Up @@ -795,7 +798,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
{
let (table_key, value) = (k.user_key.table_key, v);
let row = self.row_deserializer.deserialize(value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row)).into_owned_row();
let result_row_in_value = self.mapping.project(OwnedRow::new(row));
match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key = match self.pk_serializer.clone() {
Expand All @@ -817,14 +820,17 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
.unwrap();
result_row_vec.push(
result_row_in_value
.index(*item_position_in_value_indices)
.clone(),
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
} else {
let item_position_in_pk_indices =
key_output_indices.iter().position(|p| idx == p).unwrap();
result_row_vec
.push(result_row_in_key.index(item_position_in_pk_indices).clone());
result_row_vec.push(
result_row_in_key
.datum_at(item_position_in_pk_indices)
.to_owned_datum(),
);
}
}
let row = OwnedRow::new(result_row_vec);
Expand All @@ -838,7 +844,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
None => {
yield KeyedRow {
vnode_prefixed_key: table_key.copy_into(),
row: result_row_in_value,
row: result_row_in_value.into_owned_row(),
}
}
}
Expand Down

0 comments on commit 8f3d37d

Please sign in to comment.