Skip to content

Commit

Permalink
fix(stream): arrangement backfill background ddl (#14563)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Jan 24, 2024
1 parent 76656e0 commit b27d252
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 40 deletions.
18 changes: 18 additions & 0 deletions ci/plugins/upload-failure-logs-zipped/hooks/post-command
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env bash

# Buildkite post-command hook: when the step's command failed, zip up the
# risedev logs (and regress-test results / connector-node log, if present)
# and upload them as build artifacts.
set -euo pipefail

if [ "$BUILDKITE_COMMAND_EXIT_STATUS" -ne 0 ]; then
    # Zip and upload the main risedev logs.
    mv .risingwave/log risedev-logs
    zip -q -r risedev-logs.zip risedev-logs/
    buildkite-agent artifact upload risedev-logs.zip

    # Zip and upload regress-test results, if that suite ran.
    REGRESS_TEST_DIR="$PWD/src/tests/regress/output/results/"
    if [ -d "$REGRESS_TEST_DIR" ]; then
        mkdir regress-test-logs && cp src/tests/regress/output/results/* regress-test-logs/
        # BUG FIX: the archive was previously created as `regress-test.zip`
        # but uploaded as `regress-test-logs.zip`, so the upload always failed.
        # The created and uploaded names must match.
        zip -q -r regress-test-logs.zip regress-test-logs/
        buildkite-agent artifact upload regress-test-logs.zip
    fi

    # Upload the connector-node log, if one was produced.
    if [ -e "$PWD/connector-node.log" ]; then
        buildkite-agent artifact upload "$PWD/connector-node.log"
    fi
fi
13 changes: 13 additions & 0 deletions ci/scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,16 @@ function download_and_prepare_rw() {
cargo make pre-start-dev
cargo make --allow-private link-all-in-one-binaries
}

function filter_stack_trace() {
    # Trim Rust backtraces in the log file "$1" in place, keeping only the
    # first 3 frames (0-2) of each backtrace so uploaded logs stay small.
    echo "filtering stack trace for $1"
    # Use a unique temp file instead of a fixed `tmp` in the CWD, which could
    # collide with an existing file or a concurrent invocation.
    local tmp
    tmp=$(mktemp)
    # A single sed invocation with multiple expressions replaces the previous
    # `cat | sed | sed | sed | sed` pipeline:
    #   - drop frames 10+ (two or more digits) and frames 3-9
    #   - drop "at /rustc/..." and "at /.../cargo/..." source-location lines
    #     (`.`/`...` match the path separators, as in the original patterns)
    sed -E \
        -e '/ [1-9][0-9]+:/d' \
        -e '/ [3-9]+:/d' \
        -e '/ at .rustc/d' \
        -e '/ at ...cargo/d' \
        "$1" > "$tmp"
    # Copy back (rather than mv) to preserve the original file's inode,
    # ownership, and permissions, matching the original cp+rm behavior.
    cp "$tmp" "$1"
    rm "$tmp"
}
59 changes: 54 additions & 5 deletions ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,70 @@ risingwave_meta::manager::catalog=debug,\
risingwave_meta::rpc::ddl_controller=debug,\
risingwave_meta::barrier::mod=debug,\
risingwave_simulation=debug"

# Extra logs you can enable if the existing trace does not give enough info.
#risingwave_stream::executor::backfill=trace,
#risingwave_meta::barrier::progress=debug,

# ========= Some tips for debugging recovery tests =========
# 1. If materialized view failed to create after multiple retries
# - Check logs to see where the materialized view creation was stuck.
# 1. Is it stuck at waiting for backfill executor response?
# In that case perhaps some backfill logic is flawed, add more trace in backfill to debug.
# 2. Is it stuck at waiting for backfill executor to finish?
# In that case perhaps some part of the backfill loop is slow / stuck.
# 3. Is it stuck at waiting for some executor to report progress?
# In that case perhaps the tracking of backfill's progress in meta is flawed.

export LOGDIR=.risingwave/log

mkdir -p $LOGDIR

filter_stack_trace_for_all_logs() {
    # Run `filter_stack_trace` (defined in ci/scripts/common.sh) over every
    # log file so the uploaded failure logs stay small.
    for log in "${LOGDIR}"/*.log; do
        # If no *.log files exist the glob stays unexpanded; skip it rather
        # than passing a literal "*.log" path to filter_stack_trace.
        [ -e "$log" ] || continue
        # BUG FIX: quote "$log" so paths containing spaces are not word-split.
        filter_stack_trace "$log"
    done
}

trap filter_stack_trace_for_all_logs ERR

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/background_ddl/sim/basic.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
${USE_ARRANGEMENT_BACKFILL:-} \
./e2e_test/background_ddl/sim/basic.slt \
2> $LOGDIR/recovery-background-ddl-{}.log && rm $LOGDIR/recovery-background-ddl-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, ddl"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
${USE_ARRANGEMENT_BACKFILL:-} \
./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, streaming"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
${USE_ARRANGEMENT_BACKFILL:-} \
./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, batch"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
${USE_ARRANGEMENT_BACKFILL:-} \
./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, kafka source,sink"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--kafka-datadir=./scripts/source/test_data \
${USE_ARRANGEMENT_BACKFILL:-} \
./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log'
25 changes: 23 additions & 2 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,27 @@ steps:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
# Only upload zipped files; otherwise the logs are too large.
- ./ci/plugins/upload-failure-logs-zipped
timeout_in_minutes: 60
retry: *auto-retry

# DDL statements will randomly run with background_ddl.
- label: "background_ddl, arrangement_backfill recovery test (deterministic simulation)"
key: "background-ddl-arrangement-backfill-recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=--use-arrangement-backfill timeout 55m ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
|| build.env("CI_STEPS") =~ /(^|,)recovery-tests?-deterministic-simulation(,|$$)/
depends_on: "build-simulation"
plugins:
- docker-compose#v4.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
# Only upload zipped files; otherwise the logs are too large.
- ./ci/plugins/upload-failure-logs-zipped
timeout_in_minutes: 60
retry: *auto-retry

Expand All @@ -333,7 +353,8 @@ steps:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
# Only upload zipped files; otherwise the logs are too large.
- ./ci/plugins/upload-failure-logs-zipped
timeout_in_minutes: 60
retry: *auto-retry

Expand Down
3 changes: 2 additions & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,11 @@ steps:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
# Only upload zipped files; otherwise the logs are too large.
- ./ci/plugins/upload-failure-logs-zipped
# - test-collector#v1.0.0:
# files: "*-junit.xml"
# format: "junit"
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
cancel_on_build_failing: true
retry: *auto-retry
Expand Down
69 changes: 37 additions & 32 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ where

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
tracing::debug!("Arrangement Backfill Executor started");
// The primary key columns, in the output columns of the upstream_table scan.
// Table scan scans a subset of the columns of the upstream table.
let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
Expand Down Expand Up @@ -451,54 +452,58 @@ where
"backfill_finished_wait_for_barrier"
);
// If not finished then we need to update state, otherwise no need.
if let Message::Barrier(barrier) = &msg
&& !is_completely_finished
{
// If snapshot was empty, we do not need to backfill,
// but we still need to persist the finished state.
// We currently persist it on the second barrier here rather than first.
// This is because we can't update state table in first epoch,
// since it expects to have been initialized in previous epoch
// (there's no epoch before the first epoch).
for vnode in upstream_table.vnodes().iter_vnodes() {
backfill_state.finish_progress(vnode, upstream_table.pk_indices().len());
}
if let Message::Barrier(barrier) = &msg {
if is_completely_finished {
// If already finished, no need to persist any state.
} else {
// If snapshot was empty, we do not need to backfill,
// but we still need to persist the finished state.
// We currently persist it on the second barrier here rather than first.
// This is because we can't update state table in first epoch,
// since it expects to have been initialized in previous epoch
// (there's no epoch before the first epoch).
for vnode in upstream_table.vnodes().iter_vnodes() {
backfill_state
.finish_progress(vnode, upstream_table.pk_indices().len());
}

persist_state_per_vnode(
barrier.epoch,
&mut self.state_table,
&mut backfill_state,
#[cfg(debug_assertions)]
state_len,
vnodes.iter_vnodes(),
)
.await?;
persist_state_per_vnode(
barrier.epoch,
&mut self.state_table,
&mut backfill_state,
#[cfg(debug_assertions)]
state_len,
vnodes.iter_vnodes(),
)
.await?;
}

self.progress
.finish(barrier.epoch.curr, total_snapshot_processed_rows);
tracing::trace!(
epoch = ?barrier.epoch,
"Updated CreateMaterializedTracker"
);
yield msg;
break;
} else {
// Allow other messages to pass through.
yield msg;
}
// Allow other messages to pass through.
// We won't yield twice here, since if there's a barrier,
// we will always break out of the loop.
yield msg;
}
}

tracing::trace!(
"Arrangement Backfill has already finished and forward messages directly to the downstream"
);

// After progress finished + state persisted,
// we can forward messages directly to the downstream,
// as backfill is finished.
#[for_await]
for msg in upstream {
if let Some(msg) = mapping_message(msg?, &self.output_indices) {
tracing::trace!(
actor = self.actor_id,
message = ?msg,
"backfill_finished_after_barrier"
);
if let Message::Barrier(barrier) = &msg {
self.state_table.commit_no_data_expected(barrier.epoch);
}
yield msg;
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ where
yield msg;
break;
}
// Allow other messages to pass through.
// We won't yield twice here, since if there's a barrier,
// we will always break out of the loop.
yield msg;
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/tests/simulation/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ pub struct Args {
/// The probability of background ddl for a ddl query.
#[clap(long, default_value = "0.0")]
background_ddl_rate: f64,

/// Use arrangement backfill by default
#[clap(long, default_value = "false")]
use_arrangement_backfill: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -173,6 +177,11 @@ async fn main() {
meta_nodes: args.meta_nodes,
etcd_timeout_rate: args.etcd_timeout_rate,
etcd_data_path: args.etcd_data,
per_session_queries: if args.use_arrangement_backfill {
vec!["SET enable_arrangement_backfill = true;".to_string()].into()
} else {
vec![].into()
},
..Default::default()
};
let kill_opts = KillOpts {
Expand Down

0 comments on commit b27d252

Please sign in to comment.