Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 21, 2024
1 parent 2ef8006 commit 678db30
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
8 changes: 6 additions & 2 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,12 @@ impl GlobalBarrierManager {

send_latency_timer.observe_duration();

self.rpc_manager.inject_barrier(command_ctx.clone()).await?;
self.rpc_manager
.inject_barrier(command_ctx.clone())
.await
.inspect_err(|_| {
fail_point!("inject_barrier_err_success");
})?;

// Notify about the injection.
let prev_paused_reason = self.state.paused_reason();
Expand Down Expand Up @@ -709,7 +714,6 @@ impl GlobalBarrierManager {
if let Err(err) = result {
// FIXME: If it is a connector source error occurred in the init barrier, we should pass
// back to frontend
fail_point!("inject_barrier_err_success");
let fail_node = self.checkpoint_control.barrier_failed();
warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch");
self.failure_recovery(err, fail_node).await;
Expand Down
14 changes: 7 additions & 7 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,13 @@ impl LocalBarrierWorker {
}

fn send_response(&mut self, response: StreamingControlStreamResponse) {
let (sender, _) = self
.control_stream_handle
.as_ref()
.expect("should not be None");
if sender.send(Ok(response)).is_err() {
self.control_stream_handle = None;
warn!("fail to send response. control stream reset");
if let Some((sender, _)) = self.control_stream_handle.as_ref() {
if sender.send(Ok(response)).is_err() {
self.control_stream_handle = None;
warn!("fail to send response. control stream reset");
}
} else {
debug!(?response, "control stream has been reset. ignore response");
}
}

Expand Down

0 comments on commit 678db30

Please sign in to comment.