Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backfill): Persist backfill operator state #9752

Merged
merged 108 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
108 commits
Select commit Hold shift + click to select a range
2caed1e
rename table to upstream_table
kwannoel May 11, 2023
77beb91
add state table
kwannoel May 11, 2023
a90452b
refactor proto
kwannoel May 11, 2023
42427e8
implement flush logic
kwannoel May 11, 2023
2040d9e
add upstream columns
kwannoel May 11, 2023
7c729ef
add end state
kwannoel May 11, 2023
a53fde2
fmt
kwannoel May 11, 2023
d9081fc
use plan base instead
kwannoel May 12, 2023
df48c2f
fix stream graph visitor
kwannoel May 12, 2023
baa5075
fix planner test
kwannoel May 12, 2023
2b5478e
add dist key
kwannoel May 12, 2023
1e5f6bc
fmt
kwannoel May 12, 2023
5c96e0e
remove dist key and pk it makes no sense
kwannoel May 12, 2023
b2a7b4b
commit state
kwannoel May 12, 2023
8819626
fix
kwannoel May 12, 2023
9aab87f
init state table
kwannoel May 12, 2023
dc2ebf8
clean
kwannoel May 12, 2023
37a4b05
fix logic if we receive some other msg instead of barrier
kwannoel May 12, 2023
82fccb8
fix initial epoch + write to state store if not backfill
kwannoel May 13, 2023
20e9a2d
fix
kwannoel May 13, 2023
6060a46
fix
kwannoel May 15, 2023
6dc6bec
refactor catalog build
kwannoel May 15, 2023
fdb6272
add vnode for UpstreamHashShard
kwannoel May 15, 2023
6f54ff1
add dist key + pk derive
kwannoel May 15, 2023
8e51af4
fix fmt + derive vnode in from_proto
kwannoel May 15, 2023
1fba701
always have vnode column
kwannoel May 16, 2023
8ca3b93
add persist state function
kwannoel May 16, 2023
c03ddf1
use persist data instead
kwannoel May 16, 2023
0501aa6
ensure other state persisted
kwannoel May 16, 2023
4b5fe9a
debug
kwannoel May 16, 2023
e2d2970
docs
kwannoel May 16, 2023
8a430ae
docs
kwannoel May 16, 2023
6e63c9c
docs
kwannoel May 16, 2023
ab9dc83
refactor derive output distribution key out
kwannoel May 16, 2023
f04c9ba
add upstream dist key
kwannoel May 16, 2023
b6e95ad
add vnode
kwannoel May 16, 2023
bd8208e
add debug asserts
kwannoel May 16, 2023
9ffcf09
refactor state persist
kwannoel May 16, 2023
c633fa5
fmt
kwannoel May 16, 2023
cd90223
fix distribution + fix state encoding
kwannoel May 16, 2023
2b7f56c
add more debug logging + fix minor + pass tpch e2e
kwannoel May 16, 2023
396354d
fmt
kwannoel May 16, 2023
973e51a
fmt
kwannoel May 16, 2023
6e877f5
use table desc
kwannoel May 17, 2023
57642a3
use table dist key
kwannoel May 17, 2023
9c14da4
rename
kwannoel May 17, 2023
b001d6c
handle no progress
kwannoel May 17, 2023
5e0ba6e
ensure vnode is correct
kwannoel May 17, 2023
c292ee9
fmt
kwannoel May 17, 2023
3982a0e
remove print
kwannoel May 17, 2023
5eb2fe3
fmt
kwannoel May 17, 2023
6e5f6f5
use table pk to index table schema
kwannoel May 17, 2023
8a9c3de
fix
kwannoel May 17, 2023
ea1936d
use table pk + schema
kwannoel May 17, 2023
48cd93d
fix import
kwannoel May 17, 2023
ba39e3f
remove print
kwannoel May 17, 2023
e0445e8
fmt
kwannoel May 17, 2023
be633fb
address review comments
kwannoel May 17, 2023
86cd066
index should now be supported
kwannoel May 18, 2023
03e9e3e
change comment
kwannoel May 18, 2023
104c68b
fix over window function
kwannoel May 19, 2023
83e7637
drop index internal tables when drop table
kwannoel May 19, 2023
99f0a9b
docs
kwannoel May 19, 2023
ad8b671
interim commit: create temp state and use to update all vnodes
kwannoel May 19, 2023
741b5de
fix
kwannoel May 19, 2023
c4673d7
fmt
kwannoel May 19, 2023
9f1c838
fmt
kwannoel May 19, 2023
57c783d
add state check
kwannoel May 19, 2023
2677478
if snapshot empty still finish
kwannoel May 19, 2023
591db48
it should be the last datum
kwannoel May 19, 2023
c6b9467
disable print
kwannoel May 19, 2023
162da29
try
kwannoel May 19, 2023
7eab50c
revert
kwannoel May 19, 2023
55513ec
add watermark
kwannoel May 19, 2023
a6c6b26
fix drop source + oob row datum
kwannoel May 19, 2023
9e760de
always update progress?
kwannoel May 20, 2023
d5ead81
assert that if we backfill, we must have updated pos
kwannoel May 20, 2023
0e0a9d2
set vnode col in pk
kwannoel May 22, 2023
c0ad50a
add values
kwannoel May 22, 2023
9ef62f6
fixed + debug logs
kwannoel May 22, 2023
c1f6751
strip println
kwannoel May 22, 2023
df7b53a
add vnode col idx
kwannoel May 22, 2023
2c01bd2
fmt
kwannoel May 22, 2023
ccaa9c0
docs
kwannoel May 22, 2023
10c9b88
add value indices
kwannoel May 22, 2023
9a685cf
refactor check all vnode finished
kwannoel May 22, 2023
3a6da81
remove actor ids
kwannoel May 22, 2023
15ed583
fix planner test
kwannoel May 22, 2023
134018f
refactor + docs
kwannoel May 22, 2023
8f54ca0
docs
kwannoel May 22, 2023
fb29320
strip watermark changes for backfill
kwannoel May 22, 2023
0b8f924
minor
kwannoel May 22, 2023
0ac48a4
fix review comment
kwannoel May 22, 2023
b8633ee
fix some docs + do further refactor
kwannoel May 22, 2023
5d44fa4
run q0 flamegraph
kwannoel May 22, 2023
61a9e8a
use the rw toolchain to build addr2line
kwannoel May 22, 2023
2cf5980
fix internal dist for backfill + add test case
kwannoel May 23, 2023
82392cd
fix planner
kwannoel May 23, 2023
d504916
fix backwards compatibility issues
kwannoel May 23, 2023
ffef0e8
nit
kwannoel May 23, 2023
0cbcb85
remove optimization
kwannoel May 23, 2023
fcc7fe6
remove print
kwannoel May 23, 2023
cb46a73
add tests for backfill
kwannoel May 23, 2023
7d5df32
add truth table + use safe vnode iter
kwannoel May 23, 2023
321f272
revert changes to cascade mview integration test
kwannoel May 23, 2023
78e0e4d
fix further incompat
kwannoel May 24, 2023
d357360
handle incompat case where no internal table ids
kwannoel May 24, 2023
2cbd14a
doc
kwannoel May 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/scripts/gen-flamegraph.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ install_all() {
# faster addr2line to speed up heap flamegraph analysis by jeprof
echo ">>> Installing addr2line"
git clone https://github.com/gimli-rs/addr2line
cp risingwave/rust-toolchain addr2line/rust-toolchain
pushd addr2line
cargo b --examples -r
mv ./target/release/examples/addr2line $(which addr2line)
Expand Down
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ message ChainNode {
// ChainType is used to decide which implementation for the ChainNode.
ChainType chain_type = 4;

/// The state table used by Backfill operator for persisting internal state
catalog.Table state_table = 5;

// The upstream materialized view info used by backfill.
plan_common.StorageTableDesc table_desc = 7;
}
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/hash/consistent_hash/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ impl Bitmap {
self.iter_ones().map(VirtualNode::from_index)
}

/// Enumerates the virtual nodes set to 1 in the bitmap.
pub fn iter_vnodes_scalar(&self) -> impl Iterator<Item = i16> + '_ {
self.iter_vnodes().map(|vnode| vnode.to_scalar())
}

