Skip to content

Commit

Permalink
fix update vnode await
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 17, 2023
1 parent 6450887 commit 0c8d62d
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
{
log_writer.update_vnode_bitmap(vnode_bitmap);
log_writer.update_vnode_bitmap(vnode_bitmap).await?;
}
yield Message::Barrier(barrier);
}
Expand Down Expand Up @@ -276,7 +276,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
{
log_writer.update_vnode_bitmap(vnode_bitmap);
log_writer.update_vnode_bitmap(vnode_bitmap).await?;
}
yield Message::Barrier(barrier);
}
Expand Down
2 changes: 1 addition & 1 deletion src/tests/simulation/tests/integration_tests/sink/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn scale_and_check(
for (plan, expected_parallelism) in schedule_plan {
let prev_count = test_sink.store.id_count();
assert!(prev_count < target_count, "sink finish before scale");
// cluster.reschedule(plan).await?;
cluster.reschedule(plan).await?;
let after_count = test_sink.store.id_count();
sleep(Duration::from_secs(10)).await;
if thread_rng().gen_bool(0.5) {
Expand Down

0 comments on commit 0c8d62d

Please sign in to comment.