Skip to content

Commit

Permalink
default_max use cpu*4
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuxiujia committed Dec 26, 2023
1 parent 536e491 commit ce68ff9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fast_pool"
version = "0.1.0"
version = "0.1.1"
edition = "2021"
description = "The Fast Pool based on channel"
readme = "README.md"
Expand All @@ -16,4 +16,5 @@ async-trait = "0.1"
futures-core = { version = "0.3" }
flume = "0.11.0"
tokio = {version = "1",features = ["time","rt-multi-thread","macros"]}
num_cpus = {version = "1.16.0"}
[dev-dependencies]
16 changes: 10 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ pub trait Manager {

impl<M: Manager> Pool<M> {
pub fn new(m: M) -> Self {
let default_max = num_cpus::get() as u64 * 4;
let (s, r) = flume::unbounded();
Self {
manager: Arc::new(m),
sender: s,
receiver: r,
max_open: Arc::new(AtomicU64::new(10)),
max_open: Arc::new(AtomicU64::new(default_max)),
in_use: Arc::new(AtomicU64::new(0)),
}
}
Expand Down Expand Up @@ -103,18 +104,19 @@ impl<M: Manager> Pool<M> {
max_open: self.max_open.load(Ordering::Relaxed),
connections: self.in_use.load(Ordering::Relaxed) + self.sender.len() as u64,
in_use: self.in_use.load(Ordering::Relaxed),
idle: self.sender.len() as u64,
}
}

pub fn set_max_open(&self, n: u64) {
self.max_open.store(n, Ordering::SeqCst);
let open = self.sender.len() as u64;
if open > n {
let del = open - n;
for _ in 0..del {
_ = self.receiver.try_recv();
}
}
self.max_open.store(n, Ordering::SeqCst);
}
}

Expand Down Expand Up @@ -142,24 +144,26 @@ impl<M: Manager> DerefMut for ConnectionBox<M> {

impl<M: Manager> Drop for ConnectionBox<M> {
fn drop(&mut self) {
self.in_use.fetch_sub(1, Ordering::SeqCst);
if let Some(v) = self.inner.take() {
let max_open = self.max_open.load(Ordering::SeqCst);
if self.sender.len() < max_open as usize {
if self.sender.len() as u64 + self.in_use.load(Ordering::SeqCst) < max_open {
_ = self.sender.send(v);
}
}
self.in_use.fetch_sub(1, Ordering::SeqCst);
}
}

#[derive(Debug)]
pub struct State {
/// max open limit
pub max_open: u64,
///connections = in_use number + in_pool number
///connections = in_use number + idle number
pub connections: u64,
/// user use connection number
pub in_use: u64,
/// idle connection
pub idle: u64,
}

#[cfg(test)]
Expand Down Expand Up @@ -192,7 +196,7 @@ mod test {
#[tokio::test]
async fn test_debug() {
let p = Pool::new(TestManager {});
println!("{:?}",p);
println!("{:?}", p);
}

// --nocapture
Expand Down

0 comments on commit ce68ff9

Please sign in to comment.