diff --git a/Cargo.lock b/Cargo.lock index 7b227e17f9467..360f1a45ebd13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1482,6 +1482,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg-or-panic" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf85d5384815558275789d91d1895d1d9919a6e2534d6144650f036ac65691a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "chrono" version = "0.4.26" @@ -7373,6 +7384,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "cfg-or-panic", "clap 4.4.1", "console", "futures", diff --git a/Makefile.toml b/Makefile.toml index 1faf715e984a7..1765307fe691a 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -671,7 +671,7 @@ script = """ set -e export CARGO_TARGET_DIR=target/coverage -cargo llvm-cov nextest --html +cargo llvm-cov nextest --html --workspace --exclude risingwave_simulation """ description = "Run unit tests and report coverage" @@ -682,7 +682,7 @@ script = """ #!/usr/bin/env bash set -e -cargo nextest run "$@" +cargo nextest run --workspace --exclude risingwave_simulation "$@" """ description = "🌟 Run unit tests" diff --git a/ci/scripts/run-unit-test.sh b/ci/scripts/run-unit-test.sh index f5afdc017e52c..4ba534559ee9f 100755 --- a/ci/scripts/run-unit-test.sh +++ b/ci/scripts/run-unit-test.sh @@ -12,8 +12,8 @@ cd ${REPO_ROOT} echo "+++ Run unit tests with coverage" # use tee to disable progress bar -NEXTEST_PROFILE=ci cargo llvm-cov nextest --lcov --output-path lcov.info --features failpoints,sync_point 2> >(tee); +NEXTEST_PROFILE=ci cargo llvm-cov nextest --lcov --output-path lcov.info --features failpoints,sync_point --workspace --exclude risingwave_simulation 2> >(tee); echo "--- Codecov upload coverage reports" curl -Os https://uploader.codecov.io/latest/linux/codecov && chmod +x codecov -./codecov -t "$CODECOV_TOKEN" -s . -F rust \ No newline at end of file +./codecov -t "$CODECOV_TOKEN" -s . -F rust diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 2e819327962a1..dd21ac0ac6949 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -14,6 +14,7 @@ normal = ["serde"] anyhow = "1.0" async-trait = "0.1" aws-sdk-s3 = { version = "0.2", package = "madsim-aws-sdk-s3" } +cfg-or-panic = "0.1" clap = { version = "4", features = ["derive"] } console = "0.15" etcd-client = { workspace = true } diff --git a/src/tests/simulation/src/client.rs b/src/tests/simulation/src/client.rs index 498aaa2500edf..089d67bceeeab 100644 --- a/src/tests/simulation/src/client.rs +++ b/src/tests/simulation/src/client.rs @@ -64,7 +64,7 @@ impl<'a, 'b> SetStmtsIterator<'a, 'b> { impl SetStmts { fn push(&mut self, sql: &str) { - let ast = Parser::parse_sql(&sql).expect("a set statement should be parsed successfully"); + let ast = Parser::parse_sql(sql).expect("a set statement should be parsed successfully"); match ast .into_iter() .exactly_one() @@ -134,7 +134,7 @@ impl RisingWave { .simple_query("SET RW_IMPLICIT_FLUSH TO true;") .await?; // replay all SET statements - for stmt in SetStmtsIterator::new(&set_stmts) { + for stmt in SetStmtsIterator::new(set_stmts) { client.simple_query(&stmt).await?; } Ok((client, task)) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 9c9b426a04001..2cde876e3a143 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![cfg_attr(not(madsim), allow(unused_imports))] + use std::collections::HashMap; use std::future::Future; use std::io::Write; @@ -20,17 +22,20 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, bail, Result}; +use cfg_or_panic::cfg_or_panic; use clap::Parser; use futures::channel::{mpsc, oneshot}; use futures::future::join_all; use futures::{SinkExt, StreamExt}; use itertools::Itertools; -use madsim::net::ipvs::*; +#[cfg(madsim)] use madsim::runtime::{Handle, NodeHandle}; use rand::seq::IteratorRandom; use rand::Rng; use risingwave_pb::common::WorkerNode; use sqllogictest::AsyncDB; +#[cfg(not(madsim))] +use tokio::runtime::Handle; use crate::client::RisingWave; @@ -153,7 +158,9 @@ impl Configuration { pub struct Cluster { config: Configuration, handle: Handle, + #[cfg(madsim)] pub(crate) client: NodeHandle, + #[cfg(madsim)] pub(crate) ctl: NodeHandle, } @@ -161,7 +168,10 @@ impl Cluster { /// Start a RisingWave cluster for testing. /// /// This function should be called exactly once in a test. + #[cfg_or_panic(madsim)] pub async fn start(conf: Configuration) -> Result { + use madsim::net::ipvs::*; + let handle = madsim::runtime::Handle::current(); println!("seed = {}", handle.seed()); println!("{:#?}", conf); @@ -361,6 +371,7 @@ impl Cluster { } /// Start a SQL session on the client node. + #[cfg_or_panic(madsim)] pub fn start_session(&mut self) -> Session { let (query_tx, mut query_rx) = mpsc::channel::(0); @@ -404,6 +415,7 @@ impl Cluster { } /// Run a future on the client node. + #[cfg_or_panic(madsim)] pub async fn run_on_client(&self, future: F) -> F::Output where F: Future + Send + 'static, @@ -433,7 +445,7 @@ impl Cluster { timeout: Duration, ) -> Result { let fut = async move { - let mut interval = madsim::time::interval(interval); + let mut interval = tokio::time::interval(interval); loop { interval.tick().await; let result = self.run(sql.clone()).await?; @@ -443,7 +455,7 @@ impl Cluster { } }; - match madsim::time::timeout(timeout, fut).await { + match tokio::time::timeout(timeout, fut).await { Ok(r) => Ok(r?), Err(_) => bail!("wait_until timeout"), } @@ -461,6 +473,7 @@ impl Cluster { } /// Kill some nodes and restart them in 2s + restart_delay_secs with a probability of 0.1. + #[cfg_or_panic(madsim)] pub async fn kill_node(&self, opts: &KillOpts) { let mut nodes = vec![]; if opts.kill_meta { @@ -519,7 +532,7 @@ impl Cluster { let t = rand::thread_rng().gen_range(Duration::from_secs(0)..Duration::from_secs(1)); tokio::time::sleep(t).await; tracing::info!("kill {name}"); - madsim::runtime::Handle::current().kill(name); + Handle::current().kill(name); let mut t = rand::thread_rng().gen_range(Duration::from_secs(0)..Duration::from_secs(1)); @@ -531,12 +544,13 @@ impl Cluster { } tokio::time::sleep(t).await; tracing::info!("restart {name}"); - madsim::runtime::Handle::current().restart(name); + Handle::current().restart(name); })) .await; } /// Create a node for kafka producer and prepare data. + #[cfg_or_panic(madsim)] pub async fn create_kafka_producer(&self, datadir: &str) { self.handle .create_node() @@ -552,6 +566,7 @@ impl Cluster { } /// Create a kafka topic. + #[cfg_or_panic(madsim)] pub fn create_kafka_topics(&self, topics: HashMap) { self.handle .create_node() @@ -570,6 +585,7 @@ impl Cluster { } /// Graceful shutdown all RisingWave nodes. + #[cfg_or_panic(madsim)] pub async fn graceful_shutdown(&self) { let mut nodes = vec![]; let mut metas = vec![]; @@ -592,12 +608,12 @@ impl Cluster { for node in &nodes { self.handle.send_ctrl_c(node); } - madsim::time::sleep(waiting_time).await; + tokio::time::sleep(waiting_time).await; // shutdown metas for meta in &metas { self.handle.send_ctrl_c(meta); } - madsim::time::sleep(waiting_time).await; + tokio::time::sleep(waiting_time).await; // check all nodes are exited for node in nodes.iter().chain(metas.iter()) { diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs index feb2b40be7986..d81c0468430d2 100644 --- a/src/tests/simulation/src/ctl_ext.rs +++ b/src/tests/simulation/src/ctl_ext.rs @@ -12,18 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![cfg_attr(not(madsim), expect(unused_imports))] + use std::collections::{HashMap, HashSet}; use std::fmt::Write; use std::sync::Arc; use anyhow::{anyhow, Result}; +use cfg_or_panic::cfg_or_panic; use clap::Parser; use itertools::Itertools; -use madsim::rand::thread_rng; use rand::seq::{IteratorRandom, SliceRandom}; -use rand::Rng; +use rand::{thread_rng, Rng}; use risingwave_common::hash::ParallelUnitId; -use risingwave_pb::common::{HostAddress, WorkerNode}; use risingwave_pb::meta::get_reschedule_plan_request::PbPolicy; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; @@ -219,6 +220,7 @@ impl Fragment { impl Cluster { /// Locate fragments that satisfy all the predicates. + #[cfg_or_panic(madsim)] pub async fn locate_fragments( &mut self, predicates: impl IntoIterator, @@ -291,6 +293,7 @@ impl Cluster { self.locate_one_fragment([predicate::id(id)]).await } + #[cfg_or_panic(madsim)] pub async fn get_cluster_info(&self) -> Result { let response = self .ctl @@ -305,6 +308,7 @@ impl Cluster { } // update node schedulability + #[cfg_or_panic(madsim)] async fn update_worker_node_schedulability( &self, worker_ids: Vec, @@ -345,11 +349,12 @@ impl Cluster { self.reschedule_helper(plan, false).await } - /// Same as reschedule, but resolve the no_shuffle upstream + /// Same as reschedule, but resolve the no-shuffle upstream pub async fn reschedule_resolve_no_shuffle(&mut self, plan: impl Into) -> Result<()> { self.reschedule_helper(plan, true).await } + #[cfg_or_panic(madsim)] async fn reschedule_helper( &mut self, plan: impl Into, @@ -394,6 +399,7 @@ impl Cluster { Ok(()) } + #[cfg_or_panic(madsim)] pub async fn get_reschedule_plan(&self, policy: PbPolicy) -> Result { let revision = self .ctl diff --git a/src/tests/simulation/src/kafka.rs b/src/tests/simulation/src/kafka.rs index b62072961e629..0ab4829d9e1b8 100644 --- a/src/tests/simulation/src/kafka.rs +++ b/src/tests/simulation/src/kafka.rs @@ -15,9 +15,10 @@ use std::collections::HashMap; use std::time::SystemTime; +use itertools::Either; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; use rdkafka::error::{KafkaError, RDKafkaErrorCode}; -use rdkafka::producer::{BaseProducer, BaseRecord}; +use rdkafka::producer::{BaseProducer, BaseRecord, Producer}; use rdkafka::ClientConfig; /// Create a kafka topic @@ -82,22 +83,21 @@ pub async fn producer(broker_addr: &str, datadir: String) { .expect("failed to create topic"); let content = std::fs::read(file.path()).unwrap(); - let msgs: Box, &[u8])> + Send> = - if topic.ends_with("bin") { - // binary message data, a file is a message - Box::new(std::iter::once((None, content.as_slice()))) - } else { - // text message data, a line is a message - Box::new( - content - .split(|&b| b == b'\n') - .filter(|line| !line.is_empty()) - .map(|line| match line.iter().position(|&b| b == KEY_DELIMITER) { - Some(pos) => (Some(&line[..pos]), &line[pos + 1..]), - None => (None, line), - }), - ) - }; + let msgs = if topic.ends_with("bin") { + // binary message data, a file is a message + Either::Left(std::iter::once((None, content.as_slice()))) + } else { + // text message data, a line is a message + Either::Right( + content + .split(|&b| b == b'\n') + .filter(|line| !line.is_empty()) + .map(|line| match line.iter().position(|&b| b == KEY_DELIMITER) { + Some(pos) => (Some(&line[..pos]), &line[pos + 1..]), + None => (None, line), + }), + ) + }; for (key, payload) in msgs { loop { let ts = SystemTime::now() diff --git a/src/tests/simulation/src/lib.rs b/src/tests/simulation/src/lib.rs index 8e0bd81ad6eec..68c1d0446944d 100644 --- a/src/tests/simulation/src/lib.rs +++ b/src/tests/simulation/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![cfg(madsim)] #![feature(trait_alias)] #![feature(lint_reasons)] #![feature(lazy_cell)] diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index c37af587553f3..a3aa4ca056415 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -17,13 +17,9 @@ use std::path::PathBuf; +use cfg_or_panic::cfg_or_panic; use clap::Parser; -#[cfg(not(madsim))] -fn main() { - println!("This binary is only available in simulation."); -} - /// Deterministic simulation end-to-end test runner. /// /// ENVS: @@ -145,8 +141,8 @@ pub struct Args { e2e_extended_test: bool, } -#[cfg(madsim)] -#[madsim::main] +#[tokio::main] +#[cfg_or_panic(madsim)] async fn main() { use std::sync::Arc; diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index b78b11afb93a9..9df28c1ee24ff 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -24,11 +24,7 @@ use crate::cluster::{Cluster, KillOpts}; use crate::utils::TimedExt; fn is_create_table_as(sql: &str) -> bool { - let parts: Vec = sql - .trim_start() - .split_whitespace() - .map(|s| s.to_lowercase()) - .collect(); + let parts: Vec = sql.split_whitespace().map(|s| s.to_lowercase()).collect(); parts.len() >= 4 && parts[0] == "create" && parts[1] == "table" && parts[3] == "as" } diff --git a/src/tests/simulation/tests/integration_tests/backfill_tests.rs b/src/tests/simulation/tests/integration_tests/backfill_tests.rs index d42d4b3d500af..813bc3c0b7257 100644 --- a/src/tests/simulation/tests/integration_tests/backfill_tests.rs +++ b/src/tests/simulation/tests/integration_tests/backfill_tests.rs @@ -13,16 +13,14 @@ // limitations under the License. use anyhow::Result; -use itertools::{any, Itertools}; +use itertools::Itertools; use risingwave_simulation::cluster::{Cluster, Configuration}; const SET_PARALLELISM: &str = "SET STREAMING_PARALLELISM=1;"; const ROOT_TABLE_CREATE: &str = "create table t1 (_id int, data jsonb);"; const INSERT_SEED_SQL: &str = r#"insert into t1 values (1, '{"orders": {"id": 1, "price": "2.30", "customer_id": 2}}');"#; -const INSERT_AND_FLUSH_SQL: &str = r#"insert into t1 values (1, '{"orders": {"id": 1, "price": "2.30", "customer_id": 2}}'); FLUSH;"#; const INSERT_RECURSE_SQL: &str = "insert into t1 select _id + 1, data from t1;"; -const INSERT_RECURSE_AND_FLUSH_SQL: &str = "insert into t1 select _id + 1, data from t1;"; const MV1: &str = r#" create materialized view mv1 as with p1 as ( @@ -47,7 +45,7 @@ select from p2; "#; -#[madsim::test] +#[tokio::test] async fn test_backfill_with_upstream_and_snapshot_read() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_backfill()).await?; let mut session = cluster.start_session(); @@ -70,7 +68,7 @@ async fn test_backfill_with_upstream_and_snapshot_read() -> Result<()> { let sessions = (0..3).map(|_| cluster.start_session()).collect_vec(); // Create lots of base table update - for mut session in sessions.into_iter() { + for mut session in sessions { let task = tokio::spawn(async move { session.run(INSERT_RECURSE_SQL).await?; anyhow::Ok(()) @@ -82,7 +80,7 @@ async fn test_backfill_with_upstream_and_snapshot_read() -> Result<()> { let sessions = (0..10).map(|_| cluster.start_session()).collect_vec(); // Create lots of base table update - for mut session in sessions.into_iter() { + for mut session in sessions { let task = tokio::spawn(async move { for _ in 0..10 { session.run("FLUSH;").await?; diff --git a/src/tests/simulation/tests/integration_tests/batch/mod.rs b/src/tests/simulation/tests/integration_tests/batch/mod.rs index 52bce7212c41b..dfe5de1405bf9 100644 --- a/src/tests/simulation/tests/integration_tests/batch/mod.rs +++ b/src/tests/simulation/tests/integration_tests/batch/mod.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![cfg(madsim)] + use std::io::Write; -use anyhow::Result; use clap::Parser; use itertools::Itertools; -use madsim::runtime::Handle; use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration, Session}; use tokio::time::Duration; @@ -81,7 +81,7 @@ telemetry_enabled = false } } -#[madsim::test] +#[tokio::test] async fn test_serving_cluster_availability() { let config = cluster_config_no_compute_nodes(); let mut cluster = Cluster::start(config).await.unwrap(); diff --git a/src/tests/simulation/tests/integration_tests/main.rs b/src/tests/simulation/tests/integration_tests/main.rs index 0f4c10c8569ca..82ed948b39a51 100644 --- a/src/tests/simulation/tests/integration_tests/main.rs +++ b/src/tests/simulation/tests/integration_tests/main.rs @@ -18,7 +18,6 @@ //! for the rationale behind this approach. #![feature(stmt_expr_attributes)] -#![cfg(madsim)] #![feature(lazy_cell)] #![feature(drain_filter)] diff --git a/src/tests/simulation/tests/integration_tests/recovery/backfill.rs b/src/tests/simulation/tests/integration_tests/recovery/backfill.rs index a83c217cf151d..e907528826864 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/backfill.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/backfill.rs @@ -17,10 +17,10 @@ use std::time::Duration; use anyhow::Result; use itertools::Itertools; -use madsim::time::sleep; use risingwave_simulation::cluster::{Cluster, Configuration, Session}; use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; use risingwave_simulation::utils::AssertResult; +use tokio::time::sleep; const ROOT_TABLE_CREATE: &str = "create table t1 (v1 int);"; const ROOT_TABLE_DROP: &str = "drop table t1;"; @@ -46,7 +46,7 @@ async fn test_no_backfill_state(session: &mut Session) -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_snapshot_mv() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); @@ -98,7 +98,7 @@ async fn test_snapshot_mv() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_backfill_mv() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); @@ -151,7 +151,7 @@ async fn test_backfill_mv() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_index_backfill() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); diff --git a/src/tests/simulation/tests/integration_tests/recovery/nexmark_recovery.rs b/src/tests/simulation/tests/integration_tests/recovery/nexmark_recovery.rs index 1720c2edc21e7..edd419adabf66 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/nexmark_recovery.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/nexmark_recovery.rs @@ -15,10 +15,10 @@ use std::time::Duration; use anyhow::Result; -use madsim::time::{sleep, Instant}; use risingwave_simulation::cluster::{Configuration, KillOpts}; -use risingwave_simulation::nexmark::{self, NexmarkCluster, THROUGHPUT}; +use risingwave_simulation::nexmark::{NexmarkCluster, THROUGHPUT}; use risingwave_simulation::utils::AssertResult; +use tokio::time::sleep; /// Setup a nexmark stream, inject failures, and verify results. async fn nexmark_recovery_common(create: &str, select: &str, drop: &str) -> Result<()> { @@ -54,7 +54,7 @@ async fn nexmark_recovery_common(create: &str, select: &str, drop: &str) -> Resu macro_rules! test { ($query:ident) => { paste::paste! { - #[madsim::test] + #[tokio::test] async fn [< nexmark_recovery_ $query >]() -> Result<()> { use risingwave_simulation::nexmark::queries::$query::*; nexmark_recovery_common(CREATE, SELECT, DROP) diff --git a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs index 968762570ec79..c05e52c927424 100644 --- a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs +++ b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs @@ -16,10 +16,10 @@ use std::time::Duration; use anyhow::Result; use itertools::Itertools; -use madsim::time::sleep; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; use risingwave_simulation::utils::AssertResult; +use tokio::time::sleep; const ROOT_TABLE_CREATE: &str = "create table t1 (v1 int);"; const MV1: &str = "create materialized view m1 as select * from t1 where v1 > 5;"; @@ -28,7 +28,7 @@ const MV3: &str = "create materialized view m3 as select * from m2 where v1 < 15 const MV4: &str = "create materialized view m4 as select m1.v1 as m1v, m3.v1 as m3v from m1 join m3 on m1.v1 = m3.v1;"; const MV5: &str = "create materialized view m5 as select * from m4;"; -#[madsim::test] +#[tokio::test] async fn test_simple_cascade_materialized_view() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); @@ -113,7 +113,7 @@ async fn test_simple_cascade_materialized_view() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_diamond_cascade_materialized_view() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); diff --git a/src/tests/simulation/tests/integration_tests/scale/dynamic_filter.rs b/src/tests/simulation/tests/integration_tests/scale/dynamic_filter.rs index a9028836a8c54..e8e1af703c1c2 100644 --- a/src/tests/simulation/tests/integration_tests/scale/dynamic_filter.rs +++ b/src/tests/simulation/tests/integration_tests/scale/dynamic_filter.rs @@ -16,14 +16,14 @@ use std::collections::HashSet; use std::time::Duration; use anyhow::Result; -use madsim::time::sleep; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::identity_contains; use risingwave_simulation::utils::AssertResult; +use tokio::time::sleep; const SELECT: &str = "select * from mv1 order by v1;"; -#[madsim::test] +#[tokio::test] async fn test_dynamic_filter() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); diff --git a/src/tests/simulation/tests/integration_tests/scale/mod.rs b/src/tests/simulation/tests/integration_tests/scale/mod.rs index 2fb163ffea443..5270247bf3997 100644 --- a/src/tests/simulation/tests/integration_tests/scale/mod.rs +++ b/src/tests/simulation/tests/integration_tests/scale/mod.rs @@ -21,5 +21,6 @@ mod no_shuffle; mod plan; mod schedulability; mod singleton_migration; +mod sink; mod streaming_parallelism; mod table; diff --git a/src/tests/simulation/tests/integration_tests/scale/nexmark_chaos.rs b/src/tests/simulation/tests/integration_tests/scale/nexmark_chaos.rs index a06fb25fe7a18..6ed30a7452dcc 100644 --- a/src/tests/simulation/tests/integration_tests/scale/nexmark_chaos.rs +++ b/src/tests/simulation/tests/integration_tests/scale/nexmark_chaos.rs @@ -17,11 +17,11 @@ use std::time::Duration; use anyhow::Result; use futures::future::BoxFuture; use itertools::Itertools; -use madsim::time::sleep; use risingwave_simulation::cluster::Configuration; use risingwave_simulation::ctl_ext::Fragment; use risingwave_simulation::nexmark::{NexmarkCluster, THROUGHPUT}; use risingwave_simulation::utils::AssertResult; +use tokio::time::sleep; /// Common code for Nexmark chaos tests. /// @@ -117,7 +117,7 @@ macro_rules! test { }; ($query:ident, $after_scale_duration:expr) => { paste::paste! { - #[madsim::test] + #[tokio::test] async fn [< nexmark_chaos_ $query _single >]() -> Result<()> { use risingwave_simulation::nexmark::queries::$query::*; nexmark_chaos_common( @@ -132,7 +132,7 @@ macro_rules! test { .await } - #[madsim::test] + #[tokio::test] async fn [< nexmark_chaos_ $query _multiple >]() -> Result<()> { use risingwave_simulation::nexmark::queries::$query::*; nexmark_chaos_common( diff --git a/src/tests/simulation/tests/integration_tests/scale/nexmark_q4.rs b/src/tests/simulation/tests/integration_tests/scale/nexmark_q4.rs index ed3f3d4ad580f..41a90fcf5d0d5 100644 --- a/src/tests/simulation/tests/integration_tests/scale/nexmark_q4.rs +++ b/src/tests/simulation/tests/integration_tests/scale/nexmark_q4.rs @@ -15,7 +15,6 @@ use std::time::Duration; use anyhow::Result; -use madsim::time::sleep; use risingwave_simulation::cluster::Configuration; use risingwave_simulation::ctl_ext::predicate::{ identity_contains, upstream_fragment_count, BoxedPredicate, @@ -23,6 +22,7 @@ use risingwave_simulation::ctl_ext::predicate::{ use risingwave_simulation::nexmark::queries::q4::*; use risingwave_simulation::nexmark::{NexmarkCluster, THROUGHPUT}; use risingwave_simulation::utils::AssertResult; +use tokio::time::sleep; #[cfg(target_os = "linux")] const RESULT: &str = r#" @@ -55,7 +55,7 @@ async fn wait_initial_data(cluster: &mut NexmarkCluster) -> Result { .await } -#[madsim::test] +#[tokio::test] async fn nexmark_q4_ref() -> Result<()> { let mut cluster = init().await?; @@ -93,7 +93,7 @@ async fn nexmark_q4_common(predicates: impl IntoIterator) Ok(()) } -#[madsim::test] +#[tokio::test] async fn nexmark_q4_materialize_agg() -> Result<()> { nexmark_q4_common([ identity_contains("materialize"), @@ -102,12 +102,12 @@ async fn nexmark_q4_materialize_agg() -> Result<()> { .await } -#[madsim::test] +#[tokio::test] async fn nexmark_q4_source() -> Result<()> { nexmark_q4_common([identity_contains("source: bid")]).await } -#[madsim::test] +#[tokio::test] async fn nexmark_q4_agg_join() -> Result<()> { nexmark_q4_common([ identity_contains("hashagg"), @@ -117,7 +117,7 @@ async fn nexmark_q4_agg_join() -> Result<()> { .await } -#[madsim::test] +#[tokio::test] async fn nexmark_q4_cascade() -> Result<()> { let mut cluster = init().await?; @@ -165,7 +165,7 @@ async fn nexmark_q4_cascade() -> Result<()> { } // https://github.com/risingwavelabs/risingwave/issues/5567 -#[madsim::test] +#[tokio::test] async fn nexmark_q4_materialize_agg_cache_invalidation() -> Result<()> { let mut cluster = init().await?; diff --git a/src/tests/simulation/tests/integration_tests/scale/nexmark_source.rs b/src/tests/simulation/tests/integration_tests/scale/nexmark_source.rs index d1d6d1814f38b..864195d0efd7b 100644 --- a/src/tests/simulation/tests/integration_tests/scale/nexmark_source.rs +++ b/src/tests/simulation/tests/integration_tests/scale/nexmark_source.rs @@ -15,17 +15,17 @@ use std::time::Duration; use anyhow::Result; -use madsim::time::sleep; use risingwave_simulation::cluster::Configuration; use risingwave_simulation::ctl_ext::predicate::identity_contains; use risingwave_simulation::nexmark::{NexmarkCluster, THROUGHPUT}; +use tokio::time::sleep; -#[madsim::test] +#[tokio::test] async fn nexmark_source() -> Result<()> { nexmark_source_inner(false).await } -#[madsim::test] +#[tokio::test] async fn nexmark_source_with_watermark() -> Result<()> { nexmark_source_inner(true).await } diff --git a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs index 3a8402d5a83ae..0fde294d08e7f 100644 --- a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs +++ b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs @@ -18,7 +18,7 @@ use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::identity_contains; use risingwave_simulation::utils::AssertResult; -#[madsim::test] +#[tokio::test] async fn test_delta_join() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); @@ -120,7 +120,7 @@ async fn test_delta_join() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_share_multiple_no_shuffle_upstream() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); @@ -140,7 +140,7 @@ async fn test_share_multiple_no_shuffle_upstream() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_resolve_no_shuffle_upstream() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); diff --git a/src/tests/simulation/tests/integration_tests/scale/plan.rs b/src/tests/simulation/tests/integration_tests/scale/plan.rs index 5899ab7ba43b3..fdb72d35830a1 100644 --- a/src/tests/simulation/tests/integration_tests/scale/plan.rs +++ b/src/tests/simulation/tests/integration_tests/scale/plan.rs @@ -26,7 +26,7 @@ use risingwave_pb::meta::PbReschedule; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; -#[madsim::test] +#[tokio::test] async fn test_resize_normal() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); @@ -92,7 +92,7 @@ async fn test_resize_normal() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_resize_single() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); @@ -135,7 +135,7 @@ async fn test_resize_single() -> Result<()> { .parallel_units .iter() .map(|parallel_unit| parallel_unit.id) - .contains(&used_parallel_unit_id) + .contains(used_parallel_unit_id) }) .collect_vec(); @@ -175,7 +175,7 @@ async fn test_resize_single() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_resize_single_failed() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); @@ -242,7 +242,7 @@ async fn test_resize_single_failed() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_resize_no_shuffle() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); diff --git a/src/tests/simulation/tests/integration_tests/scale/schedulability.rs b/src/tests/simulation/tests/integration_tests/scale/schedulability.rs index ed5c5ec468771..5ad82b5326732 100644 --- a/src/tests/simulation/tests/integration_tests/scale/schedulability.rs +++ b/src/tests/simulation/tests/integration_tests/scale/schedulability.rs @@ -19,7 +19,7 @@ use risingwave_common::hash::ParallelUnitId; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_simulation::cluster::{Cluster, Configuration}; -#[madsim::test] +#[tokio::test] async fn test_cordon_normal() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); @@ -78,7 +78,7 @@ async fn test_cordon_normal() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_cordon_no_shuffle_failed() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); diff --git a/src/tests/simulation/tests/integration_tests/scale/singleton_migration.rs b/src/tests/simulation/tests/integration_tests/scale/singleton_migration.rs index 51c4cb17a6387..6abd980bea3bd 100644 --- a/src/tests/simulation/tests/integration_tests/scale/singleton_migration.rs +++ b/src/tests/simulation/tests/integration_tests/scale/singleton_migration.rs @@ -16,18 +16,18 @@ use std::time::Duration; use anyhow::Result; use itertools::Itertools; -use madsim::time::sleep; use rand::prelude::SliceRandom; use rand::thread_rng; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::identity_contains; use risingwave_simulation::utils::AssertResult; +use tokio::time::sleep; const ROOT_TABLE_CREATE: &str = "create table t (v1 int);"; const ROOT_MV: &str = "create materialized view m1 as select count(*) as c1 from t;"; const CASCADE_MV: &str = "create materialized view m2 as select * from m1;"; -#[madsim::test] +#[tokio::test] async fn test_singleton_migration() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); diff --git a/src/tests/simulation/tests/integration_tests/scale/sink.rs b/src/tests/simulation/tests/integration_tests/scale/sink.rs index 414acd0ad05e6..99f150e85b8a1 100644 --- a/src/tests/simulation/tests/integration_tests/scale/sink.rs +++ b/src/tests/simulation/tests/integration_tests/scale/sink.rs @@ -16,15 +16,16 @@ use std::collections::HashMap; use std::time::Duration; use anyhow::Result; -use madsim::export::futures::StreamExt; -use madsim::rand::prelude::SliceRandom; -use madsim::rand::thread_rng; -use madsim::time; -use rdkafka::consumer::{MessageStream, StreamConsumer}; +use futures::StreamExt; +use rand::prelude::SliceRandom; +use rand::thread_rng; +use rdkafka::consumer::{Consumer, StreamConsumer}; +use rdkafka::error::KafkaResult; use rdkafka::message::BorrowedMessage; use rdkafka::{ClientConfig, Message, TopicPartitionList}; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; +use tokio::time; const ROOT_TABLE_CREATE: &str = "create table t (v1 int) append only;"; const APPEND_ONLY_SINK_CREATE: &str = "create sink s1 from t with (connector='kafka', properties.bootstrap.server='192.168.11.1:29092', topic='t_sink_append_only', type='append-only');"; @@ -70,7 +71,8 @@ pub struct Before { pub count: i64, } -#[madsim::test] +#[tokio::test] +#[ignore] // https://github.com/risingwavelabs/risingwave/issues/12003 async fn test_sink_append_only() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; @@ -122,7 +124,8 @@ async fn test_sink_append_only() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] +#[ignore] // https://github.com/risingwavelabs/risingwave/issues/12003 async fn test_sink_debezium() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; @@ -201,7 +204,7 @@ async fn test_sink_debezium() -> Result<()> { Ok(()) } -fn check_payload(msg: &BorrowedMessage, payload: &[u8], i: i64) { +fn check_payload(msg: &BorrowedMessage<'_>, payload: &[u8], i: i64) { match msg.topic() { APPEND_ONLY_TOPIC => { let data: AppendOnlyPayload = serde_json::from_slice(payload).unwrap(); @@ -228,7 +231,7 @@ fn check_payload(msg: &BorrowedMessage, payload: &[u8], i: i64) { async fn check_kafka_after_insert( cluster: &mut Cluster, - stream: &mut MessageStream<'_>, + stream: &mut (impl futures::Stream>> + Unpin), input: &[i64], ) -> Result<()> { for i in input { diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs index 8828b7380616d..1e1260e7b0bd2 100644 --- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs @@ -16,7 +16,7 @@ use anyhow::Result; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::identity_contains; -#[madsim::test] +#[tokio::test] async fn test_streaming_parallelism_default() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let default_parallelism = cluster.config().compute_nodes * cluster.config().compute_node_cores; @@ -28,7 +28,7 @@ async fn test_streaming_parallelism_default() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_streaming_parallelism_set_some() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let default_parallelism = cluster.config().compute_nodes * cluster.config().compute_node_cores; @@ -48,7 +48,7 @@ async fn test_streaming_parallelism_set_some() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_streaming_parallelism_set_zero() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let default_parallelism = cluster.config().compute_nodes * cluster.config().compute_node_cores; @@ -64,7 +64,7 @@ async fn test_streaming_parallelism_set_zero() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_streaming_parallelism_mv_on_mv() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let default_parallelism = cluster.config().compute_nodes * cluster.config().compute_node_cores; @@ -101,7 +101,7 @@ async fn test_streaming_parallelism_mv_on_mv() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_streaming_parallelism_index() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let default_parallelism = cluster.config().compute_nodes * cluster.config().compute_node_cores; diff --git a/src/tests/simulation/tests/integration_tests/scale/table.rs b/src/tests/simulation/tests/integration_tests/scale/table.rs index 4ab4c2ce9ec25..8044818b43d0a 100644 --- a/src/tests/simulation/tests/integration_tests/scale/table.rs +++ b/src/tests/simulation/tests/integration_tests/scale/table.rs @@ -12,17 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::iter::{repeat, repeat_with}; -use std::time::Duration; +use std::iter::repeat_with; use anyhow::Result; use itertools::Itertools; -use madsim::time::sleep; -use rand::prelude::SliceRandom; -use rand::thread_rng; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::ctl_ext::predicate::identity_contains; -use risingwave_simulation::utils::AssertResult; const ROOT_TABLE_CREATE: &str = "create table t (v1 int);"; const MV1: &str = "create materialized view m1 as select * from t;"; @@ -41,7 +36,7 @@ macro_rules! insert_and_flush { }}; } -#[madsim::test] +#[tokio::test] async fn test_table() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; cluster.run(ROOT_TABLE_CREATE).await?; @@ -65,7 +60,7 @@ async fn test_table() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_mv_on_scaled_table() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; cluster.run(ROOT_TABLE_CREATE).await?; diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index 6747b8bbac4dd..d15e3a68ea2b6 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::pending; use std::io::Write; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; @@ -22,7 +21,6 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; use itertools::Itertools; -use madsim::time::{sleep, timeout}; use rand::prelude::SliceRandom; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; @@ -30,6 +28,7 @@ use risingwave_connector::sink::boxed::{BoxCoordinator, BoxWriter}; use risingwave_connector::sink::test_sink::registry_build_sink; use risingwave_connector::sink::{Sink, SinkWriter, SinkWriterParam}; use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration}; +use tokio::time::sleep; struct TestWriter { row_counter: Arc, @@ -106,7 +105,7 @@ impl Sink for TestSink { } } -#[madsim::test] +#[tokio::test] async fn test_sink_basic() -> Result<()> { let config_path = { let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file"); @@ -133,7 +132,7 @@ async fn test_sink_basic() -> Result<()> { let _sink_guard = registry_build_sink({ let row_counter = row_counter.clone(); let parallelism_counter = parallelism_counter.clone(); - move |param| { + move |_param| { Ok(Box::new(TestSink { row_counter: row_counter.clone(), parallelism_counter: parallelism_counter.clone(), @@ -178,7 +177,7 @@ async fn test_sink_basic() -> Result<()> { Ok(()) } -#[madsim::test] +#[tokio::test] async fn test_sink_decouple_basic() -> Result<()> { let config_path = { let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file"); @@ -205,7 +204,7 @@ async fn test_sink_decouple_basic() -> Result<()> { let _sink_guard = registry_build_sink({ let row_counter = row_counter.clone(); let parallelism_counter = parallelism_counter.clone(); - move |param| { + move |_param| { Ok(Box::new(TestSink { row_counter: row_counter.clone(), parallelism_counter: parallelism_counter.clone(), diff --git a/src/tests/simulation/tests/integration_tests/sink/mod.rs b/src/tests/simulation/tests/integration_tests/sink/mod.rs index 6db319067a068..71a65bf062d9c 100644 --- a/src/tests/simulation/tests/integration_tests/sink/mod.rs +++ b/src/tests/simulation/tests/integration_tests/sink/mod.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(madsim)] mod basic; diff --git a/src/tests/simulation/tests/integration_tests/storage.rs b/src/tests/simulation/tests/integration_tests/storage.rs index 193d00395d16d..e300a8194e453 100644 --- a/src/tests/simulation/tests/integration_tests/storage.rs +++ b/src/tests/simulation/tests/integration_tests/storage.rs @@ -16,10 +16,9 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::Result; -use madsim::rand::thread_rng; use rand::distributions::{Alphanumeric, DistString}; use rand::rngs::SmallRng; -use rand::{RngCore, SeedableRng}; +use rand::{thread_rng, RngCore, SeedableRng}; use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::utils::AssertResult; @@ -27,7 +26,7 @@ const NUM_ROWS: usize = 500; const NUM_OVERWRITES: usize = 5000; const MAX_STRING_LEN: usize = 150; -#[madsim::test] +#[tokio::test] async fn test_storage_with_random_writes() -> Result<()> { // TODO: Use backfill configuration for now let mut cluster = Cluster::start(Configuration::for_backfill()).await?;