Skip to content

Commit

Permalink
perf(stream): optimize arrangement backfill snapshot read (#14617)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Jan 18, 2024
1 parent e1df65f commit 38487da
Show file tree
Hide file tree
Showing 18 changed files with 135 additions and 70 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/backfill-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ download_and_prepare_rw "$profile" source

################ TESTS

./ci/scripts/run-backfill-tests.sh
profile=$profile ./ci/scripts/run-backfill-tests.sh
90 changes: 44 additions & 46 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

# USAGE:
# ```sh
# ./ci/scripts/run-backfill-tests.sh
# profile=(ci-release|ci-dev) ./ci/scripts/run-backfill-tests.sh
# ```
# Example progress:
# dev=> select * from rw_catalog.rw_ddl_progress;
Expand All @@ -23,6 +23,11 @@ BACKGROUND_DDL_DIR=$TEST_DIR/background_ddl
COMMON_DIR=$BACKGROUND_DDL_DIR/common

CLUSTER_PROFILE='ci-1cn-1fe-kafka-with-recovery'
if [[ -n "${BUILDKITE:-}" ]]; then
RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring'
else
RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe'
fi
export RUST_LOG="info,risingwave_meta::barrier::progress=debug,risingwave_meta::rpc::ddl_controller=debug"

run_sql_file() {
Expand Down Expand Up @@ -200,67 +205,49 @@ test_sink_backfill_recovery() {
wait
}

test_no_shuffle_backfill_runtime() {
echo "--- e2e, test_no_shuffle_backfill_runtime"
cargo make ci-start $CLUSTER_PROFILE
test_arrangement_backfill_snapshot_and_upstream_runtime() {
echo "--- e2e, test_backfill_snapshot_and_upstream_runtime"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'

# Provide updates ...
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt' 2>&1 1>out.log &

# ... and concurrently create mv.
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_upstream.slt' 2>&1 1>out.log &
echo "[INFO] Upstream is ingesting in background"
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt'

wait

sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_arrangement.slt'

cargo make kill
cargo make wait-processes-exit
}

test_arrangement_backfill_runtime() {
echo "--- e2e, test_arrangement_backfill_runtime"
cargo make ci-start $CLUSTER_PROFILE
test_no_shuffle_backfill_snapshot_and_upstream_runtime() {
echo "--- e2e, test_backfill_snapshot_and_upstream_runtime"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'

# Provide updates ...
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt' 2>&1 1>out.log &

# ... and concurrently create mv.
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_upstream.slt' 2>&1 1>out.log &
echo "[INFO] Upstream is ingesting in background"
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'

wait

sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt'

cargo make kill
cargo make wait-processes-exit
}

test_no_shuffle_backfill_snapshot_only_runtime() {
echo "--- e2e, test_no_shuffle_backfill_snapshot_only_runtime"
cargo make ci-start $CLUSTER_PROFILE
test_backfill_snapshot_runtime() {
echo "--- e2e, test_backfill_snapshot_runtime"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'

cargo make kill
cargo make wait-processes-exit
}

test_arrangement_backfill_snapshot_only_runtime() {
echo "--- e2e, test_arrangement_backfill_snapshot_only_runtime"
cargo make ci-start $CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_arrangement.slt'

cargo make kill
cargo make wait-processes-exit
Expand All @@ -272,10 +259,21 @@ main() {
test_backfill_tombstone
test_replication_with_column_pruning
test_sink_backfill_recovery
test_no_shuffle_backfill_runtime
test_arrangement_backfill_runtime
test_no_shuffle_backfill_snapshot_only_runtime
test_arrangement_backfill_snapshot_only_runtime

# Run the backfill performance tests only when profile is "ci-release".
if [[ ${profile:-} == "ci-release" ]]; then
echo "--- Using release profile, running backfill performance tests."
# Need separate tests, we don't want to backfill concurrently.
# It's difficult to measure the time taken for each backfill if we do so.
test_no_shuffle_backfill_snapshot_and_upstream_runtime
test_arrangement_backfill_snapshot_and_upstream_runtime

# Backfill will happen in sequence here.
test_backfill_snapshot_runtime

# There are no upstream-only tests: without a snapshot to read,
# backfill completes almost immediately.
fi
}

main
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ statement ok
SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=true;

statement ok
CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
CREATE MATERIALIZED VIEW arrangement_backfill AS SELECT * FROM t;
2 changes: 1 addition & 1 deletion e2e_test/backfill/runtime/create_no_shuffle_mv.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ statement ok
SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false;

statement ok
CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
CREATE MATERIALIZED VIEW no_shuffle_backfill AS SELECT * FROM t;
2 changes: 1 addition & 1 deletion e2e_test/backfill/runtime/create_table.slt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
statement ok
CREATE TABLE t (v1 int, v2 varchar, v3 bigint);
CREATE TABLE t (v1 int primary key, v2 varchar, v3 bigint);
5 changes: 0 additions & 5 deletions e2e_test/backfill/runtime/insert.slt

This file was deleted.

5 changes: 5 additions & 0 deletions e2e_test/backfill/runtime/insert_snapshot.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
statement ok
INSERT INTO t select generate_series, 'jakbj2khbe2', 22222222222 from generate_series(4000001, 8000000);

statement ok
flush;
5 changes: 5 additions & 0 deletions e2e_test/backfill/runtime/insert_upstream.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
statement ok
INSERT INTO t select generate_series, 'jakbj2khbe2', 22222222222 from generate_series(1, 4000000);

statement ok
flush;
4 changes: 0 additions & 4 deletions e2e_test/backfill/runtime/validate_rows.slt

This file was deleted.

4 changes: 4 additions & 0 deletions e2e_test/backfill/runtime/validate_rows_arrangement.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
query I
select (select count(*) from arrangement_backfill) = (select count(*) from t);
----
t
4 changes: 4 additions & 0 deletions e2e_test/backfill/runtime/validate_rows_no_shuffle.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
query I
select (select count(*) from no_shuffle_backfill) = (select count(*) from t);
----
t
22 changes: 22 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,28 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_rowsps(
"Arrangement Backfill Snapshot Read Throughput(rows)",
"Total number of rows that have been read from the backfill snapshot",
[
panels.target(
f"rate({table_metric('stream_arrangement_backfill_snapshot_read_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_rowsps(
"Arrangement Backfill Upstream Throughput(rows)",
"Total number of rows that have been output from the backfill upstream",
[
panels.target(
f"rate({table_metric('stream_arrangement_backfill_upstream_output_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_count(
"Barrier Number",
"The number of barriers that have been ingested but not completely processed. This metric reflects the "
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,30 @@ profile:
- use: frontend
- use: compactor

ci-3cn-1fe-with-monitoring:
config-path: src/config/ci.toml
steps:
- use: minio
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
port: 5687
exporter-port: 1222
enable-tiered-cache: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-tiered-cache: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: prometheus
- use: grafana

ci-3cn-3fe:
config-path: src/config/ci.toml
steps:
Expand Down
26 changes: 19 additions & 7 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::pin::pin;
use std::sync::Arc;

use either::Either;
use futures::stream::select_with_strategy;
use futures::stream::{select_all, select_with_strategy};
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
Expand All @@ -27,6 +27,7 @@ use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
Expand Down Expand Up @@ -533,6 +534,7 @@ where
backfill_state: BackfillState,
builders: &'a mut [DataChunkBuilder],
) {
let mut iterators = vec![];
for (vnode, builder) in upstream_table
.vnodes()
.iter_vnodes()
Expand All @@ -558,19 +560,29 @@ where
"iter_with_vnode_and_output_indices"
);
let vnode_row_iter = upstream_table
.iter_with_vnode_and_output_indices(vnode, &range_bounds, Default::default())
.iter_with_vnode_and_output_indices(
vnode,
&range_bounds,
PrefetchOptions::prefetch_for_small_range_scan(),
)
.await?;

let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));

let vnode_chunk_iter =
iter_chunks(vnode_row_iter, builder).map_ok(move |chunk| (vnode, chunk));

// This means we iterate serially rather than in parallel across vnodes.
#[for_await]
for chunk in vnode_chunk_iter {
yield Some(chunk?);
}
let vnode_chunk_iter = Box::pin(vnode_chunk_iter);

iterators.push(vnode_chunk_iter);
}

let vnode_chunk_iter = select_all(iterators);

// `select_all` merges the per-vnode streams, so chunks are yielded as they become
// ready across vnodes instead of draining one vnode at a time.
#[for_await]
for chunk in vnode_chunk_iter {
yield Some(chunk?);
}
yield None;
return Ok(());
Expand Down

0 comments on commit 38487da

Please sign in to comment.