Skip to content

Commit

Permalink
feat: fs add concurrent write
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Jul 8, 2024
1 parent ccc9351 commit 765fa10
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 3 deletions.
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ services-dbfs = []
services-dropbox = []
services-etcd = ["dep:etcd-client", "dep:bb8"]
services-foundationdb = ["dep:foundationdb"]
services-fs = ["tokio/fs"]
services-fs = ["tokio/fs", "internal-tokio-rt"]
services-ftp = ["dep:suppaftp", "dep:bb8", "dep:async-tls"]
services-gcs = [
"dep:reqsign",
Expand Down
18 changes: 18 additions & 0 deletions core/src/raw/oio/write/position_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ pub trait PositionWrite: Send + Sync + Unpin + 'static {
offset: u64,
buf: Buffer,
) -> impl Future<Output = Result<()>> + MaybeSend;

/// close is used to close the underlying file.
fn close(&self) -> impl Future<Output = Result<()>> + MaybeSend;

/// abort is used to abort the underlying abort.
fn abort(&self) -> impl Future<Output = Result<()>> + MaybeSend;
}

struct WriteInput<W: PositionWrite> {
Expand Down Expand Up @@ -149,12 +155,16 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> {
if let Some(buffer) = self.cache.clone() {
let offset = self.next_offset;
self.w.write_all_at(offset, buffer).await?;
self.w.as_ref().close().await?;
self.cache = None;
}
Ok(())
}

async fn abort(&mut self) -> Result<()> {
self.tasks.clear();
self.cache = None;
self.w.abort().await?;
Ok(())
}
}
Expand Down Expand Up @@ -216,6 +226,14 @@ mod tests {

Ok(())
}

async fn close(&self) -> Result<()> {
Ok(())
}

async fn abort(&self) -> Result<()> {
Ok(())
}
}

#[tokio::test]
Expand Down
17 changes: 15 additions & 2 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use super::core::*;
use super::lister::FsLister;
use super::reader::FsReader;
use super::writer::FsWriter;
use super::writer::FsWriters;

/// config for file system
#[derive(Default, Deserialize, Debug)]
Expand Down Expand Up @@ -174,7 +175,7 @@ pub struct FsBackend {

impl Access for FsBackend {
type Reader = FsReader<tokio::fs::File>;
type Writer = FsWriter<tokio::fs::File>;
type Writer = FsWriters;
type Lister = Option<FsLister<tokio::fs::ReadDir>>;
type BlockingReader = FsReader<std::fs::File>;
type BlockingWriter = FsWriter<std::fs::File>;
Expand Down Expand Up @@ -318,7 +319,19 @@ impl Access for FsBackend {
.await
.map_err(new_std_io_error)?;

Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
let w = FsWriter::new(target_path, tmp_path, f);

let w = if op.append() {
FsWriters::One(w)
} else {
FsWriters::Two(oio::PositionWriter::new(
w,
op.executor().cloned(),
op.concurrent(),
))
};

Ok((RpWrite::default(), w))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
Expand Down
115 changes: 115 additions & 0 deletions core/src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
// under the License.

use std::io::Write;
#[cfg(not(target_os = "windows"))]
use std::os::unix::fs::FileExt;
#[cfg(target_os = "windows")]
use std::os::windows::fs::FileExt;
use std::path::PathBuf;

use bytes::Buf;
Expand All @@ -24,6 +28,9 @@ use tokio::io::AsyncWriteExt;
use crate::raw::*;
use crate::*;

pub type FsWriters =
TwoWays<FsWriter<tokio::fs::File>, oio::PositionWriter<FsWriter<tokio::fs::File>>>;

pub struct FsWriter<F> {
target_path: PathBuf,
tmp_path: Option<PathBuf>,
Expand Down Expand Up @@ -101,3 +108,111 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
Ok(())
}
}

impl oio::PositionWrite for FsWriter<tokio::fs::File> {
#[cfg(not(target_os = "windows"))]
async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
let f = self.f.as_ref().expect("FsWriter must be initialized");

let f = f
.try_clone()
.await
.map_err(new_std_io_error)?
.into_std()
.await;

tokio::task::spawn_blocking(move || {
let mut buf = buf;
let size = buf.len();
f.write_all_at(&buf.copy_to_bytes(size), offset)
.map_err(new_std_io_error)
})
.await
.map_err(new_task_join_error)?
}

#[cfg(target_os = "windows")]
async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
let f = self.f.as_ref().expect("FsWriter must be initialized");

let f = f
.try_clone()
.await
.map_err(new_std_io_error)?
.into_std()
.await;

tokio::task::spawn_blocking(move || {
let mut offset = offset;
while !buf.is_empty() {
match f.seek_write(buf.chunk(), offset) {
Ok(n) => {
buf.advance(n);
offset += n as u64
}
Err(e) => return Err(e).map_err(new_std_io_error),
}
}
Ok(())
})
.await
.map_err(new_task_join_error)?
}

#[cfg(target_os = "windows")]
async fn close(&self) -> Result<()> {
let f = self.f.as_ref().expect("FsWriter must be initialized");

let mut f = f
.try_clone()
.await
.map_err(new_std_io_error)?
.into_std()
.await;

f.flush().map_err(new_std_io_error)?;
f.sync_all().map_err(new_std_io_error)?;

if let Some(tmp_path) = &self.tmp_path {
tokio::fs::rename(tmp_path, &self.target_path)
.await
.map_err(new_std_io_error)?;
}
Ok(())
}

#[cfg(not(target_os = "windows"))]
async fn close(&self) -> Result<()> {
let f = self.f.as_ref().expect("FsWriter must be initialized");

let mut f = f
.try_clone()
.await
.map_err(new_std_io_error)?
.into_std()
.await;

f.flush().map_err(new_std_io_error)?;
f.sync_all().map_err(new_std_io_error)?;

if let Some(tmp_path) = &self.tmp_path {
tokio::fs::rename(tmp_path, &self.target_path)
.await
.map_err(new_std_io_error)?;
}
Ok(())
}

async fn abort(&self) -> Result<()> {
if let Some(tmp_path) = &self.tmp_path {
tokio::fs::remove_file(tmp_path)
.await
.map_err(new_std_io_error)
} else {
Err(Error::new(
ErrorKind::Unsupported,
"Fs doesn't support abort if atomic_write_dir is not set",
))
}
}
}

0 comments on commit 765fa10

Please sign in to comment.