Skip to content

Commit

Permalink
align inject barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 4, 2024
1 parent ecdf72f commit ac40aa0
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 22 deletions.
6 changes: 3 additions & 3 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,15 +585,15 @@ impl GlobalBarrierManager {
.checkpoint_control
.can_inject_barrier(self.in_flight_barrier_nums) => {
self.active_streaming_nodes.sync().await;
self.handle_new_barrier(scheduled);
self.handle_new_barrier(scheduled).await;
}
}
self.checkpoint_control.update_barrier_nums_metrics();
}
}

/// Handle the new barrier from the scheduled queue and inject it.
fn handle_new_barrier(&mut self, scheduled: Scheduled) {
async fn handle_new_barrier(&mut self, scheduled: Scheduled) {
let Scheduled {
command,
mut notifiers,
Expand Down Expand Up @@ -630,7 +630,7 @@ impl GlobalBarrierManager {

send_latency_timer.observe_duration();

self.rpc_manager.inject_barrier(command_ctx.clone());
self.rpc_manager.inject_barrier(command_ctx.clone()).await;

// Notify about the injection.
let prev_paused_reason = self.state.paused_reason();
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ impl GlobalBarrierManagerContext {
let res = match self
.inject_barrier(command_ctx.clone(), None, None)
.await
.await
.result
{
Ok(response) => {
Expand Down
33 changes: 14 additions & 19 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,16 @@ impl BarrierRpcManager {
self.prev_injecting_barrier = None;
}

pub(super) fn inject_barrier(&mut self, command_context: Arc<CommandContext>) {
pub(super) async fn inject_barrier(&mut self, command_context: Arc<CommandContext>) {
// this is to notify that the barrier has been injected so that the next
// barrier can be injected to avoid out of order barrier injection.
// TODO: can be removed when bidi-stream control is implemented.
let (inject_tx, inject_rx) = oneshot::channel();
let prev_inject_rx = self.prev_injecting_barrier.replace(inject_rx);
let await_complete_future =
self.context
.inject_barrier(command_context, Some(inject_tx), prev_inject_rx);
let await_complete_future = self
.context
.inject_barrier(command_context, Some(inject_tx), prev_inject_rx)
.await;
self.injected_in_progress_barrier
.push(await_complete_future);
}
Expand All @@ -88,7 +89,7 @@ pub(super) type BarrierCompletionFuture = impl Future<Output = BarrierCompletion

impl GlobalBarrierManagerContext {
/// Inject a barrier to all CNs and spawn a task to collect it
pub(super) fn inject_barrier(
pub(super) async fn inject_barrier(
&self,
command_context: Arc<CommandContext>,
inject_tx: Option<oneshot::Sender<()>>,
Expand All @@ -97,22 +98,16 @@ impl GlobalBarrierManagerContext {
let (tx, rx) = oneshot::channel();
let prev_epoch = command_context.prev_epoch.value().0;
let stream_rpc_manager = self.stream_rpc_manager.clone();
let span = command_context.span.clone();
if let Some(prev_inject_rx) = prev_inject_rx {
let _ = prev_inject_rx.await;
}
let result = stream_rpc_manager
.inject_barrier(command_context.clone())
.instrument(span.clone())
.await;
// TODO: the collect handler should be aborted on recovery.
let _join_handle = tokio::spawn(async move {
let span = command_context.span.clone();
if let Some(prev_inject_rx) = prev_inject_rx {
if prev_inject_rx.await.is_err() {
let _ = tx.send(BarrierCompletion {
prev_epoch,
result: Err(anyhow!("prev barrier failed to be injected").into()),
});
return;
}
}
let result = stream_rpc_manager
.inject_barrier(command_context.clone())
.instrument(span.clone())
.await;
match result {
Ok(node_need_collect) => {
if let Some(inject_tx) = inject_tx {
Expand Down

0 comments on commit ac40aa0

Please sign in to comment.