
Commit

Revert "revert changes not relavent to standalone"
This reverts commit 3142219.
BugenZhao committed Jul 23, 2024
1 parent 007e802 commit 698dbe0
Showing 7 changed files with 74 additions and 14 deletions.
src/common/common_service/src/observer_manager.rs (2 changes: 1 addition & 1 deletion)
@@ -157,7 +157,7 @@ where
         match self.rx.message().await {
             Ok(resp) => {
                 if resp.is_none() {
-                    tracing::error!("Stream of notification terminated.");
+                    tracing::warn!("Stream of notification terminated.");
                     self.re_subscribe().await;
                     continue;
                 }
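In the hunk above, the end of the notification stream is now logged at warn and handled by re-subscribing, rather than reported as an error: the meta service closing the stream is expected during failover or shutdown. A minimal sketch of that recover-by-resubscribing loop, using a tokio channel to stand in for the gRPC notification stream (the `subscribe` helper is illustrative, not the actual client API):

    use tokio::sync::mpsc;

    // Stand-in for the notification client: each "subscription" is a channel whose
    // sender side may be dropped by the server at any time.
    async fn subscribe() -> mpsc::Receiver<String> {
        let (tx, rx) = mpsc::channel(16);
        tokio::spawn(async move {
            let _ = tx.send("notification".to_string()).await;
            // Dropping `tx` here ends the stream, like a server-side close.
        });
        rx
    }

    #[tokio::main]
    async fn main() {
        let mut rx = subscribe().await;
        for _ in 0..3 {
            match rx.recv().await {
                Some(msg) => println!("got {msg}"),
                None => {
                    // Stream terminated: treat it as recoverable, mirroring the
                    // `warn!` + `re_subscribe()` branch above.
                    eprintln!("stream of notification terminated, re-subscribing");
                    rx = subscribe().await;
                }
            }
        }
    }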
src/compute/src/server.rs (4 changes: 2 additions & 2 deletions)
@@ -465,8 +465,8 @@ pub async fn compute_node_serve(
     // Wait for the shutdown signal.
     shutdown.cancelled().await;
 
-    // TODO(shutdown): gracefully unregister from the meta service (need to cautious since it may
-    // trigger auto-scaling)
+    // TODO(shutdown): how does this interact with auto-scaling?
+    meta_client.try_unregister().await;
 
     // NOTE(shutdown): We can't simply join the tonic server here because it only returns when all
     // existing connections are closed, while we have long-running streaming calls that never
src/meta/node/src/server.rs (27 changes: 23 additions & 4 deletions)
@@ -719,10 +719,11 @@ pub async fn start_service_as_election_leader(
         }
     }
 
+    let idle_shutdown = CancellationToken::new();
     let _idle_checker_handle = IdleManager::start_idle_checker(
         env.idle_manager_ref(),
         Duration::from_secs(30),
-        shutdown.clone(),
+        idle_shutdown.clone(),
     );
 
     let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
@@ -771,6 +772,7 @@
         risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
     ]);
 
+    let server_shutdown = CancellationToken::new();
     let server = tonic::transport::Server::builder()
         .layer(MetricsMiddlewareLayer::new(meta_metrics))
         .layer(TracingExtractLayer::new())
@@ -802,14 +804,31 @@
                 tcp_nodelay: true,
                 keepalive_duration: None,
             },
-            shutdown.clone().cancelled_owned(),
+            server_shutdown.clone().cancelled_owned(),
         );
     started::set();
     let _server_handle = tokio::spawn(server);
 
     // Wait for the shutdown signal.
-    shutdown.cancelled().await;
-    // TODO(shutdown): may warn user if there's any other node still running in the cluster.
+    tokio::select! {
+        // Idle manager informs to shutdown. Do nothing else but directly return.
+        _ = idle_shutdown.cancelled() => {}
+
+        // External shutdown signal.
+        _ = shutdown.cancelled() => {
+            // Wait for all other workers to shutdown for gracefulness.
+            if election_client.is_leader() {
+                let res = metadata_manager.wait_till_all_worker_nodes_exit().await;
+                if let Err(e) = res {
+                    tracing::error!(
+                        error = %e.as_report(),
+                        "error occurs while waiting for all worker nodes to exit, directly shutdown",
+                    );
+                }
+            }
+            server_shutdown.cancel();
+        }
+    }
     // TODO(shutdown): do we have any other shutdown tasks?
     Ok(())
 }
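The hunk above is the heart of the change: idle-triggered shutdown and the external shutdown signal become separate `CancellationToken`s, and the token the gRPC server waits on is only cancelled after the other workers have been drained. Below is a self-contained sketch of that coordination pattern under simplified assumptions; `wait_for_workers` stands in for `wait_till_all_worker_nodes_exit`, and the leadership check is omitted:

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    // Stand-in for `metadata_manager.wait_till_all_worker_nodes_exit()`.
    async fn wait_for_workers() {
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    #[tokio::main]
    async fn main() {
        let shutdown = CancellationToken::new();        // external signal (e.g. SIGTERM)
        let idle_shutdown = CancellationToken::new();   // fired by the idle checker
        let server_shutdown = CancellationToken::new(); // what the gRPC server waits on

        // The server task only observes `server_shutdown`, so it keeps serving
        // while we wait for other workers to exit gracefully.
        let server = tokio::spawn({
            let token = server_shutdown.clone();
            async move {
                token.cancelled().await;
                println!("server stopped");
            }
        });

        // Simulate an external shutdown request.
        shutdown.cancel();

        tokio::select! {
            // Idle-based shutdown: return immediately, nothing to wait for.
            _ = idle_shutdown.cancelled() => {}
            // External shutdown: drain other workers first, then stop the server.
            _ = shutdown.cancelled() => {
                wait_for_workers().await;
                server_shutdown.cancel();
            }
        }

        server.await.unwrap();
    }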
src/meta/src/barrier/info.rs (23 changes: 19 additions & 4 deletions)
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 
 use risingwave_common::catalog::TableId;
 use risingwave_pb::common::PbWorkerNode;
@@ -137,11 +137,26 @@ impl InflightActorInfo {
             .into_iter()
             .map(|node| (node.id, node))
             .collect::<HashMap<_, _>>();
-        for (actor_id, location) in &self.actor_location_map {
-            if !new_node_map.contains_key(location) {
-                warn!(actor_id, location, node = ?self.node_map.get(location), "node with running actors is deleted");
+
+        let mut deleted_actors = BTreeMap::new();
+        for (&actor_id, &location) in &self.actor_location_map {
+            if !new_node_map.contains_key(&location) {
+                deleted_actors
+                    .entry(location)
+                    .or_insert_with(BTreeSet::new)
+                    .insert(actor_id);
             }
         }
+        for (node_id, actors) in deleted_actors {
+            let node = self.node_map.get(&node_id);
+            warn!(
+                node_id,
+                ?node,
+                ?actors,
+                "node with running actors is deleted"
+            );
+        }
+
         self.node_map = new_node_map;
     }
 
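Instead of one warning per actor, the rewritten loop above groups actor IDs by their (deleted) node in a `BTreeMap<_, BTreeSet<_>>` and emits a single warning per node, with deterministic ordering. A standalone sketch of the same grouping, using plain integer IDs in place of the actual worker and actor types:

    use std::collections::{BTreeMap, BTreeSet, HashMap};

    fn main() {
        // actor id -> node id, as in `actor_location_map`.
        let actor_location_map: HashMap<u32, u32> =
            HashMap::from([(1, 10), (2, 10), (3, 11), (4, 12)]);
        // Nodes that survive the update; nodes 10 and 12 are gone.
        let new_node_map: HashMap<u32, ()> = HashMap::from([(11, ())]);

        // Group actors on deleted nodes so each node gets a single warning,
        // with sorted, deterministic output thanks to the BTree collections.
        let mut deleted_actors = BTreeMap::new();
        for (&actor_id, &location) in &actor_location_map {
            if !new_node_map.contains_key(&location) {
                deleted_actors
                    .entry(location)
                    .or_insert_with(BTreeSet::new)
                    .insert(actor_id);
            }
        }

        for (node_id, actors) in deleted_actors {
            println!("node {node_id} deleted with running actors {actors:?}");
        }
    }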
src/meta/src/manager/metadata.rs (27 changes: 27 additions & 0 deletions)
@@ -328,6 +328,33 @@ impl MetadataManager {
         }
     }
 
+    /// Wait until all worker nodes (except meta nodes) exit. Used for graceful shutdown.
+    pub async fn wait_till_all_worker_nodes_exit(&self) -> MetaResult<()> {
+        let mut interval = tokio::time::interval(Duration::from_secs(1));
+        let mut last_remaining = 0;
+
+        loop {
+            interval.tick().await;
+
+            let remaining = self
+                .list_worker_node(None, Some(State::Running))
+                .await?
+                .into_iter()
+                .filter(|w| !matches!(w.r#type(), WorkerType::Meta)) // filter out meta node
+                .count();
+
+            if remaining == 0 {
+                tracing::info!("all worker nodes exited");
+                return Ok(());
+            }
+
+            if remaining != last_remaining {
+                last_remaining = remaining;
+                warn!(remaining, "waiting for all worker nodes to exit...");
+            }
+        }
+    }
+
     pub async fn subscribe_active_streaming_compute_nodes(
         &self,
     ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
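`wait_till_all_worker_nodes_exit` is a poll-until-empty loop: tick once per second, count the remaining non-meta workers, and log only when the count changes so the wait does not emit one line per tick. A minimal sketch of that pattern, with a shared counter standing in for `list_worker_node` and shortened intervals so it finishes quickly:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // Stand-in for the number of running (non-meta) worker nodes.
        let remaining_workers = Arc::new(AtomicUsize::new(3));

        // Simulate workers exiting one by one.
        let workers = remaining_workers.clone();
        tokio::spawn(async move {
            while workers.load(Ordering::Relaxed) > 0 {
                tokio::time::sleep(Duration::from_millis(300)).await;
                workers.fetch_sub(1, Ordering::Relaxed);
            }
        });

        let mut interval = tokio::time::interval(Duration::from_millis(100));
        let mut last_remaining = 0;
        loop {
            interval.tick().await;
            let remaining = remaining_workers.load(Ordering::Relaxed);
            if remaining == 0 {
                println!("all worker nodes exited");
                break;
            }
            // Only log when the count changes, not on every tick.
            if remaining != last_remaining {
                last_remaining = remaining;
                println!("waiting for {remaining} worker nodes to exit...");
            }
        }
    }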
src/storage/src/hummock/event_handler/mod.rs (2 changes: 1 addition & 1 deletion)
@@ -217,7 +217,7 @@ impl Drop for LocalInstanceGuard {
                 instance_id: self.instance_id,
             })
             .unwrap_or_else(|err| {
-                tracing::error!(
+                tracing::debug!(
                     error = %err.as_report(),
                     table_id = %self.table_id,
                     instance_id = self.instance_id,
src/storage/src/hummock/store/hummock_storage.rs (3 changes: 1 addition & 2 deletions)
@@ -37,7 +37,6 @@ use risingwave_rpc_client::HummockMetaClient;
 use thiserror_ext::AsReport;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
 use tokio::sync::oneshot;
-use tracing::error;
 
 use super::local_hummock_storage::LocalHummockStorage;
 use super::version::{read_filter_for_version, CommittedVersion, HummockVersionReader};
@@ -75,7 +74,7 @@ impl Drop for HummockStorageShutdownGuard {
         let _ = self
             .shutdown_sender
             .send(HummockEvent::Shutdown)
-            .inspect_err(|e| error!(event = ?e.0, "unable to send shutdown"));
+            .inspect_err(|e| tracing::warn!(event = ?e.0, "unable to send shutdown"));
     }
 }
 
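Both storage changes lower the log level used when a `Drop` guard fails to deliver its shutdown event: during teardown the receiving event loop may already be gone, so a failed send is expected rather than an error. A minimal sketch of the pattern (the `Event::Shutdown` type and the guard are illustrative, not the actual Hummock types):

    use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

    #[derive(Debug)]
    enum Event {
        Shutdown,
    }

    struct ShutdownGuard {
        tx: UnboundedSender<Event>,
    }

    impl Drop for ShutdownGuard {
        fn drop(&mut self) {
            // Best-effort: if the event loop already exited, the send fails and we
            // log it at a low level instead of `error!`, matching the diffs above.
            let _ = self
                .tx
                .send(Event::Shutdown)
                .inspect_err(|e| eprintln!("unable to send shutdown: {:?}", e.0));
        }
    }

    fn main() {
        let (tx, rx) = unbounded_channel();
        let guard = ShutdownGuard { tx };
        drop(rx);    // the receiving event loop is already gone
        drop(guard); // prints the "unable to send shutdown" line
    }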
