diff --git a/Cargo.lock b/Cargo.lock index 53b60cbf3166..b364cac8b55b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -404,26 +404,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "async-io" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" -dependencies = [ - "async-lock", - "autocfg", - "cfg-if", - "concurrent-queue", - "futures-lite", - "log", - "parking", - "polling", - "rustix 0.37.23", - "slab", - "socket2 0.4.9", - "waker-fn", -] - [[package]] name = "async-lock" version = "2.8.0" @@ -3082,21 +3062,6 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand 1.9.0", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.28" @@ -4072,12 +4037,6 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" -[[package]] -name = "linux-raw-sys" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" - [[package]] name = "linux-raw-sys" version = "0.4.5" @@ -4477,12 +4436,12 @@ dependencies = [ [[package]] name = "moka" -version = "0.11.3" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa6e72583bf6830c956235bff0d5afec8cf2952f579ebad18ae7821a917d950f" +checksum = "8dc65d4615c08c8a13d91fd404b5a2a4485ba35b4091e3315cf8798d280c2f29" dependencies = [ - "async-io", "async-lock", + "async-trait", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", @@ -4491,7 +4450,6 @@ dependencies = [ "parking_lot 0.12.1", "quanta", "rustc_version", - "scheduled-thread-pool", "skeptic", "smallvec", "tagptr", @@ -5200,12 +5158,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d" -[[package]] -name = "parking" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" - [[package]] name = "parking_lot" version = "0.11.2" @@ -5585,22 +5537,6 @@ dependencies = [ "plotters-backend", ] -[[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg", - "bitflags 1.3.2", - "cfg-if", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys 0.48.0", -] - [[package]] name = "portable-atomic" version = "1.4.3" @@ -7795,20 +7731,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "rustix" -version = "0.37.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" -dependencies = [ - "bitflags 1.3.2", - "errno", - "io-lifetimes", - "libc", - "linux-raw-sys 0.3.8", - "windows-sys 0.48.0", -] - [[package]] name = "rustix" version = "0.38.11" @@ -7948,15 +7870,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" -dependencies = [ - "parking_lot 0.12.1", -] - [[package]] name = "scoped-tls" version = "1.0.1" @@ -9625,12 +9538,6 @@ dependencies = [ "libc", ] -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "walkdir" version = "2.4.0" diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index bb1b4b2b4ee0..c8cb5a0204fa 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -58,7 +58,7 @@ itertools = "0.11" jni = { version = "0.21.1", features = ["invocation"] } jsonschema-transpiler = { git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } maplit = "1.0.2" -moka = { version = "0.11", features = ["future"] } +moka = { version = "0.12", features = ["future"] } mysql_async = { version = "0.32", default-features = false, features = [ "default", ] } diff --git a/src/connector/src/parser/avro/schema_resolver.rs b/src/connector/src/parser/avro/schema_resolver.rs index a020109d0c79..1383367ee557 100644 --- a/src/connector/src/parser/avro/schema_resolver.rs +++ b/src/connector/src/parser/avro/schema_resolver.rs @@ -62,7 +62,8 @@ impl ConfluentSchemaResolver { // get the writer schema by id pub async fn get(&self, schema_id: i32) -> Result> { - if let Some(schema) = self.writer_schemas.get(&schema_id) { + // TODO: use `get_with` + if let Some(schema) = self.writer_schemas.get(&schema_id).await { Ok(schema) } else { let raw_schema = self.confluent_client.get_schema_by_id(schema_id).await?; diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml index 7c3707d4fbc4..f340837bf5d6 100644 --- a/src/rpc_client/Cargo.toml +++ b/src/rpc_client/Cargo.toml @@ -22,6 +22,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } hyper = "0.14" itertools = "0.11.0" lru = "0.10.1" +moka = { version = "0.12", features = ["future"] } rand = "0.8" risingwave_common = { workspace = true } risingwave_hummock_sdk = { workspace = true } @@ -44,7 +45,6 @@ tracing = "0.1" url = "2.4.1" [target.'cfg(not(madsim))'.dependencies] -moka = { version = "0.11", features = ["future"] } workspace-hack = { path = "../workspace-hack" } [lints] diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index aabb8e7378b6..7d94c1a0d789 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -28,8 +28,6 @@ #![feature(impl_trait_in_assoc_type)] use std::any::type_name; -#[cfg(madsim)] -use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::future::Future; use std::iter::repeat; @@ -40,15 +38,12 @@ use async_trait::async_trait; use futures::future::try_join_all; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; -#[cfg(not(madsim))] use moka::future::Cache; use rand::prelude::SliceRandom; use risingwave_common::util::addr::HostAddr; use risingwave_pb::common::WorkerNode; use risingwave_pb::meta::heartbeat_request::extra_info; use tokio::sync::mpsc::{channel, Sender}; -#[cfg(madsim)] -use tokio::sync::Mutex; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; @@ -84,12 +79,7 @@ pub trait RpcClient: Send + Sync + 'static + Clone { pub struct RpcClientPool { connection_pool_size: u16, - #[cfg(not(madsim))] clients: Cache>, - - // moka::Cache internally uses system thread, so we can't use it in simulation - #[cfg(madsim)] - clients: Arc>>, } impl Default for RpcClientPool @@ -108,10 +98,7 @@ where pub fn new(connection_pool_size: u16) -> Self { Self { connection_pool_size, - #[cfg(not(madsim))] clients: Cache::new(u64::MAX), - #[cfg(madsim)] - clients: Arc::new(Mutex::new(HashMap::new())), } } @@ -124,7 +111,6 @@ where /// Gets the RPC client for the given addr. If the connection is not established, a /// new client will be created and returned. - #[cfg(not(madsim))] pub async fn get_by_addr(&self, addr: HostAddr) -> Result { Ok(self .clients @@ -140,17 +126,6 @@ where .unwrap() .clone()) } - - #[cfg(madsim)] - pub async fn get_by_addr(&self, addr: HostAddr) -> Result { - let mut clients = self.clients.lock().await; - if let Some(client) = clients.get(&addr) { - return Ok(client.clone()); - } - let client = S::new_client(addr.clone()).await?; - clients.insert(addr, client.clone()); - Ok(client) - } } /// `ExtraInfoSource` is used by heartbeat worker to pull extra info that needs to be piggybacked. diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 77501cd5f4dc..3d38a575291b 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -87,7 +87,7 @@ workspace-hack = { path = "../workspace-hack" } [dev-dependencies] criterion = { workspace = true, features = ["async_futures"] } -moka = { version = "0.11", features = ["future"] } +moka = { version = "0.12", features = ["future"] } risingwave_test_runner = { workspace = true } uuid = { version = "1", features = ["v4"] }