diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index 0d6908f3c1450..24f709139f0fc 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -23,6 +23,7 @@ use crate::sink::utils::{ start_sink_test_cluster, SimulationTestSink, SimulationTestSource, CREATE_SINK, CREATE_SOURCE, DROP_SINK, DROP_SOURCE, }; +use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert}; async fn basic_test_inner(is_decouple: bool) -> Result<()> { let mut cluster = start_sink_test_cluster().await?; @@ -45,13 +46,13 @@ async fn basic_test_inner(is_decouple: bool) -> Result<()> { test_sink .store .wait_for_count(test_source.id_list.len()) - .await; + .await?; session.run(DROP_SINK).await?; session.run(DROP_SOURCE).await?; assert_eq!(0, test_sink.parallelism_counter.load(Relaxed)); - test_sink.store.check_simple_result(&test_source.id_list); + test_sink.store.check_simple_result(&test_source.id_list)?; assert!(test_sink.store.inner().checkpoint_count > 0); Ok(()) diff --git a/src/tests/simulation/tests/integration_tests/sink/mod.rs b/src/tests/simulation/tests/integration_tests/sink/mod.rs index e4fb9ed843c1d..0d0de28b84dc8 100644 --- a/src/tests/simulation/tests/integration_tests/sink/mod.rs +++ b/src/tests/simulation/tests/integration_tests/sink/mod.rs @@ -20,3 +20,27 @@ mod recovery; mod scale; #[cfg(madsim)] mod utils; + +#[macro_export] +macro_rules! assert_with_err_returned { + ($condition:expr, $($rest:tt)*) => {{ + if !$condition { + return Err(anyhow::anyhow!($($rest)*).into()); + } + }}; + ($condition:expr) => {{ + if !$condition { + return Err(anyhow::anyhow!("fail assertion {}", stringify! {$condition}).into()); + } + }}; +} + +#[macro_export] +macro_rules! assert_eq_with_err_returned { + ($first:expr, $second:expr $(,$($rest:tt)*)?) => {{ + $crate::assert_with_err_returned ! { + {$first == $second} + $(, $($rest:tt)*)? + } + }}; +} diff --git a/src/tests/simulation/tests/integration_tests/sink/recovery.rs b/src/tests/simulation/tests/integration_tests/sink/recovery.rs index 3d80055840ae2..c23ea7fe5fa78 100644 --- a/src/tests/simulation/tests/integration_tests/sink/recovery.rs +++ b/src/tests/simulation/tests/integration_tests/sink/recovery.rs @@ -24,8 +24,13 @@ use crate::sink::utils::{ start_sink_test_cluster, SimulationTestSink, SimulationTestSource, CREATE_SINK, CREATE_SOURCE, DROP_SINK, DROP_SOURCE, }; +use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert}; -async fn kill_and_check(cluster: &mut Cluster, test_sink: &SimulationTestSink, target: usize) { +async fn kill_and_check( + cluster: &mut Cluster, + test_sink: &SimulationTestSink, + target: usize, +) -> anyhow::Result<()> { let mut prev_count = 0; sleep(Duration::from_secs(2)).await; for i in 0..5 { @@ -45,6 +50,7 @@ async fn kill_and_check(cluster: &mut Cluster, test_sink: &SimulationTestSink, t cluster.kill_node(&KillOpts::ALL).await; sleep(Duration::from_secs(10)).await; } + Ok(()) } async fn recovery_test_inner(is_decouple: bool) -> Result<()> { @@ -67,9 +73,9 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> { let count = test_source.id_list.len(); - kill_and_check(&mut cluster, &test_sink, count).await; + kill_and_check(&mut cluster, &test_sink, count).await?; - test_sink.store.wait_for_count(count).await; + test_sink.store.wait_for_count(count).await?; let mut session = cluster.start_session(); session.run(DROP_SINK).await?; @@ -78,7 +84,7 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> { assert_eq!(0, test_sink.parallelism_counter.load(Relaxed)); assert!(test_sink.store.inner().checkpoint_count > 0); - test_sink.store.check_simple_result(&test_source.id_list); + test_sink.store.check_simple_result(&test_source.id_list)?; assert!(test_sink.store.inner().checkpoint_count > 0); Ok(()) diff --git a/src/tests/simulation/tests/integration_tests/sink/scale.rs b/src/tests/simulation/tests/integration_tests/sink/scale.rs index 56a12e62d3661..c526c9378892d 100644 --- a/src/tests/simulation/tests/integration_tests/sink/scale.rs +++ b/src/tests/simulation/tests/integration_tests/sink/scale.rs @@ -27,6 +27,7 @@ use crate::sink::utils::{ start_sink_test_cluster, SimulationTestSink, SimulationTestSource, CREATE_SINK, CREATE_SOURCE, DROP_SINK, DROP_SOURCE, }; +use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert}; async fn scale_and_check( cluster: &mut Cluster, @@ -100,7 +101,7 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> { ) .await?; - test_sink.store.wait_for_count(count).await; + test_sink.store.wait_for_count(count).await?; let mut session = cluster.start_session(); session.run(DROP_SINK).await?; @@ -109,7 +110,7 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> { assert_eq!(0, test_sink.parallelism_counter.load(Relaxed)); assert!(test_sink.store.inner().checkpoint_count > 0); - test_sink.store.check_simple_result(&test_source.id_list); + test_sink.store.check_simple_result(&test_source.id_list)?; assert!(test_sink.store.inner().checkpoint_count > 0); Ok(()) diff --git a/src/tests/simulation/tests/integration_tests/sink/utils.rs b/src/tests/simulation/tests/integration_tests/sink/utils.rs index e906ae842b236..965c74b1e972a 100644 --- a/src/tests/simulation/tests/integration_tests/sink/utils.rs +++ b/src/tests/simulation/tests/integration_tests/sink/utils.rs @@ -40,6 +40,8 @@ use risingwave_connector::source::StreamChunkWithState; use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration}; use tokio::time::sleep; +use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert}; + pub const CREATE_SOURCE: &str = "create source test_source (id int, name varchar) with (connector = 'test') FORMAT PLAIN ENCODE JSON"; pub const CREATE_SINK: &str = "create sink test_sink from test_source with (connector = 'test')"; pub const DROP_SINK: &str = "drop sink test_sink"; @@ -79,7 +81,7 @@ impl TestSinkStore { self.inner.lock().unwrap() } - pub fn check_simple_result(&self, id_list: &[i32]) { + pub fn check_simple_result(&self, id_list: &[i32]) -> anyhow::Result<()> { let inner = self.inner(); assert_eq!(inner.id_name.len(), id_list.len()); for id in id_list { @@ -89,13 +91,14 @@ impl TestSinkStore { assert_eq!(name, &simple_name_of_id(*id)); } } + Ok(()) } pub fn id_count(&self) -> usize { self.inner().id_name.len() } - pub async fn wait_for_count(&self, count: usize) { + pub async fn wait_for_count(&self, count: usize) -> anyhow::Result<()> { let mut prev_count = 0; loop { sleep(Duration::from_secs(1)).await; @@ -112,6 +115,7 @@ impl TestSinkStore { ); prev_count = curr_count; } + Ok(()) } } @@ -206,7 +210,7 @@ pub fn build_stream_chunk(row_iter: impl Iterator) -> Stre ); for (id, name) in row_iter { let row_id = ROW_ID_GEN.fetch_add(1, Relaxed); - assert!(builder + std::assert!(builder .append_one_row([ Some(ScalarImpl::Int32(id)), Some(ScalarImpl::Utf8(name.into())),