improve write logic to reduce cpu usage (#46)
Signed-off-by: qupeng <[email protected]>
hicqu authored Sep 18, 2020
1 parent b38b170 commit c2cbe93
Showing 5 changed files with 107 additions and 94 deletions.
10 changes: 8 additions & 2 deletions src/engine.rs
@@ -166,11 +166,17 @@ where
loop {
match LogBatch::from_bytes(&mut buf, file_num, offset) {
Ok(Some(mut log_batch)) => {
let entries_size = log_batch.entries_size();
let mut encoded_size = 0;
for item in &log_batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
encoded_size += entries.encoded_size.get();
}
}

if let Some(tracker) = pipe_log.cache_submitor().get_cache_tracker(
file_num,
offset,
entries_size,
encoded_size,
) {
for item in &log_batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
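For context on the recovery-path change above: the total entry size handed to the cache submitor is now read from each item's cached encoded_size Cell instead of being recomputed from the per-entry index (the removed entries_size() helper in src/log_batch.rs below). A minimal, self-contained sketch of that accumulation; the enum and field names are simplified stand-ins, not the crate's actual LogItemContent/Entries types.

use std::cell::Cell;

enum Content {
    Entries { encoded_size: Cell<usize> },
    Command,
}

// Sum the cached encoded sizes of all entry items, skipping non-entry items.
fn total_encoded_size(items: &[Content]) -> usize {
    items
        .iter()
        .map(|item| match item {
            Content::Entries { encoded_size } => encoded_size.get(),
            _ => 0,
        })
        .sum()
}

fn main() {
    let items = vec![
        Content::Entries { encoded_size: Cell::new(4096) },
        Content::Command,
        Content::Entries { encoded_size: Cell::new(128) },
    ];
    assert_eq!(total_encoded_size(&items), 4224);
}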
1 change: 1 addition & 0 deletions src/lib.rs
@@ -1,4 +1,5 @@
#![feature(shrink_to)]
#![feature(cell_update)]

#[macro_export]
macro_rules! box_err {
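The new cell_update feature gate enables Cell::update, which Entries::encode_to in src/log_batch.rs below uses to bump its cached size. A small sketch of the equivalent read-modify-write on stable Rust, assuming only the standard library:

use std::cell::Cell;

fn main() {
    let encoded_size = Cell::new(0usize);
    for chunk_len in [100usize, 24, 4096] {
        // equivalent to `encoded_size.update(|x| x + chunk_len)` on nightly
        encoded_size.set(encoded_size.get() + chunk_len);
    }
    assert_eq!(encoded_size.get(), 4220);
}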
160 changes: 86 additions & 74 deletions src/log_batch.rs
@@ -1,5 +1,5 @@
use std::borrow::{Borrow, Cow};
use std::cell::RefCell;
use std::cell::{Cell, RefCell};
use std::io::BufRead;
use std::marker::PhantomData;
use std::sync::atomic::AtomicUsize;
@@ -39,54 +39,50 @@ fn crc32(data: &[u8]) -> u32 {
mod lz4 {
use std::{i32, ptr};

// TODO: use in place compression instead.
#[inline]
pub fn encode_block(src: &[u8]) -> Vec<u8> {
pub fn encode_block(src: &[u8], head_reserve: usize, tail_alloc: usize) -> Vec<u8> {
unsafe {
let bound = lz4_sys::LZ4_compressBound(src.len() as i32);
if bound > 0 && src.len() <= i32::MAX as usize {
let mut output = Vec::<u8>::with_capacity(bound as usize + 4);
let le_len = src.len().to_le_bytes();
ptr::copy_nonoverlapping(le_len.as_ptr(), output.as_mut_ptr(), 4);
let size = lz4_sys::LZ4_compress_default(
src.as_ptr() as _,
output.as_mut_ptr().add(4) as _,
src.len() as i32,
bound,
);
if size > 0 {
output.set_len(size as usize + 4);
return output;
}
panic!("compression fail: {}", size);
}
panic!("input size is too large: {}", src.len());
assert!(bound > 0 && src.len() <= i32::MAX as usize);

// Layout: { header | decoded_len | content | checksum }.
let capacity = head_reserve + 4 + bound as usize + tail_alloc;
let mut output: Vec<u8> = Vec::with_capacity(capacity);

let le_len = src.len().to_le_bytes();
ptr::copy_nonoverlapping(le_len.as_ptr(), output.as_mut_ptr().add(head_reserve), 4);

let size = lz4_sys::LZ4_compress_default(
src.as_ptr() as _,
output.as_mut_ptr().add(head_reserve + 4) as _,
src.len() as i32,
bound,
);
assert!(size > 0);
output.set_len(head_reserve + 4 + size as usize);
output
}
}

#[inline]
pub fn decode_block(src: &[u8]) -> Vec<u8> {
assert!(src.len() > 4, "data is too short: {} <= 4", src.len());
unsafe {
if src.len() > 4 {
let len = u32::from_le(ptr::read_unaligned(src.as_ptr() as *const u32));
let mut dst = Vec::with_capacity(len as usize);
let l = lz4_sys::LZ4_decompress_safe(
src.as_ptr().add(4) as _,
dst.as_mut_ptr() as _,
src.len() as i32 - 4,
dst.capacity() as i32,
);
if l == len as i32 {
dst.set_len(l as usize);
return dst;
}
if l < 0 {
panic!("decompress failed: {}", l);
} else {
panic!("length of decompress result not match {} != {}", len, l);
}
let len = u32::from_le(ptr::read_unaligned(src.as_ptr() as *const u32));
let mut dst = Vec::with_capacity(len as usize);
let l = lz4_sys::LZ4_decompress_safe(
src.as_ptr().add(4) as _,
dst.as_mut_ptr() as _,
src.len() as i32 - 4,
dst.capacity() as i32,
);
if l == len as i32 {
dst.set_len(l as usize);
return dst;
}
if l < 0 {
panic!("decompress failed: {}", l);
} else {
panic!("length of decompress result not match {} != {}", len, l);
}
panic!("data is too short: {} <= 4", src.len());
}
}
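A minimal sketch of the single-allocation layout the new encode_block writes into, { head_reserve | decoded_len | content | tail_alloc }, which replaces the old truncate-and-copy in encode_to_bytes. The LZ4 call is stubbed out with a plain copy so the sketch needs no lz4_sys; encode_block_layout is an illustrative name, not the crate's API. In the actual change, head_reserve is the batch header (HEADER_LEN) and the tail_alloc of 4 leaves room for the checksum, per the layout comment.

// Build the buffer with the header region reserved up front and spare tail
// capacity, so the caller never has to re-copy the compressed payload.
fn encode_block_layout(src: &[u8], head_reserve: usize, tail_alloc: usize) -> Vec<u8> {
    let mut output = Vec::with_capacity(head_reserve + 4 + src.len() + tail_alloc);
    output.resize(head_reserve, 0); // bytes reserved for the batch header
    output.extend_from_slice(&(src.len() as u32).to_le_bytes()); // decoded length, little endian
    output.extend_from_slice(src); // stand-in for the LZ4-compressed payload
    output // `tail_alloc` bytes remain as spare capacity for the checksum
}

fn main() {
    let buf = encode_block_layout(b"hello", 8, 4);
    assert_eq!(&buf[8..12], 5u32.to_le_bytes().as_slice());
    assert_eq!(&buf[12..], b"hello".as_slice());
}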

@@ -96,7 +92,7 @@ mod lz4 {
fn test_basic() {
let data: Vec<&'static [u8]> = vec![b"", b"123", b"12345678910"];
for d in data {
let compressed = super::encode_block(d);
let compressed = super::encode_block(d, 0, 0);
assert!(compressed.len() > 4);
let res = super::decode_block(&compressed);
assert_eq!(res, d);
@@ -136,6 +132,8 @@ where
pub entries: Vec<E>,
// EntryIndex may be updated after write to file.
pub entries_index: RefCell<Vec<EntryIndex>>,

pub encoded_size: Cell<usize>,
}

impl<E: Message + PartialEq> PartialEq for Entries<E> {
@@ -147,12 +145,17 @@ impl<E: Message + PartialEq> PartialEq for Entries<E> {
impl<E: Message> Entries<E> {
pub fn new(entries: Vec<E>, entries_index: Option<Vec<EntryIndex>>) -> Entries<E> {
let len = entries.len();
let (encoded_size, entries_index) = match entries_index {
Some(index) => (
index.iter().fold(0, |acc, x| acc + x.len as usize),
RefCell::new(index),
),
None => (0, RefCell::new(vec![EntryIndex::default(); len])),
};
Entries {
entries,
entries_index: match entries_index {
Some(index) => RefCell::new(index),
None => RefCell::new(vec![EntryIndex::default(); len]),
},
entries_index,
encoded_size: Cell::new(encoded_size),
}
}

@@ -207,35 +210,45 @@ impl<E: Message> Entries<E> {
// This offset doesn't count the header.
entries_index[i].offset = vec.len() as u64;
entries_index[i].len = content.len() as u64;
self.encoded_size.update(|x| x + content.len());
}

vec.extend_from_slice(&content);
}
Ok(())
}

pub fn update_position(&self, queue: LogQueue, file_num: u64, base: u64) {
pub fn update_position(
&self,
queue: LogQueue,
file_num: u64,
base: u64,
chunk_size: &Option<Arc<AtomicUsize>>,
) {
for idx in self.entries_index.borrow_mut().iter_mut() {
debug_assert!(idx.file_num == 0 && idx.base_offset == 0);
idx.queue = queue;
idx.file_num = file_num;
idx.base_offset = base;
if let Some(ref chunk_size) = chunk_size {
idx.cache_tracker = Some(CacheTracker {
chunk_size: chunk_size.clone(),
sub_on_drop: idx.len as usize,
});
}
}
}

pub fn attach_cache_tracker(&self, size: Arc<AtomicUsize>) {
pub fn attach_cache_tracker(&self, chunk_size: Arc<AtomicUsize>) {
for idx in self.entries_index.borrow_mut().iter_mut() {
let sub_on_drop = idx.len as usize;
let chunk_size = size.clone();
let tracker = CacheTracker {
chunk_size,
sub_on_drop,
};
idx.cache_tracker = Some(tracker);
idx.cache_tracker = Some(CacheTracker {
chunk_size: chunk_size.clone(),
sub_on_drop: idx.len as usize,
});
}
}

pub fn update_compression_type(&self, compression_type: CompressionType, batch_len: u64) {
fn update_compression_type(&self, compression_type: CompressionType, batch_len: u64) {
for idx in self.entries_index.borrow_mut().iter_mut() {
idx.compression_type = compression_type;
idx.batch_len = batch_len;
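With this change, update_position attaches the cache tracker in the same loop that fills in file_num and base_offset (and attach_cache_tracker does the same), instead of running a second pass over entries_index. Below is an illustrative stand-in for the tracker itself: the field names mirror the diff, but the Drop behavior shown is an assumption about how the real CacheTracker releases its bytes, not the crate's implementation.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Pins a shared per-chunk byte counter and gives the bytes back when the
// tracked entry leaves the cache.
struct CacheTracker {
    chunk_size: Arc<AtomicUsize>,
    sub_on_drop: usize,
}

impl Drop for CacheTracker {
    fn drop(&mut self) {
        self.chunk_size.fetch_sub(self.sub_on_drop, Ordering::Release);
    }
}

fn main() {
    let chunk_size = Arc::new(AtomicUsize::new(0));
    {
        let _t = CacheTracker { chunk_size: chunk_size.clone(), sub_on_drop: 128 };
        chunk_size.fetch_add(128, Ordering::Release);
        assert_eq!(chunk_size.load(Ordering::Acquire), 128);
    } // tracker dropped here, bytes released
    assert_eq!(chunk_size.load(Ordering::Acquire), 0);
}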
@@ -570,7 +583,7 @@ where
}

// TODO: avoid to write a large batch into one compressed chunk.
pub fn encode_to_bytes(&self) -> Option<Vec<u8>> {
pub fn encode_to_bytes(&self, encoded_size: &mut usize) -> Option<Vec<u8>> {
if self.items.is_empty() {
return None;
}
@@ -584,9 +597,7 @@
}

let compression_type = if vec.len() > COMPRESSION_SIZE {
let dst = lz4::encode_block(&vec[8..]);
vec.truncate(8);
vec.extend_from_slice(&dst);
vec = lz4::encode_block(&vec[HEADER_LEN..], HEADER_LEN, 4);
CompressionType::Lz4
} else {
CompressionType::None
@@ -602,24 +613,13 @@
let batch_len = (vec.len() - 8) as u64;
for item in &self.items {
if let LogItemContent::Entries(ref entries) = item.content {
*encoded_size += entries.encoded_size.get();
entries.update_compression_type(compression_type, batch_len as u64);
}
}

Some(vec)
}

pub fn entries_size(&self) -> usize {
let mut size = 0;
for item in &self.items {
if let LogItemContent::Entries(ref entries) = item.content {
for entry in entries.entries_index.borrow().iter() {
size += entry.len as usize;
}
}
}
size
}
}
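The removed entries_size() helper re-walked every entries_index after encoding; the new encode_to_bytes(&mut encoded_size) reports the size in the same pass. A trivial sketch of that out-parameter pattern, with Item and encode_items as illustrative placeholder names:

struct Item {
    encoded_len: usize,
}

// Encode the items while accumulating their total size through the out-parameter.
fn encode_items(items: &[Item], encoded_size: &mut usize) -> Vec<u8> {
    let mut out = Vec::new();
    for item in items {
        *encoded_size += item.encoded_len; // accounted during encoding, no second scan
        out.extend(std::iter::repeat(0u8).take(item.encoded_len)); // placeholder payload
    }
    out
}

fn main() {
    let items = vec![Item { encoded_len: 100 }, Item { encoded_len: 24 }];
    let mut encoded_size = 0;
    let buf = encode_items(&items, &mut encoded_size);
    assert_eq!(encoded_size, 124);
    assert_eq!(buf.len(), 124);
}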

pub fn test_batch_checksum(buf: &[u8]) -> Result<()> {
@@ -716,15 +716,27 @@ mod tests {
fn test_log_batch_enc_dec() {
let region_id = 8;
let mut batch = LogBatch::<Entry, Entry>::new();
batch.add_entries(region_id, vec![Entry::new(); 10]);
let mut entry = Entry::new();
entry.set_data(vec![b'x'; 1024]);
batch.add_entries(region_id, vec![entry; 10]);
batch.add_command(region_id, Command::Clean);
batch.put(region_id, b"key".to_vec(), b"value".to_vec());
batch.delete(region_id, b"key2".to_vec());

let encoded = batch.encode_to_bytes().unwrap();
let mut encoded_size = 0;
let encoded = batch.encode_to_bytes(&mut encoded_size).unwrap();
assert_eq!(encoded_size, 10270);

let mut s = encoded.as_slice();
let decoded_batch = LogBatch::from_bytes(&mut s, 0, 0).unwrap().unwrap();
assert_eq!(s.len(), 0);
assert_eq!(batch, decoded_batch);

match &decoded_batch.items[0].content {
LogItemContent::Entries(entries) => {
assert_eq!(entries.encoded_size.get(), encoded_size)
}
_ => unreachable!(),
}
}
}
11 changes: 4 additions & 7 deletions src/memtable.rs
@@ -189,12 +189,9 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {

if let Some((queue, index)) = self.entries_index.back().map(|e| (e.queue, e.index)) {
if first_index_to_add > index + 1 {
assert_eq!(
queue,
LogQueue::Rewrite,
"memtable {} has a hole",
self.region_id
);
if queue != LogQueue::Rewrite {
panic!("memtable {} has a hole", self.region_id);
}
self.compact_to(index + 1);
}
}
Expand Down Expand Up @@ -444,7 +441,7 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
.rev()
.find(|e| e.file_num <= latest_rewrite);
if let (Some(begin), Some(end)) = (begin, end) {
if begin.index < end.index {
if begin.index <= end.index {
return self.fetch_entries_to(begin.index, end.index + 1, None, vec, vec_idx);
}
}
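The begin.index <= end.index change fixes a boundary case in fetch_rewrite_entries: when the first and last candidates share the same index there is exactly one entry to fetch, which the previous strict comparison silently skipped. A minimal sketch of the half-open range handed to fetch_entries_to, using illustrative names:

// Build the [begin, end + 1) range of entry indexes to fetch, if any.
fn candidate_range(begin: u64, end: u64) -> Option<std::ops::Range<u64>> {
    if begin <= end {
        Some(begin..end + 1)
    } else {
        None
    }
}

fn main() {
    assert_eq!(candidate_range(7, 7), Some(7..8)); // single entry, skipped by the old `<` check
    assert_eq!(candidate_range(7, 9), Some(7..10));
    assert_eq!(candidate_range(9, 7), None);
}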
19 changes: 8 additions & 11 deletions src/pipe_log.rs
@@ -444,9 +444,8 @@ impl GenericPipeLog for PipeLog {
mut sync: bool,
file_num: &mut u64,
) -> Result<usize> {
if let Some(content) = batch.encode_to_bytes() {
let entries_size = batch.entries_size();

let mut entries_size = 0;
if let Some(content) = batch.encode_to_bytes(&mut entries_size) {
// TODO: `pwrite` is performed in the mutex. Is it possible for concurrence?
let mut cache_submitor = self.cache_submitor.lock().unwrap();
let (cur_file_num, offset, fd) = self.append(LogQueue::Append, &content, &mut sync)?;
@@ -456,12 +455,9 @@
fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?;
}

if let Some(tracker) = tracker {
for item in &batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
entries.update_position(LogQueue::Append, cur_file_num, offset);
entries.attach_cache_tracker(tracker.clone());
}
for item in &batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
entries.update_position(LogQueue::Append, cur_file_num, offset, &tracker);
}
}

@@ -477,14 +473,15 @@
mut sync: bool,
file_num: &mut u64,
) -> Result<usize> {
if let Some(content) = batch.encode_to_bytes() {
let mut encoded_size = 0;
if let Some(content) = batch.encode_to_bytes(&mut encoded_size) {
let (cur_file_num, offset, fd) = self.append(LogQueue::Rewrite, &content, &mut sync)?;
if sync {
fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?;
}
for item in &batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
entries.update_position(LogQueue::Rewrite, cur_file_num, offset);
entries.update_position(LogQueue::Rewrite, cur_file_num, offset, &None);
}
}
*file_num = cur_file_num;
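A simplified view of the two update_position call sites above: the append path passes along the chunk counter it got from the cache submitor, while the rewrite path passes &None, so no tracker is attached there. The types below are trimmed-down stand-ins for the crate's EntryIndex, used only to show the optional-counter parameter.

use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

struct EntryIndex {
    file_num: u64,
    base_offset: u64,
    cache_bytes: Option<Arc<AtomicUsize>>,
}

// Fill in the file position and, if a chunk counter is supplied, attach it in
// the same pass instead of looping over the indexes a second time.
fn update_position(
    indexes: &mut [EntryIndex],
    file_num: u64,
    base: u64,
    chunk_size: &Option<Arc<AtomicUsize>>,
) {
    for idx in indexes.iter_mut() {
        idx.file_num = file_num;
        idx.base_offset = base;
        if let Some(chunk_size) = chunk_size {
            idx.cache_bytes = Some(chunk_size.clone());
        }
    }
}

fn main() {
    let mut append = vec![EntryIndex { file_num: 0, base_offset: 0, cache_bytes: None }];
    let mut rewrite = vec![EntryIndex { file_num: 0, base_offset: 0, cache_bytes: None }];
    update_position(&mut append, 3, 4096, &Some(Arc::new(AtomicUsize::new(0))));
    update_position(&mut rewrite, 1, 0, &None); // rewrite queue never attaches a tracker
    assert_eq!(append[0].file_num, 3);
    assert_eq!(append[0].base_offset, 4096);
    assert!(append[0].cache_bytes.is_some());
    assert!(rewrite[0].cache_bytes.is_none());
}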
