Skip to content

Commit

Permalink
add metric
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Oct 22, 2020
1 parent 28803f4 commit f21f0a5
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 31 deletions.
17 changes: 14 additions & 3 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ where
let stats = self.global_stats.clone();
return Box::pin(async move {
let (file_num, offset, tracker) = r.await?;
// cacluate memtable cost
let t1 = now.elapsed().as_micros();
if file_num > 0 {
for mut item in items {
if let LogItemContent::Entries(entries) = &mut item.content {
Expand All @@ -228,8 +230,8 @@ where
);
}
}
let t = now.elapsed().as_micros();
stats.add_write_duration_change(t as usize);
let t2 = now.elapsed().as_micros();
stats.add_write_duration_change((t2 - t1) as usize, t2 as usize);
Ok(bytes)
});
}
Expand Down Expand Up @@ -573,18 +575,27 @@ where
}
let write_count = self.global_stats.write_count.load(Ordering::Relaxed);
let write_cost = self.global_stats.write_cost.load(Ordering::Relaxed);
let mem_cost = self.global_stats.mem_cost.load(Ordering::Relaxed);
let max_write_cost = self.global_stats.max_write_cost.load(Ordering::Relaxed);
let max_mem_cost = self.global_stats.max_mem_cost.load(Ordering::Relaxed);
self.global_stats
.write_count
.fetch_sub(write_count, Ordering::Relaxed);
self.global_stats
.write_cost
.fetch_sub(write_cost, Ordering::Relaxed);
self.global_stats
.mem_cost
.fetch_sub(mem_cost, Ordering::Relaxed);
return Box::pin(async move {
let mut stats = r.await?;
// transfer micro to sec
stats.avg_write_cost = write_cost / write_count;
if write_count > 0 {
stats.avg_write_cost = write_cost / write_count;
stats.avg_mem_cost = mem_cost / write_count;
}
stats.max_write_cost = max_write_cost;
stats.max_mem_cost = max_mem_cost;
Ok(stats)
});
}
Expand Down
16 changes: 12 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct GlobalStats {
write_count: AtomicUsize,
write_cost: AtomicUsize,
max_write_cost: AtomicUsize,
mem_cost: AtomicUsize,
max_mem_cost: AtomicUsize,
}

impl GlobalStats {
Expand All @@ -70,11 +72,17 @@ impl GlobalStats {
pub fn add_cache_miss(&self, count: usize) {
self.cache_miss.fetch_add(count, Ordering::Relaxed);
}
pub fn add_write_duration_change(&self, dur: usize) {
pub fn add_write_duration_change(&self, memtable_duration: usize, write_duration: usize) {
self.write_count.fetch_add(1, Ordering::Relaxed);
self.write_cost.fetch_add(dur, Ordering::Relaxed);
if dur > self.max_write_cost.load(Ordering::Relaxed) {
self.max_write_cost.store(dur, Ordering::Relaxed);
self.write_cost.fetch_add(write_duration, Ordering::Relaxed);
self.mem_cost
.fetch_add(memtable_duration, Ordering::Relaxed);
if write_duration > self.max_write_cost.load(Ordering::Relaxed) {
self.max_write_cost.store(write_duration, Ordering::Relaxed);
}
if memtable_duration > self.max_mem_cost.load(Ordering::Relaxed) {
self.max_write_cost
.store(memtable_duration, Ordering::Relaxed);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ mod tests {
entries
.encode_to::<Entry>(&mut encoded, &mut entries_size1)
.unwrap();
for idx in entries.entries_index.borrow_mut().iter_mut() {
for idx in entries.entries_index.iter_mut() {
idx.file_num = file_num;
}
let (mut s, mut entries_size2) = (encoded.as_slice(), 0);
Expand Down
31 changes: 13 additions & 18 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,14 @@ impl<T: Clone> Drop for Worker<T> {

#[derive(Clone, Debug, Copy, PartialEq, Default)]
pub struct Statistic {
pub wal_cost: usize,
pub sync_cost: usize,
pub avg_wal_cost: usize,
pub avg_sync_cost: usize,
pub avg_write_cost: usize,
pub avg_mem_cost: usize,
pub max_wal_cost: usize,
pub max_sync_cost: usize,
pub max_write_cost: usize,
pub max_mem_cost: usize,
pub freq: usize,
}

Expand All @@ -342,18 +344,9 @@ fn max(left: usize, right: usize) -> usize {
}

impl Statistic {
pub fn add(&mut self, other: &Self) {
self.wal_cost += other.wal_cost;
self.sync_cost += other.sync_cost;
self.freq += other.freq;
self.max_wal_cost = max(self.max_wal_cost, other.max_wal_cost);
self.max_write_cost = max(self.max_write_cost, other.max_write_cost);
self.max_sync_cost = max(self.max_sync_cost, other.max_sync_cost);
}

pub fn clear(&mut self) {
self.wal_cost = 0;
self.sync_cost = 0;
self.avg_wal_cost = 0;
self.avg_sync_cost = 0;
self.avg_write_cost = 0;
self.max_wal_cost = 0;
self.max_sync_cost = 0;
Expand All @@ -363,18 +356,20 @@ impl Statistic {

#[inline]
pub fn add_wal(&mut self, wal: usize) {
self.wal_cost += wal;
self.avg_wal_cost += wal;
self.max_wal_cost = max(self.max_wal_cost, wal);
}

#[inline]
pub fn add_sync(&mut self, sync: usize) {
self.sync_cost += sync;
self.avg_sync_cost += sync;
self.max_sync_cost = max(self.max_sync_cost, sync);
}

#[inline]
pub fn add_one(&mut self) {
self.freq += 1;
pub fn divide(&mut self) {
if self.freq > 0 {
self.avg_wal_cost /= self.freq;
self.avg_sync_cost /= self.freq;
}
}
}
14 changes: 9 additions & 5 deletions src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ where
return Ok(());
}
LogMsg::Metric(cb) => {
let _ = cb.send(self.statistic.clone());
self.statistic.clear();
self.report(cb);
continue;
}
};
Expand All @@ -79,8 +78,7 @@ where
return Ok(());
}
LogMsg::Metric(cb) => {
let _ = cb.send(self.statistic.clone());
self.statistic.clear();
self.report(cb);
continue;
}
};
Expand Down Expand Up @@ -118,12 +116,18 @@ where
self.statistic.add_wal(wal_cost as usize);
self.statistic
.add_sync((wal_cost - before_sync_cost) as usize);
self.statistic.add_one();
self.statistic.freq += 1;
for (offset, sender) in write_ret.drain(..) {
let _ = sender.send((file_num, base_offset + offset, tracker.clone()));
}
write_buffer.clear();
}
Ok(())
}

pub fn report(&mut self, cb: Sender<Statistic>) {
self.statistic.divide();
let _ = cb.send(self.statistic.clone());
self.statistic.clear();
}
}

0 comments on commit f21f0a5

Please sign in to comment.