Skip to content

Commit

Permalink
standalone graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao committed Jul 9, 2024
1 parent 6ec81c2 commit 9ffcdd8
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 53 deletions.
6 changes: 2 additions & 4 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ fn standalone(opts: StandaloneOpts) -> ! {
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
// TODO(shutdown): pass the shutdown token
risingwave_rt::main_okk(|_| risingwave_cmd_all::standalone(opts));
risingwave_rt::main_okk(|shutdown| risingwave_cmd_all::standalone(opts, shutdown));
}

/// For single node, the internals are just a config mapping from its
Expand All @@ -246,8 +245,7 @@ fn single_node(opts: SingleNodeOpts) -> ! {
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
// TODO(shutdown): pass the shutdown token
risingwave_rt::main_okk(|_| risingwave_cmd_all::standalone(opts));
risingwave_rt::main_okk(|shutdown| risingwave_cmd_all::standalone(opts, shutdown));
}

#[cfg(test)]
Expand Down
141 changes: 99 additions & 42 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use clap::Parser;
use risingwave_common::config::MetaBackend;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
use risingwave_frontend::FrontendOpts;
use risingwave_meta_node::MetaNodeOpts;
use shell_words::split;
use tokio::signal;

use crate::common::osstrs;

Expand Down Expand Up @@ -173,6 +175,46 @@ pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts
}
}

struct Service {
name: &'static str,
runtime: BackgroundShutdownRuntime,
main_task: tokio::task::JoinHandle<()>,
shutdown: CancellationToken,
}

impl Service {
fn spawn<F, Fut>(name: &'static str, f: F) -> Self
where
F: FnOnce(CancellationToken) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name(format!("rw-standalone-{name}"))
.enable_all()
.build()
.unwrap();
let shutdown = CancellationToken::new();
let main_task = runtime.spawn(f(shutdown.clone()));

Self {
name,
runtime: runtime.into(),
main_task,
shutdown,
}
}

async fn shutdown(self) {
tracing::info!("stopping {} service...", self.name);

self.shutdown.cancel();
let _ = self.main_task.await;
drop(self.runtime);

tracing::info!("{} service stopped", self.name);
}
}

/// For `standalone` mode, we can configure and start multiple services in one process.
/// `standalone` mode is meant to be used by our cloud service and docker,
/// where we can configure and start multiple services in one process.
Expand All @@ -183,61 +225,67 @@ pub async fn standalone(
frontend_opts,
compactor_opts,
}: ParsedStandaloneOpts,
root_shutdown: CancellationToken,
) {
tracing::info!("launching Risingwave in standalone mode");

// TODO(shutdown): use the real one passed-in
let shutdown = CancellationToken::new();

let mut is_in_memory = false;
if let Some(opts) = meta_opts {
is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem));
let (meta, is_in_memory) = if let Some(opts) = meta_opts {
let is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem));
tracing::info!("starting meta-node thread with cli args: {:?}", opts);

let shutdown = shutdown.clone();
let _meta_handle = tokio::spawn(async move {
let dangerous_max_idle_secs = opts.dangerous_max_idle_secs;
risingwave_meta_node::start(opts, shutdown).await;
tracing::warn!("meta is stopped, shutdown all nodes");
if let Some(idle_exit_secs) = dangerous_max_idle_secs {
eprintln!("{}",
console::style(format_args!(
"RisingWave playground exited after being idle for {idle_exit_secs} seconds. Bye!"
)).bold());
std::process::exit(0);
}
let service = Service::spawn("meta", |shutdown| {
risingwave_meta_node::start(opts, shutdown)
});

// wait for the service to be ready
let mut tries = 0;
while !risingwave_meta_node::is_server_started() {
if tries % 50 == 0 {
tracing::info!("waiting for meta service to be ready...");
}
if service.main_task.is_finished() {
tracing::error!("meta service failed to start, exiting...");
return;
}
tries += 1;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
if let Some(opts) = compute_opts {

(Some(service), is_in_memory)
} else {
(None, false)
};

let compute = if let Some(opts) = compute_opts {
tracing::info!("starting compute-node thread with cli args: {:?}", opts);
let shutdown = shutdown.clone();
let _compute_handle =
tokio::spawn(async move { risingwave_compute::start(opts, shutdown).await });
}
if let Some(opts) = frontend_opts.clone() {
let service = Service::spawn("compute", |shutdown| {
risingwave_compute::start(opts, shutdown)
});
Some(service)
} else {
None
};

let frontend = if let Some(opts) = frontend_opts.clone() {
tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
let shutdown = shutdown.clone();
let _frontend_handle =
tokio::spawn(async move { risingwave_frontend::start(opts, shutdown).await });
}
if let Some(opts) = compactor_opts {
let service = Service::spawn("frontend", |shutdown| {
risingwave_frontend::start(opts, shutdown)
});
Some(service)
} else {
None
};

let compactor = if let Some(opts) = compactor_opts {
tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
let shutdown = shutdown.clone();
let _compactor_handle =
tokio::spawn(async move { risingwave_compactor::start(opts, shutdown).await });
}
let service = Service::spawn("compactor", |shutdown| risingwave_compactor::start(opts));
Some(service)
} else {
None
};

// wait for log messages to be flushed
tokio::time::sleep(std::time::Duration::from_millis(5000)).await;

eprintln!("----------------------------------------");
eprintln!("| RisingWave standalone mode is ready. |");
eprintln!("----------------------------------------");
Expand All @@ -252,6 +300,7 @@ It SHOULD NEVER be used in benchmarks and production environment!!!"
.bold()
);
}

