diff --git a/Cargo.lock b/Cargo.lock index 8cc198f..c383281 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4428,7 +4428,6 @@ dependencies = [ name = "zenoh-backend-influxdb-v1" version = "0.11.0-dev" dependencies = [ - "async-std", "async-trait", "base64 0.21.4", "git-version", @@ -4438,6 +4437,7 @@ dependencies = [ "rustc_version 0.4.0", "serde", "serde_json", + "tokio", "tracing", "uuid", "zenoh", @@ -4449,7 +4449,6 @@ dependencies = [ name = "zenoh-backend-influxdb-v2" version = "0.11.0-dev" dependencies = [ - "async-std", "async-trait", "base64 0.21.4", "chrono", @@ -4464,6 +4463,7 @@ dependencies = [ "rustc_version 0.4.0", "serde", "serde_json", + "tokio", "tracing", "uuid", "zenoh", diff --git a/Cargo.toml b/Cargo.toml index ab20ad5..cca0d7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,13 +35,13 @@ categories = ["network-programming", "database"] description = "Backend for Zenoh using InfluxDB" [workspace.dependencies] -async-std = "1.12.0" async-trait = "0.1.66" base64 = "0.21.0" git-version = "0.3.5" lazy_static = "1.4.0" serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.94" +tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements tracing = "0.1" uuid = { version = "1.3.0", features = ["v4"] } zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [ diff --git a/v1/Cargo.toml b/v1/Cargo.toml index 76ccfb5..f0fcf7d 100644 --- a/v1/Cargo.toml +++ b/v1/Cargo.toml @@ -22,7 +22,6 @@ default = ["dynamic_plugin"] [dependencies] -async-std = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } git-version = { workspace = true } @@ -35,6 +34,7 @@ influxdb = { version = "0.7.1", default-features = false, features = [ lazy_static = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } uuid = { workspace = true } zenoh = { workspace = true, features = ["unstable", "internal", "plugins"] } diff --git a/v1/src/lib.rs b/v1/src/lib.rs index 1fa21fc..ca3d550 100644 --- a/v1/src/lib.rs +++ b/v1/src/lib.rs @@ -14,11 +14,11 @@ use std::{ convert::{TryFrom, TryInto}, + future::Future, str::FromStr, time::{Duration, Instant}, }; -use async_std::task; use async_trait::async_trait; use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine}; use influxdb::{ @@ -41,6 +41,32 @@ use zenoh_backend_traits::{ }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin}; +const WORKER_THREAD_NUM: usize = 2; +const MAX_BLOCK_THREAD_NUM: usize = 50; +lazy_static::lazy_static! { + // The global runtime is used in the dynamic plugins, which we can't get the current runtime + static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORKER_THREAD_NUM) + .max_blocking_threads(MAX_BLOCK_THREAD_NUM) + .enable_all() + .build() + .expect("Unable to create runtime"); +} +#[inline(always)] +fn blockon_runtime(task: F) -> F::Output { + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), spawn on the current runtime + tokio::task::block_in_place(|| rt.block_on(task)) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), spawn on the global runtime + tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task)) + } + } +} + // Properties used by the Backend pub const PROP_BACKEND_URL: &str = "url"; pub const PROP_BACKEND_USERNAME: &str = "username"; @@ -156,7 +182,7 @@ impl Plugin for InfluxDbBackend { }; // Check connectivity to InfluxDB, trying to list databases - match async_std::task::block_on(async { show_databases(&admin_client).await }) { + match blockon_runtime(async { show_databases(&admin_client).await }) { Ok(dbs) => { // trick: if "_internal" db is not shown, it means the credentials are not for an admin if !dbs.iter().any(|e| e == "_internal") { @@ -686,7 +712,7 @@ impl Drop for InfluxDbStorage { debug!("Closing InfluxDB storage"); match self.on_closure { OnClosure::DropDb => { - task::block_on(async move { + blockon_runtime(async move { let db = self.admin_client.database_name(); debug!("Close InfluxDB storage, dropping database {}", db); let query = InfluxRQuery::new(format!(r#"DROP DATABASE "{db}""#)); @@ -696,7 +722,7 @@ impl Drop for InfluxDbStorage { }); } OnClosure::DropSeries => { - task::block_on(async move { + blockon_runtime(async move { let db = self.client.database_name(); debug!( "Close InfluxDB storage, dropping all series from database {}", diff --git a/v2/Cargo.toml b/v2/Cargo.toml index c330049..20c1c6b 100644 --- a/v2/Cargo.toml +++ b/v2/Cargo.toml @@ -21,7 +21,6 @@ dynamic_plugin = [] default = ["dynamic_plugin"] [dependencies] -async-std = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } chrono = { version = "0.4.31", features = ["serde"] } @@ -38,6 +37,7 @@ num-traits = "0.2" rand = "0.8.5" serde = { workspace = true } serde_json = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } uuid = { workspace = true } zenoh = { workspace = true, features = ["unstable", "internal", "plugins"] } diff --git a/v2/src/lib.rs b/v2/src/lib.rs index f62efad..b804040 100644 --- a/v2/src/lib.rs +++ b/v2/src/lib.rs @@ -18,7 +18,6 @@ use std::{ time::{Duration, Instant, UNIX_EPOCH}, }; -use async_std::task; use async_trait::async_trait; use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine}; use chrono::{NaiveDateTime, SecondsFormat}; @@ -42,6 +41,32 @@ use zenoh_backend_traits::{ }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin}; +const WORKER_THREAD_NUM: usize = 2; +const MAX_BLOCK_THREAD_NUM: usize = 50; +lazy_static::lazy_static! { + // The global runtime is used in the dynamic plugins, which we can't get the current runtime + static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(WORKER_THREAD_NUM) + .max_blocking_threads(MAX_BLOCK_THREAD_NUM) + .enable_all() + .build() + .expect("Unable to create runtime"); +} +#[inline(always)] +fn blockon_runtime(task: F) -> F::Output { + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), spawn on the current runtime + tokio::task::block_in_place(|| rt.block_on(task)) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), spawn on the global runtime + tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task)) + } + } +} + // Properties used by the Backend pub const PROP_BACKEND_URL: &str = "url"; pub const PROP_BACKEND_ORG_ID: &str = "org_id"; @@ -167,8 +192,7 @@ impl Plugin for InfluxDbBackend { Ok(client) => client, Err(e) => bail!("Error in creating client for InfluxDBv2 volume: {:?}", e), }; - - match async_std::task::block_on(async { admin_client.ready().await }) { + match blockon_runtime(async { admin_client.ready().await }) { Ok(res) => { if !res { bail!("InfluxDBv2 server is not ready! ") @@ -795,7 +819,7 @@ impl Drop for InfluxDbStorage { match self.on_closure { OnClosure::DropDb => { - task::block_on(async move { + blockon_runtime(async move { tracing::debug!("Close InfluxDBv2 storage, dropping database {}", db); if let Err(e) = self.admin_client.delete_bucket(&db).await { tracing::error!("Failed to drop InfluxDbv2 database '{}' : {}", db, e) @@ -803,7 +827,7 @@ impl Drop for InfluxDbStorage { }); } OnClosure::DropSeries => { - task::block_on(async move { + blockon_runtime(async move { tracing::debug!( "Close InfluxDBv2 storage, dropping all series from database {}", db