From 080b2ec5e77e6f8376ebd5d9d0bad77aa734a320 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Sun, 28 Apr 2024 14:23:17 +0800 Subject: [PATCH] fix comm --- Cargo.lock | 1 + proto/batch_plan.proto | 1 + src/batch/Cargo.toml | 1 + src/batch/src/executor/log_row_seq_scan.rs | 37 ++++-- src/common/src/catalog/column.rs | 2 +- .../src/binder/relation/table_or_source.rs | 11 +- src/frontend/src/handler/declare_cursor.rs | 28 +--- .../optimizer/plan_node/batch_log_seq_scan.rs | 30 +---- .../optimizer/plan_node/generic/log_scan.rs | 120 ++++-------------- src/frontend/src/scheduler/local.rs | 12 ++ src/sqlparser/src/ast/query.rs | 4 +- .../src/table/batch_table/storage_table.rs | 100 +++++++++------ src/storage/src/table/mod.rs | 4 + 13 files changed, 144 insertions(+), 207 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 114d8404c56b4..09b7d4ccd9b9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9907,6 +9907,7 @@ dependencies = [ "assert_matches", "async-recursion", "async-trait", + "bytes", "criterion", "either", "foyer", diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 4adff202775de..d5822416ee7f6 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -73,6 +73,7 @@ message FilterNode { message LogRowSeqScanNode{ plan_common.StorageTableDesc table_desc = 1; + // This records the mandatory column_ids of the original table, excluding op repeated int32 column_ids = 2; // The partition to read for scan tasks. // diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 019c33253466b..e0c284a46fde7 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -20,6 +20,7 @@ arrow-schema = { workspace = true } assert_matches = "1" async-recursion = "1" async-trait = "0.1" +bytes = { version = "1", features = ["serde"] } either = "1" foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index 25c06fdccbb24..a214e127c2f72 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -15,19 +15,22 @@ use std::ops::{Bound, Deref}; use std::sync::Arc; +use bytes::Bytes; use futures::prelude::stream::StreamExt; use futures_async_stream::try_stream; use futures_util::pin_mut; +use itertools::Itertools; use prometheus::Histogram; use risingwave_common::array::DataChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnId, Field, Schema}; use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::types::ScalarImpl; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::plan_common::StorageTableDesc; use risingwave_storage::table::batch_table::storage_table::StorageTable; -use risingwave_storage::table::{collect_data_chunk, TableDistribution}; +use risingwave_storage::table::{collect_data_chunk, KeyedRow, TableDistribution}; use risingwave_storage::{dispatch_state_store, StateStore}; use super::{ @@ -40,6 +43,8 @@ use crate::task::BatchTaskContext; pub struct LogRowSeqScanExecutor { chunk_size: usize, identity: String, + // It is table schema + op column + schema: Schema, /// Batch metrics. /// None: Local mode don't record mertics. @@ -61,9 +66,15 @@ impl LogRowSeqScanExecutor { identity: String, metrics: Option, ) -> Self { + let mut schema = table.schema().clone(); + schema.fields.push(Field::with_name( + risingwave_common::types::DataType::Int16, + "op", + )); Self { chunk_size, identity, + schema, metrics, table, scan_ranges, @@ -125,7 +136,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder { } impl Executor for LogRowSeqScanExecutor { fn schema(&self) -> &Schema { - self.table.schema() + &self.schema } fn identity(&self) -> &str { @@ -148,15 +159,9 @@ impl LogRowSeqScanExecutor { scan_ranges, old_epoch, new_epoch, + schema, } = *self; let table = std::sync::Arc::new(table); - let mut schema = table.schema().clone(); - // Add op column - schema.fields.push(Field::with_name( - risingwave_common::types::DataType::Int16, - "op", - )); - let schema = Arc::new(schema); // Create collector. let histogram = metrics.as_ref().map(|metrics| { @@ -176,7 +181,7 @@ impl LogRowSeqScanExecutor { new_epoch.clone(), chunk_size, histogram.clone(), - schema.clone(), + Arc::new(schema.clone()), ); #[for_await] for chunk in stream { @@ -243,6 +248,18 @@ impl LogRowSeqScanExecutor { loop { let timer = histogram.as_ref().map(|histogram| histogram.start_timer()); + let mut iter = iter.as_mut().map(|r| match r { + Ok((op, value)) => { + let (k, row) = value.into_owned_row_key(); + let full_row = row + .into_iter() + .chain(vec![Some(ScalarImpl::Int16(op))]) + .collect_vec(); + let row = OwnedRow::new(full_row); + Ok(KeyedRow::::new(k, row)) + } + Err(e) => Err(e), + }); let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size)) .await .map_err(BatchError::from)?; diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 05871a3ff4457..82d2f22f41cb4 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -27,7 +27,7 @@ use crate::types::DataType; /// Column ID is the unique identifier of a column in a table. Different from table ID, column ID is /// not globally unique. -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ColumnId(i32); impl std::fmt::Debug for ColumnId { diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 0ce7d8a1ca8d1..c5283a2cc592a 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use either::Either; use itertools::Itertools; use risingwave_common::bail_not_implemented; -use risingwave_common::catalog::{is_system_schema, ColumnCatalog, Field}; +use risingwave_common::catalog::{is_system_schema, Field}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_connector::WithPropertiesExt; use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias}; @@ -42,15 +42,6 @@ pub struct BoundBaseTable { pub as_of: Option, } -#[derive(Debug, Clone)] -pub struct BoundLogTable { - pub table_id: TableId, - pub table_catalog: Arc, - pub new_epoch: u64, - pub old_epoch: u64, - pub op_column: ColumnCatalog, -} - #[derive(Debug, Clone)] pub struct BoundSystemTable { pub table_id: TableId, diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index 7b719ddae410a..db4616770bc93 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -17,7 +17,6 @@ use std::rc::Rc; use fixedbitset::FixedBitSet; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::catalog::ColumnDesc; use risingwave_common::session_config::QueryMode; use risingwave_common::util::epoch::Epoch; use risingwave_sqlparser::ast::{DeclareCursorStatement, ObjectName, Query, Since, Statement}; @@ -148,42 +147,23 @@ pub fn create_batch_plan_for_cursor( new_epoch: u64, ) -> Result { let context = OptimizerContext::from_handler_args(handle_args.clone()); - let mut out_col_idx = table_catalog + let out_col_idx = table_catalog .columns .iter() .enumerate() .filter(|(_, v)| !v.is_hidden) .map(|(i, _)| i) .collect::>(); - out_col_idx.push(out_col_idx.len() + 1); - let next_column_id = table_catalog - .columns - .iter() - .max_by(|a, b| { - a.column_desc - .column_id - .get_id() - .cmp(&b.column_desc.column_id.get_id()) - }) - .map(|c| c.column_desc.column_id) - .unwrap_or_default() - .next(); - let op_column = ColumnDesc::named( - "op", - next_column_id, - risingwave_common::types::DataType::Int16, - ); let core = generic::LogScan::new( table_catalog.name.clone(), out_col_idx, Rc::new(table_catalog.table_desc()), - Rc::new(op_column), Rc::new(context), old_epoch, new_epoch, ); let batch_log_seq_scan = BatchLogSeqScan::new(core); - let out_fields = FixedBitSet::from_iter(0..batch_log_seq_scan.schema().len()); + let out_fields = FixedBitSet::from_iter(0..batch_log_seq_scan.core().schema().len()); let out_names = batch_log_seq_scan.core().column_names(); // Here we just need a plan_root to call the method, only out_fields and out_names will be used let mut plan_root = PlanRoot::new( @@ -193,11 +173,11 @@ pub fn create_batch_plan_for_cursor( out_fields, out_names, ); - let schema = batch_log_seq_scan.schema().clone(); + let schema = batch_log_seq_scan.core().schema().clone(); let (batch_log_seq_scan, query_mode) = match handle_args.session.config().query_mode() { QueryMode::Auto => ( plan_root.gen_batch_distributed_plan(PlanRef::from(batch_log_seq_scan))?, - QueryMode::Distributed, + QueryMode::Local, ), QueryMode::Local => ( plan_root.gen_batch_local_plan(PlanRef::from(batch_log_seq_scan))?, diff --git a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs index d97ed6593daff..c81cb6dd07967 100644 --- a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs @@ -13,7 +13,6 @@ // limitations under the License. use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::Schema; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::LogRowSeqScanNode; use risingwave_pb::common::BatchQueryEpoch; @@ -23,7 +22,6 @@ use super::utils::{childless_record, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch, TryToBatchPb}; use crate::catalog::ColumnId; use crate::error::Result; -use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, DistributionDisplay, Order}; @@ -39,15 +37,11 @@ pub struct BatchLogSeqScan { impl BatchLogSeqScan { fn new_inner(core: generic::LogScan, dist: Distribution) -> Self { let order = Order::any(); - let base = PlanBase::new_batch_with_core(&core, dist, order); + let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order); Self { base, core } } - pub fn schema(&self) -> &Schema { - self.base.schema() - } - pub fn new(core: generic::LogScan) -> Self { // Use `Single` by default, will be updated later with `clone_with_dist`. Self::new_inner(core, Distribution::Single) @@ -84,7 +78,7 @@ impl_plan_tree_node_for_leaf! { BatchLogSeqScan } impl Distill for BatchLogSeqScan { fn distill<'a>(&self) -> XmlNode<'a> { let verbose = self.base.ctx().is_explain_verbose(); - let mut vec = Vec::with_capacity(4); + let mut vec = Vec::with_capacity(3); vec.push(("table", Pretty::from(self.core.table_name.clone()))); vec.push(("columns", self.core.columns_pretty(verbose))); @@ -112,7 +106,7 @@ impl TryToBatchPb for BatchLogSeqScan { table_desc: Some(self.core.table_desc.try_to_protobuf()?), column_ids: self .core - .output_column_ids_to_batch() + .output_column_ids() .iter() .map(ColumnId::get_id) .collect(), @@ -144,20 +138,6 @@ impl ToLocalBatch for BatchLogSeqScan { } } -impl ExprRewritable for BatchLogSeqScan { - fn has_rewritable_expr(&self) -> bool { - true - } - - fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let core = self.core.clone(); - core.rewrite_exprs(r); - Self::new(core).into() - } -} +impl ExprRewritable for BatchLogSeqScan {} -impl ExprVisitable for BatchLogSeqScan { - fn visit_exprs(&self, v: &mut dyn ExprVisitor) { - self.core.visit_exprs(v); - } -} +impl ExprVisitable for BatchLogSeqScan {} diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs index a53bb0d2021c2..d420c9a835978 100644 --- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs @@ -18,14 +18,15 @@ use std::rc::Rc; use educe::Educe; use pretty_xmlish::Pretty; use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc}; -use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_common::types::DataType; use risingwave_common::util::sort_util::ColumnOrder; -use super::GenericPlanNode; use crate::catalog::ColumnId; use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::optimizer::property::FunctionalDependencySet; + +const OP_NAME: &str = "op"; +const OP_TYPE: DataType = DataType::Int16; #[derive(Debug, Clone, Educe)] #[educe(PartialEq, Eq, Hash)] @@ -35,8 +36,6 @@ pub struct LogScan { pub output_col_idx: Vec, /// Descriptor of the table pub table_desc: Rc, - /// Catalog of the op column - pub op_column: Rc, /// Help `RowSeqLogScan` executor use a better chunk size pub chunk_size: Option, @@ -53,7 +52,7 @@ impl LogScan { pub fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {} - /// Get the ids of the output columns. + // Used for create batch exec, without op pub fn output_column_ids(&self) -> Vec { self.output_col_idx .iter() @@ -61,31 +60,28 @@ impl LogScan { .collect() } - pub fn output_column_ids_to_batch(&self) -> Vec { - self.output_col_idx - .iter() - .map(|i| self.get_table_columns()[*i].column_id) - .filter(|i| i != &self.op_column.column_id) - .collect() - } - pub fn primary_key(&self) -> &[ColumnOrder] { &self.table_desc.pk } pub(crate) fn column_names_with_table_prefix(&self) -> Vec { - self.output_col_idx + let mut out_column_names: Vec<_> = self + .output_col_idx .iter() .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name)) - .collect() + .collect(); + out_column_names.push(format!("{}.{}", self.table_name, OP_NAME)); + out_column_names } pub(crate) fn column_names(&self) -> Vec { - println!("output_col_idx: {:?}", self.output_col_idx); - self.output_col_idx + let mut out_column_names: Vec<_> = self + .output_col_idx .iter() .map(|&i| self.get_table_columns()[i].name.clone()) - .collect() + .collect(); + out_column_names.push(OP_NAME.to_string()); + out_column_names } pub fn distribution_key(&self) -> Option> { @@ -102,32 +98,11 @@ impl LogScan { .collect() } - /// get the Mapping of columnIndex from internal column index to output column index - pub fn i2o_col_mapping(&self) -> ColIndexMapping { - ColIndexMapping::with_remaining_columns( - &self.output_col_idx, - self.get_table_columns().len(), - ) - } - - /// Get the ids of the output columns and primary key columns. - pub fn output_and_pk_column_ids(&self) -> Vec { - let mut ids = self.output_column_ids(); - for column_order in self.primary_key() { - let id = self.get_table_columns()[column_order.column_index].column_id; - if !ids.contains(&id) { - ids.push(id); - } - } - ids - } - /// Create a logical scan node for log table scan pub(crate) fn new( table_name: String, output_col_idx: Vec, table_desc: Rc, - op_column: Rc, ctx: OptimizerContextRef, old_epoch: u64, new_epoch: u64, @@ -136,7 +111,6 @@ impl LogScan { table_name, output_col_idx, table_desc, - op_column, chunk_size: None, ctx, old_epoch, @@ -156,25 +130,8 @@ impl LogScan { ) } - pub(crate) fn get_id_to_op_idx_mapping( - output_col_idx: &[usize], - columns: Vec, - ) -> HashMap { - let mut id_to_op_idx = HashMap::new(); - output_col_idx - .iter() - .enumerate() - .for_each(|(op_idx, tb_idx)| { - let col = &columns[*tb_idx]; - id_to_op_idx.insert(col.column_id, op_idx); - }); - id_to_op_idx - } -} - -impl GenericPlanNode for LogScan { - fn schema(&self) -> Schema { - let fields = self + pub(crate) fn schema(&self) -> Schema { + let mut fields: Vec<_> = self .output_col_idx .iter() .map(|tb_idx| { @@ -182,49 +139,18 @@ impl GenericPlanNode for LogScan { Field::from_with_table_name_prefix(col, &self.table_name) }) .collect(); + fields.push(Field::with_name( + OP_TYPE, + format!("{}.{}", &self.table_name, OP_NAME), + )); Schema { fields } } - fn stream_key(&self) -> Option> { - let id_to_op_idx = - Self::get_id_to_op_idx_mapping(&self.output_col_idx, self.get_table_columns()); - self.table_desc - .stream_key - .iter() - .map(|&c| { - id_to_op_idx - .get(&self.get_table_columns()[c].column_id) - .copied() - }) - .collect::>>() - } - - fn ctx(&self) -> OptimizerContextRef { + pub(crate) fn ctx(&self) -> OptimizerContextRef { self.ctx.clone() } - fn functional_dependency(&self) -> FunctionalDependencySet { - let pk_indices = self.stream_key(); - let col_num = self.output_col_idx.len(); - match &pk_indices { - Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices), - None => FunctionalDependencySet::new(col_num), - } - } -} - -impl LogScan { pub fn get_table_columns(&self) -> Vec { - let mut columns = self.table_desc.columns.clone(); - columns.push(self.op_column.as_ref().clone()); - columns - } - - /// Get the descs of the output columns. - pub fn column_descs(&self) -> Vec { - self.output_col_idx - .iter() - .map(|&i| self.get_table_columns()[i].clone()) - .collect() + self.table_desc.columns.clone() } } diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index fa7fd85735113..94a15b99c6bfe 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -463,6 +463,18 @@ impl LocalQueryExecution { } } NodeBody::SysRowSeqScan(_) => {} + _ => unreachable!(), + } + + Ok(PlanNodePb { + children: vec![], + identity, + node_body: Some(node_body), + }) + } + PlanNodeType::BatchLogSeqScan => { + let mut node_body = execution_plan_node.node.clone(); + match &mut node_body { NodeBody::LogRowSeqScan(ref mut scan_node) => { if let Some(partition) = partition { let partition = partition diff --git a/src/sqlparser/src/ast/query.rs b/src/sqlparser/src/ast/query.rs index e779115de6db6..5425864bf4e5c 100644 --- a/src/sqlparser/src/ast/query.rs +++ b/src/sqlparser/src/ast/query.rs @@ -408,9 +408,7 @@ pub enum TableFactor { impl fmt::Display for TableFactor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - TableFactor::Table { - name, alias, as_of, .. - } => { + TableFactor::Table { name, alias, as_of } => { write!(f, "{}", name)?; match as_of { Some(as_of) => write!(f, "{}", as_of)?, diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 64e922d5c39ce..655b98ed3f7ee 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -29,7 +29,6 @@ use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption}; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{self, OwnedRow, Row, RowExt}; -use risingwave_common::types::{Datum, ScalarImpl}; use risingwave_common::util::row_serde::*; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; @@ -658,7 +657,7 @@ impl StorageTableInner { end_epoch: HummockReadEpoch, pk_prefix: impl Row, range_bounds: impl RangeBounds, - ) -> StorageResult>> + Send> { + ) -> StorageResult)>> + Send> { self.iter_log_with_pk_bounds(satrt_epoch, end_epoch, pk_prefix, range_bounds) .await } @@ -669,7 +668,7 @@ impl StorageTableInner { end_epoch: HummockReadEpoch, pk_prefix: impl Row, range_bounds: impl RangeBounds, - ) -> StorageResult>> + Send> { + ) -> StorageResult)>> + Send> { let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true); let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false); @@ -891,20 +890,20 @@ impl StorageTableInnerIterLogInner { } /// Yield a row with its primary key. - #[try_stream(ok = KeyedRow, error = StorageError)] + #[try_stream(ok = (i16, KeyedRow), error = StorageError)] async fn into_stream(mut self) { - let build_value_with_op = |value: &[u8], op: i16| -> Result, StorageError> { - let full_row = self.row_deserializer.deserialize(value)?; - let result_row_in_value = self - .mapping - .project(OwnedRow::new(full_row)) - .into_owned_row(); - let result_row_vec = result_row_in_value - .into_iter() - .chain(vec![Some(ScalarImpl::Int16(op))]) - .collect_vec(); - Ok(result_row_vec) - }; + // let build_value_with_op = |value: &[u8], op: i16| -> Result, StorageError> { + // let full_row = self.row_deserializer.deserialize(value)?; + // let result_row_in_value = self + // .mapping + // .project(OwnedRow::new(full_row)) + // .into_owned_row(); + // let result_row_vec = result_row_in_value + // .into_iter() + // .chain(vec![Some(ScalarImpl::Int16(op))]) + // .collect_vec(); + // Ok(result_row_vec) + // }; while let Some((k, v)) = self .iter @@ -914,38 +913,65 @@ impl StorageTableInnerIterLogInner { { match v { ChangeLogValue::Insert(value) => { - let row = OwnedRow::new(build_value_with_op(value, 1)?); + let full_row = self.row_deserializer.deserialize(value)?; + let row = self + .mapping + .project(OwnedRow::new(full_row)) + .into_owned_row(); // TODO: may optimize the key clone - yield KeyedRow:: { - vnode_prefixed_key: k.copy_into(), - row, - }; + yield ( + 1, + KeyedRow:: { + vnode_prefixed_key: k.copy_into(), + row, + }, + ); } ChangeLogValue::Update { new_value, old_value, } => { - let row = OwnedRow::new(build_value_with_op(old_value, 3)?); + let full_row = self.row_deserializer.deserialize(old_value)?; + let row = self + .mapping + .project(OwnedRow::new(full_row)) + .into_owned_row(); // TODO: may optimize the key clone - yield KeyedRow:: { - vnode_prefixed_key: k.copy_into(), - row, - }; - - let row = OwnedRow::new(build_value_with_op(new_value, 4)?); + yield ( + 3, + KeyedRow:: { + vnode_prefixed_key: k.copy_into(), + row, + }, + ); + let full_row = self.row_deserializer.deserialize(new_value)?; + let row = self + .mapping + .project(OwnedRow::new(full_row)) + .into_owned_row(); // TODO: may optimize the key clone - yield KeyedRow:: { - vnode_prefixed_key: k.copy_into(), - row, - }; + yield ( + 4, + KeyedRow:: { + vnode_prefixed_key: k.copy_into(), + row, + }, + ); } ChangeLogValue::Delete(value) => { - let row = OwnedRow::new(build_value_with_op(value, 2)?); + let full_row = self.row_deserializer.deserialize(value)?; + let row = self + .mapping + .project(OwnedRow::new(full_row)) + .into_owned_row(); // TODO: may optimize the key clone - yield KeyedRow:: { - vnode_prefixed_key: k.copy_into(), - row, - }; + yield ( + 2, + KeyedRow:: { + vnode_prefixed_key: k.copy_into(), + row, + }, + ); } } } diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 9be20b2cce538..d245e4bde3790 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -122,6 +122,10 @@ impl> KeyedRow { self.row } + pub fn into_owned_row_key(self) -> (TableKey, OwnedRow) { + (self.vnode_prefixed_key, self.row) + } + pub fn vnode(&self) -> VirtualNode { self.vnode_prefixed_key.vnode_part() }