Skip to content

Commit

Permalink
feat(services/monoiofs): monoio wrapper (#4885)
Browse files Browse the repository at this point in the history
* fix: typo

* feat(services/monoiofs): monoio wrapper

* fix(services/monoiofs): hide service implementation details
  • Loading branch information
NKID00 authored Jul 18, 2024
1 parent e42a9eb commit 7985cfd
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 8 deletions.
1 change: 1 addition & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ services-memory = []
services-mini-moka = ["dep:mini-moka"]
services-moka = ["dep:moka"]
services-mongodb = ["dep:mongodb"]
services-monoiofs = ["dep:monoio"]
services-monoiofs = ["dep:monoio", "dep:flume"]
services-mysql = ["dep:mysql_async"]
services-obs = [
"dep:reqsign",
Expand Down Expand Up @@ -347,6 +347,7 @@ compio = { version = "0.11.0", optional = true, features = [
# for services-s3
crc32c = { version = "0.6.6", optional = true }
# for services-monoiofs
flume = { version = "0.11", optional = true }
monoio = { version = "0.2.3", optional = true, features = ["sync"] }

# Layers
Expand Down
50 changes: 43 additions & 7 deletions core/src/services/monoiofs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
// under the License.

use std::fmt::Debug;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;

use serde::Deserialize;

use super::core::MonoiofsCore;
use crate::raw::*;
use crate::*;

Expand All @@ -32,7 +35,7 @@ pub struct MonoiofsConfig {
///
/// All operations will happen under this root.
///
/// Default to `/` if not set.
/// Builder::build will return error if not set.
pub root: Option<String>,
}

Expand All @@ -53,7 +56,6 @@ impl MonoiofsBuilder {
} else {
Some(root.to_string())
};

self
}
}
Expand All @@ -65,18 +67,46 @@ impl Builder for MonoiofsBuilder {

fn from_map(map: std::collections::HashMap<String, String>) -> Self {
let config = MonoiofsConfig::deserialize(ConfigDeserializer::new(map))
.expect("config deserialize must succeed");

.expect("config deserialize should success");
MonoiofsBuilder { config }
}

fn build(&mut self) -> Result<Self::Accessor> {
todo!()
let root = self.config.root.take().map(PathBuf::from).ok_or(
Error::new(ErrorKind::ConfigInvalid, "root is not specified")
.with_operation("Builder::build"),
)?;
if let Err(e) = std::fs::metadata(&root) {
if e.kind() == io::ErrorKind::NotFound {
std::fs::create_dir_all(&root).map_err(|e| {
Error::new(ErrorKind::Unexpected, "create root dir failed")
.with_operation("Builder::build")
.with_context("root", root.to_string_lossy())
.set_source(e)
})?;
}
}
let root = root.canonicalize().map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"canonicalize of root directory failed",
)
.with_operation("Builder::build")
.with_context("root", root.to_string_lossy())
.set_source(e)
})?;
let worker_threads = 1; // TODO: test concurrency and default to available_parallelism and bind cpu
let io_uring_entries = 1024;
Ok(MonoiofsBackend {
core: Arc::new(MonoiofsCore::new(root, worker_threads, io_uring_entries)),
})
}
}

#[derive(Debug, Clone)]
pub struct MonoiofsBackend {}
pub struct MonoiofsBackend {
core: Arc<MonoiofsCore>,
}

