Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): arrangement backfill background ddl #14563

Merged
merged 24 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions ci/plugins/upload-failure-logs-zipped/hooks/post-command
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env bash

# Buildkite plugin hook: on command failure, zip and upload logs as artifacts
# so failing CI runs can be debugged without flooding the artifact store.

set -euo pipefail

if [ "$BUILDKITE_COMMAND_EXIT_STATUS" -ne 0 ]; then
    # Collect the risedev logs produced by the failed build command.
    mv .risingwave/log risedev-logs
    zip -q -r risedev-logs.zip risedev-logs/
    buildkite-agent artifact upload risedev-logs.zip

    # Collect regress-test results, if that suite produced any.
    REGRESS_TEST_DIR="$PWD/src/tests/regress/output/results/"
    if [ -d "$REGRESS_TEST_DIR" ]; then
        mkdir regress-test-logs && cp src/tests/regress/output/results/* regress-test-logs/
        # BUG FIX: the archive was previously created as `regress-test.zip` but
        # uploaded as `regress-test-logs.zip`, so the upload could never find
        # the file. Use one consistent name for both steps.
        zip -q -r regress-test-logs.zip regress-test-logs/
        buildkite-agent artifact upload regress-test-logs.zip
    fi

    # Upload the connector-node log when present (not produced by every job).
    if [ -e "$PWD/connector-node.log" ]; then
        buildkite-agent artifact upload "$PWD/connector-node.log"
    fi
fi
13 changes: 13 additions & 0 deletions ci/scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,16 @@ function download_and_prepare_rw() {
cargo make pre-start-dev
cargo make --allow-private link-all-in-one-binaries
}

function filter_stack_trace() {
    # Trim Rust backtraces in the given log file, in place, keeping only the
    # first three frames (0-2) and dropping frames that point into the rustc
    # or cargo sources, so uploaded CI logs stay small.
    # $1: path to the log file to filter.
    echo "filtering stack trace for $1"
    # Use mktemp rather than a fixed `tmp` file in the working directory:
    # a fixed name clobbers any unrelated `tmp` file and races with
    # concurrent invocations of this function.
    local tmpfile
    tmpfile=$(mktemp)
    # NOTE: `.rustc` / `...cargo` use `.` as a regex wildcard matching the
    # `/rustc` and `/.cargo` path components — kept as-is from the original.
    sed -E \
        -e '/ [1-9][0-9]+:/d' \
        -e '/ [3-9]+:/d' \
        -e '/ at .rustc/d' \
        -e '/ at ...cargo/d' \
        "$1" > "$tmpfile"
    # Atomically replace the original log with the filtered version.
    mv "$tmpfile" "$1"
}
59 changes: 54 additions & 5 deletions ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,70 @@ risingwave_meta::manager::catalog=debug,\
risingwave_meta::rpc::ddl_controller=debug,\
risingwave_meta::barrier::mod=debug,\
risingwave_simulation=debug"

# Extra logs you can enable if the existing trace does not give enough info.
#risingwave_stream::executor::backfill=trace,
#risingwave_meta::barrier::progress=debug,

# ========= Some tips for debugging recovery tests =========
# 1. If materialized view failed to create after multiple retries
# - Check logs to see where the materialized view creation was stuck.
# 1. Is it stuck at waiting for backfill executor response?
# In that case perhaps some backfill logic is flawed, add more trace in backfill to debug.
# 2. Is it stuck at waiting for backfill executor to finish?
# In that case perhaps some part of the backfill loop is slow / stuck.
# 3. Is it stuck at waiting for some executor to report progress?
# In that case perhaps the tracking of backfill's progress in meta is flawed.

export LOGDIR=.risingwave/log

mkdir -p $LOGDIR

filter_stack_trace_for_all_logs() {
    # Shrink every log under $LOGDIR before artifact upload.
    # `filter_stack_trace` is defined in `common.sh`.
    for log in "${LOGDIR}"/*.log; do
        # Guard against an unmatched glob: without this, a directory with no
        # *.log files would pass the literal pattern to filter_stack_trace.
        [ -e "$log" ] || continue
        # Quote the path so filenames containing spaces survive word
        # splitting (shellcheck SC2086).
        filter_stack_trace "$log"
    done
}

trap filter_stack_trace_for_all_logs ERR

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/background_ddl/sim/basic.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
${USE_ARRANGEMENT_BACKFILL:-} \
./e2e_test/background_ddl/sim/basic.slt \
2> $LOGDIR/recovery-background-ddl-{}.log && rm $LOGDIR/recovery-background-ddl-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, ddl"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
${USE_ARRANGEMENT_BACKFILL:-} \
./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, streaming"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
${USE_ARRANGEMENT_BACKFILL:-} \
./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, batch"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
${USE_ARRANGEMENT_BACKFILL:-} \
./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, kafka source,sink"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--kafka-datadir=./scripts/source/test_data \
${USE_ARRANGEMENT_BACKFILL:-} \
./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log'
25 changes: 23 additions & 2 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,27 @@ steps:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
# Only upload zipped files, otherwise the logs are too large.
- ./ci/plugins/upload-failure-logs-zipped
timeout_in_minutes: 60
retry: *auto-retry

# Ddl statements will randomly run with background_ddl.
- label: "background_ddl, arrangement_backfill recovery test (deterministic simulation)"
key: "background-ddl-arrangement-backfill-recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=--use-arrangement-backfill timeout 55m ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
|| build.env("CI_STEPS") =~ /(^|,)recovery-tests?-deterministic-simulation(,|$$)/
depends_on: "build-simulation"
plugins:
- docker-compose#v4.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
# Only upload zipped files, otherwise the logs are too large.
- ./ci/plugins/upload-failure-logs-zipped
timeout_in_minutes: 60
retry: *auto-retry

Expand All @@ -333,7 +353,8 @@ steps:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
# Only upload zipped files, otherwise the logs are too large.
- ./ci/plugins/upload-failure-logs-zipped
timeout_in_minutes: 60
retry: *auto-retry

Expand Down
3 changes: 2 additions & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,11 @@ steps:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
# Only upload zipped files, otherwise the logs are too large.
- ./ci/plugins/upload-failure-logs-zipped
# - test-collector#v1.0.0:
# files: "*-junit.xml"
# format: "junit"
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
cancel_on_build_failing: true
retry: *auto-retry
Expand Down
69 changes: 37 additions & 32 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ where

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
tracing::debug!("Arrangement Backfill Executor started");
// The primary key columns, in the output columns of the upstream_table scan.
// Table scan scans a subset of the columns of the upstream table.
let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
Expand Down Expand Up @@ -450,54 +451,58 @@ where
"backfill_finished_wait_for_barrier"
);
// If not finished then we need to update state, otherwise no need.
if let Message::Barrier(barrier) = &msg
&& !is_completely_finished
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we waited for this flag before proceeding. However, is_completely_finished could already be true on recovery, if the backfill had completed beforehand.

In that case, we should just yield the barrier and stop waiting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, after recovery the backfill state may be inconsistent with the CreateMaterializedTracker state, so we need to notify the tracker that progress has finished every time after recovery.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we will always notify see https://github.com/risingwavelabs/risingwave/pull/14563/files#r1464421419. It is outside of the if-else block.

{
// If snapshot was empty, we do not need to backfill,
// but we still need to persist the finished state.
// We currently persist it on the second barrier here rather than first.
// This is because we can't update state table in first epoch,
// since it expects to have been initialized in previous epoch
// (there's no epoch before the first epoch).
for vnode in upstream_table.vnodes().iter_vnodes() {
backfill_state.finish_progress(vnode, upstream_table.pk_indices().len());
}
if let Message::Barrier(barrier) = &msg {
if is_completely_finished {
// If already finished, no need to persist any state.
} else {
// If snapshot was empty, we do not need to backfill,
// but we still need to persist the finished state.
// We currently persist it on the second barrier here rather than first.
// This is because we can't update state table in first epoch,
// since it expects to have been initialized in previous epoch
// (there's no epoch before the first epoch).
for vnode in upstream_table.vnodes().iter_vnodes() {
backfill_state
.finish_progress(vnode, upstream_table.pk_indices().len());
}

persist_state_per_vnode(
barrier.epoch,
&mut self.state_table,
&mut backfill_state,
#[cfg(debug_assertions)]
state_len,
vnodes.iter_vnodes(),
)
.await?;
persist_state_per_vnode(
barrier.epoch,
&mut self.state_table,
&mut backfill_state,
#[cfg(debug_assertions)]
state_len,
vnodes.iter_vnodes(),
)
.await?;
}

self.progress
.finish(barrier.epoch.curr, total_snapshot_processed_rows);
tracing::trace!(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always notifies.

epoch = ?barrier.epoch,
"Updated CreateMaterializedTracker"
);
yield msg;
break;
} else {
// Allow other messages to pass through.
yield msg;
}
// Allow other messages to pass through.
// We won't yield twice here, since if there's a barrier,
// we will always break out of the loop.
yield msg;
}
}

tracing::trace!(
"Arrangement Backfill has already finished and forward messages directly to the downstream"
);

// After progress finished + state persisted,
// we can forward messages directly to the downstream,
// as backfill is finished.
#[for_await]
for msg in upstream {
if let Some(msg) = mapping_message(msg?, &self.output_indices) {
tracing::trace!(
actor = self.actor_id,
message = ?msg,
"backfill_finished_after_barrier"
);
if let Message::Barrier(barrier) = &msg {
self.state_table.commit_no_data_expected(barrier.epoch);
}
yield msg;
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,9 @@ where
yield msg;
break;
}
// Allow other messages to pass through.
// We won't yield twice here, since if there's a barrier,
// we will always break out of the loop.
yield msg;
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/tests/simulation/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ pub struct Args {
/// The probability of background ddl for a ddl query.
#[clap(long, default_value = "0.0")]
background_ddl_rate: f64,

/// Use arrangement backfill by default
#[clap(long, default_value = "false")]
use_arrangement_backfill: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -173,6 +177,11 @@ async fn main() {
meta_nodes: args.meta_nodes,
etcd_timeout_rate: args.etcd_timeout_rate,
etcd_data_path: args.etcd_data,
per_session_queries: if args.use_arrangement_backfill {
vec!["SET enable_arrangement_backfill = true;".to_string()].into()
} else {
vec![].into()
},
..Default::default()
};
let kill_opts = KillOpts {
Expand Down
Loading