diff --git a/core/Cargo.lock b/core/Cargo.lock
index 6797045e4bd3..a1a1bd89772b 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -4799,6 +4799,7 @@ dependencies = [
  "etcd-client",
  "fastrace",
  "flagset",
+ "flume",
  "foundationdb",
  "futures",
  "getrandom 0.2.15",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 4117188d8e91..f6aa652c2b1c 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -159,7 +159,7 @@ services-memory = []
 services-mini-moka = ["dep:mini-moka"]
 services-moka = ["dep:moka"]
 services-mongodb = ["dep:mongodb"]
-services-monoiofs = ["dep:monoio"]
+services-monoiofs = ["dep:monoio", "dep:flume"]
 services-mysql = ["dep:mysql_async"]
 services-obs = [
   "dep:reqsign",
@@ -347,6 +347,7 @@ compio = { version = "0.11.0", optional = true, features = [
 # for services-s3
 crc32c = { version = "0.6.6", optional = true }
 # for services-monoiofs
+flume = { version = "0.11", optional = true }
 monoio = { version = "0.2.3", optional = true, features = ["sync"] }
 
 # Layers
diff --git a/core/src/services/monoiofs/backend.rs b/core/src/services/monoiofs/backend.rs
index b7c85e880843..93d52ef568cb 100644
--- a/core/src/services/monoiofs/backend.rs
+++ b/core/src/services/monoiofs/backend.rs
@@ -16,10 +16,13 @@
 // under the License.
 
 use std::fmt::Debug;
+use std::io;
+use std::path::PathBuf;
 use std::sync::Arc;
 
 use serde::Deserialize;
 
+use super::core::MonoiofsCore;
 use crate::raw::*;
 use crate::*;
 
@@ -32,7 +35,7 @@ pub struct MonoiofsConfig {
     ///
     /// All operations will happen under this root.
     ///
-    /// Default to `/` if not set.
+    /// Builder::build will return error if not set.
     pub root: Option<String>,
 }
 
@@ -53,7 +56,6 @@ impl MonoiofsBuilder {
         } else {
             Some(root.to_string())
         };
-
         self
     }
 }
@@ -65,18 +67,48 @@ impl Builder for MonoiofsBuilder {
 
     fn from_map(map: std::collections::HashMap<String, String>) -> Self {
         let config = MonoiofsConfig::deserialize(ConfigDeserializer::new(map))
-            .expect("config deserialize must succeed");
-
+            .expect("config deserialize should success");
         MonoiofsBuilder { config }
     }
 
     fn build(&mut self) -> Result<Self::Accessor> {
-        todo!()
+        let root = self.config.root.take().map(PathBuf::from).ok_or_else(|| {
+            Error::new(ErrorKind::ConfigInvalid, "root is not specified")
+                .with_operation("Builder::build")
+        })?;
+        // Create the root if it does not exist yet. Other metadata errors
+        // are not handled here; `canonicalize` below runs regardless.
+        if let Err(e) = std::fs::metadata(&root) {
+            if e.kind() == io::ErrorKind::NotFound {
+                std::fs::create_dir_all(&root).map_err(|e| {
+                    Error::new(ErrorKind::Unexpected, "create root dir failed")
+                        .with_operation("Builder::build")
+                        .with_context("root", root.to_string_lossy())
+                        .set_source(e)
+                })?;
+            }
+        }
+        let root = root.canonicalize().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "canonicalize of root directory failed",
+            )
+            .with_operation("Builder::build")
+            .with_context("root", root.to_string_lossy())
+            .set_source(e)
+        })?;
+        let worker_threads = 1; // TODO: test concurrency and default to available_parallelism and bind cpu
+        let io_uring_entries = 1024;
+        Ok(MonoiofsBackend {
+            core: Arc::new(MonoiofsCore::new(root, worker_threads, io_uring_entries)),
+        })
     }
 }
 
 #[derive(Debug, Clone)]
-pub struct MonoiofsBackend {}
+pub struct MonoiofsBackend {
+    core: Arc<MonoiofsCore>,
+}
 
 impl Access for MonoiofsBackend {
     type Reader = ();
@@ -87,6 +119,12 @@ impl Access for MonoiofsBackend {
     type BlockingLister = ();
 
     fn info(&self) -> Arc<AccessorInfo> {
-        todo!()
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Monoiofs)
+            .set_root(&self.core.root().to_string_lossy())
+            .set_native_capability(Capability {
+                ..Default::default()
+            });
+        am.into()
     }
 }
diff --git a/core/src/services/monoiofs/core.rs b/core/src/services/monoiofs/core.rs
new file mode 100644
index 000000000000..6bf376610d02
--- /dev/null
+++ b/core/src/services/monoiofs/core.rs
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{mem, path::PathBuf, sync::Mutex, time::Duration};
+
+use flume::{Receiver, Sender};
+use futures::{channel::oneshot, Future};
+use monoio::{FusionDriver, RuntimeBuilder};
+
+/// a boxed function that spawns task in current monoio runtime
+type TaskSpawner = Box<dyn FnOnce() + Send>;
+
+/// Pool of worker threads, each driving its own monoio runtime.
+#[derive(Debug)]
+pub struct MonoiofsCore {
+    root: PathBuf,
+    #[allow(dead_code)]
+    /// sender that sends [`TaskSpawner`] to worker threads
+    tx: Sender<TaskSpawner>,
+    #[allow(dead_code)]
+    /// join handles of worker threads
+    threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
+}
+
+impl MonoiofsCore {
+    /// Spawn `worker_threads` worker threads, each running a monoio
+    /// runtime built with `io_uring_entries` entries.
+    ///
+    /// # Panics
+    ///
+    /// Panics if a worker thread cannot be spawned.
+    pub fn new(root: PathBuf, worker_threads: usize, io_uring_entries: u32) -> Self {
+        // Since users use monoiofs in a context of tokio, all monoio
+        // operations need to be dispatched to a dedicated thread pool
+        // where a monoio runtime runs on each thread. Here we spawn
+        // these worker threads.
+        let (tx, rx) = flume::unbounded();
+        let threads = (0..worker_threads)
+            .map(move |i| {
+                let rx = rx.clone();
+                std::thread::Builder::new()
+                    .name(format!("monoiofs-worker-{i}"))
+                    .spawn(move || Self::worker_entrypoint(rx, io_uring_entries))
+                    .expect("spawn worker thread should success")
+            })
+            .collect();
+        let threads = Mutex::new(threads);
+
+        Self { root, tx, threads }
+    }
+
+    /// root directory this core was created with
+    pub fn root(&self) -> &PathBuf {
+        &self.root
+    }
+
+    /// entrypoint of each worker thread, sets up monoio runtimes and channels
+    fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
+        let mut rt = RuntimeBuilder::<FusionDriver>::new()
+            .enable_all()
+            .with_entries(io_uring_entries)
+            .build()
+            .expect("monoio runtime initialize should success");
+        // run an infinite loop that receives TaskSpawner and calls
+        // them in a context of monoio
+        rt.block_on(async {
+            while let Ok(spawner) = rx.recv_async().await {
+                spawner();
+            }
+        })
+    }
+
+    #[allow(dead_code)]
+    /// Create a [`TaskSpawner`] that runs `f` on a worker thread, send
+    /// it to the thread pool and wait for its result.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the task cannot be sent to a worker thread, or if it
+    /// is dropped without producing a result, e.g. because the worker
+    /// thread running it has panicked.
+    pub async fn dispatch<F, Fut, T>(&self, f: F) -> T
+    where
+        F: FnOnce() -> Fut + 'static + Send,
+        Fut: Future<Output = T>,
+        T: 'static + Send,
+    {
+        // oneshot channel to send result back
+        let (tx, rx) = oneshot::channel();
+        self.tx
+            .send_async(Box::new(move || {
+                monoio::spawn(async move {
+                    // The receiver is dropped when the caller cancels
+                    // `dispatch` (e.g. a timeout). Discard the result
+                    // in that case instead of panicking, which would
+                    // kill this worker thread and every task on it.
+                    let _ = tx.send(f().await);
+                });
+            }))
+            .await
+            .expect("send new TaskSpawner to worker thread should success");
+        match rx.await {
+            Ok(result) => result,
+            // tx is dropped without sending result, probably the worker
+            // thread has panicked.
+            Err(_) => self.propagate_worker_panic(),
+        }
+    }
+
+    /// This method always panics. It is called only when a task was
+    /// dropped without sending its result, which means at least one
+    /// worker thread has panicked or met a broken rx and is
+    /// unrecoverable. It propagates the worker thread's panic if there
+    /// is any and panics on a normally exited thread.
+    fn propagate_worker_panic(&self) -> ! {
+        // Every call panics while holding this lock, so it is poisoned
+        // for all callers but the first; the data is still usable.
+        let mut guard = self.threads.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
+        // wait until the panicked thread exits
+        std::thread::sleep(Duration::from_millis(100));
+        let threads = mem::take(&mut *guard);
+        // we don't know which thread panicked, so check them one by one
+        for thread in threads {
+            if thread.is_finished() {
+                // worker thread runs an infinite loop, hence finished
+                // thread must have panicked or met a broken rx.
+                match thread.join() {
+                    // rx is broken
+                    Ok(()) => panic!("worker thread should not exit, tx may be dropped"),
+                    // thread has panicked
+                    Err(e) => std::panic::resume_unwind(e),
+                }
+            }
+        }
+        // Reached when no worker had finished within the wait above, or
+        // when an earlier call already took the join handles.
+        panic!("worker thread dropped a task without sending its result")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{sync::Arc, time::Duration};
+
+    use futures::{
+        channel::mpsc::{self, UnboundedSender},
+        FutureExt, StreamExt,
+    };
+
+    use super::*;
+
+    fn new_core(worker_threads: usize) -> Arc<MonoiofsCore> {
+        Arc::new(MonoiofsCore::new(PathBuf::new(), worker_threads, 1024))
+    }
+
+    async fn dispatch_simple(core: Arc<MonoiofsCore>) {
+        let result = core.dispatch(|| async { 42 }).await;
+        assert_eq!(result, 42);
+        let bytes: Vec<u8> = vec![1, 2, 3, 4, 5, 6, 7, 8];
+        let bytes_clone = bytes.clone();
+        let result = core.dispatch(move || async move { bytes }).await;
+        assert_eq!(result, bytes_clone);
+    }
+
+    async fn dispatch_concurrent(core: Arc<MonoiofsCore>) {
+        let (tx, mut rx) = mpsc::unbounded();
+
+        async fn spawn_task(core: Arc<MonoiofsCore>, tx: UnboundedSender<u64>, sleep_millis: u64) {
+            tokio::spawn(async move {
+                let result = core
+                    .dispatch(move || async move {
+                        monoio::time::sleep(Duration::from_millis(sleep_millis)).await;
+                        sleep_millis
+                    })
+                    .await;
+                assert_eq!(result, sleep_millis);
+                tx.unbounded_send(result).unwrap();
+            });
+        }
+
+        spawn_task(core.clone(), tx.clone(), 200).await;
+        spawn_task(core.clone(), tx.clone(), 20).await;
+        drop(tx);
+        let first = rx.next().await;
+        let second = rx.next().await;
+        let third = rx.next().await;
+        assert_eq!(first, Some(20));
+        assert_eq!(second, Some(200));
+        assert_eq!(third, None);
+    }
+
+    async fn dispatch_panic(core: Arc<MonoiofsCore>) {
+        core.dispatch(|| async { panic!("BOOM") }).await;
+    }
+
+    async fn dispatch_cancelled(core: Arc<MonoiofsCore>) {
+        // Poll the dispatch future once, then drop it while its task is
+        // still sleeping on the worker thread.
+        let cancelled = core
+            .dispatch(|| async {
+                monoio::time::sleep(Duration::from_millis(20)).await;
+            })
+            .now_or_never();
+        assert!(cancelled.is_none());
+        // The worker must outlive the moment the cancelled task finishes.
+        let result = core
+            .dispatch(|| async {
+                monoio::time::sleep(Duration::from_millis(100)).await;
+                42
+            })
+            .await;
+        assert_eq!(result, 42);
+    }
+
+    #[tokio::test]
+    async fn test_monoio_single_thread_dispatch() {
+        let core = new_core(1);
+        assert_eq!(core.threads.lock().unwrap().len(), 1);
+        dispatch_simple(core).await;
+    }
+
+    #[tokio::test]
+    async fn test_monoio_single_thread_dispatch_concurrent() {
+        let core = new_core(1);
+        dispatch_concurrent(core).await;
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "BOOM")]
+    async fn test_monoio_single_thread_dispatch_panic() {
+        let core = new_core(1);
+        dispatch_panic(core).await;
+    }
+
+    #[tokio::test]
+    async fn test_monoio_single_thread_dispatch_cancelled() {
+        let core = new_core(1);
+        dispatch_cancelled(core).await;
+    }
+
+    #[tokio::test]
+    async fn test_monoio_multi_thread_dispatch() {
+        let core = new_core(4);
+        assert_eq!(core.threads.lock().unwrap().len(), 4);
+        dispatch_simple(core).await;
+    }
+
+    #[tokio::test]
+    async fn test_monoio_multi_thread_dispatch_concurrent() {
+        let core = new_core(4);
+        dispatch_concurrent(core).await;
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "BOOM")]
+    async fn test_monoio_multi_thread_dispatch_panic() {
+        let core = new_core(4);
+        dispatch_panic(core).await;
+    }
+}
diff --git a/core/src/services/monoiofs/mod.rs b/core/src/services/monoiofs/mod.rs
index 68c23e3f1e35..66d296936d90 100644
--- a/core/src/services/monoiofs/mod.rs
+++ b/core/src/services/monoiofs/mod.rs
@@ -18,3 +18,5 @@
 mod backend;
 pub use backend::MonoiofsBuilder as Monoiofs;
 pub use backend::MonoiofsConfig;
+
+mod core;