Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: graceful shutdown in standalone mode #17633

Merged
merged 8 commits into from
Jul 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
standalone graceful shutdown
  • Loading branch information
BugenZhao committed Jul 12, 2024
commit 6c3dd9a8947e2ae6ea7b9400cee567b65f8f4090
6 changes: 2 additions & 4 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
@@ -230,8 +230,7 @@ fn standalone(opts: StandaloneOpts) -> ! {
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
// TODO(shutdown): pass the shutdown token
risingwave_rt::main_okk(|_| risingwave_cmd_all::standalone(opts));
risingwave_rt::main_okk(|shutdown| risingwave_cmd_all::standalone(opts, shutdown));
}

/// For single node, the internals are just a config mapping from its
@@ -246,8 +245,7 @@ fn single_node(opts: SingleNodeOpts) -> ! {
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
// TODO(shutdown): pass the shutdown token
risingwave_rt::main_okk(|_| risingwave_cmd_all::standalone(opts));
risingwave_rt::main_okk(|shutdown| risingwave_cmd_all::standalone(opts, shutdown));
}

#[cfg(test)]
141 changes: 99 additions & 42 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
@@ -12,16 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use clap::Parser;
use risingwave_common::config::MetaBackend;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
use risingwave_frontend::FrontendOpts;
use risingwave_meta_node::MetaNodeOpts;
use shell_words::split;
use tokio::signal;

use crate::common::osstrs;

@@ -173,6 +175,46 @@ pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts
}
}

/// A service launched by standalone mode, together with the handles needed to stop it.
///
/// Each `Service` is created by `Service::spawn`, which builds a separate multi-threaded
/// Tokio runtime for it, and is torn down by `Service::shutdown`.
struct Service {
/// Human-readable service name; used in the runtime's thread names and in log messages.
name: &'static str,
/// The runtime that `main_task` was spawned on. Dropped explicitly in `Service::shutdown`
/// after the main task has been awaited.
// NOTE(review): presumably dropping a `BackgroundShutdownRuntime` shuts the runtime down
// in the background instead of blocking — confirm against its definition.
runtime: BackgroundShutdownRuntime,
/// Join handle of the service's main future, spawned on `runtime`.
main_task: tokio::task::JoinHandle<()>,
/// Token whose clone was handed to the service at spawn time; cancelled by
/// `Service::shutdown` to request the service to stop.
shutdown: CancellationToken,
}

impl Service {
fn spawn<F, Fut>(name: &'static str, f: F) -> Self
where
F: FnOnce(CancellationToken) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name(format!("rw-standalone-{name}"))
.enable_all()
.build()
.unwrap();
let shutdown = CancellationToken::new();
let main_task = runtime.spawn(f(shutdown.clone()));

Self {
name,
runtime: runtime.into(),
main_task,
shutdown,
}
}

async fn shutdown(self) {
tracing::info!("stopping {} service...", self.name);

self.shutdown.cancel();
let _ = self.main_task.await;
drop(self.runtime);

tracing::info!("{} service stopped", self.name);
}
}

/// For `standalone` mode, we can configure and start multiple services in one process.
/// `standalone` mode is meant to be used by our cloud service and docker,
/// where we can configure and start multiple services in one process.
@@ -183,61 +225,67 @@ pub async fn standalone(
frontend_opts,
compactor_opts,
}: ParsedStandaloneOpts,
root_shutdown: CancellationToken,
) {
tracing::info!("launching Risingwave in standalone mode");

// TODO(shutdown): use the real one passed-in
let shutdown = CancellationToken::new();

let mut is_in_memory = false;
if let Some(opts) = meta_opts {
is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem));
let (meta, is_in_memory) = if let Some(opts) = meta_opts {
let is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem));
tracing::info!("starting meta-node thread with cli args: {:?}", opts);

