From e22f3a3245c098729027df28e4fcafbe8f31c76a Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Mon, 5 Aug 2024 17:57:08 +0800 Subject: [PATCH] Reuse the current runtime if possible. Signed-off-by: ChenYing Kuo --- src/lib.rs | 95 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 917ab47..97c2729 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,9 @@ // ZettaScale Zenoh Team, // -use std::{borrow::Cow, collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; +use std::{ + borrow::Cow, collections::HashMap, future::Future, path::PathBuf, sync::Arc, time::Duration, +}; use async_trait::async_trait; use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB}; @@ -36,6 +38,7 @@ use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin}; const WORKER_THREAD_NUM: usize = 2; const MAX_BLOCK_THREAD_NUM: usize = 50; lazy_static::lazy_static! { + // The global runtime is used in the dynamic plugins, which we can't get the current runtime static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(WORKER_THREAD_NUM) .max_blocking_threads(MAX_BLOCK_THREAD_NUM) @@ -43,6 +46,20 @@ lazy_static::lazy_static! { .build() .expect("Unable to create runtime"); } +#[inline(always)] +fn blockon_runtime(task: F) -> F::Output { + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), spawn on the current runtime + tokio::task::block_in_place(|| rt.block_on(task)) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), spawn on the global runtime + tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task)) + } + } +} /// The environement variable used to configure the root of all storages managed by this RocksdbBackend. pub const SCOPE_ENV_VAR: &str = "ZENOH_BACKEND_ROCKSDB_ROOT"; @@ -336,49 +353,47 @@ impl Storage for RocksdbStorage { impl Drop for RocksdbStorage { fn drop(&mut self) { - tokio::task::block_in_place(|| { - TOKIO_RUNTIME.block_on(async move { - // Get lock on DB and take DB so we can drop it before destroying it - // (avoiding RocksDB lock to be taken twice) - let mut db_cell = self.db.lock().await; - - if let Some(db) = db_cell.take() { - // Flush all - if let Err(err) = db.flush() { - warn!("Closing Rocksdb storage, flush failed: {}", err); - } - - // copy path for later use after DB is dropped - let path = db.path().to_path_buf(); - - // drop DB, releasing RocksDB lock - drop(db); + blockon_runtime(async move { + // Get lock on DB and take DB so we can drop it before destroying it + // (avoiding RocksDB lock to be taken twice) + let mut db_cell = self.db.lock().await; + + if let Some(db) = db_cell.take() { + // Flush all + if let Err(err) = db.flush() { + warn!("Closing Rocksdb storage, flush failed: {}", err); + } - match self.on_closure { - OnClosure::DestroyDB => { - debug!( - "Close Rocksdb storage, destroying database {}", - path.display() - ); - if let Err(err) = DB::destroy(&Options::default(), &path) { - error!( - "Failed to destroy Rocksdb database '{}' : {}", - path.display(), - err - ); - } - } - OnClosure::DoNothing => { - debug!( - "Close Rocksdb storage, keeping database {} as it is", - path.display() + // copy path for later use after DB is dropped + let path = db.path().to_path_buf(); + + // drop DB, releasing RocksDB lock + drop(db); + + match self.on_closure { + OnClosure::DestroyDB => { + debug!( + "Close Rocksdb storage, destroying database {}", + path.display() + ); + if let Err(err) = DB::destroy(&Options::default(), &path) { + error!( + "Failed to destroy Rocksdb database '{}' : {}", + path.display(), + err ); } } - } else { - warn!("Tried Dropping DB connection, however D Connection internally was None, Continuing"); - }; - }) + OnClosure::DoNothing => { + debug!( + "Close Rocksdb storage, keeping database {} as it is", + path.display() + ); + } + } + } else { + warn!("Tried Dropping DB connection, however D Connection internally was None, Continuing"); + }; }); } }