diff --git a/Cargo.lock b/Cargo.lock index e80f9ed742d0..1e6d00e94206 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10114,6 +10114,7 @@ dependencies = [ "paste", "prost 0.12.3", "query", + "rand", "rstest", "rstest_reuse", "script", diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 16c11b8e4cdd..5d508adbf659 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -480,6 +480,7 @@ impl StartCommand { table_metadata_manager, table_meta_allocator, Arc::new(MemoryRegionKeeper::default()), + true, ) .context(InitDdlManagerSnafu)?, ); diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index db435d930d30..e63477f47562 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -79,6 +79,7 @@ impl DdlManager { table_metadata_manager: TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, memory_region_keeper: MemoryRegionKeeperRef, + register_loaders: bool, ) -> Result { let manager = Self { procedure_manager, @@ -88,7 +89,9 @@ impl DdlManager { table_metadata_allocator, memory_region_keeper, }; - manager.register_loaders()?; + if register_loaders { + manager.register_loaders()?; + } Ok(manager) } @@ -767,6 +770,7 @@ mod tests { Arc::new(WalOptionsAllocator::default()), )), Arc::new(MemoryRegionKeeper::default()), + true, ); let expected_loaders = vec![ diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 378d5f42dcc9..e7a561f87bdd 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -415,6 +415,7 @@ fn build_ddl_manager( table_metadata_manager.clone(), table_metadata_allocator.clone(), memory_region_keeper.clone(), + true, ) .context(error::InitDdlManagerSnafu)?, )) diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 1c95982f6181..1e549c6f9337 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -77,6 +77,7 @@ opentelemetry-proto.workspace = true partition.workspace = true paste.workspace = true prost.workspace = true +rand.workspace = true script.workspace = true session = { workspace = true, features = ["testing"] } store-api.workspace = true diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index d427b2a8f27b..600eab950fbd 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -77,8 +77,8 @@ pub struct GreptimeDbClusterBuilder { store_config: Option, store_providers: Option>, datanodes: Option, - wal_config: DatanodeWalConfig, - meta_wal_config: MetaSrvWalConfig, + datanode_wal_config: DatanodeWalConfig, + metasrv_wal_config: MetaSrvWalConfig, shared_home_dir: Option>, meta_selector: Option, } @@ -108,8 +108,8 @@ impl GreptimeDbClusterBuilder { store_config: None, store_providers: None, datanodes: None, - wal_config: DatanodeWalConfig::default(), - meta_wal_config: MetaSrvWalConfig::default(), + datanode_wal_config: DatanodeWalConfig::default(), + metasrv_wal_config: MetaSrvWalConfig::default(), shared_home_dir: None, meta_selector: None, } @@ -134,14 +134,14 @@ impl GreptimeDbClusterBuilder { } #[must_use] - pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self { - self.wal_config = wal_config; + pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self { + self.datanode_wal_config = datanode_wal_config; self } #[must_use] - pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self { - self.meta_wal_config = wal_meta; + pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self { + self.metasrv_wal_config = metasrv_wal_config; self } @@ -174,7 +174,7 @@ impl GreptimeDbClusterBuilder { max_retry_times: 5, retry_delay: Duration::from_secs(1), }, - wal: self.meta_wal_config.clone(), + wal: self.metasrv_wal_config.clone(), ..Default::default() }; @@ -249,7 +249,7 @@ impl GreptimeDbClusterBuilder { store_config.clone(), vec![], home_dir, - self.wal_config.clone(), + self.datanode_wal_config.clone(), ) } else { let (opts, guard) = create_tmp_dir_and_datanode_opts( @@ -257,7 +257,7 @@ impl GreptimeDbClusterBuilder { StorageType::File, self.store_providers.clone().unwrap_or_default(), &format!("{}-dn-{}", self.cluster_name, datanode_id), - self.wal_config.clone(), + self.datanode_wal_config.clone(), ); storage_guards.push(guard.storage_guards); diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs index edb88136d9c8..d3e700151345 100644 --- a/tests-integration/src/lib.rs +++ b/tests-integration/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(assert_matches)] + pub mod cluster; mod grpc; mod influxdb; diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index b76e6ed38ccb..5360f758c8b9 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -50,8 +50,8 @@ pub struct GreptimeDbStandalone { pub struct GreptimeDbStandaloneBuilder { instance_name: String, - wal_config: DatanodeWalConfig, - meta_wal_config: MetaSrvWalConfig, + datanode_wal_config: DatanodeWalConfig, + metasrv_wal_config: MetaSrvWalConfig, store_providers: Option>, default_store: Option, plugin: Option, @@ -64,8 +64,8 @@ impl GreptimeDbStandaloneBuilder { store_providers: None, plugin: None, default_store: None, - wal_config: DatanodeWalConfig::default(), - meta_wal_config: MetaSrvWalConfig::default(), + datanode_wal_config: DatanodeWalConfig::default(), + metasrv_wal_config: MetaSrvWalConfig::default(), } } @@ -96,23 +96,24 @@ impl GreptimeDbStandaloneBuilder { } #[must_use] - pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self { - self.wal_config = wal_config; + pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self { + self.datanode_wal_config = datanode_wal_config; self } #[must_use] - pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self { - self.meta_wal_config = wal_meta; + pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self { + self.metasrv_wal_config = metasrv_wal_config; self } pub async fn build_with( &self, kv_backend: KvBackendRef, - procedure_manager: ProcedureManagerRef, guard: TestGuard, mix_options: MixOptions, + procedure_manager: ProcedureManagerRef, + register_procedure_loaders: bool, ) -> GreptimeDbStandalone { let plugins = self.plugin.clone().unwrap_or_default(); @@ -153,6 +154,7 @@ impl GreptimeDbStandaloneBuilder { table_metadata_manager, table_meta_allocator, Arc::new(MemoryRegionKeeper::default()), + register_procedure_loaders, ) .unwrap(), ); @@ -194,7 +196,7 @@ impl GreptimeDbStandaloneBuilder { default_store_type, store_types, &self.instance_name, - self.wal_config.clone(), + self.datanode_wal_config.clone(), ); let kv_backend_config = KvBackendConfig::default(); @@ -207,7 +209,7 @@ impl GreptimeDbStandaloneBuilder { .await .unwrap(); - let wal_meta = self.meta_wal_config.clone(); + let wal_meta = self.metasrv_wal_config.clone(); let mix_options = MixOptions { data_home: opts.storage.data_home.to_string(), procedure: procedure_config, @@ -218,7 +220,7 @@ impl GreptimeDbStandaloneBuilder { wal_meta, }; - self.build_with(kv_backend, procedure_manager, guard, mix_options) + self.build_with(kv_backend, guard, mix_options, procedure_manager, true) .await } } diff --git a/tests-integration/src/tests/instance_kafka_wal_test.rs b/tests-integration/src/tests/instance_kafka_wal_test.rs index 2135c498239a..031fc16bc310 100644 --- a/tests-integration/src/tests/instance_kafka_wal_test.rs +++ b/tests-integration/src/tests/instance_kafka_wal_test.rs @@ -12,31 +12,43 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use client::OutputData; -use common_query::Output; -use common_recordbatch::util; +use client::DEFAULT_CATALOG_NAME; +use common_query::{Output, OutputData}; use datatypes::vectors::{TimestampMillisecondVector, VectorRef}; -use frontend::error::Result; use frontend::instance::Instance; +use itertools::Itertools; +use rand::rngs::ThreadRng; +use rand::Rng; use rstest::rstest; use rstest_reuse::apply; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; +use tokio::sync::Mutex; use crate::tests::test_util::*; #[apply(both_instances_cases_with_kafka_wal)] -async fn test_create_database_and_insert_query(instance: Option>) { - let Some(instance) = instance else { return }; - +async fn test_create_database_and_insert_query( + rebuildable_instance: Option>, +) { + let Some(instance) = rebuildable_instance else { + return; + }; let instance = instance.frontend(); - let output = execute_sql(&instance, "create database test").await; - assert!(matches!(output.data, OutputData::AffectedRows(1))); + let output = execute_sql_with( + &instance, + "create database test", + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await; + assert_matches!(output.data, OutputData::AffectedRows(1)); - let output = execute_sql( + let output = execute_sql_with( &instance, r#"create table greptime.test.demo( host STRING, @@ -44,25 +56,32 @@ async fn test_create_database_and_insert_query(instance: Option { - let batches = util::collect(s).await.unwrap(); + let batches = common_recordbatch::util::collect(s).await.unwrap(); assert_eq!(1, batches[0].num_columns()); assert_eq!( Arc::new(TimestampMillisecondVector::from_vec(vec![ @@ -75,24 +94,216 @@ async fn test_create_database_and_insert_query(instance: Option, sql: &str) -> Output { - execute_sql_with(instance, sql, QueryContext::arc()).await +/// Maintains metadata of a table. +struct Table { + name: String, + logical_timer: AtomicU64, + inserted: Mutex>, } -async fn try_execute_sql_with( - instance: &Arc, - sql: &str, - query_ctx: QueryContextRef, -) -> Result { - instance.do_query(sql, query_ctx).await.remove(0) +/// Inserts some data to a collection of tables and checks if these data exist after restart. +#[apply(both_instances_cases_with_kafka_wal)] +async fn test_replay(rebuildable_instance: Option>) { + let Some(mut rebuildable_instance) = rebuildable_instance else { + return; + }; + let instance = rebuildable_instance.frontend(); + + let output = execute_sql_with( + &instance, + "create database test", + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await; + assert_matches!(output.data, OutputData::AffectedRows(1)); + + let tables = create_tables("test_replay", &instance, 10).await; + insert_data(&tables, &instance, 15).await; + ensure_data_exists(&tables, &instance).await; + + // Rebuilds to emulate restart which triggers a replay. + rebuildable_instance.rebuild().await; + ensure_data_exists(&tables, &rebuildable_instance.frontend()).await; } +/// Inserts some data to a collection of tables and sends alter table requests to force flushing each table. +/// Then checks if these data exist after restart. +#[apply(both_instances_cases_with_kafka_wal)] +async fn test_flush_then_replay(rebuildable_instance: Option>) { + let Some(mut rebuildable_instance) = rebuildable_instance else { + return; + }; + let instance = rebuildable_instance.frontend(); + + let output = execute_sql_with( + &instance, + "create database test", + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await; + assert_matches!(output.data, OutputData::AffectedRows(1)); + + let tables = create_tables("test_flush_then_replay", &instance, 10).await; + insert_data(&tables, &instance, 15).await; + ensure_data_exists(&tables, &instance).await; + + // Alters tables to force flushing. + futures::future::join_all(tables.iter().map(|table| { + let instance = instance.clone(); + async move { + assert_matches!( + do_alter(&instance, &table.name).await.data, + OutputData::AffectedRows(0) + ); + } + })) + .await; + + // Inserts more data and check all data exists after flushing. + insert_data(&tables, &instance, 15).await; + ensure_data_exists(&tables, &instance).await; + + // Rebuilds to emulate restart which triggers a replay. + rebuildable_instance.rebuild().await; + ensure_data_exists(&tables, &rebuildable_instance.frontend()).await; +} + +/// Creates a given number of tables. +async fn create_tables(test_name: &str, instance: &Arc, num_tables: usize) -> Vec { + futures::future::join_all((0..num_tables).map(|i| { + let instance = instance.clone(); + async move { + let table_name = format!("{}_{}", test_name, i); + assert_matches!( + do_create(&instance, &table_name).await.data, + OutputData::AffectedRows(0) + ); + Table { + name: table_name, + logical_timer: AtomicU64::new(1685508715000), + inserted: Mutex::new(Vec::new()), + } + } + })) + .await +} + +/// Inserts data to the tables in parallel. +/// The reason why the insertion is parallel is that we want to ensure the kafka wal works as expected under parallel write workloads. +async fn insert_data(tables: &[Table], instance: &Arc, num_writers: usize) { + // Each writer randomly chooses a table and inserts a sequence of rows into the table. + futures::future::join_all((0..num_writers).map(|_| async { + let mut rng = rand::thread_rng(); + let table = &tables[rng.gen_range(0..tables.len())]; + for _ in 0..10 { + let ts = table.logical_timer.fetch_add(1000, Ordering::Relaxed); + let row = make_row(ts, &mut rng); + assert_matches!( + do_insert(instance, &table.name, row).await.data, + OutputData::AffectedRows(1) + ); + { + // Inserting into the `inserted` vector and inserting into the database are not atomic + // which requires us to do a sorting upon checking data integrity. + let mut inserted = table.inserted.lock().await; + inserted.push(ts); + } + } + })) + .await; +} + +/// Sends queries to ensure the data exists for each table. +async fn ensure_data_exists(tables: &[Table], instance: &Arc) { + futures::future::join_all(tables.iter().map(|table| async { + let output = do_query(instance, &table.name).await; + let OutputData::Stream(stream) = output.data else { + unreachable!() + }; + let record_batches = common_recordbatch::util::collect(stream).await.unwrap(); + let queried = record_batches + .into_iter() + .flat_map(|rb| { + rb.rows() + .map(|row| row[0].as_timestamp().unwrap().value() as u64) + .collect::>() + }) + .collect::>(); + let inserted = table + .inserted + .lock() + .await + .iter() + .sorted() + .cloned() + .collect::>(); + assert_eq!(queried, inserted); + })) + .await; +} + +/// Sends a create table SQL. +async fn do_create(instance: &Arc, table_name: &str) -> Output { + execute_sql_with( + instance, + &format!( + r#"create table greptime.test.{} ( + host STRING, + cpu DOUBLE, + memory DOUBLE, + ts timestamp, + TIME INDEX(ts) + )"#, + table_name + ), + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await +} + +/// Sends an alter table SQL. +async fn do_alter(instance: &Arc, table_name: &str) -> Output { + execute_sql_with( + instance, + &format!("alter table {} add column new_col STRING", table_name), + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await +} + +/// Sends a insert SQL. +async fn do_insert(instance: &Arc, table_name: &str, row: String) -> Output { + execute_sql_with( + instance, + &format!("insert into test.{table_name}(host, cpu, memory, ts) values {row}"), + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await +} + +/// Sends a query SQL. +async fn do_query(instance: &Arc, table_name: &str) -> Output { + execute_sql_with( + instance, + &format!("select ts from test.{table_name} order by ts"), + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await +} + +/// Sends a SQL with the given context which specifies the catalog name and schema name, aka. database name. +/// The query context is required since the tables are created in the `test` schema rather than the default `public` schema. async fn execute_sql_with( instance: &Arc, sql: &str, query_ctx: QueryContextRef, ) -> Output { - try_execute_sql_with(instance, sql, query_ctx) - .await - .unwrap() + instance.do_query(sql, query_ctx).await.remove(0).unwrap() +} + +fn make_row(ts: u64, rng: &mut ThreadRng) -> String { + let host = format!("host{}", rng.gen_range(0..5)); + let cpu: f64 = rng.gen_range(0.0..99.9); + let memory: f64 = rng.gen_range(0.0..999.9); + format!("('{host}', {cpu}, {memory}, {ts})") } diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 9f3bad224b91..43f3981fee0b 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -114,7 +114,7 @@ impl MockInstanceBuilder { } = instance; MockInstanceImpl::Standalone( builder - .build_with(kv_backend, procedure_manager, guard, mix_options) + .build_with(kv_backend, guard, mix_options, procedure_manager, false) .await, ) } @@ -223,11 +223,11 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option>(); let test_name = uuid::Uuid::new_v4().to_string(); let builder = GreptimeDbStandaloneBuilder::new(&test_name) - .with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig { + .with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig { broker_endpoints: endpoints.clone(), ..Default::default() })) - .with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig { + .with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig { broker_endpoints: endpoints, topic_name_prefix: test_name.to_string(), num_topics: 3, @@ -253,11 +253,11 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option Option>, + rebuildable_instance: Option>, ) { } diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index 89175d9093a3..79f7a3d38a68 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -111,12 +111,12 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec