diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index c9b7370cc45c7..a3fd013c86f0d 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -422,9 +422,6 @@ message ChainNode { /// The state table used by Backfill operator for persisting internal state catalog.Table state_table = 5; - /// The output distribution key used to compute vnode. - repeated uint32 dist_key_in_pk = 6; - // The upstream materialized view info used by backfill. plan_common.StorageTableDesc table_desc = 7; } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index a4cdc3d036ab2..b99bb209f8d84 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -27,7 +27,6 @@ use risingwave_pb::stream_plan::{ChainType, PbStreamNode}; use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode}; use crate::catalog::ColumnId; use crate::expr::{ExprRewriter, FunctionCall}; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::property::{Distribution, DistributionDisplay}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -115,58 +114,6 @@ impl StreamTableScan { self.chain_type } - /// Compute upstream distribution key. We will use it to compute vnode. - // TODO(kwannoel): Should we use project executor instead? - pub fn compute_upstream_distribution_key(&self) -> Vec<u32> { - // Construct output distribution key for the backfill executor to compute vnode of upstream - let output_distribution_key = match self.base.distribution() { - // For sharded distribution, we need to remap dist_key. - // Suppose we had Order Key (Primary Key) of Upstream table: [0, 4], - // and Distribution Key [4]. 
- // Output schema will be: - // [0, 4] - // So the new distribution key indices will be: [1] - // --- - // Q: Why distribution key is needed? - // A: We need distribution key so we can compute vnode. - // - // Q: Why do we need vnode? - // A: We need to partition state by it, - // since there can be parallel table scans. Without it when we recover, - // we don't know what is the backfill progress of each backfill executor, - // since they all override the same row (no key prefix). - // If we use vnode as prefix key, that will partition their states. - // The upstream pk will still serve as the value. - // - // Q: Why don't we actually use this distribution key for state table itself? - // A: State Table interface expects that distribution key - // is a subset of primary key. - // Here we don't have primary key for state table. We only partition it by vnode. - Distribution::UpstreamHashShard(_, _) => { - let distribution_key = self - .logical() - .table_desc - .distribution_key - .iter() - .map(|i| { - self.logical - .primary_key() - .iter() - .position(|j| *i == j.column_index) - .map(|k| k as u32) - .unwrap() - }) - .collect_vec(); - distribution_key - } - Distribution::SomeShard | Distribution::Single => { - vec![] - } - Distribution::HashShard(_) | Distribution::Broadcast => unreachable!(), - }; - output_distribution_key - } - /// Build catalog for backfill state /// /// Schema @@ -252,8 +199,6 @@ impl StreamTableScan { let stream_key = self.base.logical_pk.iter().map(|x| *x as u32).collect_vec(); - let dist_key_in_pk = self.compute_upstream_distribution_key(); - // The required columns from the table (both scan and upstream). let upstream_column_ids = match self.chain_type { // For backfill, we additionally need the primary key columns. 
@@ -333,7 +278,6 @@ impl StreamTableScan { // The table desc used by backfill executor table_desc: Some(self.logical.table_desc.to_protobuf()), state_table: Some(catalog), - dist_key_in_pk, })), stream_key, operator_id: self.base.id.0 as u64, diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index 08385572f4de7..bfdf7e3e4e133 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -24,11 +24,10 @@ use futures::{pin_mut, stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::array::stream_record::Record; use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::buffer::{BitmapBuilder}; +use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::Schema; - use risingwave_common::row::{self, OwnedRow, Row, RowExt}; -use risingwave_common::types::{Datum}; +use risingwave_common::types::Datum; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{cmp_datum, OrderType}; @@ -75,9 +74,6 @@ pub struct BackfillExecutor<S: StateStore> { /// Internal state table for persisting state of backfill state. state_table: StateTable<S>, - /// Upstream dist key to compute vnode - dist_key_in_pk: Vec<usize>, - /// The column indices need to be forwarded to the downstream from the upstream and table scan. 
output_indices: Vec<usize>, @@ -101,7 +97,6 @@ upstream_table: StorageTable<S>, upstream: BoxedExecutor, state_table: StateTable<S>, - dist_key_in_pk: Vec<usize>, output_indices: Vec<usize>, progress: CreateMviewProgress, schema: Schema, @@ -117,7 +112,6 @@ upstream_table, upstream, state_table, - dist_key_in_pk, output_indices, actor_id: progress.actor_id(), progress, @@ -132,8 +126,6 @@ let pk_order = self.upstream_table.pk_serializer().get_order_types(); - let _dist_key_in_pk = self.dist_key_in_pk; - let upstream_table_id = self.upstream_table.table_id().table_id; let mut upstream = self.upstream.execute(); @@ -539,6 +531,7 @@ ) -> StreamExecutorResult<()> { let vnodes = table.vnodes().clone(); if let Some(old_state) = old_state { + // There are updates to existing state, persist. if *old_state != current_partial_state { vnodes.iter_ones().for_each(|vnode| { let datum = Some((vnode as i16).into()); @@ -550,10 +543,12 @@ }) }); } else { + // If no updates to existing state, we don't need to persist table.commit_no_data_expected(epoch); return Ok(()); } } else { + // No existing state, create a new entry. vnodes.iter_ones().for_each(|vnode| { let datum = Some((vnode as i16).into()); // fill the state diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs index 07ec929076c65..bff32ef1ded37 100644 --- a/src/stream/src/from_proto/chain.rs +++ b/src/stream/src/from_proto/chain.rs @@ -158,13 +158,10 @@ impl ExecutorBuilder for ChainExecutorBuilder { ) .await; - let dist_key_in_pk = node.dist_key_in_pk.iter().map(|k| *k as usize).collect(); - BackfillExecutor::new( upstream_table, mview, state_table, - dist_key_in_pk, output_indices, progress, schema,