diff --git a/src/engine.rs b/src/engine.rs
index 36a58cb1..e1cea450 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -142,7 +142,7 @@ where
         match item.content {
             LogItemContent::Entries(entries_to_add) => {
                 let entries = entries_to_add.entries;
-                let entries_index = entries_to_add.entries_index.into_inner();
+                let entries_index = entries_to_add.entries_index;
                 if queue == LogQueue::Rewrite {
                     memtable.wl().append_rewrite(entries, entries_index);
                 } else {
@@ -184,8 +184,8 @@ where
             .map_err(|_| Error::Stop)?;
         let (file_num, offset, tracker) = r.await?;
         if file_num > 0 {
-            for item in log_batch.items.drain(..) {
-                if let LogItemContent::Entries(ref entries) = item.content {
+            for mut item in log_batch.items.drain(..) {
+                if let LogItemContent::Entries(entries) = &mut item.content {
                     entries.update_position(LogQueue::Append, file_num, offset, &tracker);
                 }
                 self.apply_to_memtable(item, LogQueue::Append, file_num);
@@ -332,13 +332,13 @@ where
         let mut encoded_size = 0;
         for item in &log_batch.items {
             if let LogItemContent::Entries(ref entries) = item.content {
-                encoded_size += entries.encoded_size.get();
+                encoded_size += entries.encoded_size;
             }
         }
 
         if let Some(tracker) = cache_submitor.get_cache_tracker(file_num, offset) {
-            for item in &log_batch.items {
-                if let LogItemContent::Entries(ref entries) = item.content {
+            for item in log_batch.items.iter_mut() {
+                if let LogItemContent::Entries(entries) = &mut item.content {
                     entries.attach_cache_tracker(tracker.clone());
                 }
             }
diff --git a/src/log_batch.rs b/src/log_batch.rs
index 2204564d..7509e8ce 100644
--- a/src/log_batch.rs
+++ b/src/log_batch.rs
@@ -1,5 +1,4 @@
 use std::borrow::{Borrow, Cow};
-use std::cell::{Cell, RefCell};
 use std::io::BufRead;
 use std::marker::PhantomData;
 use std::sync::atomic::AtomicUsize;
@@ -131,9 +130,9 @@ where
 {
     pub entries: Vec<E>,
     // EntryIndex may be update after write to file.
-    pub entries_index: RefCell<Vec<EntryIndex>>,
+    pub entries_index: Vec<EntryIndex>,
 
-    pub encoded_size: Cell<usize>,
+    pub encoded_size: usize,
 }
 
 impl<E: Message + PartialEq> PartialEq for Entries<E> {
@@ -148,14 +147,14 @@ impl<E: Message> Entries<E> {
         let (encoded_size, entries_index) = match entries_index {
             Some(index) => (
                 index.iter().fold(0, |acc, x| acc + x.len as usize),
-                RefCell::new(index),
+                index,
             ),
-            None => (0, RefCell::new(vec![EntryIndex::default(); len])),
+            None => (0, vec![EntryIndex::default(); len]),
         };
         Entries {
             entries,
             entries_index,
-            encoded_size: Cell::new(encoded_size),
+            encoded_size,
         }
     }
 
@@ -190,7 +189,7 @@ impl<E: Message> Entries<E> {
         Ok(Entries::new(entries, Some(entries_index)))
     }
 
-    pub fn encode_to<W: EntryExt<E>>(&self, vec: &mut Vec<u8>) -> Result<()> {
+    pub fn encode_to<W: EntryExt<E>>(&mut self, vec: &mut Vec<u8>) -> Result<()> {
         if self.entries.is_empty() {
             return Ok(());
         }
@@ -204,13 +203,12 @@ impl<E: Message> Entries<E> {
             vec.encode_var_u64(content.len() as u64)?;
 
             // file_num = 0 means entry index is not initialized.
-            let mut entries_index = self.entries_index.borrow_mut();
-            if entries_index[i].file_num == 0 {
-                entries_index[i].index = W::index(&e);
+            if self.entries_index[i].file_num == 0 {
+                self.entries_index[i].index = W::index(&e);
                 // This offset doesn't count the header.
-                entries_index[i].offset = vec.len() as u64;
-                entries_index[i].len = content.len() as u64;
-                self.encoded_size.update(|x| x + content.len());
+                self.entries_index[i].offset = vec.len() as u64;
+                self.entries_index[i].len = content.len() as u64;
+                self.encoded_size += content.len();
             }
 
             vec.extend_from_slice(&content);
@@ -219,13 +217,13 @@ impl<E: Message> Entries<E> {
     }
 
     pub fn update_position(
-        &self,
+        &mut self,
         queue: LogQueue,
         file_num: u64,
         base: u64,
         chunk_size: &Option<Arc<AtomicUsize>>,
     ) {
-        for idx in self.entries_index.borrow_mut().iter_mut() {
+        for idx in self.entries_index.iter_mut() {
             debug_assert!(idx.file_num == 0 && idx.base_offset == 0);
             idx.queue = queue;
             idx.file_num = file_num;
@@ -239,8 +237,8 @@ impl<E: Message> Entries<E> {
         }
     }
 
-    pub fn attach_cache_tracker(&self, chunk_size: Arc<AtomicUsize>) {
-        for idx in self.entries_index.borrow_mut().iter_mut() {
+    pub fn attach_cache_tracker(&mut self, chunk_size: Arc<AtomicUsize>) {
+        for idx in self.entries_index.iter_mut() {
             idx.cache_tracker = Some(CacheTracker {
                 chunk_size: chunk_size.clone(),
                 sub_on_drop: idx.len as usize,
@@ -248,8 +246,8 @@ impl<E: Message> Entries<E> {
         }
     }
 
-    fn update_compression_type(&self, compression_type: CompressionType, batch_len: u64) {
-        for idx in self.entries_index.borrow_mut().iter_mut() {
+    fn update_compression_type(&mut self, compression_type: CompressionType, batch_len: u64) {
+        for idx in self.entries_index.iter_mut() {
             idx.compression_type = compression_type;
             idx.batch_len = batch_len;
         }
@@ -403,19 +401,19 @@ impl<E: Message> LogItem<E> {
         }
     }
 
-    pub fn encode_to<W: EntryExt<E>>(&self, vec: &mut Vec<u8>) -> Result<()> {
+    pub fn encode_to<W: EntryExt<E>>(&mut self, vec: &mut Vec<u8>) -> Result<()> {
         // layout = { 8 byte id | 1 byte type | item layout }
         vec.encode_var_u64(self.raft_group_id)?;
-        match self.content {
-            LogItemContent::Entries(ref entries) => {
+        match &mut self.content {
+            LogItemContent::Entries(entries) => {
                 vec.push(TYPE_ENTRIES);
                 entries.encode_to::<W>(vec)?;
             }
-            LogItemContent::Command(ref command) => {
+            LogItemContent::Command(command) => {
                 vec.push(TYPE_COMMAND);
                 command.encode_to(vec);
             }
-            LogItemContent::Kv(ref kv) => {
+            LogItemContent::Kv(kv) => {
                 vec.push(TYPE_KV);
                 kv.encode_to(vec)?;
             }
@@ -573,8 +571,8 @@ where
         assert!(reader.is_empty());
         buf.consume(batch_len);
 
-        for item in &log_batch.items {
-            if let LogItemContent::Entries(ref entries) = item.content {
+        for item in log_batch.items.iter_mut() {
+            if let LogItemContent::Entries(entries) = &mut item.content {
                 entries.update_compression_type(batch_type, batch_len as u64);
             }
         }
@@ -583,7 +581,7 @@ where
     }
 
     // TODO: avoid to write a large batch into one compressed chunk.
-    pub fn encode_to_bytes(&self, encoded_size: &mut usize) -> Option<Vec<u8>> {
+    pub fn encode_to_bytes(&mut self, encoded_size: &mut usize) -> Option<Vec<u8>> {
         if self.items.is_empty() {
             return None;
         }
@@ -592,7 +590,7 @@ where
         let mut vec = Vec::with_capacity(4096);
         vec.encode_u64(0).unwrap();
         vec.encode_var_u64(self.items.len() as u64).unwrap();
-        for item in &self.items {
+        for item in self.items.iter_mut() {
             item.encode_to::<W>(&mut vec).unwrap();
         }
 
@@ -611,9 +609,9 @@ where
         vec.as_mut_slice().write_u64::<BigEndian>(header).unwrap();
 
         let batch_len = (vec.len() - 8) as u64;
-        for item in &self.items {
-            if let LogItemContent::Entries(ref entries) = item.content {
-                *encoded_size += entries.encoded_size.get();
+        for item in self.items.iter_mut() {
+            if let LogItemContent::Entries(entries) = &mut item.content {
+                *encoded_size += entries.encoded_size;
                 entries.update_compression_type(compression_type, batch_len as u64);
             }
         }
@@ -652,11 +650,11 @@ mod tests {
     fn test_entries_enc_dec() {
         let pb_entries = vec![Entry::new(); 10];
         let file_num = 1;
-        let entries = Entries::new(pb_entries, None);
+        let mut entries = Entries::new(pb_entries, None);
 
         let mut encoded = vec![];
         entries.encode_to::<Entry>(&mut encoded).unwrap();
-        for idx in entries.entries_index.borrow_mut().iter_mut() {
+        for idx in entries.entries_index.iter_mut() {
             idx.file_num = file_num;
         }
         let mut s = encoded.as_slice();
diff --git a/src/pipe_log.rs b/src/pipe_log.rs
index 1a80ca0f..e4b498d3 100644
--- a/src/pipe_log.rs
+++ b/src/pipe_log.rs
@@ -54,7 +54,7 @@ pub trait GenericPipeLog: Sized + Clone + Send {
     /// Rewrite a batch into the rewrite queue.
     fn rewrite<E: Message, W: EntryExt<E>>(
         &self,
-        batch: &LogBatch<E, W>,
+        batch: &mut LogBatch<E, W>,
         sync: bool,
         file_num: &mut u64,
     ) -> Result<usize>;
@@ -417,7 +417,7 @@ impl GenericPipeLog for PipeLog {
 
     fn rewrite<E: Message, W: EntryExt<E>>(
         &self,
-        batch: &LogBatch<E, W>,
+        batch: &mut LogBatch<E, W>,
         sync: bool,
         file_num: &mut u64,
     ) -> Result<usize> {
@@ -428,8 +428,8 @@ impl GenericPipeLog for PipeLog {
         if sync {
             fd.sync()?;
         }
-        for item in &batch.items {
-            if let LogItemContent::Entries(ref entries) = item.content {
+        for item in batch.items.iter_mut() {
+            if let LogItemContent::Entries(entries) = &mut item.content {
                 entries.update_position(LogQueue::Rewrite, cur_file_num, offset, &None);
             }
         }
diff --git a/src/purge.rs b/src/purge.rs
index d6723c1f..1ff95b19 100644
--- a/src/purge.rs
+++ b/src/purge.rs
@@ -142,7 +142,7 @@ where
     fn rewrite_impl(&self, log_batch: &mut LogBatch<E, W>, latest_rewrite: u64) -> Result<()> {
         let mut file_num = 0;
-        self.pipe_log.rewrite(&log_batch, true, &mut file_num)?;
+        self.pipe_log.rewrite(log_batch, true, &mut file_num)?;
         if file_num > 0 {
             rewrite_to_memtable(&self.memtables, log_batch, file_num, latest_rewrite);
         }
 
@@ -163,8 +163,7 @@ fn rewrite_to_memtable<E: Message, W: EntryExt<E>>(
         let memtable = memtables.get_or_insert(item.raft_group_id);
         match item.content {
             LogItemContent::Entries(entries_to_add) => {
-                let entries_index = entries_to_add.entries_index.into_inner();
-                memtable.wl().rewrite(entries_index, latest_rewrite);
+                memtable.wl().rewrite(entries_to_add.entries_index, latest_rewrite);
             }
             LogItemContent::Kv(kv) => match kv.op_type {
                 OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num),
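
Note (not part of the patch): the change above removes interior mutability from Entries. entries_index and encoded_size stop being RefCell/Cell fields, the methods that mutate them take &mut self, and mutability is threaded through LogItem::encode_to, LogBatch::encode_to_bytes and GenericPipeLog::rewrite instead of hiding behind runtime borrows. A minimal standalone sketch of that pattern follows; the Index, BeforeEntries and AfterEntries names are illustrative only and not from the repository.

// Sketch, assuming nothing beyond std: the same state expressed with and
// without interior mutability.
use std::cell::{Cell, RefCell};

struct Index {
    len: u64,
}

// Before: shared-reference methods mutate through RefCell/Cell, so the
// exclusive-access check happens at runtime (borrow_mut can panic).
struct BeforeEntries {
    entries_index: RefCell<Vec<Index>>,
    encoded_size: Cell<usize>,
}

impl BeforeEntries {
    fn record(&self, len: u64) {
        self.entries_index.borrow_mut().push(Index { len });
        self.encoded_size.set(self.encoded_size.get() + len as usize);
    }
}

// After: plain fields plus &mut self; the borrow checker enforces exclusive
// access at compile time and callers must hold a mutable reference.
struct AfterEntries {
    entries_index: Vec<Index>,
    encoded_size: usize,
}

impl AfterEntries {
    fn record(&mut self, len: u64) {
        self.entries_index.push(Index { len });
        self.encoded_size += len as usize;
    }
}

fn main() {
    let before = BeforeEntries {
        entries_index: RefCell::new(Vec::new()),
        encoded_size: Cell::new(0),
    };
    before.record(8);

    let mut after = AfterEntries {
        entries_index: Vec::new(),
        encoded_size: 0,
    };
    after.record(8);

    // Both variants track the same bookkeeping; only the mutability story differs.
    assert_eq!(before.encoded_size.get(), after.encoded_size);
}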