From 9dcfd28f6111c6cc68be04241fec55fae0b4a793 Mon Sep 17 00:00:00 2001 From: Niwaka <61189782+NiwakaDev@users.noreply.github.com> Date: Mon, 23 Oct 2023 12:00:29 +0900 Subject: [PATCH] feat: impl ObjectStoreManager for custom_storage (#2621) * feat: impl ObjectStoreManager for custom_storage * fix: rename object_store_manager to manager * fix: rename global to default * chore: add document for ObjectStoreManager * refactor: simplify default_object_store * fix: address review --- Cargo.lock | 3 + src/object-store/Cargo.toml | 3 + src/object-store/src/error.rs | 45 ++++++++++++++ src/object-store/src/lib.rs | 2 + src/object-store/src/manager.rs | 107 ++++++++++++++++++++++++++++++++ 5 files changed, 160 insertions(+) create mode 100644 src/object-store/src/error.rs create mode 100644 src/object-store/src/manager.rs diff --git a/Cargo.lock b/Cargo.lock index 0d4b42e03c45..b96060df31d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5965,6 +5965,8 @@ dependencies = [ "anyhow", "async-trait", "bytes", + "common-error", + "common-macro", "common-runtime", "common-telemetry", "common-test-util", @@ -5973,6 +5975,7 @@ dependencies = [ "metrics", "moka", "opendal", + "snafu", "tokio", "uuid", ] diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 49bf01464d4a..9d1d055ef2c5 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -7,6 +7,8 @@ license.workspace = true [dependencies] async-trait = "0.1" bytes = "1.4" +common-error.workspace = true +common-macro.workspace = true common-runtime.workspace = true common-telemetry.workspace = true futures.workspace = true @@ -17,6 +19,7 @@ opendal = { version = "0.40", features = [ "layers-tracing", "layers-metrics", ] } +snafu.workspace = true uuid.workspace = true [dev-dependencies] diff --git a/src/object-store/src/error.rs b/src/object-store/src/error.rs new file mode 100644 index 000000000000..8ea360d11683 --- /dev/null +++ b/src/object-store/src/error.rs @@ -0,0 +1,45 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Default storage not found: {}", default_object_store))] + DefaultStorageNotFound { + location: Location, + default_object_store: String, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::DefaultStorageNotFound { .. } => StatusCode::InvalidArguments, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index f1bf27846668..9623ef9a4ec6 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -19,7 +19,9 @@ pub use opendal::{ Operator as ObjectStore, Reader, Result, Writer, }; +pub mod error; pub mod layers; +pub mod manager; mod metrics; pub mod test_util; pub mod util; diff --git a/src/object-store/src/manager.rs b/src/object-store/src/manager.rs new file mode 100644 index 000000000000..d7cb323057cf --- /dev/null +++ b/src/object-store/src/manager.rs @@ -0,0 +1,107 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use snafu::OptionExt; + +use crate::error::{DefaultStorageNotFoundSnafu, Result}; +use crate::ObjectStore; + +/// Manages multiple object stores so that users can configure a storage for each table. +/// This struct certainly have one default object store, and can have zero or more custom object stores. +pub struct ObjectStoreManager { + stores: HashMap, + default_object_store: ObjectStore, +} + +impl ObjectStoreManager { + /// Creates a new manager with specific object stores. Returns an error if `stores` doesn't contain the default object store. + pub fn try_new( + stores: HashMap, + default_object_store: &str, + ) -> Result { + let default_object_store = stores + .get(default_object_store) + .context(DefaultStorageNotFoundSnafu { + default_object_store, + })? + .clone(); + Ok(ObjectStoreManager { + stores, + default_object_store, + }) + } + + pub fn find(&self, name: &str) -> Option<&ObjectStore> { + self.stores.get(name) + } + + pub fn default_object_store(&self) -> &ObjectStore { + &self.default_object_store + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use common_test_util::temp_dir::{create_temp_dir, TempDir}; + + use super::ObjectStoreManager; + use crate::error::Error; + use crate::services::Fs as Builder; + use crate::ObjectStore; + + fn new_object_store(dir: &TempDir) -> ObjectStore { + let store_dir = dir.path().to_str().unwrap(); + let mut builder = Builder::default(); + let _ = builder.root(store_dir); + ObjectStore::new(builder).unwrap().finish() + } + + #[test] + fn test_new_returns_err_when_global_store_not_exist() { + let dir = create_temp_dir("new"); + let object_store = new_object_store(&dir); + let stores: HashMap = vec![ + ("File".to_string(), object_store.clone()), + ("S3".to_string(), object_store.clone()), + ] + .into_iter() + .collect(); + + assert!(matches!( + ObjectStoreManager::try_new(stores, "Gcs"), + Err(Error::DefaultStorageNotFound { .. }) + )); + } + + #[test] + fn test_new_returns_ok() { + let dir = create_temp_dir("new"); + let object_store = new_object_store(&dir); + let stores: HashMap = vec![ + ("File".to_string(), object_store.clone()), + ("S3".to_string(), object_store.clone()), + ] + .into_iter() + .collect(); + let object_store_manager = ObjectStoreManager::try_new(stores, "File").unwrap(); + assert_eq!(object_store_manager.stores.len(), 2); + assert!(object_store_manager.find("File").is_some()); + assert!(object_store_manager.find("S3").is_some()); + assert!(object_store_manager.find("Gcs").is_none()); + } +}