shutdown stream manager
BugenZhao committed Jul 23, 2024
1 parent 2d3dcca commit 0b8f00d
Showing 6 changed files with 79 additions and 32 deletions.
4 changes: 3 additions & 1 deletion src/compute/src/server.rs
@@ -401,7 +401,7 @@ pub async fn compute_node_serve(
meta_cache,
block_cache,
);
let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr);
let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone());
let health_srv = HealthServiceImpl::new();

let telemetry_manager = TelemetryManager::new(
@@ -468,6 +468,8 @@ pub async fn compute_node_serve(
// TODO(shutdown): how does this interact with auto-scaling?
meta_client.try_unregister().await;

let _ = stream_mgr.shutdown().await;

// NOTE(shutdown): We can't simply join the tonic server here because it only returns when all
// existing connections are closed, while we have long-running streaming calls that never
// close. From the other side, there's also no need to gracefully shutdown them if we have
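Taken together, the two hunks above add an explicit stream-manager shutdown step to compute_node_serve: the node first unregisters from the meta service, then asks the local stream manager to shut down. A self-contained sketch of that ordering, with the real MetaClient and LocalStreamManager replaced by hypothetical stubs so the snippet compiles on its own:

    // Hypothetical stand-ins for the real meta client and stream manager.
    struct MetaClientStub;
    struct StreamManagerStub;

    impl MetaClientStub {
        // Tell the meta service to drop this worker from the cluster.
        async fn try_unregister(&self) {}
    }

    impl StreamManagerStub {
        // Notify the barrier worker and wait for the control stream to close.
        async fn shutdown(&self) -> Result<(), ()> {
            Ok(())
        }
    }

    async fn shutdown_sequence(meta_client: MetaClientStub, stream_mgr: StreamManagerStub) {
        // 1. Leave the cluster first, so meta stops scheduling barriers and
        //    actors onto this node.
        meta_client.try_unregister().await;

        // 2. Then shut down the local stream manager; per this commit, that
        //    sends a `Shutdown` error over the control stream and waits for
        //    the meta side to acknowledge it by closing the stream.
        let _ = stream_mgr.shutdown().await;

        // 3. The tonic server itself is not joined gracefully, since the
        //    long-running streaming calls would keep it from returning (see
        //    the NOTE in the hunk above).
    }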
30 changes: 12 additions & 18 deletions src/meta/src/barrier/mod.rs
@@ -985,24 +985,18 @@ impl GlobalBarrierManagerContext {
}

fn report_complete_event(&self, duration_sec: f64, command_ctx: &CommandContext) {
{
{
{
// Record barrier latency in event log.
use risingwave_pb::meta::event_log;
let event = event_log::EventBarrierComplete {
prev_epoch: command_ctx.prev_epoch.value().0,
cur_epoch: command_ctx.curr_epoch.value().0,
duration_sec,
command: command_ctx.command.to_string(),
barrier_kind: command_ctx.kind.as_str_name().to_string(),
};
self.env
.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
}
}
}
// Record barrier latency in event log.
use risingwave_pb::meta::event_log;
let event = event_log::EventBarrierComplete {
prev_epoch: command_ctx.prev_epoch.value().0,
cur_epoch: command_ctx.curr_epoch.value().0,
duration_sec,
command: command_ctx.command.to_string(),
barrier_kind: command_ctx.kind.as_str_name().to_string(),
};
self.env
.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
}
}

27 changes: 17 additions & 10 deletions src/meta/src/barrier/rpc.rs
@@ -207,16 +207,22 @@ impl ControlStreamManager {
.expect("should exist when get collect resp");
// Note: No need to use `?` as the backtrace is from meta and not useful.
warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");

// TODO: this future can be cancelled during collection, so may not work as expected.
let errors = self.collect_errors(node.worker.id, err).await;
let err = merge_node_rpc_errors("get error from control stream", errors);

if let Some(command) = node.inflight_barriers.pop_front() {
let errors = self.collect_errors(node.worker.id, err).await;
let err = merge_node_rpc_errors("get error from control stream", errors);
self.context.report_collect_failure(&command, &err);
break Err(err);
self.context.report_collect_failure(
command.prev_epoch.value().0,
command.curr_epoch.value().0,
&err,
);
} else {
// for node with no inflight barrier, simply ignore the error
info!(node = ?node.worker, "no inflight barrier no node. Ignore error");
continue;
self.context.report_collect_failure(0, 0, &err);
}

break Err(err);
}
}
}
@@ -239,6 +245,7 @@ impl ControlStreamManager {
})
.await;
}
tracing::debug!(?errors, "collected stream errors");
errors
}
}
@@ -366,12 +373,12 @@ impl GlobalBarrierManagerContext {
}

/// Send barrier-complete-rpc and wait for responses from all CNs
fn report_collect_failure(&self, command_context: &CommandContext, error: &MetaError) {
fn report_collect_failure(&self, prev_epoch: u64, curr_epoch: u64, error: &MetaError) {
// Record failure in event log.
use risingwave_pb::meta::event_log;
let event = event_log::EventCollectBarrierFail {
prev_epoch: command_context.prev_epoch.value().0,
cur_epoch: command_context.curr_epoch.value().0,
prev_epoch,
cur_epoch: curr_epoch,
error: error.to_report_string(),
};
self.env
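The net effect of the rpc.rs changes: error collection and merging are hoisted out of the `if let`, `report_collect_failure` takes raw epoch values instead of a `&CommandContext`, and the `else` branch no longer swallows the error with `continue`. Failures on nodes with no inflight barrier are now reported with placeholder epochs (0, 0), and the error breaks the loop in both branches. A small self-contained model of that decision, using plain types in place of the meta-service ones:

    // Stand-in for the event-log reporting done by `report_collect_failure`.
    fn report(prev_epoch: u64, curr_epoch: u64, err: &str) {
        eprintln!("barrier collect failure: prev={prev_epoch} cur={curr_epoch} err={err}");
    }

    // Models the new behavior: report whether or not a barrier was in flight,
    // then propagate the error instead of ignoring it.
    fn handle_stream_error(inflight_epochs: Option<(u64, u64)>, err: String) -> Result<(), String> {
        match inflight_epochs {
            // A barrier was in flight: report with its (prev, curr) epochs.
            Some((prev, cur)) => report(prev, cur, &err),
            // No inflight barrier: previously ignored, now reported with
            // placeholder epochs.
            None => report(0, 0, &err),
        }
        Err(err)
    }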
5 changes: 5 additions & 0 deletions src/stream/src/error.rs
@@ -86,18 +86,23 @@ pub enum ErrorKind {
actor_id: ActorId,
reason: &'static str,
},

#[error("Secret error: {0}")]
Secret(
#[from]
#[backtrace]
SecretError,
),

#[error(transparent)]
Uncategorized(
#[from]
#[backtrace]
anyhow::Error,
),

#[error("the compute node is asked to shutdown")]
Shutdown,
}

impl From<PbFieldNotFound> for StreamError {
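The new `Shutdown` variant is what the barrier manager sends over the control stream when the node is asked to stop (see barrier_manager.rs below, which calls `StreamError::shutdown()`). That constructor is presumably derived by the crate's error machinery; a minimal mock of the pattern, with simplified stand-in types rather than the real definitions:

    // Simplified stand-ins: an `ErrorKind` with a `Shutdown` variant wrapped
    // by an outer error type, plus the constructor used later in this diff.
    #[derive(Debug)]
    enum ErrorKind {
        Shutdown,
    }

    #[derive(Debug)]
    struct StreamError(ErrorKind);

    impl From<ErrorKind> for StreamError {
        fn from(kind: ErrorKind) -> Self {
            StreamError(kind)
        }
    }

    impl StreamError {
        // Mirrors the `StreamError::shutdown()` call in barrier_manager.rs.
        fn shutdown() -> Self {
            ErrorKind::Shutdown.into()
        }
    }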
39 changes: 36 additions & 3 deletions src/stream/src/task/barrier_manager.rs
@@ -119,7 +119,27 @@ impl ControlStreamHandle {

let err = err.into_inner();
if sender.send(Err(err)).is_err() {
warn!("failed to notify finish of control stream");
warn!("failed to notify reset of control stream");
}
}
}

async fn shutdown_stream(&mut self) {
let err =
ScoredStreamError::new(StreamError::shutdown()).to_status_unnamed(Code::Cancelled);

if let Some((sender, _)) = self.pair.take() {
if sender.send(Err(err)).is_err() {
warn!("failed to notify shutdown of control stream");
} else {
// We should always unregister the current node from the meta service before
// calling this method. So upon next recovery, the meta service will not involve
// us anymore and drop the connection to us.
//
// We detect this by waiting the stream to be closed. This is to ensure that the
// `Shutdown` error has been acknowledged by the meta service, for more precise
// error report.
sender.closed().await;
}
}
}
@@ -243,6 +263,9 @@ pub(super) enum LocalActorOperation {
InspectState {
result_sender: oneshot::Sender<String>,
},
Shutdown {
result_sender: oneshot::Sender<()>,
},
}

pub(super) struct CreateActorOutput {
@@ -452,7 +475,7 @@ impl LocalBarrierWorker {
});
}
actor_op => {
self.handle_actor_op(actor_op);
self.handle_actor_op(actor_op).await;
}
}
}
@@ -534,7 +557,7 @@ impl LocalBarrierWorker {
}
}

fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
async fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
match actor_op {
LocalActorOperation::NewControlStream { .. } => {
unreachable!("NewControlStream event should be handled separately in async context")
@@ -583,6 +606,13 @@ impl LocalBarrierWorker {
let debug_info = self.to_debug_info();
let _ = result_sender.send(debug_info.to_string());
}
LocalActorOperation::Shutdown { result_sender } => {
if !self.actor_manager_state.handles.is_empty() {
tracing::warn!("shutdown with running actors");
}
self.control_stream_handle.shutdown_stream().await;
let _ = result_sender.send(());
}
}
}
}
@@ -1010,6 +1040,9 @@ impl ScoredStreamError {
// `BarrierSend` is likely to be caused by actor exit and not the root cause.
ErrorKind::BarrierSend { .. } => 1,

// Asked to shutdown by user, likely to be the root cause.
ErrorKind::Shutdown => 3000,

// Executor errors first.
ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),

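Two details above are worth calling out. First, `shutdown_stream` does not just push the `Shutdown` error; it then waits on `sender.closed()` so the worker only proceeds once the meta service has torn down the control stream, which keeps the error report on the meta side precise. Second, giving `ErrorKind::Shutdown` a score of 3000 makes a user-requested shutdown win over ordinary executor errors (scored at 2000 plus a per-executor offset) when picking the root cause. A standalone illustration of the notify-then-wait pattern, using a plain tokio channel instead of the tonic response stream:

    use tokio::sync::mpsc::UnboundedSender;

    // Send a final error to the peer, then wait until every receiver handle
    // is dropped, i.e. the peer has torn down the stream.
    async fn notify_and_wait_for_ack(sender: UnboundedSender<Result<(), String>>) {
        if sender.send(Err("shutdown".to_owned())).is_err() {
            // Receiver already gone; nothing to wait for.
            eprintln!("failed to notify shutdown of control stream");
        } else {
            // Resolves once the channel is closed on the receiving side.
            sender.closed().await;
        }
    }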
6 changes: 6 additions & 0 deletions src/stream/src/task/stream_manager.rs
@@ -277,6 +277,12 @@ impl LocalStreamManager {
})
.await
}

pub async fn shutdown(&self) -> StreamResult<()> {
self.actor_op_tx
.send_and_await(|result_sender| LocalActorOperation::Shutdown { result_sender })
.await
}
}

impl LocalBarrierWorker {
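`LocalStreamManager::shutdown` reuses the same request/reply plumbing as the other methods: presumably `send_and_await` wraps the operation around a fresh oneshot sender, hands it to the barrier worker loop, and awaits the reply, while the worker answers on `result_sender` once `shutdown_stream` has finished. A generic sketch of that pattern with hypothetical names, just to make the control flow explicit:

    use tokio::sync::{mpsc, oneshot};

    // Hypothetical stand-in for `LocalActorOperation`.
    enum Op {
        Shutdown { result_sender: oneshot::Sender<()> },
    }

    // Assumed shape of `send_and_await`: build the op around a fresh oneshot
    // sender, push it to the worker loop, then wait for the worker's reply.
    async fn send_and_await(
        actor_op_tx: &mpsc::UnboundedSender<Op>,
        make_op: impl FnOnce(oneshot::Sender<()>) -> Op,
    ) -> Result<(), &'static str> {
        let (tx, rx) = oneshot::channel();
        actor_op_tx
            .send(make_op(tx))
            .map_err(|_| "barrier worker exited")?;
        rx.await.map_err(|_| "barrier worker dropped the request")
    }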
