-
Notifications
You must be signed in to change notification settings - Fork 88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
read entries using AIO #286
base: master
Are you sure you want to change the base?
Changes from 15 commits
b0fa034
147130d
93282f9
169b6fe
ad3cbcf
dfe2340
aadefae
7a6745a
1a41d8c
f3a89c0
1010535
1bc8888
5337a74
0b9988d
198ea08
a732d52
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -308,11 +308,45 @@ where | |
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?); | ||
} | ||
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); | ||
|
||
return Ok(ents_idx.len()); | ||
} | ||
Ok(0) | ||
} | ||
|
||
pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>( | ||
&self, | ||
region_id: u64, | ||
begin: u64, | ||
end: u64, | ||
max_size: Option<usize>, | ||
vec: &mut Vec<M::Entry>, | ||
) -> Result<usize> { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think, based on the tests, you can automatically select single_read or multi_read and avoid creating two different engine methods, e.g. use aio when blocks.len() > 4 or something. One issue, though, is that I'm not sure the aio syscall is portable enough. You might need to do some research on how to detect whether aio is available (maybe take a look at how RocksDB did it). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK. Let me try. |
||
let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); | ||
if let Some(memtable) = self.memtables.get(region_id) { | ||
let length = (end - begin) as usize; | ||
let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity(length); | ||
memtable | ||
.read() | ||
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?; | ||
|
||
let mut blocks: Vec<FileBlockHandle> = Vec::new(); | ||
for (t, i) in ents_idx.iter().enumerate() { | ||
if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) { | ||
blocks.push(i.entries.unwrap()); | ||
} | ||
} | ||
let bytes = self.pipe_log.async_read_bytes(blocks)?; | ||
parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec)?; | ||
|
||
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); | ||
|
||
return Ok(ents_idx.len()); | ||
} | ||
|
||
Ok(0) | ||
} | ||
|
||
pub fn first_index(&self, region_id: u64) -> Option<u64> { | ||
if let Some(memtable) = self.memtables.get(region_id) { | ||
return memtable.read().first_index(); | ||
|
@@ -544,7 +578,32 @@ impl BlockCache { | |
thread_local! { | ||
static BLOCK_CACHE: BlockCache = BlockCache::new(); | ||
} | ||
|
||
pub(crate) fn parse_entries_from_bytes<M: MessageExt>( | ||
bytes: Vec<Vec<u8>>, | ||
ents_idx: &mut [EntryIndex], | ||
vec: &mut Vec<M::Entry>, | ||
) -> Result<()> { | ||
let mut decode_buf = vec![]; | ||
let mut seq: i32 = -1; | ||
for (t, idx) in ents_idx.iter().enumerate() { | ||
decode_buf = | ||
match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() { | ||
true => { | ||
seq += 1; | ||
bytes[seq as usize].to_vec() | ||
} | ||
false => decode_buf, | ||
}; | ||
vec.push(parse_from_bytes( | ||
&LogBatch::decode_entries_block( | ||
&decode_buf, | ||
idx.entries.unwrap(), | ||
idx.compression_type, | ||
)?[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], | ||
)?); | ||
} | ||
Ok(()) | ||
} | ||
pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry> | ||
where | ||
M: MessageExt, | ||
|
@@ -683,6 +742,41 @@ mod tests { | |
reader(e.index, entry_index.entries.unwrap().id.queue, &e.data); | ||
} | ||
} | ||
fn scan_entries_aio<FR: Fn(u64, LogQueue, &[u8])>( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add a blank line between functions. |
||
&self, | ||
rid: u64, | ||
start: u64, | ||
end: u64, | ||
reader: FR, | ||
) { | ||
let mut entries = Vec::new(); | ||
self.fetch_entries_to_aio::<Entry>( | ||
rid, | ||
self.first_index(rid).unwrap(), | ||
self.last_index(rid).unwrap() + 1, | ||
None, | ||
&mut entries, | ||
) | ||
.unwrap(); | ||
assert_eq!(entries.len(), (end - start) as usize); | ||
assert_eq!(entries.first().unwrap().index, start); | ||
assert_eq!( | ||
entries.last().unwrap().index, | ||
self.decode_last_index(rid).unwrap() | ||
); | ||
assert_eq!(entries.last().unwrap().index + 1, end); | ||
for e in entries.iter() { | ||
let entry_index = self | ||
.memtables | ||
.get(rid) | ||
.unwrap() | ||
.read() | ||
.get_entry(e.index) | ||
.unwrap(); | ||
assert_eq!(&self.get_entry::<Entry>(rid, e.index).unwrap().unwrap(), e); | ||
reader(e.index, entry_index.entries.unwrap().id.queue, &e.data); | ||
} | ||
} | ||
|
||
fn file_count(&self, queue: Option<LogQueue>) -> usize { | ||
if let Some(queue) = queue { | ||
|
@@ -759,6 +853,55 @@ mod tests { | |
} | ||
} | ||
|
||
/// Writes entries for regions 10..20 and checks that the AIO read path returns
/// them unchanged, both on the live engine and after reopening it.
#[test]
fn test_async_get_entry() {
    // A small payload and a large one (the large one is the "compressed" batch
    // size of the original test).
    let payload_sizes = [10, 5120];
    for entry_size in payload_sizes.iter().copied() {
        let tmp = tempfile::Builder::new()
            .prefix("test_get_entry")
            .tempdir()
            .unwrap();
        let root = tmp.path().to_str().unwrap();
        let cfg = Config {
            dir: root.to_owned(),
            target_file_size: ReadableSize(1),
            ..Default::default()
        };

        let fs = Arc::new(ObfuscatedFileSystem::default());
        let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs).unwrap();
        assert_eq!(engine.path(), root);

        let data = vec![b'x'; entry_size];
        // Every scanned entry must live in the append queue and carry `data`.
        let expect_payload = |_: u64, queue: LogQueue, bytes: &[u8]| {
            assert_eq!(queue, LogQueue::Append);
            assert_eq!(bytes, &data);
        };

        // The region id doubles as the first log index of that region.
        for id in 10..20 {
            engine.append(id, id, id + 2, Some(&data));
        }
        for id in 10..20 {
            engine.scan_entries_aio(id, id, id + 2, expect_payload);
        }

        // Recover the engine.
        let engine = engine.reopen();
        for id in 10..20 {
            engine.scan_entries_aio(id, id, id + 2, expect_payload);
        }
    }
}
|
||
#[test] | ||
fn test_clean_raft_group() { | ||
fn run_steps(steps: &[Option<(u64, u64)>]) { | ||
|
@@ -1994,6 +2137,20 @@ mod tests { | |
type Handle = <ObfuscatedFileSystem as FileSystem>::Handle; | ||
type Reader = <ObfuscatedFileSystem as FileSystem>::Reader; | ||
type Writer = <ObfuscatedFileSystem as FileSystem>::Writer; | ||
type AsyncIoContext = <ObfuscatedFileSystem as FileSystem>::AsyncIoContext; | ||
|
||
fn async_read( | ||
&self, | ||
ctx: &mut Self::AsyncIoContext, | ||
handle: Arc<Self::Handle>, | ||
block: &FileBlockHandle, | ||
) -> std::io::Result<()> { | ||
self.inner.async_read(ctx, handle, block) | ||
} | ||
|
||
fn async_finish(&self, ctx: Self::AsyncIoContext) -> std::io::Result<Vec<Vec<u8>>> { | ||
self.inner.async_finish(ctx) | ||
} | ||
|
||
fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> { | ||
let handle = self.inner.create(&path)?; | ||
|
@@ -2056,6 +2213,10 @@ mod tests { | |
fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> { | ||
self.inner.new_writer(h) | ||
} | ||
|
||
fn new_async_io_context(&self) -> std::io::Result<Self::AsyncIoContext> { | ||
self.inner.new_async_io_context() | ||
} | ||
} | ||
|
||
#[test] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,18 +3,23 @@ | |
use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; | ||
use std::os::unix::io::RawFd; | ||
use std::path::Path; | ||
use std::pin::Pin; | ||
use std::slice; | ||
use std::sync::Arc; | ||
|
||
use fail::fail_point; | ||
use log::error; | ||
use nix::errno::Errno; | ||
use nix::fcntl::{self, OFlag}; | ||
use nix::sys::aio::{aio_suspend, Aio, AioRead}; | ||
use nix::sys::signal::SigevNotify; | ||
use nix::sys::stat::Mode; | ||
use nix::sys::uio::{pread, pwrite}; | ||
use nix::unistd::{close, ftruncate, lseek, Whence}; | ||
use nix::NixPath; | ||
|
||
use crate::env::{FileSystem, Handle, WriteExt}; | ||
use crate::pipe_log::FileBlockHandle; | ||
|
||
fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { | ||
let kind = std::io::Error::from(e).kind(); | ||
|
@@ -256,13 +261,52 @@ impl WriteExt for LogFile { | |
self.inner.allocate(offset, size) | ||
} | ||
} | ||
#[derive(Default)] | ||
pub struct AioContext { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't use this name outside this file. Just like |
||
aio_vec: Vec<Pin<Box<AioRead<'static>>>>, | ||
buf_vec: Vec<Vec<u8>>, | ||
} | ||
|
||
pub struct DefaultFileSystem; | ||
|
||
impl FileSystem for DefaultFileSystem { | ||
type Handle = LogFd; | ||
type Reader = LogFile; | ||
type Writer = LogFile; | ||
type AsyncIoContext = AioContext; | ||
|
||
fn async_read( | ||
&self, | ||
ctx: &mut Self::AsyncIoContext, | ||
handle: Arc<Self::Handle>, | ||
block: &FileBlockHandle, | ||
) -> IoResult<()> { | ||
let buf = vec![0_u8; block.len]; | ||
ctx.buf_vec.push(buf); | ||
|
||
let mut aior = Box::pin(AioRead::new( | ||
handle.0, | ||
block.offset as i64, | ||
unsafe { | ||
slice::from_raw_parts_mut(ctx.buf_vec.last_mut().unwrap().as_mut_ptr(), block.len) | ||
}, | ||
0, | ||
SigevNotify::SigevNone, | ||
)); | ||
aior.as_mut().submit()?; | ||
ctx.aio_vec.push(aior); | ||
|
||
Ok(()) | ||
} | ||
fn async_finish(&self, mut ctx: Self::AsyncIoContext) -> IoResult<Vec<Vec<u8>>> { | ||
for seq in 0..ctx.aio_vec.len() { | ||
let buf_len = ctx.buf_vec[seq].len(); | ||
aio_suspend(&[&*ctx.aio_vec[seq]], None)?; | ||
assert_eq!(ctx.aio_vec[seq].as_mut().aio_return()?, buf_len); | ||
} | ||
let res = ctx.buf_vec.to_owned(); | ||
Ok(res) | ||
} | ||
|
||
fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> { | ||
LogFd::create(path.as_ref()) | ||
|
@@ -288,4 +332,8 @@ impl FileSystem for DefaultFileSystem { | |
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> { | ||
Ok(LogFile::new(handle)) | ||
} | ||
|
||
fn new_async_io_context(&self) -> IoResult<Self::AsyncIoContext> { | ||
Ok(AioContext::default()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,11 +10,21 @@ mod obfuscated; | |
pub use default::DefaultFileSystem; | ||
pub use obfuscated::ObfuscatedFileSystem; | ||
|
||
use crate::pipe_log::FileBlockHandle; | ||
/// FileSystem | ||
pub trait FileSystem: Send + Sync { | ||
type Handle: Send + Sync + Handle; | ||
type Reader: Seek + Read + Send; | ||
type Writer: Seek + Write + Send + WriteExt; | ||
type AsyncIoContext; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Rename this to `MultiReadContext`. |
||
|
||
fn async_read( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since all the async happens inside the implementation, we can rename this to something like RocksDB's MultiGet, i.e. multi_read. |
||
&self, | ||
ctx: &mut Self::AsyncIoContext, | ||
handle: Arc<Self::Handle>, | ||
block: &FileBlockHandle, | ||
) -> Result<()>; | ||
fn async_finish(&self, ctx: Self::AsyncIoContext) -> Result<Vec<Vec<u8>>>; | ||
|
||
fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>; | ||
|
||
|
@@ -55,6 +65,8 @@ pub trait FileSystem: Send + Sync { | |
fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader>; | ||
|
||
fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>; | ||
|
||
fn new_async_io_context(&self) -> Result<Self::AsyncIoContext>; | ||
} | ||
|
||
pub trait Handle { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.