Skip to content

Commit

Permalink
clean APIs and hide all internal modules (#48)
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu authored Sep 19, 2020
1 parent c2cbe93 commit c615a14
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 147 deletions.
21 changes: 20 additions & 1 deletion src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use crate::log_batch::{
};
use crate::memtable::{EntryIndex, MemTable};
use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION};
use crate::purge::PurgeManager;
use crate::util::{HandyRwLock, HashMap, Worker};
use crate::PurgeManager;
use crate::{codec, CacheStats, Result};

const SLOTS_COUNT: usize = 128;
Expand Down Expand Up @@ -843,6 +843,7 @@ mod tests {
let active_len = engine.pipe_log.file_len(LogQueue::Rewrite, active_num);
assert!(active_num > 1 || active_len > 59); // The rewrite queue isn't empty.

// All entries should be available.
for i in 1..=10 {
for j in 1..=10 {
let e = engine.get_entry(j, i).unwrap().unwrap();
Expand All @@ -861,5 +862,23 @@ mod tests {
assert_eq!(last_index(&engine, j), 10);
}
}

// Rewrite again to check the rewrite queue is healthy.
for i in 11..=20 {
for j in 1..=10 {
entry.set_index(i);
append_log(&engine, j, &entry);
}
}

assert!(engine.purge_manager.needs_purge_log_files());
assert!(engine.purge_expired_files().unwrap().is_empty());

let new_active_num = engine.pipe_log.active_file_num(LogQueue::Rewrite);
let new_active_len = engine.pipe_log.file_len(LogQueue::Rewrite, active_num);
assert!(
new_active_num > active_num
|| (new_active_num == active_num && new_active_len > active_len)
);
}
}
20 changes: 9 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![feature(shrink_to)]
#![feature(cell_update)]

#[macro_export]
macro_rules! box_err {
($e:expr) => ({
use std::error::Error;
Expand All @@ -13,25 +12,24 @@ macro_rules! box_err {
});
}

mod cache_evict;
pub mod codec;
pub mod config;
pub mod engine;

mod cache_evict;
mod config;
mod engine;
mod errors;
pub mod log_batch;
pub mod memtable;
pub mod metrics;
pub mod pipe_log;
mod log_batch;
mod memtable;
mod pipe_log;
mod purge;
pub mod util;
mod util;

use crate::pipe_log::PipeLog;
use crate::purge::PurgeManager;

pub use self::config::Config;
pub type RaftLogEngine<X, Y> = self::engine::Engine<X, Y, PipeLog>;
pub use self::errors::{Error, Result};
pub use self::log_batch::{EntryExt, LogBatch};
pub type RaftLogEngine<X, Y> = self::engine::Engine<X, Y, PipeLog>;

#[derive(Clone, Copy, Default)]
pub struct CacheStats {
Expand Down
119 changes: 53 additions & 66 deletions src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ impl Default for EntryIndex {
* | |
* first entry last entry
*/

pub struct MemTable<E: Message + Clone, W: EntryExt<E>> {
region_id: u64,

Expand Down Expand Up @@ -344,7 +343,7 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
}
}

pub(crate) fn fetch_entries_to(
pub fn fetch_entries_to(
&self,
begin: u64,
end: u64,
Expand Down Expand Up @@ -408,23 +407,6 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
Ok(())
}

/// Appends every entry and its index record held by this memtable to
/// `vec` and `vec_idx`. No-op when the table holds no entries.
pub fn fetch_all(&self, vec: &mut Vec<E>, vec_idx: &mut Vec<EntryIndex>) {
    if let (Some(first), Some(last)) = (self.entries_index.front(), self.entries_index.back()) {
        // The full range is [first.index, last.index]; `fetch_entries_to`
        // takes an exclusive upper bound, hence the `+ 1`.
        self.fetch_entries_to(first.index, last.index + 1, None, vec, vec_idx)
            .unwrap();
    }
}

/// Copies every key-value pair in this memtable into `vec`.
/// Order follows the underlying map's iteration order.
pub fn fetch_all_kvs(&self, vec: &mut Vec<(Vec<u8>, Vec<u8>)>) {
    vec.extend(self.kvs.iter().map(|(key, value)| (key.clone(), value.0.clone())));
}

pub fn fetch_rewrite_entries(
&self,
latest_rewrite: u64,
Expand Down Expand Up @@ -472,27 +454,6 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
}
}

/// Returns the greatest file number referenced in `queue` by either
/// entries or key-value pairs, or `None` when nothing in this memtable
/// belongs to that queue.
pub fn max_file_num(&self, queue: LogQueue) -> Option<u64> {
    let ents_max = match queue {
        LogQueue::Append => self.entries_index.back(),
        // Rewritten entries occupy the front of `entries_index`;
        // `rewrite_count == 0` means the rewrite queue holds none.
        LogQueue::Rewrite if self.rewrite_count == 0 => None,
        _ => self.entries_index.get(self.rewrite_count - 1),
    }
    .map(|e| e.file_num);

    match (ents_max, self.kvs_max_file_num(queue)) {
        (Some(a), Some(b)) => Some(cmp::max(a, b)),
        // At most one side is `Some` here, so `or` picks it.
        (a, b) => a.or(b),
    }
}

/// Returns the total number of key-value pairs stored in this memtable.
pub fn kvs_total_count(&self) -> usize {
    self.kvs.len()
}

/// Returns the number of entry index records currently tracked.
pub fn entries_count(&self) -> usize {
    self.entries_index.len()
}
Expand All @@ -516,13 +477,6 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
.fold(None, |min, v| Some(cmp::min(min.unwrap_or(u64::MAX), v.2)))
}

fn kvs_max_file_num(&self, queue: LogQueue) -> Option<u64> {
self.kvs
.values()
.filter(|v| v.1 == queue)
.fold(None, |max, v| Some(cmp::max(max.unwrap_or(0), v.2)))
}

fn count_limit(&self, start_idx: usize, end_idx: usize, max_size: usize) -> usize {
assert!(start_idx < end_idx);
let (first, second) = slices_in_range(&self.entries_index, start_idx, end_idx);
Expand All @@ -538,24 +492,6 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
}
count
}

