Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

read entries using AIO #286

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open

read entries using AIO #286

wants to merge 16 commits into from

Conversation

ustc-wxy
Copy link

@ustc-wxy ustc-wxy commented Dec 5, 2022

What is changed and how it works?

Issue Number: Ref #248

What's Changed:

1.Add fetch_ entries_aio() interface, which read entries using AIO.

Solution:
Manage single entry-read requests in batch through AsyncIOContext. After submit read request according to different file blocks, fetch the byte stream collection of all blocks from AsyncIOContext. In the end, parse them into Entry in turn.

Check List

Tests

  • Unit test
  • Integration test

Copy link
Member

@tabokie tabokie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the contributing guide here. Needs to sign the commit and run make format & make clippy.

src/engine.rs Outdated
new_block_flags.push(false);
}
}
let mut a_list: Vec<aiocb> = Vec::with_capacity(block_sum);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hide aiocb inside AioContext (it holds Vec<aiocb> and create new ones when a new read request is issued via fs::read_async). No one other than the DefaultFileSystem should have access to aiocb.

Copy link
Author

@ustc-wxy ustc-wxy Dec 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to use one AioContext to manage all read requests? Then I need to redesign the AioContext, like this

pub struct AioContext{
    fd_vec: Vec<Arc<LogFd>>,
    aio_vec: Vec<aiocb>,
    buf_vec: Vec<Vec<u8>>,
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if there's a performance difference between waiting on each aiocb separately, or waiting all of them in one single syscall (maybe you can benchmark it too).

If there's no difference, then it makes sense to create new AioContext for each request. But either way, all the details must be put inside the file system implementation. E.g.

impl AioContext {
  pub fn wait() -> Result<()>;
  // UB if `wait()` is not called and returns `Ok(())`.
  pub fn data() -> &[u8];
}

@ustc-wxy ustc-wxy force-pushed the master branch 2 times, most recently from 76f4403 to 193b92e Compare December 7, 2022 11:45
@ustc-wxy ustc-wxy changed the title read entries using AIO (version 1.0) read entries using AIO Dec 7, 2022
@@ -97,6 +102,21 @@ impl LogFd {
Ok(readed)
}

pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) {
let mut buf = ctx.buf_vec.last().unwrap().lock().unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock is meaningless. The mutable reference (&mut AioContext) ensures there's only one thread accessing the ctx.

Of course, due to the unsafe block, the buf_vec is in fact "leaked" to aio code. That's why we must manually guarantee there's no one reading or writing to buf_vec after calling aio_read.


impl AsyncContext for AioContext {
fn single_wait(&mut self, seq: usize) -> IoResult<usize> {
let buf_len = self.buf_vec[seq].lock().unwrap().len();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, the &mut self has the same effect.

&self,
handle: Arc<Self::Handle>,
seq: usize,
ctx: &mut AioContext,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AioContext should be a generic type, different file system implementation can have different context.

The code will look like:

trait WaitData {
  fn wait(&mut self, index: usize) -> &[u8];
}

trait FileSystem {
  type AsyncContext: WaitData;
  fn new_async_context(&self) -> Self::AsyncContext;
}

impl FileSystem for DefaultFileSystem {
  type AsyncContext = AioContext;
  fn new_async_context(&self) {
    AioContext::new()
  }
}

src/env/mod.rs Outdated
&self,
handle: Arc<Self::Handle>,
seq: usize,
ctx: &mut AioContext,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are still leaking details of ctx. Here you should emulate the use of Writer:

type AsyncContext: AioContext;
fn read_async(&self, handle, ctx: &mut Self::AsyncContext);
fn new_async_context(&self) -> Self::AsyncContext;

Copy link
Member

@tabokie tabokie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Please run the checks locally before submitting a commit, as a contributor the basic level of respect is to pass CI before requesting reviews.
  2. A good PR description should contain three parts: the problem, the solution, the verification method (test plan).

@@ -257,12 +275,100 @@ impl WriteExt for LogFile {
}
}

pub struct AioContext {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use this name outside this file. Just like type Handle = <DefaultFileSystem as FileSystem>::Handle;, you can use the same syntax to reference aio context of base file system without needing to expose this struct.

src/pipe_log.rs Outdated
) -> Result<()>;

/// Reads bytes from multi blocks using 'Async IO'.
fn async_read_bytes(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to create a function that isn't used anywhere. Now, you need to make fetch_entries_to_aio call this function and remove async_entry_read, because (1) concept of entry should not be exposed to pipe log (2) we need the raw bytes to populate block cache (this part can be implemented later in a different PR maybe)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tabokie Now my implementation idea is to read all bytes through async_entry_read, and then parse them in turn. like

pub fn fetch_entries_to_aio(){
    ...
    let bytes = async_entry_read();
    for (idx,i) in ents_idx.iter().enumerate(){
          entry = parse_from_bytes(byte[idx]);
          vec.push(entry);
    }
    ...
}

Does it ok?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what I meant. But the name should not be async_entry_read, the underlying pipe is not aware of the "entry" concept.

}
}

impl AsyncContext for AioContext {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation wouldn't work with custom file system such as ObfuscatedFileSystem. If you add a test that reads async with obfuscated fs, the result would be wrong.

Also, another minor detail is, it's not intuitive to have async context be used both passively and actively. i.e. fs::new_reader(ctx, handle) should not coexist with ctx::wait().

Let's do this instead:

pub trait FileSystem {
  pub type AsyncContext;
  fn async_read(&self, ctx: &mut Self::AsyncContext, handle: Arc<Self::Handle>, block: FileBlockHandle) -> Result<()>;
  fn async_finish(&self, ctx: Self::AsyncContext) -> Result<Vec<Vec<u8>>;
}

// for obfuscated.rs
pub struct ObfuscatedContext(<DefaultFileSystem as FileSystem>::AsyncIoContext);
impl FileSystem for ObfuscatedFileSystem {
  fn async_finish(&self, ctx: Self::AsyncContext) -> Result<Vec<Vec<u8>>> {
    let base = self.0.async_finish(ctx.0)?;
    for v in &mut base {
      // do obfuscation
      for c in v {
        c.wrapping_sub(1);
      }
    }
  }
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tabokie Whether it is necessary to consider such situations? That is in an fetch_entries_to_aio call, some handles belong to the Append queue and others belong to the Rewrite queue. They correspond to different files_ system, then you need to call 2 times async_finish().In more complex cases, they are interspersed, like Append, Rewrite, Append, Rewrite, Rewrite...,then the design of async_finish() will doesn't work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like this:

impl DualPipes {
  fn read_async(&self, handls: Vec<FileBlockHandle>) ->Vec<Vec<u8>> {
    let mut ctx = fs.new_context();
    for handle in handles {
      fs.read_async(&mut ctx, handle);
    }
    fs.async_finish(ctx);
  }
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tabokie How can I determine which fs to use? Use self.pipes[LogQueue::Append].file_system, or self.pipes[LogQueue::Rewrite].file_system, or both?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either one is fine. They are always the same.

fn single_wait(&mut self, seq: usize) -> IoResult<usize> {
let buf_len = self.buf_vec[seq].len();
unsafe {
loop {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the read hits EOF? It will loop forever? This needs testing as well.

ustc-wxy and others added 7 commits February 26, 2023 03:00
@ustc-wxy
Copy link
Author

I have completed the code modification according to your comments and run the unit test locally, all of them have passed. PTAL. @tabokie

@tabokie
Copy link
Member

tabokie commented Feb 27, 2023

@ustc-wxy format and clippy failed.

Signed-off-by: root <[email protected]>
Signed-off-by: root <[email protected]>
@codecov
Copy link

codecov bot commented Feb 27, 2023

Codecov Report

Patch coverage: 92.93% and project coverage change: -0.34 ⚠️

Comparison is base (3353011) 97.74% compared to head (f3a89c0) 97.40%.

❗ Current head f3a89c0 differs from pull request most recent head a732d52. Consider uploading reports for the commit a732d52 to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #286      +/-   ##
==========================================
- Coverage   97.74%   97.40%   -0.34%     
==========================================
  Files          30       30              
  Lines       11287    11668     +381     
==========================================
+ Hits        11032    11365     +333     
- Misses        255      303      +48     
Impacted Files Coverage Δ
src/file_pipe_log/mod.rs 98.46% <ø> (ø)
src/pipe_log.rs 95.45% <ø> (ø)
tests/failpoints/mod.rs 100.00% <ø> (ø)
tests/failpoints/util.rs 98.86% <ø> (-0.09%) ⬇️
src/env/obfuscated.rs 85.71% <66.66%> (-9.94%) ⬇️
src/env/default.rs 91.11% <87.64%> (-1.71%) ⬇️
src/engine.rs 97.58% <92.37%> (-0.62%) ⬇️
src/file_pipe_log/format.rs 99.10% <94.44%> (-0.41%) ⬇️
src/file_pipe_log/pipe.rs 97.95% <95.60%> (-1.55%) ⬇️
src/file_pipe_log/pipe_builder.rs 95.49% <96.12%> (-0.12%) ⬇️
... and 6 more

... and 1 file with indirect coverage changes

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

☔ View full report in Codecov by Sentry.
📢 Do you have feedback about the report comment? Let us know in this issue.

src/pipe_log.rs Outdated
@@ -172,6 +173,9 @@ pub trait PipeLog: Sized {
/// Reads some bytes from the specified position.
fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>;

/// Reads bytes from multi blocks using 'Async IO'.
fn async_read_bytes(&self, ents_idx: &mut Vec<EntryIndex>) -> Result<Vec<Vec<u8>>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned before, pipe is not aware of the "entry" concept. Here should use Vec<FileBlockHandle>.

self.pipes[LogQueue::Append as usize].async_read(block, &mut ctx);
}
LogQueue::Rewrite => {
self.pipes[LogQueue::Rewrite as usize].async_read(block, &mut ctx);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the function to async_read(ctx, buf). It's the common order to pass in a context.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tabokie Did you mean async_read(ctx, blocks)? Like

impl<F: FileSystem> PipeLog for DualPipes<F> {
  fn async_read_bytes(blocks:Vec<FileBlockHandle>) -> Result<Vec<Vec<u8>>>{
      ...
      self.pipes[LogQueue::Append].async_read(ctx,blocks);
      let bytes = fs.async_finish(ctx);
      ...
  }
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

}
}
}
let res = fs.async_finish(&mut ctx).unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change it to async_finish(ctx: Context) (instead of async_finish(ctx: &mut Context)). This makes sure the context isn't reused afterwards.

@@ -444,6 +451,32 @@ impl<F: FileSystem> PipeLog for DualPipes<F> {
self.pipes[handle.id.queue as usize].read_bytes(handle)
}

#[inline]
fn async_read_bytes(&self, ents_idx: &mut Vec<EntryIndex>) -> Result<Vec<Vec<u8>>> {
let mut blocks: Vec<FileBlockHandle> = vec![];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this compile? let mut blocks = Vec::new();

@@ -254,6 +254,13 @@ impl<F: FileSystem> SinglePipe<F> {
reader.read(handle)
}

fn async_read(&self, block: &mut FileBlockHandle, ctx: &mut F::AsyncIoContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need for the block to be mutable.

let fd = self.get_fd(block.id.seq).unwrap();
let buf = vec![0_u8; block.len];

self.file_system.async_read(ctx, fd, buf, block).unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not creating the vector inside async_read? So there's one less argument to pass into (and validate).


impl FileSystem for ObfuscatedFileSystem {
type Handle = <DefaultFileSystem as FileSystem>::Handle;
type Reader = ObfuscatedReader;
type Writer = ObfuscatedWriter;
type AsyncIoContext = ObfuscatedContext;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? Doesn't this work already?
type AsyncIoContext = <DefaultFileSystem as FileSystem>::AsyncIoContext

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tabokie I introduced the ObfuscatedContext struct based on your comments:#286 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a layer of wrapper is only useful if you have implemented something on top of the base context. Right now you didn't implement anything, so there is no need to wrap it.

Comment on lines 91 to 93
fn wait(&mut self) -> IoResult<usize> {
self.0.wait()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These codes are not covered, meaning you didn't test ObfuscatedFileSystem in unit tests.

}
}

pub fn set_fd(&mut self, fd: Arc<LogFd>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand this is to hide the LogFd. But it isn't necessary if you treat AsyncContext as a data-only struct (inline all code into file system, and remove the trait AsyncContext entirely). i.e.

impl SomeFileSystem {
  fn async_read(ctx: &mut Self::Context, handle: Self::Handle, block: FileBlockHandle) {
    handle.submit_async_read(ctx.buf[i], ...)?;
  }
}

Comment on lines 291 to 293
for _ in 0..block_sum {
aio_vec.push(mem::zeroed::<libc::aiocb>());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why creating them before hand, instead of creating them the same time as buf_vec?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tabokie If libc:: aiocb is created in multiple function calls, the compiler will assign the same address to the pointer every time, which will leads incorrect result. I'm not so familiar with the memory allocate API of trust, do you have any other good methods?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's possible to have multiple variables with the same address. It's possible that you mistakenly free some of them.

ustc-wxy added 2 commits March 2, 2023 05:08
Signed-off-by: root <[email protected]>
@ustc-wxy
Copy link
Author

ustc-wxy commented Mar 2, 2023

@tabokie PTAL

Signed-off-by: root <[email protected]>
Copy link
Member

@tabokie tabokie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#286 (comment) and #286 (comment) are not addressed.

@@ -184,7 +185,6 @@ impl<F: FileSystem> SinglePipe<F> {
}
Ok(files[(file_seq - files[0].seq) as usize].handle.clone())
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restore the newline.

@@ -254,6 +254,13 @@ impl<F: FileSystem> SinglePipe<F> {
reader.read(handle)
}

fn async_read(&self, blocks: Vec<FileBlockHandle>, ctx: &mut F::AsyncIoContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move ctx first, as explained in #286 (comment).

Comment on lines 259 to 260
let fd = self.get_fd(block.id.seq).unwrap();
self.file_system.async_read(ctx, fd, block).unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't use unwrap.

let mut res = vec![];
for v in base {
let mut temp = vec![];
//do obfuscation.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//do obfuscation.
// do obfuscation.

let base = self.inner.async_finish(ctx).unwrap();
let mut res = vec![];
for v in base {
let mut temp = vec![];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why you need to create another vector instead of modifying base directly.

src/env/mod.rs Outdated
@@ -7,14 +7,25 @@ use std::sync::Arc;
mod default;
mod obfuscated;

pub use default::AioContext;
Copy link
Member

@tabokie tabokie Mar 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this, as explained in #286 (comment)

Signed-off-by: root <[email protected]>
@ustc-wxy
Copy link
Author

ustc-wxy commented Mar 6, 2023

@tabokie PTAL.

@tabokie
Copy link
Member

tabokie commented Mar 6, 2023

#286 (comment) is still not addressed. Is there any confusion?

shouldn't use unwrap.

expect is the same as unwrap except with some message. What I meant is the code shouldn't panic on error. It should bubble the error. And there're obviously other unwrap-s (like the line above it) beside the line I was pointing at.

@ustc-wxy
Copy link
Author

ustc-wxy commented Mar 8, 2023

@tabokie PTAL, thx!

src/engine.rs Outdated
return Ok(ents_idx.len());
}
Ok(0)
}

pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
pub fn fetch_entries_to_aio<M: MessageExt>(

src/env/mod.rs Outdated
/// FileSystem
pub trait FileSystem: Send + Sync {
type Handle: Send + Sync + Handle;
type Reader: Seek + Read + Send;
type Writer: Seek + Write + Send + WriteExt;
type AsyncIoContext;

fn async_read(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all the async happens inside the implementation, we can rename this to something like RocksDB's MultiGet, i.e. multi_read.

src/env/mod.rs Outdated
/// FileSystem
pub trait FileSystem: Send + Sync {
type Handle: Send + Sync + Handle;
type Reader: Seek + Read + Send;
type Writer: Seek + Write + Send + WriteExt;
type AsyncIoContext;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This to MultiReadContext.

src/engine.rs Outdated
Comment on lines 744 to 745
}
fn scan_entries_aio<FR: Fn(u64, LogQueue, &[u8])>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a newline between functions.

src/engine.rs Outdated
Comment on lines 317 to 324
pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
&self,
region_id: u64,
begin: u64,
end: u64,
max_size: Option<usize>,
vec: &mut Vec<M::Entry>,
) -> Result<usize> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think based on the tests, you can antomatically select single_read or multi_read and avoid creating two different engine methods, e.g. use aio when blocks.len() > 4 or something.

One issue though is that I'm not sure if aio syscall is portable enough. You might need to do some research on how to detect if aio is available (maybe take a look at how RocksDB did it).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let me try.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants