Skip to content

Commit

Permalink
Merge branch 'main' into refactor-prometheus-layer-2
Browse files Browse the repository at this point in the history
  • Loading branch information
koushiro authored Aug 24, 2024
2 parents 4542feb + 84c5599 commit b750a03
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 21 deletions.
5 changes: 3 additions & 2 deletions core/src/services/github/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ impl GithubBuilder {
///
/// required.
pub fn token(mut self, token: &str) -> Self {
self.config.token = Some(token.to_string());

if !token.is_empty() {
self.config.token = Some(token.to_string());
}
self
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/services/http/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl HttpBuilder {
/// default: no access token
pub fn token(mut self, token: &str) -> Self {
if !token.is_empty() {
self.config.token = Some(token.to_owned());
self.config.token = Some(token.to_string());
}
self
}
Expand Down
96 changes: 92 additions & 4 deletions core/src/services/monoiofs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use std::path::PathBuf;
use std::sync::Arc;

use chrono::DateTime;
use monoio::fs::OpenOptions;

use super::core::MonoiofsCore;
use super::core::BUFFER_SIZE;
use super::reader::MonoiofsReader;
use super::writer::MonoiofsWriter;
use crate::raw::*;
Expand Down Expand Up @@ -114,7 +116,11 @@ impl Access for MonoiofsBackend {
stat: true,
read: true,
write: true,
write_can_append: true,
delete: true,
rename: true,
create_dir: true,
copy: true,
..Default::default()
});
am.into()
Expand Down Expand Up @@ -150,10 +156,9 @@ impl Access for MonoiofsBackend {
Ok((RpRead::default(), reader))
}

async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
// TODO: create parent directory before write
let path = self.core.prepare_path(path);
let writer = MonoiofsWriter::new(self.core.clone(), path).await?;
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let path = self.core.prepare_write_path(path).await?;
let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?;
Ok((RpWrite::default(), writer))
}

Expand Down Expand Up @@ -186,4 +191,87 @@ impl Access for MonoiofsBackend {
Err(err) => Err(new_std_io_error(err)),
}
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from = self.core.prepare_path(from);
// ensure file exists
self.core
.dispatch({
let from = from.clone();
move || monoio::fs::metadata(from)
})
.await
.map_err(new_std_io_error)?;
let to = self.core.prepare_write_path(to).await?;
self.core
.dispatch(move || monoio::fs::rename(from, to))
.await
.map_err(new_std_io_error)?;
Ok(RpRename::default())
}

async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
let path = self.core.prepare_path(path);
self.core
.dispatch(move || monoio::fs::create_dir_all(path))
.await
.map_err(new_std_io_error)?;
Ok(RpCreateDir::default())
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let from = self.core.prepare_path(from);
// ensure file exists
self.core
.dispatch({
let from = from.clone();
move || monoio::fs::metadata(from)
})
.await
.map_err(new_std_io_error)?;
let to = self.core.prepare_write_path(to).await?;
self.core
.dispatch({
let core = self.core.clone();
move || async move {
let from = OpenOptions::new().read(true).open(from).await?;
let to = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(to)
.await?;

// AsyncReadRent and AsyncWriteRent is not implemented
// for File, so we can't write this:
// monoio::io::copy(&mut from, &mut to).await?;

let mut pos = 0;
// allocate and resize buffer
let mut buf = core.buf_pool.get();
// set capacity of buf to exact size to avoid excessive read
buf.reserve(BUFFER_SIZE);
let _ = buf.split_off(BUFFER_SIZE);

loop {
let result;
(result, buf) = from.read_at(buf, pos).await;
if result? == 0 {
// EOF
break;
}
let result;
(result, buf) = to.write_all_at(buf, pos).await;
result?;
pos += buf.len() as u64;
buf.clear();
}
core.buf_pool.put(buf);
Ok(())
}
})
.await
.map_err(new_std_io_error)?;
Ok(RpCopy::default())
}
}
20 changes: 20 additions & 0 deletions core/src/services/monoiofs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,30 @@ impl MonoiofsCore {
&self.root
}

/// join root and path
pub fn prepare_path(&self, path: &str) -> PathBuf {
self.root.join(path.trim_end_matches('/'))
}

/// join root and path, create parent dirs
pub async fn prepare_write_path(&self, path: &str) -> Result<PathBuf> {
let path = self.prepare_path(path);
let parent = path
.parent()
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"path should have parent but not, it must be malformed",
)
.with_context("input", path.to_string_lossy())
})?
.to_path_buf();
self.dispatch(move || monoio::fs::create_dir_all(parent))
.await
.map_err(new_std_io_error)?;
Ok(path)
}

/// entrypoint of each worker thread, sets up monoio runtimes and channels
fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
let mut rt = RuntimeBuilder::<FusionDriver>::new()
Expand Down
8 changes: 4 additions & 4 deletions core/src/services/monoiofs/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ This service can be used to:
- [x] stat
- [x] read
- [x] write
- [ ] append
- [ ] create_dir
- [x] append
- [x] create_dir
- [x] delete
- [ ] copy
- [ ] rename
- [x] copy
- [x] rename
- [ ] list
- [ ] ~~presign~~
- [ ] blocking
Expand Down
8 changes: 5 additions & 3 deletions core/src/services/monoiofs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ pub struct MonoiofsWriter {
}

impl MonoiofsWriter {
pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf) -> Result<Self> {
pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf, append: bool) -> Result<Self> {
let (open_result_tx, open_result_rx) = oneshot::channel();
let (tx, rx) = mpsc::unbounded();
core.spawn(move || Self::worker_entrypoint(path, rx, open_result_tx))
core.spawn(move || Self::worker_entrypoint(path, append, rx, open_result_tx))
.await;
core.unwrap(open_result_rx.await)?;
Ok(Self { core, tx, pos: 0 })
Expand All @@ -57,13 +57,15 @@ impl MonoiofsWriter {
/// entrypoint of worker task that runs in context of monoio
async fn worker_entrypoint(
path: PathBuf,
append: bool,
mut rx: mpsc::UnboundedReceiver<WriterRequest>,
open_result_tx: oneshot::Sender<Result<()>>,
) {
let result = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.append(append)
.truncate(!append)
.open(path)
.await;
// [`monoio::fs::File`] is non-Send, hence it is kept within
Expand Down
11 changes: 6 additions & 5 deletions core/src/services/vercel_blob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ impl VercelBlobBuilder {
/// Get from Vercel environment variable `BLOB_READ_WRITE_TOKEN`.
/// It is required.
pub fn token(mut self, token: &str) -> Self {
self.config.token = token.to_string();

if !token.is_empty() {
self.config.token = Some(token.to_string());
}
self
}

Expand Down Expand Up @@ -111,11 +112,11 @@ impl Builder for VercelBlobBuilder {
debug!("backend use root {}", &root);

// Handle token.
if self.config.token.is_empty() {
let Some(token) = self.config.token.clone() else {
return Err(Error::new(ErrorKind::ConfigInvalid, "token is empty")
.with_operation("Builder::build")
.with_context("service", Scheme::VercelBlob));
}
};

let client = if let Some(client) = self.http_client {
client
Expand All @@ -129,7 +130,7 @@ impl Builder for VercelBlobBuilder {
Ok(VercelBlobBackend {
core: Arc::new(VercelBlobCore {
root,
token: self.config.token.clone(),
token,
client,
}),
})
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/vercel_blob/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct VercelBlobConfig {
/// All operations will happen under this root.
pub root: Option<String>,
/// vercel blob token.
pub token: String,
pub token: Option<String>,
}

impl Debug for VercelBlobConfig {
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/webdav/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl WebdavBuilder {
/// default: no access token
pub fn token(mut self, token: &str) -> Self {
if !token.is_empty() {
self.config.token = Some(token.to_owned());
self.config.token = Some(token.to_string());
}
self
}
Expand Down

0 comments on commit b750a03

Please sign in to comment.