let shutdown = shutdown.clone();
let _meta_handle = tokio::spawn(async move {
let dangerous_max_idle_secs = opts.dangerous_max_idle_secs;
risingwave_meta_node::start(opts, shutdown).await;
tracing::warn!("meta is stopped, shutdown all nodes");
if let Some(idle_exit_secs) = dangerous_max_idle_secs {
eprintln!("{}",
console::style(format_args!(
"RisingWave playground exited after being idle for {idle_exit_secs} seconds. Bye!"
)).bold());
std::process::exit(0);
}
let service = Service::spawn("meta", |shutdown| {
risingwave_meta_node::start(opts, shutdown)
});

// wait for the service to be ready
let mut tries = 0;
while !risingwave_meta_node::is_server_started() {
if tries % 50 == 0 {
tracing::info!("waiting for meta service to be ready...");
}
if service.main_task.is_finished() {
tracing::error!("meta service failed to start, exiting...");
return;
}
tries += 1;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
if let Some(opts) = compute_opts {

(Some(service), is_in_memory)
} else {
(None, false)
};

let compute = if let Some(opts) = compute_opts {
tracing::info!("starting compute-node thread with cli args: {:?}", opts);
let shutdown = shutdown.clone();
let _compute_handle =
tokio::spawn(async move { risingwave_compute::start(opts, shutdown).await });
}
if let Some(opts) = frontend_opts.clone() {
let service = Service::spawn("compute", |shutdown| {
risingwave_compute::start(opts, shutdown)
});
Some(service)
} else {
None
};

let frontend = if let Some(opts) = frontend_opts.clone() {
tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
let shutdown = shutdown.clone();
let _frontend_handle =
tokio::spawn(async move { risingwave_frontend::start(opts, shutdown).await });
}
if let Some(opts) = compactor_opts {
let service = Service::spawn("frontend", |shutdown| {
risingwave_frontend::start(opts, shutdown)
});
Some(service)
} else {
None
};

let compactor = if let Some(opts) = compactor_opts {
tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
let shutdown = shutdown.clone();
let _compactor_handle =
tokio::spawn(async move { risingwave_compactor::start(opts, shutdown).await });
}
// Forward the per-service token to the compactor, like the other services do, so that
// cancelling it in `Service::shutdown` actually reaches the compactor.
let service = Service::spawn("compactor", |shutdown| {
    risingwave_compactor::start(opts, shutdown)
});
Some(service)
} else {
None
};

// wait for log messages to be flushed
tokio::time::sleep(std::time::Duration::from_millis(5000)).await;

eprintln!("----------------------------------------");
eprintln!("| RisingWave standalone mode is ready. |");
eprintln!("----------------------------------------");
@@ -252,6 +300,7 @@ It SHOULD NEVER be used in benchmarks and production environment!!!"
.bold()
);
}

if let Some(opts) = frontend_opts {
let host = opts.listen_addr.split(':').next().unwrap_or("localhost");
let port = opts.listen_addr.split(':').last().unwrap_or("4566");
@@ -268,12 +317,20 @@ It SHOULD NEVER be used in benchmarks and production environment!!!"
);
}

// TODO: should we join all handles?
// Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now.
// TODO(kwannoel): Why can't be shutdown gracefully? Is it that the service just does not
// support it?
signal::ctrl_c().await.unwrap();
tracing::info!("Ctrl+C received, now exiting");
// Wait for services to finish.
tokio::select! {
_ = meta.as_ref().unwrap().shutdown.cancelled(), if meta.is_some() => {
BugenZhao marked this conversation as resolved.
Show resolved Hide resolved
tracing::info!("meta service is stopped, terminating...");
return;
}

_ = root_shutdown.cancelled() => {
for service in [compactor, frontend, compute, meta].into_iter().flatten() {
service.shutdown().await;
}
tracing::info!("all services stopped, bye");
}
}
}

#[cfg(test)]
2 changes: 1 addition & 1 deletion src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
@@ -157,7 +157,7 @@ where
match self.rx.message().await {
Ok(resp) => {
if resp.is_none() {
tracing::error!("Stream of notification terminated.");
tracing::warn!("Stream of notification terminated.");
self.re_subscribe().await;
continue;
}
2 changes: 2 additions & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
@@ -463,6 +463,8 @@ pub async fn compute_node_serve(
// existing connections are closed, while we have long-running streaming calls that never
// close. On the other hand, there's also no need to gracefully shut them down if we have
// unregistered from the meta service.

meta_client.try_unregister().await;
}

/// Check whether the compute node has enough memory to perform computing tasks. Apart from storage,
23 changes: 19 additions & 4 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_pb::common::PbWorkerNode;
@@ -134,11 +134,26 @@ impl InflightActorInfo {
.into_iter()
.map(|node| (node.id, node))
.collect::<HashMap<_, _>>();
for (actor_id, location) in &self.actor_location_map {
if !new_node_map.contains_key(location) {
warn!(actor_id, location, node = ?self.node_map.get(location), "node with running actors is deleted");

let mut deleted_actors = BTreeMap::new();
for (&actor_id, &location) in &self.actor_location_map {
if !new_node_map.contains_key(&location) {
deleted_actors
.entry(location)
.or_insert_with(BTreeSet::new)
.insert(actor_id);
}
}
for (node_id, actors) in deleted_actors {
let node = self.node_map.get(&node_id);
warn!(
node_id,
?node,
?actors,
"node with running actors is deleted"
);
}

self.node_map = new_node_map;
}

2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
@@ -94,7 +94,7 @@ impl DdlController {
{
Ok(version) => Ok(version),
Err(err) => {
tracing::error!(id = job_id, error = ?err.as_report(), "failed to create streaming job");
tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
id: streaming_job.id(),
name: streaming_job.name(),
2 changes: 1 addition & 1 deletion src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -208,7 +208,7 @@ impl Drop for LocalInstanceGuard {
instance_id: self.instance_id,
})
.unwrap_or_else(|err| {
tracing::error!(
tracing::debug!(
error = %err.as_report(),
table_id = %self.table_id,
instance_id = self.instance_id,