Skip to content

Commit

Permalink
test: support background_ddl_rate in recovery test (#13552)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Nov 28, 2023
1 parent 9c752e0 commit 679413a
Show file tree
Hide file tree
Showing 11 changed files with 346 additions and 61 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@ export LOGDIR=.risingwave/log

mkdir -p $LOGDIR

# FIXME(kwannoel): Why is this failing?
# echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl"
# seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/background_ddl/sim/basic.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'
echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/background_ddl/sim/basic.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, ddl"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, streaming"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, batch"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, kafka source,sink"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log'
19 changes: 18 additions & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,24 @@ steps:
retry: *auto-retry

- label: "recovery test (deterministic simulation)"
command: "TEST_NUM=12 KILL_RATE=1.0 timeout 55m ci/scripts/deterministic-recovery-test.sh"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 55m ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
|| build.env("CI_STEPS") =~ /(^|,)recovery-tests?-deterministic-simulation(,|$$)/
depends_on: "build-simulation"
plugins:
- docker-compose#v4.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 60
retry: *auto-retry

# Ddl statements will randomly run with background_ddl.
- label: "background_ddl recovery test (deterministic simulation)"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 timeout 55m ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ steps:
retry: *auto-retry

- label: "recovery test (deterministic simulation)"
command: "TEST_NUM=8 KILL_RATE=0.5 ci/scripts/deterministic-recovery-test.sh"
command: "TEST_NUM=8 KILL_RATE=0.5 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
Expand Down
12 changes: 6 additions & 6 deletions e2e_test/background_ddl/sim/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ INSERT INTO t select * from generate_series(1, 200000);
statement ok
FLUSH;

statement ok
SET STREAMING_RATE_LIMIT=4000;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;

Expand All @@ -26,23 +29,20 @@ CREATE MATERIALIZED VIEW m3 as SELECT * FROM t;
statement error
CREATE MATERIALIZED VIEW m3 as SELECT * FROM t;

# Wait for background ddl to finish
sleep 30s

query I
select count(*) from m1;
----
10000000
200000

query I
select count(*) from m2;
----
10000000
200000

query I
select count(*) from m3;
----
10000000
200000

statement ok
DROP MATERIALIZED VIEW m1;
Expand Down
21 changes: 18 additions & 3 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_pb::catalog::{CreateType, PbTable};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::StreamScanType;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};

use super::privilege::resolve_relation_privileges;
Expand Down Expand Up @@ -164,7 +165,7 @@ pub async fn handle_create_mv(
return Ok(resp);
}

let (mut table, graph) = {
let (mut table, graph, can_run_in_background) = {
let context = OptimizerContext::from_handler_args(handler_args);
if !context.with_options().is_empty() {
// get other useful fields by `remove`, the logic here is to reject unknown options.
Expand All @@ -183,6 +184,20 @@ It only indicates the physical clustering of the data, which may improve the per

let (plan, table) =
gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?;
// All leaf nodes must be stream table scan, no other scan operators support recovery.
fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool {
if plan.inputs().is_empty() {
if let Some(scan) = plan.as_stream_table_scan() {
scan.stream_scan_type() == StreamScanType::Backfill
} else {
false
}
} else {
assert!(!plan.inputs().is_empty());
plan.inputs().iter().all(plan_has_backfill_leaf_nodes)
}
}
let can_run_in_background = plan_has_backfill_leaf_nodes(&plan);
let context = plan.plan_base().ctx().clone();
let mut graph = build_graph(plan);
graph.parallelism =
Expand All @@ -196,7 +211,7 @@ It only indicates the physical clustering of the data, which may improve the per
let env = graph.env.as_mut().unwrap();
env.timezone = context.get_session_timezone();

(table, graph)
(table, graph, can_run_in_background)
};

// Ensure writes to `StreamJobTracker` are atomic.
Expand All @@ -212,7 +227,7 @@ It only indicates the physical clustering of the data, which may improve the per
));

let run_in_background = session.config().background_ddl();
let create_type = if run_in_background {
let create_type = if run_in_background && can_run_in_background {
CreateType::Background
} else {
CreateType::Foreground
Expand Down
12 changes: 7 additions & 5 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::{
DispatchStrategy, Dispatcher, DispatcherType, FragmentTypeFlag, StreamActor, StreamNode,
StreamScanType,
};
use tokio::sync::{RwLock, RwLockReadGuard};

Expand Down Expand Up @@ -177,21 +178,22 @@ impl FragmentManager {
let map = &self.core.read().await.table_fragments;
let mut table_map = HashMap::new();
// TODO(kwannoel): Can this be unified with `PlanVisitor`?
fn has_stream_scan(stream_node: &StreamNode) -> bool {
let is_node_scan = if let Some(node) = &stream_node.node_body {
node.is_stream_scan()
fn has_backfill(stream_node: &StreamNode) -> bool {
let is_backfill = if let Some(node) = &stream_node.node_body
&& let Some(node) = node.as_stream_scan() {
node.stream_scan_type == StreamScanType::Backfill as i32
} else {
false
};
is_node_scan || stream_node.get_input().iter().any(has_stream_scan)
is_backfill || stream_node.get_input().iter().any(has_backfill)
}
for table_id in table_ids {
if let Some(table_fragment) = map.get(table_id) {
let mut actors = HashSet::new();
for fragment in table_fragment.fragments.values() {
for actor in &fragment.actors {
if let Some(node) = &actor.nodes
&& has_stream_scan(node)
&& has_backfill(node)
{
actors.insert(actor.actor_id);
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ cfg-or-panic = "0.2"
clap = { version = "4", features = ["derive"] }
console = "0.15"
etcd-client = { workspace = true }
expect-test = "1"
fail = { version = "0.5" }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
glob = "0.3"
Expand All @@ -29,6 +30,7 @@ pin-project = "1.1"
pretty_assertions = "1"
prometheus = { version = "0.13" }
rand = "0.8"
rand_chacha = { version = "0.3.1" }
rdkafka = { workspace = true }
risingwave_common = { workspace = true }
risingwave_compactor = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions src/tests/simulation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(lazy_cell)]
#![feature(let_chains)]
#![feature(try_blocks)]

pub mod client;
pub mod cluster;
Expand Down
7 changes: 6 additions & 1 deletion src/tests/simulation/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ pub struct Args {

#[arg(short, long)]
e2e_extended_test: bool,

/// Background ddl
/// The probability of background ddl for a ddl query.
#[clap(long, default_value = "0.0")]
background_ddl_rate: f64,
}

#[tokio::main]
Expand Down Expand Up @@ -245,7 +250,7 @@ async fn main() {
if let Some(jobs) = args.jobs {
run_parallel_slt_task(glob, jobs).await.unwrap();
} else {
run_slt_task(cluster0, glob, &kill_opts).await;
run_slt_task(cluster0, glob, &kill_opts, args.background_ddl_rate).await;
}
})
.await;
Expand Down
Loading

0 comments on commit 679413a

Please sign in to comment.