feat: compute node unregisters from meta for graceful shutdown #17662

Merged
merged 11 commits on Jul 26, 2024
2 changes: 2 additions & 0 deletions proto/stream_service.proto
@@ -108,10 +108,12 @@ message StreamingControlStreamRequest {

message StreamingControlStreamResponse {
message InitResponse {}
message ShutdownResponse {}

oneof response {
InitResponse init = 1;
BarrierCompleteResponse complete_barrier = 2;
ShutdownResponse shutdown = 3;
}
}

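As an aside, here is a minimal sketch of how a compute node could construct the new Shutdown variant for the control stream. The prost-generated module path (`risingwave_pb::stream_service`) and field layout are assumed from the proto above and from how the meta-side code later in this PR matches on the variant; this is not code from the PR itself.

```rust
// Hypothetical construction of the new response variant; assumes the usual
// prost codegen layout (a oneof becomes an Option<enum> in a snake_case module).
use risingwave_pb::stream_service::{
    streaming_control_stream_response::Response, ShutdownResponse,
    StreamingControlStreamResponse,
};

fn shutdown_response() -> StreamingControlStreamResponse {
    StreamingControlStreamResponse {
        // The meta service treats this as "the worker is leaving; stop polling
        // this stream" (see the handling in src/meta/src/barrier/rpc.rs below).
        response: Some(Response::Shutdown(ShutdownResponse {})),
    }
}
```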
2 changes: 1 addition & 1 deletion src/common/common_service/src/observer_manager.rs
@@ -157,7 +157,7 @@ where
match self.rx.message().await {
Ok(resp) => {
if resp.is_none() {
tracing::error!("Stream of notification terminated.");
tracing::warn!("Stream of notification terminated.");
self.re_subscribe().await;
continue;
}
10 changes: 7 additions & 3 deletions src/compute/src/server.rs
@@ -405,7 +405,7 @@ pub async fn compute_node_serve(
meta_cache,
block_cache,
);
let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr);
let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone());
let health_srv = HealthServiceImpl::new();

let telemetry_manager = TelemetryManager::new(
@@ -469,8 +469,12 @@
// Wait for the shutdown signal.
shutdown.cancelled().await;

// TODO(shutdown): gracefully unregister from the meta service (need to cautious since it may
// trigger auto-scaling)
// Unregister from the meta service, then...
// - batch queries will not be scheduled to this compute node,
// - streaming actors will not be scheduled to this compute node after next recovery.
meta_client.try_unregister().await;
Contributor: Will a worker without any actors trigger a meta recovery if it exits?

Member Author: This is a good point. Updated the code to not trigger recovery if an idle compute node exits.

// Shutdown the streaming manager.
let _ = stream_mgr.shutdown().await;

// NOTE(shutdown): We can't simply join the tonic server here because it only returns when all
// existing connections are closed, while we have long-running streaming calls that never
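For readability, a condensed sketch of the shutdown sequence added in this hunk. The stub types below are stand-ins, not RisingWave's real `MetaClient` or streaming manager; only the ordering and the comments mirror the hunk above.

```rust
// Stand-in types for illustration only.
struct MetaClientStub;
struct StreamManagerStub;

impl MetaClientStub {
    async fn try_unregister(&self) {
        // Delete this worker node from the meta service.
    }
}

impl StreamManagerStub {
    async fn shutdown(&self) -> Result<(), ()> {
        // Stop local streaming actors and barrier handling.
        Ok(())
    }
}

async fn on_shutdown_signal(meta_client: &MetaClientStub, stream_mgr: &StreamManagerStub) {
    // 1. Unregister first: batch queries stop being scheduled to this node
    //    immediately, and streaming actors will not be placed here after the
    //    next recovery.
    meta_client.try_unregister().await;

    // 2. Stop the local streaming manager; the result is ignored because the
    //    process is exiting anyway.
    let _ = stream_mgr.shutdown().await;

    // 3. The tonic server is intentionally not joined: long-running streaming
    //    RPCs keep connections open, so the join would never return.
}
```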
23 changes: 19 additions & 4 deletions src/meta/src/barrier/info.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_pb::common::PbWorkerNode;
@@ -137,11 +137,26 @@ impl InflightActorInfo {
.into_iter()
.map(|node| (node.id, node))
.collect::<HashMap<_, _>>();
for (actor_id, location) in &self.actor_location_map {
if !new_node_map.contains_key(location) {
warn!(actor_id, location, node = ?self.node_map.get(location), "node with running actors is deleted");

let mut deleted_actors = BTreeMap::new();
for (&actor_id, &location) in &self.actor_location_map {
if !new_node_map.contains_key(&location) {
deleted_actors
.entry(location)
.or_insert_with(BTreeSet::new)
.insert(actor_id);
}
}
for (node_id, actors) in deleted_actors {
let node = self.node_map.get(&node_id);
warn!(
Member Author: This is to make the logs more concise.

node_id,
?node,
?actors,
"node with running actors is deleted"
);
}

self.node_map = new_node_map;
}

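Read in isolation, the grouping above does the following. This is a simplified sketch using plain integer IDs instead of the real worker and actor types:

```rust
use std::collections::{BTreeMap, BTreeSet, HashMap};

/// Group actors whose node has disappeared by node id, so that one warning is
/// emitted per deleted node instead of one log line per actor.
fn deleted_actors_by_node(
    actor_location: &HashMap<u32, u32>, // actor_id -> node_id
    new_nodes: &HashMap<u32, ()>,       // node_id -> node info (elided)
) -> BTreeMap<u32, BTreeSet<u32>> {
    let mut deleted: BTreeMap<u32, BTreeSet<u32>> = BTreeMap::new();
    for (&actor_id, &node_id) in actor_location {
        if !new_nodes.contains_key(&node_id) {
            deleted.entry(node_id).or_default().insert(actor_id);
        }
    }
    deleted
}
```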
30 changes: 12 additions & 18 deletions src/meta/src/barrier/mod.rs
@@ -985,24 +985,18 @@ impl GlobalBarrierManagerContext {
}

fn report_complete_event(&self, duration_sec: f64, command_ctx: &CommandContext) {
{
{
{
// Record barrier latency in event log.
use risingwave_pb::meta::event_log;
let event = event_log::EventBarrierComplete {
prev_epoch: command_ctx.prev_epoch.value().0,
cur_epoch: command_ctx.curr_epoch.value().0,
duration_sec,
command: command_ctx.command.to_string(),
barrier_kind: command_ctx.kind.as_str_name().to_string(),
};
self.env
.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
}
}
}
// Record barrier latency in event log.
use risingwave_pb::meta::event_log;
let event = event_log::EventBarrierComplete {
prev_epoch: command_ctx.prev_epoch.value().0,
cur_epoch: command_ctx.curr_epoch.value().0,
duration_sec,
command: command_ctx.command.to_string(),
barrier_kind: command_ctx.kind.as_str_name().to_string(),
};
self.env
.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
}
}

4 changes: 4 additions & 0 deletions src/meta/src/barrier/recovery.rs
@@ -234,9 +234,12 @@ impl GlobalBarrierManager {
.committed_epoch
.into(),
);

// Mark blocked and abort buffered schedules, they might be dirty already.
self.scheduled_barriers
.abort_and_mark_blocked("cluster is under recovering");
// Clear all control streams to release resources (connections to compute nodes) first.
self.control_stream_manager.clear();

tracing::info!("recovery start!");
let retry_strategy = Self::get_retry_strategy();
@@ -288,6 +291,7 @@ impl GlobalBarrierManager {
// Resolve actor info for recovery. If there's no actor to recover, most of the
// following steps will be no-op, while the compute nodes will still be reset.
// FIXME: Transactions should be used.
// TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
let mut info = if !self.env.opts.disable_automatic_parallelism_control
&& background_streaming_jobs.is_empty()
{
48 changes: 37 additions & 11 deletions src/meta/src/barrier/rpc.rs
@@ -167,25 +167,39 @@ impl ControlStreamManager {
Ok(())
}

/// Clear all nodes and response streams in the manager.
pub(super) fn clear(&mut self) {
*self = Self::new(self.context.clone());
}

async fn next_response(
&mut self,
) -> Option<(WorkerId, MetaResult<StreamingControlStreamResponse>)> {
let (worker_id, response_stream, result) = self.response_streams.next().await?;
if result.is_ok() {
self.response_streams
.push(into_future(worker_id, response_stream));

match result.as_ref().map(|r| r.response.as_ref().unwrap()) {
Ok(streaming_control_stream_response::Response::Shutdown(_)) | Err(_) => {
// Do not add it back to the `response_streams` so that it will not be polled again.
}
_ => {
self.response_streams
.push(into_future(worker_id, response_stream));
}
}

Some((worker_id, result))
}

pub(super) async fn next_complete_barrier_response(
&mut self,
) -> MetaResult<(WorkerId, u64, BarrierCompleteResponse)> {
use streaming_control_stream_response::Response;

loop {
let (worker_id, result) = pending_on_none(self.next_response()).await;
match result {
Ok(resp) => match resp.response {
Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => {
Ok(resp) => match resp.response.unwrap() {
Response::CompleteBarrier(resp) => {
let node = self
.nodes
.get_mut(&worker_id)
@@ -196,26 +210,37 @@ impl ControlStreamManager {
.expect("should exist when get collect resp");
break Ok((worker_id, command.prev_epoch.value().0, resp));
}
resp => {
break Err(anyhow!("get unexpected resp: {:?}", resp).into());
Response::Shutdown(_) => {
let _ = self
.nodes
.remove(&worker_id)
.expect("should exist when get shutdown resp");
// TODO: if there's no actor running on the node, we can ignore and not trigger recovery.
break Err(anyhow!("worker node {worker_id} is shutting down").into());
}
Response::Init(_) => {
// This arm should be unreachable.
break Err(anyhow!("get unexpected init response").into());
}
},
Err(err) => {
let mut node = self
let node = self
.nodes
.remove(&worker_id)
.expect("should exist when get collect resp");
// Note: No need to use `?` as the backtrace is from meta and not useful.
warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
if let Some(command) = node.inflight_barriers.pop_front() {

if let Some(command) = node.inflight_barriers.into_iter().next() {
// FIXME: this future can be cancelled during collection, so the error collection
// might not work as expected.
let errors = self.collect_errors(node.worker.id, err).await;
let err = merge_node_rpc_errors("get error from control stream", errors);
self.context.report_collect_failure(&command, &err);
break Err(err);
} else {
// for node with no inflight barrier, simply ignore the error
info!(node = ?node.worker, "no inflight barrier no node. Ignore error");
continue;
info!(node = ?node.worker, error = %err.as_report(), "no inflight barrier in the node, ignore error");
}
}
}
Expand All @@ -239,6 +264,7 @@ impl ControlStreamManager {
})
.await;
}
tracing::debug!(?errors, "collected stream errors");
errors
}
}
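The question raised in the review thread on server.rs (does an actor-less worker trigger recovery?) comes down to whether the departing worker still has in-flight barriers. Below is a minimal sketch of that decision, with simplified types in place of the real node state. Note that in the diff above the Shutdown response still returns an error unconditionally (with a TODO to skip recovery for idle nodes), while the stream-error path already ignores workers with no in-flight barriers.

```rust
use std::collections::{HashMap, VecDeque};

type WorkerId = u32;

struct NodeState {
    // Epochs of barriers injected on this worker but not yet collected.
    inflight_barriers: VecDeque<u64>,
}

/// Handle a worker leaving the control stream (shutdown response or stream
/// error). Returning an error is what escalates to a full recovery.
fn on_worker_left(
    nodes: &mut HashMap<WorkerId, NodeState>,
    worker_id: WorkerId,
    reason: &str,
) -> Result<(), String> {
    let node = nodes
        .remove(&worker_id)
        .expect("worker should be registered");
    if node.inflight_barriers.is_empty() {
        // Idle worker: nothing to re-collect, ignore and keep polling others.
        println!("worker {worker_id} left ({reason}); no inflight barrier, ignored");
        Ok(())
    } else {
        // Barriers were lost on this worker: surface an error to trigger recovery.
        Err(format!("worker {worker_id} left ({reason}) with inflight barriers"))
    }
}
```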
20 changes: 14 additions & 6 deletions src/meta/src/stream/scale.rs
@@ -2618,15 +2618,20 @@ impl GlobalStreamManager {
notification = local_notification_rx.recv() => {
let notification = notification.expect("local notification channel closed in loop of stream manager");

// Only maintain the cache for streaming compute nodes.
let worker_is_streaming_compute = |worker: &WorkerNode| {
worker.get_type() == Ok(WorkerType::ComputeNode)
&& worker.property.as_ref().unwrap().is_streaming
};

match notification {
LocalNotification::WorkerNodeActivated(worker) => {
match (worker.get_type(), worker.property.as_ref()) {
(Ok(WorkerType::ComputeNode), Some(prop)) if prop.is_streaming => {
tracing::info!("worker {} activated notification received", worker.id);
}
_ => continue
if !worker_is_streaming_compute(&worker) {
continue;
}

tracing::info!(worker = worker.id, "worker activated notification received");

let prev_worker = worker_cache.insert(worker.id, worker.clone());

match prev_worker {
@@ -2645,11 +2650,14 @@ impl GlobalStreamManager {
// Since our logic for handling passive scale-in is within the barrier manager,
// there’s not much we can do here. All we can do is proactively remove the entries from our cache.
LocalNotification::WorkerNodeDeleted(worker) => {
if !worker_is_streaming_compute(&worker) {
continue;
}
Member Author (on lines +2653 to +2655): Otherwise we get false warnings.


match worker_cache.remove(&worker.id) {
Some(prev_worker) => {
tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
}

None => {
tracing::warn!(worker = worker.id, "worker not found in stream manager cache, but it was removed");
}
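For reference, a stand-alone version of the predicate factored out above; the enum and structs here are simplified placeholders, not the generated protobuf worker types:

```rust
#[derive(PartialEq)]
enum WorkerType {
    ComputeNode,
    Frontend,
}

struct WorkerProperty {
    is_streaming: bool,
}

struct WorkerNode {
    r#type: WorkerType,
    property: Option<WorkerProperty>,
}

/// Only streaming compute nodes are tracked in the scale manager's worker cache.
fn worker_is_streaming_compute(worker: &WorkerNode) -> bool {
    worker.r#type == WorkerType::ComputeNode
        && worker.property.as_ref().is_some_and(|p| p.is_streaming)
}
```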
13 changes: 11 additions & 2 deletions src/rpc_client/src/meta_client.rs
@@ -14,6 +14,8 @@

use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, SystemTime};
@@ -115,6 +117,7 @@ pub struct MetaClient {
inner: GrpcMetaClient,
meta_config: MetaConfig,
cluster_id: String,
shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
@@ -276,6 +279,7 @@ impl MetaClient {
inner: grpc_meta_client,
meta_config: meta_config.to_owned(),
cluster_id: add_worker_resp.cluster_id,
shutting_down: Arc::new(false.into()),
};

static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
@@ -322,8 +326,12 @@ impl MetaClient {
let resp = self.inner.heartbeat(request).await?;
if let Some(status) = resp.status {
if status.code() == risingwave_pb::common::status::Code::UnknownWorker {
tracing::error!("worker expired: {}", status.message);
std::process::exit(1);
// Ignore the error if we're already shutting down.
// Otherwise, exit the process.
if !self.shutting_down.load(Relaxed) {
tracing::error!(message = status.message, "worker expired");
std::process::exit(1);
}
}
}
Ok(())
@@ -745,6 +753,7 @@ impl MetaClient {
host: Some(self.host_addr.to_protobuf()),
};
self.inner.delete_worker_node(request).await?;
self.shutting_down.store(true, Relaxed);
Ok(())
}

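The `shutting_down` flag acts as a one-way latch: once the worker has deleted itself from the meta service, an "unknown worker" heartbeat status is expected and must not kill the process. Below is a minimal sketch with a stand-in client type, not the real `MetaClient`:

```rust
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::Arc;

struct ClientStub {
    shutting_down: Arc<AtomicBool>,
}

impl ClientStub {
    /// Called when a heartbeat response reports this worker as unknown.
    fn on_unknown_worker(&self, message: &str) {
        if self.shutting_down.load(Relaxed) {
            // Expected during graceful shutdown: we already unregistered.
            eprintln!("worker expired after unregistration, ignoring: {message}");
        } else {
            // Unexpected expiration: the node lost its registration, bail out.
            eprintln!("worker expired: {message}");
            std::process::exit(1);
        }
    }

    /// Called right after the worker node has been deleted from the meta service.
    fn mark_shutting_down(&self) {
        self.shutting_down.store(true, Relaxed);
    }
}
```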
2 changes: 1 addition & 1 deletion src/storage/src/hummock/event_handler/mod.rs
@@ -217,7 +217,7 @@ impl Drop for LocalInstanceGuard {
instance_id: self.instance_id,
})
.unwrap_or_else(|err| {
tracing::error!(
tracing::debug!(
error = %err.as_report(),
table_id = %self.table_id,
instance_id = self.instance_id,
3 changes: 1 addition & 2 deletions src/storage/src/hummock/store/hummock_storage.rs
@@ -37,7 +37,6 @@ use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::oneshot;
use tracing::error;

use super::local_hummock_storage::LocalHummockStorage;
use super::version::{read_filter_for_version, CommittedVersion, HummockVersionReader};
Expand Down Expand Up @@ -75,7 +74,7 @@ impl Drop for HummockStorageShutdownGuard {
let _ = self
.shutdown_sender
.send(HummockEvent::Shutdown)
.inspect_err(|e| error!(event = ?e.0, "unable to send shutdown"));
.inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
}
}

2 changes: 2 additions & 0 deletions src/stream/src/error.rs
@@ -86,12 +86,14 @@ pub enum ErrorKind {
actor_id: ActorId,
reason: &'static str,
},

#[error("Secret error: {0}")]
Secret(
#[from]
#[backtrace]
SecretError,
),

#[error(transparent)]
Uncategorized(
#[from]