diff --git a/src/engine.rs b/src/engine.rs index 180fba47..9a6984eb 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,5 +1,6 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. +use libc::{aio_return, aiocb}; use std::cell::{Cell, RefCell}; use std::marker::PhantomData; use std::mem; @@ -7,7 +8,6 @@ use std::path::Path; use std::sync::{mpsc, Arc, Mutex}; use std::thread::{Builder as ThreadBuilder, JoinHandle}; use std::time::{Duration, Instant}; -use libc::aiocb; use log::{error, info}; use protobuf::{parse_from_bytes, Message}; @@ -311,13 +311,16 @@ where vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); } ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); - println!("[fetch_entries_to] time cost: {:?} us", start.elapsed().as_micros()); + println!( + "[fetch_entries_to] time cost: {:?} us", + start.elapsed().as_micros() + ); return Ok(ents_idx.len()); } Ok(0) } - pub fn fetch_entries_to_aio >( + pub fn fetch_entries_to_aio>( &self, region_id: u64, begin: u64, @@ -326,7 +329,6 @@ where vec: &mut Vec, ) -> Result { let start = Instant::now(); - println!("[fetch_entries_to_aio] region id: {} left: {}, right: {}",region_id,begin,end); let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); if let Some(memtable) = self.memtables.get(region_id) { let length = (end - begin) as usize; @@ -334,72 +336,89 @@ where memtable .read() .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; - println!("[fetch_entries_to_aio] (stage1) time cost: {:?} us", start.elapsed().as_micros()); + println!( + "[fetch_entries_to_aio] (stage1) time cost: {:?} us", + start.elapsed().as_micros() + ); - let mut new_block_flags:Vec = Vec::with_capacity(length); + let mut new_block_flags: Vec = Vec::with_capacity(length); let mut block_sum = 0; - for (t,i) in ents_idx.iter().enumerate(){ + for (t, i) in ents_idx.iter().enumerate() { if match t { 0 => true, - _ => ents_idx[t-1].entries.unwrap() != ents_idx[t].entries.unwrap(), - }{ + _ => ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap(), + } { block_sum += 1; new_block_flags.push(true); - }else{ + } else { new_block_flags.push(false); } } - let mut a_list: Vec = Vec::with_capacity(block_sum); - let mut ctx_vec: Vec> = Vec::with_capacity(block_sum); - unsafe { - for i in 0..block_sum{ - a_list.push(mem::zeroed::()); - } - } - for (t,i) in ents_idx.iter().enumerate() { - if new_block_flags[t]{ - ctx_vec.push(submit_read_request_to_file(self.pipe_log.as_ref(), &mut a_list[t],i.entries.unwrap(), )?); + + let mut ctx = AioContext::new(block_sum); + for (seq, i) in ents_idx.iter().enumerate() { + if new_block_flags[seq] { + submit_read_request_to_file( + self.pipe_log.as_ref(), + seq, + &mut ctx, + i.entries.unwrap(), + ) + .unwrap(); } } - println!("[fetch_entries_to_aio] (stage2) time cost: {:?} us", start.elapsed().as_micros()); - let mut j = 0; - - ctx_vec[0].wait()?; - let mut decode_buf = LogBatch::decode_entries_block( - &ctx_vec[0].buf.lock().unwrap(), - ents_idx[0].entries.unwrap(), - ents_idx[0].compression_type, - ).unwrap(); - - for (t,i) in ents_idx.iter().enumerate() { - decode_buf = match t{ - 0 => decode_buf, + println!( + "[fetch_entries_to_aio] (stage2) time cost: {:?} us", + start.elapsed().as_micros() + ); + + let mut seq = 0; + let mut decode_buf = vec![]; + + for (t, i) in ents_idx.iter().enumerate() { + decode_buf = match t { + 0 => { + ctx.single_wait(0).unwrap(); + LogBatch::decode_entries_block( + &ctx.data(0).lock().unwrap(), + ents_idx[0].entries.unwrap(), + ents_idx[0].compression_type, + ) + .unwrap() + } _ => match new_block_flags[t] { true => { - j+=1; - ctx_vec[j].wait(); + seq += 1; + ctx.single_wait(seq).unwrap(); LogBatch::decode_entries_block( - &ctx_vec[j].buf.lock().unwrap(), + &ctx.data(seq).lock().unwrap(), i.entries.unwrap(), i.compression_type, - ).unwrap() - }, + ) + .unwrap() + } false => decode_buf, - } + }, }; - let e = parse_from_bytes::( - &mut decode_buf[(i.entry_offset) as usize .. (i.entry_offset + i.entry_len) as usize]).unwrap(); - vec.push(e); + vec.push( + parse_from_bytes::( + &mut decode_buf + [(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize], + ) + .unwrap(), + ); } ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); - println!("[fetch_entries_to_aio] (end) time cost: {:?} us", start.elapsed().as_micros()); + println!( + "[fetch_entries_to_aio] (end) time cost: {:?} us", + start.elapsed().as_micros() + ); return Ok(ents_idx.len()); } Ok(0) } - pub fn first_index(&self, region_id: u64) -> Option { if let Some(memtable) = self.memtables.get(region_id) { return memtable.read().first_index(); @@ -673,11 +692,17 @@ where }) } -pub(crate) fn submit_read_request_to_file

(pipe_log: &P, mut a:&mut aiocb,handle: FileBlockHandle) -> Result> - where - P: PipeLog, +pub(crate) fn submit_read_request_to_file

( + pipe_log: &P, + seq: usize, + ctx: &mut AioContext, + handle: FileBlockHandle, +) -> Result<()> +where + P: PipeLog, { - Ok(Arc::new(pipe_log.read_bytes_aio(a,handle)?)) + pipe_log.read_bytes_aio(seq, ctx, handle).unwrap(); + Ok(()) } #[cfg(test)] @@ -689,11 +714,11 @@ mod tests { use crate::test_util::{generate_entries, PanicGuard}; use crate::util::ReadableSize; use kvproto::raft_serverpb::RaftLocalState; + use libc::aiocb; use raft::eraftpb::Entry; use std::collections::BTreeSet; use std::fs::OpenOptions; use std::path::PathBuf; - use libc::aiocb; type RaftLogEngine = Engine; impl RaftLogEngine { @@ -787,7 +812,7 @@ mod tests { None, &mut entries, ) - .unwrap(); + .unwrap(); assert_eq!(entries.len(), (end - start) as usize); assert_eq!(entries.first().unwrap().index, start); assert_eq!( @@ -884,13 +909,13 @@ mod tests { } #[test] - fn test_async_read(){ - let normal_batch_size = 4096; + fn test_async_read() { + let normal_batch_size = 8192; let compressed_batch_size = 5120; for &entry_size in &[normal_batch_size] { - if entry_size == normal_batch_size{ + if entry_size == normal_batch_size { println!("[normal_batch_size]"); - }else if entry_size == compressed_batch_size{ + } else if entry_size == compressed_batch_size { println!("[compressed_batch_size]"); } let dir = tempfile::Builder::new() @@ -903,39 +928,34 @@ mod tests { ..Default::default() }; - - let engine = RaftLogEngine::open_with_file_system( - cfg.clone(), - Arc::new(DefaultFileSystem), - ) - .unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), Arc::new(DefaultFileSystem)) + .unwrap(); assert_eq!(engine.path(), dir.path().to_str().unwrap()); let data = vec![b'x'; entry_size]; - for i in 10..510{ - for rid in 10..15{ + for i in 10..1010 { + for rid in 10..15 { let index = i; - engine.append(rid,index,index+1,Some(&data)); + engine.append(rid, index, index + 1, Some(&data)); } } for i in 10..15 { let rid = i; let index = 10; println!("[PREAD]"); - engine.scan_entries(rid, index, index + 500, |_, q, d| { + engine.scan_entries(rid, index, index + 1000, |_, q, d| { assert_eq!(q, LogQueue::Append); assert_eq!(d, &data); }); println!("[AIO]"); - engine.scan_entries_aio(rid, index, index + 500, |_, q, d| { + engine.scan_entries_aio(rid, index, index + 1000, |_, q, d| { assert_eq!(q, LogQueue::Append); assert_eq!(d, &data); }); println!("===================================================================================="); } - } - } #[test] @@ -2143,10 +2163,10 @@ mod tests { type Reader = ::Reader; type Writer = ::Writer; - fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()> { - // todo!() - Ok(()) - } + // fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()> + // { // todo!() + // Ok(()) + // } fn create>(&self, path: P) -> std::io::Result { let handle = self.inner.create(&path)?; @@ -2206,10 +2226,6 @@ mod tests { fn new_writer(&self, h: Arc) -> std::io::Result { self.inner.new_writer(h) } - - fn new_async_context(&self, handle: Arc, ptr: *mut aiocb, buf: Arc>>) -> std::io::Result { - self.inner.new_async_context(handle,ptr,buf) - } } #[test] diff --git a/src/env/default.rs b/src/env/default.rs index 1334b9bd..e17132c0 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -4,21 +4,22 @@ use std::ffi::c_void; use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::os::unix::io::RawFd; use std::path::Path; -use std::ptr; use std::sync::{Arc, Mutex}; +use std::{mem, ptr}; use fail::fail_point; -use libc::{aiocb, off_t}; +use libc::{aio_return, aiocb, off_t}; use log::error; use nix::errno::Errno; use nix::fcntl::{self, OFlag}; +use nix::sys::signal::{SigEvent, SigevNotify}; use nix::sys::stat::Mode; use nix::sys::uio::{pread, pwrite}; use nix::unistd::{close, ftruncate, lseek, Whence}; use nix::NixPath; -use nix::sys::signal::{SigEvent, SigevNotify}; use crate::env::{AsyncContext, FileSystem, Handle, WriteExt}; +use crate::Error; fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { let kind = std::io::Error::from(e).kind(); @@ -101,17 +102,18 @@ impl LogFd { Ok(readed) } - pub fn read_aio(&self,ptr: *mut aiocb,buf: Arc>>,offset: u64){ - let mut buf = buf.lock().unwrap(); + pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) { + let mut buf = ctx.buf_vec.last().unwrap().lock().unwrap(); unsafe { - (*ptr).aio_fildes = self.0; - (*ptr).aio_buf = buf.as_mut_ptr() as *mut c_void; - (*ptr).aio_reqprio = 0; - (*ptr).aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); - (*ptr).aio_nbytes = buf.len() as usize; - (*ptr).aio_lio_opcode = libc::LIO_READ; - (*ptr).aio_offset = offset as off_t; - libc::aio_read(ptr); + let aior = &mut ctx.aio_vec[seq]; + aior.aio_fildes = self.0; + aior.aio_buf = buf.as_mut_ptr() as *mut c_void; + aior.aio_reqprio = 0; + aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); + aior.aio_nbytes = buf.len() as usize; + aior.aio_lio_opcode = libc::LIO_READ; + aior.aio_offset = offset as off_t; + libc::aio_read(&mut ctx.aio_vec[seq]); } } @@ -276,36 +278,55 @@ impl WriteExt for LogFile { } pub struct AioContext { - pub(crate) fd: Arc, - pub(crate) ptr: *mut aiocb, - pub(crate) buf: Arc>>, + pub(crate) aio_vec: Vec, + pub(crate) buf_vec: Vec>>>, } impl AioContext { - pub fn new(fd: Arc,ptr: *mut aiocb, buf: Arc>>) -> Self{ - Self{ - fd, - ptr, - buf, + pub fn new(block_sum: usize) -> Self { + let mut vec = vec![]; + unsafe { + for i in 0..block_sum { + vec.push(mem::zeroed::()); + } + } + Self { + aio_vec: vec, + buf_vec: vec![], } } } -impl AsyncContext for AioContext{ - fn wait(&self) -> IoResult { - let p = self.ptr; - let aio_list_ptr = vec![p].as_ptr() as *const *const aiocb ; //q - let mut len; +impl AsyncContext for AioContext { + fn single_wait(&mut self, seq: usize) -> IoResult { + let buf_len = self.buf_vec[seq].lock().unwrap().len(); unsafe { - let timep = ptr::null::(); //w loop { - libc::aio_suspend(aio_list_ptr, 1 as i32, timep); //e - len = libc::aio_return(p); - if len > 0{ - return Ok(len as usize); + libc::aio_suspend( + vec![&mut self.aio_vec[seq]].as_ptr() as *const *const aiocb, + 1 as i32, + ptr::null::(), + ); + if buf_len == aio_return(&mut self.aio_vec[seq]) as usize{ + return Ok(buf_len); } } } } + + fn wait(&mut self) -> IoResult { + let mut total = 0; + for seq in 0..self.aio_vec.len() { + match self.single_wait(seq) { + Ok(len) => total += 1, + Err(e) => return Err(e), + } + } + Ok(total as usize) + } + + fn data(&self, seq: usize) -> Arc>> { + self.buf_vec[seq].clone() + } } pub struct DefaultFileSystem; @@ -315,9 +336,14 @@ impl FileSystem for DefaultFileSystem { type Reader = LogFile; type Writer = LogFile; - - fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> IoResult<()> { - ctx.fd.read_aio(ctx.ptr, ctx.buf.clone(), offset); + fn read_aio( + &self, + handle: Arc, + seq: usize, + ctx: &mut AioContext, + offset: u64, + ) -> IoResult<()> { + handle.read_aio(seq, ctx, offset); Ok(()) } @@ -345,7 +371,7 @@ impl FileSystem for DefaultFileSystem { Ok(LogFile::new(handle)) } - fn new_async_context(&self, handle: Arc, ptr: *mut aiocb, buf: Arc>>) -> IoResult { - Ok(AioContext::new(handle,ptr,buf)) + fn new_async_context(&self, block_sum: usize) -> IoResult { + Ok(AioContext::new(block_sum)) } } diff --git a/src/env/mod.rs b/src/env/mod.rs index f5637112..70fb2317 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -1,16 +1,16 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. +use libc::aiocb; use std::io::{Read, Result, Seek, Write}; use std::path::Path; use std::sync::{Arc, Mutex}; -use libc::aiocb; mod default; mod obfuscated; +pub use default::AioContext; pub use default::DefaultFileSystem; pub use obfuscated::ObfuscatedFileSystem; -pub use default::{AioContext}; /// FileSystem pub trait FileSystem: Send + Sync { @@ -18,7 +18,15 @@ pub trait FileSystem: Send + Sync { type Reader: Seek + Read + Send; type Writer: Seek + Write + Send + WriteExt; - fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> Result<()>; + fn read_aio( + &self, + handle: Arc, + seq: usize, + ctx: &mut AioContext, + offset: u64, + ) -> Result<()> { + Ok(()) + } fn create>(&self, path: P) -> Result; @@ -54,7 +62,9 @@ pub trait FileSystem: Send + Sync { fn new_writer(&self, handle: Arc) -> Result; - fn new_async_context(&self,handle: Arc,ptr: *mut aiocb, buf: Arc>>) -> Result; + fn new_async_context(&self, block_sum: usize) -> Result { + Ok(AioContext::new(block_sum)) + } } pub trait Handle { @@ -73,5 +83,7 @@ pub trait WriteExt { } pub trait AsyncContext { - fn wait(&self) -> Result; -} \ No newline at end of file + fn wait(&mut self) -> Result; + fn single_wait(&mut self, seq: usize) -> Result; + fn data(&self, seq: usize) -> Arc>>; +} diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index df79153b..2b7c0c62 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -1,13 +1,13 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. +use libc::aiocb; use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use libc::aiocb; -use crate::env::{DefaultFileSystem, FileSystem, WriteExt}; use crate::env::default::AioContext; +use crate::env::{DefaultFileSystem, FileSystem, WriteExt}; pub struct ObfuscatedReader(::Reader); @@ -92,11 +92,6 @@ impl FileSystem for ObfuscatedFileSystem { type Reader = ObfuscatedReader; type Writer = ObfuscatedWriter; - fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> IoResult<()> { - ctx.fd.read_aio(ctx.ptr, ctx.buf.clone(), offset); - Ok(()) - } - fn create>(&self, path: P) -> IoResult { let r = self.inner.create(path); if r.is_ok() { @@ -134,8 +129,4 @@ impl FileSystem for ObfuscatedFileSystem { fn new_writer(&self, handle: Arc) -> IoResult { Ok(ObfuscatedWriter(self.inner.new_writer(handle)?)) } - - fn new_async_context(&self, handle: Arc, ptr: *mut aiocb, buf: Arc>>) -> IoResult { - Ok(self.inner.new_async_context(handle,ptr,buf)?) - } } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 92cb1af7..8ccbcc9b 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -13,7 +13,7 @@ use log::error; use parking_lot::{Mutex, MutexGuard, RwLock}; use crate::config::Config; -use crate::env::{AioContext, FileSystem}; +use crate::env::{AioContext, DefaultFileSystem, FileSystem}; use crate::event_listener::EventListener; use crate::metrics::*; use crate::pipe_log::{ @@ -261,7 +261,6 @@ impl SinglePipe { .handle .clone()) } - /// Creates a new file for write, and rotates the active log file. /// /// This operation is atomic in face of errors. @@ -351,18 +350,22 @@ impl SinglePipe { reader.read(handle) } - fn read_bytes_aio(&self, mut a: &mut aiocb, handle: FileBlockHandle) -> Result{ + fn read_bytes_aio( + &self, + seq: usize, + ctx: &mut AioContext, + handle: FileBlockHandle, + ) -> Result<()> { unsafe { let fd = self.get_fd(handle.id.seq)?; - let p: *mut libc::aiocb = a; - let buf = vec![0;handle.len]; - let mut ctx = self.file_system.as_ref().new_async_context( - fd.clone(), - p, - Arc::new(SyncMutex::new(buf)) - )?; - self.file_system.as_ref().read_aio(&mut ctx, handle.offset).unwrap(); - return Ok(ctx) + let buf = vec![0 as u8; handle.len]; + let buf = Arc::new(SyncMutex::new(buf)); + ctx.buf_vec.push(buf); + self.file_system + .as_ref() + .read_aio(fd, seq, ctx, handle.offset) + .unwrap(); + return Ok(()); } } @@ -533,8 +536,16 @@ impl PipeLog for DualPipes { } #[inline] - fn read_bytes_aio(&self,mut a:&mut aiocb,handle: FileBlockHandle) -> Result { - self.pipes[handle.id.queue as usize].read_bytes_aio(a,handle) + fn read_bytes_aio( + &self, + seq: usize, + ctx: &mut AioContext, + handle: FileBlockHandle, + ) -> Result<()> { + self.pipes[handle.id.queue as usize] + .read_bytes_aio(seq, ctx, handle) + .unwrap(); + Ok(()) } #[inline] diff --git a/src/pipe_log.rs b/src/pipe_log.rs index a60053c0..dc06554f 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -5,13 +5,13 @@ use std::cmp::Ordering; use std::fmt::{self, Display}; +use crate::env::AioContext; use fail::fail_point; use libc::aiocb; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; use serde_repr::{Deserialize_repr, Serialize_repr}; use strum::EnumIter; -use crate::env::AioContext; use crate::Result; @@ -174,7 +174,12 @@ pub trait PipeLog: Sized { /// Reads some bytes from the specified position. fn read_bytes(&self, handle: FileBlockHandle) -> Result>; - fn read_bytes_aio(&self, a:&mut aiocb, handle: FileBlockHandle) -> Result ; + fn read_bytes_aio( + &self, + seq: usize, + ctx: &mut AioContext, + handle: FileBlockHandle, + ) -> Result<()>; /// Appends some bytes to the specified log queue. Returns file position of /// the written bytes. fn append(