Skip to content

Commit

Permalink
Use tokio instead of async-std. (#155)
Browse files Browse the repository at this point in the history
* Use tokio instead of async-std.

Signed-off-by: ChenYing Kuo <[email protected]>

* Reuse the current runtime if possible.

Signed-off-by: ChenYing Kuo <[email protected]>

---------

Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary authored Aug 6, 2024
1 parent b1505ef commit 52de3f0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ dynamic_plugin = []
default = ["dynamic_plugin"]

[dependencies]
async-std = "=1.12.0"
async-trait = "0.1.66"
git-version = "0.3.5"
lazy_static = "1.4.0"
rocksdb = "0.22.0"
serde_json = "1.0.114"
tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements
tracing = "0.1"
uhlc = "0.7.0"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = [
Expand Down
34 changes: 31 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{borrow::Cow, collections::HashMap, path::PathBuf, time::Duration};
use std::{
borrow::Cow, collections::HashMap, future::Future, path::PathBuf, sync::Arc, time::Duration,
};

use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};
use tokio::sync::Mutex;
use tracing::{debug, error, trace, warn};
use uhlc::NTP64;
use zenoh::{
Expand All @@ -33,6 +35,32 @@ use zenoh_backend_traits::{
};
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin};

const WORKER_THREAD_NUM: usize = 2;
const MAX_BLOCK_THREAD_NUM: usize = 50;
lazy_static::lazy_static! {
// The global runtime is used in the dynamic plugins, which we can't get the current runtime
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORKER_THREAD_NUM)
.max_blocking_threads(MAX_BLOCK_THREAD_NUM)
.enable_all()
.build()
.expect("Unable to create runtime");
}
#[inline(always)]
fn blockon_runtime<F: Future>(task: F) -> F::Output {
// Check whether able to get the current runtime
match tokio::runtime::Handle::try_current() {
Ok(rt) => {
// Able to get the current runtime (standalone binary), spawn on the current runtime
tokio::task::block_in_place(|| rt.block_on(task))
}
Err(_) => {
// Unable to get the current runtime (dynamic plugins), spawn on the global runtime
tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task))
}
}
}

/// The environement variable used to configure the root of all storages managed by this RocksdbBackend.
pub const SCOPE_ENV_VAR: &str = "ZENOH_BACKEND_ROCKSDB_ROOT";

Expand Down Expand Up @@ -325,7 +353,7 @@ impl Storage for RocksdbStorage {

impl Drop for RocksdbStorage {
fn drop(&mut self) {
async_std::task::block_on(async move {
blockon_runtime(async move {
// Get lock on DB and take DB so we can drop it before destroying it
// (avoiding RocksDB lock to be taken twice)
let mut db_cell = self.db.lock().await;
Expand Down

0 comments on commit 52de3f0

Please sign in to comment.