Skip to content

Commit

Permalink
feat(backfill): Persist backfill operator state (#9752)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored May 24, 2023
1 parent a0acfdf commit 5e80fd7
Show file tree
Hide file tree
Showing 29 changed files with 2,027 additions and 490 deletions.
1 change: 1 addition & 0 deletions ci/scripts/gen-flamegraph.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ install_all() {
# faster addr2line to speed up heap flamegraph analysis by jeprof
echo ">>> Installing addr2line"
git clone https://github.com/gimli-rs/addr2line
cp risingwave/rust-toolchain addr2line/rust-toolchain
pushd addr2line
cargo b --examples -r
mv ./target/release/examples/addr2line $(which addr2line)
Expand Down
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ message ChainNode {
// ChainType is used to decide which implementation for the ChainNode.
ChainType chain_type = 4;

// The state table used by the Backfill operator for persisting its internal state.
catalog.Table state_table = 5;

// The upstream materialized view info used by backfill.
plan_common.StorageTableDesc table_desc = 7;
}
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/hash/consistent_hash/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ impl Bitmap {
self.iter_ones().map(VirtualNode::from_index)
}

/// Enumerates the virtual nodes set to 1 in the bitmap, yielding each one as its
/// scalar (`i16`) value rather than as a `VirtualNode`.
pub fn iter_vnodes_scalar(&self) -> impl Iterator<Item = i16> + '_ {
    let vnodes = self.iter_vnodes();
    vnodes.map(|v| v.to_scalar())
}

/// Returns an iterator which yields the position ranges of continuous virtual nodes set to 1 in
/// the bitmap.
pub fn vnode_ranges(&self) -> impl Iterator<Item = RangeInclusive<VirtualNode>> + '_ {
Expand Down
9 changes: 6 additions & 3 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use parse_display::Display;
use crate::array::{Array, ArrayImpl, DataChunk};
use crate::hash::Crc32HashCode;
use crate::row::{Row, RowExt};
use crate::types::ScalarRefImpl;
use crate::types::{DataType, ScalarRefImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::row_id::extract_vnode_id_from_row_id;

Expand Down Expand Up @@ -64,6 +64,9 @@ pub type AllVirtualNodeIter = std::iter::Map<std::ops::Range<usize>, fn(usize) -
impl VirtualNode {
/// The maximum value of the virtual node, i.e. the one at index `COUNT - 1`.
pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1);
/// The data type used to represent a `VirtualNode` as a datum: `DataType::Int16`.
///
/// A `VirtualNode` may be passed through a stream as a datum or stored as a column;
/// this constant names the type to use for it in those cases.
pub const RW_TYPE: DataType = DataType::Int16;
/// The minimum (zero) value of the virtual node, i.e. the one at index `0`.
pub const ZERO: VirtualNode = VirtualNode::from_index(0);

Expand Down Expand Up @@ -129,8 +132,8 @@ impl VirtualNode {
.collect()
}

// `compute_row` is used to calculate the `VirtualNode` for the corresponding column in a `Row`.
// Similar to `compute_chunk`, it also contains special handling for serial columns.
// `compute_row` is used to calculate the `VirtualNode` for the corresponding columns in a
// `Row`. Similar to `compute_chunk`, it also contains special handling for serial columns.
pub fn compute_row(row: impl Row, indices: &[usize]) -> VirtualNode {
let project = row.project(indices);
if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() {
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ fn visit_stream_node_tables_inner<F>(
always!(node.state_table, "Sort");
}

// Chain
NodeBody::Chain(node) => {
always!(node.state_table, "Chain")
}

// Note: add internal tables for new nodes here.
NodeBody::Materialize(node) if !internal_tables_only => {
always!(node.table, "Materialize")
Expand Down
Loading

0 comments on commit 5e80fd7

Please sign in to comment.