
Commit

fix mutable entries index
Signed-off-by: Little-Wallace <[email protected]>
Little-Wallace committed Sep 27, 2020
1 parent 6eb627e commit 2017cdb
Showing 4 changed files with 43 additions and 46 deletions.
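
In short, the commit removes the interior-mutability wrappers (RefCell<Vec<EntryIndex>> and Cell<usize>) from Entries<E> and mutates the entry index through &mut self instead, threading &mut through LogItem::encode_to, LogBatch::encode_to_bytes, GenericPipeLog::rewrite, and the call sites in engine.rs and purge.rs. A minimal standalone sketch of the before/after shape of the struct (EntriesBefore, EntriesAfter, and note_encoded are illustrative names only, not from the codebase):

use std::cell::{Cell, RefCell};

// Trimmed-down stand-in for the real EntryIndex.
#[derive(Default, Clone)]
struct EntryIndex {
    len: u64,
}

// Before: `&EntriesBefore` can mutate its index and size via RefCell/Cell.
struct EntriesBefore<E> {
    entries: Vec<E>,
    entries_index: RefCell<Vec<EntryIndex>>,
    encoded_size: Cell<usize>,
}

impl<E> EntriesBefore<E> {
    fn note_encoded(&self, extra: usize) {
        self.encoded_size.set(self.encoded_size.get() + extra);
        self.entries_index.borrow_mut().push(EntryIndex { len: extra as u64 });
    }
}

// After this commit: plain fields, so the same mutation needs `&mut self`.
struct EntriesAfter<E> {
    entries: Vec<E>,
    entries_index: Vec<EntryIndex>,
    encoded_size: usize,
}

impl<E> EntriesAfter<E> {
    fn note_encoded(&mut self, extra: usize) {
        self.encoded_size += extra;
        self.entries_index.push(EntryIndex { len: extra as u64 });
    }
}

fn main() {
    let mut entries = EntriesAfter { entries: vec![0u8], entries_index: vec![], encoded_size: 0 };
    entries.note_encoded(16);
    assert_eq!(entries.encoded_size, 16);
}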
12 changes: 6 additions & 6 deletions src/engine.rs
@@ -142,7 +142,7 @@ where
         match item.content {
             LogItemContent::Entries(entries_to_add) => {
                 let entries = entries_to_add.entries;
-                let entries_index = entries_to_add.entries_index.into_inner();
+                let entries_index = entries_to_add.entries_index;
                 if queue == LogQueue::Rewrite {
                     memtable.wl().append_rewrite(entries, entries_index);
                 } else {
@@ -184,8 +184,8 @@ where
             .map_err(|_| Error::Stop)?;
         let (file_num, offset, tracker) = r.await?;
         if file_num > 0 {
-            for item in log_batch.items.drain(..) {
-                if let LogItemContent::Entries(ref entries) = item.content {
+            for mut item in log_batch.items.drain(..) {
+                if let LogItemContent::Entries(entries) = &mut item.content {
                     entries.update_position(LogQueue::Append, file_num, offset, &tracker);
                 }
                 self.apply_to_memtable(item, LogQueue::Append, file_num);
@@ -332,13 +332,13 @@ where
         let mut encoded_size = 0;
         for item in &log_batch.items {
             if let LogItemContent::Entries(ref entries) = item.content {
-                encoded_size += entries.encoded_size.get();
+                encoded_size += entries.encoded_size;
             }
         }

         if let Some(tracker) = cache_submitor.get_cache_tracker(file_num, offset) {
-            for item in &log_batch.items {
-                if let LogItemContent::Entries(ref entries) = item.content {
+            for item in log_batch.items.iter_mut() {
+                if let LogItemContent::Entries(entries) = &mut item.content {
                     entries.attach_cache_tracker(tracker.clone());
                 }
             }
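
The pattern used at the engine.rs call sites above is worth noting: iterating with iter_mut() (or `for mut item in ... drain(..)`) and matching on `&mut item.content` binds the enum payload by mutable reference, so the entries can be updated in place without any RefCell. A small self-contained sketch of that idiom (Item, Content, and update_positions are made-up stand-ins):

enum Content {
    Entries(Vec<u64>),
    Command(String),
}

struct Item {
    content: Content,
}

// Matching on `&mut item.content` makes `entries` a `&mut Vec<u64>`.
fn update_positions(items: &mut [Item], offset: u64) {
    for item in items.iter_mut() {
        if let Content::Entries(entries) = &mut item.content {
            for position in entries.iter_mut() {
                *position += offset;
            }
        }
    }
}

fn main() {
    let mut items = vec![Item { content: Content::Entries(vec![1, 2, 3]) }];
    update_positions(&mut items, 100);
    if let Content::Entries(entries) = &items[0].content {
        assert_eq!(entries, &vec![101, 102, 103]);
    }
}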
64 changes: 31 additions & 33 deletions src/log_batch.rs
@@ -1,5 +1,4 @@
 use std::borrow::{Borrow, Cow};
-use std::cell::{Cell, RefCell};
 use std::io::BufRead;
 use std::marker::PhantomData;
 use std::sync::atomic::AtomicUsize;
@@ -131,9 +130,9 @@ where
 {
     pub entries: Vec<E>,
     // EntryIndex may be update after write to file.
-    pub entries_index: RefCell<Vec<EntryIndex>>,
+    pub entries_index: Vec<EntryIndex>,

-    pub encoded_size: Cell<usize>,
+    pub encoded_size: usize,
 }

 impl<E: Message + PartialEq> PartialEq for Entries<E> {
@@ -148,14 +147,14 @@ impl<E: Message> Entries<E> {
         let (encoded_size, entries_index) = match entries_index {
             Some(index) => (
                 index.iter().fold(0, |acc, x| acc + x.len as usize),
-                RefCell::new(index),
+                index,
             ),
-            None => (0, RefCell::new(vec![EntryIndex::default(); len])),
+            None => (0, vec![EntryIndex::default(); len]),
         };
         Entries {
             entries,
             entries_index,
-            encoded_size: Cell::new(encoded_size),
+            encoded_size,
         }
     }

@@ -190,7 +189,7 @@ impl<E: Message> Entries<E> {
         Ok(Entries::new(entries, Some(entries_index)))
     }

-    pub fn encode_to<W: EntryExt<E>>(&self, vec: &mut Vec<u8>) -> Result<()> {
+    pub fn encode_to<W: EntryExt<E>>(&mut self, vec: &mut Vec<u8>) -> Result<()> {
         if self.entries.is_empty() {
             return Ok(());
         }
@@ -204,13 +203,12 @@ impl<E: Message> Entries<E> {
             vec.encode_var_u64(content.len() as u64)?;

             // file_num = 0 means entry index is not initialized.
-            let mut entries_index = self.entries_index.borrow_mut();
-            if entries_index[i].file_num == 0 {
-                entries_index[i].index = W::index(&e);
+            if self.entries_index[i].file_num == 0 {
+                self.entries_index[i].index = W::index(&e);
                 // This offset doesn't count the header.
-                entries_index[i].offset = vec.len() as u64;
-                entries_index[i].len = content.len() as u64;
-                self.encoded_size.update(|x| x + content.len());
+                self.entries_index[i].offset = vec.len() as u64;
+                self.entries_index[i].len = content.len() as u64;
+                self.encoded_size += content.len();
             }

             vec.extend_from_slice(&content);
@@ -219,13 +217,13 @@ impl<E: Message> Entries<E> {
     }

     pub fn update_position(
-        &self,
+        &mut self,
         queue: LogQueue,
         file_num: u64,
         base: u64,
         chunk_size: &Option<Arc<AtomicUsize>>,
     ) {
-        for idx in self.entries_index.borrow_mut().iter_mut() {
+        for idx in self.entries_index.iter_mut() {
             debug_assert!(idx.file_num == 0 && idx.base_offset == 0);
             idx.queue = queue;
             idx.file_num = file_num;
@@ -239,17 +237,17 @@ impl<E: Message> Entries<E> {
         }
     }

-    pub fn attach_cache_tracker(&self, chunk_size: Arc<AtomicUsize>) {
-        for idx in self.entries_index.borrow_mut().iter_mut() {
+    pub fn attach_cache_tracker(&mut self, chunk_size: Arc<AtomicUsize>) {
+        for idx in self.entries_index.iter_mut() {
             idx.cache_tracker = Some(CacheTracker {
                 chunk_size: chunk_size.clone(),
                 sub_on_drop: idx.len as usize,
             });
         }
     }

-    fn update_compression_type(&self, compression_type: CompressionType, batch_len: u64) {
-        for idx in self.entries_index.borrow_mut().iter_mut() {
+    fn update_compression_type(&mut self, compression_type: CompressionType, batch_len: u64) {
+        for idx in self.entries_index.iter_mut() {
             idx.compression_type = compression_type;
             idx.batch_len = batch_len;
         }
@@ -403,19 +401,19 @@ impl<E: Message> LogItem<E> {
         }
     }

-    pub fn encode_to<W: EntryExt<E>>(&self, vec: &mut Vec<u8>) -> Result<()> {
+    pub fn encode_to<W: EntryExt<E>>(&mut self, vec: &mut Vec<u8>) -> Result<()> {
         // layout = { 8 byte id | 1 byte type | item layout }
         vec.encode_var_u64(self.raft_group_id)?;
-        match self.content {
-            LogItemContent::Entries(ref entries) => {
+        match &mut self.content {
+            LogItemContent::Entries(entries) => {
                 vec.push(TYPE_ENTRIES);
                 entries.encode_to::<W>(vec)?;
             }
-            LogItemContent::Command(ref command) => {
+            LogItemContent::Command(command) => {
                 vec.push(TYPE_COMMAND);
                 command.encode_to(vec);
             }
-            LogItemContent::Kv(ref kv) => {
+            LogItemContent::Kv(kv) => {
                 vec.push(TYPE_KV);
                 kv.encode_to(vec)?;
             }
@@ -573,8 +571,8 @@ where
         assert!(reader.is_empty());
         buf.consume(batch_len);

-        for item in &log_batch.items {
-            if let LogItemContent::Entries(ref entries) = item.content {
+        for item in log_batch.items.iter_mut() {
+            if let LogItemContent::Entries(entries) = &mut item.content {
                 entries.update_compression_type(batch_type, batch_len as u64);
             }
         }
@@ -583,7 +581,7 @@
     }

     // TODO: avoid to write a large batch into one compressed chunk.
-    pub fn encode_to_bytes(&self, encoded_size: &mut usize) -> Option<Vec<u8>> {
+    pub fn encode_to_bytes(&mut self, encoded_size: &mut usize) -> Option<Vec<u8>> {
         if self.items.is_empty() {
             return None;
         }
@@ -592,7 +590,7 @@
         let mut vec = Vec::with_capacity(4096);
         vec.encode_u64(0).unwrap();
         vec.encode_var_u64(self.items.len() as u64).unwrap();
-        for item in &self.items {
+        for item in self.items.iter_mut() {
             item.encode_to::<W>(&mut vec).unwrap();
         }

@@ -611,9 +609,9 @@
         vec.as_mut_slice().write_u64::<BigEndian>(header).unwrap();

         let batch_len = (vec.len() - 8) as u64;
-        for item in &self.items {
-            if let LogItemContent::Entries(ref entries) = item.content {
-                *encoded_size += entries.encoded_size.get();
+        for item in self.items.iter_mut() {
+            if let LogItemContent::Entries(entries) = &mut item.content {
+                *encoded_size += entries.encoded_size;
                 entries.update_compression_type(compression_type, batch_len as u64);
             }
         }
@@ -652,11 +650,11 @@ mod tests {
     fn test_entries_enc_dec() {
         let pb_entries = vec![Entry::new(); 10];
         let file_num = 1;
-        let entries = Entries::new(pb_entries, None);
+        let mut entries = Entries::new(pb_entries, None);

         let mut encoded = vec![];
         entries.encode_to::<Entry>(&mut encoded).unwrap();
-        for idx in entries.entries_index.borrow_mut().iter_mut() {
+        for idx in entries.entries_index.iter_mut() {
             idx.file_num = file_num;
         }
         let mut s = encoded.as_slice();
8 changes: 4 additions & 4 deletions src/pipe_log.rs
@@ -54,7 +54,7 @@ pub trait GenericPipeLog: Sized + Clone + Send {
     /// Rewrite a batch into the rewrite queue.
     fn rewrite<E: Message, W: EntryExt<E>>(
         &self,
-        batch: &LogBatch<E, W>,
+        batch: &mut LogBatch<E, W>,
         sync: bool,
         file_num: &mut u64,
     ) -> Result<usize>;
@@ -417,7 +417,7 @@ impl GenericPipeLog for PipeLog {

     fn rewrite<E: Message, W: EntryExt<E>>(
         &self,
-        batch: &LogBatch<E, W>,
+        batch: &mut LogBatch<E, W>,
         sync: bool,
         file_num: &mut u64,
     ) -> Result<usize> {
@@ -428,8 +428,8 @@
         if sync {
             fd.sync()?;
         }
-        for item in &batch.items {
-            if let LogItemContent::Entries(ref entries) = item.content {
+        for item in batch.items.iter_mut() {
+            if let LogItemContent::Entries(entries) = &mut item.content {
                 entries.update_position(LogQueue::Rewrite, cur_file_num, offset, &None);
             }
         }
5 changes: 2 additions & 3 deletions src/purge.rs
@@ -142,7 +142,7 @@ where

     fn rewrite_impl(&self, log_batch: &mut LogBatch<E, W>, latest_rewrite: u64) -> Result<()> {
         let mut file_num = 0;
-        self.pipe_log.rewrite(&log_batch, true, &mut file_num)?;
+        self.pipe_log.rewrite(log_batch, true, &mut file_num)?;
         if file_num > 0 {
             rewrite_to_memtable(&self.memtables, log_batch, file_num, latest_rewrite);
         }
@@ -163,8 +163,7 @@ fn rewrite_to_memtable<E, W>(
         let memtable = memtables.get_or_insert(item.raft_group_id);
         match item.content {
             LogItemContent::Entries(entries_to_add) => {
-                let entries_index = entries_to_add.entries_index.into_inner();
-                memtable.wl().rewrite(entries_index, latest_rewrite);
+                memtable.wl().rewrite(entries_to_add.entries_index, latest_rewrite);
             }
             LogItemContent::Kv(kv) => match kv.op_type {
                 OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num),
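
One follow-on effect shows up in purge.rs above: since GenericPipeLog::rewrite now takes &mut LogBatch, rewrite_impl passes log_batch (already a &mut LogBatch parameter) straight through rather than &log_batch, which would only produce a shared borrow. A tiny sketch of that implicit reborrow, with hypothetical types:

struct LogBatch {
    items: Vec<u64>,
}

// The callee now requires an exclusive borrow.
fn rewrite(batch: &mut LogBatch) -> usize {
    batch.items.push(0);
    batch.items.len()
}

fn rewrite_impl(log_batch: &mut LogBatch) -> usize {
    // `log_batch` is already `&mut LogBatch`; passing it reborrows it mutably.
    rewrite(log_batch)
}

fn main() {
    let mut batch = LogBatch { items: vec![1, 2] };
    assert_eq!(rewrite_impl(&mut batch), 3);
}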
