From 8e6016c91e617f66e4399dd55038d690a979581f Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Mon, 2 Oct 2023 12:45:00 +0900 Subject: [PATCH] feat: support custom storage for every table(initial) --- .env.example | 2 +- Cargo.lock | 6 + config/datanode.example.toml | 5 +- config/metasrv.example.toml | 2 +- config/standalone.example.toml | 5 +- src/cmd/src/datanode.rs | 20 ++- src/cmd/src/options.rs | 15 ++- src/cmd/src/standalone.rs | 14 ++- src/common/datasource/Cargo.toml | 1 + src/common/datasource/src/object_store.rs | 79 ++++++++++++ src/datanode/src/config.rs | 11 +- src/datanode/src/datanode.rs | 87 ++++++++++--- src/datanode/src/error.rs | 13 +- src/datanode/src/heartbeat/handler.rs | 2 +- src/datanode/src/store.rs | 30 ++++- src/mito2/Cargo.toml | 1 + src/mito2/src/engine.rs | 14 +-- src/mito2/src/engine/drop_test.rs | 88 ++++++++++++- src/mito2/src/engine/open_test.rs | 70 ++++++++++- src/mito2/src/error.rs | 5 +- src/mito2/src/test_util.rs | 94 +++++++++++--- src/mito2/src/worker.rs | 18 +-- src/mito2/src/worker/handle_create.rs | 19 ++- src/mito2/src/worker/handle_drop.rs | 7 +- src/mito2/src/worker/handle_open.rs | 18 ++- src/object-store/Cargo.toml | 4 + src/object-store/src/error.rs | 45 +++++++ src/object-store/src/lib.rs | 2 + src/object-store/src/object_store_manager.rs | 124 ++++++++++++++++++ src/sql/src/parsers/create_parser.rs | 3 + src/sql/src/statements/create.rs | 3 +- src/table/src/requests.rs | 2 + tests-integration/Cargo.toml | 1 + tests-integration/src/cluster.rs | 23 +++- tests-integration/src/standalone.rs | 21 +++- tests-integration/src/test_util.rs | 108 +++++----------- tests-integration/src/tests.rs | 12 ++ tests-integration/src/tests/instance_test.rs | 126 ++++++++++++++++++- tests-integration/src/tests/test_util.rs | 35 ++++++ tests-integration/tests/grpc.rs | 5 +- tests-integration/tests/http.rs | 11 +- tests-integration/tests/region_failover.rs | 5 +- tests-integration/tests/sql.rs | 5 +- 43 files changed, 965 insertions(+), 196 deletions(-) create mode 100644 src/object-store/src/error.rs create mode 100644 src/object-store/src/object_store_manager.rs diff --git a/.env.example b/.env.example index 4d45913df043..5c44be821ef8 100644 --- a/.env.example +++ b/.env.example @@ -18,4 +18,4 @@ GT_AZBLOB_ENDPOINT=AZBLOB endpoint GT_GCS_BUCKET = GCS bucket GT_GCS_SCOPE = GCS scope GT_GCS_CREDENTIAL_PATH = GCS credential path -GT_GCS_ENDPOINT = GCS end point +GT_GCS_ENDPOINT = GCS end point \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index e2ef70d9bb96..e2d895de2405 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1721,6 +1721,7 @@ dependencies = [ "common-test-util", "datafusion", "derive_builder 0.12.0", + "dotenv", "futures", "lazy_static", "object-store", @@ -5571,6 +5572,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datatypes", + "dotenv", "futures", "humantime-serde", "lazy_static", @@ -6009,6 +6011,8 @@ dependencies = [ "anyhow", "async-trait", "bytes", + "common-error", + "common-macro", "common-telemetry", "common-test-util", "futures", @@ -6017,6 +6021,7 @@ dependencies = [ "metrics", "opendal", "pin-project", + "snafu", "tokio", "uuid", ] @@ -9995,6 +10000,7 @@ dependencies = [ "common-base", "common-catalog", "common-config", + "common-datasource", "common-error", "common-grpc", "common-meta", diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 4f52380d658f..ee873da2d909 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -43,9 +43,10 @@ sync_write = false 
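# Illustrative sketch of the multi-store layout this change introduces
# (placeholder values, not shipped defaults): `global_store` names the default
# backend, and each `[[storage.store]]` entry registers one backend that a
# table can select with the `storage` table option.
#
# [storage]
# data_home = "/tmp/greptimedb/"
# global_store = "S3"
#
# [[storage.store]]
# type = "File"
#
# [[storage.store]]
# type = "S3"
# bucket = "<bucket>"
# access_key_id = "<access key id>"
# secret_access_key = "<secret access key>"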
[storage] # The working home directory. data_home = "/tmp/greptimedb/" +# global_store = "File" +# Storage type. +[[storage.store]] type = "File" -# TTL for all tables. Disabled by default. -# global_ttl = "7d" # Compaction options, see `standalone.example.toml`. [storage.compaction] diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index deb5ec512d3a..f731a6728684 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -34,4 +34,4 @@ retry_delay = "500ms" # [datanode.client_options] # timeout_millis = 10000 # connect_timeout_millis = 10000 -# tcp_nodelay = true +# tcp_nodelay = true \ No newline at end of file diff --git a/config/standalone.example.toml b/config/standalone.example.toml index cd098908a727..970a1361a9d3 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -111,8 +111,11 @@ retry_delay = "500ms" [storage] # The working home directory. data_home = "/tmp/greptimedb/" +#global_store = "File" # Storage type. -type = "File" +#[[storage.store]] +#type = "File" + # TTL for all tables. Disabled by default. # global_ttl = "7d" diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index e2ff160eefb5..c39463777a08 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -179,7 +179,9 @@ mod tests { use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_named_temp_file; - use datanode::config::{CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig}; + use datanode::config::{ + CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig, S3Config, + }; use servers::Mode; use super::*; @@ -212,8 +214,14 @@ mod tests { sync_write = false [storage] - type = "File" data_home = "/tmp/greptimedb/" + [[storage.store]] + type = "File" + + [[storage.store]] + type = "S3" + access_key_id = "access_key_id" + secret_access_key = "secret_access_key" [storage.compaction] max_inflight_tasks = 3 @@ -264,11 +272,15 @@ mod tests { assert_eq!(3000, timeout_millis); assert!(tcp_nodelay); assert_eq!("/tmp/greptimedb/", options.storage.data_home); + assert_eq!(options.storage.store.len(), 2); assert!(matches!( - &options.storage.store, + &options.storage.store[0], ObjectStoreConfig::File(FileConfig { .. }) )); - + assert!(matches!( + &options.storage.store[1], + ObjectStoreConfig::S3(S3Config { .. }) + )); assert_eq!( CompactionConfig { max_inflight_tasks: 3, diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index a95a10b30b30..6130991cb7d4 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -127,7 +127,7 @@ mod tests { use std::time::Duration; use common_test_util::temp_dir::create_named_temp_file; - use datanode::config::{DatanodeOptions, ObjectStoreConfig}; + use datanode::config::DatanodeOptions; use super::*; @@ -182,26 +182,31 @@ mod tests { .join(ENV_VAR_SEP), Some("99"), ), + // Can we pass storage.store.~ as env? + /*** ( - // storage.type = S3 + // storage.store.type = S3 [ env_prefix.to_string(), "storage".to_uppercase(), + "store".to_uppercase(), "type".to_uppercase(), ] .join(ENV_VAR_SEP), Some("S3"), ), ( - // storage.bucket = mybucket + // storage.store.bucket = mybucket [ env_prefix.to_string(), "storage".to_uppercase(), + "store".to_uppercase(), "bucket".to_uppercase(), ] .join(ENV_VAR_SEP), Some("mybucket"), ), + ***/ ( // storage.manifest.gc_duration = 42s [ @@ -243,13 +248,15 @@ mod tests { .unwrap(); // Check the configs from environment variables. 
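        // The assertions below stay commented out for now: `storage.store` is a
        // list of `[[storage.store]]` entries, and it is still an open question
        // (see "Can we pass storage.store.~ as env?" above) whether individual
        // list elements can be overridden through environment variables, so only
        // scalar fields are checked here.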
+ /*** assert_eq!(opts.storage.manifest.checkpoint_margin, Some(99)); - match opts.storage.store { + match opts.storage.store[0].clone() { ObjectStoreConfig::S3(s3_config) => { assert_eq!(s3_config.bucket, "mybucket".to_string()); } _ => panic!("unexpected store type"), } + ***/ assert_eq!( opts.storage.manifest.gc_duration, Some(Duration::from_secs(42)) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index c6482a6ad5c0..8461611d451a 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -381,6 +381,7 @@ mod tests { use auth::{Identity, Password, UserProviderRef}; use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_named_temp_file; + use datanode::config::FileConfig; use servers::Mode; use super::*; @@ -429,10 +430,14 @@ mod tests { sync_write = false [storage] + [[storage.store]] type = "S3" access_key_id = "access_key_id" secret_access_key = "secret_access_key" + [[storage.store]] + type = "File" + [storage.compaction] max_inflight_tasks = 3 max_files_in_level0 = 7 @@ -475,8 +480,8 @@ mod tests { assert_eq!(2, fe_opts.mysql.runtime_size); assert_eq!(None, fe_opts.mysql.reject_no_database); assert!(fe_opts.influxdb.enable); - - match &dn_opts.storage.store { + assert_eq!(dn_opts.storage.store.len(), 2); + match &dn_opts.storage.store[0] { datanode::config::ObjectStoreConfig::S3(s3_config) => { assert_eq!( "Secret([REDACTED alloc::string::String])".to_string(), @@ -487,7 +492,10 @@ mod tests { unreachable!() } } - + assert!(matches!( + dn_opts.storage.store[1], + datanode::config::ObjectStoreConfig::File(FileConfig { .. }) + )); assert_eq!("debug", logging_opts.level.as_ref().unwrap()); assert_eq!("/tmp/greptimedb/test/logs".to_string(), logging_opts.dir); } diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index e6bff6953876..fc64ba6765a1 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -34,6 +34,7 @@ strum.workspace = true tokio-util.workspace = true tokio.workspace = true url = "2.3" +dotenv = "0.15" [dev-dependencies] common-test-util = { workspace = true } diff --git a/src/common/datasource/src/object_store.rs b/src/common/datasource/src/object_store.rs index c9e36018c22b..dd3fe9383dc9 100644 --- a/src/common/datasource/src/object_store.rs +++ b/src/common/datasource/src/object_store.rs @@ -15,6 +15,7 @@ pub mod fs; pub mod s3; use std::collections::HashMap; +use std::env; use lazy_static::lazy_static; use object_store::ObjectStore; @@ -84,6 +85,84 @@ pub fn handle_windows_path(url: &str) -> Option { .map(|captures| captures[0].to_string()) } +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum StorageType { + S3, + S3WithCache, + File, + Oss, + Azblob, + Gcs, +} + +impl std::fmt::Display for StorageType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + StorageType::S3 => write!(f, "S3"), + StorageType::S3WithCache => write!(f, "S3"), + StorageType::File => write!(f, "File"), + StorageType::Oss => write!(f, "Oss"), + StorageType::Azblob => write!(f, "Azblob"), + StorageType::Gcs => write!(f, "Gcs"), + } + } +} + +impl StorageType { + pub fn build_storage_types_based_on_env() -> Vec { + let mut storage_types = Vec::with_capacity(4); + storage_types.push(StorageType::File); + if env::var("GT_S3_BUCKET").is_ok() { + storage_types.push(StorageType::S3); + } + if env::var("GT_OSS_BUCKET").is_ok() { + storage_types.push(StorageType::Oss); + } + if env::var("GT_AZBLOB_CONTAINER").is_ok() { + 
storage_types.push(StorageType::Azblob); + } + if env::var("GT_GCS_BUCKET").is_ok() { + storage_types.push(StorageType::Gcs); + } + storage_types + } + pub fn test_on(&self) -> bool { + let _ = dotenv::dotenv(); + + match self { + StorageType::File => true, // always test file + StorageType::S3 | StorageType::S3WithCache => { + if let Ok(b) = env::var("GT_S3_BUCKET") { + !b.is_empty() + } else { + false + } + } + StorageType::Oss => { + if let Ok(b) = env::var("GT_OSS_BUCKET") { + !b.is_empty() + } else { + false + } + } + StorageType::Azblob => { + if let Ok(b) = env::var("GT_AZBLOB_CONTAINER") { + !b.is_empty() + } else { + false + } + } + StorageType::Gcs => { + if let Ok(b) = env::var("GT_GCS_BUCKET") { + !b.is_empty() + } else { + false + } + } + } + } +} + #[cfg(test)] mod tests { use super::handle_windows_path; diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 74c86c5ced53..a98e3232dfb7 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -66,11 +66,11 @@ pub struct StorageConfig { pub global_ttl: Option, /// The working directory of database pub data_home: String, - #[serde(flatten)] - pub store: ObjectStoreConfig, + pub store: Vec, pub compaction: CompactionConfig, pub manifest: RegionManifestConfig, pub flush: FlushConfig, + pub global_store: String, } impl Default for StorageConfig { @@ -78,10 +78,11 @@ impl Default for StorageConfig { Self { global_ttl: None, data_home: DEFAULT_DATA_HOME.to_string(), - store: ObjectStoreConfig::default(), + store: vec![ObjectStoreConfig::default()], compaction: CompactionConfig::default(), manifest: RegionManifestConfig::default(), flush: FlushConfig::default(), + global_store: "File".to_string(), } } } @@ -402,13 +403,13 @@ mod tests { #[test] fn test_secstr() { let toml_str = r#" - [storage] + [[storage.store]] type = "S3" access_key_id = "access_key_id" secret_access_key = "secret_access_key" "#; let opts: DatanodeOptions = toml::from_str(toml_str).unwrap(); - match opts.storage.store { + match opts.storage.store[0].clone() { ObjectStoreConfig::S3(cfg) => { assert_eq!( "Secret([REDACTED alloc::string::String])".to_string(), diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 885ed8bbddb5..7ad5e00e2122 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -25,6 +25,7 @@ use common_base::Plugins; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::key::datanode_table::DatanodeTableManager; +use common_meta::key::table_info::TableInfoManager; use common_meta::kv_backend::KvBackendRef; pub use common_procedure::options::ProcedureConfig; use common_runtime::Runtime; @@ -35,6 +36,7 @@ use futures_util::StreamExt; use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::client::MetaClient; use mito2::engine::MitoEngine; +use object_store::object_store_manager::ObjectStoreManager; use object_store::util::normalize_dir; use query::QueryEngineFactory; use servers::Mode; @@ -44,14 +46,15 @@ use store_api::path_utils::{region_dir, WAL_DIR}; use store_api::region_engine::RegionEngineRef; use store_api::region_request::{RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; +use table::requests::STORAGE_KEY; use tokio::fs; use tokio::sync::Notify; use crate::config::{DatanodeOptions, RegionEngineConfig}; use crate::error::{ - CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, MissingMetaClientSnafu, - MissingMetasrvOptsSnafu, MissingNodeIdSnafu, 
OpenLogStoreSnafu, Result, RuntimeResourceSnafu, - ShutdownInstanceSnafu, + CreateDirSnafu, GetMetadataSnafu, InitObjectStorageManagerSnafu, MissingKvBackendSnafu, + MissingMetaClientSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, + RuntimeResourceSnafu, ShutdownInstanceSnafu, TableInfoNotFoundSnafu, }; use crate::event_listener::{ new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef, @@ -61,7 +64,7 @@ use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::{new_metasrv_client, HeartbeatTask}; use crate::region_server::RegionServer; use crate::server::Services; -use crate::store; +use crate::store::new_object_stores; pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024); @@ -223,10 +226,16 @@ impl DatanodeBuilder { self.plugins.clone(), log_store, region_event_listener, + &self.opts.storage.global_store, + ) + .await?; + self.initialize_region_server( + ®ion_server, + kv_backend, + matches!(mode, Mode::Standalone), + &self.opts.storage.global_store, ) .await?; - self.initialize_region_server(®ion_server, kv_backend, matches!(mode, Mode::Standalone)) - .await?; let heartbeat_task = match mode { Mode::Distributed => { @@ -275,20 +284,44 @@ impl DatanodeBuilder { region_server: &RegionServer, kv_backend: KvBackendRef, open_with_writable: bool, + default_storage_name: &str, ) -> Result<()> { let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; let mut regions = vec![]; let mut table_values = datanode_table_manager.tables(node_id); - while let Some(table_value) = table_values.next().await { let table_value = table_value.context(GetMetadataSnafu)?; + let table_info = TableInfoManager::new(kv_backend.clone()) + .get(table_value.table_id) + .await + .context(GetMetadataSnafu)? + .context(TableInfoNotFoundSnafu { + table_id: table_value.table_id, + })?; + for region_number in table_value.regions { - regions.push(( - RegionId::new(table_value.table_id, region_number), - table_value.engine.clone(), - table_value.region_storage_path.clone(), - )); + if let Some(storage_name) = table_info + .table_info + .meta + .options + .extra_options + .get(STORAGE_KEY) + { + regions.push(( + RegionId::new(table_value.table_id, region_number), + table_value.engine.clone(), + table_value.region_storage_path.clone(), + storage_name.to_string(), + )); + } else { + regions.push(( + RegionId::new(table_value.table_id, region_number), + table_value.engine.clone(), + table_value.region_storage_path.clone(), + default_storage_name.to_string(), + )); + }; } } @@ -296,18 +329,22 @@ impl DatanodeBuilder { let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); let mut tasks = vec![]; - for (region_id, engine, store_path) in regions { + for (region_id, engine, store_path, storage_name) in regions { let region_dir = region_dir(&store_path, region_id); let semaphore_moved = semaphore.clone(); tasks.push(async move { let _permit = semaphore_moved.acquire().await; + // TODO: add some tests in the tests-integration to cover the case where the storage option is passed when initializing region server. + // For now, we've just added some of the tests to mito2. 
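                // Example of where the `storage` value comes from: a table opts in to a
                // custom backend at creation time, e.g.
                //   CREATE TABLE metrics (host STRING, ts TIMESTAMP TIME INDEX)
                //   ENGINE=mito WITH (storage = 'S3');
                // (table and column names here are illustrative). The option is persisted
                // in the table metadata and forwarded below so the region reopens against
                // the same named object store.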
region_server .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: engine.clone(), region_dir, - options: HashMap::new(), + options: vec![(STORAGE_KEY.to_string(), storage_name)] + .into_iter() + .collect::>(), }), ) .await?; @@ -333,6 +370,7 @@ impl DatanodeBuilder { plugins: Arc, log_store: Arc, event_listener: RegionServerEventListenerRef, + global_store: &str, ) -> Result { let query_engine_factory = QueryEngineFactory::new_with_plugins( // query engine in datanode only executes plan with resolved table source. @@ -354,8 +392,16 @@ impl DatanodeBuilder { let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone(), event_listener); - let object_store = store::new_object_store(opts).await?; - let engines = Self::build_store_engines(opts, log_store, object_store).await?; + let object_store_manager = + ObjectStoreManager::try_new(new_object_stores(opts).await?, global_store) + .context(InitObjectStorageManagerSnafu)?; + let engines = Self::build_store_engines( + opts, + log_store, + object_store_manager.global_object_store(), + object_store_manager, + ) + .await?; for engine in engines { region_server.register_engine(engine); } @@ -389,6 +435,7 @@ impl DatanodeBuilder { opts: &DatanodeOptions, log_store: Arc, object_store: object_store::ObjectStore, + object_store_manager: ObjectStoreManager, ) -> Result> where S: LogStore, @@ -397,11 +444,15 @@ impl DatanodeBuilder { for engine in &opts.region_engine { match engine { RegionEngineConfig::Mito(config) => { - let engine: MitoEngine = - MitoEngine::new(config.clone(), log_store.clone(), object_store.clone()); + let engine: MitoEngine = MitoEngine::new( + config.clone(), + log_store.clone(), + object_store_manager.clone(), + ); engines.push(Arc::new(engine) as _); } RegionEngineConfig::File(config) => { + // TODO: Get FileRegineEngine to have ObjectStoreManager let engine = FileRegionEngine::new(config.clone(), object_store.clone()); engines.push(Arc::new(engine) as _); } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index ab315954aadf..f023f1e72b0c 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -29,6 +29,12 @@ use table::error::Error as TableError; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Failed to init ObjectStorageManager"))] + InitObjectStorageManager { + source: object_store::error::Error, + location: Location, + }, + #[snafu(display("Failed to handle heartbeat response"))] HandleHeartbeatResponse { location: Location, @@ -114,6 +120,9 @@ pub enum Error { location: Location, }, + #[snafu(display("Table info not found: {}", table_id))] + TableInfoNotFound { table_id: u32, location: Location }, + #[snafu(display("Column {} not found in table {}", column_name, table_name))] ColumnNotFound { column_name: String, @@ -491,7 +500,8 @@ impl ErrorExt for Error { } // TODO(yingwen): Further categorize http error. - ParseAddr { .. } + TableInfoNotFound { .. } + | ParseAddr { .. } | CreateDir { .. } | RemoveDir { .. } | Catalog { .. } @@ -525,6 +535,7 @@ impl ErrorExt for Error { WaitProcedure { source, .. } => source.status_code(), HandleRegionRequest { source, .. } => source.status_code(), StopRegionEngine { source, .. } => source.status_code(), + InitObjectStorageManager { source, .. 
} => source.status_code(), } } diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 026245259cec..03ae336c5722 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -54,7 +54,7 @@ impl RegionHeartbeatResponseHandler { let open_region_req = RegionRequest::Open(RegionOpenRequest { engine: region_ident.engine, region_dir: region_dir(®ion_storage_path, region_id), - options: HashMap::new(), + options: HashMap::new(), // TODO: adds some tests to cover a case where `storage` option is passed. }); Ok((region_id, open_region_req)) } diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 937729671fb3..064eb1e3e506 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -20,6 +20,7 @@ mod gcs; mod oss; mod s3; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use std::{env, path}; @@ -35,9 +36,27 @@ use snafu::prelude::*; use crate::config::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; use crate::error::{self, Result}; -pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result { - let data_home = normalize_dir(&opts.storage.data_home); - let object_store = match &opts.storage.store { +pub(crate) async fn new_object_stores( + opts: &DatanodeOptions, +) -> Result> { + let mut stores = HashMap::new(); + for store in &opts.storage.store { + let object_store = new_object_store(&opts.storage.data_home, store).await?; + let name = match store { + ObjectStoreConfig::Azblob(_) => "Azblob".to_string(), + ObjectStoreConfig::File(_) => "File".to_string(), + ObjectStoreConfig::Gcs(_) => "Gcs".to_string(), + ObjectStoreConfig::Oss(_) => "Oss".to_string(), + ObjectStoreConfig::S3(_) => "S3".to_string(), + }; + stores.insert(name, object_store); + } + Ok(stores) +} + +async fn new_object_store(data_home: &str, store: &ObjectStoreConfig) -> Result { + let data_home = normalize_dir(data_home); + let object_store = match store { ObjectStoreConfig::File(file_config) => { fs::new_fs_object_store(&data_home, file_config).await } @@ -50,9 +69,8 @@ pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result( mut config: MitoConfig, log_store: Arc, - object_store: ObjectStore, + object_store_manager: ObjectStoreManager, ) -> MitoEngine { config.sanitize(); MitoEngine { - inner: Arc::new(EngineInner::new(config, log_store, object_store)), + inner: Arc::new(EngineInner::new(config, log_store, object_store_manager)), } } @@ -104,10 +104,10 @@ impl EngineInner { fn new( config: MitoConfig, log_store: Arc, - object_store: ObjectStore, + object_sotre_manager: ObjectStoreManager, ) -> EngineInner { EngineInner { - workers: WorkerGroup::start(config, log_store, object_store), + workers: WorkerGroup::start(config, log_store, object_sotre_manager), } } @@ -229,9 +229,9 @@ impl MitoEngine { pub fn new_for_test( mut config: MitoConfig, log_store: Arc, - object_store: ObjectStore, write_buffer_manager: Option, listener: Option, + object_store_manager: ObjectStoreManager, ) -> MitoEngine { config.sanitize(); @@ -240,9 +240,9 @@ impl MitoEngine { workers: WorkerGroup::start_for_test( config, log_store, - object_store, write_buffer_manager, listener, + object_store_manager, ), }), } diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs index 35c86c184371..4a31ebf6c4e1 100644 --- a/src/mito2/src/engine/drop_test.rs +++ b/src/mito2/src/engine/drop_test.rs @@ -23,8 +23,10 @@ use store_api::storage::RegionId; use 
crate::config::MitoConfig; use crate::engine::listener::DropListener; +use crate::engine::MitoEngine; use crate::test_util::{ - build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, + build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, StorageType, + TestEnv, }; use crate::worker::DROPPING_MARKER_FILE; @@ -82,3 +84,87 @@ async fn test_engine_drop_region() { let object_store = env.get_object_store().unwrap(); assert!(!object_store.is_exist(®ion_dir).await.unwrap()); } + +// TODO: find a way to move this test into the tests-integration +#[tokio::test] +async fn test_engine_drop_region_for_custom_store() { + common_telemetry::init_default_ut_logging(); + async fn setup(engine: &MitoEngine, region_id: RegionId, storage_name: &str) { + let request = CreateRequestBuilder::new() + .insert_option("storage", storage_name) + .region_dir(storage_name) + .build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(engine, region_id, rows).await; + flush_region(engine, region_id).await; + } + let mut env = TestEnv::with_prefix("drop"); + let listener = Arc::new(DropListener::new(Duration::from_millis(100))); + let engine = env + .create_engine_with_multiple_object_stores( + MitoConfig::default(), + None, + Some(listener.clone()), + vec![StorageType::File, StorageType::Gcs], + &StorageType::File.to_string(), + ) + .await; + let object_store_manager = env.get_object_store_manager().unwrap(); + + let global_region_id = RegionId::new(1, 1); + setup(&engine, global_region_id, "File").await; + + let custom_region_id = RegionId::new(2, 1); + setup(&engine, custom_region_id, "Gcs").await; + + let global_region = engine.get_region(global_region_id).unwrap(); + let global_region_dir = global_region.access_layer.region_dir().to_string(); + + let custom_region = engine.get_region(custom_region_id).unwrap(); + let custom_region_dir = custom_region.access_layer.region_dir().to_string(); + + // Before dropping the custom table, both the tables should exist. + assert!(object_store_manager + .find("Gcs") + .unwrap() + .is_exist(&custom_region_dir) + .await + .unwrap()); + assert!(object_store_manager + .find("File") + .unwrap() + .is_exist(&global_region_dir) + .await + .unwrap()); + + // drop the created custom region. + engine + .handle_request(custom_region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .unwrap(); + assert!(!engine.is_region_exists(custom_region_id)); + + // Wait for drop task. 
+ listener.wait().await; + + assert!(!object_store_manager + .find("Gcs") + .unwrap() + .is_exist(&custom_region_dir) + .await + .unwrap()); + assert!(object_store_manager + .find("File") + .unwrap() + .is_exist(&global_region_dir) + .await + .unwrap()); +} diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index e8254cf71d05..30dc2c70a040 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -26,7 +26,7 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::test_util::{ - build_rows, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv, + build_rows, put_rows, reopen_region, rows_schema, CreateRequestBuilder, StorageType, TestEnv, }; #[tokio::test] @@ -167,3 +167,71 @@ async fn test_engine_region_open_with_options() { region.version().options.ttl.unwrap() ); } + +// TODO: move this test into the tests-integration. +#[tokio::test] +async fn test_engine_region_for_custom_store() { + let mut env = TestEnv::new(); + let engine = env + .create_engine_with_multiple_object_stores( + MitoConfig::default(), + None, + None, + vec![StorageType::File, StorageType::Gcs], + &StorageType::File.to_string(), + ) + .await; + + let global_region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("storage", "File") + .region_dir("file") + .build(); + engine + .handle_request(global_region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let custom_region_id = RegionId::new(2, 1); + let request = CreateRequestBuilder::new() + .insert_option("storage", "Gcs") + .region_dir("gcs") + .build(); + let custom_region_dir = request.region_dir.clone(); + engine + .handle_request(custom_region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Close both global and custom regions. + engine + .handle_request( + global_region_id, + RegionRequest::Close(RegionCloseRequest {}), + ) + .await + .unwrap(); + engine + .handle_request( + custom_region_id, + RegionRequest::Close(RegionCloseRequest {}), + ) + .await + .unwrap(); + + // Open the custom region again with options. + engine + .handle_request( + custom_region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir: custom_region_dir, + options: HashMap::from([("storage".to_string(), "Gcs".to_string())]), + }), + ) + .await + .unwrap(); + + assert!(engine.get_region(global_region_id).is_none()); + assert!(engine.get_region(custom_region_id).is_some()); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index fcc3842111d1..694127eb17fc 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -388,6 +388,9 @@ pub enum Error { region_dir: String, location: Location, }, + + #[snafu(display("Storage not found: {}", storage))] + StorageNotFound { storage: String, location: Location }, } pub type Result = std::result::Result; @@ -436,7 +439,7 @@ impl ErrorExt for Error { SerializeField { .. } => StatusCode::Internal, NotSupportedField { .. } => StatusCode::Unsupported, DeserializeField { .. } => StatusCode::Unexpected, - InvalidBatch { .. } => StatusCode::InvalidArguments, + StorageNotFound { .. } | InvalidBatch { .. } => StatusCode::InvalidArguments, InvalidRecordBatch { .. } => StatusCode::InvalidArguments, ConvertVector { source, .. } => source.status_code(), ComputeArrow { .. 
} => StatusCode::Internal, diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 25994fa0e601..1d40e8aaf3d7 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -27,6 +27,7 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; use api::v1::{OpType, Row, Rows, SemanticType}; use common_datasource::compression::CompressionType; +pub use common_datasource::object_store::StorageType; use common_query::Output; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array}; @@ -34,6 +35,7 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; +use object_store::object_store_manager::ObjectStoreManager; use object_store::services::Fs; use object_store::ObjectStore; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; @@ -70,7 +72,7 @@ pub struct TestEnv { /// Path to store data. data_home: TempDir, logstore: Option>, - object_store: Option, + object_store_manager: Option, } impl Default for TestEnv { @@ -85,7 +87,7 @@ impl TestEnv { TestEnv { data_home: create_temp_dir(""), logstore: None, - object_store: None, + object_store_manager: None, } } @@ -94,7 +96,7 @@ impl TestEnv { TestEnv { data_home: create_temp_dir(prefix), logstore: None, - object_store: None, + object_store_manager: None, } } @@ -103,17 +105,26 @@ impl TestEnv { } pub fn get_object_store(&self) -> Option { - self.object_store.clone() + Some( + self.object_store_manager + .clone() + .unwrap() + .global_object_store(), + ) + } + + pub fn get_object_store_manager(&self) -> Option { + self.object_store_manager.clone() } /// Creates a new engine with specific config under this env. pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine { - let (log_store, object_store) = self.create_log_and_object_store().await; + let (log_store, object_store_manager) = self.create_log_and_object_store().await; let logstore = Arc::new(log_store); self.logstore = Some(logstore.clone()); - self.object_store = Some(object_store.clone()); - MitoEngine::new(config, logstore, object_store) + self.object_store_manager = Some(object_store_manager.clone()); + MitoEngine::new(config, logstore, object_store_manager) } /// Creates a new engine with specific config and manager/listener under this env. 
@@ -123,12 +134,53 @@ impl TestEnv { manager: Option, listener: Option, ) -> MitoEngine { - let (log_store, object_store) = self.create_log_and_object_store().await; + let (log_store, object_store_manager) = self.create_log_and_object_store().await; + + let logstore = Arc::new(log_store); + self.logstore = Some(logstore.clone()); + self.object_store_manager = Some(object_store_manager.clone()); + + MitoEngine::new_for_test(config, logstore, manager, listener, object_store_manager) + } + + pub async fn create_engine_with_multiple_object_stores( + &mut self, + config: MitoConfig, + manager: Option, + listener: Option, + storage_types: Vec, + default_storage_name: &str, + ) -> MitoEngine { + let log_store = self.create_log().await; let logstore = Arc::new(log_store); self.logstore = Some(logstore.clone()); - self.object_store = Some(object_store.clone()); - MitoEngine::new_for_test(config, logstore, object_store, manager, listener) + let mut stores = HashMap::new(); + for storage_type in storage_types { + let storage_name = storage_type.to_string(); + let data_path = self + .data_home + .path() + .join("data") + .join(&storage_name) + .as_path() + .display() + .to_string(); + let mut builder = Fs::default(); + builder.root(&data_path); + let object_store = ObjectStore::new(builder).unwrap().finish(); + stores.insert(storage_name, object_store); + } + self.object_store_manager = + Some(ObjectStoreManager::try_new(stores, default_storage_name).unwrap()); + + MitoEngine::new_for_test( + config, + logstore, + manager, + listener, + self.object_store_manager.clone().unwrap(), + ) } /// Reopen the engine. @@ -138,28 +190,32 @@ impl TestEnv { MitoEngine::new( config, self.logstore.clone().unwrap(), - self.object_store.clone().unwrap(), + self.object_store_manager.clone().unwrap(), ) } /// Creates a new [WorkerGroup] with specific config under this env. pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup { - let (log_store, object_store) = self.create_log_and_object_store().await; + let (log_store, object_store_manager) = self.create_log_and_object_store().await; - WorkerGroup::start(config, Arc::new(log_store), object_store) + WorkerGroup::start(config, Arc::new(log_store), object_store_manager) } - async fn create_log_and_object_store(&self) -> (RaftEngineLogStore, ObjectStore) { + async fn create_log_and_object_store(&self) -> (RaftEngineLogStore, ObjectStoreManager) { let data_home = self.data_home.path(); let wal_path = data_home.join("wal"); - let data_path = data_home.join("data").as_path().display().to_string(); let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await; - let mut builder = Fs::default(); - builder.root(&data_path); - let object_store = ObjectStore::new(builder).unwrap().finish(); + let data_path = data_home.join("data").as_path().display().to_string(); + let object_store_manager = ObjectStoreManager::new_with(&data_path); + + (log_store, object_store_manager) + } - (log_store, object_store) + async fn create_log(&self) -> RaftEngineLogStore { + let data_home = self.data_home.path(); + let wal_path = data_home.join("wal"); + log_store_util::create_tmp_local_file_log_store(&wal_path).await } /// If `initial_metadata` is `Some`, creates a new manifest. 
If `initial_metadata` diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 6b3ea02c1896..2889be729019 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -33,7 +33,7 @@ use std::time::Duration; use common_runtime::JoinHandle; use common_telemetry::{error, info, warn}; use futures::future::try_join_all; -use object_store::ObjectStore; +use object_store::object_store_manager::ObjectStoreManager; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; use store_api::storage::RegionId; @@ -112,7 +112,7 @@ impl WorkerGroup { pub(crate) fn start( config: MitoConfig, log_store: Arc, - object_store: ObjectStore, + object_store_manager: ObjectStoreManager, ) -> WorkerGroup { assert!(config.num_workers.is_power_of_two()); let config = Arc::new(config); @@ -128,11 +128,11 @@ impl WorkerGroup { id: id as WorkerId, config: config.clone(), log_store: log_store.clone(), - object_store: object_store.clone(), write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), listener: WorkerListener::default(), cache_manager: cache_manager.clone(), + object_store_manager: object_store_manager.clone(), } .start() }) @@ -203,9 +203,9 @@ impl WorkerGroup { pub(crate) fn start_for_test( config: MitoConfig, log_store: Arc, - object_store: ObjectStore, write_buffer_manager: Option, listener: Option, + object_store_manager: ObjectStoreManager, ) -> WorkerGroup { assert!(config.num_workers.is_power_of_two()); let config = Arc::new(config); @@ -223,11 +223,11 @@ impl WorkerGroup { id: id as WorkerId, config: config.clone(), log_store: log_store.clone(), - object_store: object_store.clone(), write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), listener: WorkerListener::new(listener.clone()), cache_manager: cache_manager.clone(), + object_store_manager: object_store_manager.clone(), } .start() }) @@ -250,11 +250,11 @@ struct WorkerStarter { id: WorkerId, config: Arc, log_store: Arc, - object_store: ObjectStore, write_buffer_manager: WriteBufferManagerRef, scheduler: SchedulerRef, listener: WorkerListener, cache_manager: CacheManagerRef, + object_store_manager: ObjectStoreManager, } impl WorkerStarter { @@ -272,7 +272,6 @@ impl WorkerStarter { sender: sender.clone(), receiver, wal: Wal::new(self.log_store), - object_store: self.object_store, running: running.clone(), memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some( self.write_buffer_manager.clone(), @@ -284,6 +283,7 @@ impl WorkerStarter { stalled_requests: StalledRequests::default(), listener: self.listener, cache_manager: self.cache_manager, + object_storage_manager: self.object_store_manager, }; let handle = common_runtime::spawn_write(async move { worker_thread.run().await; @@ -420,8 +420,6 @@ struct RegionWorkerLoop { receiver: Receiver, /// WAL of the engine. wal: Wal, - /// Object store for manifest and SSTs. - object_store: ObjectStore, /// Whether the worker thread is still running. running: Arc, /// Memtable builder for each region. @@ -440,6 +438,8 @@ struct RegionWorkerLoop { listener: WorkerListener, /// Cache. cache_manager: CacheManagerRef, + /// Manage object stores for manifest and SSTs. 
+ object_storage_manager: ObjectStoreManager, } impl RegionWorkerLoop { diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index e9ace31044a5..2f7c89d71f7a 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -18,13 +18,14 @@ use std::sync::Arc; use common_query::Output; use common_telemetry::info; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataBuilder; use store_api::region_request::RegionCreateRequest; use store_api::storage::RegionId; +use table::requests::STORAGE_KEY; -use crate::error::{InvalidMetadataSnafu, Result}; +use crate::error::{InvalidMetadataSnafu, Result, StorageNotFoundSnafu}; use crate::region::opener::{check_recovered_region, RegionOpener}; use crate::worker::RegionWorkerLoop; @@ -54,17 +55,25 @@ impl RegionWorkerLoop { } builder.primary_key(request.primary_key); let metadata = builder.build().context(InvalidMetadataSnafu)?; - + let object_store = if let Some(storage) = request.options.get(STORAGE_KEY) { + self.object_storage_manager + .find(storage) + .context(StorageNotFoundSnafu { + storage: storage.to_string(), + })? + } else { + self.object_storage_manager.global_object_store() + }; // Create a MitoRegion from the RegionMetadata. let region = RegionOpener::new( region_id, &request.region_dir, self.memtable_builder.clone(), - self.object_store.clone(), + object_store, self.scheduler.clone(), ) .metadata(metadata) - .options(request.options) + .options(request.options) // TODO: remove `storage` option from request.options .cache(Some(self.cache_manager.clone())) .create_or_open(&self.config, &self.wal) .await?; diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 0b0431180fd0..682fa6601f2c 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -36,16 +36,17 @@ const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288) impl RegionWorkerLoop { pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result { let region = self.regions.writable_region(region_id)?; + let object_store = region.access_layer.object_store(); info!("Try to drop region: {}", region_id); // write dropping marker let marker_path = join_path(region.access_layer.region_dir(), DROPPING_MARKER_FILE); - self.object_store + + object_store .write(&marker_path, vec![]) .await .context(OpenDalSnafu)?; - region.stop().await?; // remove this region from region map to prevent other requests from accessing this region self.regions.remove_region(region_id); @@ -64,9 +65,9 @@ impl RegionWorkerLoop { // detach a background task to delete the region dir let region_dir = region.access_layer.region_dir().to_owned(); - let object_store = self.object_store.clone(); let dropping_regions = self.dropping_regions.clone(); let listener = self.listener.clone(); + let object_store = object_store.clone(); common_runtime::spawn_bg(async move { let gc_duration = listener .on_later_drop_begin(region_id) diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index cfeb43d84d98..c5b0250d8e8c 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -23,6 +23,7 @@ use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::region_request::RegionOpenRequest; use store_api::storage::RegionId; +use table::requests::STORAGE_KEY; use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result}; use 
crate::region::opener::RegionOpener; @@ -39,15 +40,24 @@ impl RegionWorkerLoop { return Ok(Output::AffectedRows(0)); } + let object_storage = if let Some(storage_name) = request.options.get(STORAGE_KEY) { + self.object_storage_manager.find(storage_name).unwrap() + } else { + self.object_storage_manager.global_object_store() + }; + // Check if this region is pending drop. And clean the entire dir if so. if !self.dropping_regions.is_region_exists(region_id) - && self - .object_store + && object_storage .is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE)) .await .context(OpenDalSnafu)? { - let result = remove_region_dir_once(&request.region_dir, &self.object_store).await; + let result = remove_region_dir_once( + &request.region_dir, + &self.object_storage_manager.global_object_store(), + ) + .await; info!("Region {} is dropped, result: {:?}", region_id, result); return RegionNotFoundSnafu { region_id }.fail(); } @@ -59,7 +69,7 @@ impl RegionWorkerLoop { region_id, &request.region_dir, self.memtable_builder.clone(), - self.object_store.clone(), + object_storage, self.scheduler.clone(), ) .options(request.options) diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index b3b192acf25e..2599c99cfca2 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -18,6 +18,10 @@ opendal = { version = "0.40", features = [ pin-project = "1.0" tokio.workspace = true uuid.workspace = true +snafu.workspace = true +common-telemetry = { workspace = true } +common-macro = { workspace = true } +common-error = { workspace = true } [dev-dependencies] anyhow = "1.0" diff --git a/src/object-store/src/error.rs b/src/object-store/src/error.rs new file mode 100644 index 000000000000..b005679b407a --- /dev/null +++ b/src/object-store/src/error.rs @@ -0,0 +1,45 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Global storage not found: {}", global_object_store))] + GlobalStorageNotFound { + location: Location, + global_object_store: String, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::GlobalStorageNotFound { .. 
} => StatusCode::StorageUnavailable, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index f1bf27846668..48e43e871e78 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -19,7 +19,9 @@ pub use opendal::{ Operator as ObjectStore, Reader, Result, Writer, }; +pub mod error; pub mod layers; mod metrics; +pub mod object_store_manager; pub mod test_util; pub mod util; diff --git a/src/object-store/src/object_store_manager.rs b/src/object-store/src/object_store_manager.rs new file mode 100644 index 000000000000..69cf5fa71604 --- /dev/null +++ b/src/object-store/src/object_store_manager.rs @@ -0,0 +1,124 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use opendal::services::Fs; +use snafu::ensure; + +use crate::error::Result; +use crate::{error, ObjectStore}; + +#[derive(Clone)] +pub struct ObjectStoreManager { + stores: HashMap, + global_object_store: String, +} + +impl ObjectStoreManager { + pub fn try_new( + stores: HashMap, + global_object_store: &str, + ) -> Result { + ensure!( + stores.keys().any(|key| key == global_object_store), + error::GlobalStorageNotFoundSnafu { + global_object_store: global_object_store.to_string() + } + ); + + Ok(ObjectStoreManager { + stores, + global_object_store: global_object_store.to_string(), + }) + } + pub fn find(&self, name: &str) -> Option { + self.stores.get(name).cloned() + } + pub fn global_object_store(&self) -> ObjectStore { + // Safety: We've already checked in the new method whether this exists. + self.stores.get(&self.global_object_store).cloned().unwrap() + } +} + +// Used in mito2's TestEnv. +impl ObjectStoreManager { + pub fn new_with(data_path: &str) -> Self { + let mut builder = Fs::default(); + builder.root(data_path); + let object_store = ObjectStore::new(builder).unwrap().finish(); + ObjectStoreManager { + stores: vec![("File".to_string(), object_store)] + .into_iter() + .collect(), + global_object_store: "File".to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use common_test_util::temp_dir::{create_temp_dir, TempDir}; + + use super::ObjectStoreManager; + use crate::error::Error; + use crate::services::Fs as Builder; + use crate::ObjectStore; + + fn new_object_store(dir: &TempDir) -> ObjectStore { + let store_dir = dir.path().to_str().unwrap(); + let mut builder = Builder::default(); + let _ = builder.root(store_dir); + ObjectStore::new(builder).unwrap().finish() + } + + #[test] + fn test_new_returns_err_when_global_store_not_exist() { + let dir = create_temp_dir("new"); + let object_store = new_object_store(&dir); + let stores: HashMap = vec![ + ("File".to_string(), object_store.clone()), + ("S3".to_string(), object_store.clone()), + ] + .into_iter() + .collect(); + + assert!(matches!( + ObjectStoreManager::try_new(stores, "Gcs"), + Err(Error::GlobalStorageNotFound { .. 
}) + )); + } + + #[test] + fn test_new_returns_ok() { + let dir = create_temp_dir("new"); + let object_store = new_object_store(&dir); + let stores: HashMap = vec![ + ("File".to_string(), object_store.clone()), + ("S3".to_string(), object_store.clone()), + ] + .into_iter() + .collect(); + let object_store_manager = ObjectStoreManager::try_new(stores, "File").unwrap(); + assert_eq!(object_store_manager.stores.len(), 2); + assert!(object_store_manager.find("File").is_some()); + assert!(object_store_manager.find("S3").is_some()); + assert!(object_store_manager.find("Gcs").is_none()); + + // Panic should not happen. + object_store_manager.global_object_store(); + } +} diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 6d01f8bacc32..787993027eba 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -160,6 +160,7 @@ impl<'a> ParserContext<'a> { .parser .parse_options(Keyword::WITH) .context(error::SyntaxSnafu)?; + for option in options.iter() { ensure!( valid_table_option(&option.name.value), @@ -168,6 +169,8 @@ impl<'a> ParserContext<'a> { } ); } + // Sorts options so that `test_display_create_table` can always pass. + let options = options.into_iter().sorted().collect(); let create_table = CreateTable { if_not_exists, name: table_name, diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 47b511be5616..3cb74b563a57 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -241,7 +241,7 @@ mod tests { PARTITION r2 VALUES LESS THAN (MAXVALUE), ) engine=mito - with(regions=1, ttl='7d'); + with(regions=1, ttl='7d', storage='File'); "; let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap(); assert_eq!(1, result.len()); @@ -267,6 +267,7 @@ PARTITION BY RANGE COLUMNS (ts) ( ENGINE=mito WITH( regions = 1, + storage = 'File', ttl = '7d' )"#, &new_sql diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 9d78f21534fb..59e83bd5fc9d 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -84,6 +84,7 @@ pub struct TableOptions { pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size"; pub const TTL_KEY: &str = "ttl"; pub const REGIONS_KEY: &str = "regions"; +pub const STORAGE_KEY: &str = "storage"; impl TryFrom<&HashMap> for TableOptions { type Error = error::Error; @@ -340,6 +341,7 @@ pub fn valid_table_option(key: &str) -> bool { | WRITE_BUFFER_SIZE_KEY | TTL_KEY | REGIONS_KEY + | STORAGE_KEY ) | is_supported_in_s3(key) } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index f75420d6056f..57e6e596bb97 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -28,6 +28,7 @@ common-recordbatch = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } common-test-util = { workspace = true } +common-datasource = { workspace = true } datanode = { workspace = true } datatypes = { workspace = true } dotenv = "0.15" diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 013eeb681ec6..bfa8b2fb3284 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -20,6 +20,7 @@ use api::v1::meta::Role; use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; +use common_datasource::object_store::StorageType; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::peer::Peer; use 
common_meta::DatanodeId; @@ -42,7 +43,6 @@ use tower::service_fn; use crate::test_util::{ self, create_datanode_opts, create_tmp_dir_and_datanode_opts, FileDirGuard, StorageGuard, - StorageType, }; pub struct GreptimeDbCluster { @@ -59,6 +59,7 @@ pub struct GreptimeDbClusterBuilder { cluster_name: String, kv_store: KvStoreRef, store_config: Option, + storage_types: Option>, datanodes: Option, } @@ -68,6 +69,7 @@ impl GreptimeDbClusterBuilder { cluster_name: cluster_name.to_string(), kv_store: Arc::new(MemStore::default()), store_config: None, + storage_types: None, datanodes: None, } } @@ -82,6 +84,11 @@ impl GreptimeDbClusterBuilder { self } + pub fn with_storage_types(mut self, storage_types: Vec) -> Self { + self.storage_types = Some(storage_types); + self + } + pub async fn build(self) -> GreptimeDbCluster { let datanodes = self.datanodes.unwrap_or(4); @@ -152,14 +159,16 @@ impl GreptimeDbClusterBuilder { dir_guards.push(FileDirGuard::new(home_tmp_dir)); - create_datanode_opts(store_config.clone(), home_dir) + create_datanode_opts(vec![store_config.clone()], home_dir) } else { let (opts, guard) = create_tmp_dir_and_datanode_opts( - StorageType::File, + self.storage_types + .clone() + .unwrap_or(vec![StorageType::File]), &format!("{}-dn-{}", self.cluster_name, datanode_id), ); - storage_guards.push(guard.storage_guard); + storage_guards.push(guard.storage_guards); dir_guards.push(guard.home_guard); opts @@ -171,7 +180,11 @@ impl GreptimeDbClusterBuilder { instances.insert(datanode_id, datanode); } - (instances, storage_guards, dir_guards) + ( + instances, + storage_guards.into_iter().flatten().collect(), + dir_guards, + ) } async fn wait_datanodes_alive( diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index a4748772bacd..5457f8d3ad18 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -17,13 +17,14 @@ use std::sync::Arc; use catalog::kvbackend::KvBackendCatalogManager; use common_base::Plugins; use common_config::KvStoreConfig; +use common_datasource::object_store::StorageType; use common_meta::cache_invalidator::DummyKvCacheInvalidator; use common_procedure::options::ProcedureConfig; use datanode::config::DatanodeOptions; use datanode::datanode::DatanodeBuilder; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; -use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; +use crate::test_util::{self, create_tmp_dir_and_datanode_opts, TestGuard}; pub struct GreptimeDbStandalone { pub instance: Arc, @@ -33,7 +34,7 @@ pub struct GreptimeDbStandalone { pub struct GreptimeDbStandaloneBuilder { instance_name: String, - store_type: Option, + store_types: Option>, plugin: Option, } @@ -41,14 +42,22 @@ impl GreptimeDbStandaloneBuilder { pub fn new(instance_name: &str) -> Self { Self { instance_name: instance_name.to_string(), - store_type: None, + store_types: None, plugin: None, } } pub fn with_store_type(self, store_type: StorageType) -> Self { Self { - store_type: Some(store_type), + store_types: Some(vec![store_type]), + ..self + } + } + + #[cfg(test)] + pub fn with_store_types(self, store_types: Vec) -> Self { + Self { + store_types: Some(store_types), ..self } } @@ -62,9 +71,9 @@ impl GreptimeDbStandaloneBuilder { } pub async fn build(self) -> GreptimeDbStandalone { - let store_type = self.store_type.unwrap_or(StorageType::File); + let store_types = self.store_types.unwrap_or(vec![StorageType::File]); - let (opts, guard) = 
create_tmp_dir_and_datanode_opts(store_type, &self.instance_name); + let (opts, guard) = create_tmp_dir_and_datanode_opts(store_types, &self.instance_name); let (kv_store, procedure_manager) = Instance::try_build_standalone_components( format!("{}/kv", &opts.storage.data_home), diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index e3370836eb2f..2865bc8ca986 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::env; -use std::fmt::Display; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -21,6 +20,7 @@ use std::time::Duration; use auth::UserProviderRef; use axum::Router; use catalog::kvbackend::KvBackendCatalogManager; +use common_datasource::object_store::StorageType; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_query::Output; @@ -52,67 +52,6 @@ use session::context::QueryContext; use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder}; -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -pub enum StorageType { - S3, - S3WithCache, - File, - Oss, - Azblob, - Gcs, -} - -impl Display for StorageType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - StorageType::S3 => write!(f, "S3"), - StorageType::S3WithCache => write!(f, "S3"), - StorageType::File => write!(f, "File"), - StorageType::Oss => write!(f, "Oss"), - StorageType::Azblob => write!(f, "Azblob"), - StorageType::Gcs => write!(f, "Gcs"), - } - } -} - -impl StorageType { - pub fn test_on(&self) -> bool { - let _ = dotenv::dotenv(); - - match self { - StorageType::File => true, // always test file - StorageType::S3 | StorageType::S3WithCache => { - if let Ok(b) = env::var("GT_S3_BUCKET") { - !b.is_empty() - } else { - false - } - } - StorageType::Oss => { - if let Ok(b) = env::var("GT_OSS_BUCKET") { - !b.is_empty() - } else { - false - } - } - StorageType::Azblob => { - if let Ok(b) = env::var("GT_AZBLOB_CONTAINER") { - !b.is_empty() - } else { - false - } - } - StorageType::Gcs => { - if let Ok(b) = env::var("GT_GCS_BUCKET") { - !b.is_empty() - } else { - false - } - } - } - } -} - fn s3_test_config() -> S3Config { S3Config { root: uuid::Uuid::new_v4().to_string(), @@ -243,7 +182,7 @@ pub enum TempDirGuard { pub struct TestGuard { pub home_guard: FileDirGuard, - pub storage_guard: StorageGuard, + pub storage_guards: Vec, } pub struct FileDirGuard { @@ -260,42 +199,61 @@ pub struct StorageGuard(pub TempDirGuard); impl TestGuard { pub async fn remove_all(&mut self) { - if let TempDirGuard::S3(guard) - | TempDirGuard::Oss(guard) - | TempDirGuard::Azblob(guard) - | TempDirGuard::Gcs(guard) = &mut self.storage_guard.0 - { - guard.remove_all().await.unwrap() + for storage_guard in self.storage_guards.iter_mut() { + if let TempDirGuard::S3(guard) + | TempDirGuard::Oss(guard) + | TempDirGuard::Azblob(guard) + | TempDirGuard::Gcs(guard) = &mut storage_guard.0 + { + guard.remove_all().await.unwrap() + } } } } pub fn create_tmp_dir_and_datanode_opts( - store_type: StorageType, + store_types: Vec, name: &str, ) -> (DatanodeOptions, TestGuard) { let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); - let (store, data_tmp_dir) = get_test_store_config(&store_type); - let opts = create_datanode_opts(store, home_dir); + let mut stores = Vec::with_capacity(store_types.len()); + let mut storage_guards = 
+    for store_type in store_types {
+        let (store, data_tmp_dir) = get_test_store_config(&store_type);
+        stores.push(store);
+        storage_guards.push(StorageGuard(data_tmp_dir))
+    }
+    let opts = create_datanode_opts(stores, home_dir);
    (
        opts,
        TestGuard {
            home_guard: FileDirGuard::new(home_tmp_dir),
-            storage_guard: StorageGuard(data_tmp_dir),
+            storage_guards,
        },
    )
}
-pub(crate) fn create_datanode_opts(store: ObjectStoreConfig, home_dir: String) -> DatanodeOptions {
+pub(crate) fn create_datanode_opts(
+    stores: Vec<ObjectStoreConfig>,
+    home_dir: String,
+) -> DatanodeOptions {
+    let global_store = match &stores[0] {
+        ObjectStoreConfig::S3(_) => "S3",
+        ObjectStoreConfig::Azblob(_) => "Azblob",
+        ObjectStoreConfig::File(_) => "File",
+        ObjectStoreConfig::Oss(_) => "Oss",
+        ObjectStoreConfig::Gcs(_) => "Gcs",
+    };
    DatanodeOptions {
        node_id: Some(0),
        require_lease_before_startup: true,
        storage: StorageConfig {
            data_home: home_dir,
-            store,
+            store: stores,
+            global_store: global_store.to_string(),
            ..Default::default()
        },
        mode: Mode::Standalone,
diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs
index 2781d04b2f86..4478824734ff 100644
--- a/tests-integration/src/tests.rs
+++ b/tests-integration/src/tests.rs
@@ -19,6 +19,7 @@ mod test_util;
use std::collections::HashMap;
use std::sync::Arc;
+use common_datasource::object_store::StorageType;
use common_meta::key::TableMetadataManagerRef;
use datanode::datanode::Datanode;
use frontend::instance::Instance;
@@ -45,3 +46,14 @@ pub async fn create_distributed_instance(test_name: &str) -> MockDistributedInst
    let cluster = GreptimeDbClusterBuilder::new(test_name).build().await;
    MockDistributedInstance(cluster)
}
+
+pub async fn create_distributed_instance_with_multiple_object_stores(
+    test_name: &str,
+    custom_stores: &[StorageType],
+) -> MockDistributedInstance {
+    let cluster = GreptimeDbClusterBuilder::new(test_name)
+        .with_storage_types(custom_stores.to_vec())
+        .build()
+        .await;
+    MockDistributedInstance(cluster)
+}
diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs
index 3e4073bb73a8..cb54e24b206b 100644
--- a/tests-integration/src/tests/instance_test.rs
+++ b/tests-integration/src/tests/instance_test.rs
@@ -31,8 +31,9 @@ use session::context::{QueryContext, QueryContextRef};
use crate::test_util::check_output_stream;
use crate::tests::test_util::{
-    both_instances_cases, check_unordered_output_stream, distributed, find_testing_resource,
-    prepare_path, standalone, standalone_instance_case, MockInstance,
+    both_instances_cases, both_instances_cases_with_custom_storages, check_unordered_output_stream,
+    distributed, distributed_with_multiple_object_stores, find_testing_resource, prepare_path,
+    standalone, standalone_instance_case, standalone_with_multiple_object_stores, MockInstance,
};
#[apply(both_instances_cases)]
@@ -1840,3 +1841,124 @@ async fn execute_sql_with(
    .await
    .unwrap()
}
+
+#[apply(both_instances_cases_with_custom_storages)]
+async fn test_custom_storage(instance: Arc<dyn MockInstance>) {
+    let frontend = instance.frontend();
+    let custom_storages = [
+        ("S3", "GT_S3_BUCKET"),
+        ("Oss", "GT_OSS_BUCKET"),
+        ("Azblob", "GT_AZBLOB_CONTAINER"),
+        ("Gcs", "GT_GCS_BUCKET"),
+    ];
+    for (storage_name, custom_storage_env) in custom_storages {
+        if let Ok(env_value) = env::var(custom_storage_env) {
+            if env_value.is_empty() {
+                continue;
+            }
+            let sql = if instance.is_distributed_mode() {
+                format!(
+                    r#"create table test_table(
+                        host string,
+                        ts timestamp time index,
+                    )
+                    PARTITION BY RANGE COLUMNS (ts) (
+                        PARTITION r0 VALUES LESS THAN (1),
+                        PARTITION r1 VALUES LESS THAN (10),
+                        PARTITION r2 VALUES LESS THAN (100),
+                        PARTITION r3 VALUES LESS THAN (MAXVALUE),
+                    )
+                    with(storage='{storage_name}')
+                    "#
+                )
+            } else {
+                format!(
+                    r#"create table test_table(host string, ts timestamp time index)with(storage='{storage_name}');"#
+                )
+            };
+
+            let output = execute_sql(&instance.frontend(), &sql).await;
+            assert!(matches!(output, Output::AffectedRows(0)));
+            let output = execute_sql(
+                &frontend,
+                r#"insert into test_table(host, ts) values
+                ('host1', 1655276557000),
+                ('host2', 1655276558000)
+                "#,
+            )
+            .await;
+            assert!(matches!(output, Output::AffectedRows(2)));
+
+            let output = execute_sql(&frontend, "select * from test_table").await;
+            let expected = "\
++-------+---------------------+
+| host  | ts                  |
++-------+---------------------+
+| host1 | 2022-06-15T07:02:37 |
+| host2 | 2022-06-15T07:02:38 |
++-------+---------------------+";
+
+            check_output_stream(output, expected).await;
+            let output = execute_sql(&frontend, "show create table test_table").await;
+            let Output::RecordBatches(record_batches) = output else {
+                unreachable!()
+            };
+
+            let record_batches = record_batches.iter().collect::<Vec<_>>();
+            let column = record_batches[0].column_by_name("Create Table").unwrap();
+            let actual = column.get(0);
+
+            let expect = if instance.is_distributed_mode() {
+                format!(
+                    r#"CREATE TABLE IF NOT EXISTS "test_table" (
+  "host" STRING NULL,
+  "ts" TIMESTAMP(3) NOT NULL,
+  TIME INDEX ("ts")
+)
+PARTITION BY RANGE COLUMNS ("ts") (
+  PARTITION r0 VALUES LESS THAN ('1970-01-01 09:00:00.001+0900'),
+  PARTITION r1 VALUES LESS THAN ('1970-01-01 09:00:00.010+0900'),
+  PARTITION r2 VALUES LESS THAN ('1970-01-01 09:00:00.100+0900'),
+  PARTITION r3 VALUES LESS THAN (MAXVALUE)
+)
+ENGINE=mito
+WITH(
+  regions = 4,
+  storage = '{storage_name}'
+)"#
+                )
+            } else {
+                format!(
+                    r#"CREATE TABLE IF NOT EXISTS "test_table" (
+  "host" STRING NULL,
+  "ts" TIMESTAMP(3) NOT NULL,
+  TIME INDEX ("ts")
+)
+
+ENGINE=mito
+WITH(
+  regions = 1,
+  storage = '{storage_name}'
+)"#
+                )
+            };
+            assert_eq!(
+                actual.to_string(),
+                expect,
+                "actual: {}\nexpect: {}\n",
+                actual.to_string(),
+                expect
+            );
+            let output = execute_sql(&frontend, "truncate test_table").await;
+            assert!(matches!(output, Output::AffectedRows(0)));
+            let output = execute_sql(&frontend, "select * from test_table").await;
+            let expected = "\
+++
+++";
+
+            check_output_stream(output, expected).await;
+            let output = execute_sql(&frontend, "drop table test_table").await;
+            assert!(matches!(output, Output::AffectedRows(0)));
+        }
+    }
+}
diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs
index 422eba4eaa19..99f891d2fc52 100644
--- a/tests-integration/src/tests/test_util.rs
+++ b/tests-integration/src/tests/test_util.rs
@@ -14,12 +14,14 @@ use std::sync::Arc;
+use common_datasource::object_store::StorageType;
use common_query::Output;
use common_recordbatch::util;
use common_test_util::find_workspace_path;
use frontend::instance::Instance;
use rstest_reuse::{self, template};
+use super::create_distributed_instance_with_multiple_object_stores;
use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
use crate::tests::{create_distributed_instance, MockDistributedInstance};
@@ -61,6 +63,26 @@ pub(crate) async fn distributed() -> Arc<dyn MockInstance> {
    Arc::new(instance)
}
+pub(crate) async fn standalone_with_multiple_object_stores() -> Arc<dyn MockInstance> {
+    let _ = dotenv::dotenv();
+    let test_name = uuid::Uuid::new_v4().to_string();
+    let storage_types = StorageType::build_storage_types_based_on_env();
+    let instance = GreptimeDbStandaloneBuilder::new(&test_name)
+        .with_store_types(storage_types)
+        .build()
+        .await;
+    Arc::new(instance)
+}
+
+pub(crate) async fn distributed_with_multiple_object_stores() -> Arc<dyn MockInstance> {
+    let _ = dotenv::dotenv();
+    let test_name = uuid::Uuid::new_v4().to_string();
+    let storage_types = StorageType::build_storage_types_based_on_env();
+    let instance =
+        create_distributed_instance_with_multiple_object_stores(&test_name, &storage_types).await;
+    Arc::new(instance)
+}
+
#[template]
#[rstest]
#[case::test_with_standalone(standalone())]
@@ -74,6 +96,19 @@ pub(crate) fn both_instances_cases(
) {
}
+#[template]
+#[rstest]
+#[case::test_with_standalone(standalone_with_multiple_object_stores())]
+#[case::test_with_distributed(distributed_with_multiple_object_stores())]
+#[awt]
+#[tokio::test(flavor = "multi_thread")]
+pub(crate) fn both_instances_cases_with_custom_storages(
+    #[future]
+    #[case]
+    instance: Arc<dyn MockInstance>,
+) {
+}
+
#[template]
#[rstest]
#[case::test_with_standalone(standalone())]
diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs
index 3d766a5a37f0..f6c595c7d834 100644
--- a/tests-integration/tests/grpc.rs
+++ b/tests-integration/tests/grpc.rs
@@ -22,13 +22,14 @@ use api::v1::{
use auth::user_provider_from_option;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
+use common_datasource::object_store::StorageType;
use common_query::Output;
use common_recordbatch::RecordBatches;
use servers::grpc::GrpcServerConfig;
use servers::http::prometheus::{PromData, PromSeries, PrometheusJsonResponse, PrometheusResponse};
use servers::server::Server;
use tests_integration::test_util::{
-    setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, StorageType,
+    setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider,
};
#[macro_export]
macro_rules! grpc_test {
            #[$meta]
        )*
        async fn [< $test >]() {
-            let store_type = tests_integration::test_util::StorageType::$service;
+            let store_type = common_datasource::object_store::StorageType::$service;
            if store_type.test_on() {
                let _ = $crate::grpc::$test(store_type).await;
            }
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 2d9ddc6289f4..7a3e9cb19bc8 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -17,6 +17,7 @@ use std::collections::BTreeMap;
use auth::user_provider_from_option;
use axum::http::StatusCode;
use axum_test_helper::TestClient;
+use common_datasource::object_store::StorageType;
use common_error::status_code::StatusCode as ErrorCode;
use serde_json::json;
use servers::http::handler::HealthResponse;
@@ -25,7 +26,6 @@ use servers::http::{JsonOutput, JsonResponse};
use tests_integration::test_util::{
    setup_test_http_app, setup_test_http_app_with_frontend,
    setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
-    StorageType,
};
#[macro_export]
macro_rules! http_test {
            #[$meta]
        )*
        async fn [< $test >]() {
-            let store_type = tests_integration::test_util::StorageType::$service;
+            let store_type = common_datasource::object_store::StorageType::$service;
            if store_type.test_on() {
                let _ = $crate::http::$test(store_type).await;
            }
@@ -601,7 +601,7 @@ pub async fn test_config_api(store_type: StorageType) {
    common_telemetry::init_default_ut_logging();
    let (app, _guard) = setup_test_http_app_with_frontend(store_type, "config_api").await;
    let client = TestClient::new(app);
-
+    let global_store = store_type.to_string();
    let res_get = client.get("/config").send().await;
    assert_eq!(res_get.status(), StatusCode::OK);
    let expected_toml_str = format!(
@@ -631,6 +631,9 @@ read_batch_size = 128
sync_write = false
[storage]
+global_store = "{}"
+
+[[storage.store]]
type = "{}"
[storage.compaction]
@@ -670,7 +673,7 @@ sst_meta_cache_size = "128MiB"
[logging]
enable_jaeger_tracing = false"#,
-        store_type
+        global_store, store_type
    );
    let body_text = drop_lines_with_inconsistent_results(res_get.text().await);
    assert_eq!(body_text, expected_toml_str);
diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs
index 6941abbce580..ac971435cf8f 100644
--- a/tests-integration/tests/region_failover.rs
+++ b/tests-integration/tests/region_failover.rs
@@ -18,6 +18,7 @@ use std::time::Duration;
use api::v1::meta::Peer;
use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_datasource::object_store::StorageType;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::{RegionDistribution, TableMetaKey};
use common_meta::{distributed_time_constants, RegionIdent};
@@ -35,7 +36,7 @@ use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use table::metadata::TableId;
use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
-use tests_integration::test_util::{check_output_stream, get_test_store_config, StorageType};
+use tests_integration::test_util::{check_output_stream, get_test_store_config};
use tokio::time;
#[macro_export]
macro_rules! region_failover_test {
            #[$meta]
        )*
        async fn [< $test >]() {
-            let store_type = tests_integration::test_util::StorageType::$service;
+            let store_type = common_datasource::object_store::StorageType::$service;
            if store_type.test_on() {
                let _ = $crate::region_failover::$test(store_type).await;
            }
diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs
index f8e81230eb3c..b1504b4be42c 100644
--- a/tests-integration/tests/sql.rs
+++ b/tests-integration/tests/sql.rs
@@ -14,12 +14,13 @@ use auth::user_provider_from_option;
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
+use common_datasource::object_store::StorageType;
use sqlx::mysql::{MySqlDatabaseError, MySqlPoolOptions};
use sqlx::postgres::{PgDatabaseError, PgPoolOptions};
use sqlx::Row;
use tests_integration::test_util::{
    setup_mysql_server, setup_mysql_server_with_user_provider, setup_pg_server,
-    setup_pg_server_with_user_provider, StorageType,
+    setup_pg_server_with_user_provider,
};
use tokio_postgres::NoTls;
#[macro_export]
macro_rules! sql_test {
            #[$meta]
        )*
        async fn [< $test >]() {
-            let store_type = tests_integration::test_util::StorageType::$service;
+            let store_type = common_datasource::object_store::StorageType::$service;
            if store_type.test_on() {
                let _ = $crate::sql::$test(store_type).await;
            }