Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(test): use rate limit instead in background ddl test #13179

Merged
merged 22 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ci/scripts/deterministic-it-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ mv target/ci-sim target/sim
echo "--- Run integration tests in deterministic simulation mode"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \
cargo nextest run \
--no-capture \
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--no-fail-fast \
--cargo-metadata target/nextest/cargo-metadata.json \
--binaries-metadata target/nextest/binaries-metadata.json \
Expand Down
16 changes: 0 additions & 16 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,6 @@ where

let pk_order = self.upstream_table.pk_serializer().get_order_types();

#[cfg(madsim)]
let snapshot_read_delay = if let Ok(v) = std::env::var("RW_BACKFILL_SNAPSHOT_READ_DELAY")
&& let Ok(v) = v.parse::<u64>() {
v
} else {
0
};

let upstream_table_id = self.upstream_table.table_id().table_id;

let mut upstream = self.upstream.execute();
Expand Down Expand Up @@ -303,14 +295,6 @@ where
break 'backfill_loop;
}
Some(chunk) => {
#[cfg(madsim)]
{
tokio::time::sleep(std::time::Duration::from_millis(
snapshot_read_delay as u64,
))
.await;
}

// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use tokio::time::sleep;
const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100000);";
const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
const SET_RATE_LIMIT_4096: &str = "SET STREAMING_RATE_LIMIT=4096;";
const SET_RATE_LIMIT_2048: &str = "SET STREAMING_RATE_LIMIT=2048;";
const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=0;";
const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";

async fn kill_cn_and_wait_recover(cluster: &Cluster) {
Expand Down Expand Up @@ -133,13 +136,11 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {

#[tokio::test]
async fn test_background_ddl_cancel() -> Result<()> {
env::set_var("RW_BACKFILL_SNAPSHOT_READ_DELAY", "100");
async fn create_mv(session: &mut Session) -> Result<()> {
session.run(CREATE_MV1).await?;
sleep(Duration::from_secs(2)).await;
Ok(())
}
// FIXME: See if we can use rate limit instead.
use std::env;
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
Expand All @@ -149,6 +150,7 @@ async fn test_background_ddl_cancel() -> Result<()> {
let mut session = cluster.start_session();
session.run(CREATE_TABLE).await?;
session.run(SEED_TABLE).await?;
session.run(SET_RATE_LIMIT_4096).await?;
session.run(SET_BACKGROUND_DDL).await?;

for _ in 0..5 {
Expand All @@ -157,6 +159,7 @@ async fn test_background_ddl_cancel() -> Result<()> {
assert_eq!(ids.len(), 1);
}

session.run(SET_RATE_LIMIT_2048).await?;
create_mv(&mut session).await?;

// Test cancel after kill cn
Expand All @@ -176,6 +179,7 @@ async fn test_background_ddl_cancel() -> Result<()> {
assert_eq!(ids.len(), 1);

// Make sure MV can be created after all these cancels
session.run(RESET_RATE_LIMIT).await?;
create_mv(&mut session).await?;

kill_and_wait_recover(&cluster).await;
Expand Down
Loading