fmt
kwannoel committed May 19, 2023
1 parent 9eaf8fb commit d90adad
Showing 4 changed files with 5 additions and 72 deletions.
proto/stream_plan.proto (3 changes: 0 additions & 3 deletions)
@@ -422,9 +422,6 @@ message ChainNode {
   /// The state table used by Backfill operator for persisting internal state
   catalog.Table state_table = 5;
 
-  /// The output distribution key used to compute vnode.
-  repeated uint32 dist_key_in_pk = 6;
-
   // The upstream materialized view info used by backfill.
   plan_common.StorageTableDesc table_desc = 7;
 }
src/frontend/src/optimizer/plan_node/stream_table_scan.rs (56 changes: 0 additions & 56 deletions)
@@ -27,7 +27,6 @@ use risingwave_pb::stream_plan::{ChainType, PbStreamNode};
 use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode};
 use crate::catalog::ColumnId;
 use crate::expr::{ExprRewriter, FunctionCall};
-use crate::optimizer::plan_node::stream::StreamPlanRef;
 use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
 use crate::optimizer::property::{Distribution, DistributionDisplay};
 use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -115,58 +114,6 @@ impl StreamTableScan {
         self.chain_type
     }
 
-    /// Compute upstream distribution key. We will use it to compute vnode.
-    // TODO(kwannoel): Should we use project executor instead?
-    pub fn compute_upstream_distribution_key(&self) -> Vec<u32> {
-        // Construct output distribution key for the backfill executor to compute vnode of upstream
-        let output_distribution_key = match self.base.distribution() {
-            // For sharded distribution, we need to remap dist_key.
-            // Suppose we had Order Key (Primary Key) of Upstream table: [0, 4],
-            // and Distribution Key [4].
-            // Output schema will be:
-            // [0, 4]
-            // So the new distribution key indices will be: [1]
-            // ---
-            // Q: Why distribution key is needed?
-            // A: We need distribution key so we can compute vnode.
-            //
-            // Q: Why do we need vnode?
-            // A: We need to partition state by it,
-            // since there can be parallel table scans. Without it when we recover,
-            // we don't know what is the backfill progress of each backfill executor,
-            // since they all override the same row (no key prefix).
-            // If we use vnode as prefix key, that will partition their states.
-            // The upstream pk will still serve as the value.
-            //
-            // Q: Why don't we actually use this distribution key for state table itself?
-            // A: State Table interface expects that distribution key
-            // is a subset of primary key.
-            // Here we don't have primary key for state table. We only partition it by vnode.
-            Distribution::UpstreamHashShard(_, _) => {
-                let distribution_key = self
-                    .logical()
-                    .table_desc
-                    .distribution_key
-                    .iter()
-                    .map(|i| {
-                        self.logical
-                            .primary_key()
-                            .iter()
-                            .position(|j| *i == j.column_index)
-                            .map(|k| k as u32)
-                            .unwrap()
-                    })
-                    .collect_vec();
-                distribution_key
-            }
-            Distribution::SomeShard | Distribution::Single => {
-                vec![]
-            }
-            Distribution::HashShard(_) | Distribution::Broadcast => unreachable!(),
-        };
-        output_distribution_key
-    }
-
     /// Build catalog for backfill state
     ///
     /// Schema
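
Note: the remapping performed by the deleted helper can be illustrated with a small standalone sketch (not part of this commit; plain index vectors stand in for the planner's catalog types). Each distribution-key column is looked up by its position within the primary key, matching the [0, 4] / [4] example in the deleted comment:

// Standalone sketch of the deleted helper's index remapping (hypothetical,
// simplified: plain column indices instead of the planner's catalog types).
fn dist_key_in_pk(primary_key: &[usize], distribution_key: &[usize]) -> Vec<u32> {
    distribution_key
        .iter()
        .map(|dist_col| {
            primary_key
                .iter()
                .position(|pk_col| pk_col == dist_col)
                .expect("distribution key must be a subset of the primary key") as u32
        })
        .collect()
}

fn main() {
    // Primary key columns [0, 4] with distribution key [4]:
    // column 4 sits at position 1 of the primary key, so the result is [1].
    assert_eq!(dist_key_in_pk(&[0, 4], &[4]), vec![1u32]);
}
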
@@ -252,8 +199,6 @@ impl StreamTableScan {
 
         let stream_key = self.base.logical_pk.iter().map(|x| *x as u32).collect_vec();
 
-        let dist_key_in_pk = self.compute_upstream_distribution_key();
-
         // The required columns from the table (both scan and upstream).
         let upstream_column_ids = match self.chain_type {
             // For backfill, we additionally need the primary key columns.
Expand Down Expand Up @@ -333,7 +278,6 @@ impl StreamTableScan {
// The table desc used by backfill executor
table_desc: Some(self.logical.table_desc.to_protobuf()),
state_table: Some(catalog),
dist_key_in_pk,
})),
stream_key,
operator_id: self.base.id.0 as u64,
src/stream/src/executor/backfill.rs (15 changes: 5 additions & 10 deletions)
@@ -24,11 +24,10 @@ use futures::{pin_mut, stream, StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
 use risingwave_common::array::stream_record::Record;
 use risingwave_common::array::{Op, StreamChunk};
-use risingwave_common::buffer::{BitmapBuilder};
+use risingwave_common::buffer::BitmapBuilder;
 use risingwave_common::catalog::Schema;
-
 use risingwave_common::row::{self, OwnedRow, Row, RowExt};
-use risingwave_common::types::{Datum};
+use risingwave_common::types::Datum;
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::sort_util::{cmp_datum, OrderType};
@@ -75,9 +74,6 @@ pub struct BackfillExecutor<S: StateStore> {
     /// Internal state table for persisting state of backfill state.
     state_table: StateTable<S>,
 
-    /// Upstream dist key to compute vnode
-    dist_key_in_pk: Vec<usize>,
-
     /// The column indices need to be forwarded to the downstream from the upstream and table scan.
     output_indices: Vec<usize>,
 
Expand All @@ -101,7 +97,6 @@ where
upstream_table: StorageTable<S>,
upstream: BoxedExecutor,
state_table: StateTable<S>,
dist_key_in_pk: Vec<usize>,
output_indices: Vec<usize>,
progress: CreateMviewProgress,
schema: Schema,
Expand All @@ -117,7 +112,6 @@ where
upstream_table,
upstream,
state_table,
dist_key_in_pk,
output_indices,
actor_id: progress.actor_id(),
progress,
Expand All @@ -132,8 +126,6 @@ where

let pk_order = self.upstream_table.pk_serializer().get_order_types();

let _dist_key_in_pk = self.dist_key_in_pk;

let upstream_table_id = self.upstream_table.table_id().table_id;

let mut upstream = self.upstream.execute();
@@ -539,6 +531,7 @@
     ) -> StreamExecutorResult<()> {
         let vnodes = table.vnodes().clone();
         if let Some(old_state) = old_state {
+            // There are updates to existing state, persist.
             if *old_state != current_partial_state {
                 vnodes.iter_ones().for_each(|vnode| {
                     let datum = Some((vnode as i16).into());
Expand All @@ -550,10 +543,12 @@ where
})
});
} else {
// If no updates to existing state, we don't need to persist
table.commit_no_data_expected(epoch);
return Ok(());
}
} else {
// No existing state, create a new entry.
vnodes.iter_ones().for_each(|vnode| {
let datum = Some((vnode as i16).into());
// fill the state
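
Note: the three comments introduced in this hunk describe a three-way branch in how backfill progress is persisted per vnode. A rough standalone sketch of that control flow (not this executor's real code; StateWriter and its methods are hypothetical stand-ins for the StateTable API used above):

// Hypothetical stand-in for the state table; only the call shapes echo the hunk above.
struct StateWriter;

impl StateWriter {
    fn write_row(&mut self, vnode: i16, state: &[i64]) {
        println!("persist vnode {vnode}: {state:?}");
    }
    fn commit(&mut self, epoch: u64) {
        println!("commit epoch {epoch}");
    }
    fn commit_no_data_expected(&mut self, epoch: u64) {
        println!("commit (no data expected) epoch {epoch}");
    }
}

fn persist_state(
    table: &mut StateWriter,
    vnodes: &[i16],
    old_state: Option<&[i64]>,
    current_state: &[i64],
    epoch: u64,
) {
    if let Some(old) = old_state {
        // There are updates to existing state, persist one row per vnode.
        if old != current_state {
            for &vnode in vnodes {
                table.write_row(vnode, current_state);
            }
        } else {
            // No updates to existing state, so nothing needs to be persisted.
            table.commit_no_data_expected(epoch);
            return;
        }
    } else {
        // No existing state, create a new entry per vnode.
        for &vnode in vnodes {
            table.write_row(vnode, current_state);
        }
    }
    table.commit(epoch);
}

fn main() {
    let mut table = StateWriter;
    // First epoch: no prior state, so new entries are written.
    persist_state(&mut table, &[0, 1], None, &[42], 1);
    // Second epoch: state unchanged, so nothing is persisted.
    persist_state(&mut table, &[0, 1], Some(&[42]), &[42], 2);
}

In the actual executor the vnode serves as the state row's key prefix, so parallel backfill executors can recover their own progress independently, as explained in the comment block removed from stream_table_scan.rs above.
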
src/stream/src/from_proto/chain.rs (3 changes: 0 additions & 3 deletions)
@@ -158,13 +158,10 @@ impl ExecutorBuilder for ChainExecutorBuilder {
         )
         .await;
 
-        let dist_key_in_pk = node.dist_key_in_pk.iter().map(|k| *k as usize).collect();
-
         BackfillExecutor::new(
             upstream_table,
             mview,
             state_table,
-            dist_key_in_pk,
             output_indices,
             progress,
             schema,
