diff --git a/src/engine.rs b/src/engine.rs index 31b502b5..e78dc8ae 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -214,6 +214,8 @@ where let stats = self.global_stats.clone(); return Box::pin(async move { let (file_num, offset, tracker) = r.await?; + // cacluate memtable cost + let t1 = now.elapsed().as_micros(); if file_num > 0 { for mut item in items { if let LogItemContent::Entries(entries) = &mut item.content { @@ -228,8 +230,8 @@ where ); } } - let t = now.elapsed().as_micros(); - stats.add_write_duration_change(t as usize); + let t2 = now.elapsed().as_micros(); + stats.add_write_duration_change((t2 - t1) as usize, t2 as usize); Ok(bytes) }); } @@ -573,18 +575,27 @@ where } let write_count = self.global_stats.write_count.load(Ordering::Relaxed); let write_cost = self.global_stats.write_cost.load(Ordering::Relaxed); + let mem_cost = self.global_stats.mem_cost.load(Ordering::Relaxed); let max_write_cost = self.global_stats.max_write_cost.load(Ordering::Relaxed); + let max_mem_cost = self.global_stats.max_mem_cost.load(Ordering::Relaxed); self.global_stats .write_count .fetch_sub(write_count, Ordering::Relaxed); self.global_stats .write_cost .fetch_sub(write_cost, Ordering::Relaxed); + self.global_stats + .mem_cost + .fetch_sub(mem_cost, Ordering::Relaxed); return Box::pin(async move { let mut stats = r.await?; // transfer micro to sec - stats.avg_write_cost = write_cost / write_count; + if write_count > 0 { + stats.avg_write_cost = write_cost / write_count; + stats.avg_mem_cost = mem_cost / write_count; + } stats.max_write_cost = max_write_cost; + stats.max_mem_cost = max_mem_cost; Ok(stats) }); } diff --git a/src/lib.rs b/src/lib.rs index dae6151f..1720c23d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,8 @@ pub struct GlobalStats { write_count: AtomicUsize, write_cost: AtomicUsize, max_write_cost: AtomicUsize, + mem_cost: AtomicUsize, + max_mem_cost: AtomicUsize, } impl GlobalStats { @@ -70,11 +72,17 @@ impl GlobalStats { pub fn add_cache_miss(&self, count: usize) { self.cache_miss.fetch_add(count, Ordering::Relaxed); } - pub fn add_write_duration_change(&self, dur: usize) { + pub fn add_write_duration_change(&self, memtable_duration: usize, write_duration: usize) { self.write_count.fetch_add(1, Ordering::Relaxed); - self.write_cost.fetch_add(dur, Ordering::Relaxed); - if dur > self.max_write_cost.load(Ordering::Relaxed) { - self.max_write_cost.store(dur, Ordering::Relaxed); + self.write_cost.fetch_add(write_duration, Ordering::Relaxed); + self.mem_cost + .fetch_add(memtable_duration, Ordering::Relaxed); + if write_duration > self.max_write_cost.load(Ordering::Relaxed) { + self.max_write_cost.store(write_duration, Ordering::Relaxed); + } + if memtable_duration > self.max_mem_cost.load(Ordering::Relaxed) { + self.max_write_cost + .store(memtable_duration, Ordering::Relaxed); } } diff --git a/src/log_batch.rs b/src/log_batch.rs index 64e045fe..a16d695a 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -673,7 +673,7 @@ mod tests { entries .encode_to::(&mut encoded, &mut entries_size1) .unwrap(); - for idx in entries.entries_index.borrow_mut().iter_mut() { + for idx in entries.entries_index.iter_mut() { idx.file_num = file_num; } let (mut s, mut entries_size2) = (encoded.as_slice(), 0); diff --git a/src/util.rs b/src/util.rs index b1cdc4be..3c2d921d 100644 --- a/src/util.rs +++ b/src/util.rs @@ -324,12 +324,14 @@ impl Drop for Worker { #[derive(Clone, Debug, Copy, PartialEq, Default)] pub struct Statistic { - pub wal_cost: usize, - pub sync_cost: usize, + pub avg_wal_cost: usize, + pub avg_sync_cost: usize, pub avg_write_cost: usize, + pub avg_mem_cost: usize, pub max_wal_cost: usize, pub max_sync_cost: usize, pub max_write_cost: usize, + pub max_mem_cost: usize, pub freq: usize, } @@ -342,18 +344,9 @@ fn max(left: usize, right: usize) -> usize { } impl Statistic { - pub fn add(&mut self, other: &Self) { - self.wal_cost += other.wal_cost; - self.sync_cost += other.sync_cost; - self.freq += other.freq; - self.max_wal_cost = max(self.max_wal_cost, other.max_wal_cost); - self.max_write_cost = max(self.max_write_cost, other.max_write_cost); - self.max_sync_cost = max(self.max_sync_cost, other.max_sync_cost); - } - pub fn clear(&mut self) { - self.wal_cost = 0; - self.sync_cost = 0; + self.avg_wal_cost = 0; + self.avg_sync_cost = 0; self.avg_write_cost = 0; self.max_wal_cost = 0; self.max_sync_cost = 0; @@ -363,18 +356,20 @@ impl Statistic { #[inline] pub fn add_wal(&mut self, wal: usize) { - self.wal_cost += wal; + self.avg_wal_cost += wal; self.max_wal_cost = max(self.max_wal_cost, wal); } #[inline] pub fn add_sync(&mut self, sync: usize) { - self.sync_cost += sync; + self.avg_sync_cost += sync; self.max_sync_cost = max(self.max_sync_cost, sync); } - #[inline] - pub fn add_one(&mut self) { - self.freq += 1; + pub fn divide(&mut self) { + if self.freq > 0 { + self.avg_wal_cost /= self.freq; + self.avg_sync_cost /= self.freq; + } } } diff --git a/src/wal.rs b/src/wal.rs index d56805dc..eab62cf0 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -58,8 +58,7 @@ where return Ok(()); } LogMsg::Metric(cb) => { - let _ = cb.send(self.statistic.clone()); - self.statistic.clear(); + self.report(cb); continue; } }; @@ -79,8 +78,7 @@ where return Ok(()); } LogMsg::Metric(cb) => { - let _ = cb.send(self.statistic.clone()); - self.statistic.clear(); + self.report(cb); continue; } }; @@ -118,7 +116,7 @@ where self.statistic.add_wal(wal_cost as usize); self.statistic .add_sync((wal_cost - before_sync_cost) as usize); - self.statistic.add_one(); + self.statistic.freq += 1; for (offset, sender) in write_ret.drain(..) { let _ = sender.send((file_num, base_offset + offset, tracker.clone())); } @@ -126,4 +124,10 @@ where } Ok(()) } + + pub fn report(&mut self, cb: Sender) { + self.statistic.divide(); + let _ = cb.send(self.statistic.clone()); + self.statistic.clear(); + } }