Skip to content

Commit

Permalink
return err and not panic
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 27, 2023
1 parent 7c3c26d commit 2e41c9f
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 11 deletions.
5 changes: 3 additions & 2 deletions src/tests/simulation/tests/integration_tests/sink/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::sink::utils::{
start_sink_test_cluster, SimulationTestSink, SimulationTestSource, CREATE_SINK, CREATE_SOURCE,
DROP_SINK, DROP_SOURCE,
};
use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert};

async fn basic_test_inner(is_decouple: bool) -> Result<()> {
let mut cluster = start_sink_test_cluster().await?;
Expand All @@ -45,13 +46,13 @@ async fn basic_test_inner(is_decouple: bool) -> Result<()> {
test_sink
.store
.wait_for_count(test_source.id_list.len())
.await;
.await?;

session.run(DROP_SINK).await?;
session.run(DROP_SOURCE).await?;

assert_eq!(0, test_sink.parallelism_counter.load(Relaxed));
test_sink.store.check_simple_result(&test_source.id_list);
test_sink.store.check_simple_result(&test_source.id_list)?;
assert!(test_sink.store.inner().checkpoint_count > 0);

Ok(())
Expand Down
24 changes: 24 additions & 0 deletions src/tests/simulation/tests/integration_tests/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,27 @@ mod recovery;
mod scale;
#[cfg(madsim)]
mod utils;

#[macro_export]
macro_rules! assert_with_err_returned {
($condition:expr, $($rest:tt)*) => {{
if !$condition {
return Err(anyhow::anyhow!($($rest)*).into());
}
}};
($condition:expr) => {{
if !$condition {
return Err(anyhow::anyhow!("fail assertion {}", stringify! {$condition}).into());
}
}};
}

#[macro_export]
macro_rules! assert_eq_with_err_returned {
($first:expr, $second:expr $(,$($rest:tt)*)?) => {{
$crate::assert_with_err_returned ! {
{$first == $second}
$(, $($rest:tt)*)?
}
}};
}
14 changes: 10 additions & 4 deletions src/tests/simulation/tests/integration_tests/sink/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ use crate::sink::utils::{
start_sink_test_cluster, SimulationTestSink, SimulationTestSource, CREATE_SINK, CREATE_SOURCE,
DROP_SINK, DROP_SOURCE,
};
use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert};

async fn kill_and_check(cluster: &mut Cluster, test_sink: &SimulationTestSink, target: usize) {
async fn kill_and_check(
cluster: &mut Cluster,
test_sink: &SimulationTestSink,
target: usize,
) -> anyhow::Result<()> {
let mut prev_count = 0;
sleep(Duration::from_secs(2)).await;
for i in 0..5 {
Expand All @@ -45,6 +50,7 @@ async fn kill_and_check(cluster: &mut Cluster, test_sink: &SimulationTestSink, t
cluster.kill_node(&KillOpts::ALL).await;
sleep(Duration::from_secs(10)).await;
}
Ok(())
}

async fn recovery_test_inner(is_decouple: bool) -> Result<()> {
Expand All @@ -67,9 +73,9 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> {

let count = test_source.id_list.len();

kill_and_check(&mut cluster, &test_sink, count).await;
kill_and_check(&mut cluster, &test_sink, count).await?;

test_sink.store.wait_for_count(count).await;
test_sink.store.wait_for_count(count).await?;

let mut session = cluster.start_session();
session.run(DROP_SINK).await?;
Expand All @@ -78,7 +84,7 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> {
assert_eq!(0, test_sink.parallelism_counter.load(Relaxed));
assert!(test_sink.store.inner().checkpoint_count > 0);

test_sink.store.check_simple_result(&test_source.id_list);
test_sink.store.check_simple_result(&test_source.id_list)?;
assert!(test_sink.store.inner().checkpoint_count > 0);

Ok(())
Expand Down
5 changes: 3 additions & 2 deletions src/tests/simulation/tests/integration_tests/sink/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::sink::utils::{
start_sink_test_cluster, SimulationTestSink, SimulationTestSource, CREATE_SINK, CREATE_SOURCE,
DROP_SINK, DROP_SOURCE,
};
use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert};

async fn scale_and_check(
cluster: &mut Cluster,
Expand Down Expand Up @@ -100,7 +101,7 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> {
)
.await?;

test_sink.store.wait_for_count(count).await;
test_sink.store.wait_for_count(count).await?;

let mut session = cluster.start_session();
session.run(DROP_SINK).await?;
Expand All @@ -109,7 +110,7 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> {
assert_eq!(0, test_sink.parallelism_counter.load(Relaxed));
assert!(test_sink.store.inner().checkpoint_count > 0);

test_sink.store.check_simple_result(&test_source.id_list);
test_sink.store.check_simple_result(&test_source.id_list)?;
assert!(test_sink.store.inner().checkpoint_count > 0);

Ok(())
Expand Down
10 changes: 7 additions & 3 deletions src/tests/simulation/tests/integration_tests/sink/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use risingwave_connector::source::StreamChunkWithState;
use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration};
use tokio::time::sleep;

use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert};

pub const CREATE_SOURCE: &str = "create source test_source (id int, name varchar) with (connector = 'test') FORMAT PLAIN ENCODE JSON";
pub const CREATE_SINK: &str = "create sink test_sink from test_source with (connector = 'test')";
pub const DROP_SINK: &str = "drop sink test_sink";
Expand Down Expand Up @@ -79,7 +81,7 @@ impl TestSinkStore {
self.inner.lock().unwrap()
}

pub fn check_simple_result(&self, id_list: &[i32]) {
pub fn check_simple_result(&self, id_list: &[i32]) -> anyhow::Result<()> {
let inner = self.inner();
assert_eq!(inner.id_name.len(), id_list.len());
for id in id_list {
Expand All @@ -89,13 +91,14 @@ impl TestSinkStore {
assert_eq!(name, &simple_name_of_id(*id));
}
}
Ok(())
}

pub fn id_count(&self) -> usize {
self.inner().id_name.len()
}

pub async fn wait_for_count(&self, count: usize) {
pub async fn wait_for_count(&self, count: usize) -> anyhow::Result<()> {
let mut prev_count = 0;
loop {
sleep(Duration::from_secs(1)).await;
Expand All @@ -112,6 +115,7 @@ impl TestSinkStore {
);
prev_count = curr_count;
}
Ok(())
}
}

Expand Down Expand Up @@ -206,7 +210,7 @@ pub fn build_stream_chunk(row_iter: impl Iterator<Item = (i32, String)>) -> Stre
);
for (id, name) in row_iter {
let row_id = ROW_ID_GEN.fetch_add(1, Relaxed);
assert!(builder
std::assert!(builder
.append_one_row([
Some(ScalarImpl::Int32(id)),
Some(ScalarImpl::Utf8(name.into())),
Expand Down

0 comments on commit 2e41c9f

Please sign in to comment.