diff --git a/src/lib.rs b/src/lib.rs index 917ab47..c14ea20 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,9 @@ // ZettaScale Zenoh Team, // -use std::{borrow::Cow, collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; +use std::{ + borrow::Cow, collections::HashMap, future::Future, path::PathBuf, sync::Arc, time::Duration, +}; use async_trait::async_trait; use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB}; @@ -36,6 +38,7 @@ use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin}; const WORKER_THREAD_NUM: usize = 2; const MAX_BLOCK_THREAD_NUM: usize = 50; lazy_static::lazy_static! { + // The global runtime is used in the dynamic plugins, which we can't get the current runtime static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(WORKER_THREAD_NUM) .max_blocking_threads(MAX_BLOCK_THREAD_NUM) @@ -43,6 +46,20 @@ lazy_static::lazy_static! { .build() .expect("Unable to create runtime"); } +#[inline(always)] +fn blockon_runtime(task: F) -> F::Output { + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), spawn on the current runtime + tokio::task::block_in_place(|| rt.block_on(task)) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), spawn on the global runtime + TOKIO_RUNTIME.block_on(task) + } + } +} /// The environement variable used to configure the root of all storages managed by this RocksdbBackend. pub const SCOPE_ENV_VAR: &str = "ZENOH_BACKEND_ROCKSDB_ROOT"; @@ -337,7 +354,7 @@ impl Storage for RocksdbStorage { impl Drop for RocksdbStorage { fn drop(&mut self) { tokio::task::block_in_place(|| { - TOKIO_RUNTIME.block_on(async move { + blockon_runtime(async move { // Get lock on DB and take DB so we can drop it before destroying it // (avoiding RocksDB lock to be taken twice) let mut db_cell = self.db.lock().await;