From 36dd239aa288ec196c37240b36eb7ada64633ff4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 7 Apr 2023 15:20:21 +0800 Subject: [PATCH] refactor: do not leak the nested runtime (#8968) Signed-off-by: Bugen Zhao Signed-off-by: Runji Wang Co-authored-by: Runji Wang --- Cargo.lock | 15 +++--- Cargo.toml | 2 +- src/batch/src/task/task_execution.rs | 6 +-- src/batch/src/task/task_manager.rs | 15 +++--- src/common/src/util/mod.rs | 1 + src/common/src/util/runtime.rs | 54 +++++++++++++++++++ .../hummock/compactor/compaction_executor.rs | 7 ++- src/stream/src/task/stream_manager.rs | 28 +++++----- src/tests/simulation/Cargo.toml | 4 +- src/utils/pgwire/src/pg_server.rs | 8 +-- 10 files changed, 94 insertions(+), 46 deletions(-) create mode 100644 src/common/src/util/runtime.rs diff --git a/Cargo.lock b/Cargo.lock index d6446d6ffd934..4a0027422a4fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -398,9 +398,9 @@ dependencies = [ [[package]] name = "async-task" -version = "4.3.0" +version = "4.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" [[package]] name = "async-trait" @@ -3589,9 +3589,9 @@ dependencies = [ [[package]] name = "madsim" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c846a15d407458f1ac5da7da965810277229be9c96ed8082a3eaf2787ef81c23" +checksum = "baac280a8f0a7760cfbf788dc523f8784a26550582b07e05ea92b62093c9a72b" dependencies = [ "ahash 0.7.6", "async-channel", @@ -3668,7 +3668,7 @@ dependencies = [ [[package]] name = "madsim-rdkafka" version = "0.2.14-alpha" -source = "git+https://github.com/madsim-rs/madsim.git?rev=43e025d#43e025db997df923cf6b891cfb874fe6dabba994" +source = "git+https://github.com/madsim-rs/madsim.git?rev=52adb98#52adb98d33939dc841dd6d0c616238edc4fb6db3" dependencies = [ "async-channel", 
"async-trait", @@ -3690,11 +3690,12 @@ dependencies = [ [[package]] name = "madsim-tokio" -version = "0.2.15" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebc35d85610c81cdefc44f71aad0781b0093fa2d956360e418466cb1d5b1adf2" +checksum = "8c1b74bc405410c47019265f21edee52c7bf08c90d6e984387e26db7ff8ce23e" dependencies = [ "madsim", + "spin 0.9.8", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 8a1110e71dc32..81cb1f45fc497 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -125,4 +125,4 @@ tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710" tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" } tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } -madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "43e025d" } +madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "52adb98" } diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index 69c163210bac4..fd7e2a8b142d3 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -23,12 +23,12 @@ use minitrace::prelude::*; use parking_lot::Mutex; use risingwave_common::array::DataChunk; use risingwave_common::error::{ErrorCode, Result, RwError}; +use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment}; use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::task_service::task_info_response::TaskStatus; use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse}; use task_stats_alloc::{TaskLocalBytesAllocated, BYTES_ALLOCATED}; -use tokio::runtime::Runtime; use tokio::sync::oneshot::{Receiver, Sender}; use tokio_metrics::TaskMonitor; @@ -293,7 +293,7 @@ pub struct 
BatchTaskExecution { epoch: BatchQueryEpoch, /// Runtime for the batch tasks. - runtime: &'static Runtime, + runtime: Arc, } impl BatchTaskExecution { @@ -302,7 +302,7 @@ impl BatchTaskExecution { plan: PlanFragment, context: C, epoch: BatchQueryEpoch, - runtime: &'static Runtime, + runtime: Arc, ) -> Result { let task_id = TaskId::from(prost_tid); diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index 16fdf6ff4f3de..f074420c7a0ff 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -21,10 +21,10 @@ use parking_lot::Mutex; use risingwave_common::config::BatchConfig; use risingwave_common::error::ErrorCode::{self, TaskNotFound}; use risingwave_common::error::Result; +use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment}; use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse}; -use tokio::runtime::Runtime; use tokio::sync::mpsc::Sender; use tonic::Status; @@ -41,7 +41,7 @@ pub struct BatchManager { tasks: Arc>>>>, /// Runtime for the batch manager. - runtime: &'static Runtime, + runtime: Arc, /// Batch configuration config: BatchConfig, @@ -70,10 +70,7 @@ impl BatchManager { }; BatchManager { tasks: Arc::new(Mutex::new(HashMap::new())), - // Leak the runtime to avoid runtime shutting-down in the main async context. - // TODO: may manually shutdown the runtime after we implement graceful shutdown for - // stream manager. 
- runtime: Box::leak(Box::new(runtime)), + runtime: Arc::new(runtime.into()), config, total_mem_val: TrAdder::new().into(), metrics, @@ -89,7 +86,7 @@ impl BatchManager { state_reporter: StateReporter, ) -> Result<()> { trace!("Received task id: {:?}, plan: {:?}", tid, plan); - let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime)?; + let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime())?; let task_id = task.get_task_id().clone(); let task = Arc::new(task); // Here the task id insert into self.tasks is put in front of `.async_execute`, cuz when @@ -203,8 +200,8 @@ impl BatchManager { self.tasks.lock().get(task_id).unwrap().state_receiver() } - pub fn runtime(&self) -> &'static Runtime { - self.runtime + pub fn runtime(&self) -> Arc { + self.runtime.clone() } pub fn config(&self) -> &BatchConfig { diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index 9c7d214c660c1..3fb9e01486ab2 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -33,6 +33,7 @@ pub mod ordered; pub mod prost; pub mod resource_util; pub mod row_id; +pub mod runtime; pub mod scan_range; pub mod schema_check; pub mod sort_util; diff --git a/src/common/src/util/runtime.rs b/src/common/src/util/runtime.rs new file mode 100644 index 0000000000000..e4ad7688880d8 --- /dev/null +++ b/src/common/src/util/runtime.rs @@ -0,0 +1,54 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::mem::ManuallyDrop; +use std::ops::{Deref, DerefMut}; + +use tokio::runtime::Runtime; + +/// A wrapper around [`Runtime`] that shuts down the runtime in the background when dropped. +/// +/// This is necessary because directly dropping a nested runtime is not allowed in a parent runtime. +pub struct BackgroundShutdownRuntime(ManuallyDrop<Runtime>); + +impl Drop for BackgroundShutdownRuntime { + fn drop(&mut self) { + // Safety: The runtime is only dropped once here. + let runtime = unsafe { ManuallyDrop::take(&mut self.0) }; + + #[cfg(madsim)] + drop(runtime); + #[cfg(not(madsim))] + runtime.shutdown_background(); + } +} + +impl Deref for BackgroundShutdownRuntime { + type Target = Runtime; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for BackgroundShutdownRuntime { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From<Runtime> for BackgroundShutdownRuntime { + fn from(runtime: Runtime) -> Self { + Self(ManuallyDrop::new(runtime)) + } +} diff --git a/src/storage/src/hummock/compactor/compaction_executor.rs b/src/storage/src/hummock/compactor/compaction_executor.rs index c7336a61203b7..2f300d4e134ac 100644 --- a/src/storage/src/hummock/compactor/compaction_executor.rs +++ b/src/storage/src/hummock/compactor/compaction_executor.rs @@ -14,12 +14,13 @@ use std::future::Future; +use risingwave_common::util::runtime::BackgroundShutdownRuntime; use tokio::task::JoinHandle; /// `CompactionExecutor` is a dedicated runtime for compaction's CPU intensive jobs. pub struct CompactionExecutor { /// Runtime for compaction tasks. - runtime: &'static tokio::runtime::Runtime, + runtime: BackgroundShutdownRuntime, } impl CompactionExecutor { @@ -34,9 +35,7 @@ impl CompactionExecutor { }; Self { - // Leak the runtime to avoid runtime shutting-down in the main async context. - // TODO: may manually shutdown the runtime gracefully. 
- runtime: Box::leak(Box::new(runtime)), + runtime: runtime.into(), } } diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index b293dc9014d07..bc8a6aae8b964 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -29,6 +29,7 @@ use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::config::StreamingConfig; use risingwave_common::util::addr::HostAddr; +use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan; @@ -57,7 +58,7 @@ pub type AtomicU64Ref = Arc; pub struct LocalStreamManagerCore { /// Runtime for the streaming actors. - runtime: &'static tokio::runtime::Runtime, + runtime: BackgroundShutdownRuntime, /// Each processor runs in a future. Upon receiving a `Terminate` message, they will exit. /// `handles` store join handles of these futures, and therefore we could wait their @@ -400,21 +401,20 @@ impl LocalStreamManagerCore { config: StreamingConfig, await_tree_config: Option, ) -> Self { - let mut builder = tokio::runtime::Builder::new_multi_thread(); - if let Some(worker_threads_num) = config.actor_runtime_worker_threads_num { - builder.worker_threads(worker_threads_num); - } - let runtime = builder - .thread_name("risingwave-streaming-actor") - .enable_all() - .build() - .unwrap(); + let runtime = { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + if let Some(worker_threads_num) = config.actor_runtime_worker_threads_num { + builder.worker_threads(worker_threads_num); + } + builder + .thread_name("risingwave-streaming-actor") + .enable_all() + .build() + .unwrap() + }; Self { - // Leak the runtime to avoid runtime shutting-down in the main async context. - // TODO: may manually shutdown the runtime after we implement graceful shutdown for - // stream manager. 
- runtime: Box::leak(Box::new(runtime)), + runtime: runtime.into(), handles: HashMap::new(), context: Arc::new(context), actors: HashMap::new(), diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index f788a50d9f3f2..c4e6c9b279f17 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -21,7 +21,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } glob = "0.3" itertools = "0.10" lru = { git = "https://github.com/risingwavelabs/lru-rs.git", branch = "evict_by_timestamp" } -madsim = "0.2.18" +madsim = "0.2.19" paste = "1" pin-project = "1.0" pretty_assertions = "1" @@ -41,7 +41,7 @@ serde_derive = "1.0.152" serde_json = "1.0.91" sqllogictest = "0.11.1" tempfile = "3" -tokio = { version = "0.2.15", package = "madsim-tokio" } +tokio = { version = "0.2.19", package = "madsim-tokio" } tokio-postgres = "0.7.7" tracing = "0.1" tracing-subscriber = "0.3" diff --git a/src/utils/pgwire/src/pg_server.rs b/src/utils/pgwire/src/pg_server.rs index eb9482b001d6e..d6497ba4157d3 100644 --- a/src/utils/pgwire/src/pg_server.rs +++ b/src/utils/pgwire/src/pg_server.rs @@ -18,7 +18,7 @@ use std::result::Result; use std::sync::Arc; use bytes::Bytes; -use futures::Stream; +use futures::{Stream, TryFutureExt}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::Statement; use tokio::io::{AsyncRead, AsyncWrite}; @@ -144,11 +144,7 @@ where stream.set_nodelay(true)?; let ssl_config = ssl_config.clone(); let fut = handle_connection(stream, session_mgr, ssl_config); - tokio::spawn(async { - if let Err(e) = fut.await { - debug!("error handling connection : {}", e); - } - }); + tokio::spawn(fut.inspect_err(|e| debug!("error handling connection: {e}"))); } Err(e) => {