Skip to content

Commit

Permalink
feat(services/monoiofs): impl read and write, add behavior test (#4944)
Browse files Browse the repository at this point in the history
* feat(services/monoiofs): impl read and write, add behavior test

* fix(ci): add monoiofs feature flag for bindings

* fix(services/monoiofs): propagate worker thread panic and swallow error in worker thread

* feat(services/monoiofs): panic message after worker thread has already panicked

* style(services/monoiofs): cargo fmt

* feat: use latest release of monoio

* feat: use monoio's native fs metadata and remove

* docs: update available capabilities
  • Loading branch information
NKID00 authored Aug 21, 2024
1 parent 79a7d11 commit 90832a4
Show file tree
Hide file tree
Showing 12 changed files with 484 additions and 31 deletions.
27 changes: 27 additions & 0 deletions .github/services/monoiofs/monoiofs/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: monoiofs
description: 'Behavior test for monoiofs'

runs:
using: "composite"
steps:
- name: Setup
shell: bash
run: |
echo "OPENDAL_MONOIOFS_ROOT=${{ runner.temp }}/" >> $GITHUB_ENV
1 change: 1 addition & 0 deletions bindings/java/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ services-memcached = ["opendal/services-memcached"]
services-mini-moka = ["opendal/services-mini-moka"]
services-moka = ["opendal/services-moka"]
services-mongodb = ["opendal/services-mongodb"]
services-monoiofs = ["opendal/services-monoiofs"]
services-mysql = ["opendal/services-mysql"]
services-onedrive = ["opendal/services-onedrive"]
services-persy = ["opendal/services-persy"]
Expand Down
1 change: 1 addition & 0 deletions bindings/nodejs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ services-memcached = ["opendal/services-memcached"]
services-mini-moka = ["opendal/services-mini-moka"]
services-moka = ["opendal/services-moka"]
services-mongodb = ["opendal/services-mongodb"]
services-monoiofs = ["opendal/services-monoiofs"]
services-mysql = ["opendal/services-mysql"]
services-onedrive = ["opendal/services-onedrive"]
services-persy = ["opendal/services-persy"]
Expand Down
1 change: 1 addition & 0 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ services-memcached = ["opendal/services-memcached"]
services-mini-moka = ["opendal/services-mini-moka"]
services-moka = ["opendal/services-moka"]
services-mongodb = ["opendal/services-mongodb"]
services-monoiofs = ["opendal/services-monoiofs"]
services-mysql = ["opendal/services-mysql"]
services-onedrive = ["opendal/services-onedrive"]
services-persy = ["opendal/services-persy"]
Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,12 @@ compio = { version = "0.11.0", optional = true, features = [
crc32c = { version = "0.6.6", optional = true }
# for services-monoiofs
flume = { version = "0.11", optional = true }
monoio = { version = "0.2.3", optional = true, features = ["sync"] }
monoio = { version = "0.2.4", optional = true, features = [
"sync",
"mkdirat",
"unlinkat",
"renameat",
] }

# Layers
# for layers-async-backtrace
Expand Down
78 changes: 76 additions & 2 deletions core/src/services/monoiofs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ use std::io;
use std::path::PathBuf;
use std::sync::Arc;

use chrono::DateTime;
use serde::Deserialize;
use serde::Serialize;

use super::core::MonoiofsCore;
use super::reader::MonoiofsReader;
use super::writer::MonoiofsWriter;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -110,8 +113,8 @@ pub struct MonoiofsBackend {
}

impl Access for MonoiofsBackend {
type Reader = ();
type Writer = ();
type Reader = MonoiofsReader;
type Writer = MonoiofsWriter;
type Lister = ();
type BlockingReader = ();
type BlockingWriter = ();
Expand All @@ -122,8 +125,79 @@ impl Access for MonoiofsBackend {
am.set_scheme(Scheme::Monoiofs)
.set_root(&self.core.root().to_string_lossy())
.set_native_capability(Capability {
stat: true,
read: true,
write: true,
delete: true,
..Default::default()
});
am.into()
}

async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
let path = self.core.prepare_path(path);
let meta = self
.core
.dispatch(move || monoio::fs::metadata(path))
.await
.map_err(new_std_io_error)?;
let mode = if meta.is_dir() {
EntryMode::DIR
} else if meta.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let m = Metadata::new(mode)
.with_content_length(meta.len())
.with_last_modified(
meta.modified()
.map(DateTime::from)
.map_err(new_std_io_error)?,
);
Ok(RpStat::new(m))
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let path = self.core.prepare_path(path);
let reader = MonoiofsReader::new(self.core.clone(), path, args.range()).await?;
Ok((RpRead::default(), reader))
}

async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
// TODO: create parent directory before write
let path = self.core.prepare_path(path);
let writer = MonoiofsWriter::new(self.core.clone(), path).await?;
Ok((RpWrite::default(), writer))
}

async fn delete(&self, path: &str, _args: OpDelete) -> Result<RpDelete> {
let path = self.core.prepare_path(path);
let meta = self
.core
.dispatch({
let path = path.clone();
move || monoio::fs::metadata(path)
})
.await;
match meta {
Ok(meta) => {
if meta.is_dir() {
self.core
.dispatch(move || monoio::fs::remove_dir(path))
.await
.map_err(new_std_io_error)?;
} else {
self.core
.dispatch(move || monoio::fs::remove_file(path))
.await
.map_err(new_std_io_error)?;
}

Ok(RpDelete::default())
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(RpDelete::default()),
Err(err) => Err(new_std_io_error(err)),
}
}
}
90 changes: 68 additions & 22 deletions core/src/services/monoiofs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,26 @@ use flume::Receiver;
use flume::Sender;
use futures::channel::oneshot;
use futures::Future;
use monoio::task::JoinHandle;
use monoio::FusionDriver;
use monoio::RuntimeBuilder;

use crate::raw::*;
use crate::*;

pub const BUFFER_SIZE: usize = 2 * 1024 * 1024; // 2 MiB

/// a boxed function that spawns task in current monoio runtime
type TaskSpawner = Box<dyn FnOnce() + Send>;

#[derive(Debug)]
pub struct MonoiofsCore {
root: PathBuf,
#[allow(dead_code)]
/// sender that sends [`TaskSpawner`] to worker threads
tx: Sender<TaskSpawner>,
#[allow(dead_code)]
/// join handles of worker threads
threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
pub buf_pool: oio::PooledBuf,
}

impl MonoiofsCore {
Expand All @@ -59,13 +64,22 @@ impl MonoiofsCore {
.collect();
let threads = Mutex::new(threads);

Self { root, tx, threads }
Self {
root,
tx,
threads,
buf_pool: oio::PooledBuf::new(16).with_initial_capacity(BUFFER_SIZE),
}
}

pub fn root(&self) -> &PathBuf {
&self.root
}

pub fn prepare_path(&self, path: &str) -> PathBuf {
self.root.join(path.trim_end_matches('/'))
}

/// entrypoint of each worker thread, sets up monoio runtimes and channels
fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
let mut rt = RuntimeBuilder::<FusionDriver>::new()
Expand All @@ -82,9 +96,8 @@ impl MonoiofsCore {
})
}

#[allow(dead_code)]
/// create a TaskSpawner, send it to the thread pool and wait
/// for its result
/// Create a TaskSpawner, send it to the thread pool and wait
/// for its result. Task panic will propagate.
pub async fn dispatch<F, Fut, T>(&self, f: F) -> T
where
F: FnOnce() -> Fut + 'static + Send,
Expand All @@ -93,32 +106,54 @@ impl MonoiofsCore {
{
// oneshot channel to send result back
let (tx, rx) = oneshot::channel();
self.tx
let result = self
.tx
.send_async(Box::new(move || {
// task will be spawned on current thread, task panic
// will cause current worker thread panic
monoio::spawn(async move {
tx.send(f().await)
// discard result because it may be non-Debug and
// we don't need it to appear in the panic message
.map_err(|_| ())
.expect("send result from worker thread should success");
// discard the result if send failed due to
// MonoiofsCore::dispatch cancelled
let _ = tx.send(f().await);
});
}))
.await
.expect("send new TaskSpawner to worker thread should success");
match rx.await {
Ok(result) => result,
// tx is dropped without sending result, probably the worker
// thread has panicked.
Err(_) => self.propagate_worker_panic(),
}
.await;
self.unwrap(result);
self.unwrap(rx.await)
}

/// Create a TaskSpawner, send it to the thread pool, spawn the task
/// and return its [`JoinHandle`]. Task panic cannot propagate
/// through the [`JoinHandle`] and should be handled elsewhere.
pub async fn spawn<F, Fut, T>(&self, f: F) -> JoinHandle<T>
where
F: FnOnce() -> Fut + 'static + Send,
Fut: Future<Output = T>,
T: 'static + Send,
{
// oneshot channel to send JoinHandle back
let (tx, rx) = oneshot::channel();
let result = self
.tx
.send_async(Box::new(move || {
// task will be spawned on current thread, task panic
// will cause current worker thread panic
let handle = monoio::spawn(async move { f().await });
// discard the result if send failed due to
// MonoiofsCore::spawn cancelled
let _ = tx.send(handle);
}))
.await;
self.unwrap(result);
self.unwrap(rx.await)
}

/// This method always panics. It is called only when at least a
/// worker thread has panicked or meet a broken rx, which is
/// unrecoverable. It propagates worker thread's panic if there
/// is any and panics on normally exited thread.
fn propagate_worker_panic(&self) -> ! {
let mut guard = self.threads.lock().unwrap();
pub fn propagate_worker_panic(&self) -> ! {
let mut guard = self.threads.lock().expect("worker thread has panicked");
// wait until the panicked thread exits
std::thread::sleep(Duration::from_millis(100));
let threads = mem::take(&mut *guard);
Expand All @@ -137,6 +172,17 @@ impl MonoiofsCore {
}
unreachable!("this method should panic")
}

/// Unwrap result if result is Ok, otherwise propagates worker thread's
/// panic. This method facilitates panic propagation in situation where
/// Err returned by broken channel indicates that the worker thread has
/// panicked.
pub fn unwrap<T, E>(&self, result: Result<T, E>) -> T {
match result {
Ok(result) => result,
Err(_) => self.propagate_worker_panic(),
}
}
}

#[cfg(test)]
Expand Down
8 changes: 4 additions & 4 deletions core/src/services/monoiofs/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

This service can be used to:

- [ ] stat
- [ ] read
- [ ] write
- [x] stat
- [x] read
- [x] write
- [ ] append
- [ ] create_dir
- [ ] delete
- [x] delete
- [ ] copy
- [ ] rename
- [ ] list
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/monoiofs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ pub use backend::MonoiofsBuilder as Monoiofs;
pub use backend::MonoiofsConfig;

mod core;
mod reader;
mod writer;
Loading

0 comments on commit 90832a4

Please sign in to comment.