#[cfg(test)]
// Sums the recorded length of every tracked entry (test-only helper).
fn entries_size(&self) -> usize {
    self.entries_index.iter().map(|e| e.len as usize).sum()
}

#[cfg(test)]
// Panics when the last cached entry disagrees with the last index
// record, or when the cache is non-empty while the index is empty.
fn check_entries_index_and_cache(&self) {
    match (self.entries_index.back(), self.entries_cache.back()) {
        (None, Some(_)) => panic!("entries_index is empty, but entries_cache isn't"),
        (Some(ei), Some(ec)) if ei.index != W::index(ec) => panic!(
            "entries_index.last = {}, entries_cache.last = {}",
            ei.index,
            W::index(ec)
        ),
        _ => {}
    }
}
}

impl<E: Message + Clone, W: EntryExt<E>> Drop for MemTable<E, W> {
Expand All @@ -569,9 +505,60 @@ impl<E: Message + Clone, W: EntryExt<E>> Drop for MemTable<E, W> {
#[cfg(test)]
mod tests {
use super::*;

use raft::eraftpb::Entry;

// Test-only helpers for inspecting a `MemTable`'s internal state.
impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
    /// Greatest file number referenced in `queue` by entries or
    /// key-value pairs; `None` when the queue holds nothing.
    fn max_file_num(&self, queue: LogQueue) -> Option<u64> {
        let ents_max = match queue {
            LogQueue::Append => self.entries_index.back(),
            // Rewritten entries sit at the front of `entries_index`;
            // `rewrite_count == 0` means the rewrite queue holds none.
            LogQueue::Rewrite if self.rewrite_count == 0 => None,
            _ => self.entries_index.get(self.rewrite_count - 1),
        }
        .map(|e| e.file_num);

        match (ents_max, self.kvs_max_file_num(queue)) {
            (Some(a), Some(b)) => Some(cmp::max(a, b)),
            // At most one side is `Some` here, so `or` picks it.
            (a, b) => a.or(b),
        }
    }

    /// Greatest file number among key-value pairs stored in `queue`.
    fn kvs_max_file_num(&self, queue: LogQueue) -> Option<u64> {
        // File numbers are unsigned, so `max()` matches the old `fold`
        // seeded with 0.
        self.kvs
            .values()
            .filter_map(|v| if v.1 == queue { Some(v.2) } else { None })
            .max()
    }

    /// Appends every entry and its index record to `vec`/`vec_idx`.
    /// No-op when the table holds no entries.
    pub fn fetch_all(&self, vec: &mut Vec<E>, vec_idx: &mut Vec<EntryIndex>) {
        if let (Some(first), Some(last)) = (self.entries_index.front(), self.entries_index.back()) {
            // `fetch_entries_to` takes an exclusive upper bound.
            self.fetch_entries_to(first.index, last.index + 1, None, vec, vec_idx)
                .unwrap();
        }
    }

    /// Sums the recorded length of every tracked entry.
    fn entries_size(&self) -> usize {
        self.entries_index.iter().map(|e| e.len as usize).sum()
    }

    /// Panics when the last cached entry disagrees with the last index
    /// record, or when the cache is non-empty while the index is empty.
    fn check_entries_index_and_cache(&self) {
        match (self.entries_index.back(), self.entries_cache.back()) {
            (None, Some(_)) => panic!("entries_index is empty, but entries_cache isn't"),
            (Some(ei), Some(ec)) if ei.index != W::index(ec) => panic!(
                "entries_index.last = {}, entries_cache.last = {}",
                ei.index,
                W::index(ec)
            ),
            _ => {}
        }
    }
}

#[test]
fn test_memtable_append() {
let region_id = 8;
Expand Down
54 changes: 0 additions & 54 deletions src/metrics.rs

This file was deleted.

21 changes: 6 additions & 15 deletions src/util.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::collections::{HashMap as StdHashMap, HashSet as StdHashSet, VecDeque};
pub use std::collections::hash_map::Entry as HashMapEntry;
use std::collections::{HashMap as StdHashMap, VecDeque};
use std::fmt::{self, Write};
use std::hash::BuildHasherDefault;
use std::ops::{Div, Mul};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::thread::{Builder as ThreadBuilder, JoinHandle};
use std::time::Duration;

use crossbeam::channel::{bounded, unbounded, Receiver, RecvTimeoutError, Sender};
use serde::de::{self, Unexpected, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

pub use crossbeam::channel::SendError as ScheduleError;
pub type HashMap<K, V> = StdHashMap<K, V, BuildHasherDefault<fxhash::FxHasher>>;
pub type HashSet<T> = StdHashSet<T, BuildHasherDefault<fxhash::FxHasher>>;
pub use std::collections::hash_map::Entry as HashMapEntry;

const UNIT: u64 = 1;
const DATA_MAGNITUDE: u64 = 1024;
Expand Down Expand Up @@ -199,13 +202,6 @@ pub fn slices_in_range<T>(entry: &VecDeque<T>, low: usize, high: usize) -> (&[T]
}
}

/// Converts a `Duration` to fractional seconds.
pub fn duration_to_sec(d: Duration) -> f64 {
    // `Duration::as_secs_f64` performs the same
    // `secs + subsec_nanos / 1e9` computation the previous hand-rolled
    // version did, with no overflow path to worry about.
    d.as_secs_f64()
}

pub trait HandyRwLock<T> {
fn wl(&self) -> RwLockWriteGuard<'_, T>;
fn rl(&self) -> RwLockReadGuard<'_, T>;
Expand All @@ -220,11 +216,6 @@ impl<T> HandyRwLock<T> for RwLock<T> {
}
}

pub use crossbeam::channel::SendError as ScheduleError;
use crossbeam::channel::{bounded, unbounded, Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread::{Builder as ThreadBuilder, JoinHandle};

pub trait Runnable<T> {
fn run(&mut self, task: T) -> bool;
fn on_tick(&mut self);
Expand Down

0 comments on commit c615a14

Please sign in to comment.