Skip to content

Commit

Permalink
refactor: do not leak the nested runtime (#8968)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
Signed-off-by: Runji Wang <[email protected]>
Co-authored-by: Runji Wang <[email protected]>
  • Loading branch information
BugenZhao and wangrunji0408 authored Apr 7, 2023
1 parent e685bdb commit 36dd239
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 46 deletions.
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,4 @@ tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710"
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" }
postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "43e025d" }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "52adb98" }
6 changes: 3 additions & 3 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use minitrace::prelude::*;
use parking_lot::Mutex;
use risingwave_common::array::DataChunk;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::task_service::task_info_response::TaskStatus;
use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
use task_stats_alloc::{TaskLocalBytesAllocated, BYTES_ALLOCATED};
use tokio::runtime::Runtime;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio_metrics::TaskMonitor;

Expand Down Expand Up @@ -293,7 +293,7 @@ pub struct BatchTaskExecution<C> {
epoch: BatchQueryEpoch,

/// Runtime for the batch tasks.
runtime: &'static Runtime,
runtime: Arc<BackgroundShutdownRuntime>,
}

impl<C: BatchTaskContext> BatchTaskExecution<C> {
Expand All @@ -302,7 +302,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
plan: PlanFragment,
context: C,
epoch: BatchQueryEpoch,
runtime: &'static Runtime,
runtime: Arc<BackgroundShutdownRuntime>,
) -> Result<Self> {
let task_id = TaskId::from(prost_tid);

Expand Down
15 changes: 6 additions & 9 deletions src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use parking_lot::Mutex;
use risingwave_common::config::BatchConfig;
use risingwave_common::error::ErrorCode::{self, TaskNotFound};
use risingwave_common::error::Result;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
use tokio::runtime::Runtime;
use tokio::sync::mpsc::Sender;
use tonic::Status;

Expand All @@ -41,7 +41,7 @@ pub struct BatchManager {
tasks: Arc<Mutex<HashMap<TaskId, Arc<BatchTaskExecution<ComputeNodeContext>>>>>,

/// Runtime for the batch manager.
runtime: &'static Runtime,
runtime: Arc<BackgroundShutdownRuntime>,

/// Batch configuration
config: BatchConfig,
Expand Down Expand Up @@ -70,10 +70,7 @@ impl BatchManager {
};
BatchManager {
tasks: Arc::new(Mutex::new(HashMap::new())),
// Leak the runtime to avoid runtime shutting-down in the main async context.
// TODO: may manually shutdown the runtime after we implement graceful shutdown for
// stream manager.
runtime: Box::leak(Box::new(runtime)),
runtime: Arc::new(runtime.into()),
config,
total_mem_val: TrAdder::new().into(),
metrics,
Expand All @@ -89,7 +86,7 @@ impl BatchManager {
state_reporter: StateReporter,
) -> Result<()> {
trace!("Received task id: {:?}, plan: {:?}", tid, plan);
let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime)?;
let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime())?;
let task_id = task.get_task_id().clone();
let task = Arc::new(task);
// Here the task id insert into self.tasks is put in front of `.async_execute`, cuz when
Expand Down Expand Up @@ -203,8 +200,8 @@ impl BatchManager {
self.tasks.lock().get(task_id).unwrap().state_receiver()
}

pub fn runtime(&self) -> &'static Runtime {
self.runtime
pub fn runtime(&self) -> Arc<BackgroundShutdownRuntime> {
self.runtime.clone()
}

pub fn config(&self) -> &BatchConfig {
Expand Down
1 change: 1 addition & 0 deletions src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod ordered;
pub mod prost;
pub mod resource_util;
pub mod row_id;
pub mod runtime;
pub mod scan_range;
pub mod schema_check;
pub mod sort_util;
Expand Down
54 changes: 54 additions & 0 deletions src/common/src/util/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};

use tokio::runtime::Runtime;

/// A wrapper around [`Runtime`] that shuts down the runtime in the background when dropped.
///
/// This is necessary because directly dropping a nested runtime is not allowed in a parent runtime.
pub struct BackgroundShutdownRuntime(ManuallyDrop<Runtime>);

impl Drop for BackgroundShutdownRuntime {
fn drop(&mut self) {
// Safety: The runtime is only dropped once here.
let runtime = unsafe { ManuallyDrop::take(&mut self.0) };

#[cfg(madsim)]
drop(runtime);
#[cfg(not(madsim))]
runtime.shutdown_background();
}
}

impl Deref for BackgroundShutdownRuntime {
type Target = Runtime;

fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for BackgroundShutdownRuntime {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl From<Runtime> for BackgroundShutdownRuntime {
fn from(runtime: Runtime) -> Self {
Self(ManuallyDrop::new(runtime))
}
}
7 changes: 3 additions & 4 deletions src/storage/src/hummock/compactor/compaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

use std::future::Future;

use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use tokio::task::JoinHandle;

/// `CompactionExecutor` is a dedicated runtime for compaction's CPU intensive jobs.
pub struct CompactionExecutor {
/// Runtime for compaction tasks.
runtime: &'static tokio::runtime::Runtime,
runtime: BackgroundShutdownRuntime,
}

impl CompactionExecutor {
Expand All @@ -34,9 +35,7 @@ impl CompactionExecutor {
};

Self {
// Leak the runtime to avoid runtime shutting-down in the main async context.
// TODO: may manually shutdown the runtime gracefully.
runtime: Box::leak(Box::new(runtime)),
runtime: runtime.into(),
}
}

Expand Down
28 changes: 14 additions & 14 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::stream_plan;
Expand Down Expand Up @@ -57,7 +58,7 @@ pub type AtomicU64Ref = Arc<AtomicU64>;

pub struct LocalStreamManagerCore {
/// Runtime for the streaming actors.
runtime: &'static tokio::runtime::Runtime,
runtime: BackgroundShutdownRuntime,

/// Each processor runs in a future. Upon receiving a `Terminate` message, they will exit.
/// `handles` store join handles of these futures, and therefore we could wait their
Expand Down Expand Up @@ -400,21 +401,20 @@ impl LocalStreamManagerCore {
config: StreamingConfig,
await_tree_config: Option<await_tree::Config>,
) -> Self {
let mut builder = tokio::runtime::Builder::new_multi_thread();
if let Some(worker_threads_num) = config.actor_runtime_worker_threads_num {
builder.worker_threads(worker_threads_num);
}
let runtime = builder
.thread_name("risingwave-streaming-actor")
.enable_all()
.build()
.unwrap();
let runtime = {
let mut builder = tokio::runtime::Builder::new_multi_thread();
if let Some(worker_threads_num) = config.actor_runtime_worker_threads_num {
builder.worker_threads(worker_threads_num);
}
builder
.thread_name("risingwave-streaming-actor")
.enable_all()
.build()
.unwrap()
};

Self {
// Leak the runtime to avoid runtime shutting-down in the main async context.
// TODO: may manually shutdown the runtime after we implement graceful shutdown for
// stream manager.
runtime: Box::leak(Box::new(runtime)),
runtime: runtime.into(),
handles: HashMap::new(),
context: Arc::new(context),
actors: HashMap::new(),
Expand Down
4 changes: 2 additions & 2 deletions src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
glob = "0.3"
itertools = "0.10"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", branch = "evict_by_timestamp" }
madsim = "0.2.18"
madsim = "0.2.19"
paste = "1"
pin-project = "1.0"
pretty_assertions = "1"
Expand All @@ -41,7 +41,7 @@ serde_derive = "1.0.152"
serde_json = "1.0.91"
sqllogictest = "0.11.1"
tempfile = "3"
tokio = { version = "0.2.15", package = "madsim-tokio" }
tokio = { version = "0.2.19", package = "madsim-tokio" }
tokio-postgres = "0.7.7"
tracing = "0.1"
tracing-subscriber = "0.3"
8 changes: 2 additions & 6 deletions src/utils/pgwire/src/pg_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::result::Result;
use std::sync::Arc;

use bytes::Bytes;
use futures::Stream;
use futures::{Stream, TryFutureExt};
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::Statement;
use tokio::io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -144,11 +144,7 @@ where
stream.set_nodelay(true)?;
let ssl_config = ssl_config.clone();
let fut = handle_connection(stream, session_mgr, ssl_config);
tokio::spawn(async {
if let Err(e) = fut.await {
debug!("error handling connection : {}", e);
}
});
tokio::spawn(fut.inspect_err(|e| debug!("error handling connection: {e}")));
}

Err(e) => {
Expand Down

0 comments on commit 36dd239

Please sign in to comment.