From 1d30594779012d504e3867c923c8be7d5af529f7 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Mon, 29 Jul 2024 11:51:34 +0800 Subject: [PATCH 1/2] Use tokio instead of async-std. Signed-off-by: ChenYing Kuo --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/lib.rs | 91 +++++++++++++++++++++++++++++++----------------------- 3 files changed, 54 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2fd805..f48a9f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3301,13 +3301,13 @@ dependencies = [ name = "zenoh-backend-rocksdb" version = "0.11.0-dev" dependencies = [ - "async-std", "async-trait", "git-version", "lazy_static", "rocksdb", "rustc_version", "serde_json", + "tokio", "tracing", "uhlc 0.7.0", "zenoh", diff --git a/Cargo.toml b/Cargo.toml index 774353c..fd0c566 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,12 +38,12 @@ dynamic_plugin = [] default = ["dynamic_plugin"] [dependencies] -async-std = "=1.12.0" async-trait = "0.1.66" git-version = "0.3.5" lazy_static = "1.4.0" rocksdb = "0.22.0" serde_json = "1.0.114" +tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements tracing = "0.1" uhlc = "0.7.0" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [ diff --git a/src/lib.rs b/src/lib.rs index 5cb222b..917ab47 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,11 +12,11 @@ // ZettaScale Zenoh Team, // -use std::{borrow::Cow, collections::HashMap, path::PathBuf, time::Duration}; +use std::{borrow::Cow, collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; -use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB}; +use tokio::sync::Mutex; use tracing::{debug, error, trace, warn}; use uhlc::NTP64; use zenoh::{ @@ -33,6 +33,17 @@ use zenoh_backend_traits::{ }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin}; +const WORKER_THREAD_NUM: usize = 2; +const MAX_BLOCK_THREAD_NUM: usize = 50; +lazy_static::lazy_static! { + static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORKER_THREAD_NUM) + .max_blocking_threads(MAX_BLOCK_THREAD_NUM) + .enable_all() + .build() + .expect("Unable to create runtime"); +} + /// The environement variable used to configure the root of all storages managed by this RocksdbBackend. pub const SCOPE_ENV_VAR: &str = "ZENOH_BACKEND_ROCKSDB_ROOT"; @@ -325,47 +336,49 @@ impl Storage for RocksdbStorage { impl Drop for RocksdbStorage { fn drop(&mut self) { - async_std::task::block_on(async move { - // Get lock on DB and take DB so we can drop it before destroying it - // (avoiding RocksDB lock to be taken twice) - let mut db_cell = self.db.lock().await; - - if let Some(db) = db_cell.take() { - // Flush all - if let Err(err) = db.flush() { - warn!("Closing Rocksdb storage, flush failed: {}", err); - } + tokio::task::block_in_place(|| { + TOKIO_RUNTIME.block_on(async move { + // Get lock on DB and take DB so we can drop it before destroying it + // (avoiding RocksDB lock to be taken twice) + let mut db_cell = self.db.lock().await; + + if let Some(db) = db_cell.take() { + // Flush all + if let Err(err) = db.flush() { + warn!("Closing Rocksdb storage, flush failed: {}", err); + } + + // copy path for later use after DB is dropped + let path = db.path().to_path_buf(); + + // drop DB, releasing RocksDB lock + drop(db); - // copy path for later use after DB is dropped - let path = db.path().to_path_buf(); - - // drop DB, releasing RocksDB lock - drop(db); - - match self.on_closure { - OnClosure::DestroyDB => { - debug!( - "Close Rocksdb storage, destroying database {}", - path.display() - ); - if let Err(err) = DB::destroy(&Options::default(), &path) { - error!( - "Failed to destroy Rocksdb database '{}' : {}", - path.display(), - err + match self.on_closure { + OnClosure::DestroyDB => { + debug!( + "Close Rocksdb storage, destroying database {}", + path.display() + ); + if let Err(err) = DB::destroy(&Options::default(), &path) { + error!( + "Failed to destroy Rocksdb database '{}' : {}", + path.display(), + err + ); + } + } + OnClosure::DoNothing => { + debug!( + "Close Rocksdb storage, keeping database {} as it is", + path.display() ); } } - OnClosure::DoNothing => { - debug!( - "Close Rocksdb storage, keeping database {} as it is", - path.display() - ); - } - } - } else { - warn!("Tried Dropping DB connection, however D Connection internally was None, Continuing"); - }; + } else { + warn!("Tried Dropping DB connection, however D Connection internally was None, Continuing"); + }; + }) }); } } From e22f3a3245c098729027df28e4fcafbe8f31c76a Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Mon, 5 Aug 2024 17:57:08 +0800 Subject: [PATCH 2/2] Reuse the current runtime if possible. Signed-off-by: ChenYing Kuo --- src/lib.rs | 95 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 917ab47..97c2729 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,9 @@ // ZettaScale Zenoh Team, // -use std::{borrow::Cow, collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; +use std::{ + borrow::Cow, collections::HashMap, future::Future, path::PathBuf, sync::Arc, time::Duration, +}; use async_trait::async_trait; use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB}; @@ -36,6 +38,7 @@ use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin}; const WORKER_THREAD_NUM: usize = 2; const MAX_BLOCK_THREAD_NUM: usize = 50; lazy_static::lazy_static! { + // The global runtime is used in the dynamic plugins, which we can't get the current runtime static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(WORKER_THREAD_NUM) .max_blocking_threads(MAX_BLOCK_THREAD_NUM) @@ -43,6 +46,20 @@ lazy_static::lazy_static! { .build() .expect("Unable to create runtime"); } +#[inline(always)] +fn blockon_runtime(task: F) -> F::Output { + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), spawn on the current runtime + tokio::task::block_in_place(|| rt.block_on(task)) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), spawn on the global runtime + tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task)) + } + } +} /// The environement variable used to configure the root of all storages managed by this RocksdbBackend. pub const SCOPE_ENV_VAR: &str = "ZENOH_BACKEND_ROCKSDB_ROOT"; @@ -336,49 +353,47 @@ impl Storage for RocksdbStorage { impl Drop for RocksdbStorage { fn drop(&mut self) { - tokio::task::block_in_place(|| { - TOKIO_RUNTIME.block_on(async move { - // Get lock on DB and take DB so we can drop it before destroying it - // (avoiding RocksDB lock to be taken twice) - let mut db_cell = self.db.lock().await; - - if let Some(db) = db_cell.take() { - // Flush all - if let Err(err) = db.flush() { - warn!("Closing Rocksdb storage, flush failed: {}", err); - } - - // copy path for later use after DB is dropped - let path = db.path().to_path_buf(); - - // drop DB, releasing RocksDB lock - drop(db); + blockon_runtime(async move { + // Get lock on DB and take DB so we can drop it before destroying it + // (avoiding RocksDB lock to be taken twice) + let mut db_cell = self.db.lock().await; + + if let Some(db) = db_cell.take() { + // Flush all + if let Err(err) = db.flush() { + warn!("Closing Rocksdb storage, flush failed: {}", err); + } - match self.on_closure { - OnClosure::DestroyDB => { - debug!( - "Close Rocksdb storage, destroying database {}", - path.display() - ); - if let Err(err) = DB::destroy(&Options::default(), &path) { - error!( - "Failed to destroy Rocksdb database '{}' : {}", - path.display(), - err - ); - } - } - OnClosure::DoNothing => { - debug!( - "Close Rocksdb storage, keeping database {} as it is", - path.display() + // copy path for later use after DB is dropped + let path = db.path().to_path_buf(); + + // drop DB, releasing RocksDB lock + drop(db); + + match self.on_closure { + OnClosure::DestroyDB => { + debug!( + "Close Rocksdb storage, destroying database {}", + path.display() + ); + if let Err(err) = DB::destroy(&Options::default(), &path) { + error!( + "Failed to destroy Rocksdb database '{}' : {}", + path.display(), + err ); } } - } else { - warn!("Tried Dropping DB connection, however D Connection internally was None, Continuing"); - }; - }) + OnClosure::DoNothing => { + debug!( + "Close Rocksdb storage, keeping database {} as it is", + path.display() + ); + } + } + } else { + warn!("Tried Dropping DB connection, however D Connection internally was None, Continuing"); + }; }); } }