From 00964edeb42db24bb01c84826581c6825fc07308 Mon Sep 17 00:00:00 2001 From: hoslo Date: Wed, 26 Jun 2024 22:14:51 +0800 Subject: [PATCH] feat: fs add concurrent write --- core/Cargo.toml | 2 +- core/src/raw/oio/write/position_write.rs | 18 ++++++ core/src/services/fs/backend.rs | 17 ++++- core/src/services/fs/writer.rs | 81 ++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 3 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index f297c2dc13ae..846a5301ca6e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -134,7 +134,7 @@ services-dbfs = [] services-dropbox = [] services-etcd = ["dep:etcd-client", "dep:bb8"] services-foundationdb = ["dep:foundationdb"] -services-fs = ["tokio/fs"] +services-fs = ["tokio/fs", "internal-tokio-rt"] services-ftp = ["dep:suppaftp", "dep:bb8", "dep:async-tls"] services-gcs = [ "dep:reqsign", diff --git a/core/src/raw/oio/write/position_write.rs b/core/src/raw/oio/write/position_write.rs index a24ec4eaa7e7..edfbbfcf53f3 100644 --- a/core/src/raw/oio/write/position_write.rs +++ b/core/src/raw/oio/write/position_write.rs @@ -49,6 +49,12 @@ pub trait PositionWrite: Send + Sync + Unpin + 'static { offset: u64, buf: Buffer, ) -> impl Future> + MaybeSend; + + /// close is used to close the underlying file. + fn close(&self) -> impl Future> + MaybeSend; + + /// abort is used to abort the underlying abort. + fn abort(&self) -> impl Future> + MaybeSend; } struct WriteInput { @@ -149,12 +155,16 @@ impl oio::Write for PositionWriter { if let Some(buffer) = self.cache.clone() { let offset = self.next_offset; self.w.write_all_at(offset, buffer).await?; + self.w.as_ref().close().await?; self.cache = None; } Ok(()) } async fn abort(&mut self) -> Result<()> { + self.tasks.clear(); + self.cache = None; + self.w.abort().await?; Ok(()) } } @@ -216,6 +226,14 @@ mod tests { Ok(()) } + + async fn close(&self) -> Result<()> { + Ok(()) + } + + async fn abort(&self) -> Result<()> { + Ok(()) + } } #[tokio::test] diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 9c8d5b911f87..96df46692cf8 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -32,6 +32,7 @@ use super::core::*; use super::lister::FsLister; use super::reader::FsReader; use super::writer::FsWriter; +use super::writer::FsWriters; /// config for file system #[derive(Default, Deserialize, Debug)] @@ -174,7 +175,7 @@ pub struct FsBackend { impl Access for FsBackend { type Reader = FsReader; - type Writer = FsWriter; + type Writer = FsWriters; type Lister = Option>; type BlockingReader = FsReader; type BlockingWriter = FsWriter; @@ -318,7 +319,19 @@ impl Access for FsBackend { .await .map_err(new_std_io_error)?; - Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f))) + let w = FsWriter::new(target_path, tmp_path, f); + + let w = if op.append() { + FsWriters::One(w) + } else { + FsWriters::Two(oio::PositionWriter::new( + w, + op.executor().cloned(), + op.concurrent(), + )) + }; + + Ok((RpWrite::default(), w)) } async fn delete(&self, path: &str, _: OpDelete) -> Result { diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index c0d0688750e4..b85c4c082800 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -15,7 +15,12 @@ // specific language governing permissions and limitations // under the License. +use std::fs::File; use std::io::Write; +#[cfg(not(target_os = "windows"))] +use std::os::unix::fs::FileExt; +#[cfg(target_os = "windows")] +use std::os::windows::fs::FileExt; use std::path::PathBuf; use bytes::Buf; @@ -24,6 +29,9 @@ use tokio::io::AsyncWriteExt; use crate::raw::*; use crate::*; +pub type FsWriters = + TwoWays, oio::PositionWriter>>; + pub struct FsWriter { target_path: PathBuf, tmp_path: Option, @@ -101,3 +109,76 @@ impl oio::BlockingWrite for FsWriter { Ok(()) } } + +impl oio::PositionWrite for FsWriter { + async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> { + let f = self.f.as_ref().expect("FsWriter must be initialized"); + + let f = f + .try_clone() + .await + .map_err(new_std_io_error)? + .into_std() + .await; + + tokio::task::spawn_blocking(move || { + let mut buf = buf; + let mut offset = offset; + while !buf.is_empty() { + match write_at(&f, buf.chunk(), offset) { + Ok(n) => { + buf.advance(n); + offset += n as u64 + } + Err(e) => return Err(e), + } + } + Ok(()) + }) + .await + .map_err(new_task_join_error)? + } + + async fn close(&self) -> Result<()> { + let f = self.f.as_ref().expect("FsWriter must be initialized"); + + let mut f = f + .try_clone() + .await + .map_err(new_std_io_error)? + .into_std() + .await; + + f.flush().map_err(new_std_io_error)?; + f.sync_all().map_err(new_std_io_error)?; + + if let Some(tmp_path) = &self.tmp_path { + tokio::fs::rename(tmp_path, &self.target_path) + .await + .map_err(new_std_io_error)?; + } + Ok(()) + } + + async fn abort(&self) -> Result<()> { + if let Some(tmp_path) = &self.tmp_path { + tokio::fs::remove_file(tmp_path) + .await + .map_err(new_std_io_error) + } else { + Err(Error::new( + ErrorKind::Unsupported, + "Fs doesn't support abort if atomic_write_dir is not set", + )) + } + } +} + +#[cfg(target_os = "windows")] +fn write_at(f: &File, buf: &[u8], offset: u64) -> Result { + f.seek_write(buf, offset).map_err(new_std_io_error) +} + +fn write_at(f: &File, buf: &[u8], offset: u64) -> Result { + f.write_at(buf, offset).map_err(new_std_io_error) +}