diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 7793e148ac4c..23a1875fa979 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -43,7 +43,6 @@ sync_write = false
 [storage]
 # The working home directory.
 data_home = "/tmp/greptimedb/"
-type = "File"
 # TTL for all tables. Disabled by default.
 # global_ttl = "7d"
 
@@ -53,6 +52,9 @@ type = "File"
 # The local file cache capacity in bytes.
 # cache_capacity = "256MB"
 
+[storage.default_store]
+type = "File"
+
 # Compaction options, see `standalone.example.toml`.
 [storage.compaction]
 max_inflight_tasks = 4
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index e4bd41a616c4..a62dcd57409b 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -113,14 +113,14 @@ retry_delay = "500ms"
 [storage]
 # The working home directory.
 data_home = "/tmp/greptimedb/"
-# Storage type.
-type = "File"
 # TTL for all tables. Disabled by default.
 # global_ttl = "7d"
 # Cache configuration for object storage such as 'S3' etc.
 # cache_path = "/path/local_cache"
 # The local file cache capacity in bytes.
 # cache_capacity = "256MB"
+[storage.default_store]
+type = "File"
 
 # Compaction options.
 [storage.compaction]
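
The config change above replaces the flat `type = "File"` key under `[storage]` with a dedicated `[storage.default_store]` table plus an optional `[[storage.custom_stores]]` array, so one datanode can mount several object stores at once. A minimal sketch of how serde reads the new shape, using mirror types for illustration rather than the repo's actual structs (those appear in src/datanode/src/config.rs further down), assuming only the `serde` and `toml` crates:

```rust
use serde::Deserialize;

// Mirror types for illustration only; the real definitions live in
// src/datanode/src/config.rs later in this diff.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum ObjectStoreConfig {
    File,
    S3 { bucket: String },
    Gcs { bucket: String, endpoint: String },
}

#[derive(Debug, Deserialize)]
struct StorageConfig {
    data_home: String,
    default_store: ObjectStoreConfig,
    #[serde(default)] // omitting the array yields an empty Vec
    custom_stores: Vec<ObjectStoreConfig>,
}

#[derive(Debug, Deserialize)]
struct Options {
    storage: StorageConfig,
}

fn main() {
    let toml_str = r#"
        [storage]
        data_home = "/tmp/greptimedb/"

        [storage.default_store]
        type = "File"

        [[storage.custom_stores]]
        type = "Gcs"
        bucket = "foo"
        endpoint = "bar"
    "#;
    let opts: Options = toml::from_str(toml_str).unwrap();
    assert!(matches!(opts.storage.default_store, ObjectStoreConfig::File));
    assert_eq!(opts.storage.custom_stores.len(), 1);
}
```
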
diff --git a/src/client/src/region.rs b/src/client/src/region.rs
index 8a5895d35fc7..3c1d7365ebce 100644
--- a/src/client/src/region.rs
+++ b/src/client/src/region.rs
@@ -28,6 +28,7 @@ use common_telemetry::error;
 use prost::Message;
 use snafu::{location, Location, OptionExt, ResultExt};
 use tokio_stream::StreamExt;
+use tonic::Code;
 
 use crate::error::Error::RegionServer;
 use crate::error::{
@@ -45,7 +46,13 @@ pub struct RegionRequester {
 impl Datanode for RegionRequester {
     async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
         self.handle_inner(request).await.map_err(|err| {
-            if matches!(err, RegionServer { .. }) {
+            if !matches!(
+                err,
+                RegionServer {
+                    code: Code::InvalidArgument,
+                    ..
+                }
+            ) {
                 meta_error::Error::RetryLater {
                     source: BoxedError::new(err),
                 }
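
The new predicate reads: any region-server failure is worth retrying unless the server ruled the request itself invalid, since resending an invalid request can never succeed. A self-contained sketch of that rule with stand-in types (the real code matches on `tonic::Code` inside the crate's error enum):

```rust
// Stand-ins for tonic::Code and the client error type in this diff.
enum Code {
    InvalidArgument,
    Unavailable,
}

enum Error {
    RegionServer { code: Code },
}

/// Mirrors the inverted `matches!` above: retry everything except
/// a region-server rejection of the request itself.
fn should_retry_later(err: &Error) -> bool {
    !matches!(
        err,
        Error::RegionServer {
            code: Code::InvalidArgument,
            ..
        }
    )
}

fn main() {
    assert!(!should_retry_later(&Error::RegionServer {
        code: Code::InvalidArgument
    }));
    assert!(should_retry_later(&Error::RegionServer {
        code: Code::Unavailable
    }));
}
```
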
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index 0be8b08c63f4..0d29e1fbeb42 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -192,7 +192,9 @@ mod tests {
     use std::time::Duration;
 
     use common_test_util::temp_dir::create_named_temp_file;
-    use datanode::config::{CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig};
+    use datanode::config::{
+        CompactionConfig, FileConfig, GcsConfig, ObjectStoreConfig, RegionManifestConfig, S3Config,
+    };
    use servers::heartbeat_options::HeartbeatOptions;
     use servers::Mode;
 
@@ -229,9 +231,19 @@ mod tests {
            sync_write = false
 
            [storage]
-           type = "File"
            data_home = "/tmp/greptimedb/"
 
+           [storage.default_store]
+           type = "File"
+
+           [[storage.custom_stores]]
+           type = "Gcs"
+           bucket = "foo"
+           endpoint = "bar"
+
+           [[storage.custom_stores]]
+           type = "S3"
+           bucket = "foo"
+
            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
@@ -290,9 +302,18 @@ mod tests {
         assert!(tcp_nodelay);
         assert_eq!("/tmp/greptimedb/", options.storage.data_home);
         assert!(matches!(
-            &options.storage.store,
+            &options.storage.default_store,
             ObjectStoreConfig::File(FileConfig { .. })
         ));
+        assert_eq!(options.storage.custom_stores.len(), 2);
+        assert!(matches!(
+            options.storage.custom_stores[0],
+            ObjectStoreConfig::Gcs(GcsConfig { .. })
+        ));
+        assert!(matches!(
+            options.storage.custom_stores[1],
+            ObjectStoreConfig::S3(S3Config { .. })
+        ));
 
         assert_eq!(
             CompactionConfig {
diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs
index 3908eda60fc8..fd60f5882b35 100644
--- a/src/cmd/src/options.rs
+++ b/src/cmd/src/options.rs
@@ -201,6 +201,7 @@ mod tests {
                 [
                     env_prefix.to_string(),
                     "storage".to_uppercase(),
+                    "default_store".to_uppercase(),
                     "type".to_uppercase(),
                 ]
                 .join(ENV_VAR_SEP),
@@ -211,6 +212,7 @@ mod tests {
                 [
                     env_prefix.to_string(),
                     "storage".to_uppercase(),
+                    "default_store".to_uppercase(),
                     "bucket".to_uppercase(),
                 ]
                 .join(ENV_VAR_SEP),
@@ -258,7 +260,7 @@ mod tests {
         // Check the configs from environment variables.
         assert_eq!(opts.storage.manifest.checkpoint_margin, Some(99));
-        match opts.storage.store {
+        match &opts.storage.default_store {
             ObjectStoreConfig::S3(s3_config) => {
                 assert_eq!(s3_config.bucket, "mybucket".to_string());
             }
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index c7adfd0834d9..ca5c6199223f 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -418,6 +418,7 @@ mod tests {
     use auth::{Identity, Password, UserProviderRef};
     use common_base::readable_size::ReadableSize;
     use common_test_util::temp_dir::create_named_temp_file;
+    use datanode::config::{FileConfig, GcsConfig};
     use servers::Mode;
 
     use super::*;
@@ -465,8 +466,18 @@ mod tests {
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false
-
+
            [storage]
+           data_home = "/tmp/greptimedb/"
+           [storage.default_store]
+           type = "File"
+
+           [[storage.custom_stores]]
+           type = "Gcs"
+           bucket = "foo"
+           endpoint = "bar"
+
+           [[storage.custom_stores]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"
@@ -516,7 +527,16 @@ mod tests {
 
         assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());
 
-        match &dn_opts.storage.store {
+        assert!(matches!(
+            &dn_opts.storage.default_store,
+            datanode::config::ObjectStoreConfig::File(FileConfig { .. })
+        ));
+        assert_eq!(dn_opts.storage.custom_stores.len(), 2);
+        assert!(matches!(
+            dn_opts.storage.custom_stores[0],
+            datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
+        ));
+        match &dn_opts.storage.custom_stores[1] {
             datanode::config::ObjectStoreConfig::S3(s3_config) => {
                 assert_eq!(
                     "Secret([REDACTED alloc::string::String])".to_string(),
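
The extra `default_store` segment in these tests reflects how layered config keys map to environment variables: each nesting level becomes one uppercased segment joined by `ENV_VAR_SEP`. A sketch of that mapping; the `__` separator here is an assumption, the actual constant lives in src/cmd/src/options.rs:

```rust
// Assumed separator; see ENV_VAR_SEP in src/cmd/src/options.rs.
const ENV_VAR_SEP: &str = "__";

fn env_key(prefix: &str, path: &[&str]) -> String {
    std::iter::once(prefix.to_string())
        .chain(path.iter().map(|seg| seg.to_uppercase()))
        .collect::<Vec<_>>()
        .join(ENV_VAR_SEP)
}

fn main() {
    // The nested key storage.default_store.type becomes one variable name:
    assert_eq!(
        env_key("DATANODE_UT", &["storage", "default_store", "type"]),
        "DATANODE_UT__STORAGE__DEFAULT_STORE__TYPE"
    );
}
```
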
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 739d451d95e8..b5961f4830ef 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -53,6 +53,18 @@ pub enum ObjectStoreConfig {
     Gcs(GcsConfig),
 }
 
+impl ObjectStoreConfig {
+    pub fn extract_variant_name(&self) -> &'static str {
+        match self {
+            Self::File(_) => "File",
+            Self::S3(_) => "S3",
+            Self::Oss(_) => "Oss",
+            Self::Azblob(_) => "Azblob",
+            Self::Gcs(_) => "Gcs",
+        }
+    }
+}
+
 /// Storage engine config
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(default)]
 pub struct StorageConfig {
@@ -66,8 +78,8 @@ pub struct StorageConfig {
     pub global_ttl: Option<Duration>,
     /// The working directory of database
     pub data_home: String,
-    #[serde(flatten)]
-    pub store: ObjectStoreConfig,
+    pub default_store: ObjectStoreConfig,
+    pub custom_stores: Vec<ObjectStoreConfig>,
     pub compaction: CompactionConfig,
     pub manifest: RegionManifestConfig,
     pub flush: FlushConfig,
@@ -78,10 +90,11 @@ impl Default for StorageConfig {
         Self {
             global_ttl: None,
             data_home: DEFAULT_DATA_HOME.to_string(),
-            store: ObjectStoreConfig::default(),
+            custom_stores: vec![],
             compaction: CompactionConfig::default(),
             manifest: RegionManifestConfig::default(),
             flush: FlushConfig::default(),
+            default_store: ObjectStoreConfig::default(),
         }
     }
 }
@@ -403,13 +416,13 @@ mod tests {
     #[test]
     fn test_secstr() {
         let toml_str = r#"
-            [storage]
+            [storage.default_store]
             type = "S3"
             access_key_id = "access_key_id"
             secret_access_key = "secret_access_key"
         "#;
         let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
-        match opts.storage.store {
+        match &opts.storage.default_store {
             ObjectStoreConfig::S3(cfg) => {
                 assert_eq!(
                     "Secret([REDACTED alloc::string::String])".to_string(),
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 9dc157f3f943..85e9c730b81c 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -291,7 +291,6 @@ impl DatanodeBuilder {
                 ));
             }
         }
 
-        info!("going to open {} regions", regions.len());
         let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
         let mut tasks = vec![];
@@ -354,13 +353,8 @@ impl DatanodeBuilder {
         let mut region_server =
             RegionServer::new(query_engine.clone(), runtime.clone(), event_listener);
 
-        let object_store = store::new_object_store(opts).await?;
-        let object_store_manager = ObjectStoreManager::new(
-            "default", // TODO: use a name which is set in the configuration when #919 is done.
-            object_store,
-        );
-        let engines =
-            Self::build_store_engines(opts, log_store, Arc::new(object_store_manager)).await?;
+        let object_store_manager = Self::build_object_store_manager(opts).await?;
+        let engines = Self::build_store_engines(opts, log_store, object_store_manager).await?;
         for engine in engines {
             region_server.register_engine(engine);
         }
@@ -425,6 +419,23 @@ impl DatanodeBuilder {
         }
         Ok(engines)
     }
+
+    /// Builds [ObjectStoreManager]
+    async fn build_object_store_manager(opts: &DatanodeOptions) -> Result<ObjectStoreManagerRef> {
+        let object_store =
+            store::new_object_store(opts.storage.default_store.clone(), &opts.storage.data_home)
+                .await?;
+        let default_name = opts.storage.default_store.extract_variant_name();
+        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
+        for store in &opts.storage.custom_stores {
+            let name = store.extract_variant_name();
+            object_store_manager.add(
+                name,
+                store::new_object_store(store.clone(), &opts.storage.data_home).await?,
+            );
+        }
+        Ok(Arc::new(object_store_manager))
+    }
 }
 
 #[cfg(test)]
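
`build_object_store_manager` thus registers the default store under its variant name plus one entry per custom store. A self-contained sketch of the manager idea using a plain `HashMap` rather than the `object_store` crate's API; the case-insensitive lookup is an assumption suggested by the `Object store not found: s3` error later in this diff:

```rust
use std::collections::HashMap;

// Stand-in for a real backend handle (fs, s3, gcs, ...).
#[derive(Debug, Clone)]
struct ObjectStore(&'static str);

struct ObjectStoreManager {
    default_store: ObjectStore,
    stores: HashMap<String, ObjectStore>,
}

impl ObjectStoreManager {
    fn new(default_name: &str, default_store: ObjectStore) -> Self {
        let mut stores = HashMap::new();
        stores.insert(default_name.to_lowercase(), default_store.clone());
        Self { default_store, stores }
    }

    fn add(&mut self, name: &str, store: ObjectStore) {
        self.stores.insert(name.to_lowercase(), store);
    }

    // A table's `storage = '...'` option selects a store by name; a miss
    // surfaces upstream as "Object store not found".
    fn find(&self, name: &str) -> Option<&ObjectStore> {
        self.stores.get(&name.to_lowercase())
    }
}

fn main() {
    let mut manager = ObjectStoreManager::new("File", ObjectStore("fs"));
    manager.add("Gcs", ObjectStore("gcs"));
    assert!(manager.find("Gcs").is_some());
    assert!(manager.find("S3").is_none()); // -> Object store not found: s3
    println!("default: {:?}", manager.default_store);
}
```
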
diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs
index 78528cf6ce6e..571b17f452b5 100644
--- a/src/datanode/src/store.rs
+++ b/src/datanode/src/store.rs
@@ -32,12 +32,15 @@ use object_store::util::normalize_dir;
 use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder};
 use snafu::prelude::*;
 
-use crate::config::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
+use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
 use crate::error::{self, Result};
 
-pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectStore> {
-    let data_home = normalize_dir(&opts.storage.data_home);
-    let object_store = match &opts.storage.store {
+pub(crate) async fn new_object_store(
+    store: ObjectStoreConfig,
+    data_home: &str,
+) -> Result<ObjectStore> {
+    let data_home = normalize_dir(data_home);
+    let object_store = match &store {
         ObjectStoreConfig::File(file_config) => {
             fs::new_fs_object_store(&data_home, file_config).await
         }
@@ -50,9 +53,8 @@ pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectStore> {
diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs
@@ impl<'a> ParserContext<'a> {
                 }
             );
         }
+        // Sorts options so that `test_display_create_table` can always pass.
+        let options = options.into_iter().sorted().collect();
         let create_table = CreateTable {
             if_not_exists,
             name: table_name,
diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs
index 47b511be5616..3cb74b563a57 100644
--- a/src/sql/src/statements/create.rs
+++ b/src/sql/src/statements/create.rs
@@ -241,7 +241,7 @@ mod tests {
 PARTITION r2 VALUES LESS THAN (MAXVALUE),
 )
 engine=mito
-with(regions=1, ttl='7d');
+with(regions=1, ttl='7d', storage='File');
 ";
         let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
         assert_eq!(1, result.len());
@@ -267,6 +267,7 @@ PARTITION BY RANGE COLUMNS (ts) (
 ENGINE=mito
 WITH(
   regions = 1,
+  storage = 'File',
   ttl = '7d'
 )"#,
             &new_sql
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs
index 9d78f21534fb..c3ffceec63e8 100644
--- a/src/table/src/requests.rs
+++ b/src/table/src/requests.rs
@@ -84,6 +84,7 @@ pub struct TableOptions {
 pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
 pub const TTL_KEY: &str = "ttl";
 pub const REGIONS_KEY: &str = "regions";
+pub const STORAGE_KEY: &str = "storage";
 
 impl TryFrom<&HashMap<String, String>> for TableOptions {
     type Error = error::Error;
@@ -340,6 +341,7 @@ pub fn valid_table_option(key: &str) -> bool {
         | WRITE_BUFFER_SIZE_KEY
         | TTL_KEY
         | REGIONS_KEY
+        | STORAGE_KEY
     ) | is_supported_in_s3(key)
 }
 
@@ -365,6 +367,7 @@ mod tests {
         assert!(valid_table_option(TTL_KEY));
         assert!(valid_table_option(REGIONS_KEY));
         assert!(valid_table_option(WRITE_BUFFER_SIZE_KEY));
+        assert!(valid_table_option(STORAGE_KEY));
         assert!(!valid_table_option("foo"));
     }
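
Because `valid_table_option` matches against `&'static str` constants, accepting the new option is just one more arm in the `matches!` pattern. A trimmed, runnable sketch of the whitelist, with the S3 helper stubbed out:

```rust
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
pub const TTL_KEY: &str = "ttl";
pub const REGIONS_KEY: &str = "regions";
pub const STORAGE_KEY: &str = "storage";

// Stub: the repo's helper also accepts S3-specific connection keys.
fn is_supported_in_s3(_key: &str) -> bool {
    false
}

pub fn valid_table_option(key: &str) -> bool {
    matches!(
        key,
        WRITE_BUFFER_SIZE_KEY | TTL_KEY | REGIONS_KEY | STORAGE_KEY
    ) | is_supported_in_s3(key)
}

fn main() {
    assert!(valid_table_option(STORAGE_KEY));
    assert!(!valid_table_option("foo")); // -> Invalid table option key: foo
}
```
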
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index 3e2c1383a7fb..6df84a5ec5f1 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -59,6 +59,7 @@ pub struct GreptimeDbClusterBuilder {
     cluster_name: String,
     kv_backend: KvBackendRef,
     store_config: Option<ObjectStoreConfig>,
+    custom_store_types: Option<Vec<StorageType>>,
     datanodes: Option<u32>,
 }
 
@@ -68,6 +69,7 @@ impl GreptimeDbClusterBuilder {
             cluster_name: cluster_name.to_string(),
             kv_backend: Arc::new(MemoryKvBackend::new()),
             store_config: None,
+            custom_store_types: None,
             datanodes: None,
         }
     }
@@ -77,6 +79,11 @@ impl GreptimeDbClusterBuilder {
         self
     }
 
+    pub fn with_custom_store_types(mut self, store_types: Vec<StorageType>) -> Self {
+        self.custom_store_types = Some(store_types);
+        self
+    }
+
     pub fn with_datanodes(mut self, datanodes: u32) -> Self {
         self.datanodes = Some(datanodes);
         self
@@ -152,14 +159,15 @@ impl GreptimeDbClusterBuilder {
 
                 dir_guards.push(FileDirGuard::new(home_tmp_dir));
 
-                create_datanode_opts(store_config.clone(), home_dir)
+                create_datanode_opts(store_config.clone(), vec![], home_dir)
             } else {
                 let (opts, guard) = create_tmp_dir_and_datanode_opts(
                     StorageType::File,
+                    self.custom_store_types.clone().unwrap_or_default(),
                     &format!("{}-dn-{}", self.cluster_name, datanode_id),
                 );
 
-                storage_guards.push(guard.storage_guard);
+                storage_guards.push(guard.storage_guards);
                 dir_guards.push(guard.home_guard);
 
                 opts
@@ -171,7 +179,11 @@ impl GreptimeDbClusterBuilder {
             instances.insert(datanode_id, datanode);
         }
 
-        (instances, storage_guards, dir_guards)
+        (
+            instances,
+            storage_guards.into_iter().flatten().collect(),
+            dir_guards,
+        )
     }
 
     async fn wait_datanodes_alive(
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 153ad46e0608..b472941bca3f 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -37,7 +37,8 @@ pub struct GreptimeDbStandalone {
 
 pub struct GreptimeDbStandaloneBuilder {
     instance_name: String,
-    store_type: Option<StorageType>,
+    custom_store_types: Option<Vec<StorageType>>,
+    default_store: Option<StorageType>,
     plugin: Option<Plugins>,
 }
 
@@ -45,14 +46,23 @@ impl GreptimeDbStandaloneBuilder {
     pub fn new(instance_name: &str) -> Self {
         Self {
             instance_name: instance_name.to_string(),
-            store_type: None,
+            custom_store_types: None,
             plugin: None,
+            default_store: None,
         }
     }
 
-    pub fn with_store_type(self, store_type: StorageType) -> Self {
+    pub fn with_default_store_type(self, store_type: StorageType) -> Self {
         Self {
-            store_type: Some(store_type),
+            default_store: Some(store_type),
+            ..self
+        }
+    }
+
+    #[cfg(test)]
+    pub fn with_custom_store_types(self, store_types: Vec<StorageType>) -> Self {
+        Self {
+            custom_store_types: Some(store_types),
             ..self
         }
     }
@@ -66,9 +76,11 @@ impl GreptimeDbStandaloneBuilder {
     }
 
     pub async fn build(self) -> GreptimeDbStandalone {
-        let store_type = self.store_type.unwrap_or(StorageType::File);
+        let default_store_type = self.default_store.unwrap_or(StorageType::File);
+        let store_types = self.custom_store_types.unwrap_or_default();
 
-        let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, &self.instance_name);
+        let (opts, guard) =
+            create_tmp_dir_and_datanode_opts(default_store_type, store_types, &self.instance_name);
 
         let procedure_config = ProcedureConfig::default();
         let kv_backend_config = KvBackendConfig::default();
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index ef47b3c2b974..c2bce0eb8e2c 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -77,6 +77,24 @@ impl Display for StorageType {
 }
 
 impl StorageType {
+    pub fn build_storage_types_based_on_env() -> Vec<StorageType> {
+        let mut storage_types = Vec::with_capacity(4);
+        storage_types.push(StorageType::File);
+        if env::var("GT_S3_BUCKET").is_ok() {
+            storage_types.push(StorageType::S3);
+        }
+        if env::var("GT_OSS_BUCKET").is_ok() {
+            storage_types.push(StorageType::Oss);
+        }
+        if env::var("GT_AZBLOB_CONTAINER").is_ok() {
+            storage_types.push(StorageType::Azblob);
+        }
+        if env::var("GT_GCS_BUCKET").is_ok() {
+            storage_types.push(StorageType::Gcs);
+        }
+        storage_types
+    }
+
     pub fn test_on(&self) -> bool {
         let _ = dotenv::dotenv();
 
@@ -244,7 +262,7 @@ pub enum TempDirGuard {
 
 pub struct TestGuard {
     pub home_guard: FileDirGuard,
-    pub storage_guard: StorageGuard,
+    pub storage_guards: Vec<StorageGuard>,
 }
 
 pub struct FileDirGuard {
@@ -261,42 +279,62 @@ pub struct StorageGuard(pub TempDirGuard);
 
 impl TestGuard {
     pub async fn remove_all(&mut self) {
-        if let TempDirGuard::S3(guard)
-        | TempDirGuard::Oss(guard)
-        | TempDirGuard::Azblob(guard)
-        | TempDirGuard::Gcs(guard) = &mut self.storage_guard.0
-        {
-            guard.remove_all().await.unwrap()
+        for storage_guard in self.storage_guards.iter_mut() {
+            if let TempDirGuard::S3(guard)
+            | TempDirGuard::Oss(guard)
+            | TempDirGuard::Azblob(guard)
+            | TempDirGuard::Gcs(guard) = &mut storage_guard.0
+            {
+                guard.remove_all().await.unwrap()
+            }
         }
     }
 }
 
 pub fn create_tmp_dir_and_datanode_opts(
-    store_type: StorageType,
+    default_store_type: StorageType,
+    custom_store_types: Vec<StorageType>,
     name: &str,
 ) -> (DatanodeOptions, TestGuard) {
     let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
     let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
 
-    let (store, data_tmp_dir) = get_test_store_config(&store_type);
-    let opts = create_datanode_opts(store, home_dir);
+    // Excludes the default object store.
+    let mut custom_stores = Vec::with_capacity(custom_store_types.len());
+    // Includes the default object store.
+    let mut storage_guards = Vec::with_capacity(custom_store_types.len() + 1);
+
+    let (default_store, data_tmp_dir) = get_test_store_config(&default_store_type);
+    storage_guards.push(StorageGuard(data_tmp_dir));
+
+    for store_type in custom_store_types {
+        let (store, data_tmp_dir) = get_test_store_config(&store_type);
+        custom_stores.push(store);
+        storage_guards.push(StorageGuard(data_tmp_dir))
+    }
+    let opts = create_datanode_opts(default_store, custom_stores, home_dir);
 
     (
         opts,
         TestGuard {
             home_guard: FileDirGuard::new(home_tmp_dir),
-            storage_guard: StorageGuard(data_tmp_dir),
+            storage_guards,
         },
     )
 }
 
-pub(crate) fn create_datanode_opts(store: ObjectStoreConfig, home_dir: String) -> DatanodeOptions {
+pub(crate) fn create_datanode_opts(
+    default_store: ObjectStoreConfig,
+    custom_stores: Vec<ObjectStoreConfig>,
+    home_dir: String,
+) -> DatanodeOptions {
     DatanodeOptions {
         node_id: Some(0),
         require_lease_before_startup: true,
         storage: StorageConfig {
             data_home: home_dir,
-            store,
+            custom_stores,
+            default_store,
             ..Default::default()
         },
         mode: Mode::Standalone,
@@ -325,7 +363,7 @@ async fn setup_standalone_instance(
     store_type: StorageType,
 ) -> GreptimeDbStandalone {
     GreptimeDbStandaloneBuilder::new(test_name)
-        .with_store_type(store_type)
+        .with_default_store_type(store_type)
         .build()
         .await
diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs
index 5e0ecf62af44..8682c3942a76 100644
--- a/tests-integration/src/tests/instance_test.rs
+++ b/tests-integration/src/tests/instance_test.rs
@@ -31,8 +31,9 @@ use session::context::{QueryContext, QueryContextRef};
 
 use crate::test_util::check_output_stream;
 use crate::tests::test_util::{
-    both_instances_cases, check_unordered_output_stream, distributed, find_testing_resource,
-    prepare_path, standalone, standalone_instance_case, MockInstance,
+    both_instances_cases, both_instances_cases_with_custom_storages, check_unordered_output_stream,
+    distributed, distributed_with_multiple_object_stores, find_testing_resource, prepare_path,
+    standalone, standalone_instance_case, standalone_with_multiple_object_stores, MockInstance,
 };
 
 #[apply(both_instances_cases)]
@@ -1840,3 +1841,118 @@ async fn execute_sql_with(
         .await
         .unwrap()
 }
+
+#[apply(both_instances_cases_with_custom_storages)]
+async fn test_custom_storage(instance: Arc<dyn MockInstance>) {
+    let frontend = instance.frontend();
+    let custom_storages = [
+        ("S3", "GT_S3_BUCKET"),
+        ("Oss", "GT_OSS_BUCKET"),
+        ("Azblob", "GT_AZBLOB_CONTAINER"),
+        ("Gcs", "GT_GCS_BUCKET"),
+    ];
+    for (storage_name, custom_storage_env) in custom_storages {
+        if let Ok(env_value) = env::var(custom_storage_env) {
+            if env_value.is_empty() {
+                continue;
+            }
+            let sql = if instance.is_distributed_mode() {
+                format!(
+                    r#"create table test_table(
+                        host string,
+                        ts timestamp time index,
+                    )
+                    PARTITION BY RANGE COLUMNS (ts) (
+                        PARTITION r0 VALUES LESS THAN (1),
+                        PARTITION r1 VALUES LESS THAN (10),
+                        PARTITION r2 VALUES LESS THAN (100),
+                        PARTITION r3 VALUES LESS THAN (MAXVALUE),
+                    )
+                    with(storage='{storage_name}')
+                    "#
+                )
+            } else {
+                format!(
+                    r#"create table test_table(host string, ts timestamp time index)with(storage='{storage_name}');"#
+                )
+            };
+
+            let output = execute_sql(&instance.frontend(), &sql).await;
+            assert!(matches!(output, Output::AffectedRows(0)));
+            let output = execute_sql(
+                &frontend,
+                r#"insert into test_table(host, ts) values
+                    ('host1', 1655276557000),
+                    ('host2', 1655276558000)
+                    "#,
+            )
+            .await;
+            assert!(matches!(output, Output::AffectedRows(2)));
+
+            let output = execute_sql(&frontend, "select * from test_table").await;
+            let expected = "\
++-------+---------------------+
+| host  | ts                  |
++-------+---------------------+
+| host1 | 2022-06-15T07:02:37 |
+| host2 | 2022-06-15T07:02:38 |
++-------+---------------------+";
+
+            check_output_stream(output, expected).await;
+            let output = execute_sql(&frontend, "show create table test_table").await;
+            let Output::RecordBatches(record_batches) = output else {
+                unreachable!()
+            };
+
+            let record_batches = record_batches.iter().collect::<Vec<_>>();
+            let column = record_batches[0].column_by_name("Create Table").unwrap();
+            let actual = column.get(0);
+
+            let expect = if instance.is_distributed_mode() {
+                format!(
+                    r#"CREATE TABLE IF NOT EXISTS "test_table" (
+  "host" STRING NULL,
+  "ts" TIMESTAMP(3) NOT NULL,
+  TIME INDEX ("ts")
+)
+PARTITION BY RANGE COLUMNS ("ts") (
+  PARTITION r0 VALUES LESS THAN ('1970-01-01 09:00:00.001+0900'),
+  PARTITION r1 VALUES LESS THAN ('1970-01-01 09:00:00.010+0900'),
+  PARTITION r2 VALUES LESS THAN ('1970-01-01 09:00:00.100+0900'),
+  PARTITION r3 VALUES LESS THAN (MAXVALUE)
+)
+ENGINE=mito
+WITH(
+  regions = 4,
+  storage = '{storage_name}'
+)"#
+                )
+            } else {
+                format!(
+                    r#"CREATE TABLE IF NOT EXISTS "test_table" (
+  "host" STRING NULL,
+  "ts" TIMESTAMP(3) NOT NULL,
+  TIME INDEX ("ts")
+)
+
+ENGINE=mito
+WITH(
+  regions = 1,
+  storage = '{storage_name}'
+)"#
+                )
+            };
+            assert_eq!(actual.to_string(), expect,);
+            let output = execute_sql(&frontend, "truncate test_table").await;
+            assert!(matches!(output, Output::AffectedRows(0)));
+            let output = execute_sql(&frontend, "select * from test_table").await;
+            let expected = "\
+++
+++";
+
+            check_output_stream(output, expected).await;
+            let output = execute_sql(&frontend, "drop table test_table").await;
+            assert!(matches!(output, Output::AffectedRows(0)));
+        }
+    }
+}
"multi_thread")] +pub(crate) fn both_instances_cases_with_custom_storages( + #[future] + #[case] + instance: Arc, +) { +} + #[template] #[rstest] #[case::test_with_standalone(standalone())] diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index e9af72ef9e70..cf65c0865a82 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -700,6 +700,9 @@ read_batch_size = 128 sync_write = false [datanode.storage] +custom_stores = [] + +[datanode.storage.default_store] type = "{}" [datanode.storage.compaction] @@ -746,6 +749,7 @@ enable_jaeger_tracing = false"#, store_type ); let body_text = drop_lines_with_inconsistent_results(res_get.text().await); + assert_eq!(body_text, expected_toml_str); } diff --git a/tests/cases/distributed/show/show_create.result b/tests/cases/distributed/show/show_create.result index b871117b1ad9..0936ab2d7f2e 100644 --- a/tests/cases/distributed/show/show_create.result +++ b/tests/cases/distributed/show/show_create.result @@ -100,3 +100,24 @@ WITH( Error: 1004(InvalidArguments), Invalid table option key: foo +CREATE TABLE not_supported_table_storage_option ( + id INT UNSIGNED, + host STRING, + cpu DOUBLE, + disk FLOAT, + ts TIMESTAMP NOT NULL DEFAULT current_timestamp(), + TIME INDEX (ts), + PRIMARY KEY (id, host) +) +PARTITION BY RANGE COLUMNS (id) ( + PARTITION r0 VALUES LESS THAN (5), + PARTITION r1 VALUES LESS THAN (9), + PARTITION r2 VALUES LESS THAN (MAXVALUE), +) +ENGINE=mito +WITH( + storage = 'S3' +); + +Error: 1004(InvalidArguments), Object store not found: s3 + diff --git a/tests/cases/distributed/show/show_create.sql b/tests/cases/distributed/show/show_create.sql index 56b56baadcbc..b64a9cbbde56 100644 --- a/tests/cases/distributed/show/show_create.sql +++ b/tests/cases/distributed/show/show_create.sql @@ -50,3 +50,21 @@ WITH( ttl = '7d', write_buffer_size = 1024 ); +CREATE TABLE not_supported_table_storage_option ( + id INT UNSIGNED, + host STRING, + cpu DOUBLE, + disk FLOAT, + ts TIMESTAMP NOT NULL DEFAULT current_timestamp(), + TIME INDEX (ts), + PRIMARY KEY (id, host) +) +PARTITION BY RANGE COLUMNS (id) ( + PARTITION r0 VALUES LESS THAN (5), + PARTITION r1 VALUES LESS THAN (9), + PARTITION r2 VALUES LESS THAN (MAXVALUE), +) +ENGINE=mito +WITH( + storage = 'S3' +); \ No newline at end of file diff --git a/tests/cases/standalone/show/show_create.result b/tests/cases/standalone/show/show_create.result index e9e22f060cf3..23f0df6dc84f 100644 --- a/tests/cases/standalone/show/show_create.result +++ b/tests/cases/standalone/show/show_create.result @@ -61,3 +61,24 @@ WITH( Error: 1004(InvalidArguments), Invalid table option key: foo +CREATE TABLE not_supported_table_storage_option ( + id INT UNSIGNED, + host STRING, + cpu DOUBLE, + disk FLOAT, + ts TIMESTAMP NOT NULL DEFAULT current_timestamp(), + TIME INDEX (ts), + PRIMARY KEY (id, host) +) +PARTITION BY RANGE COLUMNS (id) ( + PARTITION r0 VALUES LESS THAN (5), + PARTITION r1 VALUES LESS THAN (9), + PARTITION r2 VALUES LESS THAN (MAXVALUE), +) +ENGINE=mito +WITH( + storage = 'S3' +); + +Error: 1004(InvalidArguments), Object store not found: s3 + diff --git a/tests/cases/standalone/show/show_create.sql b/tests/cases/standalone/show/show_create.sql index 86faf1a604aa..60928c8f69ae 100644 --- a/tests/cases/standalone/show/show_create.sql +++ b/tests/cases/standalone/show/show_create.sql @@ -33,3 +33,22 @@ WITH( ttl = '7d', write_buffer_size = 1024 ); + +CREATE TABLE not_supported_table_storage_option ( + id INT UNSIGNED, + host STRING, + cpu DOUBLE, + disk 
diff --git a/tests/cases/distributed/show/show_create.result b/tests/cases/distributed/show/show_create.result
index b871117b1ad9..0936ab2d7f2e 100644
--- a/tests/cases/distributed/show/show_create.result
+++ b/tests/cases/distributed/show/show_create.result
@@ -100,3 +100,24 @@ WITH(
 
 Error: 1004(InvalidArguments), Invalid table option key: foo
 
+CREATE TABLE not_supported_table_storage_option (
+    id INT UNSIGNED,
+    host STRING,
+    cpu DOUBLE,
+    disk FLOAT,
+    ts TIMESTAMP NOT NULL DEFAULT current_timestamp(),
+    TIME INDEX (ts),
+    PRIMARY KEY (id, host)
+)
+PARTITION BY RANGE COLUMNS (id) (
+    PARTITION r0 VALUES LESS THAN (5),
+    PARTITION r1 VALUES LESS THAN (9),
+    PARTITION r2 VALUES LESS THAN (MAXVALUE),
+)
+ENGINE=mito
+WITH(
+    storage = 'S3'
+);
+
+Error: 1004(InvalidArguments), Object store not found: s3
+
diff --git a/tests/cases/distributed/show/show_create.sql b/tests/cases/distributed/show/show_create.sql
index 56b56baadcbc..b64a9cbbde56 100644
--- a/tests/cases/distributed/show/show_create.sql
+++ b/tests/cases/distributed/show/show_create.sql
@@ -50,3 +50,21 @@ WITH(
   ttl = '7d',
   write_buffer_size = 1024
 );
+
+CREATE TABLE not_supported_table_storage_option (
+    id INT UNSIGNED,
+    host STRING,
+    cpu DOUBLE,
+    disk FLOAT,
+    ts TIMESTAMP NOT NULL DEFAULT current_timestamp(),
+    TIME INDEX (ts),
+    PRIMARY KEY (id, host)
+)
+PARTITION BY RANGE COLUMNS (id) (
+    PARTITION r0 VALUES LESS THAN (5),
+    PARTITION r1 VALUES LESS THAN (9),
+    PARTITION r2 VALUES LESS THAN (MAXVALUE),
+)
+ENGINE=mito
+WITH(
+    storage = 'S3'
+);
\ No newline at end of file
diff --git a/tests/cases/standalone/show/show_create.result b/tests/cases/standalone/show/show_create.result
index e9e22f060cf3..23f0df6dc84f 100644
--- a/tests/cases/standalone/show/show_create.result
+++ b/tests/cases/standalone/show/show_create.result
@@ -61,3 +61,24 @@ WITH(
 
 Error: 1004(InvalidArguments), Invalid table option key: foo
 
+CREATE TABLE not_supported_table_storage_option (
+    id INT UNSIGNED,
+    host STRING,
+    cpu DOUBLE,
+    disk FLOAT,
+    ts TIMESTAMP NOT NULL DEFAULT current_timestamp(),
+    TIME INDEX (ts),
+    PRIMARY KEY (id, host)
+)
+PARTITION BY RANGE COLUMNS (id) (
+    PARTITION r0 VALUES LESS THAN (5),
+    PARTITION r1 VALUES LESS THAN (9),
+    PARTITION r2 VALUES LESS THAN (MAXVALUE),
+)
+ENGINE=mito
+WITH(
+    storage = 'S3'
+);
+
+Error: 1004(InvalidArguments), Object store not found: s3
+
diff --git a/tests/cases/standalone/show/show_create.sql b/tests/cases/standalone/show/show_create.sql
index 86faf1a604aa..60928c8f69ae 100644
--- a/tests/cases/standalone/show/show_create.sql
+++ b/tests/cases/standalone/show/show_create.sql
@@ -33,3 +33,22 @@ WITH(
   ttl = '7d',
   write_buffer_size = 1024
 );
+
+CREATE TABLE not_supported_table_storage_option (
+    id INT UNSIGNED,
+    host STRING,
+    cpu DOUBLE,
+    disk FLOAT,
+    ts TIMESTAMP NOT NULL DEFAULT current_timestamp(),
+    TIME INDEX (ts),
+    PRIMARY KEY (id, host)
+)
+PARTITION BY RANGE COLUMNS (id) (
+    PARTITION r0 VALUES LESS THAN (5),
+    PARTITION r1 VALUES LESS THAN (9),
+    PARTITION r2 VALUES LESS THAN (MAXVALUE),
+)
+ENGINE=mito
+WITH(
+    storage = 'S3'
+);
\ No newline at end of file