Skip to content

Commit

Permalink
Optimize code structure(version 1.1).
Browse files Browse the repository at this point in the history
Signed-off-by: root <[email protected]>
  • Loading branch information
ustc-wxy authored and root committed Dec 7, 2022
1 parent 40812d0 commit da569a8
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 144 deletions.
166 changes: 91 additions & 75 deletions src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.

use libc::{aio_return, aiocb};
use std::cell::{Cell, RefCell};
use std::marker::PhantomData;
use std::mem;
use std::path::Path;
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{Builder as ThreadBuilder, JoinHandle};
use std::time::{Duration, Instant};
use libc::aiocb;

use log::{error, info};
use protobuf::{parse_from_bytes, Message};
Expand Down Expand Up @@ -311,13 +311,16 @@ where
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
}
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
println!("[fetch_entries_to] time cost: {:?} us", start.elapsed().as_micros());
println!(
"[fetch_entries_to] time cost: {:?} us",
start.elapsed().as_micros()
);
return Ok(ents_idx.len());
}
Ok(0)
}

pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M> >(
pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
&self,
region_id: u64,
begin: u64,
Expand All @@ -326,80 +329,96 @@ where
vec: &mut Vec<M::Entry>,
) -> Result<usize> {
let start = Instant::now();
println!("[fetch_entries_to_aio] region id: {} left: {}, right: {}",region_id,begin,end);
let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
let length = (end - begin) as usize;
let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity(length);
memtable
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
println!("[fetch_entries_to_aio] (stage1) time cost: {:?} us", start.elapsed().as_micros());
println!(
"[fetch_entries_to_aio] (stage1) time cost: {:?} us",
start.elapsed().as_micros()
);

let mut new_block_flags:Vec<bool> = Vec::with_capacity(length);
let mut new_block_flags: Vec<bool> = Vec::with_capacity(length);
let mut block_sum = 0;
for (t,i) in ents_idx.iter().enumerate(){
for (t, i) in ents_idx.iter().enumerate() {
if match t {
0 => true,
_ => ents_idx[t-1].entries.unwrap() != ents_idx[t].entries.unwrap(),
}{
_ => ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap(),
} {
block_sum += 1;
new_block_flags.push(true);
}else{
} else {
new_block_flags.push(false);
}
}
let mut a_list: Vec<aiocb> = Vec::with_capacity(block_sum);
let mut ctx_vec: Vec<Arc<AioContext>> = Vec::with_capacity(block_sum);
unsafe {
for i in 0..block_sum{
a_list.push(mem::zeroed::<libc::aiocb>());
}
}
for (t,i) in ents_idx.iter().enumerate() {
if new_block_flags[t]{
ctx_vec.push(submit_read_request_to_file(self.pipe_log.as_ref(), &mut a_list[t],i.entries.unwrap(), )?);

let mut ctx = AioContext::new(block_sum);
for (seq, i) in ents_idx.iter().enumerate() {
if new_block_flags[seq] {
submit_read_request_to_file(
self.pipe_log.as_ref(),
seq,
&mut ctx,
i.entries.unwrap(),
)
.unwrap();
}
}
println!("[fetch_entries_to_aio] (stage2) time cost: {:?} us", start.elapsed().as_micros());
let mut j = 0;

ctx_vec[0].wait()?;
let mut decode_buf = LogBatch::decode_entries_block(
&ctx_vec[0].buf.lock().unwrap(),
ents_idx[0].entries.unwrap(),
ents_idx[0].compression_type,
).unwrap();

for (t,i) in ents_idx.iter().enumerate() {
decode_buf = match t{
0 => decode_buf,
println!(
"[fetch_entries_to_aio] (stage2) time cost: {:?} us",
start.elapsed().as_micros()
);

let mut seq = 0;
let mut decode_buf = vec![];

for (t, i) in ents_idx.iter().enumerate() {
decode_buf = match t {
0 => {
ctx.single_wait(0).unwrap();
LogBatch::decode_entries_block(
&ctx.data(0).lock().unwrap(),
ents_idx[0].entries.unwrap(),
ents_idx[0].compression_type,
)
.unwrap()
}
_ => match new_block_flags[t] {
true => {
j+=1;
ctx_vec[j].wait();
seq += 1;
ctx.single_wait(seq).unwrap();
LogBatch::decode_entries_block(
&ctx_vec[j].buf.lock().unwrap(),
&ctx.data(seq).lock().unwrap(),
i.entries.unwrap(),
i.compression_type,
).unwrap()
},
)
.unwrap()
}
false => decode_buf,
}
},
};
let e = parse_from_bytes::<M>(
&mut decode_buf[(i.entry_offset) as usize .. (i.entry_offset + i.entry_len) as usize]).unwrap();
vec.push(e);
vec.push(
parse_from_bytes::<M>(
&mut decode_buf
[(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize],
)
.unwrap(),
);
}
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
println!("[fetch_entries_to_aio] (end) time cost: {:?} us", start.elapsed().as_micros());
println!(
"[fetch_entries_to_aio] (end) time cost: {:?} us",
start.elapsed().as_micros()
);
return Ok(ents_idx.len());
}

Ok(0)
}


pub fn first_index(&self, region_id: u64) -> Option<u64> {
if let Some(memtable) = self.memtables.get(region_id) {
return memtable.read().first_index();
Expand Down Expand Up @@ -673,11 +692,17 @@ where
})
}

pub(crate) fn submit_read_request_to_file<P>(pipe_log: &P, mut a:&mut aiocb,handle: FileBlockHandle) -> Result<Arc<AioContext>>
where
P: PipeLog,
pub(crate) fn submit_read_request_to_file<P>(
pipe_log: &P,
seq: usize,
ctx: &mut AioContext,
handle: FileBlockHandle,
) -> Result<()>
where
P: PipeLog,
{
Ok(Arc::new(pipe_log.read_bytes_aio(a,handle)?))
pipe_log.read_bytes_aio(seq, ctx, handle).unwrap();
Ok(())
}

#[cfg(test)]
Expand All @@ -689,11 +714,11 @@ mod tests {
use crate::test_util::{generate_entries, PanicGuard};
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use libc::aiocb;
use raft::eraftpb::Entry;
use std::collections::BTreeSet;
use std::fs::OpenOptions;
use std::path::PathBuf;
use libc::aiocb;

type RaftLogEngine<F = DefaultFileSystem> = Engine<F>;
impl<F: FileSystem> RaftLogEngine<F> {
Expand Down Expand Up @@ -787,7 +812,7 @@ mod tests {
None,
&mut entries,
)
.unwrap();
.unwrap();
assert_eq!(entries.len(), (end - start) as usize);
assert_eq!(entries.first().unwrap().index, start);
assert_eq!(
Expand Down Expand Up @@ -884,13 +909,13 @@ mod tests {
}

#[test]
fn test_async_read(){
let normal_batch_size = 4096;
fn test_async_read() {
let normal_batch_size = 8192;
let compressed_batch_size = 5120;
for &entry_size in &[normal_batch_size] {
if entry_size == normal_batch_size{
if entry_size == normal_batch_size {
println!("[normal_batch_size]");
}else if entry_size == compressed_batch_size{
} else if entry_size == compressed_batch_size {
println!("[compressed_batch_size]");
}
let dir = tempfile::Builder::new()
Expand All @@ -903,39 +928,34 @@ mod tests {
..Default::default()
};


let engine = RaftLogEngine::open_with_file_system(
cfg.clone(),
Arc::new(DefaultFileSystem),
)
.unwrap();
let engine =
RaftLogEngine::open_with_file_system(cfg.clone(), Arc::new(DefaultFileSystem))
.unwrap();

assert_eq!(engine.path(), dir.path().to_str().unwrap());
let data = vec![b'x'; entry_size];
for i in 10..510{
for rid in 10..15{
for i in 10..1010 {
for rid in 10..15 {
let index = i;
engine.append(rid,index,index+1,Some(&data));
engine.append(rid, index, index + 1, Some(&data));
}
}
for i in 10..15 {
let rid = i;
let index = 10;
println!("[PREAD]");
engine.scan_entries(rid, index, index + 500, |_, q, d| {
engine.scan_entries(rid, index, index + 1000, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
println!("[AIO]");
engine.scan_entries_aio(rid, index, index + 500, |_, q, d| {
engine.scan_entries_aio(rid, index, index + 1000, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
println!("====================================================================================");
}

}

}

#[test]
Expand Down Expand Up @@ -2143,10 +2163,10 @@ mod tests {
type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;

fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()> {
// todo!()
Ok(())
}
// fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()>
// { // todo!()
// Ok(())
// }

fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
let handle = self.inner.create(&path)?;
Expand Down Expand Up @@ -2206,10 +2226,6 @@ mod tests {
fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
self.inner.new_writer(h)
}

fn new_async_context(&self, handle: Arc<Self::Handle>, ptr: *mut aiocb, buf: Arc<Mutex<Vec<u8>>>) -> std::io::Result<AioContext> {
self.inner.new_async_context(handle,ptr,buf)
}
}

#[test]
Expand Down
Loading

0 comments on commit da569a8

Please sign in to comment.