From b0fa034a5f27eed3425bc58bbc088762f2314528 Mon Sep 17 00:00:00 2001 From: wxy <1019495690@qq.com> Date: Sat, 3 Dec 2022 20:09:33 +0800 Subject: [PATCH 01/15] version 1.0 Signed-off-by: root Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 198 +++++++++++++++++++++++++++++++++++++- src/env/default.rs | 65 ++++++++++++- src/env/mod.rs | 12 ++- src/env/obfuscated.rs | 13 ++- src/file_pipe_log/pipe.rs | 24 ++++- src/pipe_log.rs | 3 + 6 files changed, 308 insertions(+), 7 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 2968823e..180fba47 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -2,17 +2,19 @@ use std::cell::{Cell, RefCell}; use std::marker::PhantomData; +use std::mem; use std::path::Path; use std::sync::{mpsc, Arc, Mutex}; use std::thread::{Builder as ThreadBuilder, JoinHandle}; use std::time::{Duration, Instant}; +use libc::aiocb; use log::{error, info}; use protobuf::{parse_from_bytes, Message}; use crate::config::{Config, RecoveryMode}; use crate::consistency::ConsistencyChecker; -use crate::env::{DefaultFileSystem, FileSystem}; +use crate::env::{AioContext, AsyncContext, DefaultFileSystem, FileSystem}; use crate::event_listener::EventListener; use crate::file_pipe_log::debug::LogItemReader; use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder}; @@ -298,6 +300,7 @@ where max_size: Option, vec: &mut Vec, ) -> Result { + let start = Instant::now(); let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); if let Some(memtable) = self.memtables.get(region_id) { let mut ents_idx: Vec = Vec::with_capacity((end - begin) as usize); @@ -308,11 +311,95 @@ where vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); } ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); + println!("[fetch_entries_to] time cost: {:?} us", start.elapsed().as_micros()); + return Ok(ents_idx.len()); + } + Ok(0) + } + + pub fn fetch_entries_to_aio >( + &self, + region_id: u64, + begin: u64, + end: u64, + max_size: Option, + vec: &mut Vec, + ) -> Result { + let start = Instant::now(); + println!("[fetch_entries_to_aio] region id: {} left: {}, right: {}",region_id,begin,end); + let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); + if let Some(memtable) = self.memtables.get(region_id) { + let length = (end - begin) as usize; + let mut ents_idx: Vec = Vec::with_capacity(length); + memtable + .read() + .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; + println!("[fetch_entries_to_aio] (stage1) time cost: {:?} us", start.elapsed().as_micros()); + + let mut new_block_flags:Vec = Vec::with_capacity(length); + let mut block_sum = 0; + for (t,i) in ents_idx.iter().enumerate(){ + if match t { + 0 => true, + _ => ents_idx[t-1].entries.unwrap() != ents_idx[t].entries.unwrap(), + }{ + block_sum += 1; + new_block_flags.push(true); + }else{ + new_block_flags.push(false); + } + } + let mut a_list: Vec = Vec::with_capacity(block_sum); + let mut ctx_vec: Vec> = Vec::with_capacity(block_sum); + unsafe { + for i in 0..block_sum{ + a_list.push(mem::zeroed::()); + } + } + for (t,i) in ents_idx.iter().enumerate() { + if new_block_flags[t]{ + ctx_vec.push(submit_read_request_to_file(self.pipe_log.as_ref(), &mut a_list[t],i.entries.unwrap(), )?); + } + } + println!("[fetch_entries_to_aio] (stage2) time cost: {:?} us", start.elapsed().as_micros()); + let mut j = 0; + + ctx_vec[0].wait()?; + let mut decode_buf = LogBatch::decode_entries_block( + &ctx_vec[0].buf.lock().unwrap(), + ents_idx[0].entries.unwrap(), + 
ents_idx[0].compression_type, + ).unwrap(); + + for (t,i) in ents_idx.iter().enumerate() { + decode_buf = match t{ + 0 => decode_buf, + _ => match new_block_flags[t] { + true => { + j+=1; + ctx_vec[j].wait(); + LogBatch::decode_entries_block( + &ctx_vec[j].buf.lock().unwrap(), + i.entries.unwrap(), + i.compression_type, + ).unwrap() + }, + false => decode_buf, + } + }; + let e = parse_from_bytes::( + &mut decode_buf[(i.entry_offset) as usize .. (i.entry_offset + i.entry_len) as usize]).unwrap(); + vec.push(e); + } + ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); + println!("[fetch_entries_to_aio] (end) time cost: {:?} us", start.elapsed().as_micros()); return Ok(ents_idx.len()); } + Ok(0) } + pub fn first_index(&self, region_id: u64) -> Option { if let Some(memtable) = self.memtables.get(region_id) { return memtable.read().first_index(); @@ -586,10 +673,17 @@ where }) } +pub(crate) fn submit_read_request_to_file
<P>
(pipe_log: &P, mut a:&mut aiocb,handle: FileBlockHandle) -> Result> + where + P: PipeLog, +{ + Ok(Arc::new(pipe_log.read_bytes_aio(a,handle)?)) +} + #[cfg(test)] mod tests { use super::*; - use crate::env::ObfuscatedFileSystem; + use crate::env::{AioContext, ObfuscatedFileSystem}; use crate::file_pipe_log::FileNameExt; use crate::pipe_log::Version; use crate::test_util::{generate_entries, PanicGuard}; @@ -599,6 +693,7 @@ mod tests { use std::collections::BTreeSet; use std::fs::OpenOptions; use std::path::PathBuf; + use libc::aiocb; type RaftLogEngine = Engine; impl RaftLogEngine { @@ -677,6 +772,41 @@ mod tests { reader(e.index, entry_index.entries.unwrap().id.queue, &e.data); } } + fn scan_entries_aio( + &self, + rid: u64, + start: u64, + end: u64, + reader: FR, + ) { + let mut entries = Vec::new(); + self.fetch_entries_to_aio::( + rid, + self.first_index(rid).unwrap(), + self.last_index(rid).unwrap() + 1, + None, + &mut entries, + ) + .unwrap(); + assert_eq!(entries.len(), (end - start) as usize); + assert_eq!(entries.first().unwrap().index, start); + assert_eq!( + entries.last().unwrap().index, + self.decode_last_index(rid).unwrap() + ); + assert_eq!(entries.last().unwrap().index + 1, end); + for e in entries.iter() { + let entry_index = self + .memtables + .get(rid) + .unwrap() + .read() + .get_entry(e.index) + .unwrap(); + assert_eq!(&self.get_entry::(rid, e.index).unwrap().unwrap(), e); + reader(e.index, entry_index.entries.unwrap().id.queue, &e.data); + } + } fn file_count(&self, queue: Option) -> usize { if let Some(queue) = queue { @@ -753,6 +883,61 @@ mod tests { } } + #[test] + fn test_async_read(){ + let normal_batch_size = 4096; + let compressed_batch_size = 5120; + for &entry_size in &[normal_batch_size] { + if entry_size == normal_batch_size{ + println!("[normal_batch_size]"); + }else if entry_size == compressed_batch_size{ + println!("[compressed_batch_size]"); + } + let dir = tempfile::Builder::new() + .prefix("test_get_entry") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize::gb(1), + ..Default::default() + }; + + + let engine = RaftLogEngine::open_with_file_system( + cfg.clone(), + Arc::new(DefaultFileSystem), + ) + .unwrap(); + + assert_eq!(engine.path(), dir.path().to_str().unwrap()); + let data = vec![b'x'; entry_size]; + for i in 10..510{ + for rid in 10..15{ + let index = i; + engine.append(rid,index,index+1,Some(&data)); + } + } + for i in 10..15 { + let rid = i; + let index = 10; + println!("[PREAD]"); + engine.scan_entries(rid, index, index + 500, |_, q, d| { + assert_eq!(q, LogQueue::Append); + assert_eq!(d, &data); + }); + println!("[AIO]"); + engine.scan_entries_aio(rid, index, index + 500, |_, q, d| { + assert_eq!(q, LogQueue::Append); + assert_eq!(d, &data); + }); + println!("===================================================================================="); + } + + } + + } + #[test] fn test_clean_raft_group() { fn run_steps(steps: &[Option<(u64, u64)>]) { @@ -1958,6 +2143,11 @@ mod tests { type Reader = ::Reader; type Writer = ::Writer; + fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()> { + // todo!() + Ok(()) + } + fn create>(&self, path: P) -> std::io::Result { let handle = self.inner.create(&path)?; self.update_metadata(path.as_ref(), false); @@ -2016,6 +2206,10 @@ mod tests { fn new_writer(&self, h: Arc) -> std::io::Result { self.inner.new_writer(h) } + + fn new_async_context(&self, handle: Arc, ptr: *mut aiocb, buf: Arc>>) -> std::io::Result { 
+ self.inner.new_async_context(handle,ptr,buf) + } } #[test] diff --git a/src/env/default.rs b/src/env/default.rs index 451ee9fe..1334b9bd 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -1,11 +1,14 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. +use std::ffi::c_void; use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::os::unix::io::RawFd; use std::path::Path; -use std::sync::Arc; +use std::ptr; +use std::sync::{Arc, Mutex}; use fail::fail_point; +use libc::{aiocb, off_t}; use log::error; use nix::errno::Errno; use nix::fcntl::{self, OFlag}; @@ -13,8 +16,9 @@ use nix::sys::stat::Mode; use nix::sys::uio::{pread, pwrite}; use nix::unistd::{close, ftruncate, lseek, Whence}; use nix::NixPath; +use nix::sys::signal::{SigEvent, SigevNotify}; -use crate::env::{FileSystem, Handle, WriteExt}; +use crate::env::{AsyncContext, FileSystem, Handle, WriteExt}; fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { let kind = std::io::Error::from(e).kind(); @@ -97,6 +101,20 @@ impl LogFd { Ok(readed) } + pub fn read_aio(&self,ptr: *mut aiocb,buf: Arc>>,offset: u64){ + let mut buf = buf.lock().unwrap(); + unsafe { + (*ptr).aio_fildes = self.0; + (*ptr).aio_buf = buf.as_mut_ptr() as *mut c_void; + (*ptr).aio_reqprio = 0; + (*ptr).aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); + (*ptr).aio_nbytes = buf.len() as usize; + (*ptr).aio_lio_opcode = libc::LIO_READ; + (*ptr).aio_offset = offset as off_t; + libc::aio_read(ptr); + } + } + /// Writes some bytes to this file starting at `offset`. Returns how many /// bytes were written. pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult { @@ -257,6 +275,39 @@ impl WriteExt for LogFile { } } +pub struct AioContext { + pub(crate) fd: Arc, + pub(crate) ptr: *mut aiocb, + pub(crate) buf: Arc>>, +} +impl AioContext { + pub fn new(fd: Arc,ptr: *mut aiocb, buf: Arc>>) -> Self{ + Self{ + fd, + ptr, + buf, + } + } +} + +impl AsyncContext for AioContext{ + fn wait(&self) -> IoResult { + let p = self.ptr; + let aio_list_ptr = vec![p].as_ptr() as *const *const aiocb ; //q + let mut len; + unsafe { + let timep = ptr::null::(); //w + loop { + libc::aio_suspend(aio_list_ptr, 1 as i32, timep); //e + len = libc::aio_return(p); + if len > 0{ + return Ok(len as usize); + } + } + } + } +} + pub struct DefaultFileSystem; impl FileSystem for DefaultFileSystem { @@ -264,6 +315,12 @@ impl FileSystem for DefaultFileSystem { type Reader = LogFile; type Writer = LogFile; + + fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> IoResult<()> { + ctx.fd.read_aio(ctx.ptr, ctx.buf.clone(), offset); + Ok(()) + } + fn create>(&self, path: P) -> IoResult { LogFd::create(path.as_ref()) } @@ -287,4 +344,8 @@ impl FileSystem for DefaultFileSystem { fn new_writer(&self, handle: Arc) -> IoResult { Ok(LogFile::new(handle)) } + + fn new_async_context(&self, handle: Arc, ptr: *mut aiocb, buf: Arc>>) -> IoResult { + Ok(AioContext::new(handle,ptr,buf)) + } } diff --git a/src/env/mod.rs b/src/env/mod.rs index 6ae4bf9d..f5637112 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -2,13 +2,15 @@ use std::io::{Read, Result, Seek, Write}; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use libc::aiocb; mod default; mod obfuscated; pub use default::DefaultFileSystem; pub use obfuscated::ObfuscatedFileSystem; +pub use default::{AioContext}; /// FileSystem pub trait FileSystem: Send + Sync { @@ -16,6 +18,8 @@ pub trait FileSystem: Send + Sync { type Reader: Seek + Read + Send; 
type Writer: Seek + Write + Send + WriteExt; + fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> Result<()>; + fn create>(&self, path: P) -> Result; fn open>(&self, path: P) -> Result; @@ -49,6 +53,8 @@ pub trait FileSystem: Send + Sync { fn new_reader(&self, handle: Arc) -> Result; fn new_writer(&self, handle: Arc) -> Result; + + fn new_async_context(&self,handle: Arc,ptr: *mut aiocb, buf: Arc>>) -> Result; } pub trait Handle { @@ -65,3 +71,7 @@ pub trait WriteExt { fn truncate(&mut self, offset: usize) -> Result<()>; fn allocate(&mut self, offset: usize, size: usize) -> Result<()>; } + +pub trait AsyncContext { + fn wait(&self) -> Result; +} \ No newline at end of file diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 831f5343..df79153b 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -3,9 +3,11 @@ use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use libc::aiocb; use crate::env::{DefaultFileSystem, FileSystem, WriteExt}; +use crate::env::default::AioContext; pub struct ObfuscatedReader(::Reader); @@ -90,6 +92,11 @@ impl FileSystem for ObfuscatedFileSystem { type Reader = ObfuscatedReader; type Writer = ObfuscatedWriter; + fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> IoResult<()> { + ctx.fd.read_aio(ctx.ptr, ctx.buf.clone(), offset); + Ok(()) + } + fn create>(&self, path: P) -> IoResult { let r = self.inner.create(path); if r.is_ok() { @@ -127,4 +134,8 @@ impl FileSystem for ObfuscatedFileSystem { fn new_writer(&self, handle: Arc) -> IoResult { Ok(ObfuscatedWriter(self.inner.new_writer(handle)?)) } + + fn new_async_context(&self, handle: Arc, ptr: *mut aiocb, buf: Arc>>) -> IoResult { + Ok(self.inner.new_async_context(handle,ptr,buf)?) 
+ } } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index ad99904f..92cb1af7 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -4,14 +4,16 @@ use std::collections::VecDeque; use std::fs::File; use std::path::PathBuf; use std::sync::Arc; +use std::sync::Mutex as SyncMutex; use crossbeam::utils::CachePadded; use fail::fail_point; +use libc::aiocb; use log::error; use parking_lot::{Mutex, MutexGuard, RwLock}; use crate::config::Config; -use crate::env::FileSystem; +use crate::env::{AioContext, FileSystem}; use crate::event_listener::EventListener; use crate::metrics::*; use crate::pipe_log::{ @@ -349,6 +351,21 @@ impl SinglePipe { reader.read(handle) } + fn read_bytes_aio(&self, mut a: &mut aiocb, handle: FileBlockHandle) -> Result{ + unsafe { + let fd = self.get_fd(handle.id.seq)?; + let p: *mut libc::aiocb = a; + let buf = vec![0;handle.len]; + let mut ctx = self.file_system.as_ref().new_async_context( + fd.clone(), + p, + Arc::new(SyncMutex::new(buf)) + )?; + self.file_system.as_ref().read_aio(&mut ctx, handle.offset).unwrap(); + return Ok(ctx) + } + } + fn append(&self, bytes: &mut T) -> Result { fail_point!("file_pipe_log::append"); let mut active_file = self.active_file.lock(); @@ -515,6 +532,11 @@ impl PipeLog for DualPipes { self.pipes[handle.id.queue as usize].read_bytes(handle) } + #[inline] + fn read_bytes_aio(&self,mut a:&mut aiocb,handle: FileBlockHandle) -> Result { + self.pipes[handle.id.queue as usize].read_bytes_aio(a,handle) + } + #[inline] fn append( &self, diff --git a/src/pipe_log.rs b/src/pipe_log.rs index aa9b7925..a60053c0 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -6,10 +6,12 @@ use std::cmp::Ordering; use std::fmt::{self, Display}; use fail::fail_point; +use libc::aiocb; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; use serde_repr::{Deserialize_repr, Serialize_repr}; use strum::EnumIter; +use crate::env::AioContext; use crate::Result; @@ -172,6 +174,7 @@ pub trait PipeLog: Sized { /// Reads some bytes from the specified position. fn read_bytes(&self, handle: FileBlockHandle) -> Result>; + fn read_bytes_aio(&self, a:&mut aiocb, handle: FileBlockHandle) -> Result ; /// Appends some bytes to the specified log queue. Returns file position of /// the written bytes. fn append( From 147130d3b7dd8d13236065c51af8cdbc7d295c8d Mon Sep 17 00:00:00 2001 From: wxy <1019495690@qq.com> Date: Wed, 7 Dec 2022 15:36:40 +0800 Subject: [PATCH 02/15] Optimize code structure(version 1.1). Signed-off-by: root Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 166 +++++++++++++++++++++----------------- src/env/default.rs | 98 +++++++++++++--------- src/env/mod.rs | 24 ++++-- src/env/obfuscated.rs | 13 +-- src/file_pipe_log/pipe.rs | 39 +++++---- src/pipe_log.rs | 9 ++- 6 files changed, 205 insertions(+), 144 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 180fba47..9a6984eb 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,5 +1,6 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. 
+use libc::{aio_return, aiocb}; use std::cell::{Cell, RefCell}; use std::marker::PhantomData; use std::mem; @@ -7,7 +8,6 @@ use std::path::Path; use std::sync::{mpsc, Arc, Mutex}; use std::thread::{Builder as ThreadBuilder, JoinHandle}; use std::time::{Duration, Instant}; -use libc::aiocb; use log::{error, info}; use protobuf::{parse_from_bytes, Message}; @@ -311,13 +311,16 @@ where vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); } ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); - println!("[fetch_entries_to] time cost: {:?} us", start.elapsed().as_micros()); + println!( + "[fetch_entries_to] time cost: {:?} us", + start.elapsed().as_micros() + ); return Ok(ents_idx.len()); } Ok(0) } - pub fn fetch_entries_to_aio >( + pub fn fetch_entries_to_aio>( &self, region_id: u64, begin: u64, @@ -326,7 +329,6 @@ where vec: &mut Vec, ) -> Result { let start = Instant::now(); - println!("[fetch_entries_to_aio] region id: {} left: {}, right: {}",region_id,begin,end); let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); if let Some(memtable) = self.memtables.get(region_id) { let length = (end - begin) as usize; @@ -334,72 +336,89 @@ where memtable .read() .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; - println!("[fetch_entries_to_aio] (stage1) time cost: {:?} us", start.elapsed().as_micros()); + println!( + "[fetch_entries_to_aio] (stage1) time cost: {:?} us", + start.elapsed().as_micros() + ); - let mut new_block_flags:Vec = Vec::with_capacity(length); + let mut new_block_flags: Vec = Vec::with_capacity(length); let mut block_sum = 0; - for (t,i) in ents_idx.iter().enumerate(){ + for (t, i) in ents_idx.iter().enumerate() { if match t { 0 => true, - _ => ents_idx[t-1].entries.unwrap() != ents_idx[t].entries.unwrap(), - }{ + _ => ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap(), + } { block_sum += 1; new_block_flags.push(true); - }else{ + } else { new_block_flags.push(false); } } - let mut a_list: Vec = Vec::with_capacity(block_sum); - let mut ctx_vec: Vec> = Vec::with_capacity(block_sum); - unsafe { - for i in 0..block_sum{ - a_list.push(mem::zeroed::()); - } - } - for (t,i) in ents_idx.iter().enumerate() { - if new_block_flags[t]{ - ctx_vec.push(submit_read_request_to_file(self.pipe_log.as_ref(), &mut a_list[t],i.entries.unwrap(), )?); + + let mut ctx = AioContext::new(block_sum); + for (seq, i) in ents_idx.iter().enumerate() { + if new_block_flags[seq] { + submit_read_request_to_file( + self.pipe_log.as_ref(), + seq, + &mut ctx, + i.entries.unwrap(), + ) + .unwrap(); } } - println!("[fetch_entries_to_aio] (stage2) time cost: {:?} us", start.elapsed().as_micros()); - let mut j = 0; - - ctx_vec[0].wait()?; - let mut decode_buf = LogBatch::decode_entries_block( - &ctx_vec[0].buf.lock().unwrap(), - ents_idx[0].entries.unwrap(), - ents_idx[0].compression_type, - ).unwrap(); - - for (t,i) in ents_idx.iter().enumerate() { - decode_buf = match t{ - 0 => decode_buf, + println!( + "[fetch_entries_to_aio] (stage2) time cost: {:?} us", + start.elapsed().as_micros() + ); + + let mut seq = 0; + let mut decode_buf = vec![]; + + for (t, i) in ents_idx.iter().enumerate() { + decode_buf = match t { + 0 => { + ctx.single_wait(0).unwrap(); + LogBatch::decode_entries_block( + &ctx.data(0).lock().unwrap(), + ents_idx[0].entries.unwrap(), + ents_idx[0].compression_type, + ) + .unwrap() + } _ => match new_block_flags[t] { true => { - j+=1; - ctx_vec[j].wait(); + seq += 1; + ctx.single_wait(seq).unwrap(); LogBatch::decode_entries_block( - 
&ctx_vec[j].buf.lock().unwrap(), + &ctx.data(seq).lock().unwrap(), i.entries.unwrap(), i.compression_type, - ).unwrap() - }, + ) + .unwrap() + } false => decode_buf, - } + }, }; - let e = parse_from_bytes::( - &mut decode_buf[(i.entry_offset) as usize .. (i.entry_offset + i.entry_len) as usize]).unwrap(); - vec.push(e); + vec.push( + parse_from_bytes::( + &mut decode_buf + [(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize], + ) + .unwrap(), + ); } ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); - println!("[fetch_entries_to_aio] (end) time cost: {:?} us", start.elapsed().as_micros()); + println!( + "[fetch_entries_to_aio] (end) time cost: {:?} us", + start.elapsed().as_micros() + ); return Ok(ents_idx.len()); } Ok(0) } - pub fn first_index(&self, region_id: u64) -> Option { if let Some(memtable) = self.memtables.get(region_id) { return memtable.read().first_index(); @@ -673,11 +692,17 @@ where }) } -pub(crate) fn submit_read_request_to_file
<P>
(pipe_log: &P, mut a:&mut aiocb,handle: FileBlockHandle) -> Result<Arc<AioContext>>
-    where
-        P: PipeLog,
+pub(crate) fn submit_read_request_to_file
<P>
( + pipe_log: &P, + seq: usize, + ctx: &mut AioContext, + handle: FileBlockHandle, +) -> Result<()> +where + P: PipeLog, { - Ok(Arc::new(pipe_log.read_bytes_aio(a,handle)?)) + pipe_log.read_bytes_aio(seq, ctx, handle).unwrap(); + Ok(()) } #[cfg(test)] @@ -689,11 +714,11 @@ mod tests { use crate::test_util::{generate_entries, PanicGuard}; use crate::util::ReadableSize; use kvproto::raft_serverpb::RaftLocalState; + use libc::aiocb; use raft::eraftpb::Entry; use std::collections::BTreeSet; use std::fs::OpenOptions; use std::path::PathBuf; - use libc::aiocb; type RaftLogEngine = Engine; impl RaftLogEngine { @@ -787,7 +812,7 @@ mod tests { None, &mut entries, ) - .unwrap(); + .unwrap(); assert_eq!(entries.len(), (end - start) as usize); assert_eq!(entries.first().unwrap().index, start); assert_eq!( @@ -884,13 +909,13 @@ mod tests { } #[test] - fn test_async_read(){ - let normal_batch_size = 4096; + fn test_async_read() { + let normal_batch_size = 8192; let compressed_batch_size = 5120; for &entry_size in &[normal_batch_size] { - if entry_size == normal_batch_size{ + if entry_size == normal_batch_size { println!("[normal_batch_size]"); - }else if entry_size == compressed_batch_size{ + } else if entry_size == compressed_batch_size { println!("[compressed_batch_size]"); } let dir = tempfile::Builder::new() @@ -903,39 +928,34 @@ mod tests { ..Default::default() }; - - let engine = RaftLogEngine::open_with_file_system( - cfg.clone(), - Arc::new(DefaultFileSystem), - ) - .unwrap(); + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), Arc::new(DefaultFileSystem)) + .unwrap(); assert_eq!(engine.path(), dir.path().to_str().unwrap()); let data = vec![b'x'; entry_size]; - for i in 10..510{ - for rid in 10..15{ + for i in 10..1010 { + for rid in 10..15 { let index = i; - engine.append(rid,index,index+1,Some(&data)); + engine.append(rid, index, index + 1, Some(&data)); } } for i in 10..15 { let rid = i; let index = 10; println!("[PREAD]"); - engine.scan_entries(rid, index, index + 500, |_, q, d| { + engine.scan_entries(rid, index, index + 1000, |_, q, d| { assert_eq!(q, LogQueue::Append); assert_eq!(d, &data); }); println!("[AIO]"); - engine.scan_entries_aio(rid, index, index + 500, |_, q, d| { + engine.scan_entries_aio(rid, index, index + 1000, |_, q, d| { assert_eq!(q, LogQueue::Append); assert_eq!(d, &data); }); println!("===================================================================================="); } - } - } #[test] @@ -2143,10 +2163,10 @@ mod tests { type Reader = ::Reader; type Writer = ::Writer; - fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()> { - // todo!() - Ok(()) - } + // fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()> + // { // todo!() + // Ok(()) + // } fn create>(&self, path: P) -> std::io::Result { let handle = self.inner.create(&path)?; @@ -2206,10 +2226,6 @@ mod tests { fn new_writer(&self, h: Arc) -> std::io::Result { self.inner.new_writer(h) } - - fn new_async_context(&self, handle: Arc, ptr: *mut aiocb, buf: Arc>>) -> std::io::Result { - self.inner.new_async_context(handle,ptr,buf) - } } #[test] diff --git a/src/env/default.rs b/src/env/default.rs index 1334b9bd..e17132c0 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -4,21 +4,22 @@ use std::ffi::c_void; use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::os::unix::io::RawFd; use std::path::Path; -use std::ptr; use std::sync::{Arc, Mutex}; +use std::{mem, ptr}; use fail::fail_point; -use libc::{aiocb, off_t}; +use 
libc::{aio_return, aiocb, off_t}; use log::error; use nix::errno::Errno; use nix::fcntl::{self, OFlag}; +use nix::sys::signal::{SigEvent, SigevNotify}; use nix::sys::stat::Mode; use nix::sys::uio::{pread, pwrite}; use nix::unistd::{close, ftruncate, lseek, Whence}; use nix::NixPath; -use nix::sys::signal::{SigEvent, SigevNotify}; use crate::env::{AsyncContext, FileSystem, Handle, WriteExt}; +use crate::Error; fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { let kind = std::io::Error::from(e).kind(); @@ -101,17 +102,18 @@ impl LogFd { Ok(readed) } - pub fn read_aio(&self,ptr: *mut aiocb,buf: Arc>>,offset: u64){ - let mut buf = buf.lock().unwrap(); + pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) { + let mut buf = ctx.buf_vec.last().unwrap().lock().unwrap(); unsafe { - (*ptr).aio_fildes = self.0; - (*ptr).aio_buf = buf.as_mut_ptr() as *mut c_void; - (*ptr).aio_reqprio = 0; - (*ptr).aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); - (*ptr).aio_nbytes = buf.len() as usize; - (*ptr).aio_lio_opcode = libc::LIO_READ; - (*ptr).aio_offset = offset as off_t; - libc::aio_read(ptr); + let aior = &mut ctx.aio_vec[seq]; + aior.aio_fildes = self.0; + aior.aio_buf = buf.as_mut_ptr() as *mut c_void; + aior.aio_reqprio = 0; + aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); + aior.aio_nbytes = buf.len() as usize; + aior.aio_lio_opcode = libc::LIO_READ; + aior.aio_offset = offset as off_t; + libc::aio_read(&mut ctx.aio_vec[seq]); } } @@ -276,36 +278,55 @@ impl WriteExt for LogFile { } pub struct AioContext { - pub(crate) fd: Arc, - pub(crate) ptr: *mut aiocb, - pub(crate) buf: Arc>>, + pub(crate) aio_vec: Vec, + pub(crate) buf_vec: Vec>>>, } impl AioContext { - pub fn new(fd: Arc,ptr: *mut aiocb, buf: Arc>>) -> Self{ - Self{ - fd, - ptr, - buf, + pub fn new(block_sum: usize) -> Self { + let mut vec = vec![]; + unsafe { + for i in 0..block_sum { + vec.push(mem::zeroed::()); + } + } + Self { + aio_vec: vec, + buf_vec: vec![], } } } -impl AsyncContext for AioContext{ - fn wait(&self) -> IoResult { - let p = self.ptr; - let aio_list_ptr = vec![p].as_ptr() as *const *const aiocb ; //q - let mut len; +impl AsyncContext for AioContext { + fn single_wait(&mut self, seq: usize) -> IoResult { + let buf_len = self.buf_vec[seq].lock().unwrap().len(); unsafe { - let timep = ptr::null::(); //w loop { - libc::aio_suspend(aio_list_ptr, 1 as i32, timep); //e - len = libc::aio_return(p); - if len > 0{ - return Ok(len as usize); + libc::aio_suspend( + vec![&mut self.aio_vec[seq]].as_ptr() as *const *const aiocb, + 1 as i32, + ptr::null::(), + ); + if buf_len == aio_return(&mut self.aio_vec[seq]) as usize{ + return Ok(buf_len); } } } } + + fn wait(&mut self) -> IoResult { + let mut total = 0; + for seq in 0..self.aio_vec.len() { + match self.single_wait(seq) { + Ok(len) => total += 1, + Err(e) => return Err(e), + } + } + Ok(total as usize) + } + + fn data(&self, seq: usize) -> Arc>> { + self.buf_vec[seq].clone() + } } pub struct DefaultFileSystem; @@ -315,9 +336,14 @@ impl FileSystem for DefaultFileSystem { type Reader = LogFile; type Writer = LogFile; - - fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> IoResult<()> { - ctx.fd.read_aio(ctx.ptr, ctx.buf.clone(), offset); + fn read_aio( + &self, + handle: Arc, + seq: usize, + ctx: &mut AioContext, + offset: u64, + ) -> IoResult<()> { + handle.read_aio(seq, ctx, offset); Ok(()) } @@ -345,7 +371,7 @@ impl FileSystem for DefaultFileSystem { Ok(LogFile::new(handle)) } - fn 
new_async_context(&self, handle: Arc, ptr: *mut aiocb, buf: Arc>>) -> IoResult { - Ok(AioContext::new(handle,ptr,buf)) + fn new_async_context(&self, block_sum: usize) -> IoResult { + Ok(AioContext::new(block_sum)) } } diff --git a/src/env/mod.rs b/src/env/mod.rs index f5637112..70fb2317 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -1,16 +1,16 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. +use libc::aiocb; use std::io::{Read, Result, Seek, Write}; use std::path::Path; use std::sync::{Arc, Mutex}; -use libc::aiocb; mod default; mod obfuscated; +pub use default::AioContext; pub use default::DefaultFileSystem; pub use obfuscated::ObfuscatedFileSystem; -pub use default::{AioContext}; /// FileSystem pub trait FileSystem: Send + Sync { @@ -18,7 +18,15 @@ pub trait FileSystem: Send + Sync { type Reader: Seek + Read + Send; type Writer: Seek + Write + Send + WriteExt; - fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> Result<()>; + fn read_aio( + &self, + handle: Arc, + seq: usize, + ctx: &mut AioContext, + offset: u64, + ) -> Result<()> { + Ok(()) + } fn create>(&self, path: P) -> Result; @@ -54,7 +62,9 @@ pub trait FileSystem: Send + Sync { fn new_writer(&self, handle: Arc) -> Result; - fn new_async_context(&self,handle: Arc,ptr: *mut aiocb, buf: Arc>>) -> Result; + fn new_async_context(&self, block_sum: usize) -> Result { + Ok(AioContext::new(block_sum)) + } } pub trait Handle { @@ -73,5 +83,7 @@ pub trait WriteExt { } pub trait AsyncContext { - fn wait(&self) -> Result; -} \ No newline at end of file + fn wait(&mut self) -> Result; + fn single_wait(&mut self, seq: usize) -> Result; + fn data(&self, seq: usize) -> Arc>>; +} diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index df79153b..2b7c0c62 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -1,13 +1,13 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. +use libc::aiocb; use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use libc::aiocb; -use crate::env::{DefaultFileSystem, FileSystem, WriteExt}; use crate::env::default::AioContext; +use crate::env::{DefaultFileSystem, FileSystem, WriteExt}; pub struct ObfuscatedReader(::Reader); @@ -92,11 +92,6 @@ impl FileSystem for ObfuscatedFileSystem { type Reader = ObfuscatedReader; type Writer = ObfuscatedWriter; - fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> IoResult<()> { - ctx.fd.read_aio(ctx.ptr, ctx.buf.clone(), offset); - Ok(()) - } - fn create>(&self, path: P) -> IoResult { let r = self.inner.create(path); if r.is_ok() { @@ -134,8 +129,4 @@ impl FileSystem for ObfuscatedFileSystem { fn new_writer(&self, handle: Arc) -> IoResult { Ok(ObfuscatedWriter(self.inner.new_writer(handle)?)) } - - fn new_async_context(&self, handle: Arc, ptr: *mut aiocb, buf: Arc>>) -> IoResult { - Ok(self.inner.new_async_context(handle,ptr,buf)?) 
- } } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 92cb1af7..8ccbcc9b 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -13,7 +13,7 @@ use log::error; use parking_lot::{Mutex, MutexGuard, RwLock}; use crate::config::Config; -use crate::env::{AioContext, FileSystem}; +use crate::env::{AioContext, DefaultFileSystem, FileSystem}; use crate::event_listener::EventListener; use crate::metrics::*; use crate::pipe_log::{ @@ -261,7 +261,6 @@ impl SinglePipe { .handle .clone()) } - /// Creates a new file for write, and rotates the active log file. /// /// This operation is atomic in face of errors. @@ -351,18 +350,22 @@ impl SinglePipe { reader.read(handle) } - fn read_bytes_aio(&self, mut a: &mut aiocb, handle: FileBlockHandle) -> Result{ + fn read_bytes_aio( + &self, + seq: usize, + ctx: &mut AioContext, + handle: FileBlockHandle, + ) -> Result<()> { unsafe { let fd = self.get_fd(handle.id.seq)?; - let p: *mut libc::aiocb = a; - let buf = vec![0;handle.len]; - let mut ctx = self.file_system.as_ref().new_async_context( - fd.clone(), - p, - Arc::new(SyncMutex::new(buf)) - )?; - self.file_system.as_ref().read_aio(&mut ctx, handle.offset).unwrap(); - return Ok(ctx) + let buf = vec![0 as u8; handle.len]; + let buf = Arc::new(SyncMutex::new(buf)); + ctx.buf_vec.push(buf); + self.file_system + .as_ref() + .read_aio(fd, seq, ctx, handle.offset) + .unwrap(); + return Ok(()); } } @@ -533,8 +536,16 @@ impl PipeLog for DualPipes { } #[inline] - fn read_bytes_aio(&self,mut a:&mut aiocb,handle: FileBlockHandle) -> Result { - self.pipes[handle.id.queue as usize].read_bytes_aio(a,handle) + fn read_bytes_aio( + &self, + seq: usize, + ctx: &mut AioContext, + handle: FileBlockHandle, + ) -> Result<()> { + self.pipes[handle.id.queue as usize] + .read_bytes_aio(seq, ctx, handle) + .unwrap(); + Ok(()) } #[inline] diff --git a/src/pipe_log.rs b/src/pipe_log.rs index a60053c0..dc06554f 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -5,13 +5,13 @@ use std::cmp::Ordering; use std::fmt::{self, Display}; +use crate::env::AioContext; use fail::fail_point; use libc::aiocb; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; use serde_repr::{Deserialize_repr, Serialize_repr}; use strum::EnumIter; -use crate::env::AioContext; use crate::Result; @@ -174,7 +174,12 @@ pub trait PipeLog: Sized { /// Reads some bytes from the specified position. fn read_bytes(&self, handle: FileBlockHandle) -> Result>; - fn read_bytes_aio(&self, a:&mut aiocb, handle: FileBlockHandle) -> Result ; + fn read_bytes_aio( + &self, + seq: usize, + ctx: &mut AioContext, + handle: FileBlockHandle, + ) -> Result<()>; /// Appends some bytes to the specified log queue. Returns file position of /// the written bytes. fn append( From 93282f9cac733aa6300a608a98fabd947a7079fe Mon Sep 17 00:00:00 2001 From: root Date: Wed, 7 Dec 2022 19:41:36 +0800 Subject: [PATCH 03/15] fix. 
Signed-off-by: root Signed-off-by: root <1019495690@qq.com> --- src/env/default.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/env/default.rs b/src/env/default.rs index e17132c0..7295f7b2 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -278,7 +278,7 @@ impl WriteExt for LogFile { } pub struct AioContext { - pub(crate) aio_vec: Vec, + aio_vec: Vec, pub(crate) buf_vec: Vec>>>, } impl AioContext { @@ -306,7 +306,7 @@ impl AsyncContext for AioContext { 1 as i32, ptr::null::(), ); - if buf_len == aio_return(&mut self.aio_vec[seq]) as usize{ + if buf_len == aio_return(&mut self.aio_vec[seq]) as usize { return Ok(buf_len); } } From 169b6fed115333994f38a32e0b3b51ad0a5f9487 Mon Sep 17 00:00:00 2001 From: wxy <1019495690@qq.com> Date: Thu, 8 Dec 2022 15:55:54 +0800 Subject: [PATCH 04/15] Remove lock of buf_vec. Signed-off-by: root Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 4 ++-- src/env/default.rs | 15 +++++++-------- src/env/mod.rs | 2 +- src/file_pipe_log/pipe.rs | 3 +-- 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 9a6984eb..fc2c0f03 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -380,7 +380,7 @@ where 0 => { ctx.single_wait(0).unwrap(); LogBatch::decode_entries_block( - &ctx.data(0).lock().unwrap(), + &ctx.data(0), ents_idx[0].entries.unwrap(), ents_idx[0].compression_type, ) @@ -391,7 +391,7 @@ where seq += 1; ctx.single_wait(seq).unwrap(); LogBatch::decode_entries_block( - &ctx.data(seq).lock().unwrap(), + &ctx.data(seq), i.entries.unwrap(), i.compression_type, ) diff --git a/src/env/default.rs b/src/env/default.rs index 7295f7b2..4380672d 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -103,17 +103,16 @@ impl LogFd { } pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) { - let mut buf = ctx.buf_vec.last().unwrap().lock().unwrap(); unsafe { let aior = &mut ctx.aio_vec[seq]; aior.aio_fildes = self.0; - aior.aio_buf = buf.as_mut_ptr() as *mut c_void; aior.aio_reqprio = 0; aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); - aior.aio_nbytes = buf.len() as usize; + aior.aio_nbytes = ctx.buf_vec[seq].len() as usize; + aior.aio_buf = ctx.buf_vec[seq].as_mut_ptr() as *mut c_void; aior.aio_lio_opcode = libc::LIO_READ; aior.aio_offset = offset as off_t; - libc::aio_read(&mut ctx.aio_vec[seq]); + libc::aio_read(aior); } } @@ -279,7 +278,7 @@ impl WriteExt for LogFile { pub struct AioContext { aio_vec: Vec, - pub(crate) buf_vec: Vec>>>, + pub(crate) buf_vec: Vec>, } impl AioContext { pub fn new(block_sum: usize) -> Self { @@ -298,7 +297,7 @@ impl AioContext { impl AsyncContext for AioContext { fn single_wait(&mut self, seq: usize) -> IoResult { - let buf_len = self.buf_vec[seq].lock().unwrap().len(); + let buf_len = self.buf_vec[seq].len(); unsafe { loop { libc::aio_suspend( @@ -324,8 +323,8 @@ impl AsyncContext for AioContext { Ok(total as usize) } - fn data(&self, seq: usize) -> Arc>> { - self.buf_vec[seq].clone() + fn data(&self, seq: usize) -> Vec { + self.buf_vec[seq].to_vec() } } diff --git a/src/env/mod.rs b/src/env/mod.rs index 70fb2317..f60385af 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -85,5 +85,5 @@ pub trait WriteExt { pub trait AsyncContext { fn wait(&mut self) -> Result; fn single_wait(&mut self, seq: usize) -> Result; - fn data(&self, seq: usize) -> Arc>>; + fn data(&self, seq: usize) -> Vec; } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 8ccbcc9b..c5c9db72 100644 --- 
a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -358,8 +358,7 @@ impl SinglePipe { ) -> Result<()> { unsafe { let fd = self.get_fd(handle.id.seq)?; - let buf = vec![0 as u8; handle.len]; - let buf = Arc::new(SyncMutex::new(buf)); + let mut buf = vec![0 as u8; handle.len]; ctx.buf_vec.push(buf); self.file_system .as_ref() From ad3cbcf4179d636a7140630fa336d1d5a4bea676 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 19 Dec 2022 21:47:39 +0800 Subject: [PATCH 05/15] Optimize code structure(version 1.3). Signed-off-by: root Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 100 ++++-------------------- src/env/default.rs | 78 ++++++++++++------- src/env/mod.rs | 19 ++--- src/env/obfuscated.rs | 13 ++++ src/file_pipe_log/pipe.rs | 159 ++++++++++++++++++++++++++++++++------ src/pipe_log.rs | 19 +++-- 6 files changed, 237 insertions(+), 151 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index fc2c0f03..c9d515c0 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -341,73 +341,10 @@ where start.elapsed().as_micros() ); - let mut new_block_flags: Vec = Vec::with_capacity(length); - let mut block_sum = 0; - for (t, i) in ents_idx.iter().enumerate() { - if match t { - 0 => true, - _ => ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap(), - } { - block_sum += 1; - new_block_flags.push(true); - } else { - new_block_flags.push(false); - } - } - - let mut ctx = AioContext::new(block_sum); - for (seq, i) in ents_idx.iter().enumerate() { - if new_block_flags[seq] { - submit_read_request_to_file( - self.pipe_log.as_ref(), - seq, - &mut ctx, - i.entries.unwrap(), - ) - .unwrap(); - } - } - println!( - "[fetch_entries_to_aio] (stage2) time cost: {:?} us", - start.elapsed().as_micros() - ); + self.pipe_log + .async_entry_read::(&mut ents_idx, vec) + .unwrap(); - let mut seq = 0; - let mut decode_buf = vec![]; - - for (t, i) in ents_idx.iter().enumerate() { - decode_buf = match t { - 0 => { - ctx.single_wait(0).unwrap(); - LogBatch::decode_entries_block( - &ctx.data(0), - ents_idx[0].entries.unwrap(), - ents_idx[0].compression_type, - ) - .unwrap() - } - _ => match new_block_flags[t] { - true => { - seq += 1; - ctx.single_wait(seq).unwrap(); - LogBatch::decode_entries_block( - &ctx.data(seq), - i.entries.unwrap(), - i.compression_type, - ) - .unwrap() - } - false => decode_buf, - }, - }; - vec.push( - parse_from_bytes::( - &mut decode_buf - [(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize], - ) - .unwrap(), - ); - } ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); println!( "[fetch_entries_to_aio] (end) time cost: {:?} us", @@ -692,19 +629,6 @@ where }) } -pub(crate) fn submit_read_request_to_file
<P>
( - pipe_log: &P, - seq: usize, - ctx: &mut AioContext, - handle: FileBlockHandle, -) -> Result<()> -where - P: PipeLog, -{ - pipe_log.read_bytes_aio(seq, ctx, handle).unwrap(); - Ok(()) -} - #[cfg(test)] mod tests { use super::*; @@ -910,7 +834,7 @@ mod tests { #[test] fn test_async_read() { - let normal_batch_size = 8192; + let normal_batch_size = 4096; let compressed_batch_size = 5120; for &entry_size in &[normal_batch_size] { if entry_size == normal_batch_size { @@ -2162,11 +2086,15 @@ mod tests { type Handle = ::Handle; type Reader = ::Reader; type Writer = ::Writer; + type AsyncIoContext = AioContext; - // fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()> - // { // todo!() - // Ok(()) - // } + fn new_async_reader( + &self, + handle: Arc, + ctx: &mut Self::AsyncIoContext, + ) -> std::io::Result<()> { + ctx.new_reader(handle) + } fn create>(&self, path: P) -> std::io::Result { let handle = self.inner.create(&path)?; @@ -2226,6 +2154,10 @@ mod tests { fn new_writer(&self, h: Arc) -> std::io::Result { self.inner.new_writer(h) } + + fn new_async_io_context(&self, block_sum: usize) -> std::io::Result { + todo!() + } } #[test] diff --git a/src/env/default.rs b/src/env/default.rs index 4380672d..c6eb0760 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -102,14 +102,13 @@ impl LogFd { Ok(readed) } - pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) { + pub fn read_aio(&self, aior: &mut aiocb, len: usize, pbuf: *mut u8, offset: u64) { unsafe { - let aior = &mut ctx.aio_vec[seq]; aior.aio_fildes = self.0; aior.aio_reqprio = 0; aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); - aior.aio_nbytes = ctx.buf_vec[seq].len() as usize; - aior.aio_buf = ctx.buf_vec[seq].as_mut_ptr() as *mut c_void; + aior.aio_nbytes = len; + aior.aio_buf = pbuf as *mut c_void; aior.aio_lio_opcode = libc::LIO_READ; aior.aio_offset = offset as off_t; libc::aio_read(aior); @@ -277,25 +276,52 @@ impl WriteExt for LogFile { } pub struct AioContext { + inner: Option>, + offset: u64, + index: usize, aio_vec: Vec, pub(crate) buf_vec: Vec>, } impl AioContext { pub fn new(block_sum: usize) -> Self { - let mut vec = vec![]; + let mut aio_vec = vec![]; + let mut buf_vec = vec![]; unsafe { for i in 0..block_sum { - vec.push(mem::zeroed::()); + aio_vec.push(mem::zeroed::()); } } Self { - aio_vec: vec, - buf_vec: vec![], + inner: None, + offset: 0, + index: 0, + aio_vec, + buf_vec, } } + + pub fn new_reader(&mut self, fd: Arc) -> IoResult<()> { + self.inner = Some(fd); + Ok(()) + } } impl AsyncContext for AioContext { + fn wait(&mut self) -> IoResult { + let mut total = 0; + for seq in 0..self.aio_vec.len() { + match self.single_wait(seq) { + Ok(len) => total += 1, + Err(e) => return Err(e), + } + } + Ok(total as usize) + } + + fn data(&self, seq: usize) -> Vec { + self.buf_vec[seq].to_vec() + } + fn single_wait(&mut self, seq: usize) -> IoResult { let buf_len = self.buf_vec[seq].len(); unsafe { @@ -312,19 +338,19 @@ impl AsyncContext for AioContext { } } - fn wait(&mut self) -> IoResult { - let mut total = 0; - for seq in 0..self.aio_vec.len() { - match self.single_wait(seq) { - Ok(len) => total += 1, - Err(e) => return Err(e), - } - } - Ok(total as usize) - } + fn submit_read_req(&mut self, buf: Vec, offset: u64) -> IoResult<()> { + let seq = self.index; + self.index += 1; + self.buf_vec.push(buf); - fn data(&self, seq: usize) -> Vec { - self.buf_vec[seq].to_vec() + self.inner.as_ref().unwrap().read_aio( + &mut self.aio_vec[seq], + self.buf_vec[seq].len(), 
+ self.buf_vec[seq].as_mut_ptr(), + offset, + ); + + Ok(()) } } @@ -334,16 +360,14 @@ impl FileSystem for DefaultFileSystem { type Handle = LogFd; type Reader = LogFile; type Writer = LogFile; + type AsyncIoContext = AioContext; - fn read_aio( + fn new_async_reader( &self, handle: Arc, - seq: usize, - ctx: &mut AioContext, - offset: u64, + ctx: &mut Self::AsyncIoContext, ) -> IoResult<()> { - handle.read_aio(seq, ctx, offset); - Ok(()) + ctx.new_reader(handle) } fn create>(&self, path: P) -> IoResult { @@ -370,7 +394,7 @@ impl FileSystem for DefaultFileSystem { Ok(LogFile::new(handle)) } - fn new_async_context(&self, block_sum: usize) -> IoResult { + fn new_async_io_context(&self, block_sum: usize) -> IoResult { Ok(AioContext::new(block_sum)) } } diff --git a/src/env/mod.rs b/src/env/mod.rs index f60385af..838d7968 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -17,16 +17,13 @@ pub trait FileSystem: Send + Sync { type Handle: Send + Sync + Handle; type Reader: Seek + Read + Send; type Writer: Seek + Write + Send + WriteExt; + type AsyncIoContext: AsyncContext; - fn read_aio( + fn new_async_reader( &self, handle: Arc, - seq: usize, - ctx: &mut AioContext, - offset: u64, - ) -> Result<()> { - Ok(()) - } + ctx: &mut Self::AsyncIoContext, + ) -> Result<()>; fn create>(&self, path: P) -> Result; @@ -62,9 +59,7 @@ pub trait FileSystem: Send + Sync { fn new_writer(&self, handle: Arc) -> Result; - fn new_async_context(&self, block_sum: usize) -> Result { - Ok(AioContext::new(block_sum)) - } + fn new_async_io_context(&self, block_sum: usize) -> Result; } pub trait Handle { @@ -84,6 +79,8 @@ pub trait WriteExt { pub trait AsyncContext { fn wait(&mut self) -> Result; - fn single_wait(&mut self, seq: usize) -> Result; fn data(&self, seq: usize) -> Vec; + fn single_wait(&mut self, seq: usize) -> Result; + + fn submit_read_req(&mut self, buf: Vec, offset: u64) -> Result<()>; } diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 2b7c0c62..30b5eec3 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -91,6 +91,15 @@ impl FileSystem for ObfuscatedFileSystem { type Handle = ::Handle; type Reader = ObfuscatedReader; type Writer = ObfuscatedWriter; + type AsyncIoContext = AioContext; + + fn new_async_reader( + &self, + handle: Arc, + ctx: &mut Self::AsyncIoContext, + ) -> IoResult<()> { + ctx.new_reader(handle) + } fn create>(&self, path: P) -> IoResult { let r = self.inner.create(path); @@ -129,4 +138,8 @@ impl FileSystem for ObfuscatedFileSystem { fn new_writer(&self, handle: Arc) -> IoResult { Ok(ObfuscatedWriter(self.inner.new_writer(handle)?)) } + + fn new_async_io_context(&self, block_sum: usize) -> IoResult { + Ok(AioContext::new(block_sum)) + } } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index c5c9db72..34a8c3fb 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -11,15 +11,17 @@ use fail::fail_point; use libc::aiocb; use log::error; use parking_lot::{Mutex, MutexGuard, RwLock}; +use protobuf::{parse_from_bytes, Message}; use crate::config::Config; -use crate::env::{AioContext, DefaultFileSystem, FileSystem}; +use crate::env::{AioContext, AsyncContext, DefaultFileSystem, FileSystem}; use crate::event_listener::EventListener; +use crate::memtable::EntryIndex; use crate::metrics::*; use crate::pipe_log::{ FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes, }; -use crate::{perf_context, Error, Result}; +use crate::{perf_context, Error, LogBatch, MessageExt, Result}; use 
super::format::{FileNameExt, LogFileFormat}; use super::log_file::{build_file_reader, build_file_writer, LogFileWriter}; @@ -350,22 +352,15 @@ impl SinglePipe { reader.read(handle) } - fn read_bytes_aio( + fn submit_read_req( &self, - seq: usize, - ctx: &mut AioContext, - handle: FileBlockHandle, - ) -> Result<()> { - unsafe { - let fd = self.get_fd(handle.id.seq)?; - let mut buf = vec![0 as u8; handle.len]; - ctx.buf_vec.push(buf); - self.file_system - .as_ref() - .read_aio(fd, seq, ctx, handle.offset) - .unwrap(); - return Ok(()); - } + handle: &mut FileBlockHandle, + ctx: &mut F::AsyncIoContext, + ) { + let fd = self.get_fd(handle.id.seq).unwrap(); + let mut buf = vec![0 as u8; handle.len]; + self.file_system.as_ref().new_async_reader(fd, ctx).unwrap(); + ctx.submit_read_req(buf, handle.offset).unwrap(); } fn append(&self, bytes: &mut T) -> Result { @@ -533,19 +528,135 @@ impl PipeLog for DualPipes { fn read_bytes(&self, handle: FileBlockHandle) -> Result> { self.pipes[handle.id.queue as usize].read_bytes(handle) } - #[inline] - fn read_bytes_aio( + fn async_entry_read>( &self, - seq: usize, - ctx: &mut AioContext, - handle: FileBlockHandle, + ents_idx: &mut Vec, + vec: &mut Vec, ) -> Result<()> { - self.pipes[handle.id.queue as usize] - .read_bytes_aio(seq, ctx, handle) + let mut handles: Vec = vec![]; + for (t, i) in ents_idx.iter().enumerate() { + if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) { + handles.push(i.entries.unwrap()); + } + } + + let mut ctx_append = self.pipes[LogQueue::Append as usize] + .file_system + .new_async_io_context(handles.len() as usize) .unwrap(); + let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] + .file_system + .new_async_io_context(handles.len() as usize) + .unwrap(); + + for handle in handles.iter_mut() { + match handle.id.queue { + LogQueue::Append => { + self.pipes[LogQueue::Append as usize].submit_read_req( + handle, + &mut ctx_append, + ); + } + LogQueue::Rewrite => { + self.pipes[LogQueue::Rewrite as usize].submit_read_req( + handle, + &mut ctx_rewrite, + ); + } + } + } + + let mut decode_buf = vec![]; + let mut seq_append: i32 = -1; + let mut seq_rewrite: i32 = -1; + + for (t, i) in ents_idx.iter().enumerate() { + decode_buf = + match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() { + true => match ents_idx[t].entries.unwrap().id.queue { + LogQueue::Append => { + seq_append += 1; + ctx_append.single_wait(seq_append as usize).unwrap(); + LogBatch::decode_entries_block( + &ctx_append.data(seq_append as usize), + i.entries.unwrap(), + i.compression_type, + ) + .unwrap() + } + LogQueue::Rewrite => { + seq_rewrite += 1; + ctx_rewrite.single_wait(seq_rewrite as usize).unwrap(); + LogBatch::decode_entries_block( + &ctx_rewrite.data(seq_rewrite as usize), + i.entries.unwrap(), + i.compression_type, + ) + .unwrap() + } + }, + false => decode_buf, + }; + + vec.push( + parse_from_bytes::( + &mut decode_buf + [(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize], + ) + .unwrap(), + ); + } Ok(()) } + #[inline] + fn async_read_bytes(&self, handles: &mut Vec) -> Result>> { + let mut res: Vec> = vec![]; + + let mut ctx_append = self.pipes[LogQueue::Append as usize] + .file_system + .new_async_io_context(handles.len() as usize) + .unwrap(); + let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] + .file_system + .new_async_io_context(handles.len() as usize) + .unwrap(); + + for (seq, handle) in handles.iter_mut().enumerate() { + match handle.id.queue { + LogQueue::Append => { + 
self.pipes[LogQueue::Append as usize].submit_read_req( + handle, + &mut ctx_append, + ); + } + LogQueue::Rewrite => { + self.pipes[LogQueue::Rewrite as usize].submit_read_req( + handle, + &mut ctx_rewrite, + ); + } + } + } + let mut seq_append = 0; + let mut seq_rewrite = 0; + for handle in handles.iter_mut(){ + match handle.id.queue { + LogQueue::Append => { + ctx_append.single_wait(seq_append); + res.push(ctx_append.data(seq_append)); + seq_append += 1; + } + LogQueue::Rewrite => { + ctx_rewrite.single_wait(seq_rewrite); + res.push(ctx_rewrite.data(seq_rewrite)); + seq_rewrite += 1; + } + } + } + + Ok(res) + } #[inline] fn append( diff --git a/src/pipe_log.rs b/src/pipe_log.rs index dc06554f..c85e667e 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -6,14 +6,16 @@ use std::cmp::Ordering; use std::fmt::{self, Display}; use crate::env::AioContext; +use crate::memtable::EntryIndex; use fail::fail_point; use libc::aiocb; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; +use protobuf::Message; use serde_repr::{Deserialize_repr, Serialize_repr}; use strum::EnumIter; -use crate::Result; +use crate::{MessageExt, Result}; /// The type of log queue. #[repr(u8)] @@ -174,12 +176,19 @@ pub trait PipeLog: Sized { /// Reads some bytes from the specified position. fn read_bytes(&self, handle: FileBlockHandle) -> Result>; - fn read_bytes_aio( + /// Read entries from pipe logs using 'Async IO'. + fn async_entry_read>( &self, - seq: usize, - ctx: &mut AioContext, - handle: FileBlockHandle, + ents_idx: &mut Vec, + vec: &mut Vec, ) -> Result<()>; + + /// Reads bytes from multi blocks using 'Async IO'. + fn async_read_bytes( + &self, + handles: &mut Vec, + ) -> Result>>; + /// Appends some bytes to the specified log queue. Returns file position of /// the written bytes. fn append( From dfe2340376709f815a1e025e14cbaaf56224c77a Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Sun, 26 Feb 2023 02:40:55 +0800 Subject: [PATCH 06/15] Code structure optimization (rough) Signed-off-by:wxy 1019495690@qq.com Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 54 ++++++++-- src/env/default.rs | 31 ++++-- src/env/mod.rs | 8 +- src/env/obfuscated.rs | 48 ++++++++- src/file_pipe_log/pipe.rs | 218 +++++++++++++++++--------------------- src/pipe_log.rs | 12 +-- 6 files changed, 213 insertions(+), 158 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index c9d515c0..ef5f28f5 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -341,9 +341,8 @@ where start.elapsed().as_micros() ); - self.pipe_log - .async_entry_read::(&mut ents_idx, vec) - .unwrap(); + let bytes = self.pipe_log.async_read_bytes(&mut ents_idx).unwrap(); + parse_entries_from_bytes::(bytes, &mut ents_idx, vec); ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); println!( @@ -582,7 +581,36 @@ impl BlockCache { thread_local! 
{ static BLOCK_CACHE: BlockCache = BlockCache::new(); } - +pub(crate) fn parse_entries_from_bytes( + bytes: Vec>, + ents_idx: &mut Vec, + vec: &mut Vec, +) { + let mut decode_buf = vec![]; + let mut seq: i32 = -1; + for (t, idx) in ents_idx.iter().enumerate() { + decode_buf = + match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() { + true => { + seq += 1; + bytes[seq as usize].to_vec() + } + false => decode_buf, + }; + vec.push( + parse_from_bytes( + &LogBatch::decode_entries_block( + &decode_buf, + idx.entries.unwrap(), + idx.compression_type, + ) + .unwrap() + [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], + ) + .unwrap(), + ); + } +} pub(crate) fn read_entry_from_file(pipe_log: &P, idx: &EntryIndex) -> Result where M: MessageExt, @@ -834,7 +862,7 @@ mod tests { #[test] fn test_async_read() { - let normal_batch_size = 4096; + let normal_batch_size = 10; let compressed_batch_size = 5120; for &entry_size in &[normal_batch_size] { if entry_size == normal_batch_size { @@ -2086,14 +2114,20 @@ mod tests { type Handle = ::Handle; type Reader = ::Reader; type Writer = ::Writer; - type AsyncIoContext = AioContext; + type AsyncIoContext = ::AsyncIoContext; - fn new_async_reader( + fn async_read( &self, - handle: Arc, ctx: &mut Self::AsyncIoContext, + handle: Arc, + buf: Vec, + block: &mut FileBlockHandle, ) -> std::io::Result<()> { - ctx.new_reader(handle) + todo!() + } + + fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> std::io::Result>> { + todo!() } fn create>(&self, path: P) -> std::io::Result { @@ -2156,7 +2190,7 @@ mod tests { } fn new_async_io_context(&self, block_sum: usize) -> std::io::Result { - todo!() + self.inner.new_async_io_context(block_sum) } } diff --git a/src/env/default.rs b/src/env/default.rs index c6eb0760..d7821c69 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -1,7 +1,7 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. 
use std::ffi::c_void; -use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; +use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write,Error, ErrorKind}; use std::os::unix::io::RawFd; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -19,7 +19,9 @@ use nix::unistd::{close, ftruncate, lseek, Whence}; use nix::NixPath; use crate::env::{AsyncContext, FileSystem, Handle, WriteExt}; -use crate::Error; +use crate::pipe_log::FileBlockHandle; + +const MAX_ASYNC_READ_TRY_TIME:usize = 10; fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { let kind = std::io::Error::from(e).kind(); @@ -300,9 +302,8 @@ impl AioContext { } } - pub fn new_reader(&mut self, fd: Arc) -> IoResult<()> { + pub fn set_fd(&mut self, fd: Arc) { self.inner = Some(fd); - Ok(()) } } @@ -324,8 +325,9 @@ impl AsyncContext for AioContext { fn single_wait(&mut self, seq: usize) -> IoResult { let buf_len = self.buf_vec[seq].len(); + unsafe { - loop { + for _ in 0..MAX_ASYNC_READ_TRY_TIME{ libc::aio_suspend( vec![&mut self.aio_vec[seq]].as_ptr() as *const *const aiocb, 1 as i32, @@ -336,6 +338,7 @@ impl AsyncContext for AioContext { } } } + Err(Error::new(ErrorKind::Other, "Async IO panic.")) } fn submit_read_req(&mut self, buf: Vec, offset: u64) -> IoResult<()> { @@ -362,12 +365,24 @@ impl FileSystem for DefaultFileSystem { type Writer = LogFile; type AsyncIoContext = AioContext; - fn new_async_reader( + fn async_read( &self, - handle: Arc, ctx: &mut Self::AsyncIoContext, + handle: Arc, + buf: Vec, + block: &mut FileBlockHandle, ) -> IoResult<()> { - ctx.new_reader(handle) + ctx.set_fd(handle); + ctx.submit_read_req(buf, block.offset) + } + + fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult>> { + let mut res = vec![]; + for seq in 0..ctx.index { + ctx.single_wait(seq); + res.push(ctx.data(seq).to_vec()); + } + Ok(res) } fn create>(&self, path: P) -> IoResult { diff --git a/src/env/mod.rs b/src/env/mod.rs index 838d7968..e7ffca76 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -12,6 +12,7 @@ pub use default::AioContext; pub use default::DefaultFileSystem; pub use obfuscated::ObfuscatedFileSystem; +use crate::pipe_log::FileBlockHandle; /// FileSystem pub trait FileSystem: Send + Sync { type Handle: Send + Sync + Handle; @@ -19,11 +20,14 @@ pub trait FileSystem: Send + Sync { type Writer: Seek + Write + Send + WriteExt; type AsyncIoContext: AsyncContext; - fn new_async_reader( + fn async_read( &self, - handle: Arc, ctx: &mut Self::AsyncIoContext, + handle: Arc, + buf: Vec, + block: &mut FileBlockHandle, ) -> Result<()>; + fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> Result>>; fn create>(&self, path: P) -> Result; diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 30b5eec3..ebc3bcfe 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -9,6 +9,8 @@ use std::sync::{Arc, Mutex}; use crate::env::default::AioContext; use crate::env::{DefaultFileSystem, FileSystem, WriteExt}; +use super::AsyncContext; +use crate::pipe_log::FileBlockHandle; pub struct ObfuscatedReader(::Reader); impl Read for ObfuscatedReader { @@ -86,19 +88,53 @@ impl ObfuscatedFileSystem { self.files.load(Ordering::Relaxed) } } +pub struct ObfuscatedContext(::AsyncIoContext); +impl AsyncContext for ObfuscatedContext { + fn wait(&mut self) -> IoResult { + self.0.wait() + } + + fn data(&self, seq: usize) -> Vec { + self.0.data(seq) + } + + fn single_wait(&mut self, seq: usize) -> IoResult { + self.0.single_wait(seq) + } + + fn submit_read_req(&mut self, buf: Vec, offset: u64) 
-> IoResult<()> { + self.0.submit_read_req(buf, offset) + } +} impl FileSystem for ObfuscatedFileSystem { type Handle = ::Handle; type Reader = ObfuscatedReader; type Writer = ObfuscatedWriter; - type AsyncIoContext = AioContext; + type AsyncIoContext = ObfuscatedContext; - fn new_async_reader( + fn async_read( &self, - handle: Arc, ctx: &mut Self::AsyncIoContext, + handle: Arc, + buf: Vec, + block: &mut FileBlockHandle, ) -> IoResult<()> { - ctx.new_reader(handle) + self.inner.async_read(&mut ctx.0, handle, buf, block) + } + + fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult>> { + let base = self.inner.async_finish(&mut ctx.0).unwrap(); + let mut res = vec![]; + for v in base { + let mut temp = vec![]; + //do obfuscation. + for c in v { + temp.push(c.wrapping_sub(1)); + } + res.push(temp); + } + Ok(res) } fn create>(&self, path: P) -> IoResult { @@ -140,6 +176,8 @@ impl FileSystem for ObfuscatedFileSystem { } fn new_async_io_context(&self, block_sum: usize) -> IoResult { - Ok(AioContext::new(block_sum)) + Ok(ObfuscatedContext( + self.inner.new_async_io_context(block_sum)?, + )) } } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 34a8c3fb..7cef2fa6 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -352,15 +352,11 @@ impl SinglePipe { reader.read(handle) } - fn submit_read_req( - &self, - handle: &mut FileBlockHandle, - ctx: &mut F::AsyncIoContext, - ) { - let fd = self.get_fd(handle.id.seq).unwrap(); - let mut buf = vec![0 as u8; handle.len]; - self.file_system.as_ref().new_async_reader(fd, ctx).unwrap(); - ctx.submit_read_req(buf, handle.offset).unwrap(); + fn async_read(&self, block: &mut FileBlockHandle, ctx: &mut F::AsyncIoContext) { + let fd = self.get_fd(block.id.seq).unwrap(); + let buf = vec![0 as u8; block.len]; + + self.file_system.async_read(ctx, fd, buf, block).unwrap(); } fn append(&self, bytes: &mut T) -> Result { @@ -528,133 +524,111 @@ impl PipeLog for DualPipes { fn read_bytes(&self, handle: FileBlockHandle) -> Result> { self.pipes[handle.id.queue as usize].read_bytes(handle) } + // #[inline] + // fn async_entry_read>( + // &self, + // ents_idx: &mut Vec, + // vec: &mut Vec, + // ) -> Result<()> { + // let mut handles: Vec = vec![]; + // for (t, i) in ents_idx.iter().enumerate() { + // if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) + // { handles.push(i.entries.unwrap()); + // } + // } + + // let mut ctx_append = self.pipes[LogQueue::Append as usize] + // .file_system + // .new_async_io_context(handles.len() as usize) + // .unwrap(); + // let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] + // .file_system + // .new_async_io_context(handles.len() as usize) + // .unwrap(); + + // for handle in handles.iter_mut() { + // match handle.id.queue { + // LogQueue::Append => { + // self.pipes[LogQueue::Append as usize].submit_read_req( + // handle, + // &mut ctx_append, + // ); + // } + // LogQueue::Rewrite => { + // self.pipes[LogQueue::Rewrite as usize].submit_read_req( + // handle, + // &mut ctx_rewrite, + // ); + // } + // } + // } + + // let mut decode_buf = vec![]; + // let mut seq_append: i32 = -1; + // let mut seq_rewrite: i32 = -1; + + // for (t, i) in ents_idx.iter().enumerate() { + // decode_buf = + // match t == 0 || ents_idx[t - 1].entries.unwrap() != + // ents_idx[t].entries.unwrap() { true => match + // ents_idx[t].entries.unwrap().id.queue { + // LogQueue::Append => { seq_append += 1; + // ctx_append.single_wait(seq_append as usize).unwrap(); + // 
LogBatch::decode_entries_block( + // &ctx_append.data(seq_append as usize), + // i.entries.unwrap(), + // i.compression_type, + // ) + // .unwrap() + // } + // LogQueue::Rewrite => { + // seq_rewrite += 1; + // ctx_rewrite.single_wait(seq_rewrite as + // usize).unwrap(); LogBatch::decode_entries_block( + // &ctx_rewrite.data(seq_rewrite as usize), + // i.entries.unwrap(), + // i.compression_type, + // ) + // .unwrap() + // } + // }, + // false => decode_buf, + // }; + + // vec.push( + // parse_from_bytes::( + // &mut decode_buf + // [(i.entry_offset) as usize..(i.entry_offset + + // i.entry_len) as usize], ) + // .unwrap(), + // ); + // } + // Ok(()) + // } #[inline] - fn async_entry_read>( - &self, - ents_idx: &mut Vec, - vec: &mut Vec, - ) -> Result<()> { - let mut handles: Vec = vec![]; + fn async_read_bytes(&self, ents_idx: &mut Vec) -> Result>> { + let mut blocks: Vec = vec![]; for (t, i) in ents_idx.iter().enumerate() { if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) { - handles.push(i.entries.unwrap()); - } - } - - let mut ctx_append = self.pipes[LogQueue::Append as usize] - .file_system - .new_async_io_context(handles.len() as usize) - .unwrap(); - let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] - .file_system - .new_async_io_context(handles.len() as usize) - .unwrap(); - - for handle in handles.iter_mut() { - match handle.id.queue { - LogQueue::Append => { - self.pipes[LogQueue::Append as usize].submit_read_req( - handle, - &mut ctx_append, - ); - } - LogQueue::Rewrite => { - self.pipes[LogQueue::Rewrite as usize].submit_read_req( - handle, - &mut ctx_rewrite, - ); - } + blocks.push(i.entries.unwrap()); } } - - let mut decode_buf = vec![]; - let mut seq_append: i32 = -1; - let mut seq_rewrite: i32 = -1; - - for (t, i) in ents_idx.iter().enumerate() { - decode_buf = - match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() { - true => match ents_idx[t].entries.unwrap().id.queue { - LogQueue::Append => { - seq_append += 1; - ctx_append.single_wait(seq_append as usize).unwrap(); - LogBatch::decode_entries_block( - &ctx_append.data(seq_append as usize), - i.entries.unwrap(), - i.compression_type, - ) - .unwrap() - } - LogQueue::Rewrite => { - seq_rewrite += 1; - ctx_rewrite.single_wait(seq_rewrite as usize).unwrap(); - LogBatch::decode_entries_block( - &ctx_rewrite.data(seq_rewrite as usize), - i.entries.unwrap(), - i.compression_type, - ) - .unwrap() - } - }, - false => decode_buf, - }; - - vec.push( - parse_from_bytes::( - &mut decode_buf - [(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize], - ) - .unwrap(), - ); - } - Ok(()) - } - #[inline] - fn async_read_bytes(&self, handles: &mut Vec) -> Result>> { let mut res: Vec> = vec![]; - let mut ctx_append = self.pipes[LogQueue::Append as usize] - .file_system - .new_async_io_context(handles.len() as usize) - .unwrap(); - let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] - .file_system - .new_async_io_context(handles.len() as usize) - .unwrap(); + let fs = &self.pipes[LogQueue::Append as usize].file_system; + let mut ctx = fs.new_async_io_context(blocks.len() as usize).unwrap(); - for (seq, handle) in handles.iter_mut().enumerate() { - match handle.id.queue { - LogQueue::Append => { - self.pipes[LogQueue::Append as usize].submit_read_req( - handle, - &mut ctx_append, - ); - } - LogQueue::Rewrite => { - self.pipes[LogQueue::Rewrite as usize].submit_read_req( - handle, - &mut ctx_rewrite, - ); - } - } - } - let mut seq_append = 0; - let mut seq_rewrite 
= 0; - for handle in handles.iter_mut(){ - match handle.id.queue { + for (seq, block) in blocks.iter_mut().enumerate() { + match block.id.queue { LogQueue::Append => { - ctx_append.single_wait(seq_append); - res.push(ctx_append.data(seq_append)); - seq_append += 1; + self.pipes[LogQueue::Append as usize].async_read(block, &mut ctx); } LogQueue::Rewrite => { - ctx_rewrite.single_wait(seq_rewrite); - res.push(ctx_rewrite.data(seq_rewrite)); - seq_rewrite += 1; + self.pipes[LogQueue::Rewrite as usize].async_read(block, &mut ctx); } } } - + let res = fs.async_finish(&mut ctx).unwrap(); Ok(res) } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index c85e667e..7ef3cfce 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -176,18 +176,8 @@ pub trait PipeLog: Sized { /// Reads some bytes from the specified position. fn read_bytes(&self, handle: FileBlockHandle) -> Result>; - /// Read entries from pipe logs using 'Async IO'. - fn async_entry_read>( - &self, - ents_idx: &mut Vec, - vec: &mut Vec, - ) -> Result<()>; - /// Reads bytes from multi blocks using 'Async IO'. - fn async_read_bytes( - &self, - handles: &mut Vec, - ) -> Result>>; + fn async_read_bytes(&self, ents_idx: &mut Vec) -> Result>>; /// Appends some bytes to the specified log queue. Returns file position of /// the written bytes. From aadefae47718093987c94c70ba9f81626c6a73bb Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Sun, 26 Feb 2023 02:51:19 +0800 Subject: [PATCH 07/15] fix unit test. Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 64 +++++++-------------------------------------------- 1 file changed, 8 insertions(+), 56 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index ef5f28f5..3374c12b 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -336,19 +336,13 @@ where memtable .read() .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; - println!( - "[fetch_entries_to_aio] (stage1) time cost: {:?} us", - start.elapsed().as_micros() - ); + let bytes = self.pipe_log.async_read_bytes(&mut ents_idx).unwrap(); parse_entries_from_bytes::(bytes, &mut ents_idx, vec); ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); - println!( - "[fetch_entries_to_aio] (end) time cost: {:?} us", - start.elapsed().as_micros() - ); + return Ok(ents_idx.len()); } @@ -846,66 +840,24 @@ mod tests { assert_eq!(d, &data); }); } - - // Recover the engine. 
- let engine = engine.reopen(); for i in 10..20 { let rid = i; let index = i; - engine.scan_entries(rid, index, index + 2, |_, q, d| { + engine.scan_entries_aio(rid, index, index + 2, |_, q, d| { assert_eq!(q, LogQueue::Append); assert_eq!(d, &data); }); } - } - } - #[test] - fn test_async_read() { - let normal_batch_size = 10; - let compressed_batch_size = 5120; - for &entry_size in &[normal_batch_size] { - if entry_size == normal_batch_size { - println!("[normal_batch_size]"); - } else if entry_size == compressed_batch_size { - println!("[compressed_batch_size]"); - } - let dir = tempfile::Builder::new() - .prefix("test_get_entry") - .tempdir() - .unwrap(); - let cfg = Config { - dir: dir.path().to_str().unwrap().to_owned(), - target_file_size: ReadableSize::gb(1), - ..Default::default() - }; - - let engine = - RaftLogEngine::open_with_file_system(cfg.clone(), Arc::new(DefaultFileSystem)) - .unwrap(); - - assert_eq!(engine.path(), dir.path().to_str().unwrap()); - let data = vec![b'x'; entry_size]; - for i in 10..1010 { - for rid in 10..15 { - let index = i; - engine.append(rid, index, index + 1, Some(&data)); - } - } - for i in 10..15 { + // Recover the engine. + let engine = engine.reopen(); + for i in 10..20 { let rid = i; - let index = 10; - println!("[PREAD]"); - engine.scan_entries(rid, index, index + 1000, |_, q, d| { - assert_eq!(q, LogQueue::Append); - assert_eq!(d, &data); - }); - println!("[AIO]"); - engine.scan_entries_aio(rid, index, index + 1000, |_, q, d| { + let index = i; + engine.scan_entries(rid, index, index + 2, |_, q, d| { assert_eq!(q, LogQueue::Append); assert_eq!(d, &data); }); - println!("===================================================================================="); } } } From 1a41d8c58fca3253be6ea08a75bfa396a64a8447 Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Mon, 27 Feb 2023 15:20:37 +0800 Subject: [PATCH 08/15] fix fmt. Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 12 +++--------- src/env/default.rs | 22 +++++++++++----------- src/env/mod.rs | 2 +- src/env/obfuscated.rs | 2 +- src/file_pipe_log/log_file.rs | 2 +- src/file_pipe_log/pipe.rs | 4 ++-- 6 files changed, 19 insertions(+), 25 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 429bdd39..0a611e41 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -300,7 +300,6 @@ where max_size: Option, vec: &mut Vec, ) -> Result { - let start = Instant::now(); let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); if let Some(memtable) = self.memtables.get(region_id) { let mut ents_idx: Vec = Vec::with_capacity((end - begin) as usize); @@ -311,10 +310,7 @@ where vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); } ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); - println!( - "[fetch_entries_to] time cost: {:?} us", - start.elapsed().as_micros() - ); + return Ok(ents_idx.len()); } Ok(0) @@ -328,7 +324,6 @@ where max_size: Option, vec: &mut Vec, ) -> Result { - let start = Instant::now(); let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); if let Some(memtable) = self.memtables.get(region_id) { let length = (end - begin) as usize; @@ -336,13 +331,12 @@ where memtable .read() .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; - let bytes = self.pipe_log.async_read_bytes(&mut ents_idx).unwrap(); parse_entries_from_bytes::(bytes, &mut ents_idx, vec); ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); - + return Ok(ents_idx.len()); } @@ -582,7 +576,7 @@ thread_local! 
{ } pub(crate) fn parse_entries_from_bytes( bytes: Vec>, - ents_idx: &mut Vec, + ents_idx: &mut [EntryIndex], vec: &mut Vec, ) { let mut decode_buf = vec![]; diff --git a/src/env/default.rs b/src/env/default.rs index 1da6a768..66d4a315 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -1,7 +1,7 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. use std::ffi::c_void; -use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write,Error, ErrorKind}; +use std::io::{Error, ErrorKind, Read, Result as IoResult, Seek, SeekFrom, Write}; use std::os::unix::io::RawFd; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -21,7 +21,7 @@ use nix::NixPath; use crate::env::{AsyncContext, FileSystem, Handle, WriteExt}; use crate::pipe_log::FileBlockHandle; -const MAX_ASYNC_READ_TRY_TIME:usize = 10; +const MAX_ASYNC_READ_TRY_TIME: usize = 10; fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { let kind = std::io::Error::from(e).kind(); @@ -279,7 +279,6 @@ impl WriteExt for LogFile { pub struct AioContext { inner: Option>, - offset: u64, index: usize, aio_vec: Vec, pub(crate) buf_vec: Vec>, @@ -295,7 +294,6 @@ impl AioContext { } Self { inner: None, - offset: 0, index: 0, aio_vec, buf_vec, @@ -319,18 +317,18 @@ impl AsyncContext for AioContext { Ok(total as usize) } - fn data(&self, seq: usize) -> Vec { - self.buf_vec[seq].to_vec() + fn data(&self, seq: usize) -> &[u8] { + &self.buf_vec[seq] } fn single_wait(&mut self, seq: usize) -> IoResult { let buf_len = self.buf_vec[seq].len(); - + unsafe { - for _ in 0..MAX_ASYNC_READ_TRY_TIME{ + for _ in 0..MAX_ASYNC_READ_TRY_TIME { libc::aio_suspend( vec![&mut self.aio_vec[seq]].as_ptr() as *const *const aiocb, - 1 as i32, + 1_i32, ptr::null::(), ); if buf_len == aio_return(&mut self.aio_vec[seq]) as usize { @@ -379,8 +377,10 @@ impl FileSystem for DefaultFileSystem { fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult>> { let mut res = vec![]; for seq in 0..ctx.index { - ctx.single_wait(seq); - res.push(ctx.data(seq).to_vec()); + match ctx.single_wait(seq) { + Ok(_) => res.push(ctx.data(seq).to_vec()), + Err(e) => return Err(e), + } } Ok(res) } diff --git a/src/env/mod.rs b/src/env/mod.rs index 44315ce0..45b2ab36 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -89,7 +89,7 @@ pub trait WriteExt { pub trait AsyncContext { fn wait(&mut self) -> Result; - fn data(&self, seq: usize) -> Vec; + fn data(&self, seq: usize) -> &[u8]; fn single_wait(&mut self, seq: usize) -> Result; fn submit_read_req(&mut self, buf: Vec, offset: u64) -> Result<()>; diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index ebc3bcfe..5336606b 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -94,7 +94,7 @@ impl AsyncContext for ObfuscatedContext { self.0.wait() } - fn data(&self, seq: usize) -> Vec { + fn data(&self, seq: usize) -> &[u8] { self.0.data(seq) } diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs index a2a6f53d..638ef2ec 100644 --- a/src/file_pipe_log/log_file.rs +++ b/src/file_pipe_log/log_file.rs @@ -172,7 +172,7 @@ impl LogFileReader { } pub fn read(&mut self, handle: FileBlockHandle) -> Result> { - let mut buf = vec![0; handle.len as usize]; + let mut buf = vec![0; handle.len]; let size = self.read_to(handle.offset, &mut buf)?; buf.truncate(size); Ok(buf) diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index c455f4f4..f80d626a 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -259,7 +259,7 @@ impl SinglePipe { fn 
async_read(&self, block: &mut FileBlockHandle, ctx: &mut F::AsyncIoContext) { let fd = self.get_fd(block.id.seq).unwrap(); - let buf = vec![0 as u8; block.len]; + let buf = vec![0_u8; block.len]; self.file_system.async_read(ctx, fd, buf, block).unwrap(); } @@ -545,7 +545,7 @@ impl PipeLog for DualPipes { let mut res: Vec> = vec![]; let fs = &self.pipes[LogQueue::Append as usize].file_system; - let mut ctx = fs.new_async_io_context(blocks.len() as usize).unwrap(); + let mut ctx = fs.new_async_io_context(blocks.len()).unwrap(); for (seq, block) in blocks.iter_mut().enumerate() { match block.id.queue { From f3a89c0979de80590b5b1032c0acec6cecc612c9 Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Mon, 27 Feb 2023 18:17:06 +0800 Subject: [PATCH 09/15] code clean up. Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 11 ++--- src/env/default.rs | 8 ++-- src/env/mod.rs | 3 +- src/env/obfuscated.rs | 4 +- src/file_pipe_log/pipe.rs | 92 ++------------------------------------- src/pipe_log.rs | 5 +-- 6 files changed, 15 insertions(+), 108 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 0a611e41..8905b590 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,9 +1,7 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use libc::{aio_return, aiocb}; use std::cell::{Cell, RefCell}; use std::marker::PhantomData; -use std::mem; use std::path::Path; use std::sync::{mpsc, Arc, Mutex}; use std::thread::{Builder as ThreadBuilder, JoinHandle}; @@ -14,7 +12,7 @@ use protobuf::{parse_from_bytes, Message}; use crate::config::{Config, RecoveryMode}; use crate::consistency::ConsistencyChecker; -use crate::env::{AioContext, AsyncContext, DefaultFileSystem, FileSystem}; +use crate::env::{DefaultFileSystem, FileSystem}; use crate::event_listener::EventListener; use crate::file_pipe_log::debug::LogItemReader; use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder}; @@ -653,14 +651,13 @@ where #[cfg(test)] mod tests { use super::*; - use crate::env::{AioContext, ObfuscatedFileSystem}; + use crate::env::ObfuscatedFileSystem; use crate::file_pipe_log::{parse_recycled_file_name, FileNameExt}; use crate::log_batch::AtomicGroupBuilder; use crate::pipe_log::Version; use crate::test_util::{generate_entries, PanicGuard}; use crate::util::ReadableSize; use kvproto::raft_serverpb::RaftLocalState; - use libc::aiocb; use raft::eraftpb::Entry; use std::collections::{BTreeSet, HashSet}; use std::fs::OpenOptions; @@ -2106,11 +2103,11 @@ mod tests { buf: Vec, block: &mut FileBlockHandle, ) -> std::io::Result<()> { - todo!() + self.inner.async_read(ctx, handle, buf, block) } fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> std::io::Result>> { - todo!() + self.inner.async_finish(ctx) } fn create>(&self, path: P) -> std::io::Result { diff --git a/src/env/default.rs b/src/env/default.rs index 66d4a315..7b182f66 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -4,7 +4,7 @@ use std::ffi::c_void; use std::io::{Error, ErrorKind, Read, Result as IoResult, Seek, SeekFrom, Write}; use std::os::unix::io::RawFd; use std::path::Path; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::{mem, ptr}; use fail::fail_point; @@ -286,9 +286,9 @@ pub struct AioContext { impl AioContext { pub fn new(block_sum: usize) -> Self { let mut aio_vec = vec![]; - let mut buf_vec = vec![]; + let buf_vec = vec![]; unsafe { - for i in 0..block_sum { + for _ in 0..block_sum { aio_vec.push(mem::zeroed::()); } } @@ -310,7 +310,7 @@ impl AsyncContext for 
AioContext { let mut total = 0; for seq in 0..self.aio_vec.len() { match self.single_wait(seq) { - Ok(len) => total += 1, + Ok(_) => total += 1, Err(e) => return Err(e), } } diff --git a/src/env/mod.rs b/src/env/mod.rs index 45b2ab36..b2490554 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -1,9 +1,8 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use libc::aiocb; use std::io::{Read, Result, Seek, Write}; use std::path::Path; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; mod default; mod obfuscated; diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 5336606b..9893f220 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -1,12 +1,10 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use libc::aiocb; use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; -use crate::env::default::AioContext; use crate::env::{DefaultFileSystem, FileSystem, WriteExt}; use super::AsyncContext; diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index f80d626a..c15d87d6 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -4,24 +4,21 @@ use std::collections::VecDeque; use std::fs::File as StdFile; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::sync::Mutex as SyncMutex; use crossbeam::utils::CachePadded; use fail::fail_point; -use libc::aiocb; use log::error; use parking_lot::{Mutex, MutexGuard, RwLock}; -use protobuf::{parse_from_bytes, Message}; use crate::config::Config; -use crate::env::{AioContext, AsyncContext, DefaultFileSystem, FileSystem}; +use crate::env::FileSystem; use crate::event_listener::EventListener; use crate::memtable::EntryIndex; use crate::metrics::*; use crate::pipe_log::{ FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes, }; -use crate::{perf_context, Error, LogBatch, MessageExt, Result}; +use crate::{perf_context, Error, Result}; use super::format::{build_recycled_file_name, FileNameExt, LogFileFormat}; use super::log_file::build_file_reader; @@ -453,87 +450,7 @@ impl PipeLog for DualPipes { fn read_bytes(&self, handle: FileBlockHandle) -> Result> { self.pipes[handle.id.queue as usize].read_bytes(handle) } - // #[inline] - // fn async_entry_read>( - // &self, - // ents_idx: &mut Vec, - // vec: &mut Vec, - // ) -> Result<()> { - // let mut handles: Vec = vec![]; - // for (t, i) in ents_idx.iter().enumerate() { - // if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) - // { handles.push(i.entries.unwrap()); - // } - // } - - // let mut ctx_append = self.pipes[LogQueue::Append as usize] - // .file_system - // .new_async_io_context(handles.len() as usize) - // .unwrap(); - // let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] - // .file_system - // .new_async_io_context(handles.len() as usize) - // .unwrap(); - - // for handle in handles.iter_mut() { - // match handle.id.queue { - // LogQueue::Append => { - // self.pipes[LogQueue::Append as usize].submit_read_req( - // handle, - // &mut ctx_append, - // ); - // } - // LogQueue::Rewrite => { - // self.pipes[LogQueue::Rewrite as usize].submit_read_req( - // handle, - // &mut ctx_rewrite, - // ); - // } - // } - // } - - // let mut decode_buf = vec![]; - // let mut seq_append: i32 = -1; - // let mut seq_rewrite: i32 = -1; - - // for (t, i) in ents_idx.iter().enumerate() { - // decode_buf = - // match t == 0 || ents_idx[t 
- 1].entries.unwrap() != - // ents_idx[t].entries.unwrap() { true => match - // ents_idx[t].entries.unwrap().id.queue { - // LogQueue::Append => { seq_append += 1; - // ctx_append.single_wait(seq_append as usize).unwrap(); - // LogBatch::decode_entries_block( - // &ctx_append.data(seq_append as usize), - // i.entries.unwrap(), - // i.compression_type, - // ) - // .unwrap() - // } - // LogQueue::Rewrite => { - // seq_rewrite += 1; - // ctx_rewrite.single_wait(seq_rewrite as - // usize).unwrap(); LogBatch::decode_entries_block( - // &ctx_rewrite.data(seq_rewrite as usize), - // i.entries.unwrap(), - // i.compression_type, - // ) - // .unwrap() - // } - // }, - // false => decode_buf, - // }; - - // vec.push( - // parse_from_bytes::( - // &mut decode_buf - // [(i.entry_offset) as usize..(i.entry_offset + - // i.entry_len) as usize], ) - // .unwrap(), - // ); - // } - // Ok(()) - // } + #[inline] fn async_read_bytes(&self, ents_idx: &mut Vec) -> Result>> { let mut blocks: Vec = vec![]; @@ -542,12 +459,11 @@ impl PipeLog for DualPipes { blocks.push(i.entries.unwrap()); } } - let mut res: Vec> = vec![]; let fs = &self.pipes[LogQueue::Append as usize].file_system; let mut ctx = fs.new_async_io_context(blocks.len()).unwrap(); - for (seq, block) in blocks.iter_mut().enumerate() { + for block in blocks.iter_mut() { match block.id.queue { LogQueue::Append => { self.pipes[LogQueue::Append as usize].async_read(block, &mut ctx); diff --git a/src/pipe_log.rs b/src/pipe_log.rs index ffa36f54..2f33ae95 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -5,17 +5,14 @@ use std::cmp::Ordering; use std::fmt::{self, Display}; -use crate::env::AioContext; use crate::memtable::EntryIndex; use fail::fail_point; -use libc::aiocb; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; -use protobuf::Message; use serde_repr::{Deserialize_repr, Serialize_repr}; use strum::EnumIter; -use crate::{MessageExt, Result}; +use crate::Result; /// The type of log queue. #[repr(u8)] From 10105352ce48f30ca22c8a29f1e25e0b5c5b5325 Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Thu, 2 Mar 2023 05:08:39 +0800 Subject: [PATCH 10/15] modify code. 
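In brief, this patch swaps the hand-rolled libc::aiocb bookkeeping for nix's typed AIO wrappers (AioRead, aio_suspend, Aio::aio_return), moves buffer allocation into FileSystem::async_read so callers only hand over a FileBlockHandle, drops the block_sum argument from new_async_io_context, and lets async_read_bytes take the deduplicated Vec<FileBlockHandle> directly. A rough caller-side sketch of the resulting flow (get_fd stands in for the pipe's own file-handle lookup; this is not literal code from the patch):

    let mut ctx = file_system.new_async_io_context()?;
    for block in blocks.iter() {
        let fd = get_fd(block.id.seq)?;                // Arc<LogFd> in DefaultFileSystem
        file_system.async_read(&mut ctx, fd, block)?;  // allocates block.len bytes, submits one AioRead
    }
    // Reap everything that was submitted: aio_suspend + aio_return per request.
    let buffers: Vec<Vec<u8>> = file_system.async_finish(&mut ctx)?;
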
Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 17 +++-- src/env/default.rs | 127 ++++++++++++-------------------------- src/env/mod.rs | 15 +---- src/env/obfuscated.rs | 34 ++-------- src/file_pipe_log/pipe.rs | 32 +++------- src/pipe_log.rs | 2 +- 6 files changed, 67 insertions(+), 160 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 8905b590..8a63a833 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -330,7 +330,13 @@ where .read() .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; - let bytes = self.pipe_log.async_read_bytes(&mut ents_idx).unwrap(); + let mut blocks: Vec = Vec::new(); + for (t, i) in ents_idx.iter().enumerate() { + if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) { + blocks.push(i.entries.unwrap()); + } + } + let bytes = self.pipe_log.async_read_bytes(blocks).unwrap(); parse_entries_from_bytes::(bytes, &mut ents_idx, vec); ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); @@ -2100,10 +2106,9 @@ mod tests { &self, ctx: &mut Self::AsyncIoContext, handle: Arc, - buf: Vec, - block: &mut FileBlockHandle, + block: &FileBlockHandle, ) -> std::io::Result<()> { - self.inner.async_read(ctx, handle, buf, block) + self.inner.async_read(ctx, handle, block) } fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> std::io::Result>> { @@ -2172,8 +2177,8 @@ mod tests { self.inner.new_writer(h) } - fn new_async_io_context(&self, block_sum: usize) -> std::io::Result { - self.inner.new_async_io_context(block_sum) + fn new_async_io_context(&self) -> std::io::Result { + self.inner.new_async_io_context() } } diff --git a/src/env/default.rs b/src/env/default.rs index 7b182f66..6a6d2f56 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -1,28 +1,28 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. 
use std::ffi::c_void; -use std::io::{Error, ErrorKind, Read, Result as IoResult, Seek, SeekFrom, Write}; +use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::os::unix::io::RawFd; use std::path::Path; +use std::pin::Pin; +use std::slice; use std::sync::Arc; -use std::{mem, ptr}; use fail::fail_point; -use libc::{aio_return, aiocb, off_t}; +use libc::{aiocb, off_t}; use log::error; use nix::errno::Errno; use nix::fcntl::{self, OFlag}; +use nix::sys::aio::{aio_suspend, Aio, AioRead}; use nix::sys::signal::{SigEvent, SigevNotify}; use nix::sys::stat::Mode; use nix::sys::uio::{pread, pwrite}; use nix::unistd::{close, ftruncate, lseek, Whence}; use nix::NixPath; -use crate::env::{AsyncContext, FileSystem, Handle, WriteExt}; +use crate::env::{FileSystem, Handle, WriteExt}; use crate::pipe_log::FileBlockHandle; -const MAX_ASYNC_READ_TRY_TIME: usize = 10; - fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { let kind = std::io::Error::from(e).kind(); std::io::Error::new(kind, custom) @@ -278,80 +278,16 @@ impl WriteExt for LogFile { } pub struct AioContext { - inner: Option>, - index: usize, - aio_vec: Vec, - pub(crate) buf_vec: Vec>, + aio_vec: Vec>>>, + buf_vec: Vec>, } + impl AioContext { - pub fn new(block_sum: usize) -> Self { - let mut aio_vec = vec![]; - let buf_vec = vec![]; - unsafe { - for _ in 0..block_sum { - aio_vec.push(mem::zeroed::()); - } - } + pub fn new() -> Self { Self { - inner: None, - index: 0, - aio_vec, - buf_vec, - } - } - - pub fn set_fd(&mut self, fd: Arc) { - self.inner = Some(fd); - } -} - -impl AsyncContext for AioContext { - fn wait(&mut self) -> IoResult { - let mut total = 0; - for seq in 0..self.aio_vec.len() { - match self.single_wait(seq) { - Ok(_) => total += 1, - Err(e) => return Err(e), - } - } - Ok(total as usize) - } - - fn data(&self, seq: usize) -> &[u8] { - &self.buf_vec[seq] - } - - fn single_wait(&mut self, seq: usize) -> IoResult { - let buf_len = self.buf_vec[seq].len(); - - unsafe { - for _ in 0..MAX_ASYNC_READ_TRY_TIME { - libc::aio_suspend( - vec![&mut self.aio_vec[seq]].as_ptr() as *const *const aiocb, - 1_i32, - ptr::null::(), - ); - if buf_len == aio_return(&mut self.aio_vec[seq]) as usize { - return Ok(buf_len); - } - } + aio_vec: Vec::new(), + buf_vec: Vec::new(), } - Err(Error::new(ErrorKind::Other, "Async IO panic.")) - } - - fn submit_read_req(&mut self, buf: Vec, offset: u64) -> IoResult<()> { - let seq = self.index; - self.index += 1; - self.buf_vec.push(buf); - - self.inner.as_ref().unwrap().read_aio( - &mut self.aio_vec[seq], - self.buf_vec[seq].len(), - self.buf_vec[seq].as_mut_ptr(), - offset, - ); - - Ok(()) } } @@ -367,21 +303,34 @@ impl FileSystem for DefaultFileSystem { &self, ctx: &mut Self::AsyncIoContext, handle: Arc, - buf: Vec, - block: &mut FileBlockHandle, + block: &FileBlockHandle, ) -> IoResult<()> { - ctx.set_fd(handle); - ctx.submit_read_req(buf, block.offset) - } + let buf = vec![0_u8; block.len]; + ctx.buf_vec.push(buf); + + let mut aior = Box::pin(AioRead::new( + handle.0, + block.offset as i64, + unsafe { + slice::from_raw_parts_mut(ctx.buf_vec.last_mut().unwrap().as_mut_ptr(), block.len) + }, + 0, + SigevNotify::SigevNone, + )); + aior.as_mut() + .submit() + .expect("aio read request submit failed"); + ctx.aio_vec.push(aior); + Ok(()) + } fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult>> { - let mut res = vec![]; - for seq in 0..ctx.index { - match ctx.single_wait(seq) { - Ok(_) => res.push(ctx.data(seq).to_vec()), - Err(e) => return Err(e), - } + for 
seq in 0..ctx.aio_vec.len() { + let buf_len = ctx.buf_vec[seq].len(); + aio_suspend(&[&*ctx.aio_vec[seq]], None).expect("aio_suspend failed"); + assert_eq!(ctx.aio_vec[seq].as_mut().aio_return().unwrap(), buf_len); } + let res = ctx.buf_vec.to_owned(); Ok(res) } @@ -410,7 +359,7 @@ impl FileSystem for DefaultFileSystem { Ok(LogFile::new(handle)) } - fn new_async_io_context(&self, block_sum: usize) -> IoResult { - Ok(AioContext::new(block_sum)) + fn new_async_io_context(&self) -> IoResult { + Ok(AioContext::new()) } } diff --git a/src/env/mod.rs b/src/env/mod.rs index b2490554..e40ed430 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -17,14 +17,13 @@ pub trait FileSystem: Send + Sync { type Handle: Send + Sync + Handle; type Reader: Seek + Read + Send; type Writer: Seek + Write + Send + WriteExt; - type AsyncIoContext: AsyncContext; + type AsyncIoContext; fn async_read( &self, ctx: &mut Self::AsyncIoContext, handle: Arc, - buf: Vec, - block: &mut FileBlockHandle, + block: &FileBlockHandle, ) -> Result<()>; fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> Result>>; @@ -68,7 +67,7 @@ pub trait FileSystem: Send + Sync { fn new_writer(&self, handle: Arc) -> Result; - fn new_async_io_context(&self, block_sum: usize) -> Result; + fn new_async_io_context(&self) -> Result; } pub trait Handle { @@ -85,11 +84,3 @@ pub trait WriteExt { fn truncate(&mut self, offset: usize) -> Result<()>; fn allocate(&mut self, offset: usize, size: usize) -> Result<()>; } - -pub trait AsyncContext { - fn wait(&mut self) -> Result; - fn data(&self, seq: usize) -> &[u8]; - fn single_wait(&mut self, seq: usize) -> Result; - - fn submit_read_req(&mut self, buf: Vec, offset: u64) -> Result<()>; -} diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 9893f220..576f5040 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use crate::env::{DefaultFileSystem, FileSystem, WriteExt}; -use super::AsyncContext; use crate::pipe_log::FileBlockHandle; pub struct ObfuscatedReader(::Reader); @@ -86,43 +85,24 @@ impl ObfuscatedFileSystem { self.files.load(Ordering::Relaxed) } } -pub struct ObfuscatedContext(::AsyncIoContext); -impl AsyncContext for ObfuscatedContext { - fn wait(&mut self) -> IoResult { - self.0.wait() - } - - fn data(&self, seq: usize) -> &[u8] { - self.0.data(seq) - } - - fn single_wait(&mut self, seq: usize) -> IoResult { - self.0.single_wait(seq) - } - - fn submit_read_req(&mut self, buf: Vec, offset: u64) -> IoResult<()> { - self.0.submit_read_req(buf, offset) - } -} impl FileSystem for ObfuscatedFileSystem { type Handle = ::Handle; type Reader = ObfuscatedReader; type Writer = ObfuscatedWriter; - type AsyncIoContext = ObfuscatedContext; + type AsyncIoContext = ::AsyncIoContext; fn async_read( &self, ctx: &mut Self::AsyncIoContext, handle: Arc, - buf: Vec, - block: &mut FileBlockHandle, + block: &FileBlockHandle, ) -> IoResult<()> { - self.inner.async_read(&mut ctx.0, handle, buf, block) + self.inner.async_read(ctx, handle, block) } fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult>> { - let base = self.inner.async_finish(&mut ctx.0).unwrap(); + let base = self.inner.async_finish(ctx).unwrap(); let mut res = vec![]; for v in base { let mut temp = vec![]; @@ -173,9 +153,7 @@ impl FileSystem for ObfuscatedFileSystem { Ok(ObfuscatedWriter(self.inner.new_writer(handle)?)) } - fn new_async_io_context(&self, block_sum: usize) -> IoResult { - Ok(ObfuscatedContext( - self.inner.new_async_io_context(block_sum)?, - )) + fn 
new_async_io_context(&self) -> IoResult { + Ok(self.inner.new_async_io_context()?) } } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index c15d87d6..f9f92f59 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -254,11 +254,11 @@ impl SinglePipe { reader.read(handle) } - fn async_read(&self, block: &mut FileBlockHandle, ctx: &mut F::AsyncIoContext) { - let fd = self.get_fd(block.id.seq).unwrap(); - let buf = vec![0_u8; block.len]; - - self.file_system.async_read(ctx, fd, buf, block).unwrap(); + fn async_read(&self, blocks: Vec, ctx: &mut F::AsyncIoContext) { + for block in blocks.iter() { + let fd = self.get_fd(block.id.seq).unwrap(); + self.file_system.async_read(ctx, fd, block).unwrap(); + } } fn append(&self, bytes: &mut T) -> Result { @@ -452,27 +452,11 @@ impl PipeLog for DualPipes { } #[inline] - fn async_read_bytes(&self, ents_idx: &mut Vec) -> Result>> { - let mut blocks: Vec = vec![]; - for (t, i) in ents_idx.iter().enumerate() { - if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) { - blocks.push(i.entries.unwrap()); - } - } - + fn async_read_bytes(&self, blocks: Vec) -> Result>> { let fs = &self.pipes[LogQueue::Append as usize].file_system; - let mut ctx = fs.new_async_io_context(blocks.len()).unwrap(); + let mut ctx = fs.new_async_io_context().unwrap(); - for block in blocks.iter_mut() { - match block.id.queue { - LogQueue::Append => { - self.pipes[LogQueue::Append as usize].async_read(block, &mut ctx); - } - LogQueue::Rewrite => { - self.pipes[LogQueue::Rewrite as usize].async_read(block, &mut ctx); - } - } - } + self.pipes[LogQueue::Append as usize].async_read(blocks, &mut ctx); let res = fs.async_finish(&mut ctx).unwrap(); Ok(res) } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 2f33ae95..03994b17 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -174,7 +174,7 @@ pub trait PipeLog: Sized { fn read_bytes(&self, handle: FileBlockHandle) -> Result>; /// Reads bytes from multi blocks using 'Async IO'. - fn async_read_bytes(&self, ents_idx: &mut Vec) -> Result>>; + fn async_read_bytes(&self, blocks: Vec) -> Result>>; /// Appends some bytes to the specified log queue. Returns file position of /// the written bytes. From 1bc88889a9272c6d0b6ff8d8e6cfc84d0bb10501 Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Thu, 2 Mar 2023 10:19:16 +0800 Subject: [PATCH 11/15] code fmt & clean up. Signed-off-by: root <1019495690@qq.com> --- src/env/default.rs | 17 +---------------- src/env/obfuscated.rs | 2 +- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/src/env/default.rs b/src/env/default.rs index 6a6d2f56..8d34eafb 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -1,6 +1,5 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. 
-use std::ffi::c_void; use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::os::unix::io::RawFd; use std::path::Path; @@ -9,12 +8,11 @@ use std::slice; use std::sync::Arc; use fail::fail_point; -use libc::{aiocb, off_t}; use log::error; use nix::errno::Errno; use nix::fcntl::{self, OFlag}; use nix::sys::aio::{aio_suspend, Aio, AioRead}; -use nix::sys::signal::{SigEvent, SigevNotify}; +use nix::sys::signal::SigevNotify; use nix::sys::stat::Mode; use nix::sys::uio::{pread, pwrite}; use nix::unistd::{close, ftruncate, lseek, Whence}; @@ -104,19 +102,6 @@ impl LogFd { Ok(readed) } - pub fn read_aio(&self, aior: &mut aiocb, len: usize, pbuf: *mut u8, offset: u64) { - unsafe { - aior.aio_fildes = self.0; - aior.aio_reqprio = 0; - aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); - aior.aio_nbytes = len; - aior.aio_buf = pbuf as *mut c_void; - aior.aio_lio_opcode = libc::LIO_READ; - aior.aio_offset = offset as off_t; - libc::aio_read(aior); - } - } - /// Writes some bytes to this file starting at `offset`. Returns how many /// bytes were written. pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult { diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 576f5040..cefff4c6 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -154,6 +154,6 @@ impl FileSystem for ObfuscatedFileSystem { } fn new_async_io_context(&self) -> IoResult { - Ok(self.inner.new_async_io_context()?) + self.inner.new_async_io_context() } } From 5337a74b81b675b8d0ed113b3a1c318b6e0e1118 Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Thu, 2 Mar 2023 15:57:40 +0800 Subject: [PATCH 12/15] fix clippy. Signed-off-by: root <1019495690@qq.com> --- src/env/default.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/env/default.rs b/src/env/default.rs index 8d34eafb..9da6fca0 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -266,7 +266,11 @@ pub struct AioContext { aio_vec: Vec>>>, buf_vec: Vec>, } - +impl Default for AioContext { + fn default() -> Self { + Self::new() + } +} impl AioContext { pub fn new() -> Self { Self { From 0b9988de7b597ea54f2d2d14f3711194a9ef374f Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Mon, 6 Mar 2023 17:52:14 +0800 Subject: [PATCH 13/15] modify code. 
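The visible change here is that async_finish now consumes the context by value: once the submitted reads are reaped, the pinned AioRead control blocks are dropped and the buffers are handed back exactly once, so a finished context cannot be reused by accident. ObfuscatedFileSystem de-obfuscates the returned buffers in place, and the pipe layer propagates errors with ? instead of unwrap(). A minimal sketch of the new ownership contract (fs, fd and block are assumed to be in scope):

    let mut ctx = fs.new_async_io_context()?;
    fs.async_read(&mut ctx, fd, &block)?; // submit while borrowing the context mutably
    let data = fs.async_finish(ctx)?;     // ctx is moved here; reusing it afterwards is a compile error
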
Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 2 +- src/env/default.rs | 2 +- src/env/mod.rs | 3 +-- src/env/obfuscated.rs | 18 ++++++++---------- src/file_pipe_log/pipe.rs | 11 +++++++---- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 8a63a833..a869b556 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -2111,7 +2111,7 @@ mod tests { self.inner.async_read(ctx, handle, block) } - fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> std::io::Result>> { + fn async_finish(&self, ctx: Self::AsyncIoContext) -> std::io::Result>> { self.inner.async_finish(ctx) } diff --git a/src/env/default.rs b/src/env/default.rs index 9da6fca0..838eb723 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -313,7 +313,7 @@ impl FileSystem for DefaultFileSystem { Ok(()) } - fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult>> { + fn async_finish(&self, mut ctx: Self::AsyncIoContext) -> IoResult>> { for seq in 0..ctx.aio_vec.len() { let buf_len = ctx.buf_vec[seq].len(); aio_suspend(&[&*ctx.aio_vec[seq]], None).expect("aio_suspend failed"); diff --git a/src/env/mod.rs b/src/env/mod.rs index e40ed430..65ad441d 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -7,7 +7,6 @@ use std::sync::Arc; mod default; mod obfuscated; -pub use default::AioContext; pub use default::DefaultFileSystem; pub use obfuscated::ObfuscatedFileSystem; @@ -25,7 +24,7 @@ pub trait FileSystem: Send + Sync { handle: Arc, block: &FileBlockHandle, ) -> Result<()>; - fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> Result>>; + fn async_finish(&self, ctx: Self::AsyncIoContext) -> Result>>; fn create>(&self, path: P) -> Result; diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index cefff4c6..3ce313f3 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -101,18 +101,16 @@ impl FileSystem for ObfuscatedFileSystem { self.inner.async_read(ctx, handle, block) } - fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult>> { - let base = self.inner.async_finish(ctx).unwrap(); - let mut res = vec![]; - for v in base { - let mut temp = vec![]; - //do obfuscation. - for c in v { - temp.push(c.wrapping_sub(1)); + fn async_finish(&self, ctx: Self::AsyncIoContext) -> IoResult>> { + let mut base = self.inner.async_finish(ctx).unwrap(); + + for v in base.iter_mut() { + for c in v.iter_mut() { + // do obfuscation. + *c = c.wrapping_sub(1); } - res.push(temp); } - Ok(res) + Ok(base) } fn create>(&self, path: P) -> IoResult { diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index f9f92f59..833dd208 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -185,6 +185,7 @@ impl SinglePipe { } Ok(files[(file_seq - files[0].seq) as usize].handle.clone()) } + /// Creates a new file for write, and rotates the active log file. /// /// This operation is atomic in face of errors. 
@@ -254,10 +255,12 @@ impl SinglePipe { reader.read(handle) } - fn async_read(&self, blocks: Vec, ctx: &mut F::AsyncIoContext) { + fn async_read(&self, ctx: &mut F::AsyncIoContext, blocks: Vec) { for block in blocks.iter() { let fd = self.get_fd(block.id.seq).unwrap(); - self.file_system.async_read(ctx, fd, block).unwrap(); + self.file_system + .async_read(ctx, fd, block) + .expect("Async read failed."); } } @@ -456,8 +459,8 @@ impl PipeLog for DualPipes { let fs = &self.pipes[LogQueue::Append as usize].file_system; let mut ctx = fs.new_async_io_context().unwrap(); - self.pipes[LogQueue::Append as usize].async_read(blocks, &mut ctx); - let res = fs.async_finish(&mut ctx).unwrap(); + self.pipes[LogQueue::Append as usize].async_read(&mut ctx, blocks); + let res = fs.async_finish(ctx).unwrap(); Ok(res) } From 198ea087c67bebecf9b4bfc6bec10531970697b1 Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Tue, 7 Mar 2023 00:08:39 +0800 Subject: [PATCH 14/15] modify err handle & unit test. Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 69 ++++++++++++++++++++++++++++++--------- src/env/default.rs | 25 +++----------- src/file_pipe_log/pipe.rs | 5 +-- 3 files changed, 61 insertions(+), 38 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index a869b556..0149d768 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -336,8 +336,8 @@ where blocks.push(i.entries.unwrap()); } } - let bytes = self.pipe_log.async_read_bytes(blocks).unwrap(); - parse_entries_from_bytes::(bytes, &mut ents_idx, vec); + let bytes = self.pipe_log.async_read_bytes(blocks)?; + parse_entries_from_bytes::(bytes, &mut ents_idx, vec)?; ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); @@ -582,7 +582,7 @@ pub(crate) fn parse_entries_from_bytes( bytes: Vec>, ents_idx: &mut [EntryIndex], vec: &mut Vec, -) { +) -> Result<()> { let mut decode_buf = vec![]; let mut seq: i32 = -1; for (t, idx) in ents_idx.iter().enumerate() { @@ -594,19 +594,15 @@ pub(crate) fn parse_entries_from_bytes( } false => decode_buf, }; - vec.push( - parse_from_bytes( - &LogBatch::decode_entries_block( - &decode_buf, - idx.entries.unwrap(), - idx.compression_type, - ) - .unwrap() - [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], - ) - .unwrap(), - ); + vec.push(parse_from_bytes( + &LogBatch::decode_entries_block( + &decode_buf, + idx.entries.unwrap(), + idx.compression_type, + )?[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], + )?); } + Ok(()) } pub(crate) fn read_entry_from_file(pipe_log: &P, idx: &EntryIndex) -> Result where @@ -843,6 +839,47 @@ mod tests { assert_eq!(d, &data); }); } + + // Recover the engine. 
+ let engine = engine.reopen(); + for i in 10..20 { + let rid = i; + let index = i; + engine.scan_entries(rid, index, index + 2, |_, q, d| { + assert_eq!(q, LogQueue::Append); + assert_eq!(d, &data); + }); + } + } + } + + #[test] + fn test_async_get_entry() { + let normal_batch_size = 10; + let compressed_batch_size = 5120; + for &entry_size in &[normal_batch_size, compressed_batch_size] { + let dir = tempfile::Builder::new() + .prefix("test_get_entry") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize(1), + ..Default::default() + }; + + let engine = RaftLogEngine::open_with_file_system( + cfg.clone(), + Arc::new(ObfuscatedFileSystem::default()), + ) + .unwrap(); + assert_eq!(engine.path(), dir.path().to_str().unwrap()); + let data = vec![b'x'; entry_size]; + for i in 10..20 { + let rid = i; + let index = i; + engine.append(rid, index, index + 2, Some(&data)); + } for i in 10..20 { let rid = i; let index = i; @@ -857,7 +894,7 @@ mod tests { for i in 10..20 { let rid = i; let index = i; - engine.scan_entries(rid, index, index + 2, |_, q, d| { + engine.scan_entries_aio(rid, index, index + 2, |_, q, d| { assert_eq!(q, LogQueue::Append); assert_eq!(d, &data); }); diff --git a/src/env/default.rs b/src/env/default.rs index 838eb723..ea3c110d 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -261,24 +261,11 @@ impl WriteExt for LogFile { self.inner.allocate(offset, size) } } - +#[derive(Default)] pub struct AioContext { aio_vec: Vec>>>, buf_vec: Vec>, } -impl Default for AioContext { - fn default() -> Self { - Self::new() - } -} -impl AioContext { - pub fn new() -> Self { - Self { - aio_vec: Vec::new(), - buf_vec: Vec::new(), - } - } -} pub struct DefaultFileSystem; @@ -306,9 +293,7 @@ impl FileSystem for DefaultFileSystem { 0, SigevNotify::SigevNone, )); - aior.as_mut() - .submit() - .expect("aio read request submit failed"); + aior.as_mut().submit()?; ctx.aio_vec.push(aior); Ok(()) @@ -316,8 +301,8 @@ impl FileSystem for DefaultFileSystem { fn async_finish(&self, mut ctx: Self::AsyncIoContext) -> IoResult>> { for seq in 0..ctx.aio_vec.len() { let buf_len = ctx.buf_vec[seq].len(); - aio_suspend(&[&*ctx.aio_vec[seq]], None).expect("aio_suspend failed"); - assert_eq!(ctx.aio_vec[seq].as_mut().aio_return().unwrap(), buf_len); + aio_suspend(&[&*ctx.aio_vec[seq]], None)?; + assert_eq!(ctx.aio_vec[seq].as_mut().aio_return()?, buf_len); } let res = ctx.buf_vec.to_owned(); Ok(res) @@ -349,6 +334,6 @@ impl FileSystem for DefaultFileSystem { } fn new_async_io_context(&self) -> IoResult { - Ok(AioContext::new()) + Ok(AioContext::default()) } } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 833dd208..64a0bff2 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -457,10 +457,11 @@ impl PipeLog for DualPipes { #[inline] fn async_read_bytes(&self, blocks: Vec) -> Result>> { let fs = &self.pipes[LogQueue::Append as usize].file_system; - let mut ctx = fs.new_async_io_context().unwrap(); + let mut ctx = fs.new_async_io_context()?; self.pipes[LogQueue::Append as usize].async_read(&mut ctx, blocks); - let res = fs.async_finish(ctx).unwrap(); + let res = fs.async_finish(ctx)?; + Ok(res) } From a732d5218eb4bc16eaa270dae0d7a970766ca221 Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Sat, 25 Mar 2023 22:44:22 +0800 Subject: [PATCH 15/15] unify asyncIO & syncIO. 
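Instead of exposing a separate fetch_entries_to_aio, fetch_entries_to now picks the read path itself: it collects the unique file blocks behind the requested entries and, when the request spans more than 5 blocks and more than 1 MiB in total, goes through the multi-read (AIO) path; smaller requests keep the per-entry pread. The FileSystem items are renamed to match (AsyncIoContext -> MultiReadContext, async_read -> multi_read). The thresholds are the heuristic constants used in this patch, not tuned values. Condensed view of the dispatch added to fetch_entries_to:

    if blocks.len() > 5 && total_bytes > 1024 * 1024 {
        // Many large blocks: submit one AIO read per block, reap them together,
        // then decode each block once via parse_entries_from_bytes.
        let bytes = self.pipe_log.async_read_bytes(blocks)?;
        parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec)?;
    } else {
        // Few or small blocks: a plain pread per entry is cheaper than AIO setup.
        for i in ents_idx.iter() {
            vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
        }
    }
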
Signed-off-by: root <1019495690@qq.com> --- src/engine.rs | 155 ++++++++++++++------------------------ src/env/default.rs | 11 +-- src/env/mod.rs | 10 +-- src/env/obfuscated.rs | 12 +-- src/file_pipe_log/pipe.rs | 4 +- 5 files changed, 76 insertions(+), 116 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 0149d768..b0fe7454 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -301,49 +301,34 @@ where let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); if let Some(memtable) = self.memtables.get(region_id) { let mut ents_idx: Vec = Vec::with_capacity((end - begin) as usize); - memtable - .read() - .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; - for i in ents_idx.iter() { - vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); - } - ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); - - return Ok(ents_idx.len()); - } - Ok(0) - } - - pub fn fetch_entries_to_aio>( - &self, - region_id: u64, - begin: u64, - end: u64, - max_size: Option, - vec: &mut Vec, - ) -> Result { - let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM); - if let Some(memtable) = self.memtables.get(region_id) { - let length = (end - begin) as usize; - let mut ents_idx: Vec = Vec::with_capacity(length); memtable .read() .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; let mut blocks: Vec = Vec::new(); + let mut total_bytes = 0; for (t, i) in ents_idx.iter().enumerate() { if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) { blocks.push(i.entries.unwrap()); + total_bytes += i.entries.unwrap().len; + } + } + + if blocks.len() > 5 && total_bytes > 1024 * 1024 { + //Async IO + let bytes = self.pipe_log.async_read_bytes(blocks)?; + parse_entries_from_bytes::(bytes, &mut ents_idx, vec)?; + } else { + //Sync IO + for i in ents_idx.iter() { + vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); } } - let bytes = self.pipe_log.async_read_bytes(blocks)?; - parse_entries_from_bytes::(bytes, &mut ents_idx, vec)?; ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); return Ok(ents_idx.len()); } - Ok(0) } @@ -578,32 +563,39 @@ impl BlockCache { thread_local! 
{ static BLOCK_CACHE: BlockCache = BlockCache::new(); } + pub(crate) fn parse_entries_from_bytes( bytes: Vec>, ents_idx: &mut [EntryIndex], vec: &mut Vec, ) -> Result<()> { - let mut decode_buf = vec![]; - let mut seq: i32 = -1; - for (t, idx) in ents_idx.iter().enumerate() { - decode_buf = - match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() { - true => { - seq += 1; - bytes[seq as usize].to_vec() - } - false => decode_buf, - }; - vec.push(parse_from_bytes( - &LogBatch::decode_entries_block( - &decode_buf, - idx.entries.unwrap(), - idx.compression_type, - )?[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], - )?); + let mut seq = 0; + for idx in ents_idx { + BLOCK_CACHE.with(|cache| { + if cache.key.get() != idx.entries.unwrap() { + cache.insert( + idx.entries.unwrap(), + LogBatch::decode_entries_block( + &bytes[seq], + idx.entries.unwrap(), + idx.compression_type, + ) + .unwrap(), + ); + seq += 1; + } + let e = parse_from_bytes( + &cache.block.borrow() + [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], + ) + .unwrap(); + assert_eq!(M::index(&e), idx.index); + vec.push(e); + }); } Ok(()) } + pub(crate) fn read_entry_from_file(pipe_log: &P, idx: &EntryIndex) -> Result where M: MessageExt, @@ -742,41 +734,6 @@ mod tests { reader(e.index, entry_index.entries.unwrap().id.queue, &e.data); } } - fn scan_entries_aio( - &self, - rid: u64, - start: u64, - end: u64, - reader: FR, - ) { - let mut entries = Vec::new(); - self.fetch_entries_to_aio::( - rid, - self.first_index(rid).unwrap(), - self.last_index(rid).unwrap() + 1, - None, - &mut entries, - ) - .unwrap(); - assert_eq!(entries.len(), (end - start) as usize); - assert_eq!(entries.first().unwrap().index, start); - assert_eq!( - entries.last().unwrap().index, - self.decode_last_index(rid).unwrap() - ); - assert_eq!(entries.last().unwrap().index + 1, end); - for e in entries.iter() { - let entry_index = self - .memtables - .get(rid) - .unwrap() - .read() - .get_entry(e.index) - .unwrap(); - assert_eq!(&self.get_entry::(rid, e.index).unwrap().unwrap(), e); - reader(e.index, entry_index.entries.unwrap().id.queue, &e.data); - } - } fn file_count(&self, queue: Option) -> usize { if let Some(queue) = queue { @@ -854,12 +811,12 @@ mod tests { } #[test] - fn test_async_get_entry() { - let normal_batch_size = 10; - let compressed_batch_size = 5120; - for &entry_size in &[normal_batch_size, compressed_batch_size] { + fn test_multi_read_entry() { + let sync_batch_size = 1024; + let async_batch_size = 1024 * 1024; + for &entry_size in &[sync_batch_size, async_batch_size] { let dir = tempfile::Builder::new() - .prefix("test_get_entry") + .prefix("test_multi_read_entry") .tempdir() .unwrap(); let cfg = Config { @@ -875,15 +832,17 @@ mod tests { .unwrap(); assert_eq!(engine.path(), dir.path().to_str().unwrap()); let data = vec![b'x'; entry_size]; - for i in 10..20 { - let rid = i; - let index = i; - engine.append(rid, index, index + 2, Some(&data)); + + for i in 0..10 { + for rid in 10..20 { + let index = i + rid; + engine.append(rid, index, index + 1, Some(&data)); + } } for i in 10..20 { let rid = i; let index = i; - engine.scan_entries_aio(rid, index, index + 2, |_, q, d| { + engine.scan_entries(rid, index, index + 10, |_, q, d| { assert_eq!(q, LogQueue::Append); assert_eq!(d, &data); }); @@ -894,7 +853,7 @@ mod tests { for i in 10..20 { let rid = i; let index = i; - engine.scan_entries_aio(rid, index, index + 2, |_, q, d| { + engine.scan_entries(rid, index, index + 10, |_, q, 
d| { assert_eq!(q, LogQueue::Append); assert_eq!(d, &data); }); @@ -2137,18 +2096,18 @@ mod tests { type Handle = ::Handle; type Reader = ::Reader; type Writer = ::Writer; - type AsyncIoContext = ::AsyncIoContext; + type MultiReadContext = ::MultiReadContext; - fn async_read( + fn multi_read( &self, - ctx: &mut Self::AsyncIoContext, + ctx: &mut Self::MultiReadContext, handle: Arc, block: &FileBlockHandle, ) -> std::io::Result<()> { - self.inner.async_read(ctx, handle, block) + self.inner.multi_read(ctx, handle, block) } - fn async_finish(&self, ctx: Self::AsyncIoContext) -> std::io::Result>> { + fn async_finish(&self, ctx: Self::MultiReadContext) -> std::io::Result>> { self.inner.async_finish(ctx) } @@ -2214,7 +2173,7 @@ mod tests { self.inner.new_writer(h) } - fn new_async_io_context(&self) -> std::io::Result { + fn new_async_io_context(&self) -> std::io::Result { self.inner.new_async_io_context() } } diff --git a/src/env/default.rs b/src/env/default.rs index ea3c110d..d218c606 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -273,11 +273,11 @@ impl FileSystem for DefaultFileSystem { type Handle = LogFd; type Reader = LogFile; type Writer = LogFile; - type AsyncIoContext = AioContext; + type MultiReadContext = AioContext; - fn async_read( + fn multi_read( &self, - ctx: &mut Self::AsyncIoContext, + ctx: &mut Self::MultiReadContext, handle: Arc, block: &FileBlockHandle, ) -> IoResult<()> { @@ -298,7 +298,8 @@ impl FileSystem for DefaultFileSystem { Ok(()) } - fn async_finish(&self, mut ctx: Self::AsyncIoContext) -> IoResult>> { + + fn async_finish(&self, mut ctx: Self::MultiReadContext) -> IoResult>> { for seq in 0..ctx.aio_vec.len() { let buf_len = ctx.buf_vec[seq].len(); aio_suspend(&[&*ctx.aio_vec[seq]], None)?; @@ -333,7 +334,7 @@ impl FileSystem for DefaultFileSystem { Ok(LogFile::new(handle)) } - fn new_async_io_context(&self) -> IoResult { + fn new_async_io_context(&self) -> IoResult { Ok(AioContext::default()) } } diff --git a/src/env/mod.rs b/src/env/mod.rs index 65ad441d..1e6b350a 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -16,15 +16,15 @@ pub trait FileSystem: Send + Sync { type Handle: Send + Sync + Handle; type Reader: Seek + Read + Send; type Writer: Seek + Write + Send + WriteExt; - type AsyncIoContext; + type MultiReadContext; - fn async_read( + fn multi_read( &self, - ctx: &mut Self::AsyncIoContext, + ctx: &mut Self::MultiReadContext, handle: Arc, block: &FileBlockHandle, ) -> Result<()>; - fn async_finish(&self, ctx: Self::AsyncIoContext) -> Result>>; + fn async_finish(&self, ctx: Self::MultiReadContext) -> Result>>; fn create>(&self, path: P) -> Result; @@ -66,7 +66,7 @@ pub trait FileSystem: Send + Sync { fn new_writer(&self, handle: Arc) -> Result; - fn new_async_io_context(&self) -> Result; + fn new_async_io_context(&self) -> Result; } pub trait Handle { diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 3ce313f3..62e244f8 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -90,18 +90,18 @@ impl FileSystem for ObfuscatedFileSystem { type Handle = ::Handle; type Reader = ObfuscatedReader; type Writer = ObfuscatedWriter; - type AsyncIoContext = ::AsyncIoContext; + type MultiReadContext = ::MultiReadContext; - fn async_read( + fn multi_read( &self, - ctx: &mut Self::AsyncIoContext, + ctx: &mut Self::MultiReadContext, handle: Arc, block: &FileBlockHandle, ) -> IoResult<()> { - self.inner.async_read(ctx, handle, block) + self.inner.multi_read(ctx, handle, block) } - fn async_finish(&self, ctx: Self::AsyncIoContext) -> 
IoResult>> { + fn async_finish(&self, ctx: Self::MultiReadContext) -> IoResult>> { let mut base = self.inner.async_finish(ctx).unwrap(); for v in base.iter_mut() { @@ -151,7 +151,7 @@ impl FileSystem for ObfuscatedFileSystem { Ok(ObfuscatedWriter(self.inner.new_writer(handle)?)) } - fn new_async_io_context(&self) -> IoResult { + fn new_async_io_context(&self) -> IoResult { self.inner.new_async_io_context() } } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 64a0bff2..e23c6d77 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -255,11 +255,11 @@ impl SinglePipe { reader.read(handle) } - fn async_read(&self, ctx: &mut F::AsyncIoContext, blocks: Vec) { + fn async_read(&self, ctx: &mut F::MultiReadContext, blocks: Vec) { for block in blocks.iter() { let fd = self.get_fd(block.id.seq).unwrap(); self.file_system - .async_read(ctx, fd, block) + .multi_read(ctx, fd, block) .expect("Async read failed."); } }
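
Taken together, the series leaves callers untouched: fetch_entries_to decides internally between pread and the AIO multi-read path, and the returned buffers are decoded once per block through the thread-local BLOCK_CACHE. A usage sketch under the final API (engine, rid, first and last are assumed to exist; M is whatever MessageExt implementation the caller already uses):

    let mut entries = Vec::new();
    // More than 5 distinct blocks and more than 1 MiB total -> PipeLog::async_read_bytes (AIO);
    // otherwise read_entry_from_file (pread) per entry, exactly as before.
    let fetched = engine.fetch_entries_to::<M>(rid, first, last + 1, None, &mut entries)?; // number of entries read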