Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(stream): optimize arrangement backfill snapshot read #14617

Merged
merged 15 commits into from
Jan 18, 2024
2 changes: 1 addition & 1 deletion ci/scripts/backfill-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ download_and_prepare_rw "$profile" source

################ TESTS

./ci/scripts/run-backfill-tests.sh
profile=$profile ./ci/scripts/run-backfill-tests.sh
90 changes: 44 additions & 46 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

# USAGE:
# ```sh
# ./ci/scripts/run-backfill-tests.sh
# profile=(ci-release|ci-dev) ./ci/scripts/run-backfill-tests.sh
# ```
# Example progress:
# dev=> select * from rw_catalog.rw_ddl_progress;
Expand All @@ -23,6 +23,11 @@ BACKGROUND_DDL_DIR=$TEST_DIR/background_ddl
COMMON_DIR=$BACKGROUND_DDL_DIR/common

CLUSTER_PROFILE='ci-1cn-1fe-kafka-with-recovery'
if [[ -n "${BUILDKITE:-}" ]]; then
RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring'
else
RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe'
fi
export RUST_LOG="info,risingwave_meta::barrier::progress=debug,risingwave_meta::rpc::ddl_controller=debug"

run_sql_file() {
Expand Down Expand Up @@ -200,67 +205,49 @@ test_sink_backfill_recovery() {
wait
}

test_no_shuffle_backfill_runtime() {
echo "--- e2e, test_no_shuffle_backfill_runtime"
cargo make ci-start $CLUSTER_PROFILE
test_arrangement_backfill_snapshot_and_upstream_runtime() {
echo "--- e2e, test_backfill_snapshot_and_upstream_runtime"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'

# Provide updates ...
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt' 2>&1 1>out.log &

# ... and concurrently create mv.
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_upstream.slt' 2>&1 1>out.log &
echo "[INFO] Upstream is ingesting in background"
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt'

wait

sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_arrangement.slt'

cargo make kill
cargo make wait-processes-exit
}

test_arrangement_backfill_runtime() {
echo "--- e2e, test_arrangement_backfill_runtime"
cargo make ci-start $CLUSTER_PROFILE
test_no_shuffle_backfill_snapshot_and_upstream_runtime() {
echo "--- e2e, test_backfill_snapshot_and_upstream_runtime"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'

# Provide updates ...
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt' 2>&1 1>out.log &

# ... and concurrently create mv.
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_upstream.slt' 2>&1 1>out.log &
echo "[INFO] Upstream is ingesting in background"
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'

wait

sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt'

cargo make kill
cargo make wait-processes-exit
}

test_no_shuffle_backfill_snapshot_only_runtime() {
echo "--- e2e, test_no_shuffle_backfill_snapshot_only_runtime"
cargo make ci-start $CLUSTER_PROFILE
test_backfill_snapshot_runtime() {
echo "--- e2e, test_backfill_snapshot_runtime"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'

cargo make kill
cargo make wait-processes-exit
}

test_arrangement_backfill_snapshot_only_runtime() {
echo "--- e2e, test_arrangement_backfill_snapshot_only_runtime"
cargo make ci-start $CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_arrangement.slt'

cargo make kill
cargo make wait-processes-exit
Expand All @@ -272,10 +259,21 @@ main() {
test_backfill_tombstone
test_replication_with_column_pruning
test_sink_backfill_recovery
test_no_shuffle_backfill_runtime
test_arrangement_backfill_runtime
test_no_shuffle_backfill_snapshot_only_runtime
test_arrangement_backfill_snapshot_only_runtime

# Only if profile is "ci-release", run it.
if [[ ${profile:-} == "ci-release" ]]; then
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only run if release mode.

echo "--- Using release profile, running backfill performance tests."
# Need separate tests, we don't want to backfill concurrently.
# It's difficult to measure the time taken for each backfill if we do so.
test_no_shuffle_backfill_snapshot_and_upstream_runtime
test_arrangement_backfill_snapshot_and_upstream_runtime

# Backfill will happen in sequence here.
test_backfill_snapshot_runtime

# No upstream only tests, because if there's no snapshot,
# Backfill will complete almost immediately.
fi
}

main
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ statement ok
SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=true;

statement ok
CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
CREATE MATERIALIZED VIEW arrangement_backfill AS SELECT * FROM t;
2 changes: 1 addition & 1 deletion e2e_test/backfill/runtime/create_no_shuffle_mv.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ statement ok
SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false;

statement ok
CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
CREATE MATERIALIZED VIEW no_shuffle_backfill AS SELECT * FROM t;
2 changes: 1 addition & 1 deletion e2e_test/backfill/runtime/create_table.slt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
statement ok
CREATE TABLE t (v1 int, v2 varchar, v3 bigint);
CREATE TABLE t (v1 int primary key, v2 varchar, v3 bigint);
5 changes: 0 additions & 5 deletions e2e_test/backfill/runtime/insert.slt

This file was deleted.

5 changes: 5 additions & 0 deletions e2e_test/backfill/runtime/insert_snapshot.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
statement ok
INSERT INTO t select generate_series, 'jakbj2khbe2', 22222222222 from generate_series(4000001, 8000000);

statement ok
flush;
5 changes: 5 additions & 0 deletions e2e_test/backfill/runtime/insert_upstream.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
statement ok
INSERT INTO t select generate_series, 'jakbj2khbe2', 22222222222 from generate_series(1, 4000000);

statement ok
flush;
4 changes: 0 additions & 4 deletions e2e_test/backfill/runtime/validate_rows.slt

This file was deleted.

4 changes: 4 additions & 0 deletions e2e_test/backfill/runtime/validate_rows_arrangement.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
query I
select (select count(*) from arrangement_backfill) = (select count(*) from t);
----
t
4 changes: 4 additions & 0 deletions e2e_test/backfill/runtime/validate_rows_no_shuffle.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
query I
select (select count(*) from no_shuffle_backfill) = (select count(*) from t);
----
t
22 changes: 22 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,28 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_rowsps(
"Arrangement Backfill Snapshot Read Throughput(rows)",
"Total number of rows that have been read from the backfill snapshot",
[
panels.target(
f"rate({table_metric('stream_arrangement_backfill_snapshot_read_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_rowsps(
"Arrangement Backfill Upstream Throughput(rows)",
"Total number of rows that have been output from the backfill upstream",
[
panels.target(
f"rate({table_metric('stream_arrangement_backfill_upstream_output_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_count(
"Barrier Number",
"The number of barriers that have been ingested but not completely processed. This metric reflects the "
Expand Down
24 changes: 24 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,30 @@ profile:
- use: frontend
- use: compactor

ci-3cn-1fe-with-monitoring:
config-path: src/config/ci.toml
steps:
- use: minio
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
port: 5687
exporter-port: 1222
enable-tiered-cache: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-tiered-cache: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: prometheus
- use: grafana

ci-3cn-3fe:
config-path: src/config/ci.toml
steps:
Expand Down
26 changes: 19 additions & 7 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::pin::pin;
use std::sync::Arc;

use either::Either;
use futures::stream::select_with_strategy;
use futures::stream::{select_all, select_with_strategy};
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
Expand All @@ -27,6 +27,7 @@ use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
Expand Down Expand Up @@ -533,6 +534,7 @@ where
backfill_state: BackfillState,
builders: &'a mut [DataChunkBuilder],
) {
let mut iterators = vec![];
for (vnode, builder) in upstream_table
.vnodes()
.iter_vnodes()
Expand All @@ -558,19 +560,29 @@ where
"iter_with_vnode_and_output_indices"
);
let vnode_row_iter = upstream_table
.iter_with_vnode_and_output_indices(vnode, &range_bounds, Default::default())
.iter_with_vnode_and_output_indices(
vnode,
&range_bounds,
PrefetchOptions::prefetch_for_small_range_scan(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimization 1: Use same prefetch config as no_shuffle_backfill.

)
.await?;

let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));

let vnode_chunk_iter =
iter_chunks(vnode_row_iter, builder).map_ok(move |chunk| (vnode, chunk));

// This means we iterate serially rather than in parallel across vnodes.
#[for_await]
for chunk in vnode_chunk_iter {
yield Some(chunk?);
}
let vnode_chunk_iter = Box::pin(vnode_chunk_iter);

iterators.push(vnode_chunk_iter);
}

let vnode_chunk_iter = select_all(iterators);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimization 2: Make sure iter is done in parallel.


// This means we iterate serially rather than in parallel across vnodes.
#[for_await]
for chunk in vnode_chunk_iter {
yield Some(chunk?);
}
yield None;
return Ok(());
Expand Down
Loading