Skip to content

Commit

Permalink
change some internal functions to methods (#53)
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu authored Sep 27, 2020
1 parent 7f89d27 commit 68b6a07
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 185 deletions.
4 changes: 2 additions & 2 deletions src/cache_evict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use std::sync::Arc;
use crossbeam::channel::{bounded, Sender};
use protobuf::Message;

use crate::engine::{MemTableAccessor};
use crate::GlobalStats;
use crate::engine::MemTableAccessor;
use crate::log_batch::{EntryExt, LogBatch, LogItemContent};
use crate::pipe_log::{GenericPipeLog, LogQueue};
use crate::util::{HandyRwLock, Runnable, Scheduler};
use crate::GlobalStats;

pub const DEFAULT_CACHE_CHUNK_SIZE: usize = 4 * 1024 * 1024;

Expand Down
276 changes: 125 additions & 151 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::memtable::{EntryIndex, MemTable};
use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION};
use crate::purge::PurgeManager;
use crate::util::{HandyRwLock, HashMap, Worker};
use crate::{codec, GlobalStats, CacheStats, Result};
use crate::{codec, CacheStats, GlobalStats, Result};

const SLOTS_COUNT: usize = 128;

Expand Down Expand Up @@ -49,7 +49,9 @@ where
E: Message + Clone,
W: EntryExt<E>,
{
fn new(creator: Arc<dyn Fn(u64) -> MemTable<E, W> + Send + Sync>) -> MemTableAccessor<E, W> {
pub fn new(
creator: Arc<dyn Fn(u64) -> MemTable<E, W> + Send + Sync>,
) -> MemTableAccessor<E, W> {
let mut slots = Vec::with_capacity(SLOTS_COUNT);
for _ in 0..SLOTS_COUNT {
slots.push(Arc::new(RwLock::new(MemTables::default())));
Expand All @@ -74,11 +76,11 @@ where
memtables.get(&raft_group_id).cloned()
}

pub fn remove(&self, raft_group_id: u64) {
pub fn remove(&self, raft_group_id: u64) -> Option<Arc<RwLock<MemTable<E, W>>>> {
let mut memtables = self.slots[raft_group_id as usize % SLOTS_COUNT]
.write()
.unwrap();
memtables.remove(&raft_group_id);
memtables.remove(&raft_group_id)
}

pub fn fold<B, F: Fn(B, &MemTable<E, W>) -> B>(&self, mut init: B, fold: F) -> B {
Expand Down Expand Up @@ -129,103 +131,45 @@ where
W: EntryExt<E> + 'static,
P: GenericPipeLog,
{
// Recover from disk.
fn recover(
queue: LogQueue,
pipe_log: &P,
memtables: &MemTableAccessor<E, W>,
recovery_mode: RecoveryMode,
) -> Result<()> {
// Get first file number and last file number.
let first_file_num = pipe_log.first_file_num(queue);
let active_file_num = pipe_log.active_file_num(queue);

// Iterate and recover from files one by one.
let start = Instant::now();
for file_num in first_file_num..=active_file_num {
// Read a file.
let content = pipe_log.read_whole_file(queue, file_num)?;

// Verify file header.
let mut buf = content.as_slice();
if !buf.starts_with(FILE_MAGIC_HEADER) {
if file_num != active_file_num {
warn!("Raft log header is corrupted at {:?}.{}", queue, file_num);
return Err(box_err!("Raft log file header is corrupted"));
} else {
pipe_log.truncate_active_log(queue, Some(0)).unwrap();
break;
}
}

// Iterate all LogBatch in one file.
let start_ptr = buf.as_ptr();
buf.consume(FILE_MAGIC_HEADER.len() + VERSION.len());
let mut offset = (FILE_MAGIC_HEADER.len() + VERSION.len()) as u64;
loop {
match LogBatch::from_bytes(&mut buf, file_num, offset) {
Ok(Some(mut log_batch)) => {
let mut encoded_size = 0;
for item in &log_batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
encoded_size += entries.encoded_size.get();
}
}

if let Some(tracker) = pipe_log.cache_submitor().get_cache_tracker(
file_num,
offset,
encoded_size,
) {
for item in &log_batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
entries.attach_cache_tracker(tracker.clone());
}
}
}
apply_to_memtable(memtables, &mut log_batch, queue, file_num);
offset = (buf.as_ptr() as usize - start_ptr as usize) as u64;
}
Ok(None) => {
info!("Recovered raft log {:?}.{}.", queue, file_num);
break;
fn apply_to_memtable(&self, log_batch: &mut LogBatch<E, W>, queue: LogQueue, file_num: u64) {
for item in log_batch.items.drain(..) {
let memtable = self.memtables.get_or_insert(item.raft_group_id);
match item.content {
LogItemContent::Entries(entries_to_add) => {
let entries = entries_to_add.entries;
let entries_index = entries_to_add.entries_index.into_inner();
if queue == LogQueue::Rewrite {
memtable.wl().append_rewrite(entries, entries_index);
} else {
memtable.wl().append(entries, entries_index);
}
Err(e) => {
warn!(
"Raft log content is corrupted at {:?}.{}:{}, error: {}",
queue, file_num, offset, e
);
// There may be a pre-allocated space at the tail of the active log.
if file_num == active_file_num
&& recovery_mode == RecoveryMode::TolerateCorruptedTailRecords
{
pipe_log.truncate_active_log(queue, Some(offset as usize))?;
break;
}
LogItemContent::Command(Command::Clean) => {
if self.memtables.remove(item.raft_group_id).is_some() {}
}
LogItemContent::Command(Command::Compact { index }) => {
memtable.wl().compact_to(index);
}
LogItemContent::Kv(kv) => match kv.op_type {
OpType::Put => {
let (key, value) = (kv.key, kv.value.unwrap());
match queue {
LogQueue::Append => memtable.wl().put(key, value, file_num),
LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num),
}
return Err(box_err!("Raft log content is corrupted"));
}
}
OpType::Del => memtable.wl().delete(kv.key.as_slice()),
},
}
}
info!("Recover raft log takes {:?}", start.elapsed());
Ok(())
}

// Write a batch needs 3 steps:
// 1. find all involved raft groups and then lock their memtables;
// 2. append the log batch to pipe log;
// 3. update all involved memtables.
// The lock logic is a little complex. However it's necessary because
// 1. "Inactive log rewrite" needs to keep logs on pipe log order;
// 2. Users can call `append` on one raft group concurrently.
// Maybe we can improve the implement of "inactive log rewrite" and
// forbid concurrent `append` to remove locks here.
fn write_impl(&self, log_batch: &mut LogBatch<E, W>, sync: bool) -> Result<usize> {
let queue = LogQueue::Append;
let mut file_num = 0;
let bytes = self.pipe_log.write(log_batch, sync, &mut file_num)?;
if file_num > 0 {
apply_to_memtable(&self.memtables, log_batch, queue, file_num);
self.apply_to_memtable(log_batch, queue, file_num);
}
Ok(bytes)
}
Expand All @@ -251,6 +195,10 @@ where
E: Message + Clone,
W: EntryExt<E> + 'static,
{
/// Opens an `Engine` backed by the default file-based `PipeLog`, using
/// the default cache chunk size (`DEFAULT_CACHE_CHUNK_SIZE`).
///
/// Panics if opening the pipe log or recovering from disk fails; use
/// `new_impl` directly to handle the error instead.
pub fn new(cfg: Config) -> Engine<E, W, PipeLog> {
let engine = Self::new_impl(cfg, DEFAULT_CACHE_CHUNK_SIZE);
engine.unwrap()
}

fn new_impl(cfg: Config, chunk_limit: usize) -> Result<Engine<E, W, PipeLog>> {
let cache_limit = cfg.cache_limit.0 as usize;
let global_stats = Arc::new(GlobalStats::default());
Expand All @@ -267,7 +215,6 @@ where
),
)
.expect("Open raft log");
pipe_log.cache_submitor().block_on_full();

let memtables = {
let stats = global_stats.clone();
Expand All @@ -285,20 +232,10 @@ where
);
cache_evict_worker.start(cache_evict_runner, Some(Duration::from_secs(1)));

let recovery_mode = cfg.recovery_mode;
Engine::recover(
LogQueue::Rewrite,
&pipe_log,
&memtables,
RecoveryMode::TolerateCorruptedTailRecords,
)?;
Engine::recover(LogQueue::Append, &pipe_log, &memtables, recovery_mode)?;
pipe_log.cache_submitor().nonblock_on_full();

let cfg = Arc::new(cfg);
let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone());

Ok(Engine {
let engine = Engine {
cfg,
memtables,
pipe_log,
Expand All @@ -307,11 +244,95 @@ where
workers: Arc::new(RwLock::new(Workers {
cache_evict: cache_evict_worker,
})),
})
};

engine.pipe_log.cache_submitor().block_on_full();
engine.recover(
LogQueue::Rewrite,
RecoveryMode::TolerateCorruptedTailRecords,
)?;
engine.recover(LogQueue::Append, engine.cfg.recovery_mode)?;
engine.pipe_log.cache_submitor().nonblock_on_full();

Ok(engine)
}

pub fn new(cfg: Config) -> Engine<E, W, PipeLog> {
Self::new_impl(cfg, DEFAULT_CACHE_CHUNK_SIZE).unwrap()
/// Replays one log queue from disk into the in-memory tables.
///
/// Walks every file of `queue` from the first to the active one, decodes
/// consecutive `LogBatch`es and applies them to the memtables.
/// `recovery_mode` decides whether a corrupted tail of the *active* file
/// is truncated away (`TolerateCorruptedTailRecords`) or reported as an
/// error. Corruption in any non-active file is always an error.
fn recover(&self, queue: LogQueue, recovery_mode: RecoveryMode) -> Result<()> {
// Get first file number and last file number.
let first_file_num = self.pipe_log.first_file_num(queue);
let active_file_num = self.pipe_log.active_file_num(queue);

// Iterate and recover from files one by one.
let start = Instant::now();
for file_num in first_file_num..=active_file_num {
// Read a file.
let content = self.pipe_log.read_whole_file(queue, file_num)?;

// Verify file header.
let mut buf = content.as_slice();
if !buf.starts_with(FILE_MAGIC_HEADER) {
if file_num != active_file_num {
// A bad header on a non-active file cannot be a torn write at
// shutdown, so treat it as unrecoverable corruption.
warn!("Raft log header is corrupted at {:?}.{}", queue, file_num);
return Err(box_err!("Raft log file header is corrupted"));
} else {
// The active file may be freshly pre-allocated and never
// written; reset it to empty and stop recovery here.
self.pipe_log.truncate_active_log(queue, Some(0)).unwrap();
break;
}
}

// Iterate all LogBatch in one file.
let start_ptr = buf.as_ptr();
buf.consume(FILE_MAGIC_HEADER.len() + VERSION.len());
let mut offset = (FILE_MAGIC_HEADER.len() + VERSION.len()) as u64;
loop {
match LogBatch::from_bytes(&mut buf, file_num, offset) {
Ok(Some(mut log_batch)) => {
// Sum the encoded size of every entry group in this batch so
// one cache tracker can account for all of them together.
let mut encoded_size = 0;
for item in &log_batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
encoded_size += entries.encoded_size.get();
}
}

if let Some(tracker) = self.pipe_log.cache_submitor().get_cache_tracker(
file_num,
offset,
encoded_size,
) {
for item in &log_batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
entries.attach_cache_tracker(tracker.clone());
}
}
}
self.apply_to_memtable(&mut log_batch, queue, file_num);
// `from_bytes` advanced `buf`; recompute the absolute offset
// in the file from the pointer distance to the content start.
offset = (buf.as_ptr() as usize - start_ptr as usize) as u64;
}
Ok(None) => {
// Clean end of this file; move on to the next one.
info!("Recovered raft log {:?}.{}.", queue, file_num);
break;
}
Err(e) => {
warn!(
"Raft log content is corrupted at {:?}.{}:{}, error: {}",
queue, file_num, offset, e
);
// There may be a pre-allocated space at the tail of the active log.
if file_num == active_file_num
&& recovery_mode == RecoveryMode::TolerateCorruptedTailRecords
{
self.pipe_log
.truncate_active_log(queue, Some(offset as usize))?;
break;
}
return Err(box_err!("Raft log content is corrupted"));
}
}
}
}
info!("Recover raft log takes {:?}", start.elapsed());
Ok(())
}
}

Expand Down Expand Up @@ -518,59 +539,12 @@ where
Ok(e)
}

