diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 70e63b4b33cd0..fdc4e95ef5799 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -241,7 +241,7 @@ impl SinkExecutor { .await?; if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) { - log_writer.update_vnode_bitmap(vnode_bitmap); + log_writer.update_vnode_bitmap(vnode_bitmap).await?; } yield Message::Barrier(barrier); } @@ -276,7 +276,7 @@ impl SinkExecutor { .await?; if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) { - log_writer.update_vnode_bitmap(vnode_bitmap); + log_writer.update_vnode_bitmap(vnode_bitmap).await?; } yield Message::Barrier(barrier); } diff --git a/src/tests/simulation/tests/integration_tests/sink/scale.rs b/src/tests/simulation/tests/integration_tests/sink/scale.rs index 8bb84051ab055..f327468543a37 100644 --- a/src/tests/simulation/tests/integration_tests/sink/scale.rs +++ b/src/tests/simulation/tests/integration_tests/sink/scale.rs @@ -37,7 +37,7 @@ async fn scale_and_check( for (plan, expected_parallelism) in schedule_plan { let prev_count = test_sink.store.id_count(); assert!(prev_count < target_count, "sink finish before scale"); - // cluster.reschedule(plan).await?; + cluster.reschedule(plan).await?; let after_count = test_sink.store.id_count(); sleep(Duration::from_secs(10)).await; if thread_rng().gen_bool(0.5) {