Skip to content

Commit

Permalink
Migrate from async-std to tokio (#169)
Browse files Browse the repository at this point in the history
* Migrate from async-std to tokio

Signed-off-by: ChenYing Kuo <[email protected]>

* Support reusing Runtime if possible.

Signed-off-by: ChenYing Kuo <[email protected]>

---------

Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary authored Aug 6, 2024
1 parent 42d165e commit 2260d11
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 14 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ categories = ["network-programming", "database"]
description = "Backend for Zenoh using InfluxDB"

[workspace.dependencies]
async-std = "1.12.0"
async-trait = "0.1.66"
base64 = "0.21.0"
git-version = "0.3.5"
lazy_static = "1.4.0"
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.94"
tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements
tracing = "0.1"
uuid = { version = "1.3.0", features = ["v4"] }
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [
Expand Down
2 changes: 1 addition & 1 deletion v1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ default = ["dynamic_plugin"]


[dependencies]
async-std = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
git-version = { workspace = true }
Expand All @@ -35,6 +34,7 @@ influxdb = { version = "0.7.1", default-features = false, features = [
lazy_static = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
zenoh = { workspace = true, features = ["unstable", "internal", "plugins"] }
Expand Down
34 changes: 30 additions & 4 deletions v1/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

use std::{
convert::{TryFrom, TryInto},
future::Future,
str::FromStr,
time::{Duration, Instant},
};

use async_std::task;
use async_trait::async_trait;
use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine};
use influxdb::{
Expand All @@ -41,6 +41,32 @@ use zenoh_backend_traits::{
};
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin};

const WORKER_THREAD_NUM: usize = 2;
const MAX_BLOCK_THREAD_NUM: usize = 50;
lazy_static::lazy_static! {
// The global runtime is used in the dynamic plugins, which we can't get the current runtime
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORKER_THREAD_NUM)
.max_blocking_threads(MAX_BLOCK_THREAD_NUM)
.enable_all()
.build()
.expect("Unable to create runtime");
}
#[inline(always)]
fn blockon_runtime<F: Future>(task: F) -> F::Output {
// Check whether able to get the current runtime
match tokio::runtime::Handle::try_current() {
Ok(rt) => {
// Able to get the current runtime (standalone binary), spawn on the current runtime
tokio::task::block_in_place(|| rt.block_on(task))
}
Err(_) => {
// Unable to get the current runtime (dynamic plugins), spawn on the global runtime
tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task))
}
}
}

// Properties used by the Backend
pub const PROP_BACKEND_URL: &str = "url";
pub const PROP_BACKEND_USERNAME: &str = "username";
Expand Down Expand Up @@ -156,7 +182,7 @@ impl Plugin for InfluxDbBackend {
};

// Check connectivity to InfluxDB, trying to list databases
match async_std::task::block_on(async { show_databases(&admin_client).await }) {
match blockon_runtime(async { show_databases(&admin_client).await }) {
Ok(dbs) => {
// trick: if "_internal" db is not shown, it means the credentials are not for an admin
if !dbs.iter().any(|e| e == "_internal") {
Expand Down Expand Up @@ -686,7 +712,7 @@ impl Drop for InfluxDbStorage {
debug!("Closing InfluxDB storage");
match self.on_closure {
OnClosure::DropDb => {
task::block_on(async move {
blockon_runtime(async move {
let db = self.admin_client.database_name();
debug!("Close InfluxDB storage, dropping database {}", db);
let query = InfluxRQuery::new(format!(r#"DROP DATABASE "{db}""#));
Expand All @@ -696,7 +722,7 @@ impl Drop for InfluxDbStorage {
});
}
OnClosure::DropSeries => {
task::block_on(async move {
blockon_runtime(async move {
let db = self.client.database_name();
debug!(
"Close InfluxDB storage, dropping all series from database {}",
Expand Down
2 changes: 1 addition & 1 deletion v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ dynamic_plugin = []
default = ["dynamic_plugin"]

[dependencies]
async-std = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
chrono = { version = "0.4.31", features = ["serde"] }
Expand All @@ -38,6 +37,7 @@ num-traits = "0.2"
rand = "0.8.5"
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
zenoh = { workspace = true, features = ["unstable", "internal", "plugins"] }
Expand Down
34 changes: 29 additions & 5 deletions v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::{
time::{Duration, Instant, UNIX_EPOCH},
};

use async_std::task;
use async_trait::async_trait;
use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine};
use chrono::{NaiveDateTime, SecondsFormat};
Expand All @@ -42,6 +41,32 @@ use zenoh_backend_traits::{
};
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin};

const WORKER_THREAD_NUM: usize = 2;
const MAX_BLOCK_THREAD_NUM: usize = 50;
lazy_static::lazy_static! {
// The global runtime is used in the dynamic plugins, which we can't get the current runtime
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORKER_THREAD_NUM)
.max_blocking_threads(MAX_BLOCK_THREAD_NUM)
.enable_all()
.build()
.expect("Unable to create runtime");
}
#[inline(always)]
fn blockon_runtime<F: Future>(task: F) -> F::Output {
// Check whether able to get the current runtime
match tokio::runtime::Handle::try_current() {
Ok(rt) => {
// Able to get the current runtime (standalone binary), spawn on the current runtime
tokio::task::block_in_place(|| rt.block_on(task))
}
Err(_) => {
// Unable to get the current runtime (dynamic plugins), spawn on the global runtime
tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task))
}
}
}

// Properties used by the Backend
pub const PROP_BACKEND_URL: &str = "url";
pub const PROP_BACKEND_ORG_ID: &str = "org_id";
Expand Down Expand Up @@ -167,8 +192,7 @@ impl Plugin for InfluxDbBackend {
Ok(client) => client,
Err(e) => bail!("Error in creating client for InfluxDBv2 volume: {:?}", e),
};

match async_std::task::block_on(async { admin_client.ready().await }) {
match blockon_runtime(async { admin_client.ready().await }) {
Ok(res) => {
if !res {
bail!("InfluxDBv2 server is not ready! ")
Expand Down Expand Up @@ -795,15 +819,15 @@ impl Drop for InfluxDbStorage {

match self.on_closure {
OnClosure::DropDb => {
task::block_on(async move {
blockon_runtime(async move {
tracing::debug!("Close InfluxDBv2 storage, dropping database {}", db);
if let Err(e) = self.admin_client.delete_bucket(&db).await {
tracing::error!("Failed to drop InfluxDbv2 database '{}' : {}", db, e)
}
});
}
OnClosure::DropSeries => {
task::block_on(async move {
blockon_runtime(async move {
tracing::debug!(
"Close InfluxDBv2 storage, dropping all series from database {}",
db
Expand Down

0 comments on commit 2260d11

Please sign in to comment.