/// Drains every item of `log_batch` and applies it to the memtable of the
/// item's raft group (creating the memtable on first touch).
///
/// Items coming from the rewrite queue use the `*_rewrite` memtable
/// operations so rewritten data is tracked separately from fresh appends.
/// NOTE: this consumes `log_batch.items` via `drain(..)`, leaving the
/// batch empty afterwards.
fn apply_to_memtable<E, W>(
memtables: &MemTableAccessor<E, W>,
log_batch: &mut LogBatch<E, W>,
queue: LogQueue,
file_num: u64,
) where
E: Message + Clone,
W: EntryExt<E>,
{
for item in log_batch.items.drain(..) {
// Inserts an empty memtable for this raft group if none exists yet.
let memtable = memtables.get_or_insert(item.raft_group_id);
match item.content {
LogItemContent::Entries(entries_to_add) => {
let entries = entries_to_add.entries;
let entries_index = entries_to_add.entries_index.into_inner();
if queue == LogQueue::Rewrite {
memtable.wl().append_rewrite(entries, entries_index);
} else {
memtable.wl().append(entries, entries_index);
}
}
LogItemContent::Command(Command::Clean) => {
// Drop the whole raft group's memtable.
memtables.remove(item.raft_group_id);
}
LogItemContent::Command(Command::Compact { index }) => {
memtable.wl().compact_to(index);
}
LogItemContent::Kv(kv) => match kv.op_type {
OpType::Put => {
// A Put record always carries a value, hence the unwrap.
let (key, value) = (kv.key, kv.value.unwrap());
match queue {
LogQueue::Append => memtable.wl().put(key, value, file_num),
LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num),
}
}
OpType::Del => memtable.wl().delete(kv.key.as_slice()),
},
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::util::ReadableSize;
use raft::eraftpb::Entry;

impl EntryExt<Entry> for Entry {
fn index(e: &Entry) -> u64 {
e.get_index()
}
}

type RaftLogEngine = Engine<Entry, Entry, PipeLog>;
impl RaftLogEngine {
fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
Expand Down
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,15 @@ impl GlobalStats {
self.cache_size.store(0, Ordering::Relaxed);
}
}

#[cfg(test)]
mod tests {
use crate::log_batch::EntryExt;
use raft::eraftpb::Entry;

// Test-only impl: lets `raft::eraftpb::Entry` serve as its own index
// extractor, so tests can use `Engine<Entry, Entry, _>` directly.
impl EntryExt<Entry> for Entry {
fn index(e: &Entry) -> u64 {
e.get_index()
}
}
}
Loading

0 comments on commit 68b6a07

Please sign in to comment.