From 3d5ea55510543f17cc0997c5ffd235d05c2478fa Mon Sep 17 00:00:00 2001 From: NKID00 Date: Tue, 30 Jul 2024 21:59:54 +0800 Subject: [PATCH 1/8] feat(services/monoiofs): impl read and write, add behavior test --- .github/services/monoiofs/monoiofs/action.yml | 27 ++++ core/src/services/monoiofs/backend.rs | 71 +++++++- core/src/services/monoiofs/core.rs | 71 ++++++-- core/src/services/monoiofs/docs.md | 4 +- core/src/services/monoiofs/mod.rs | 2 + core/src/services/monoiofs/reader.rs | 143 ++++++++++++++++ core/src/services/monoiofs/writer.rs | 153 ++++++++++++++++++ 7 files changed, 457 insertions(+), 14 deletions(-) create mode 100644 .github/services/monoiofs/monoiofs/action.yml create mode 100644 core/src/services/monoiofs/reader.rs create mode 100644 core/src/services/monoiofs/writer.rs diff --git a/.github/services/monoiofs/monoiofs/action.yml b/.github/services/monoiofs/monoiofs/action.yml new file mode 100644 index 000000000000..72816f17af52 --- /dev/null +++ b/.github/services/monoiofs/monoiofs/action.yml @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: monoiofs +description: 'Behavior test for monoiofs' + +runs: + using: "composite" + steps: + - name: Setup + shell: bash + run: | + echo "OPENDAL_MONOIOFS_ROOT=${{ runner.temp }}/" >> $GITHUB_ENV diff --git a/core/src/services/monoiofs/backend.rs b/core/src/services/monoiofs/backend.rs index e05026933010..df04bd32f7f0 100644 --- a/core/src/services/monoiofs/backend.rs +++ b/core/src/services/monoiofs/backend.rs @@ -20,10 +20,13 @@ use std::io; use std::path::PathBuf; use std::sync::Arc; +use chrono::DateTime; use serde::Deserialize; use serde::Serialize; use super::core::MonoiofsCore; +use super::reader::MonoiofsReader; +use super::writer::MonoiofsWriter; use crate::raw::*; use crate::*; @@ -109,8 +112,8 @@ pub struct MonoiofsBackend { } impl Access for MonoiofsBackend { - type Reader = (); - type Writer = (); + type Reader = MonoiofsReader; + type Writer = MonoiofsWriter; type Lister = (); type BlockingReader = (); type BlockingWriter = (); @@ -121,8 +124,72 @@ impl Access for MonoiofsBackend { am.set_scheme(Scheme::Monoiofs) .set_root(&self.core.root().to_string_lossy()) .set_native_capability(Capability { + stat: true, + read: true, + write: true, + delete: true, ..Default::default() }); am.into() } + + async fn stat(&self, path: &str, _args: OpStat) -> Result { + let path = self.core.prepare_path(path); + // TODO: borrowed from FsBackend because statx support for monoio + // is not released yet, but stat capability is required for read + // and write + let meta = tokio::fs::metadata(&path).await.map_err(new_std_io_error)?; + let mode = if meta.is_dir() { + EntryMode::DIR + } else if meta.is_file() { + EntryMode::FILE + } else { + EntryMode::Unknown + }; + let m = Metadata::new(mode) + .with_content_length(meta.len()) + .with_last_modified( + meta.modified() + .map(DateTime::from) + .map_err(new_std_io_error)?, + ); + Ok(RpStat::new(m)) + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let path = self.core.prepare_path(path); + let reader = MonoiofsReader::new(self.core.clone(), path, args.range()).await?; + Ok((RpRead::default(), reader)) + } + + async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + // TODO: create parent directory before write + let path = self.core.prepare_path(path); + let writer = MonoiofsWriter::new(self.core.clone(), path).await?; + Ok((RpWrite::default(), writer)) + } + + async fn delete(&self, path: &str, _args: OpDelete) -> Result { + let path = self.core.prepare_path(path); + // TODO: borrowed from FsBackend because monoio doesn't support unlink, + // but delete capability is required for behavior tests + let meta = tokio::fs::metadata(&path).await; + match meta { + Ok(meta) => { + if meta.is_dir() { + tokio::fs::remove_dir(&path) + .await + .map_err(new_std_io_error)?; + } else { + tokio::fs::remove_file(&path) + .await + .map_err(new_std_io_error)?; + } + + Ok(RpDelete::default()) + } + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(RpDelete::default()), + Err(err) => Err(new_std_io_error(err)), + } + } } diff --git a/core/src/services/monoiofs/core.rs b/core/src/services/monoiofs/core.rs index 13388eaac937..fdabf3875f84 100644 --- a/core/src/services/monoiofs/core.rs +++ b/core/src/services/monoiofs/core.rs @@ -24,9 +24,15 @@ use flume::Receiver; use flume::Sender; use futures::channel::oneshot; use futures::Future; +use monoio::task::JoinHandle; use monoio::FusionDriver; use monoio::RuntimeBuilder; +use crate::raw::*; +use crate::*; + +pub const BUFFER_SIZE: usize = 2 * 1024 * 1024; // 2 MiB + /// a boxed function that spawns task in current monoio runtime type TaskSpawner = Box; @@ -39,6 +45,7 @@ pub struct MonoiofsCore { #[allow(dead_code)] /// join handles of worker threads threads: Mutex>>, + pub buf_pool: oio::PooledBuf, } impl MonoiofsCore { @@ -59,13 +66,22 @@ impl MonoiofsCore { .collect(); let threads = Mutex::new(threads); - Self { root, tx, threads } + Self { + root, + tx, + threads, + buf_pool: oio::PooledBuf::new(16).with_initial_capacity(BUFFER_SIZE), + } } pub fn root(&self) -> &PathBuf { &self.root } + pub fn prepare_path(&self, path: &str) -> PathBuf { + self.root.join(path.trim_end_matches('/')) + } + /// entrypoint of each worker thread, sets up monoio runtimes and channels fn worker_entrypoint(rx: Receiver, io_uring_entries: u32) { let mut rt = RuntimeBuilder::::new() @@ -83,8 +99,8 @@ impl MonoiofsCore { } #[allow(dead_code)] - /// create a TaskSpawner, send it to the thread pool and wait - /// for its result + /// Create a TaskSpawner, send it to the thread pool and wait + /// for its result. Task panic will propagate. pub async fn dispatch(&self, f: F) -> T where F: FnOnce() -> Fut + 'static + Send, @@ -95,6 +111,8 @@ impl MonoiofsCore { let (tx, rx) = oneshot::channel(); self.tx .send_async(Box::new(move || { + // task will be spawned on current thread, task panic + // will cause current worker thread panic monoio::spawn(async move { tx.send(f().await) // discard result because it may be non-Debug and @@ -105,19 +123,41 @@ impl MonoiofsCore { })) .await .expect("send new TaskSpawner to worker thread should success"); - match rx.await { - Ok(result) => result, - // tx is dropped without sending result, probably the worker - // thread has panicked. - Err(_) => self.propagate_worker_panic(), - } + self.unwrap(rx.await) + } + + /// Create a TaskSpawner, send it to the thread pool, spawn the task + /// and return its [`JoinHandle`]. Task panic cannot propagate + /// through the [`JoinHandle`] and should be handled elsewhere. + pub async fn spawn(&self, f: F) -> JoinHandle + where + F: FnOnce() -> Fut + 'static + Send, + Fut: Future, + T: 'static + Send, + { + // oneshot channel to send JoinHandle back + let (tx, rx) = oneshot::channel(); + self.tx + .send_async(Box::new(move || { + // task will be spawned on current thread, task panic + // will cause current worker thread panic + let handle = monoio::spawn(async move { f().await }); + tx.send(handle) + // discard result because it may be non-Debug and + // we don't need it to appear in the panic message + .map_err(|_| ()) + .expect("send result from worker thread should success"); + })) + .await + .expect("send new TaskSpawner to worker thread should success"); + self.unwrap(rx.await) } /// This method always panics. It is called only when at least a /// worker thread has panicked or meet a broken rx, which is /// unrecoverable. It propagates worker thread's panic if there /// is any and panics on normally exited thread. - fn propagate_worker_panic(&self) -> ! { + pub fn propagate_worker_panic(&self) -> ! { let mut guard = self.threads.lock().unwrap(); // wait until the panicked thread exits std::thread::sleep(Duration::from_millis(100)); @@ -137,6 +177,17 @@ impl MonoiofsCore { } unreachable!("this method should panic") } + + /// Unwrap result if result is Ok, otherwise propagates worker thread's + /// panic. This method facilitates panic propagation in situation where + /// Err returned by broken channel indicates that the worker thread has + /// panicked. + pub fn unwrap(&self, result: Result) -> T { + match result { + Ok(result) => result, + Err(_) => self.propagate_worker_panic(), + } + } } #[cfg(test)] diff --git a/core/src/services/monoiofs/docs.md b/core/src/services/monoiofs/docs.md index f81ef4d731c0..3293fd7574d0 100644 --- a/core/src/services/monoiofs/docs.md +++ b/core/src/services/monoiofs/docs.md @@ -3,8 +3,8 @@ This service can be used to: - [ ] stat -- [ ] read -- [ ] write +- [x] read +- [x] write - [ ] append - [ ] create_dir - [ ] delete diff --git a/core/src/services/monoiofs/mod.rs b/core/src/services/monoiofs/mod.rs index 66d296936d90..dbda5ac14786 100644 --- a/core/src/services/monoiofs/mod.rs +++ b/core/src/services/monoiofs/mod.rs @@ -20,3 +20,5 @@ pub use backend::MonoiofsBuilder as Monoiofs; pub use backend::MonoiofsConfig; mod core; +mod reader; +mod writer; diff --git a/core/src/services/monoiofs/reader.rs b/core/src/services/monoiofs/reader.rs new file mode 100644 index 000000000000..6e4e30790685 --- /dev/null +++ b/core/src/services/monoiofs/reader.rs @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::path::PathBuf; +use std::sync::Arc; + +use bytes::BytesMut; +use futures::channel::{mpsc, oneshot}; +use futures::{SinkExt, StreamExt}; +use monoio::fs::OpenOptions; + +use super::core::{MonoiofsCore, BUFFER_SIZE}; +use crate::raw::*; +use crate::*; + +enum ReaderRequest { + Read { + pos: u64, + buf: BytesMut, + tx: oneshot::Sender>, + }, +} + +pub struct MonoiofsReader { + core: Arc, + tx: mpsc::UnboundedSender, + pos: u64, + end_pos: Option, +} + +impl MonoiofsReader { + pub async fn new(core: Arc, path: PathBuf, range: BytesRange) -> Result { + let (open_result_tx, open_result_rx) = oneshot::channel(); + let (tx, rx) = mpsc::unbounded(); + core.spawn(move || Self::worker_entrypoint(path, rx, open_result_tx)) + .await; + core.unwrap(open_result_rx.await)?; + Ok(Self { + core, + tx, + pos: range.offset(), + end_pos: range.size().map(|size| range.offset() + size), + }) + } + + /// entrypoint of worker task that runs in context of monoio + async fn worker_entrypoint( + path: PathBuf, + mut rx: mpsc::UnboundedReceiver, + open_result_tx: oneshot::Sender>, + ) { + let result = OpenOptions::new().read(true).open(path).await; + // [`monoio::fs::File`] is non-Send, hence it is kept within + // worker thread + let file = match result { + Ok(file) => { + open_result_tx + .send(Ok(())) + .expect("send result from worker thread should success"); + file + } + Err(e) => { + open_result_tx + .send(Err(new_std_io_error(e))) + .expect("send result from worker thread should success"); + return; + } + }; + // wait for read request and send back result to main thread + loop { + let Some(req) = rx.next().await else { + // MonoiofsReader is dropped, exit worker task + break; + }; + match req { + ReaderRequest::Read { pos, buf, tx } => { + let (result, buf) = file.read_at(buf, pos).await; + // buf.len() will be set to n by monoio if read successfully, + // so n is dropped + let result = result.map(move |_| buf).map_err(new_std_io_error); + // discard the result if send failed due to + // MonoiofsReader::read cancelled + let _ = tx.send(result); + } + } + } + } +} + +impl oio::Read for MonoiofsReader { + /// Send read request to worker thread and wait for result. Actual + /// read happens in [`MonoiofsReader::worker_entrypoint`] running + /// on worker thread. + async fn read(&mut self) -> Result { + if let Some(end_pos) = self.end_pos { + if self.pos >= end_pos { + return Ok(Buffer::new()); + } + } + + // allocate and resize buffer + let mut buf = self.core.buf_pool.get(); + let size = self + .end_pos + .map_or(BUFFER_SIZE, |end_pos| (end_pos - self.pos) as usize); + // set capacity of buf to exact size to avoid excessive read + buf.reserve(size); + let _ = buf.split_off(size); + + // send read request to worker thread and wait for result + let (tx, rx) = oneshot::channel(); + self.core.unwrap( + self.tx + .send(ReaderRequest::Read { + pos: self.pos, + buf, + tx, + }) + .await, + ); + let mut buf = self.core.unwrap(rx.await)?; + + // advance cursor if read successfully + self.pos += buf.len() as u64; + let buffer = Buffer::from(buf.split().freeze()); + self.core.buf_pool.put(buf); + Ok(buffer) + } +} diff --git a/core/src/services/monoiofs/writer.rs b/core/src/services/monoiofs/writer.rs new file mode 100644 index 000000000000..20ffe65e1eff --- /dev/null +++ b/core/src/services/monoiofs/writer.rs @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::path::PathBuf; +use std::sync::Arc; + +use bytes::{Buf, Bytes}; +use futures::channel::{mpsc, oneshot}; +use futures::{SinkExt, StreamExt}; +use monoio::fs::OpenOptions; + +use super::core::MonoiofsCore; +use crate::raw::*; +use crate::*; + +enum WriterRequest { + Write { + pos: u64, + buf: Bytes, + tx: oneshot::Sender>, + }, + Close { + tx: oneshot::Sender>, + }, +} + +pub struct MonoiofsWriter { + core: Arc, + tx: mpsc::UnboundedSender, + pos: u64, +} + +impl MonoiofsWriter { + pub async fn new(core: Arc, path: PathBuf) -> Result { + let (open_result_tx, open_result_rx) = oneshot::channel(); + let (tx, rx) = mpsc::unbounded(); + core.spawn(move || Self::worker_entrypoint(path, rx, open_result_tx)) + .await; + core.unwrap(open_result_rx.await)?; + Ok(Self { core, tx, pos: 0 }) + } + + /// entrypoint of worker task that runs in context of monoio + async fn worker_entrypoint( + path: PathBuf, + mut rx: mpsc::UnboundedReceiver, + open_result_tx: oneshot::Sender>, + ) { + let result = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .await; + // [`monoio::fs::File`] is non-Send, hence it is kept within + // worker thread + let file = match result { + Ok(file) => { + open_result_tx + .send(Ok(())) + .expect("send result from worker thread should success"); + file + } + Err(e) => { + open_result_tx + .send(Err(new_std_io_error(e))) + .expect("send result from worker thread should success"); + return; + } + }; + // wait for write or close request and send back result to main thread + loop { + let Some(req) = rx.next().await else { + // MonoiofsWriter is dropped, exit worker task + break; + }; + match req { + WriterRequest::Write { pos, buf, tx } => { + let (result, _) = file.write_all_at(buf, pos).await; + // discard the result if send failed due to + // MonoiofsWriter::write cancelled + let _ = tx.send(result.map_err(new_std_io_error)); + } + WriterRequest::Close { tx } => { + let result = file.sync_all().await; + // discard the result if send failed due to + // MonoiofsWriter::close cancelled + let _ = tx.send(result.map_err(new_std_io_error)); + // file is closed in background and result is useless + let _ = file.close().await; + break; + } + } + } + } +} + +impl oio::Write for MonoiofsWriter { + /// Send write request to worker thread and wait for result. Actual + /// write happens in [`MonoiofsWriter::worker_entrypoint`] running + /// on worker thread. + async fn write(&mut self, mut bs: Buffer) -> Result<()> { + while bs.has_remaining() { + let buf = bs.current(); + let n = buf.len(); + let (tx, rx) = oneshot::channel(); + self.core.unwrap( + self.tx + .send(WriterRequest::Write { + pos: self.pos, + buf, + tx, + }) + .await, + ); + self.core.unwrap(rx.await)?; + self.pos += n as u64; + bs.advance(n); + } + Ok(()) + } + + /// Send close request to worker thread and wait for result. Actual + /// close happens in [`MonoiofsWriter::worker_entrypoint`] running + /// on worker thread. + async fn close(&mut self) -> Result<()> { + let (tx, rx) = oneshot::channel(); + self.core + .unwrap(self.tx.send(WriterRequest::Close { tx }).await); + self.core.unwrap(rx.await) + } + + async fn abort(&mut self) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Monoiofs doesn't support abort", + )) + } +} From bcafd56f648738ca2e188a5124394eb82c6fb5b4 Mon Sep 17 00:00:00 2001 From: NKID00 Date: Tue, 30 Jul 2024 22:49:22 +0800 Subject: [PATCH 2/8] fix(ci): add monoiofs feature flag for bindings --- bindings/java/Cargo.toml | 1 + bindings/nodejs/Cargo.toml | 1 + bindings/python/Cargo.toml | 1 + 3 files changed, 3 insertions(+) diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml index b63e336a25b7..d61386755043 100644 --- a/bindings/java/Cargo.toml +++ b/bindings/java/Cargo.toml @@ -134,6 +134,7 @@ services-memcached = ["opendal/services-memcached"] services-mini-moka = ["opendal/services-mini-moka"] services-moka = ["opendal/services-moka"] services-mongodb = ["opendal/services-mongodb"] +services-monoiofs = ["opendal/services-monoiofs"] services-mysql = ["opendal/services-mysql"] services-onedrive = ["opendal/services-onedrive"] services-persy = ["opendal/services-persy"] diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml index 997e0697678c..5ef45b866bdf 100644 --- a/bindings/nodejs/Cargo.toml +++ b/bindings/nodejs/Cargo.toml @@ -132,6 +132,7 @@ services-memcached = ["opendal/services-memcached"] services-mini-moka = ["opendal/services-mini-moka"] services-moka = ["opendal/services-moka"] services-mongodb = ["opendal/services-mongodb"] +services-monoiofs = ["opendal/services-monoiofs"] services-mysql = ["opendal/services-mysql"] services-onedrive = ["opendal/services-onedrive"] services-persy = ["opendal/services-persy"] diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 99869a36a959..2235af438787 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -131,6 +131,7 @@ services-memcached = ["opendal/services-memcached"] services-mini-moka = ["opendal/services-mini-moka"] services-moka = ["opendal/services-moka"] services-mongodb = ["opendal/services-mongodb"] +services-monoiofs = ["opendal/services-monoiofs"] services-mysql = ["opendal/services-mysql"] services-onedrive = ["opendal/services-onedrive"] services-persy = ["opendal/services-persy"] From 818b4dc7f6492ecb355d6ae365a8dc4c50e7f944 Mon Sep 17 00:00:00 2001 From: NKID00 Date: Tue, 30 Jul 2024 23:15:11 +0800 Subject: [PATCH 3/8] fix(services/monoiofs): propagate worker thread panic and swallow error in worker thread --- core/src/services/monoiofs/core.rs | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/core/src/services/monoiofs/core.rs b/core/src/services/monoiofs/core.rs index fdabf3875f84..7a7ee4b72c1b 100644 --- a/core/src/services/monoiofs/core.rs +++ b/core/src/services/monoiofs/core.rs @@ -109,20 +109,19 @@ impl MonoiofsCore { { // oneshot channel to send result back let (tx, rx) = oneshot::channel(); - self.tx + let result = self + .tx .send_async(Box::new(move || { // task will be spawned on current thread, task panic // will cause current worker thread panic monoio::spawn(async move { - tx.send(f().await) - // discard result because it may be non-Debug and - // we don't need it to appear in the panic message - .map_err(|_| ()) - .expect("send result from worker thread should success"); + // discard the result if send failed due to + // MonoiofsCore::dispatch cancelled + let _ = tx.send(f().await); }); })) - .await - .expect("send new TaskSpawner to worker thread should success"); + .await; + self.unwrap(result); self.unwrap(rx.await) } @@ -137,19 +136,18 @@ impl MonoiofsCore { { // oneshot channel to send JoinHandle back let (tx, rx) = oneshot::channel(); - self.tx + let result = self + .tx .send_async(Box::new(move || { // task will be spawned on current thread, task panic // will cause current worker thread panic let handle = monoio::spawn(async move { f().await }); - tx.send(handle) - // discard result because it may be non-Debug and - // we don't need it to appear in the panic message - .map_err(|_| ()) - .expect("send result from worker thread should success"); + // discard the result if send failed due to + // MonoiofsCore::spawn cancelled + let _ = tx.send(handle); })) - .await - .expect("send new TaskSpawner to worker thread should success"); + .await; + self.unwrap(result); self.unwrap(rx.await) } From 6712fdde424989593c91f1fdc9cf7c2dc73438a1 Mon Sep 17 00:00:00 2001 From: NKID00 Date: Tue, 30 Jul 2024 23:41:32 +0800 Subject: [PATCH 4/8] feat(services/monoiofs): panic message after worker thread has already panicked --- core/src/services/monoiofs/core.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/services/monoiofs/core.rs b/core/src/services/monoiofs/core.rs index 7a7ee4b72c1b..8b52ced6eafa 100644 --- a/core/src/services/monoiofs/core.rs +++ b/core/src/services/monoiofs/core.rs @@ -156,7 +156,10 @@ impl MonoiofsCore { /// unrecoverable. It propagates worker thread's panic if there /// is any and panics on normally exited thread. pub fn propagate_worker_panic(&self) -> ! { - let mut guard = self.threads.lock().unwrap(); + let mut guard = self + .threads + .lock() + .expect("worker thread has panicked"); // wait until the panicked thread exits std::thread::sleep(Duration::from_millis(100)); let threads = mem::take(&mut *guard); From f2c8c26d609afaad97d308d0cc9b1c5ee28815ff Mon Sep 17 00:00:00 2001 From: NKID00 Date: Tue, 30 Jul 2024 23:54:34 +0800 Subject: [PATCH 5/8] style(services/monoiofs): cargo fmt --- core/src/services/monoiofs/core.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/services/monoiofs/core.rs b/core/src/services/monoiofs/core.rs index 8b52ced6eafa..f576474633b7 100644 --- a/core/src/services/monoiofs/core.rs +++ b/core/src/services/monoiofs/core.rs @@ -156,10 +156,7 @@ impl MonoiofsCore { /// unrecoverable. It propagates worker thread's panic if there /// is any and panics on normally exited thread. pub fn propagate_worker_panic(&self) -> ! { - let mut guard = self - .threads - .lock() - .expect("worker thread has panicked"); + let mut guard = self.threads.lock().expect("worker thread has panicked"); // wait until the panicked thread exits std::thread::sleep(Duration::from_millis(100)); let threads = mem::take(&mut *guard); From 24a5ea5112e2ad3496030070eb382ef0fe99161d Mon Sep 17 00:00:00 2001 From: NKID00 Date: Tue, 20 Aug 2024 21:46:38 +0800 Subject: [PATCH 6/8] feat: use latest release of monoio --- core/Cargo.lock | 4 ++-- core/Cargo.toml | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 8973adf91aad..76fc1ef72c9f 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -4535,9 +4535,9 @@ dependencies = [ [[package]] name = "monoio" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60824f1a372fa200937885a2d434610333fe47d6c6f3d0855bd4555b46316fe7" +checksum = "3bd0f8bcde87b1949f95338b547543fcab187bc7e7a5024247e359a5e828ba6a" dependencies = [ "auto-const-array", "bytes", diff --git a/core/Cargo.toml b/core/Cargo.toml index 79cb8e3c2e23..d4926a9587b6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -350,7 +350,12 @@ compio = { version = "0.11.0", optional = true, features = [ crc32c = { version = "0.6.6", optional = true } # for services-monoiofs flume = { version = "0.11", optional = true } -monoio = { version = "0.2.3", optional = true, features = ["sync"] } +monoio = { version = "0.2.4", optional = true, features = [ + "sync", + "mkdirat", + "unlinkat", + "renameat", +] } # Layers # for layers-async-backtrace From 99d1701be86227a4ab4a37c2fd73c116ab1fcead Mon Sep 17 00:00:00 2001 From: NKID00 Date: Tue, 20 Aug 2024 21:48:40 +0800 Subject: [PATCH 7/8] feat: use monoio's native fs metadata and remove --- core/src/services/monoiofs/backend.rs | 25 ++++++++++++++++--------- core/src/services/monoiofs/core.rs | 3 --- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/core/src/services/monoiofs/backend.rs b/core/src/services/monoiofs/backend.rs index c482e39214a4..c2443f148b95 100644 --- a/core/src/services/monoiofs/backend.rs +++ b/core/src/services/monoiofs/backend.rs @@ -136,10 +136,11 @@ impl Access for MonoiofsBackend { async fn stat(&self, path: &str, _args: OpStat) -> Result { let path = self.core.prepare_path(path); - // TODO: borrowed from FsBackend because statx support for monoio - // is not released yet, but stat capability is required for read - // and write - let meta = tokio::fs::metadata(&path).await.map_err(new_std_io_error)?; + let meta = self + .core + .dispatch(move || monoio::fs::metadata(path)) + .await + .map_err(new_std_io_error)?; let mode = if meta.is_dir() { EntryMode::DIR } else if meta.is_file() { @@ -172,17 +173,23 @@ impl Access for MonoiofsBackend { async fn delete(&self, path: &str, _args: OpDelete) -> Result { let path = self.core.prepare_path(path); - // TODO: borrowed from FsBackend because monoio doesn't support unlink, - // but delete capability is required for behavior tests - let meta = tokio::fs::metadata(&path).await; + let meta = self + .core + .dispatch({ + let path = path.clone(); + move || monoio::fs::metadata(path) + }) + .await; match meta { Ok(meta) => { if meta.is_dir() { - tokio::fs::remove_dir(&path) + self.core + .dispatch(move || monoio::fs::remove_dir(path)) .await .map_err(new_std_io_error)?; } else { - tokio::fs::remove_file(&path) + self.core + .dispatch(move || monoio::fs::remove_file(path)) .await .map_err(new_std_io_error)?; } diff --git a/core/src/services/monoiofs/core.rs b/core/src/services/monoiofs/core.rs index f576474633b7..d00e7b847178 100644 --- a/core/src/services/monoiofs/core.rs +++ b/core/src/services/monoiofs/core.rs @@ -39,10 +39,8 @@ type TaskSpawner = Box; #[derive(Debug)] pub struct MonoiofsCore { root: PathBuf, - #[allow(dead_code)] /// sender that sends [`TaskSpawner`] to worker threads tx: Sender, - #[allow(dead_code)] /// join handles of worker threads threads: Mutex>>, pub buf_pool: oio::PooledBuf, @@ -98,7 +96,6 @@ impl MonoiofsCore { }) } - #[allow(dead_code)] /// Create a TaskSpawner, send it to the thread pool and wait /// for its result. Task panic will propagate. pub async fn dispatch(&self, f: F) -> T From e6d41eb081e2b2cf26a010e13dedc7e316cf0b55 Mon Sep 17 00:00:00 2001 From: NKID00 Date: Tue, 20 Aug 2024 22:24:04 +0800 Subject: [PATCH 8/8] docs: update available capabilities --- core/src/services/monoiofs/docs.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/services/monoiofs/docs.md b/core/src/services/monoiofs/docs.md index 3293fd7574d0..2db804fd6f7f 100644 --- a/core/src/services/monoiofs/docs.md +++ b/core/src/services/monoiofs/docs.md @@ -2,12 +2,12 @@ This service can be used to: -- [ ] stat +- [x] stat - [x] read - [x] write - [ ] append - [ ] create_dir -- [ ] delete +- [x] delete - [ ] copy - [ ] rename - [ ] list