Skip to content

Commit

Permalink
Merge branch 'master' into async-io
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored Sep 27, 2020
2 parents 38d77cf + 68b6a07 commit ab2b110
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 41 deletions.
15 changes: 6 additions & 9 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ where
E: Message + Clone,
W: EntryExt<E>,
{
fn new(creator: Arc<dyn Fn(u64) -> MemTable<E, W> + Send + Sync>) -> MemTableAccessor<E, W> {
pub fn new(
creator: Arc<dyn Fn(u64) -> MemTable<E, W> + Send + Sync>,
) -> MemTableAccessor<E, W> {
let mut slots = Vec::with_capacity(SLOTS_COUNT);
for _ in 0..SLOTS_COUNT {
slots.push(Arc::new(RwLock::new(MemTables::default())));
Expand All @@ -82,11 +84,11 @@ where
memtables.get(&raft_group_id).cloned()
}

pub fn remove(&self, raft_group_id: u64) {
pub fn remove(&self, raft_group_id: u64) -> Option<Arc<RwLock<MemTable<E, W>>>> {
let mut memtables = self.slots[raft_group_id as usize % SLOTS_COUNT]
.write()
.unwrap();
memtables.remove(&raft_group_id);
memtables.remove(&raft_group_id)
}

pub fn fold<B, F: Fn(B, &MemTable<E, W>) -> B>(&self, mut init: B, fold: F) -> B {
Expand Down Expand Up @@ -291,6 +293,7 @@ where
wal: None,
})),
};

cache_submitor.block_on_full();
engine.recover(
&mut cache_submitor,
Expand Down Expand Up @@ -618,12 +621,6 @@ mod tests {
use crate::util::ReadableSize;
use raft::eraftpb::Entry;

impl EntryExt<Entry> for Entry {
fn index(e: &Entry) -> u64 {
e.get_index()
}
}

type RaftLogEngine = Engine<Entry, Entry, PipeLog>;
impl RaftLogEngine {
fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
Expand Down
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,15 @@ impl GlobalStats {
self.cache_size.store(0, Ordering::Relaxed);
}
}

#[cfg(test)]
mod tests {
use crate::log_batch::EntryExt;
use raft::eraftpb::Entry;

impl EntryExt<Entry> for Entry {
fn index(e: &Entry) -> u64 {
e.get_index()
}
}
}
21 changes: 12 additions & 9 deletions src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub struct MemTable<E: Message + Clone, W: EntryExt<E>> {
rewrite_count: usize,

// Region scope key/value pairs
// key -> (value, file_num)
// key -> (value, queue, file_num)
kvs: HashMap<Vec<u8>, (Vec<u8>, LogQueue, u64)>,

cache_size: usize,
Expand Down Expand Up @@ -401,9 +401,10 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
vec_idx.extend_from_slice(first);
vec_idx.extend_from_slice(second);
}
self.global_stats.add_cache_hit(vec.len() - vec_len);
self.global_stats
.add_cache_miss(vec_idx.len() - vec_idx_len);

let (hit, miss) = (vec.len() - vec_len, vec_idx.len() - vec_idx_len);
self.global_stats.add_cache_hit(hit);
self.global_stats.add_cache_miss(miss);
Ok(())
}

Expand Down Expand Up @@ -442,7 +443,7 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
let entry = match queue {
LogQueue::Append => self.entries_index.get(self.rewrite_count),
LogQueue::Rewrite if self.rewrite_count == 0 => None,
_ => self.entries_index.front(),
LogQueue::Rewrite => self.entries_index.front(),
};
let ents_min = entry.map(|e| e.file_num);
let kvs_min = self.kvs_min_file_num(queue);
Expand Down Expand Up @@ -470,7 +471,7 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
self.entries_index.back().map(|e| e.index)
}

fn kvs_min_file_num(&self, queue: LogQueue) -> Option<u64> {
pub fn kvs_min_file_num(&self, queue: LogQueue) -> Option<u64> {
self.kvs
.values()
.filter(|v| v.1 == queue)
Expand Down Expand Up @@ -508,11 +509,12 @@ mod tests {
use raft::eraftpb::Entry;

impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
fn max_file_num(&self, queue: LogQueue) -> Option<u64> {
pub fn max_file_num(&self, queue: LogQueue) -> Option<u64> {
let entry = match queue {
LogQueue::Append if self.rewrite_count == self.entries_index.len() => None,
LogQueue::Append => self.entries_index.back(),
LogQueue::Rewrite if self.rewrite_count == 0 => None,
_ => self.entries_index.get(self.rewrite_count - 1),
LogQueue::Rewrite => self.entries_index.get(self.rewrite_count - 1),
};
let ents_max = entry.map(|e| e.file_num);

Expand All @@ -524,7 +526,8 @@ mod tests {
(None, None) => None,
}
}
fn kvs_max_file_num(&self, queue: LogQueue) -> Option<u64> {

pub fn kvs_max_file_num(&self, queue: LogQueue) -> Option<u64> {
self.kvs
.values()
.filter(|v| v.1 == queue)
Expand Down
43 changes: 20 additions & 23 deletions src/purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,34 +144,31 @@ where
let mut file_num = 0;
self.pipe_log.rewrite(log_batch, true, &mut file_num)?;
if file_num > 0 {
rewrite_to_memtable(&self.memtables, log_batch, file_num, latest_rewrite);
self.rewrite_to_memtable(log_batch, file_num, latest_rewrite);
}
Ok(())
}
}

fn rewrite_to_memtable<E, W>(
memtables: &MemTableAccessor<E, W>,
log_batch: &mut LogBatch<E, W>,
file_num: u64,
latest_rewrite: u64,
) where
E: Message + Clone,
W: EntryExt<E>,
{
for item in log_batch.items.drain(..) {
let memtable = memtables.get_or_insert(item.raft_group_id);
match item.content {
LogItemContent::Entries(entries_to_add) => {
memtable
.wl()
.rewrite(entries_to_add.entries_index, latest_rewrite);
}
LogItemContent::Kv(kv) => match kv.op_type {
OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num),

fn rewrite_to_memtable(
&self,
log_batch: &mut LogBatch<E, W>,
file_num: u64,
latest_rewrite: u64,
) {
for item in log_batch.items.drain(..) {
let memtable = self.memtables.get_or_insert(item.raft_group_id);
match item.content {
LogItemContent::Entries(entries_to_add) => {
let entries_index = entries_to_add.entries_index.into_inner();
memtable.wl().rewrite(entries_index, latest_rewrite);
}
LogItemContent::Kv(kv) => match kv.op_type {
OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num),
_ => unreachable!(),
},
_ => unreachable!(),
},
_ => unreachable!(),
}
}
}
}

0 comments on commit ab2b110

Please sign in to comment.