feat(streaming): Support in memory backfill executor for mv on mv (#6341)

* first version of backfill

* second version of backfill

* third version of backfill

* refactor

* add doc

* add chain type

* rollback risingwave.toml

* use storage primary key instead of stream key of upstream

* do not mark chunk when snapshot comes to its end

* fix dashboard ui build

* fix proto misc check

* remove additional line to make misc check happy

* add UNSPECIFIED to chain type

* minor improvement

* remove some logs: too much log output for CI

* code style

* better indent

* assert that next_key of the storage_table never returns an empty key

* support uncommitted read for backfill

* add some doc

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
chenzl25 and mergify[bot] authored Nov 21, 2022
1 parent d532a34 commit 0e02443
Showing 10 changed files with 582 additions and 42 deletions.
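
Before the per-file diffs, here is a minimal, self-contained sketch of the backfill idea the commit message describes: read the upstream MV's snapshot in primary-key order, one chunk per epoch, buffer the upstream changes that arrive meanwhile, and forward only those whose key is already backfilled. Everything here (the toy BTreeMap snapshot, the epoch batches, the names) is illustrative, not the executor's actual code; note also that this toy snapshot is frozen, whereas the real executor reads each chunk at the latest epoch (the "uncommitted read" bullet above), which is why changes beyond the backfill position can be dropped safely.

```rust
use std::collections::BTreeMap;

fn main() {
    // Toy "upstream MV" state, keyed by storage primary key.
    let snapshot: BTreeMap<u64, &str> =
        BTreeMap::from([(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f")]);

    // Upstream changes arriving while the backfill runs, one batch per epoch.
    let upstream_epochs: Vec<Vec<(u64, &str)>> = vec![
        vec![(2, "b'"), (5, "e'")],  // epoch 1
        vec![(1, "a''"), (6, "f'")], // epoch 2
    ];

    const CHUNK: usize = 3; // snapshot rows consumed per epoch
    let mut snapshot_iter = snapshot.into_iter();
    let mut current_pos: Option<u64> = None; // largest pk backfilled so far
    let mut output = Vec::new();

    for changes in upstream_epochs {
        // 1. Read the next chunk of the snapshot, in pk order.
        for (pk, v) in snapshot_iter.by_ref().take(CHUNK) {
            current_pos = Some(pk);
            output.push((pk, v.to_string()));
        }
        // 2. Forward only buffered upstream changes whose pk has already been
        //    backfilled; later changes are dropped, since the next snapshot
        //    chunk (read at a newer epoch) would already reflect them.
        for (pk, v) in changes {
            if Some(pk) <= current_pos {
                output.push((pk, v.to_string()));
            }
        }
        // 3. A barrier would be emitted here before the next epoch begins.
    }
    // 4. Snapshot exhausted: drain it and switch to pure streaming mode,
    //    where every upstream change is forwarded unconditionally.
    for (pk, v) in snapshot_iter {
        output.push((pk, v.to_string()));
    }

    for (pk, v) in output {
        println!("{pk} -> {v}");
    }
}
```
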
71 changes: 65 additions & 6 deletions dashboard/proto/gen/stream_plan.ts

(Generated file; diff not rendered by default.)

19 changes: 17 additions & 2 deletions proto/stream_plan.proto
@@ -300,6 +300,18 @@ message ExchangeNode {
DispatchStrategy strategy = 1;
}

+enum ChainType {
+  CHAIN_UNSPECIFIED = 0;
+
+  // CHAIN corresponds to the chain executor.
+  CHAIN = 1;
+
+  // REARRANGE corresponds to the rearranged chain executor.
+  REARRANGE = 2;
+
+  // BACKFILL corresponds to the backfill executor.
+  BACKFILL = 3;
+}
// ChainNode is used for mv on mv.
// ChainNode is like a "UNION" on mv snapshot and streaming. So it takes two inputs with fixed order:
// 1. MergeNode (as a placeholder) for streaming read.
@@ -313,14 +325,17 @@ message ChainNode {
// Generally, the barrier needs to be rearranged during the MV creation process, so that data can
// be flushed to shared buffer periodically, instead of making the first epoch from batch query extra
// large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.
-// This option is used to disable barrier rearrangement.
-bool disable_rearrange = 4;
+// ChainType is used to decide which implementation to use for the ChainNode.
+ChainType chain_type = 4;
// Whether to place this chain on the same worker node as upstream actors.
bool same_worker_node = 5;
// Whether the upstream materialize is, and this chain should be, a singleton.
// FIXME: This is a workaround for fragmenter since the distribution info will be lost if there's only one
// fragment in the downstream mview. Remove this when we refactor the fragmenter.
bool is_singleton = 6;

+// The upstream materialized view info used by backfill.
+plan_common.StorageTableDesc table_desc = 7;
}

// BatchPlanNode is used for mv on mv snapshot read.
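
The new ChainType enum above replaces the old boolean disable_rearrange with a three-way choice of implementation; in the frontend diffs below, index scans keep the non-rearranged CHAIN behavior while table scans switch to the new BACKFILL executor. A sketch of how a builder might dispatch on it — the local enum and the descriptions mirror what the diff suggests but are assumptions, not the repository's actual code:

```rust
// Hypothetical mirror of the generated proto enum, for illustration only.
#[derive(Clone, Copy, Debug)]
enum ChainType {
    Unspecified,
    Chain,
    Rearrange,
    Backfill,
}

// Choose an implementation from `chain_type`, as an executor builder might.
fn executor_for(chain_type: ChainType) -> &'static str {
    match chain_type {
        ChainType::Chain => "chain executor (no barrier rearrangement)",
        ChainType::Rearrange => "rearranged chain executor",
        ChainType::Backfill => "in-memory backfill executor (this commit)",
        ChainType::Unspecified => panic!("chain_type must be set explicitly"),
    }
}

fn main() {
    println!("{}", executor_for(ChainType::Backfill));
}
```
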
5 changes: 4 additions & 1 deletion src/batch/src/executor/row_seq_scan.rs
@@ -380,7 +380,10 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
.batch_iter_with_pk_bounds(
HummockReadEpoch::Committed(epoch),
&pk_prefix,
-next_col_bounds,
+(
+    next_col_bounds.0.map(|x| Row::new(vec![x])),
+    next_col_bounds.1.map(|x| Row::new(vec![x])),
+),
)
.await?;

1 change: 1 addition & 0 deletions src/batch/src/lib.rs
@@ -27,6 +27,7 @@
#![feature(is_sorted)]
#![recursion_limit = "256"]
#![feature(let_chains)]
+#![feature(bound_map)]

mod error;
pub mod exchange_source;
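
The two batch-side changes above belong together: row_seq_scan.rs lifts each single-datum bound into a one-column Row bound with Bound::map, and lib.rs enables #![feature(bound_map)] because that method was still unstable at the time (it has since been stabilized). A small stand-alone illustration, with toy Datum/Row stand-ins rather than the real types:

```rust
use std::ops::Bound::{self, Excluded, Included, Unbounded};

// Toy stand-ins for the real `Datum` and `Row` types, purely for illustration.
type Datum = i32;
#[derive(Debug)]
struct Row(Vec<Datum>);

// Lift a pair of single-datum bounds into single-column row bounds, the same
// shape as the change in `row_seq_scan.rs` above.
fn lift(bounds: (Bound<Datum>, Bound<Datum>)) -> (Bound<Row>, Bound<Row>) {
    (
        bounds.0.map(|x| Row(vec![x])), // `Bound::map` leaves `Unbounded` as-is
        bounds.1.map(|x| Row(vec![x])),
    )
}

fn main() {
    println!("{:?}", lift((Included(3), Excluded(7))));
    println!("{:?}", lift((Unbounded, Unbounded)));
}
```
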
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_index_scan.rs
@@ -151,7 +151,7 @@ impl StreamIndexScan {
node_body: Some(ProstStreamNode::Chain(ChainNode {
table_id: self.logical.table_desc().table_id.table_id,
same_worker_node: true,
-disable_rearrange: true,
+chain_type: ChainType::Chain as i32,
// The fields from upstream
upstream_fields: self
.logical
@@ -171,6 +171,7 @@
.map(|&i| i as _)
.collect(),
is_singleton: false,
+table_desc: Some(self.logical.table_desc().to_protobuf()),
})),
stream_key,
operator_id: self.base.id.0 as u64,
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -192,7 +192,7 @@ impl StreamTableScan {
node_body: Some(ProstStreamNode::Chain(ChainNode {
table_id: self.logical.table_desc().table_id.table_id,
same_worker_node: false,
-disable_rearrange: false,
+chain_type: ChainType::Backfill as i32,
// The fields from upstream
upstream_fields: self
.logical
@@ -212,6 +212,8 @@
.map(|&i| i as _)
.collect(),
is_singleton: *self.distribution() == Distribution::Single,
+// The table desc used by the backfill executor.
+table_desc: Some(self.logical.table_desc().to_protobuf()),
})),
stream_key,
operator_id: self.base.id.0 as u64,
40 changes: 20 additions & 20 deletions src/storage/src/table/batch_table/storage_table.rs
@@ -25,7 +25,7 @@ use itertools::Itertools;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::row::{self, Row, Row2, RowDeserializer, RowExt};
-use risingwave_common::types::{Datum, VirtualNode};
+use risingwave_common::types::VirtualNode;
use risingwave_common::util::ordered::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_hummock_sdk::key::{end_bound_of_prefix, next_key, prefixed_range};
@@ -363,26 +363,24 @@ impl<S: StateStore> StorageTable<S> {
Ok(iter)
}

-/// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds of
-/// the next primary key column in `next_col_bounds`.
-// TODO: support multiple datums or `Row` for `next_col_bounds`.
+/// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
async fn iter_with_pk_bounds(
&self,
epoch: HummockReadEpoch,
pk_prefix: impl Row2,
-next_col_bounds: impl RangeBounds<Datum>,
+range_bounds: impl RangeBounds<Row>,
ordered: bool,
) -> StorageResult<StorageTableIter<S>> {
fn serialize_pk_bound(
pk_serializer: &OrderedRowSerde,
pk_prefix: impl Row2,
-next_col_bound: Bound<&Datum>,
+range_bound: Bound<&Row>,
is_start_bound: bool,
) -> Bound<Vec<u8>> {
-match next_col_bound {
+match range_bound {
Included(k) => {
-let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len() + 1);
-let key = pk_prefix.chain(row::once(k));
+let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len() + k.0.len());
+let key = pk_prefix.chain(k);
let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
if is_start_bound {
Included(serialized_key)
@@ -393,15 +391,17 @@ impl<S: StateStore> StorageTable<S> {
}
}
Excluded(k) => {
-let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len() + 1);
-let key = pk_prefix.chain(row::once(k));
+let pk_prefix_serializer = pk_serializer.prefix(pk_prefix.len() + k.0.len());
+let key = pk_prefix.chain(k);
let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
if is_start_bound {
-// storage doesn't support excluded begin key yet, so transform it to
-// included
-// FIXME: What if `serialized_key` is `\xff\xff..`? Should the frontend
-// reject this?
-Included(next_key(&serialized_key))
+// Storage doesn't support an excluded begin key yet, so transform it to
+// an included one.
+// We always serialize the null of a datum as a u8 that is not `\xff`,
+// so we can assert that `next_key` never returns an empty key.
+let next_serialized_key = next_key(&serialized_key);
+assert!(!next_serialized_key.is_empty());
+Included(next_serialized_key)
} else {
Excluded(serialized_key)
}
@@ -423,13 +423,13 @@ impl<S: StateStore> StorageTable<S> {
let start_key = serialize_pk_bound(
&self.pk_serializer,
&pk_prefix,
-next_col_bounds.start_bound(),
+range_bounds.start_bound(),
true,
);
let end_key = serialize_pk_bound(
&self.pk_serializer,
&pk_prefix,
-next_col_bounds.end_bound(),
+range_bounds.end_bound(),
false,
);

@@ -482,9 +482,9 @@ impl<S: StateStore> StorageTable<S> {
&self,
epoch: HummockReadEpoch,
pk_prefix: impl Row2,
-next_col_bounds: impl RangeBounds<Datum>,
+range_bounds: impl RangeBounds<Row>,
) -> StorageResult<StorageTableIter<S>> {
-self.iter_with_pk_bounds(epoch, pk_prefix, next_col_bounds, true)
+self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, true)
.await
}

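
The rewritten Excluded branch above leans on an invariant: next_key never returns an empty key for a serialized pk, because the pk encoding always emits at least one byte that is not \xff (nulls included). Below is a stand-alone sketch of next_key's semantics, reimplemented for illustration after risingwave_hummock_sdk::key::next_key; the null marker byte in the example is hypothetical:

```rust
/// Sketch of `next_key`: the smallest byte string strictly greater than every
/// key that has `key` as a prefix; empty iff the input is empty or all `0xff`.
fn next_key(key: &[u8]) -> Vec<u8> {
    match key.iter().rposition(|&b| b != 0xff) {
        Some(i) => {
            // Drop the trailing run of 0xff bytes, then increment the last byte kept.
            let mut next = key[..=i].to_vec();
            next[i] += 1;
            next
        }
        None => Vec::new(), // no successor exists
    }
}

fn main() {
    assert_eq!(next_key(b"ab"), b"ac".to_vec());
    assert_eq!(next_key(&[0x01, 0xff]), vec![0x02]);
    assert!(next_key(&[0xff, 0xff]).is_empty());

    // The diff's invariant: a serialized pk always contains a byte != 0xff
    // (e.g. NULL is encoded as a marker byte that is not 0xff), so
    // Excluded(k) can safely become Included(next_key(k)).
    let serialized_pk = [0x06, 0xff, 0xff]; // hypothetical marker + payload
    assert!(!next_key(&serialized_pk).is_empty());
    println!("all assertions passed");
}
```
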
(Diffs for the remaining changed files are not rendered.)