if let Some(opts) = frontend_opts {
let host = opts.listen_addr.split(':').next().unwrap_or("localhost");
let port = opts.listen_addr.split(':').last().unwrap_or("4566");
Expand All @@ -268,12 +317,20 @@ It SHOULD NEVER be used in benchmarks and production environment!!!"
);
}

// TODO: should we join all handles?
// Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now.
// TODO(kwannoel): Why can't be shutdown gracefully? Is it that the service just does not
// support it?
signal::ctrl_c().await.unwrap();
tracing::info!("Ctrl+C received, now exiting");
// Wait for services to finish.
tokio::select! {
_ = meta.as_ref().unwrap().shutdown.cancelled(), if meta.is_some() => {
tracing::info!("meta service is stopped, terminating...");
return;
}

_ = root_shutdown.cancelled() => {
for service in [compactor, frontend, compute, meta].into_iter().flatten() {
service.shutdown().await;
}
tracing::info!("all services stopped, bye");
}
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ where
match self.rx.message().await {
Ok(resp) => {
if resp.is_none() {
tracing::error!("Stream of notification terminated.");
tracing::warn!("Stream of notification terminated.");
self.re_subscribe().await;
continue;
}
Expand Down
2 changes: 2 additions & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ pub async fn compute_node_serve(
// existing connections are closed, while we have long-running streaming calls that never
// close. From the other side, there's also no need to gracefully shutdown them if we have
// unregistered from the meta service.

meta_client.try_unregister().await;
}

/// Check whether the compute node has enough memory to perform computing tasks. Apart from storage,
Expand Down
23 changes: 19 additions & 4 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_pb::common::PbWorkerNode;
Expand Down Expand Up @@ -134,11 +134,26 @@ impl InflightActorInfo {
.into_iter()
.map(|node| (node.id, node))
.collect::<HashMap<_, _>>();
for (actor_id, location) in &self.actor_location_map {
if !new_node_map.contains_key(location) {
warn!(actor_id, location, node = ?self.node_map.get(location), "node with running actors is deleted");

let mut deleted_actors = BTreeMap::new();
for (&actor_id, &location) in &self.actor_location_map {
if !new_node_map.contains_key(&location) {
deleted_actors
.entry(location)
.or_insert_with(BTreeSet::new)
.insert(actor_id);
}
}
for (node_id, actors) in deleted_actors {
let node = self.node_map.get(&node_id);
warn!(
node_id,
?node,
?actors,
"node with running actors is deleted"
);
}

self.node_map = new_node_map;
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl DdlController {
{
Ok(version) => Ok(version),
Err(err) => {
tracing::error!(id = job_id, error = ?err.as_report(), "failed to create streaming job");
tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
id: streaming_job.id(),
name: streaming_job.name(),
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl Drop for LocalInstanceGuard {
instance_id: self.instance_id,
})
.unwrap_or_else(|err| {
tracing::error!(
tracing::debug!(
error = %err.as_report(),
table_id = %self.table_id,
instance_id = self.instance_id,
Expand Down

0 comments on commit 9ffcdd8

Please sign in to comment.