diff --git a/Cargo.lock b/Cargo.lock index f2fd805..f48a9f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3301,13 +3301,13 @@ dependencies = [ name = "zenoh-backend-rocksdb" version = "0.11.0-dev" dependencies = [ - "async-std", "async-trait", "git-version", "lazy_static", "rocksdb", "rustc_version", "serde_json", + "tokio", "tracing", "uhlc 0.7.0", "zenoh", diff --git a/Cargo.toml b/Cargo.toml index 774353c..fd0c566 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,12 +38,12 @@ dynamic_plugin = [] default = ["dynamic_plugin"] [dependencies] -async-std = "=1.12.0" async-trait = "0.1.66" git-version = "0.3.5" lazy_static = "1.4.0" rocksdb = "0.22.0" serde_json = "1.0.114" +tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements tracing = "0.1" uhlc = "0.7.0" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [ diff --git a/src/lib.rs b/src/lib.rs index 5cb222b..97c2729 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,11 +12,13 @@ // ZettaScale Zenoh Team, // -use std::{borrow::Cow, collections::HashMap, path::PathBuf, time::Duration}; +use std::{ + borrow::Cow, collections::HashMap, future::Future, path::PathBuf, sync::Arc, time::Duration, +}; -use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB}; +use tokio::sync::Mutex; use tracing::{debug, error, trace, warn}; use uhlc::NTP64; use zenoh::{ @@ -33,6 +35,32 @@ use zenoh_backend_traits::{ }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin}; +const WORKER_THREAD_NUM: usize = 2; +const MAX_BLOCK_THREAD_NUM: usize = 50; +lazy_static::lazy_static! { + // The global runtime is used in the dynamic plugins, which we can't get the current runtime + static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORKER_THREAD_NUM) + .max_blocking_threads(MAX_BLOCK_THREAD_NUM) + .enable_all() + .build() + .expect("Unable to create runtime"); +} +#[inline(always)] +fn blockon_runtime(task: F) -> F::Output { + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), spawn on the current runtime + tokio::task::block_in_place(|| rt.block_on(task)) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), spawn on the global runtime + tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task)) + } + } +} + /// The environement variable used to configure the root of all storages managed by this RocksdbBackend. pub const SCOPE_ENV_VAR: &str = "ZENOH_BACKEND_ROCKSDB_ROOT"; @@ -325,7 +353,7 @@ impl Storage for RocksdbStorage { impl Drop for RocksdbStorage { fn drop(&mut self) { - async_std::task::block_on(async move { + blockon_runtime(async move { // Get lock on DB and take DB so we can drop it before destroying it // (avoiding RocksDB lock to be taken twice) let mut db_cell = self.db.lock().await;