diff --git a/src/cache_evict.rs b/src/cache_evict.rs
index 125d23bf..22fa56ac 100644
--- a/src/cache_evict.rs
+++ b/src/cache_evict.rs
@@ -64,7 +64,7 @@ impl CacheSubmitor {
         self.block_on_full = false;
     }
 
-    pub fn get_cache_tracker(&mut self, file_num: u64, offset: u64) -> Option<Arc<AtomicUsize>> {
+    pub fn get_cache_tracker(&mut self, file_num: u64, offset: u64) -> Option<CacheTracker> {
         if self.cache_limit == 0 {
             return None;
         }
@@ -104,13 +104,15 @@ impl CacheSubmitor {
             }
         }
 
-        Some(self.size_tracker.clone())
+        Some(CacheTracker::new(
+            self.global_stats.clone(),
+            self.size_tracker.clone(),
+        ))
     }
 
     pub fn fill_chunk(&mut self, size: usize) {
         self.chunk_size += size;
         self.size_tracker.fetch_add(size, Ordering::Release);
-        self.global_stats.add_mem_change(size);
     }
 
     fn reset(&mut self, file_num: u64, offset: u64) {
diff --git a/src/engine.rs b/src/engine.rs
index b427a474..31b502b5 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -195,15 +195,14 @@ where
         log_batch: &mut LogBatch<E, W>,
         sync: bool,
     ) -> BoxFuture<'static, Result<usize>> {
-        let mut entries_size = 0;
         let now = Instant::now();
-        if let Some(content) = log_batch.encode_to_bytes(&mut entries_size) {
+        if let Some(content) = log_batch.encode_to_bytes() {
             let (sender, r) = future_channel::oneshot::channel();
             let bytes = content.len();
             let task = WriteTask {
                 content,
                 sync,
-                entries_size,
+                entries_size: log_batch.entries_size(),
                 sender,
             };
             if let Err(_) = self.wal_sender.send(LogMsg::Write(task)) {
@@ -378,14 +377,12 @@ where
             if let Some(tracker) =
                 cache_submitor.get_cache_tracker(file_num, offset)
             {
-                let mut encoded_size = 0;
                 for item in log_batch.items.iter_mut() {
                     if let LogItemContent::Entries(entries) = &mut item.content {
                         entries.attach_cache_tracker(tracker.clone());
-                        encoded_size += entries.encoded_size;
                     }
                 }
-                cache_submitor.fill_chunk(encoded_size);
+                cache_submitor.fill_chunk(log_batch.entries_size());
             }
         }
         self.apply_to_memtable(&mut log_batch, queue, file_num);
diff --git a/src/log_batch.rs b/src/log_batch.rs
index c085728d..64e045fe 100644
--- a/src/log_batch.rs
+++ b/src/log_batch.rs
@@ -139,7 +139,7 @@ impl<E: Message + PartialEq> PartialEq for Entries<E> {
 
 impl<E: Message> Entries<E> {
     pub fn new(entries: Vec<E>, entries_index: Option<Vec<EntryIndex>>) -> Entries<E> {
-        let entries_index = 
+        let entries_index =
             entries_index.unwrap_or_else(|| vec![EntryIndex::default(); entries.len()]);
         Entries {
             entries,
@@ -202,7 +202,7 @@ impl<E: Message> Entries<E> {
                 // This offset doesn't count the header.
                 self.entries_index[i].offset = vec.len() as u64;
                 self.entries_index[i].len = content.len() as u64;
-                *entries_size += entries_index[i].len as usize;
+                *entries_size += self.entries_index[i].len as usize;
             }
 
             vec.extend_from_slice(&content);
@@ -232,12 +232,12 @@ impl<E: Message> Entries<E> {
         }
     }
 
-    pub fn attach_cache_tracker(&mut self, chunk_size: Arc<AtomicUsize>) {
+    pub fn attach_cache_tracker(&mut self, tracker: CacheTracker) {
         for idx in self.entries_index.iter_mut() {
-            idx.cache_tracker = Some(CacheTracker {
-                chunk_size: chunk_size.clone(),
-                sub_on_drop: idx.len as usize,
-            });
+            let mut tkr = tracker.clone();
+            tkr.global_stats.add_mem_change(idx.len as usize);
+            tkr.sub_on_drop = idx.len as usize;
+            idx.cache_tracker = Some(tkr);
         }
     }
 
@@ -466,7 +466,7 @@ where
     W: EntryExt<E>,
 {
     pub items: Vec<LogItem<E>>,
-    entries_size: RefCell<usize>,
+    entries_size: usize,
     _phantom: PhantomData<W>,
 }
 
@@ -478,7 +478,7 @@ where
     fn default() -> Self {
         Self {
             items: Vec::with_capacity(16),
-            entries_size: RefCell::new(0),
+            entries_size: 0,
             _phantom: PhantomData,
         }
     }
@@ -496,7 +496,7 @@ where
     pub fn with_capacity(cap: usize) -> Self {
         Self {
             items: Vec::with_capacity(cap),
-            entries_size: RefCell::new(0),
+            entries_size: 0,
             _phantom: PhantomData,
         }
     }
@@ -576,7 +576,7 @@ where
                 file_num,
                 base_offset,
                 content_offset,
-                &mut log_batch.entries_size.borrow_mut(),
+                &mut log_batch.entries_size,
             )?;
             log_batch.items.push(item);
             items_count -= 1;
@@ -603,12 +603,17 @@ where
         let mut vec = Vec::with_capacity(4096);
         vec.encode_u64(0).unwrap();
         vec.encode_var_u64(self.items.len() as u64).unwrap();
-        for item in &self.items {
-            item.encode_to::<W>(&mut vec, &mut *self.entries_size.borrow_mut())
+        for item in self.items.iter_mut() {
+            item.encode_to::<W>(&mut vec, &mut self.entries_size)
                 .unwrap();
         }
 
-        let compression_type = CompressionType::None;
+        let compression_type = if vec.len() > COMPRESSION_SIZE {
+            vec = lz4::encode_block(&vec[HEADER_LEN..], HEADER_LEN, 4);
+            CompressionType::Lz4
+        } else {
+            CompressionType::None
+        };
 
         let checksum = crc32(&vec[8..]);
         vec.encode_u32_le(checksum).unwrap();
diff --git a/src/wal.rs b/src/wal.rs
index 223a321f..d56805dc 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,9 +1,7 @@
 use futures::channel::oneshot::Sender;
-use std::sync::atomic::AtomicUsize;
 use std::sync::mpsc::Receiver;
-use std::sync::Arc;
 
-use crate::cache_evict::CacheSubmitor;
+use crate::cache_evict::{CacheSubmitor, CacheTracker};
 use crate::errors::Result;
 use crate::pipe_log::{GenericPipeLog, LogQueue};
 use crate::util::Statistic;
@@ -14,7 +12,7 @@ pub struct WriteTask {
     pub content: Vec<u8>,
     pub entries_size: usize,
     pub sync: bool,
-    pub sender: Sender<(u64, u64, Option<Arc<AtomicUsize>>)>,
+    pub sender: Sender<(u64, u64, Option<CacheTracker>)>,
 }
 
 pub enum LogMsg {
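
Note: the hunks above rely on a `CacheTracker` type in `src/cache_evict.rs` (constructed via `CacheTracker::new(global_stats, size_tracker)`, cloned per entry index, and exposing `global_stats` and `sub_on_drop` fields) whose definition is not part of this diff. The following is only a minimal sketch of the shape the patch appears to assume; the `GlobalStats` struct, its field, and the `sub_mem_change` method are stand-in names, not code from this repository.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Stand-in for the crate's global statistics type; only the two counters
/// the patch relies on (`add_mem_change` / `sub_mem_change`) are modelled.
#[derive(Default)]
pub struct GlobalStats {
    cache_mem: AtomicUsize,
}

impl GlobalStats {
    pub fn add_mem_change(&self, bytes: usize) {
        self.cache_mem.fetch_add(bytes, Ordering::Release);
    }

    pub fn sub_mem_change(&self, bytes: usize) {
        self.cache_mem.fetch_sub(bytes, Ordering::Release);
    }
}

/// Sketch of the tracker returned by `CacheSubmitor::get_cache_tracker` and
/// cloned per entry index in `Entries::attach_cache_tracker`.
#[derive(Clone)]
pub struct CacheTracker {
    pub global_stats: Arc<GlobalStats>,
    pub chunk_size: Arc<AtomicUsize>,
    pub sub_on_drop: usize,
}

impl CacheTracker {
    pub fn new(global_stats: Arc<GlobalStats>, chunk_size: Arc<AtomicUsize>) -> CacheTracker {
        CacheTracker {
            global_stats,
            chunk_size,
            // Filled in per entry by attach_cache_tracker.
            sub_on_drop: 0,
        }
    }
}

impl Drop for CacheTracker {
    fn drop(&mut self) {
        // Trackers that were never attached keep sub_on_drop == 0 and are a no-op.
        if self.sub_on_drop > 0 {
            self.chunk_size.fetch_sub(self.sub_on_drop, Ordering::Release);
            self.global_stats.sub_mem_change(self.sub_on_drop);
        }
    }
}
```

With this shape, each cloned tracker accounts for exactly one entry: `attach_cache_tracker` bumps the global counter when it sets `sub_on_drop`, and dropping the clone returns the same number of bytes to both the per-chunk counter and the global statistics, matching the accounting moved out of `fill_chunk` in this patch.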