impl Access for MonoiofsBackend {
type Reader = ();
Expand All @@ -87,6 +117,12 @@ impl Access for MonoiofsBackend {
type BlockingLister = ();

fn info(&self) -> Arc<AccessorInfo> {
todo!()
let mut am = AccessorInfo::default();
am.set_scheme(Scheme::Monoiofs)
.set_root(&self.core.root().to_string_lossy())
.set_native_capability(Capability {
..Default::default()
});
am.into()
}
}
230 changes: 230 additions & 0 deletions core/src/services/monoiofs/core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{mem, path::PathBuf, sync::Mutex, time::Duration};

use flume::{Receiver, Sender};
use futures::{channel::oneshot, Future};
use monoio::{FusionDriver, RuntimeBuilder};

/// a boxed function that spawns task in current monoio runtime
type TaskSpawner = Box<dyn FnOnce() + Send>;

#[derive(Debug)]
pub struct MonoiofsCore {
root: PathBuf,
#[allow(dead_code)]
/// sender that sends [`TaskSpawner`] to worker threads
tx: Sender<TaskSpawner>,
#[allow(dead_code)]
/// join handles of worker threads
threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
}

impl MonoiofsCore {
pub fn new(root: PathBuf, worker_threads: usize, io_uring_entries: u32) -> Self {
// Since users use monoiofs in a context of tokio, all monoio
// operations need to be dispatched to a dedicated thread pool
// where a monoio runtime runs on each thread. Here we spawn
// these worker threads.
let (tx, rx) = flume::unbounded();
let threads = (0..worker_threads)
.map(move |i| {
let rx = rx.clone();
std::thread::Builder::new()
.name(format!("monoiofs-worker-{i}"))
.spawn(move || Self::worker_entrypoint(rx, io_uring_entries))
.expect("spawn worker thread should success")
})
.collect();
let threads = Mutex::new(threads);

Self { root, tx, threads }
}

pub fn root(&self) -> &PathBuf {
&self.root
}

/// entrypoint of each worker thread, sets up monoio runtimes and channels
fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
let mut rt = RuntimeBuilder::<FusionDriver>::new()
.enable_all()
.with_entries(io_uring_entries)
.build()
.expect("monoio runtime initialize should success");
// run a infinite loop that receives TaskSpawner and calls
// them in a context of monoio
rt.block_on(async {
while let Ok(spawner) = rx.recv_async().await {
spawner();
}
})
}

#[allow(dead_code)]
/// create a TaskSpawner, send it to the thread pool and wait
/// for its result
pub async fn dispatch<F, Fut, T>(&self, f: F) -> T
where
F: FnOnce() -> Fut + 'static + Send,
Fut: Future<Output = T>,
T: 'static + Send,
{
// oneshot channel to send result back
let (tx, rx) = oneshot::channel();
self.tx
.send_async(Box::new(move || {
monoio::spawn(async move {
tx.send(f().await)
// discard result because it may be non-Debug and
// we don't need it to appear in the panic message
.map_err(|_| ())
.expect("send result from worker thread should success");
});
}))
.await
.expect("send new TaskSpawner to worker thread should success");
match rx.await {
Ok(result) => result,
// tx is dropped without sending result, probably the worker
// thread has panicked.
Err(_) => self.propagate_worker_panic(),
}
}

/// This method always panics. It is called only when at least a
/// worker thread has panicked or meet a broken rx, which is
/// unrecoverable. It propagates worker thread's panic if there
/// is any and panics on normally exited thread.
fn propagate_worker_panic(&self) -> ! {
let mut guard = self.threads.lock().unwrap();
// wait until the panicked thread exits
std::thread::sleep(Duration::from_millis(100));
let threads = mem::take(&mut *guard);
// we don't know which thread panicked, so check them one by one
for thread in threads {
if thread.is_finished() {
// worker thread runs an infinite loop, hence finished
// thread must have panicked or meet a broken rx.
match thread.join() {
// rx is broken
Ok(()) => panic!("worker thread should not exit, tx may be dropped"),
// thread has panicked
Err(e) => std::panic::resume_unwind(e),
}
}
}
unreachable!("this method should panic")
}
}

#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};

use futures::{
channel::mpsc::{self, UnboundedSender},
StreamExt,
};

use super::*;

fn new_core(worker_threads: usize) -> Arc<MonoiofsCore> {
Arc::new(MonoiofsCore::new(PathBuf::new(), worker_threads, 1024))
}

async fn dispatch_simple(core: Arc<MonoiofsCore>) {
let result = core.dispatch(|| async { 42 }).await;
assert_eq!(result, 42);
let bytes: Vec<u8> = vec![1, 2, 3, 4, 5, 6, 7, 8];
let bytes_clone = bytes.clone();
let result = core.dispatch(move || async move { bytes }).await;
assert_eq!(result, bytes_clone);
}

async fn dispatch_concurrent(core: Arc<MonoiofsCore>) {
let (tx, mut rx) = mpsc::unbounded();

async fn spawn_task(core: Arc<MonoiofsCore>, tx: UnboundedSender<u64>, sleep_millis: u64) {
tokio::spawn(async move {
let result = core
.dispatch(move || async move {
monoio::time::sleep(Duration::from_millis(sleep_millis)).await;
sleep_millis
})
.await;
assert_eq!(result, sleep_millis);
tx.unbounded_send(result).unwrap();
});
}

spawn_task(core.clone(), tx.clone(), 200).await;
spawn_task(core.clone(), tx.clone(), 20).await;
drop(tx);
let first = rx.next().await;
let second = rx.next().await;
let third = rx.next().await;
assert_eq!(first, Some(20));
assert_eq!(second, Some(200));
assert_eq!(third, None);
}

async fn dispatch_panic(core: Arc<MonoiofsCore>) {
core.dispatch(|| async { panic!("BOOM") }).await;
}

#[tokio::test]
async fn test_monoio_single_thread_dispatch() {
let core = new_core(1);
assert_eq!(core.threads.lock().unwrap().len(), 1);
dispatch_simple(core).await;
}

#[tokio::test]
async fn test_monoio_single_thread_dispatch_concurrent() {
let core = new_core(1);
dispatch_concurrent(core).await;
}

#[tokio::test]
#[should_panic(expected = "BOOM")]
async fn test_monoio_single_thread_dispatch_panic() {
let core = new_core(1);
dispatch_panic(core).await;
}

#[tokio::test]
async fn test_monoio_multi_thread_dispatch() {
let core = new_core(4);
assert_eq!(core.threads.lock().unwrap().len(), 4);
dispatch_simple(core).await;
}

#[tokio::test]
async fn test_monoio_multi_thread_dispatch_concurrent() {
let core = new_core(4);
dispatch_concurrent(core).await;
}

#[tokio::test]
#[should_panic(expected = "BOOM")]
async fn test_monoio_multi_thread_dispatch_panic() {
let core = new_core(4);
dispatch_panic(core).await;
}
}
2 changes: 2 additions & 0 deletions core/src/services/monoiofs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@
mod backend;
pub use backend::MonoiofsBuilder as Monoiofs;
pub use backend::MonoiofsConfig;

mod core;

0 comments on commit 7985cfd

Please sign in to comment.