Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(stream): optimize arrangement backfill snapshot read #14617

Merged
merged 15 commits into from
Jan 18, 2024
48 changes: 26 additions & 22 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ BACKGROUND_DDL_DIR=$TEST_DIR/background_ddl
COMMON_DIR=$BACKGROUND_DDL_DIR/common

CLUSTER_PROFILE='ci-1cn-1fe-kafka-with-recovery'
# NOTE(review): this branch *enables* monitoring when running in Buildkite
# (the BUILDKITE env var selects the with-monitoring profile) — the previous
# wording said "disable"; confirm the intent matches the profile choice.
if [[ -n "${BUILDKITE:-}" ]]; then
RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring'
else
RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe'
fi
export RUST_LOG="info,risingwave_meta::barrier::progress=debug,risingwave_meta::rpc::ddl_controller=debug"

run_sql_file() {
Expand Down Expand Up @@ -202,16 +208,16 @@ test_sink_backfill_recovery() {

test_no_shuffle_backfill_runtime() {
echo "--- e2e, test_no_shuffle_backfill_runtime"
cargo make ci-start $CLUSTER_PROFILE
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'

# Provide updates ...
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt' 2>&1 1>out.log &
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'

# ... and concurrently create mv.
# Concurrently create mv ...
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'

# ... and provide updates
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_upstream.slt' 2>&1 1>out.log &

wait

sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'
Expand All @@ -222,16 +228,16 @@ test_no_shuffle_backfill_runtime() {

test_arrangement_backfill_runtime() {
echo "--- e2e, test_arrangement_backfill_runtime"
cargo make ci-start $CLUSTER_PROFILE
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'

# Provide updates ...
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt' 2>&1 1>out.log &

# ... and concurrently create mv.
# Concurrently create mv ...
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt'

# ... and provide updates
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_upstream.slt' 2>&1 1>out.log &

wait

sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'
Expand All @@ -242,10 +248,9 @@ test_arrangement_backfill_runtime() {

test_no_shuffle_backfill_snapshot_only_runtime() {
echo "--- e2e, test_no_shuffle_backfill_snapshot_only_runtime"
cargo make ci-start $CLUSTER_PROFILE
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'

Expand All @@ -255,10 +260,9 @@ test_no_shuffle_backfill_snapshot_only_runtime() {

test_arrangement_backfill_snapshot_only_runtime() {
echo "--- e2e, test_arrangement_backfill_snapshot_only_runtime"
cargo make ci-start $CLUSTER_PROFILE
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'

Expand All @@ -268,10 +272,10 @@ test_arrangement_backfill_snapshot_only_runtime() {

main() {
set -euo pipefail
test_snapshot_and_upstream_read
test_backfill_tombstone
test_replication_with_column_pruning
test_sink_backfill_recovery
# test_snapshot_and_upstream_read
# test_backfill_tombstone
# test_replication_with_column_pruning
# test_sink_backfill_recovery
test_no_shuffle_backfill_runtime
test_arrangement_backfill_runtime
test_no_shuffle_backfill_snapshot_only_runtime
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/backfill/runtime/create_table.slt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
statement ok
CREATE TABLE t (v1 int, v2 varchar, v3 bigint);
CREATE TABLE t (v1 int primary key, v2 varchar, v3 bigint);
5 changes: 0 additions & 5 deletions e2e_test/backfill/runtime/insert.slt

This file was deleted.

5 changes: 5 additions & 0 deletions e2e_test/backfill/runtime/insert_snapshot.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
statement ok
INSERT INTO t select generate_series, 'jakbj2khbe2', 22222222222 from generate_series(500001, 2000000);

statement ok
flush;
5 changes: 5 additions & 0 deletions e2e_test/backfill/runtime/insert_upstream.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
statement ok
INSERT INTO t select generate_series, 'jakbj2khbe2', 22222222222 from generate_series(1, 500000);

statement ok
flush;
22 changes: 22 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,28 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_rowsps(
"Arrangement Backfill Snapshot Read Throughput(rows)",
"Total number of rows that have been read from the backfill snapshot",
[
panels.target(
f"rate({table_metric('stream_arrangement_backfill_snapshot_read_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_rowsps(
"Arrangement Backfill Upstream Throughput(rows)",
"Total number of rows that have been output from the backfill upstream",
[
panels.target(
f"rate({table_metric('stream_arrangement_backfill_upstream_output_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_count(
"Barrier Number",
"The number of barriers that have been ingested but not completely processed. This metric reflects the "
Expand Down
24 changes: 24 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,30 @@ profile:
- use: frontend
- use: compactor

ci-3cn-1fe-with-monitoring:
config-path: src/config/ci.toml
steps:
- use: minio
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
port: 5687
exporter-port: 1222
enable-tiered-cache: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-tiered-cache: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: prometheus
- use: grafana

ci-3cn-3fe:
config-path: src/config/ci.toml
steps:
Expand Down
26 changes: 19 additions & 7 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::pin::pin;
use std::sync::Arc;

use either::Either;
use futures::stream::select_with_strategy;
use futures::stream::{select_all, select_with_strategy};
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
Expand All @@ -27,6 +27,7 @@ use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
Expand Down Expand Up @@ -533,6 +534,7 @@ where
backfill_state: BackfillState,
builders: &'a mut [DataChunkBuilder],
) {
let mut iterators = vec![];
for (vnode, builder) in upstream_table
.vnodes()
.iter_vnodes()
Expand All @@ -558,19 +560,29 @@ where
"iter_with_vnode_and_output_indices"
);
let vnode_row_iter = upstream_table
.iter_with_vnode_and_output_indices(vnode, &range_bounds, Default::default())
.iter_with_vnode_and_output_indices(
vnode,
&range_bounds,
PrefetchOptions::prefetch_for_small_range_scan(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimization 1: Use same prefetch config as no_shuffle_backfill.

)
.await?;

let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));

let vnode_chunk_iter =
iter_chunks(vnode_row_iter, builder).map_ok(move |chunk| (vnode, chunk));

// This means we iterate serially rather than in parallel across vnodes.
#[for_await]
for chunk in vnode_chunk_iter {
yield Some(chunk?);
}
let vnode_chunk_iter = Box::pin(vnode_chunk_iter);

iterators.push(vnode_chunk_iter);
}

let vnode_chunk_iter = select_all(iterators);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimization 2: Make sure iter is done in parallel.


// With `select_all`, the per-vnode chunk iterators are polled concurrently,
// so snapshot reads proceed in parallel across vnodes rather than serially.
#[for_await]
for chunk in vnode_chunk_iter {
yield Some(chunk?);
}
yield None;
return Ok(());
Expand Down
Loading