From 0c8d62d8f2c53d7590a9d86b2b86afaedc2d9d1c Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 18 Oct 2023 01:17:30 +0800 Subject: [PATCH] fix update vnode await --- src/stream/src/executor/sink.rs | 4 ++-- src/tests/simulation/tests/integration_tests/sink/scale.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 70e63b4b33cd0..fdc4e95ef5799 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -241,7 +241,7 @@ impl SinkExecutor { .await?; if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) { - log_writer.update_vnode_bitmap(vnode_bitmap); + log_writer.update_vnode_bitmap(vnode_bitmap).await?; } yield Message::Barrier(barrier); } @@ -276,7 +276,7 @@ impl SinkExecutor { .await?; if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) { - log_writer.update_vnode_bitmap(vnode_bitmap); + log_writer.update_vnode_bitmap(vnode_bitmap).await?; } yield Message::Barrier(barrier); } diff --git a/src/tests/simulation/tests/integration_tests/sink/scale.rs b/src/tests/simulation/tests/integration_tests/sink/scale.rs index 8bb84051ab055..f327468543a37 100644 --- a/src/tests/simulation/tests/integration_tests/sink/scale.rs +++ b/src/tests/simulation/tests/integration_tests/sink/scale.rs @@ -37,7 +37,7 @@ async fn scale_and_check( for (plan, expected_parallelism) in schedule_plan { let prev_count = test_sink.store.id_count(); assert!(prev_count < target_count, "sink finish before scale"); - // cluster.reschedule(plan).await?; + cluster.reschedule(plan).await?; let after_count = test_sink.store.id_count(); sleep(Duration::from_secs(10)).await; if thread_rng().gen_bool(0.5) {