Skip to content

Commit

Permalink
chore(deps): Bump moka from 0.11.3 to 0.12.0 (#12403)
Browse files Browse the repository at this point in the history
Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Bugen Zhao <[email protected]>
  • Loading branch information
dependabot[bot] and BugenZhao authored Sep 19, 2023
1 parent eb92f6f commit e19a536
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 125 deletions.
99 changes: 3 additions & 96 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ itertools = "0.11"
jni = { version = "0.21.1", features = ["invocation"] }
jsonschema-transpiler = { git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
moka = { version = "0.11", features = ["future"] }
moka = { version = "0.12", features = ["future"] }
mysql_async = { version = "0.32", default-features = false, features = [
"default",
] }
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/parser/avro/schema_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ impl ConfluentSchemaResolver {

// get the writer schema by id
pub async fn get(&self, schema_id: i32) -> Result<Arc<Schema>> {
if let Some(schema) = self.writer_schemas.get(&schema_id) {
// TODO: use `get_with`
if let Some(schema) = self.writer_schemas.get(&schema_id).await {
Ok(schema)
} else {
let raw_schema = self.confluent_client.get_schema_by_id(schema_id).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14"
itertools = "0.11.0"
lru = "0.10.1"
moka = { version = "0.12", features = ["future"] }
rand = "0.8"
risingwave_common = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
Expand All @@ -44,7 +45,6 @@ tracing = "0.1"
url = "2.4.1"

[target.'cfg(not(madsim))'.dependencies]
moka = { version = "0.11", features = ["future"] }
workspace-hack = { path = "../workspace-hack" }

[lints]
Expand Down
25 changes: 0 additions & 25 deletions src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#![feature(impl_trait_in_assoc_type)]

use std::any::type_name;
#[cfg(madsim)]
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::iter::repeat;
Expand All @@ -40,15 +38,12 @@ use async_trait::async_trait;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
#[cfg(not(madsim))]
use moka::future::Cache;
use rand::prelude::SliceRandom;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::heartbeat_request::extra_info;
use tokio::sync::mpsc::{channel, Sender};
#[cfg(madsim)]
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

Expand Down Expand Up @@ -84,12 +79,7 @@ pub trait RpcClient: Send + Sync + 'static + Clone {
pub struct RpcClientPool<S> {
connection_pool_size: u16,

#[cfg(not(madsim))]
clients: Cache<HostAddr, Vec<S>>,

// moka::Cache internally uses system thread, so we can't use it in simulation
#[cfg(madsim)]
clients: Arc<Mutex<HashMap<HostAddr, S>>>,
}

impl<S> Default for RpcClientPool<S>
Expand All @@ -108,10 +98,7 @@ where
pub fn new(connection_pool_size: u16) -> Self {
Self {
connection_pool_size,
#[cfg(not(madsim))]
clients: Cache::new(u64::MAX),
#[cfg(madsim)]
clients: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand All @@ -124,7 +111,6 @@ where

/// Gets the RPC client for the given addr. If the connection is not established, a
/// new client will be created and returned.
#[cfg(not(madsim))]
pub async fn get_by_addr(&self, addr: HostAddr) -> Result<S> {
Ok(self
.clients
Expand All @@ -140,17 +126,6 @@ where
.unwrap()
.clone())
}

#[cfg(madsim)]
pub async fn get_by_addr(&self, addr: HostAddr) -> Result<S> {
let mut clients = self.clients.lock().await;
if let Some(client) = clients.get(&addr) {
return Ok(client.clone());
}
let client = S::new_client(addr.clone()).await?;
clients.insert(addr, client.clone());
Ok(client)
}
}

/// `ExtraInfoSource` is used by heartbeat worker to pull extra info that needs to be piggybacked.
Expand Down
2 changes: 1 addition & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ workspace-hack = { path = "../workspace-hack" }

[dev-dependencies]
criterion = { workspace = true, features = ["async_futures"] }
moka = { version = "0.11", features = ["future"] }
moka = { version = "0.12", features = ["future"] }
risingwave_test_runner = { workspace = true }
uuid = { version = "1", features = ["v4"] }

Expand Down

0 comments on commit e19a536

Please sign in to comment.