remove no-actor optimization & move actor op handler to async context
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao committed Jul 26, 2024
1 parent c36de8a commit d0515b1
Showing 3 changed files with 31 additions and 46 deletions.
4 changes: 1 addition & 3 deletions proto/stream_service.proto
@@ -108,9 +108,7 @@ message StreamingControlStreamRequest {

 message StreamingControlStreamResponse {
   message InitResponse {}
-  message ShutdownResponse {
-    bool has_running_actor = 1;
-  }
+  message ShutdownResponse {}
 
   oneof response {
     InitResponse init = 1;
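With the `has_running_actor` field gone, `ShutdownResponse` becomes an empty message, which is why the Rust side below can construct it with `ShutdownResponse::default()`. A minimal sketch of what the generated type boils down to (a hypothetical hand-written stand-in, not the actual prost-generated file):

    // Hypothetical stand-in for the prost-generated type: an empty protobuf
    // message becomes a fieldless struct, so `default()` is all a sender needs.
    #[derive(Clone, Copy, Debug, Default, PartialEq)]
    pub struct ShutdownResponse {}

    fn main() {
        let resp = ShutdownResponse::default();
        println!("{resp:?}"); // prints: ShutdownResponse
    }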
20 changes: 4 additions & 16 deletions src/meta/src/barrier/rpc.rs
@@ -210,25 +210,13 @@ impl ControlStreamManager {
.expect("should exist when get collect resp");
break Ok((worker_id, command.prev_epoch.value().0, resp));
}
Response::Shutdown(resp) => {
let node = self
Response::Shutdown(_) => {
let _ = self
.nodes
.remove(&worker_id)
.expect("should exist when get shutdown resp");
let has_inflight_barrier = !node.inflight_barriers.is_empty();

// Trigger recovery only if there are actors running.
//
// We need "or" here because...
// - even if there's no running actor, it's possible that new actors will be created
// soon because of upcoming inflight barriers;
// - even if there's no inflight barrier, it's possible that the next periodic barrier
// has not been produced yet.
if resp.has_running_actor || has_inflight_barrier {
break Err(anyhow!("worker node {worker_id} is shutting down").into());
} else {
info!(node = ?node.worker, "idle worker node is shutting down, no need to recover");
}
// TODO: if there's no actor running on the node, we can ignore and not trigger recovery.
break Err(anyhow!("worker node {worker_id} is shutting down").into());
}
Response::Init(_) => {
// This arm should be unreachable.
53 changes: 26 additions & 27 deletions src/stream/src/task/barrier_manager.rs
@@ -127,38 +127,31 @@ impl ControlStreamHandle {
     }
 
     /// Send `Shutdown` message to the control stream and wait for the stream to be closed
-    /// by the meta service. Notify the caller through `shutdown_sender` once it's done.
-    fn shutdown_stream(&mut self, has_running_actor: bool, shutdown_sender: oneshot::Sender<()>) {
-        if has_running_actor {
-            tracing::warn!("shutdown with running actors, scaling or migration will be triggered");
-        }
-
+    /// by the meta service.
+    async fn shutdown_stream(&mut self) {
         if let Some((sender, _)) = self.pair.take() {
             if sender
                 .send(Ok(StreamingControlStreamResponse {
                     response: Some(streaming_control_stream_response::Response::Shutdown(
-                        ShutdownResponse { has_running_actor },
+                        ShutdownResponse::default(),
                     )),
                 }))
                 .is_err()
             {
                 warn!("failed to notify shutdown of control stream");
             } else {
-                tokio::spawn(async move {
-                    tracing::info!("waiting for meta service to close control stream...");
-
-                    // Wait for the stream to be closed, to ensure that the `Shutdown` message has
-                    // been acknowledged by the meta service for more precise error report.
-                    //
-                    // This is because the meta service will reset the control stream manager and
-                    // drop the connection to us upon recovery. As a result, the receiver part of
-                    // this sender will also be dropped, causing the stream to close.
-                    sender.closed().await;
-
-                    // Notify the caller that the shutdown is done.
-                    let _ = shutdown_sender.send(());
-                });
+                tracing::info!("waiting for meta service to close control stream...");
+
+                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
+                // been acknowledged by the meta service for more precise error report.
+                //
+                // This is because the meta service will reset the control stream manager and
+                // drop the connection to us upon recovery. As a result, the receiver part of
+                // this sender will also be dropped, causing the stream to close.
+                sender.closed().await;
             }
         } else {
            debug!("control stream has been reset, ignore shutdown");
         }
     }
 
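The key change above is that `shutdown_stream` no longer spawns a task and reports back through `shutdown_sender`; it simply awaits `sender.closed()` inline. A standalone sketch of that acknowledgment mechanism, assuming the handle's `pair` wraps a tokio mpsc sender as the diff suggests: `closed()` resolves only once the receiving half is dropped, which is exactly what happens when the meta service resets the control stream.

    use std::time::Duration;

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::unbounded_channel::<&str>();

        // Simulate the meta service dropping its end of the stream upon recovery.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            drop(rx);
        });

        // Resolves only after `rx` is dropped, i.e. the peer has gone away.
        tx.closed().await;
        println!("control stream closed by the peer");
    }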
@@ -245,6 +238,7 @@ pub(super) enum LocalBarrierEvent {
Flush(oneshot::Sender<()>),
}

#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
NewControlStream {
handle: ControlStreamHandle,
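The new `#[derive(strum_macros::Display)]` exists so the `unreachable!` message in `handle_actor_op` below can print the operation's name via `{actor_op}` without a hand-written `Display` impl. A minimal sketch of the derive's behavior (simplified to unit variants; the real enum carries fields):

    use strum_macros::Display;

    // With strum's `Display` derive, each variant formats as its own name.
    #[derive(Display)]
    enum LocalActorOperation {
        NewControlStream,
        Shutdown,
    }

    fn main() {
        let actor_op = LocalActorOperation::Shutdown;
        assert_eq!(actor_op.to_string(), "Shutdown");
        println!("event {actor_op} should be handled separately in async context");
    }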
@@ -492,6 +486,15 @@ impl LocalBarrierWorker {
                     response: Some(streaming_control_stream_response::Response::Init(InitResponse {}))
                 });
             }
+            LocalActorOperation::Shutdown { result_sender } => {
+                if !self.actor_manager_state.handles.is_empty() {
+                    tracing::warn!(
+                        "shutdown with running actors, scaling or migration will be triggered"
+                    );
+                }
+                self.control_stream_handle.shutdown_stream().await;
+                let _ = result_sender.send(());
+            }
             actor_op => {
                 self.handle_actor_op(actor_op);
             }
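This hunk is the other half of moving the handler into an async context: operations that need to `.await` (now `Shutdown`, like `NewControlStream` before it) are matched in the async event loop, and everything else falls through to the synchronous `handle_actor_op`. A simplified sketch of the dispatch pattern, with hypothetical types:

    use tokio::sync::oneshot;

    enum Op {
        // Needs `.await`, so it must be handled in the async loop.
        Shutdown { result_sender: oneshot::Sender<()> },
        // Everything else can be handled synchronously.
        Other,
    }

    struct Worker;

    impl Worker {
        async fn handle_op(&mut self, op: Op) {
            match op {
                Op::Shutdown { result_sender } => {
                    self.shutdown_stream().await; // Requires the async context.
                    let _ = result_sender.send(());
                }
                op => self.handle_actor_op(op), // Stays synchronous.
            }
        }

        async fn shutdown_stream(&mut self) { /* awaits the peer, as above */ }

        fn handle_actor_op(&mut self, _op: Op) { /* purely synchronous work */ }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = oneshot::channel();
        Worker.handle_op(Op::Shutdown { result_sender: tx }).await;
        rx.await.unwrap(); // The caller observes shutdown completion.
    }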
@@ -577,8 +580,8 @@ impl LocalBarrierWorker {

fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
match actor_op {
LocalActorOperation::NewControlStream { .. } => {
unreachable!("NewControlStream event should be handled separately in async context")
LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
unreachable!("event {actor_op} should be handled separately in async context")
}
LocalActorOperation::DropActors {
actors,
@@ -624,10 +627,6 @@ impl LocalBarrierWorker {
                 let debug_info = self.to_debug_info();
                 let _ = result_sender.send(debug_info.to_string());
             }
-            LocalActorOperation::Shutdown { result_sender } => {
-                self.control_stream_handle
-                    .shutdown_stream(!self.actor_manager_state.handles.is_empty(), result_sender);
-            }
         }
     }
 }