/// Returns an iterator which yields the position ranges of continuous virtual nodes set to 1 in
/// the bitmap.
pub fn vnode_ranges(&self) -> impl Iterator<Item = RangeInclusive<VirtualNode>> + '_ {
Expand Down
9 changes: 6 additions & 3 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use parse_display::Display;
use crate::array::{Array, ArrayImpl, DataChunk};
use crate::hash::Crc32HashCode;
use crate::row::{Row, RowExt};
use crate::types::ScalarRefImpl;
use crate::types::{DataType, ScalarRefImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::row_id::extract_vnode_id_from_row_id;

Expand Down Expand Up @@ -64,6 +64,9 @@ pub type AllVirtualNodeIter = std::iter::Map<std::ops::Range<usize>, fn(usize) -
impl VirtualNode {
/// The maximum value of the virtual node.
pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1);
/// We may use `VirtualNode` as a datum in a stream, or store it as a column.
/// Hence this reifies it as a RW datatype.
pub const RW_TYPE: DataType = DataType::Int16;
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
/// The minimum (zero) value of the virtual node.
pub const ZERO: VirtualNode = VirtualNode::from_index(0);

Expand Down Expand Up @@ -129,8 +132,8 @@ impl VirtualNode {
.collect()
}

// `compute_row` is used to calculate the `VirtualNode` for the corresponding column in a `Row`.
// Similar to `compute_chunk`, it also contains special handling for serial columns.
// `compute_row` is used to calculate the `VirtualNode` for the corresponding columns in a
// `Row`. Similar to `compute_chunk`, it also contains special handling for serial columns.
pub fn compute_row(row: impl Row, indices: &[usize]) -> VirtualNode {
let project = row.project(indices);
if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() {
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ fn visit_stream_node_tables_inner<F>(
always!(node.state_table, "Sort");
}

// Chain
NodeBody::Chain(node) => {
always!(node.state_table, "Chain")
}

// Note: add internal tables for new nodes here.
NodeBody::Materialize(node) if !internal_tables_only => {
always!(node.table, "Materialize")
Expand Down
Loading