read entries using AIO #286

Open · wants to merge 16 commits into base: master
124 changes: 122 additions & 2 deletions src/engine.rs
@@ -304,10 +304,29 @@ where
memtable
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
for i in ents_idx.iter() {
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);

let mut blocks: Vec<FileBlockHandle> = Vec::new();
let mut total_bytes = 0;
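            // Record each distinct file block once (consecutive entries share the
            // same block) and track the total number of bytes to read.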
for (t, i) in ents_idx.iter().enumerate() {
if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) {
blocks.push(i.entries.unwrap());
total_bytes += i.entries.unwrap().len;
}
}

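            // Take the AIO path only when there are more than 5 distinct blocks and
            // more than 1 MiB to read in total; otherwise fall back to synchronous
            // per-entry reads.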
if blocks.len() > 5 && total_bytes > 1024 * 1024 {
//Async IO
let bytes = self.pipe_log.async_read_bytes(blocks)?;
parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec)?;
} else {
//Sync IO
for i in ents_idx.iter() {
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
}
}

ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);

return Ok(ents_idx.len());
}
Ok(0)
@@ -545,6 +564,38 @@ thread_local! {
static BLOCK_CACHE: BlockCache = BlockCache::new();
}

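/// Decodes entries from the raw blocks fetched by `PipeLog::async_read_bytes`.
/// `bytes` is expected to contain one raw block per distinct `FileBlockHandle`
/// referenced by `ents_idx`, in order of first appearance.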
pub(crate) fn parse_entries_from_bytes<M: MessageExt>(
bytes: Vec<Vec<u8>>,
ents_idx: &mut [EntryIndex],
vec: &mut Vec<M::Entry>,
) -> Result<()> {
let mut seq = 0;
for idx in ents_idx {
BLOCK_CACHE.with(|cache| {
if cache.key.get() != idx.entries.unwrap() {
cache.insert(
idx.entries.unwrap(),
LogBatch::decode_entries_block(
&bytes[seq],
idx.entries.unwrap(),
idx.compression_type,
)
.unwrap(),
);
seq += 1;
}
let e = parse_from_bytes(
&cache.block.borrow()
[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize],
)
.unwrap();
assert_eq!(M::index(&e), idx.index);
vec.push(e);
});
}
Ok(())
}

pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry>
where
M: MessageExt,
@@ -759,6 +810,57 @@ mod tests {
}
}

#[test]
fn test_multi_read_entry() {
let sync_batch_size = 1024;
let async_batch_size = 1024 * 1024;
for &entry_size in &[sync_batch_size, async_batch_size] {
let dir = tempfile::Builder::new()
.prefix("test_multi_read_entry")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
..Default::default()
};

let engine = RaftLogEngine::open_with_file_system(
cfg.clone(),
Arc::new(ObfuscatedFileSystem::default()),
)
.unwrap();
assert_eq!(engine.path(), dir.path().to_str().unwrap());
let data = vec![b'x'; entry_size];

for i in 0..10 {
for rid in 10..20 {
let index = i + rid;
engine.append(rid, index, index + 1, Some(&data));
}
}
for i in 10..20 {
let rid = i;
let index = i;
engine.scan_entries(rid, index, index + 10, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
}

// Recover the engine.
let engine = engine.reopen();
for i in 10..20 {
let rid = i;
let index = i;
engine.scan_entries(rid, index, index + 10, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
}
}
}

#[test]
fn test_clean_raft_group() {
fn run_steps(steps: &[Option<(u64, u64)>]) {
@@ -1994,6 +2096,20 @@ mod tests {
type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
type MultiReadContext = <ObfuscatedFileSystem as FileSystem>::MultiReadContext;

fn multi_read(
&self,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> std::io::Result<()> {
self.inner.multi_read(ctx, handle, block)
}

fn async_finish(&self, ctx: Self::MultiReadContext) -> std::io::Result<Vec<Vec<u8>>> {
self.inner.async_finish(ctx)
}

fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
let handle = self.inner.create(&path)?;
@@ -2056,6 +2172,10 @@
fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
self.inner.new_writer(h)
}

fn new_async_io_context(&self) -> std::io::Result<Self::MultiReadContext> {
self.inner.new_async_io_context()
}
}

#[test]
49 changes: 49 additions & 0 deletions src/env/default.rs
@@ -3,18 +3,23 @@
use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::os::unix::io::RawFd;
use std::path::Path;
use std::pin::Pin;
use std::slice;
use std::sync::Arc;

use fail::fail_point;
use log::error;
use nix::errno::Errno;
use nix::fcntl::{self, OFlag};
use nix::sys::aio::{aio_suspend, Aio, AioRead};
use nix::sys::signal::SigevNotify;
use nix::sys::stat::Mode;
use nix::sys::uio::{pread, pwrite};
use nix::unistd::{close, ftruncate, lseek, Whence};
use nix::NixPath;

use crate::env::{FileSystem, Handle, WriteExt};
use crate::pipe_log::FileBlockHandle;

fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error {
let kind = std::io::Error::from(e).kind();
@@ -256,13 +261,53 @@ impl WriteExt for LogFile {
self.inner.allocate(offset, size)
}
}
#[derive(Default)]
pub struct AioContext {
Review comment (Member):

Don't use this name outside this file. Just like type Handle = <DefaultFileSystem as FileSystem>::Handle;, you can use the same syntax to reference the AIO context of the base file system without needing to expose this struct.

aio_vec: Vec<Pin<Box<AioRead<'static>>>>,
buf_vec: Vec<Vec<u8>>,
}
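A minimal sketch of what the review comment above suggests (it is the same pattern ObfuscatedFileSystem already uses later in this diff): other file systems can name the base file system's context through the associated type, so AioContext itself never needs to be public.

    type MultiReadContext = <DefaultFileSystem as FileSystem>::MultiReadContext;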

pub struct DefaultFileSystem;

impl FileSystem for DefaultFileSystem {
type Handle = LogFd;
type Reader = LogFile;
type Writer = LogFile;
type MultiReadContext = AioContext;

fn multi_read(
&self,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> IoResult<()> {
let buf = vec![0_u8; block.len];
ctx.buf_vec.push(buf);

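        // SAFETY: the raw slice points into the heap buffer that was just pushed;
        // that buffer is neither resized nor dropped before async_finish consumes
        // the context, so the pointer stays valid even if buf_vec itself reallocates.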
let mut aior = Box::pin(AioRead::new(
handle.0,
block.offset as i64,
unsafe {
slice::from_raw_parts_mut(ctx.buf_vec.last_mut().unwrap().as_mut_ptr(), block.len)
},
0,
SigevNotify::SigevNone,
));
aior.as_mut().submit()?;
ctx.aio_vec.push(aior);

Ok(())
}

fn async_finish(&self, mut ctx: Self::MultiReadContext) -> IoResult<Vec<Vec<u8>>> {
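        // Wait for each submitted read to complete (in submission order) and verify
        // that the whole block was read.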
for seq in 0..ctx.aio_vec.len() {
let buf_len = ctx.buf_vec[seq].len();
aio_suspend(&[&*ctx.aio_vec[seq]], None)?;
assert_eq!(ctx.aio_vec[seq].as_mut().aio_return()?, buf_len);
}
        // Move the buffers out of the context instead of cloning them.
        Ok(ctx.buf_vec)
}

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
LogFd::create(path.as_ref())
@@ -288,4 +333,8 @@ impl FileSystem for DefaultFileSystem {
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(LogFile::new(handle))
}

fn new_async_io_context(&self) -> IoResult<Self::MultiReadContext> {
Ok(AioContext::default())
}
}
12 changes: 12 additions & 0 deletions src/env/mod.rs
@@ -10,11 +10,21 @@ mod obfuscated;
pub use default::DefaultFileSystem;
pub use obfuscated::ObfuscatedFileSystem;

use crate::pipe_log::FileBlockHandle;
/// FileSystem
pub trait FileSystem: Send + Sync {
type Handle: Send + Sync + Handle;
type Reader: Seek + Read + Send;
type Writer: Seek + Write + Send + WriteExt;
type MultiReadContext;

fn multi_read(
&self,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> Result<()>;
fn async_finish(&self, ctx: Self::MultiReadContext) -> Result<Vec<Vec<u8>>>;

fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;

@@ -55,6 +65,8 @@ pub trait FileSystem: Send + Sync {
fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader>;

fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;

fn new_async_io_context(&self) -> Result<Self::MultiReadContext>;
}
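For reference, a minimal sketch (not part of the diff) of how a caller is expected to drive the three new items (new_async_io_context, multi_read and async_finish). It assumes, for simplicity, that every block lives in the same file, and the helper name read_blocks_async is made up; DualPipes::async_read_bytes in pipe.rs below plays this role and resolves a separate handle per block.

    fn read_blocks_async<F: FileSystem>(
        fs: &F,
        handle: Arc<F::Handle>,
        blocks: &[FileBlockHandle],
    ) -> Result<Vec<Vec<u8>>> {
        // One context collects every in-flight read.
        let mut ctx = fs.new_async_io_context()?;
        for block in blocks {
            // Submit one asynchronous read per block.
            fs.multi_read(&mut ctx, handle.clone(), block)?;
        }
        // Block until all submitted reads have completed; buffers come back in
        // submission order.
        fs.async_finish(ctx)
    }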

pub trait Handle {
27 changes: 27 additions & 0 deletions src/env/obfuscated.rs
@@ -7,6 +7,7 @@ use std::sync::Arc;

use crate::env::{DefaultFileSystem, FileSystem, WriteExt};

use crate::pipe_log::FileBlockHandle;
pub struct ObfuscatedReader(<DefaultFileSystem as FileSystem>::Reader);

impl Read for ObfuscatedReader {
Expand Down Expand Up @@ -89,6 +90,28 @@ impl FileSystem for ObfuscatedFileSystem {
type Handle = <DefaultFileSystem as FileSystem>::Handle;
type Reader = ObfuscatedReader;
type Writer = ObfuscatedWriter;
type MultiReadContext = <DefaultFileSystem as FileSystem>::MultiReadContext;

fn multi_read(
&self,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> IoResult<()> {
self.inner.multi_read(ctx, handle, block)
}

fn async_finish(&self, ctx: Self::MultiReadContext) -> IoResult<Vec<Vec<u8>>> {
        let mut base = self.inner.async_finish(ctx)?;

for v in base.iter_mut() {
for c in v.iter_mut() {
// do obfuscation.
*c = c.wrapping_sub(1);
}
}
Ok(base)
}

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
let r = self.inner.create(path);
@@ -127,4 +150,8 @@ impl FileSystem for ObfuscatedFileSystem {
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(ObfuscatedWriter(self.inner.new_writer(handle)?))
}

fn new_async_io_context(&self) -> IoResult<Self::MultiReadContext> {
self.inner.new_async_io_context()
}
}
2 changes: 1 addition & 1 deletion src/file_pipe_log/log_file.rs
@@ -172,7 +172,7 @@ impl<F: FileSystem> LogFileReader<F> {
}

pub fn read(&mut self, handle: FileBlockHandle) -> Result<Vec<u8>> {
let mut buf = vec![0; handle.len as usize];
let mut buf = vec![0; handle.len];
let size = self.read_to(handle.offset, &mut buf)?;
buf.truncate(size);
Ok(buf)
21 changes: 21 additions & 0 deletions src/file_pipe_log/pipe.rs
@@ -13,6 +13,7 @@ use parking_lot::{Mutex, MutexGuard, RwLock};
use crate::config::Config;
use crate::env::FileSystem;
use crate::event_listener::EventListener;
use crate::memtable::EntryIndex;
use crate::metrics::*;
use crate::pipe_log::{
FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes,
@@ -254,6 +255,15 @@ impl<F: FileSystem> SinglePipe<F> {
reader.read(handle)
}

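    /// Submits one asynchronous read per block; the caller later collects the
    /// results through `FileSystem::async_finish`.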
fn async_read(&self, ctx: &mut F::MultiReadContext, blocks: Vec<FileBlockHandle>) {
for block in blocks.iter() {
let fd = self.get_fd(block.id.seq).unwrap();
self.file_system
.multi_read(ctx, fd, block)
.expect("Async read failed.");
}
}

fn append<T: ReactiveBytes + ?Sized>(&self, bytes: &mut T) -> Result<FileBlockHandle> {
fail_point!("file_pipe_log::append");
let mut writable_file = self.writable_file.lock();
@@ -444,6 +454,17 @@ impl<F: FileSystem> PipeLog for DualPipes<F> {
self.pipes[handle.id.queue as usize].read_bytes(handle)
}

#[inline]
fn async_read_bytes(&self, blocks: Vec<FileBlockHandle>) -> Result<Vec<Vec<u8>>> {
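        // Note: every block is read through the append pipe's file system; this
        // path currently assumes the blocks all belong to the append queue.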
let fs = &self.pipes[LogQueue::Append as usize].file_system;
let mut ctx = fs.new_async_io_context()?;

self.pipes[LogQueue::Append as usize].async_read(&mut ctx, blocks);
let res = fs.async_finish(ctx)?;

Ok(res)
}

#[inline]
fn append<T: ReactiveBytes + ?Sized>(
&self,
4 changes: 4 additions & 0 deletions src/pipe_log.rs
@@ -5,6 +5,7 @@
use std::cmp::Ordering;
use std::fmt::{self, Display};

use crate::memtable::EntryIndex;
use fail::fail_point;
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::ToPrimitive;
@@ -172,6 +173,9 @@ pub trait PipeLog: Sized {
/// Reads some bytes from the specified position.
fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>;

    /// Reads bytes from multiple blocks using asynchronous IO.
fn async_read_bytes(&self, blocks: Vec<FileBlockHandle>) -> Result<Vec<Vec<u8>>>;

/// Appends some bytes to the specified log queue. Returns file position of
/// the written bytes.
fn append<T: ReactiveBytes + ?Sized>(