From 7b239873ae24c0c17caa3ac91fd812cf7268c2aa Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Tue, 7 Nov 2023 20:00:22 +0900 Subject: [PATCH 01/13] feat: support table ddl for custom_storage --- config/datanode.example.toml | 4 +- config/standalone.example.toml | 4 +- src/client/src/region.rs | 9 +- src/cmd/src/datanode.rs | 27 +++- src/cmd/src/options.rs | 4 +- src/cmd/src/standalone.rs | 24 +++- src/datanode/src/config.rs | 23 +++- src/datanode/src/datanode.rs | 38 +++--- src/datanode/src/store.rs | 16 ++- src/sql/src/parsers/create_parser.rs | 2 + src/sql/src/statements/create.rs | 3 +- src/table/src/requests.rs | 3 + tests-integration/src/cluster.rs | 18 ++- tests-integration/src/standalone.rs | 24 +++- tests-integration/src/test_util.rs | 68 ++++++++-- tests-integration/src/tests/instance_test.rs | 120 +++++++++++++++++- tests-integration/src/tests/test_util.rs | 37 ++++++ tests-integration/tests/http.rs | 4 + .../standalone/common/show/show_create.result | 21 +++ .../standalone/common/show/show_create.sql | 18 +++ 20 files changed, 402 insertions(+), 65 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 256970863184..2a8d4e8ebd2c 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -43,7 +43,6 @@ sync_write = false [storage] # The working home directory. data_home = "/tmp/greptimedb/" -type = "File" # TTL for all tables. Disabled by default. # global_ttl = "7d" @@ -53,6 +52,9 @@ type = "File" # The local file cache capacity in bytes. # cache_capacity = "256MB" +[storage.default_store] +type = "File" + # Mito engine options [[region_engine]] [region_engine.mito] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 629c9bdb9a73..248014ff7f8d 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -113,14 +113,14 @@ retry_delay = "500ms" [storage] # The working home directory. data_home = "/tmp/greptimedb/" -# Storage type. -type = "File" # TTL for all tables. Disabled by default. # global_ttl = "7d" # Cache configuration for object storage such as 'S3' etc. # cache_path = "/path/local_cache" # The local file cache capacity in bytes. # cache_capacity = "256MB" +[storage.default_store] +type = "File" # Mito engine options [[region_engine]] diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 8a5895d35fc7..3c1d7365ebce 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -28,6 +28,7 @@ use common_telemetry::error; use prost::Message; use snafu::{location, Location, OptionExt, ResultExt}; use tokio_stream::StreamExt; +use tonic::Code; use crate::error::Error::RegionServer; use crate::error::{ @@ -45,7 +46,13 @@ pub struct RegionRequester { impl Datanode for RegionRequester { async fn handle(&self, request: RegionRequest) -> MetaResult { self.handle_inner(request).await.map_err(|err| { - if matches!(err, RegionServer { .. }) { + if !matches!( + err, + RegionServer { + code: Code::InvalidArgument, + .. + } + ) { meta_error::Error::RetryLater { source: BoxedError::new(err), } diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index c007ef8a70e9..f882798bc3dd 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -214,7 +214,9 @@ mod tests { use std::time::Duration; use common_test_util::temp_dir::create_named_temp_file; - use datanode::config::{FileConfig, ObjectStoreConfig}; + use datanode::config::{ + FileConfig, GcsConfig, ObjectStoreConfig, S3Config, + }; use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; @@ -251,8 +253,18 @@ mod tests { sync_write = false [storage] - type = "File" data_home = "/tmp/greptimedb/" + [storage.default_store] + type = "File" + + [[storage.custom_stores]] + type = "Gcs" + bucket = "foo" + endpoint = "bar" + + [[storage.custom_stores]] + type = "S3" + bucket = "foo" [logging] level = "debug" @@ -302,9 +314,18 @@ mod tests { assert!(tcp_nodelay); assert_eq!("/tmp/greptimedb/", options.storage.data_home); assert!(matches!( - &options.storage.store, + &options.storage.default_store, ObjectStoreConfig::File(FileConfig { .. }) )); + assert_eq!(options.storage.custom_stores.len(), 2); + assert!(matches!( + options.storage.custom_stores[0], + ObjectStoreConfig::Gcs(GcsConfig { .. }) + )); + assert!(matches!( + options.storage.custom_stores[1], + ObjectStoreConfig::S3(S3Config { .. }) + )); assert_eq!("debug", options.logging.level.unwrap()); assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir); diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index bc340d588a3e..477d23cd0a3c 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -193,6 +193,7 @@ mod tests { [ env_prefix.to_string(), "storage".to_uppercase(), + "default_store".to_uppercase(), "type".to_uppercase(), ] .join(ENV_VAR_SEP), @@ -203,6 +204,7 @@ mod tests { [ env_prefix.to_string(), "storage".to_uppercase(), + "default_store".to_uppercase(), "bucket".to_uppercase(), ] .join(ENV_VAR_SEP), @@ -238,7 +240,7 @@ mod tests { .unwrap(); // Check the configs from environment variables. - match opts.storage.store { + match &opts.storage.default_store { ObjectStoreConfig::S3(s3_config) => { assert_eq!(s3_config.bucket, "mybucket".to_string()); } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 5483c6c74724..a6cba4c2a175 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -412,6 +412,7 @@ mod tests { use auth::{Identity, Password, UserProviderRef}; use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_named_temp_file; + use datanode::config::{FileConfig, GcsConfig}; use servers::Mode; use super::*; @@ -459,8 +460,18 @@ mod tests { purge_interval = "10m" read_batch_size = 128 sync_write = false - + [storage] + data_home = "/tmp/greptimedb/" + [storage.default_store] + type = "File" + + [[storage.custom_stores]] + type = "Gcs" + bucket = "foo" + endpoint = "bar" + + [[storage.custom_stores]] type = "S3" access_key_id = "access_key_id" secret_access_key = "secret_access_key" @@ -510,7 +521,16 @@ mod tests { assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap()); - match &dn_opts.storage.store { + assert!(matches!( + &dn_opts.storage.default_store, + datanode::config::ObjectStoreConfig::File(FileConfig { .. }) + )); + assert_eq!(dn_opts.storage.custom_stores.len(), 2); + assert!(matches!( + dn_opts.storage.custom_stores[0], + datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. }) + )); + match &dn_opts.storage.custom_stores[1] { datanode::config::ObjectStoreConfig::S3(s3_config) => { assert_eq!( "Secret([REDACTED alloc::string::String])".to_string(), diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 3c2e9ff88774..9b96d02c9431 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -48,6 +48,18 @@ pub enum ObjectStoreConfig { Gcs(GcsConfig), } +impl ObjectStoreConfig { + pub fn extract_variant_name(&self) -> &'static str { + match self { + Self::File(_) => "File", + Self::S3(_) => "S3", + Self::Oss(_) => "Oss", + Self::Azblob(_) => "Azblob", + Self::Gcs(_) => "Gcs", + } + } +} + /// Storage engine config #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] @@ -61,8 +73,8 @@ pub struct StorageConfig { pub global_ttl: Option, /// The working directory of database pub data_home: String, - #[serde(flatten)] - pub store: ObjectStoreConfig, + pub default_store: ObjectStoreConfig, + pub custom_stores: Vec, } impl Default for StorageConfig { @@ -70,7 +82,8 @@ impl Default for StorageConfig { Self { global_ttl: None, data_home: DEFAULT_DATA_HOME.to_string(), - store: ObjectStoreConfig::default(), + custom_stores: vec![], + default_store: ObjectStoreConfig::default(), } } } @@ -289,13 +302,13 @@ mod tests { #[test] fn test_secstr() { let toml_str = r#" - [storage] + [storage.default_store] type = "S3" access_key_id = "access_key_id" secret_access_key = "secret_access_key" "#; let opts: DatanodeOptions = toml::from_str(toml_str).unwrap(); - match opts.storage.store { + match &opts.storage.default_store { ObjectStoreConfig::S3(cfg) => { assert_eq!( "Secret([REDACTED alloc::string::String])".to_string(), diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index f8a658f55aed..6436f6cee32e 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -354,7 +354,6 @@ impl DatanodeBuilder { )); } } - info!("going to open {} regions", regions.len()); let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); let mut tasks = vec![]; @@ -416,22 +415,10 @@ impl DatanodeBuilder { .context(RuntimeResourceSnafu)?, ); - let table_provider_factory = Arc::new(DummyTableProviderFactory); - - let mut region_server = RegionServer::with_table_provider( - query_engine, - runtime, - event_listener, - table_provider_factory, - ); - - let object_store = store::new_object_store(opts).await?; - let object_store_manager = ObjectStoreManager::new( - "default", // TODO: use a name which is set in the configuration when #919 is done. - object_store, - ); - let engines = - Self::build_store_engines(opts, log_store, Arc::new(object_store_manager)).await?; + let mut region_server = + RegionServer::new(query_engine.clone(), runtime.clone(), event_listener); + let object_store_manager = Self::build_object_store_manager(opts).await?; + let engines = Self::build_store_engines(opts, log_store, object_store_manager).await?; for engine in engines { region_server.register_engine(engine); } @@ -496,6 +483,23 @@ impl DatanodeBuilder { } Ok(engines) } + + /// Builds [ObjectStoreManager] + async fn build_object_store_manager(opts: &DatanodeOptions) -> Result { + let object_store = + store::new_object_store(opts.storage.default_store.clone(), &opts.storage.data_home) + .await?; + let default_name = opts.storage.default_store.extract_variant_name(); + let mut object_store_manager = ObjectStoreManager::new(default_name, object_store); + for store in &opts.storage.custom_stores { + let name = store.extract_variant_name(); + object_store_manager.add( + name, + store::new_object_store(store.clone(), &opts.storage.data_home).await?, + ); + } + Ok(Arc::new(object_store_manager)) + } } #[cfg(test)] diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 78528cf6ce6e..571b17f452b5 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -32,12 +32,15 @@ use object_store::util::normalize_dir; use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; -use crate::config::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; +use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; use crate::error::{self, Result}; -pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result { - let data_home = normalize_dir(&opts.storage.data_home); - let object_store = match &opts.storage.store { +pub(crate) async fn new_object_store( + store: ObjectStoreConfig, + data_home: &str, +) -> Result { + let data_home = normalize_dir(data_home); + let object_store = match &store { ObjectStoreConfig::File(file_config) => { fs::new_fs_object_store(&data_home, file_config).await } @@ -50,9 +53,8 @@ pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result ParserContext<'a> { } ); } + // Sorts options so that `test_display_create_table` can always pass. + let options = options.into_iter().sorted().collect(); let create_table = CreateTable { if_not_exists, name: table_name, diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 58957879016d..e7fabd5a2d92 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -241,7 +241,7 @@ mod tests { PARTITION r2 VALUES LESS THAN (MAXVALUE), ) engine=mito - with(regions=1, ttl='7d'); + with(regions=1, ttl='7d', storage='File'); "; let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap(); assert_eq!(1, result.len()); @@ -267,6 +267,7 @@ PARTITION BY RANGE COLUMNS (ts) ( ENGINE=mito WITH( regions = 1, + storage = 'File', ttl = '7d' )"#, &new_sql diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 9d78f21534fb..c3ffceec63e8 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -84,6 +84,7 @@ pub struct TableOptions { pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size"; pub const TTL_KEY: &str = "ttl"; pub const REGIONS_KEY: &str = "regions"; +pub const STORAGE_KEY: &str = "storage"; impl TryFrom<&HashMap> for TableOptions { type Error = error::Error; @@ -340,6 +341,7 @@ pub fn valid_table_option(key: &str) -> bool { | WRITE_BUFFER_SIZE_KEY | TTL_KEY | REGIONS_KEY + | STORAGE_KEY ) | is_supported_in_s3(key) } @@ -365,6 +367,7 @@ mod tests { assert!(valid_table_option(TTL_KEY)); assert!(valid_table_option(REGIONS_KEY)); assert!(valid_table_option(WRITE_BUFFER_SIZE_KEY)); + assert!(valid_table_option(STORAGE_KEY)); assert!(!valid_table_option("foo")); } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index c1862bcba6ed..23a7ccf7f607 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -63,6 +63,7 @@ pub struct GreptimeDbClusterBuilder { cluster_name: String, kv_backend: KvBackendRef, store_config: Option, + custom_store_types: Option>, datanodes: Option, } @@ -87,6 +88,7 @@ impl GreptimeDbClusterBuilder { cluster_name: cluster_name.to_string(), kv_backend, store_config: None, + custom_store_types: None, datanodes: None, } } @@ -96,6 +98,11 @@ impl GreptimeDbClusterBuilder { self } + pub fn with_custom_store_types(mut self, store_types: Vec) -> Self { + self.custom_store_types = Some(store_types); + self + } + pub fn with_datanodes(mut self, datanodes: u32) -> Self { self.datanodes = Some(datanodes); self @@ -171,14 +178,15 @@ impl GreptimeDbClusterBuilder { dir_guards.push(FileDirGuard::new(home_tmp_dir)); - create_datanode_opts(store_config.clone(), home_dir) + create_datanode_opts(store_config.clone(), vec![], home_dir) } else { let (opts, guard) = create_tmp_dir_and_datanode_opts( StorageType::File, + self.custom_store_types.clone().unwrap_or_default(), &format!("{}-dn-{}", self.cluster_name, datanode_id), ); - storage_guards.push(guard.storage_guard); + storage_guards.push(guard.storage_guards); dir_guards.push(guard.home_guard); opts @@ -190,7 +198,11 @@ impl GreptimeDbClusterBuilder { instances.insert(datanode_id, datanode); } - (instances, storage_guards, dir_guards) + ( + instances, + storage_guards.into_iter().flatten().collect(), + dir_guards, + ) } async fn wait_datanodes_alive( diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 33afcb02d64a..b11d42352871 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -37,7 +37,8 @@ pub struct GreptimeDbStandalone { pub struct GreptimeDbStandaloneBuilder { instance_name: String, - store_type: Option, + custom_store_types: Option>, + default_store: Option, plugin: Option, } @@ -45,14 +46,23 @@ impl GreptimeDbStandaloneBuilder { pub fn new(instance_name: &str) -> Self { Self { instance_name: instance_name.to_string(), - store_type: None, + custom_store_types: None, plugin: None, + default_store: None, } } - pub fn with_store_type(self, store_type: StorageType) -> Self { + pub fn with_default_store_type(self, store_type: StorageType) -> Self { Self { - store_type: Some(store_type), + default_store: Some(store_type), + ..self + } + } + + #[cfg(test)] + pub fn with_custom_store_types(self, store_types: Vec) -> Self { + Self { + custom_store_types: Some(store_types), ..self } } @@ -66,9 +76,11 @@ impl GreptimeDbStandaloneBuilder { } pub async fn build(self) -> GreptimeDbStandalone { - let store_type = self.store_type.unwrap_or(StorageType::File); + let default_store_type = self.default_store.unwrap_or(StorageType::File); + let store_types = self.custom_store_types.unwrap_or_default(); - let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, &self.instance_name); + let (opts, guard) = + create_tmp_dir_and_datanode_opts(default_store_type, store_types, &self.instance_name); let procedure_config = ProcedureConfig::default(); let kv_backend_config = KvBackendConfig::default(); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index ef47b3c2b974..284e557e61a1 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -77,6 +77,26 @@ impl Display for StorageType { } impl StorageType { + pub fn build_storage_types_based_on_env() -> Vec { + let mut storage_types = Vec::with_capacity(4); + storage_types.push(StorageType::File); + if let Ok(bucket) = env::var("GT_S3_BUCKET") { + if !bucket.is_empty() { + storage_types.push(StorageType::S3); + } + } + if env::var("GT_OSS_BUCKET").is_ok() { + storage_types.push(StorageType::Oss); + } + if env::var("GT_AZBLOB_CONTAINER").is_ok() { + storage_types.push(StorageType::Azblob); + } + if env::var("GT_GCS_BUCKET").is_ok() { + storage_types.push(StorageType::Gcs); + } + storage_types + } + pub fn test_on(&self) -> bool { let _ = dotenv::dotenv(); @@ -244,7 +264,7 @@ pub enum TempDirGuard { pub struct TestGuard { pub home_guard: FileDirGuard, - pub storage_guard: StorageGuard, + pub storage_guards: Vec, } pub struct FileDirGuard { @@ -261,42 +281,62 @@ pub struct StorageGuard(pub TempDirGuard); impl TestGuard { pub async fn remove_all(&mut self) { - if let TempDirGuard::S3(guard) - | TempDirGuard::Oss(guard) - | TempDirGuard::Azblob(guard) - | TempDirGuard::Gcs(guard) = &mut self.storage_guard.0 - { - guard.remove_all().await.unwrap() + for storage_guard in self.storage_guards.iter_mut() { + if let TempDirGuard::S3(guard) + | TempDirGuard::Oss(guard) + | TempDirGuard::Azblob(guard) + | TempDirGuard::Gcs(guard) = &mut storage_guard.0 + { + guard.remove_all().await.unwrap() + } } } } pub fn create_tmp_dir_and_datanode_opts( - store_type: StorageType, + default_store_type: StorageType, + custom_store_types: Vec, name: &str, ) -> (DatanodeOptions, TestGuard) { let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); - let (store, data_tmp_dir) = get_test_store_config(&store_type); - let opts = create_datanode_opts(store, home_dir); + // Excludes the default object store. + let mut custom_stores = Vec::with_capacity(custom_store_types.len()); + // Includes the default object store. + let mut storage_guards = Vec::with_capacity(custom_store_types.len() + 1); + + let (default_store, data_tmp_dir) = get_test_store_config(&default_store_type); + storage_guards.push(StorageGuard(data_tmp_dir)); + + for store_type in custom_store_types { + let (store, data_tmp_dir) = get_test_store_config(&store_type); + custom_stores.push(store); + storage_guards.push(StorageGuard(data_tmp_dir)) + } + let opts = create_datanode_opts(default_store, custom_stores, home_dir); ( opts, TestGuard { home_guard: FileDirGuard::new(home_tmp_dir), - storage_guard: StorageGuard(data_tmp_dir), + storage_guards, }, ) } -pub(crate) fn create_datanode_opts(store: ObjectStoreConfig, home_dir: String) -> DatanodeOptions { +pub(crate) fn create_datanode_opts( + default_store: ObjectStoreConfig, + custom_stores: Vec, + home_dir: String, +) -> DatanodeOptions { DatanodeOptions { node_id: Some(0), require_lease_before_startup: true, storage: StorageConfig { data_home: home_dir, - store, + custom_stores, + default_store, ..Default::default() }, mode: Mode::Standalone, @@ -325,7 +365,7 @@ async fn setup_standalone_instance( store_type: StorageType, ) -> GreptimeDbStandalone { GreptimeDbStandaloneBuilder::new(test_name) - .with_store_type(store_type) + .with_default_store_type(store_type) .build() .await } diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 5e0ecf62af44..8682c3942a76 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -31,8 +31,9 @@ use session::context::{QueryContext, QueryContextRef}; use crate::test_util::check_output_stream; use crate::tests::test_util::{ - both_instances_cases, check_unordered_output_stream, distributed, find_testing_resource, - prepare_path, standalone, standalone_instance_case, MockInstance, + both_instances_cases, both_instances_cases_with_custom_storages, check_unordered_output_stream, + distributed, distributed_with_multiple_object_stores, find_testing_resource, prepare_path, + standalone, standalone_instance_case, standalone_with_multiple_object_stores, MockInstance, }; #[apply(both_instances_cases)] @@ -1840,3 +1841,118 @@ async fn execute_sql_with( .await .unwrap() } + +#[apply(both_instances_cases_with_custom_storages)] +async fn test_custom_storage(instance: Arc) { + let frontend = instance.frontend(); + let custom_storages = [ + ("S3", "GT_S3_BUCKET"), + ("Oss", "GT_OSS_BUCKET"), + ("Azblob", "GT_AZBLOB_CONTAINER"), + ("Gcs", "GT_GCS_BUCKET"), + ]; + for (storage_name, custom_storage_env) in custom_storages { + if let Ok(env_value) = env::var(custom_storage_env) { + if env_value.is_empty() { + continue; + } + let sql = if instance.is_distributed_mode() { + format!( + r#"create table test_table( + host string, + ts timestamp time index, + ) + PARTITION BY RANGE COLUMNS (ts) ( + PARTITION r0 VALUES LESS THAN (1), + PARTITION r1 VALUES LESS THAN (10), + PARTITION r2 VALUES LESS THAN (100), + PARTITION r3 VALUES LESS THAN (MAXVALUE), + ) + with(storage='{storage_name}') + "# + ) + } else { + format!( + r#"create table test_table(host string, ts timestamp time index)with(storage='{storage_name}');"# + ) + }; + + let output = execute_sql(&instance.frontend(), &sql).await; + assert!(matches!(output, Output::AffectedRows(0))); + let output = execute_sql( + &frontend, + r#"insert into test_table(host, ts) values + ('host1', 1655276557000), + ('host2', 1655276558000) + "#, + ) + .await; + assert!(matches!(output, Output::AffectedRows(2))); + + let output = execute_sql(&frontend, "select * from test_table").await; + let expected = "\ ++-------+---------------------+ +| host | ts | ++-------+---------------------+ +| host1 | 2022-06-15T07:02:37 | +| host2 | 2022-06-15T07:02:38 | ++-------+---------------------+"; + + check_output_stream(output, expected).await; + let output = execute_sql(&frontend, "show create table test_table").await; + let Output::RecordBatches(record_batches) = output else { + unreachable!() + }; + + let record_batches = record_batches.iter().collect::>(); + let column = record_batches[0].column_by_name("Create Table").unwrap(); + let actual = column.get(0); + + let expect = if instance.is_distributed_mode() { + format!( + r#"CREATE TABLE IF NOT EXISTS "test_table" ( + "host" STRING NULL, + "ts" TIMESTAMP(3) NOT NULL, + TIME INDEX ("ts") +) +PARTITION BY RANGE COLUMNS ("ts") ( + PARTITION r0 VALUES LESS THAN ('1970-01-01 09:00:00.001+0900'), + PARTITION r1 VALUES LESS THAN ('1970-01-01 09:00:00.010+0900'), + PARTITION r2 VALUES LESS THAN ('1970-01-01 09:00:00.100+0900'), + PARTITION r3 VALUES LESS THAN (MAXVALUE) +) +ENGINE=mito +WITH( + regions = 4, + storage = '{storage_name}' +)"# + ) + } else { + format!( + r#"CREATE TABLE IF NOT EXISTS "test_table" ( + "host" STRING NULL, + "ts" TIMESTAMP(3) NOT NULL, + TIME INDEX ("ts") +) + +ENGINE=mito +WITH( + regions = 1, + storage = '{storage_name}' +)"# + ) + }; + assert_eq!(actual.to_string(), expect,); + let output = execute_sql(&frontend, "truncate test_table").await; + assert!(matches!(output, Output::AffectedRows(0))); + let output = execute_sql(&frontend, "select * from test_table").await; + let expected = "\ +++ +++"; + + check_output_stream(output, expected).await; + let output = execute_sql(&frontend, "drop table test_table").await; + assert!(matches!(output, Output::AffectedRows(0))); + } + } +} diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 422eba4eaa19..4265e1fb1cf3 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -20,7 +20,9 @@ use common_test_util::find_workspace_path; use frontend::instance::Instance; use rstest_reuse::{self, template}; +use crate::cluster::GreptimeDbClusterBuilder; use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder}; +use crate::test_util::StorageType; use crate::tests::{create_distributed_instance, MockDistributedInstance}; pub(crate) trait MockInstance { @@ -61,6 +63,41 @@ pub(crate) async fn distributed() -> Arc { Arc::new(instance) } +pub(crate) async fn standalone_with_multiple_object_stores() -> Arc { + let _ = dotenv::dotenv(); + let test_name = uuid::Uuid::new_v4().to_string(); + let storage_types = StorageType::build_storage_types_based_on_env(); + let instance = GreptimeDbStandaloneBuilder::new(&test_name) + .with_custom_store_types(storage_types) + .build() + .await; + Arc::new(instance) +} + +pub(crate) async fn distributed_with_multiple_object_stores() -> Arc { + let _ = dotenv::dotenv(); + let test_name = uuid::Uuid::new_v4().to_string(); + let custom_storage_types = StorageType::build_storage_types_based_on_env(); + let cluster = GreptimeDbClusterBuilder::new(&test_name) + .with_custom_store_types(custom_storage_types) + .build() + .await; + Arc::new(MockDistributedInstance(cluster)) +} + +#[template] +#[rstest] +#[case::test_with_standalone(standalone_with_multiple_object_stores())] +#[case::test_with_distributed(distributed_with_multiple_object_stores())] +#[awt] +#[tokio::test(flavor = "multi_thread")] +pub(crate) fn both_instances_cases_with_custom_storages( + #[future] + #[case] + instance: Arc, +) { +} + #[template] #[rstest] #[case::test_with_standalone(standalone())] diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index b6caafdad641..67051b3cb616 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -711,6 +711,9 @@ read_batch_size = 128 sync_write = false [datanode.storage] +custom_stores = [] + +[datanode.storage.default_store] type = "{}" [[datanode.region_engine]] @@ -743,6 +746,7 @@ enable_otlp_tracing = false"#, num_cpus::get() / 2 ); let body_text = drop_lines_with_inconsistent_results(res_get.text().await); + assert_eq!(body_text, expected_toml_str); } diff --git a/tests/cases/standalone/common/show/show_create.result b/tests/cases/standalone/common/show/show_create.result index b871117b1ad9..0936ab2d7f2e 100644 --- a/tests/cases/standalone/common/show/show_create.result +++ b/tests/cases/standalone/common/show/show_create.result @@ -100,3 +100,24 @@ WITH( Error: 1004(InvalidArguments), Invalid table option key: foo +CREATE TABLE not_supported_table_storage_option ( + id INT UNSIGNED, + host STRING, + cpu DOUBLE, + disk FLOAT, + ts TIMESTAMP NOT NULL DEFAULT current_timestamp(), + TIME INDEX (ts), + PRIMARY KEY (id, host) +) +PARTITION BY RANGE COLUMNS (id) ( + PARTITION r0 VALUES LESS THAN (5), + PARTITION r1 VALUES LESS THAN (9), + PARTITION r2 VALUES LESS THAN (MAXVALUE), +) +ENGINE=mito +WITH( + storage = 'S3' +); + +Error: 1004(InvalidArguments), Object store not found: s3 + diff --git a/tests/cases/standalone/common/show/show_create.sql b/tests/cases/standalone/common/show/show_create.sql index 56b56baadcbc..b64a9cbbde56 100644 --- a/tests/cases/standalone/common/show/show_create.sql +++ b/tests/cases/standalone/common/show/show_create.sql @@ -50,3 +50,21 @@ WITH( ttl = '7d', write_buffer_size = 1024 ); +CREATE TABLE not_supported_table_storage_option ( + id INT UNSIGNED, + host STRING, + cpu DOUBLE, + disk FLOAT, + ts TIMESTAMP NOT NULL DEFAULT current_timestamp(), + TIME INDEX (ts), + PRIMARY KEY (id, host) +) +PARTITION BY RANGE COLUMNS (id) ( + PARTITION r0 VALUES LESS THAN (5), + PARTITION r1 VALUES LESS THAN (9), + PARTITION r2 VALUES LESS THAN (MAXVALUE), +) +ENGINE=mito +WITH( + storage = 'S3' +); \ No newline at end of file From 5116fcf7bf1428581079ae3de36e102021a4c9cc Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Tue, 14 Nov 2023 22:15:09 +0900 Subject: [PATCH 02/13] refactor: rename extract_variant_name to name --- src/datanode/src/config.rs | 2 +- src/datanode/src/datanode.rs | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 9b96d02c9431..769a661051f5 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -49,7 +49,7 @@ pub enum ObjectStoreConfig { } impl ObjectStoreConfig { - pub fn extract_variant_name(&self) -> &'static str { + pub fn name(&self) -> &'static str { match self { Self::File(_) => "File", Self::S3(_) => "S3", diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 6436f6cee32e..adf31c9f64f2 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -489,12 +489,11 @@ impl DatanodeBuilder { let object_store = store::new_object_store(opts.storage.default_store.clone(), &opts.storage.data_home) .await?; - let default_name = opts.storage.default_store.extract_variant_name(); + let default_name = opts.storage.default_store.name(); let mut object_store_manager = ObjectStoreManager::new(default_name, object_store); for store in &opts.storage.custom_stores { - let name = store.extract_variant_name(); object_store_manager.add( - name, + store.name(), store::new_object_store(store.clone(), &opts.storage.data_home).await?, ); } From c56f02d519c884c3c873a9dc02eaac254920590a Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Wed, 15 Nov 2023 08:01:51 +0900 Subject: [PATCH 03/13] chore: add blank --- tests/cases/standalone/common/show/show_create.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cases/standalone/common/show/show_create.sql b/tests/cases/standalone/common/show/show_create.sql index b64a9cbbde56..50fb9b5a85da 100644 --- a/tests/cases/standalone/common/show/show_create.sql +++ b/tests/cases/standalone/common/show/show_create.sql @@ -67,4 +67,4 @@ PARTITION BY RANGE COLUMNS (id) ( ENGINE=mito WITH( storage = 'S3' -); \ No newline at end of file +); From 16fa098431b4f0f73b4cfc03a32e42d0e576f90f Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Wed, 15 Nov 2023 08:16:04 +0900 Subject: [PATCH 04/13] chore: keep compatible --- src/cmd/src/datanode.rs | 2 +- src/cmd/src/options.rs | 4 +--- src/cmd/src/standalone.rs | 2 +- src/datanode/src/config.rs | 6 ++++-- src/datanode/src/datanode.rs | 5 ++--- tests-integration/src/test_util.rs | 2 +- 6 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index f882798bc3dd..6ea152a4471a 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -314,7 +314,7 @@ mod tests { assert!(tcp_nodelay); assert_eq!("/tmp/greptimedb/", options.storage.data_home); assert!(matches!( - &options.storage.default_store, + &options.storage.store, ObjectStoreConfig::File(FileConfig { .. }) )); assert_eq!(options.storage.custom_stores.len(), 2); diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 477d23cd0a3c..5e730de7d2c5 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -193,7 +193,6 @@ mod tests { [ env_prefix.to_string(), "storage".to_uppercase(), - "default_store".to_uppercase(), "type".to_uppercase(), ] .join(ENV_VAR_SEP), @@ -204,7 +203,6 @@ mod tests { [ env_prefix.to_string(), "storage".to_uppercase(), - "default_store".to_uppercase(), "bucket".to_uppercase(), ] .join(ENV_VAR_SEP), @@ -240,7 +238,7 @@ mod tests { .unwrap(); // Check the configs from environment variables. - match &opts.storage.default_store { + match &opts.storage.store { ObjectStoreConfig::S3(s3_config) => { assert_eq!(s3_config.bucket, "mybucket".to_string()); } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index a6cba4c2a175..cfc5f900a4e7 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -522,7 +522,7 @@ mod tests { assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap()); assert!(matches!( - &dn_opts.storage.default_store, + &dn_opts.storage.store, datanode::config::ObjectStoreConfig::File(FileConfig { .. }) )); assert_eq!(dn_opts.storage.custom_stores.len(), 2); diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 769a661051f5..ba1387cd7a7f 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -73,7 +73,8 @@ pub struct StorageConfig { pub global_ttl: Option, /// The working directory of database pub data_home: String, - pub default_store: ObjectStoreConfig, + #[serde(flatten)] + pub store: ObjectStoreConfig, pub custom_stores: Vec, } @@ -82,6 +83,7 @@ impl Default for StorageConfig { Self { global_ttl: None, data_home: DEFAULT_DATA_HOME.to_string(), + store: ObjectStoreConfig::default(), custom_stores: vec![], default_store: ObjectStoreConfig::default(), } @@ -308,7 +310,7 @@ mod tests { secret_access_key = "secret_access_key" "#; let opts: DatanodeOptions = toml::from_str(toml_str).unwrap(); - match &opts.storage.default_store { + match &opts.storage.store { ObjectStoreConfig::S3(cfg) => { assert_eq!( "Secret([REDACTED alloc::string::String])".to_string(), diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index adf31c9f64f2..f505873c9f78 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -487,9 +487,8 @@ impl DatanodeBuilder { /// Builds [ObjectStoreManager] async fn build_object_store_manager(opts: &DatanodeOptions) -> Result { let object_store = - store::new_object_store(opts.storage.default_store.clone(), &opts.storage.data_home) - .await?; - let default_name = opts.storage.default_store.name(); + store::new_object_store(opts.storage.store.clone(), &opts.storage.data_home).await?; + let default_name = opts.storage.store.name(); let mut object_store_manager = ObjectStoreManager::new(default_name, object_store); for store in &opts.storage.custom_stores { object_store_manager.add( diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 284e557e61a1..851ee07ba3ef 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -336,7 +336,7 @@ pub(crate) fn create_datanode_opts( storage: StorageConfig { data_home: home_dir, custom_stores, - default_store, + store: default_store, ..Default::default() }, mode: Mode::Standalone, From 6e73df6ec382e40dba593af0f26075510c979160 Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Wed, 15 Nov 2023 08:47:32 +0900 Subject: [PATCH 05/13] feat: rename custom_stores to providers --- src/cmd/src/datanode.rs | 10 +++++----- src/cmd/src/standalone.rs | 10 +++++----- src/datanode/src/config.rs | 5 ++--- src/datanode/src/datanode.rs | 2 +- tests-integration/src/test_util.rs | 10 +++++----- tests-integration/tests/http.rs | 5 +---- 6 files changed, 19 insertions(+), 23 deletions(-) diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 6ea152a4471a..f54b97cb220b 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -257,12 +257,12 @@ mod tests { [storage.default_store] type = "File" - [[storage.custom_stores]] + [[storage.providers]] type = "Gcs" bucket = "foo" endpoint = "bar" - [[storage.custom_stores]] + [[storage.providers]] type = "S3" bucket = "foo" @@ -317,13 +317,13 @@ mod tests { &options.storage.store, ObjectStoreConfig::File(FileConfig { .. }) )); - assert_eq!(options.storage.custom_stores.len(), 2); + assert_eq!(options.storage.providers.len(), 2); assert!(matches!( - options.storage.custom_stores[0], + options.storage.providers[0], ObjectStoreConfig::Gcs(GcsConfig { .. }) )); assert!(matches!( - options.storage.custom_stores[1], + options.storage.providers[1], ObjectStoreConfig::S3(S3Config { .. }) )); diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index cfc5f900a4e7..1a78fd78a8ce 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -466,12 +466,12 @@ mod tests { [storage.default_store] type = "File" - [[storage.custom_stores]] + [[storage.providers]] type = "Gcs" bucket = "foo" endpoint = "bar" - [[storage.custom_stores]] + [[storage.providers]] type = "S3" access_key_id = "access_key_id" secret_access_key = "secret_access_key" @@ -525,12 +525,12 @@ mod tests { &dn_opts.storage.store, datanode::config::ObjectStoreConfig::File(FileConfig { .. }) )); - assert_eq!(dn_opts.storage.custom_stores.len(), 2); + assert_eq!(dn_opts.storage.providers.len(), 2); assert!(matches!( - dn_opts.storage.custom_stores[0], + dn_opts.storage.providers[0], datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. }) )); - match &dn_opts.storage.custom_stores[1] { + match &dn_opts.storage.providers[1] { datanode::config::ObjectStoreConfig::S3(s3_config) => { assert_eq!( "Secret([REDACTED alloc::string::String])".to_string(), diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index ba1387cd7a7f..c92366b61828 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -75,7 +75,7 @@ pub struct StorageConfig { pub data_home: String, #[serde(flatten)] pub store: ObjectStoreConfig, - pub custom_stores: Vec, + pub providers: Vec, } impl Default for StorageConfig { @@ -84,8 +84,7 @@ impl Default for StorageConfig { global_ttl: None, data_home: DEFAULT_DATA_HOME.to_string(), store: ObjectStoreConfig::default(), - custom_stores: vec![], - default_store: ObjectStoreConfig::default(), + providers: vec![], } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index f505873c9f78..150b6caee599 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -490,7 +490,7 @@ impl DatanodeBuilder { store::new_object_store(opts.storage.store.clone(), &opts.storage.data_home).await?; let default_name = opts.storage.store.name(); let mut object_store_manager = ObjectStoreManager::new(default_name, object_store); - for store in &opts.storage.custom_stores { + for store in &opts.storage.providers { object_store_manager.add( store.name(), store::new_object_store(store.clone(), &opts.storage.data_home).await?, diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 851ee07ba3ef..4b0572fc32a6 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -302,7 +302,7 @@ pub fn create_tmp_dir_and_datanode_opts( let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); // Excludes the default object store. - let mut custom_stores = Vec::with_capacity(custom_store_types.len()); + let mut providers = Vec::with_capacity(custom_store_types.len()); // Includes the default object store. let mut storage_guards = Vec::with_capacity(custom_store_types.len() + 1); @@ -311,10 +311,10 @@ pub fn create_tmp_dir_and_datanode_opts( for store_type in custom_store_types { let (store, data_tmp_dir) = get_test_store_config(&store_type); - custom_stores.push(store); + providers.push(store); storage_guards.push(StorageGuard(data_tmp_dir)) } - let opts = create_datanode_opts(default_store, custom_stores, home_dir); + let opts = create_datanode_opts(default_store, providers, home_dir); ( opts, @@ -327,7 +327,7 @@ pub fn create_tmp_dir_and_datanode_opts( pub(crate) fn create_datanode_opts( default_store: ObjectStoreConfig, - custom_stores: Vec, + providers: Vec, home_dir: String, ) -> DatanodeOptions { DatanodeOptions { @@ -335,7 +335,7 @@ pub(crate) fn create_datanode_opts( require_lease_before_startup: true, storage: StorageConfig { data_home: home_dir, - custom_stores, + providers, store: default_store, ..Default::default() }, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 67051b3cb616..bea3f0f8e9a1 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -711,10 +711,8 @@ read_batch_size = 128 sync_write = false [datanode.storage] -custom_stores = [] - -[datanode.storage.default_store] type = "{}" +providers = [] [[datanode.region_engine]] @@ -746,7 +744,6 @@ enable_otlp_tracing = false"#, num_cpus::get() / 2 ); let body_text = drop_lines_with_inconsistent_results(res_get.text().await); - assert_eq!(body_text, expected_toml_str); } From 21a87f9689b86bc732a32018f91bfbf9a0caa857 Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Thu, 16 Nov 2023 12:47:26 +0900 Subject: [PATCH 06/13] chore: rename --- tests-integration/src/cluster.rs | 10 +++++----- tests-integration/src/standalone.rs | 10 +++++----- tests-integration/src/test_util.rs | 12 ++++++------ tests-integration/src/tests/test_util.rs | 6 +++--- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 23a7ccf7f607..a37c120b70af 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -63,7 +63,7 @@ pub struct GreptimeDbClusterBuilder { cluster_name: String, kv_backend: KvBackendRef, store_config: Option, - custom_store_types: Option>, + store_providers: Option>, datanodes: Option, } @@ -88,7 +88,7 @@ impl GreptimeDbClusterBuilder { cluster_name: cluster_name.to_string(), kv_backend, store_config: None, - custom_store_types: None, + store_providers: None, datanodes: None, } } @@ -98,8 +98,8 @@ impl GreptimeDbClusterBuilder { self } - pub fn with_custom_store_types(mut self, store_types: Vec) -> Self { - self.custom_store_types = Some(store_types); + pub fn with_store_providers(mut self, store_providers: Vec) -> Self { + self.store_providers = Some(store_providers); self } @@ -182,7 +182,7 @@ impl GreptimeDbClusterBuilder { } else { let (opts, guard) = create_tmp_dir_and_datanode_opts( StorageType::File, - self.custom_store_types.clone().unwrap_or_default(), + self.store_providers.clone().unwrap_or_default(), &format!("{}-dn-{}", self.cluster_name, datanode_id), ); diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index b11d42352871..7bd36962cd07 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -37,7 +37,7 @@ pub struct GreptimeDbStandalone { pub struct GreptimeDbStandaloneBuilder { instance_name: String, - custom_store_types: Option>, + store_providers: Option>, default_store: Option, plugin: Option, } @@ -46,7 +46,7 @@ impl GreptimeDbStandaloneBuilder { pub fn new(instance_name: &str) -> Self { Self { instance_name: instance_name.to_string(), - custom_store_types: None, + store_providers: None, plugin: None, default_store: None, } @@ -60,9 +60,9 @@ impl GreptimeDbStandaloneBuilder { } #[cfg(test)] - pub fn with_custom_store_types(self, store_types: Vec) -> Self { + pub fn with_store_providers(self, store_providers: Vec) -> Self { Self { - custom_store_types: Some(store_types), + store_providers: Some(store_providers), ..self } } @@ -77,7 +77,7 @@ impl GreptimeDbStandaloneBuilder { pub async fn build(self) -> GreptimeDbStandalone { let default_store_type = self.default_store.unwrap_or(StorageType::File); - let store_types = self.custom_store_types.unwrap_or_default(); + let store_types = self.store_providers.unwrap_or_default(); let (opts, guard) = create_tmp_dir_and_datanode_opts(default_store_type, store_types, &self.instance_name); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 4b0572fc32a6..ad69c40ada33 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -295,26 +295,26 @@ impl TestGuard { pub fn create_tmp_dir_and_datanode_opts( default_store_type: StorageType, - custom_store_types: Vec, + store_provider_types: Vec, name: &str, ) -> (DatanodeOptions, TestGuard) { let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); // Excludes the default object store. - let mut providers = Vec::with_capacity(custom_store_types.len()); + let mut store_providers = Vec::with_capacity(store_provider_types.len()); // Includes the default object store. - let mut storage_guards = Vec::with_capacity(custom_store_types.len() + 1); + let mut storage_guards = Vec::with_capacity(store_provider_types.len() + 1); let (default_store, data_tmp_dir) = get_test_store_config(&default_store_type); storage_guards.push(StorageGuard(data_tmp_dir)); - for store_type in custom_store_types { + for store_type in store_provider_types { let (store, data_tmp_dir) = get_test_store_config(&store_type); - providers.push(store); + store_providers.push(store); storage_guards.push(StorageGuard(data_tmp_dir)) } - let opts = create_datanode_opts(default_store, providers, home_dir); + let opts = create_datanode_opts(default_store, store_providers, home_dir); ( opts, diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 4265e1fb1cf3..23bec64a701f 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -68,7 +68,7 @@ pub(crate) async fn standalone_with_multiple_object_stores() -> Arc Arc Arc { let _ = dotenv::dotenv(); let test_name = uuid::Uuid::new_v4().to_string(); - let custom_storage_types = StorageType::build_storage_types_based_on_env(); + let providers = StorageType::build_storage_types_based_on_env(); let cluster = GreptimeDbClusterBuilder::new(&test_name) - .with_custom_store_types(custom_storage_types) + .with_store_providers(providers) .build() .await; Arc::new(MockDistributedInstance(cluster)) From 85a67d110161d1fb86b8e1224fd28bd4edae2f87 Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Thu, 16 Nov 2023 21:45:58 +0900 Subject: [PATCH 07/13] chore: config --- config/datanode.example.toml | 2 ++ config/standalone.example.toml | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 2a8d4e8ebd2c..b38343c22de7 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -43,6 +43,8 @@ sync_write = false [storage] # The working home directory. data_home = "/tmp/greptimedb/" +# Storage type. +type = "File" # TTL for all tables. Disabled by default. # global_ttl = "7d" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 248014ff7f8d..629c9bdb9a73 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -113,14 +113,14 @@ retry_delay = "500ms" [storage] # The working home directory. data_home = "/tmp/greptimedb/" +# Storage type. +type = "File" # TTL for all tables. Disabled by default. # global_ttl = "7d" # Cache configuration for object storage such as 'S3' etc. # cache_path = "/path/local_cache" # The local file cache capacity in bytes. # cache_capacity = "256MB" -[storage.default_store] -type = "File" # Mito engine options [[region_engine]] From 40936586b33c0908f288ce8be6bc6151d4766e7a Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Fri, 17 Nov 2023 08:27:17 +0900 Subject: [PATCH 08/13] refactor: add should_retry in client Error --- src/client/src/error.rs | 12 ++++++++++++ src/client/src/region.rs | 10 +--------- src/datanode/src/config.rs | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/client/src/error.rs b/src/client/src/error.rs index 6b25e5c58ac3..ae573d037de3 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -131,3 +131,15 @@ impl From for Error { Self::Server { code, msg } } } + +impl Error { + pub fn should_retry(&self) -> bool { + !matches!( + self, + Self::RegionServer { + code: Code::InvalidArgument, + .. + } + ) + } +} diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 3c1d7365ebce..95bef40b2ac2 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -28,9 +28,7 @@ use common_telemetry::error; use prost::Message; use snafu::{location, Location, OptionExt, ResultExt}; use tokio_stream::StreamExt; -use tonic::Code; -use crate::error::Error::RegionServer; use crate::error::{ self, ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu, MissingFieldSnafu, Result, ServerSnafu, @@ -46,13 +44,7 @@ pub struct RegionRequester { impl Datanode for RegionRequester { async fn handle(&self, request: RegionRequest) -> MetaResult { self.handle_inner(request).await.map_err(|err| { - if !matches!( - err, - RegionServer { - code: Code::InvalidArgument, - .. - } - ) { + if err.should_retry() { meta_error::Error::RetryLater { source: BoxedError::new(err), } diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index c92366b61828..f26b0828c873 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -303,7 +303,7 @@ mod tests { #[test] fn test_secstr() { let toml_str = r#" - [storage.default_store] + [storage] type = "S3" access_key_id = "access_key_id" secret_access_key = "secret_access_key" From 18050e0facaba4a174242cd3ebc361709d51b0c3 Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Mon, 20 Nov 2023 21:09:47 +0900 Subject: [PATCH 09/13] fix: test fail --- tests-integration/src/tests/instance_test.rs | 44 ++++++++++---------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 8682c3942a76..6bf5fc0d4784 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -1859,10 +1859,10 @@ async fn test_custom_storage(instance: Arc) { let sql = if instance.is_distributed_mode() { format!( r#"create table test_table( - host string, + a int null primary key, ts timestamp time index, ) - PARTITION BY RANGE COLUMNS (ts) ( + PARTITION BY RANGE COLUMNS (a) ( PARTITION r0 VALUES LESS THAN (1), PARTITION r1 VALUES LESS THAN (10), PARTITION r2 VALUES LESS THAN (100), @@ -1873,7 +1873,7 @@ async fn test_custom_storage(instance: Arc) { ) } else { format!( - r#"create table test_table(host string, ts timestamp time index)with(storage='{storage_name}');"# + r#"create table test_table(a int primary key, ts timestamp time index)with(storage='{storage_name}');"# ) }; @@ -1881,9 +1881,9 @@ async fn test_custom_storage(instance: Arc) { assert!(matches!(output, Output::AffectedRows(0))); let output = execute_sql( &frontend, - r#"insert into test_table(host, ts) values - ('host1', 1655276557000), - ('host2', 1655276558000) + r#"insert into test_table(a, ts) values + (1, 1655276557000), + (1000, 1655276558000) "#, ) .await; @@ -1891,12 +1891,12 @@ async fn test_custom_storage(instance: Arc) { let output = execute_sql(&frontend, "select * from test_table").await; let expected = "\ -+-------+---------------------+ -| host | ts | -+-------+---------------------+ -| host1 | 2022-06-15T07:02:37 | -| host2 | 2022-06-15T07:02:38 | -+-------+---------------------+"; ++------+---------------------+ +| a | ts | ++------+---------------------+ +| 1 | 2022-06-15T07:02:37 | +| 1000 | 2022-06-15T07:02:38 | ++------+---------------------+"; check_output_stream(output, expected).await; let output = execute_sql(&frontend, "show create table test_table").await; @@ -1911,14 +1911,15 @@ async fn test_custom_storage(instance: Arc) { let expect = if instance.is_distributed_mode() { format!( r#"CREATE TABLE IF NOT EXISTS "test_table" ( - "host" STRING NULL, + "a" INT NULL, "ts" TIMESTAMP(3) NOT NULL, - TIME INDEX ("ts") + TIME INDEX ("ts"), + PRIMARY KEY ("a") ) -PARTITION BY RANGE COLUMNS ("ts") ( - PARTITION r0 VALUES LESS THAN ('1970-01-01 09:00:00.001+0900'), - PARTITION r1 VALUES LESS THAN ('1970-01-01 09:00:00.010+0900'), - PARTITION r2 VALUES LESS THAN ('1970-01-01 09:00:00.100+0900'), +PARTITION BY RANGE COLUMNS ("a") ( + PARTITION r0 VALUES LESS THAN (1), + PARTITION r1 VALUES LESS THAN (10), + PARTITION r2 VALUES LESS THAN (100), PARTITION r3 VALUES LESS THAN (MAXVALUE) ) ENGINE=mito @@ -1930,9 +1931,10 @@ WITH( } else { format!( r#"CREATE TABLE IF NOT EXISTS "test_table" ( - "host" STRING NULL, + "a" INT NULL, "ts" TIMESTAMP(3) NOT NULL, - TIME INDEX ("ts") + TIME INDEX ("ts"), + PRIMARY KEY ("a") ) ENGINE=mito @@ -1942,7 +1944,7 @@ WITH( )"# ) }; - assert_eq!(actual.to_string(), expect,); + assert_eq!(actual.to_string(), expect); let output = execute_sql(&frontend, "truncate test_table").await; assert!(matches!(output, Output::AffectedRows(0))); let output = execute_sql(&frontend, "select * from test_table").await; From 9033c6487b97d7dc9278f29e39f17fdcf724991a Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Thu, 30 Nov 2023 12:29:03 +0900 Subject: [PATCH 10/13] chore: remove unused options --- src/cmd/src/datanode.rs | 5 +---- src/cmd/src/standalone.rs | 1 - tests-integration/src/tests/test_util.rs | 1 + 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index f54b97cb220b..e1d3163d0164 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -214,9 +214,7 @@ mod tests { use std::time::Duration; use common_test_util::temp_dir::create_named_temp_file; - use datanode::config::{ - FileConfig, GcsConfig, ObjectStoreConfig, S3Config, - }; + use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config}; use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; @@ -254,7 +252,6 @@ mod tests { [storage] data_home = "/tmp/greptimedb/" - [storage.default_store] type = "File" [[storage.providers]] diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 1a78fd78a8ce..0c638d2f6b7f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -463,7 +463,6 @@ mod tests { [storage] data_home = "/tmp/greptimedb/" - [storage.default_store] type = "File" [[storage.providers]] diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 23bec64a701f..edf21ba7601d 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -79,6 +79,7 @@ pub(crate) async fn distributed_with_multiple_object_stores() -> Arc Date: Wed, 6 Dec 2023 23:50:43 +0900 Subject: [PATCH 11/13] chore: remove unused import --- src/datanode/src/datanode.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 150b6caee599..9624a7abecad 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -64,7 +64,7 @@ use crate::event_listener::{ }; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::HeartbeatTask; -use crate::region_server::{DummyTableProviderFactory, RegionServer}; +use crate::region_server::RegionServer; use crate::store; const OPEN_REGION_PARALLELISM: usize = 16; From 853c879571fd0d632981ae42d30c30705b6538b3 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Wed, 6 Dec 2023 23:09:47 +0800 Subject: [PATCH 12/13] chore: remove the blanks. --- src/cmd/src/standalone.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 0c638d2f6b7f..09dcd145b8d8 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -460,7 +460,6 @@ mod tests { purge_interval = "10m" read_batch_size = 128 sync_write = false - [storage] data_home = "/tmp/greptimedb/" type = "File" From c835ec47a90bdab070993a245a6432997629dbd2 Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Thu, 7 Dec 2023 00:48:46 +0900 Subject: [PATCH 13/13] chore: revert --- src/datanode/src/datanode.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 9624a7abecad..8910f444e328 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -64,7 +64,7 @@ use crate::event_listener::{ }; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::HeartbeatTask; -use crate::region_server::RegionServer; +use crate::region_server::{DummyTableProviderFactory, RegionServer}; use crate::store; const OPEN_REGION_PARALLELISM: usize = 16; @@ -415,8 +415,14 @@ impl DatanodeBuilder { .context(RuntimeResourceSnafu)?, ); - let mut region_server = - RegionServer::new(query_engine.clone(), runtime.clone(), event_listener); + let table_provider_factory = Arc::new(DummyTableProviderFactory); + let mut region_server = RegionServer::with_table_provider( + query_engine, + runtime, + event_listener, + table_provider_factory, + ); + let object_store_manager = Self::build_object_store_manager(opts).await?; let engines = Self::build_store_engines(opts, log_store, object_store_manager).await?; for engine in engines {