diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs
index e1aa74b6bcc2..757d853a5b85 100644
--- a/tests-integration/tests/region_migration.rs
+++ b/tests-integration/tests/region_migration.rs
@@ -19,11 +19,15 @@ use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_meta::key::{RegionDistribution, TableMetadataManagerRef};
 use common_meta::peer::Peer;
 use common_query::Output;
+use common_recordbatch::RecordBatches;
 use common_telemetry::info;
 use common_test_util::recordbatch::check_output_stream;
 use common_test_util::temp_dir::create_temp_dir;
 use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig};
 use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
+use datatypes::prelude::ScalarVector;
+use datatypes::value::Value;
+use datatypes::vectors::{Helper, UInt64Vector};
 use frontend::error::Result as FrontendResult;
 use frontend::instance::Instance;
 use futures::future::BoxFuture;
@@ -76,6 +80,7 @@
         $service,
 
         test_region_migration,
+        test_region_migration_by_sql,
         test_region_migration_multiple_regions,
         test_region_migration_all_regions,
         test_region_migration_incorrect_from_peer,
@@ -212,6 +217,125 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<String>) {
     assert_ne!(old_distribution, new_distribution);
 }
 
+pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Vec<String>) {
+    let cluster_name = "test_region_migration";
+    let peer_factory = |id| Peer {
+        id,
+        addr: PEER_PLACEHOLDER_ADDR.to_string(),
+    };
+
+    // Prepares test cluster.
+    let (store_config, _guard) = get_test_store_config(&store_type);
+    let home_dir = create_temp_dir("test_migration_data_home");
+    let datanodes = 5u64;
+    let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
+    let const_selector = Arc::new(ConstNodeSelector::new(vec![
+        peer_factory(1),
+        peer_factory(2),
+        peer_factory(3),
+    ]));
+    let cluster = builder
+        .with_datanodes(datanodes as u32)
+        .with_store_config(store_config)
+        .with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
+            broker_endpoints: endpoints.clone(),
+            linger: Duration::from_millis(25),
+            ..Default::default()
+        }))
+        .with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
+            broker_endpoints: endpoints,
+            num_topics: 3,
+            topic_name_prefix: Uuid::new_v4().to_string(),
+            ..Default::default()
+        }))
+        .with_shared_home_dir(Arc::new(home_dir))
+        .with_meta_selector(const_selector.clone())
+        .build()
+        .await;
+    let mut logical_timer = 1685508715000;
+
+    // Prepares test table.
+    let table_id = prepare_testing_table(&cluster).await;
+
+    // Inserts data
+    let results = insert_values(&cluster.frontend, logical_timer).await;
+    logical_timer += 1000;
+    for result in results {
+        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
+    }
+
+    // The region distribution
+    let mut distribution = find_region_distribution_by_sql(&cluster).await;
+
+    let old_distribution = distribution.clone();
+
+    // Selecting target of region migration.
+    let region_migration_manager = cluster.meta_srv.region_migration_manager();
+    let (from_peer_id, from_regions) = distribution.pop_first().unwrap();
+    info!(
+        "Selecting from peer: {from_peer_id}, and regions: {:?}",
+        from_regions
+    );
+    let (to_peer_id, to_regions) = distribution.pop_first().unwrap();
+    info!(
+        "Selecting to peer: {to_peer_id}, and regions: {:?}",
+        to_regions
+    );
+
+    let region_id = RegionId::new(table_id, from_regions[0]);
+    // Trigger region migration.
+    let procedure_id =
+        trigger_migration_by_sql(&cluster, region_id.as_u64(), from_peer_id, to_peer_id).await;
+
+    info!("Started region procedure: {}!", procedure_id);
+
+    // Waits condition by checking procedure state
+    let frontend = cluster.frontend.clone();
+    wait_condition(
+        Duration::from_secs(10),
+        Box::pin(async move {
+            loop {
+                let state = query_procedure_by_sql(&frontend, &procedure_id).await;
+                if state == "{\"status\":\"Done\"}" {
+                    info!("Migration done: {state}");
+                    break;
+                } else {
+                    info!("Migration not finished: {state}");
+                    tokio::time::sleep(Duration::from_millis(200)).await;
+                }
+            }
+        }),
+    )
+    .await;
+
+    // Inserts more table.
+    let results = insert_values(&cluster.frontend, logical_timer).await;
+    for result in results {
+        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
+    }
+
+    // Asserts the writes.
+    assert_values(&cluster.frontend).await;
+
+    // Triggers again.
+    let procedure = region_migration_manager
+        .submit_procedure(RegionMigrationProcedureTask::new(
+            0,
+            region_id,
+            peer_factory(from_peer_id),
+            peer_factory(to_peer_id),
+            Duration::from_millis(1000),
+        ))
+        .await
+        .unwrap();
+    assert!(procedure.is_none());
+
+    let new_distribution = find_region_distribution_by_sql(&cluster).await;
+
+    assert_ne!(old_distribution, new_distribution);
+}
+
 /// A region migration test for a region server contains multiple regions of the table.
 pub async fn test_region_migration_multiple_regions(
     store_type: StorageType,
@@ -724,12 +848,103 @@ async fn find_region_distribution(
         .unwrap()
 }
 
+/// Find region distribution by SQL query
+async fn find_region_distribution_by_sql(cluster: &GreptimeDbCluster) -> RegionDistribution {
+    let query_ctx = QueryContext::arc();
+
+    let Output::Stream(stream) = run_sql(
+        &cluster.frontend,
+        &format!(r#"select b.peer_id as datanode_id,
+                a.greptime_partition_id as region_id
+            from information_schema.partitions a left join information_schema.greptime_region_peers b
+            on a.greptime_partition_id = b.region_id
+            where a.table_name='{TEST_TABLE_NAME}' order by datanode_id asc"#
+        ),
+        query_ctx.clone(),
+    )
+    .await.unwrap() else {
+        unreachable!();
+    };
+
+    let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
+
+    let mut distribution = RegionDistribution::new();
+
+    for batch in recordbatches.take() {
+        let datanode_ids: &UInt64Vector =
+            unsafe { Helper::static_cast(batch.column_by_name("datanode_id").unwrap()) };
+        let region_ids: &UInt64Vector =
+            unsafe { Helper::static_cast(batch.column_by_name("region_id").unwrap()) };
+
+        for (datanode_id, region_id) in datanode_ids.iter_data().zip(region_ids.iter_data()) {
+            let (Some(datanode_id), Some(region_id)) = (datanode_id, region_id) else {
+                unreachable!();
+            };
+
+            let region_id = RegionId::from_u64(region_id);
+            distribution
+                .entry(datanode_id)
+                .or_default()
+                .push(region_id.region_number());
+        }
+    }
+
+    distribution
+}
+
+/// Trigger the region migration by SQL, returns the procedure id if success.
+async fn trigger_migration_by_sql(
+    cluster: &GreptimeDbCluster,
+    region_id: u64,
+    from_peer_id: u64,
+    to_peer_id: u64,
+) -> String {
+    let Output::Stream(stream) = run_sql(
+        &cluster.frontend,
+        &format!("select migrate_region({region_id}, {from_peer_id}, {to_peer_id})"),
+        QueryContext::arc(),
+    )
+    .await
+    .unwrap() else {
+        unreachable!();
+    };
+
+    let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
+
+    let Value::String(procedure_id) = recordbatches.take()[0].column(0).get(0) else {
+        unreachable!();
+    };
+
+    procedure_id.as_utf8().to_string()
+}
+
+/// Query procedure state by SQL.
+async fn query_procedure_by_sql(instance: &Arc<Instance>, pid: &str) -> String {
+    let Output::Stream(stream) = run_sql(
+        instance,
+        &format!("select procedure_state('{pid}')"),
+        QueryContext::arc(),
+    )
+    .await
+    .unwrap() else {
+        unreachable!();
+    };
+
+    let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
+
+    let Value::String(state) = recordbatches.take()[0].column(0).get(0) else {
+        unreachable!();
+    };
+
+    state.as_utf8().to_string()
+}
+
 async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
     let query_ctx = QueryContext::arc();
 
     let mut results = Vec::new();
     for range in [5, 15, 55] {
-        let result = insert_value(
+        let result = run_sql(
             instance,
             &format!("INSERT INTO {TEST_TABLE_NAME} VALUES ({},{})", range, ts),
             query_ctx.clone(),
@@ -741,10 +956,11 @@ async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
     results
 }
 
-async fn insert_value(
+async fn run_sql(
     instance: &Arc<Instance>,
     sql: &str,
     query_ctx: QueryContextRef,
 ) -> FrontendResult<Output> {
+    info!("Run SQL: {sql}");
     instance.do_query(sql, query_ctx).await.remove(0)
 }