From 96c01a3bf0545eda3e038b9a33274a469136be3d Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Tue, 23 Apr 2024 10:44:12 +0800 Subject: [PATCH] fix: the `dropping_regions` guards should be dropped on procedure done (#3771) * fix: the `dropping_regions` guards should be dropped on procedure done * fix ci --- .../meta/src/ddl/drop_database/cursor.rs | 4 +- .../meta/src/ddl/drop_database/executor.rs | 6 +- src/common/meta/src/ddl/drop_table.rs | 8 +- src/common/meta/src/ddl/test_util.rs | 14 +- .../src/ddl/tests/alter_logical_tables.rs | 12 +- .../meta/src/ddl/tests/drop_database.rs | 4 +- src/common/meta/src/ddl/tests/drop_table.rs | 132 ++++++++++-------- src/common/meta/src/error.rs | 2 +- src/common/procedure-test/src/lib.rs | 7 + 9 files changed, 108 insertions(+), 81 deletions(-) diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index 254a722e71d4..ed21902e7508 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -165,7 +165,7 @@ mod tests { async fn test_next_without_logical_tables() { let datanode_manager = Arc::new(MockDatanodeManager::new(())); let ddl_context = new_ddl_context(datanode_manager); - create_physical_table(ddl_context.clone(), 0, "phy").await; + create_physical_table(&ddl_context, 0, "phy").await; // It always starts from Logical let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); let mut ctx = DropDatabaseContext { @@ -199,7 +199,7 @@ mod tests { async fn test_next_with_logical_tables() { let datanode_manager = Arc::new(MockDatanodeManager::new(())); let ddl_context = new_ddl_context(datanode_manager); - let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await; create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric_0").await; // It always starts from Logical let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 8b0b967ba350..e3bcf0c004d6 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -161,7 +161,7 @@ mod tests { async fn test_next_with_physical_table() { let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); let ddl_context = new_ddl_context(datanode_manager); - let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await; let (_, table_route) = ddl_context .table_metadata_manager .table_route_manager() @@ -211,7 +211,7 @@ mod tests { async fn test_next_logical_table() { let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); let ddl_context = new_ddl_context(datanode_manager); - let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await; create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric").await; let logical_table_id = physical_table_id + 1; let (_, table_route) = ddl_context @@ -315,7 +315,7 @@ mod tests { async fn test_next_retryable_err() { let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); let ddl_context = new_ddl_context(datanode_manager); - let 
physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await; let (_, table_route) = ddl_context .table_metadata_manager .table_route_manager() diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 14a39b835bd2..fcf1ffb4aa00 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -46,7 +46,7 @@ pub struct DropTableProcedure { /// The serializable data. pub data: DropTableData, /// The guards of opening regions. - pub dropping_regions: Vec<OperatingRegionGuard>, + pub(crate) dropping_regions: Vec<OperatingRegionGuard>, /// The drop table executor. executor: DropTableExecutor, } @@ -153,7 +153,7 @@ impl DropTableProcedure { } /// Deletes metadata tombstone. - async fn on_delete_metadata_tombstone(&self) -> Result<Status> { + async fn on_delete_metadata_tombstone(&mut self) -> Result<Status> { let table_route_value = &TableRouteValue::new( self.data.task.table_id, // Safety: checked @@ -163,6 +163,8 @@ impl DropTableProcedure { self.executor .on_delete_metadata_tombstone(&self.context, table_route_value) .await?; + + self.dropping_regions.clear(); Ok(Status::done()) } } @@ -266,7 +268,7 @@ impl DropTableData { } /// The state of drop table. -#[derive(Debug, Serialize, Deserialize, AsRefStr)] +#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)] pub enum DropTableState { /// Prepares to drop the table Prepare, diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs index eadc4e42e62c..030d0a7b6827 100644 --- a/src/common/meta/src/ddl/test_util.rs +++ b/src/common/meta/src/ddl/test_util.rs @@ -47,7 +47,7 @@ pub async fn create_physical_table_metadata( } pub async fn create_physical_table( - ddl_context: DdlContext, + ddl_context: &DdlContext, cluster_id: ClusterId, name: &str, ) -> TableId { @@ -67,7 +67,7 @@ pub async fn create_physical_table( .unwrap(); create_physical_table_task.set_table_id(table_id); create_physical_table_metadata( - &ddl_context, + ddl_context, create_physical_table_task.table_info.clone(), TableRouteValue::Physical(table_route), ) @@ -81,7 +81,7 @@ pub async fn create_logical_table( cluster_id: ClusterId, physical_table_id: TableId, table_name: &str, -) { +) -> TableId { use std::assert_matches::assert_matches; let tasks = vec![test_create_logical_table_task(table_name)]; @@ -91,6 +91,14 @@ pub async fn create_logical_table( assert_matches!(status, Status::Executing { persist: true }); let status = procedure.on_create_metadata().await.unwrap(); assert_matches!(status, Status::Done { .. 
}); + + let Status::Done { + output: Some(output), + } = status + else { + panic!("Unexpected status: {:?}", status); + }; + output.downcast_ref::<Vec<TableId>>().unwrap()[0] } pub fn test_create_logical_table_task(name: &str) -> CreateTableTask { diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index 3ff121b09a74..b970d62a0ebf 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -128,9 +128,9 @@ async fn test_on_prepare_different_physical_table() { let datanode_manager = Arc::new(MockDatanodeManager::new(())); let ddl_context = new_ddl_context(datanode_manager); - let phy1_id = create_physical_table(ddl_context.clone(), cluster_id, "phy1").await; + let phy1_id = create_physical_table(&ddl_context, cluster_id, "phy1").await; create_logical_table(ddl_context.clone(), cluster_id, phy1_id, "table1").await; - let phy2_id = create_physical_table(ddl_context.clone(), cluster_id, "phy2").await; + let phy2_id = create_physical_table(&ddl_context, cluster_id, "phy2").await; create_logical_table(ddl_context.clone(), cluster_id, phy2_id, "table2").await; let tasks = vec![ @@ -150,7 +150,7 @@ async fn test_on_prepare_logical_table_not_exists() { let ddl_context = new_ddl_context(datanode_manager); // Creates physical table - let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; // Creates 3 logical tables create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; @@ -172,7 +172,7 @@ async fn test_on_prepare() { let ddl_context = new_ddl_context(datanode_manager); // Creates physical table - let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; // Creates 3 logical tables create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await; @@ -196,7 +196,7 @@ async fn test_on_update_metadata() { let ddl_context = new_ddl_context(datanode_manager); // Creates physical table - let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; // Creates 3 logical tables create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await; @@ -233,7 +233,7 @@ async fn test_on_part_duplicate_alter_request() { let ddl_context = new_ddl_context(datanode_manager); // Creates physical table - let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; // Creates 3 logical tables create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await; diff --git a/src/common/meta/src/ddl/tests/drop_database.rs b/src/common/meta/src/ddl/tests/drop_database.rs index 7bdd83ca1fb7..d4469195c8b6 100644 --- a/src/common/meta/src/ddl/tests/drop_database.rs +++ b/src/common/meta/src/ddl/tests/drop_database.rs @@ -42,7 +42,7 @@ async fn test_drop_database_with_logical_tables() { .await .unwrap(); // Creates physical table - let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + let 
phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; // Creates 3 logical tables create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await; @@ -91,7 +91,7 @@ async fn test_drop_database_retryable_error() { .await .unwrap(); // Creates physical table - let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; // Creates 3 logical tables create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await; diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index f422c853f061..5c86bb765c74 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -19,17 +19,19 @@ use api::v1::region::{region_request, RegionRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId}; -use common_procedure_test::MockContextProvider; +use common_procedure::{Procedure, Status}; +use common_procedure_test::{execute_procedure_until_done, new_test_procedure_context}; use store_api::storage::RegionId; +use table::metadata::TableId; use tokio::sync::mpsc; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; -use crate::ddl::drop_table::DropTableProcedure; +use crate::ddl::drop_table::{DropTableProcedure, DropTableState}; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler}; use crate::ddl::test_util::{ - create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task, + create_logical_table, create_physical_table, create_physical_table_metadata, + test_create_logical_table_task, test_create_physical_table_task, }; use crate::ddl::{TableMetadata, TableMetadataAllocatorContext}; use crate::key::table_route::TableRouteValue; @@ -58,14 +60,7 @@ async fn test_on_prepare_table_not_exists_err() { .await .unwrap(); - let task = DropTableTask { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: "bar".to_string(), - table_id, - drop_if_exists: false, - }; - + let task = new_drop_table_task("bar", table_id, false); let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context); let err = procedure.on_prepare().await.unwrap_err(); assert_eq!(err.status_code(), StatusCode::TableNotFound); @@ -90,26 +85,12 @@ async fn test_on_prepare_table() { .await .unwrap(); - let task = DropTableTask { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: "bar".to_string(), - table_id, - drop_if_exists: true, - }; - + let task = new_drop_table_task("bar", table_id, true); // Drop if exists let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone()); procedure.on_prepare().await.unwrap(); - let task = DropTableTask { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: table_name.to_string(), - table_id, - drop_if_exists: false, - }; - + let task = new_drop_table_task(table_name, table_id, false); // Drop table let mut procedure = 
DropTableProcedure::new(cluster_id, task, ddl_context); procedure.on_prepare().await.unwrap(); @@ -158,13 +139,7 @@ async fn test_on_datanode_drop_regions() { .await .unwrap(); - let task = DropTableTask { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: table_name.to_string(), - table_id, - drop_if_exists: false, - }; + let task = new_drop_table_task(table_name, table_id, false); // Drop table let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context); procedure.on_prepare().await.unwrap(); @@ -234,10 +209,7 @@ async fn test_on_rollback() { ddl_context.clone(), ); procedure.on_prepare().await.unwrap(); - let ctx = ProcedureContext { - procedure_id: ProcedureId::random(), - provider: Arc::new(MockContextProvider::default()), - }; + let ctx = new_test_procedure_context(); procedure.execute(&ctx).await.unwrap(); // Triggers procedure to create table metadata let status = procedure.execute(&ctx).await.unwrap(); @@ -247,20 +219,10 @@ async fn test_on_rollback() { let expected_kvs = kv_backend.dump(); // Drops the physical table { - let task = DropTableTask { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: "phy_table".to_string(), - table_id: physical_table_id, - drop_if_exists: false, - }; + let task = new_drop_table_task("phy_table", physical_table_id, false); let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone()); procedure.on_prepare().await.unwrap(); procedure.on_delete_metadata().await.unwrap(); - let ctx = ProcedureContext { - procedure_id: ProcedureId::random(), - provider: Arc::new(MockContextProvider::default()), - }; procedure.rollback(&ctx).await.unwrap(); // Rollback again procedure.rollback(&ctx).await.unwrap(); @@ -269,23 +231,71 @@ async fn test_on_rollback() { } // Drops the logical table - let task = DropTableTask { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: "foo".to_string(), - table_id: table_ids[0], - drop_if_exists: false, - }; + let task = new_drop_table_task("foo", table_ids[0], false); let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone()); procedure.on_prepare().await.unwrap(); procedure.on_delete_metadata().await.unwrap(); - let ctx = ProcedureContext { - procedure_id: ProcedureId::random(), - provider: Arc::new(MockContextProvider::default()), - }; procedure.rollback(&ctx).await.unwrap(); // Rollback again procedure.rollback(&ctx).await.unwrap(); let kvs = kv_backend.dump(); assert_eq!(kvs, expected_kvs); } + +fn new_drop_table_task(table_name: &str, table_id: TableId, drop_if_exists: bool) -> DropTableTask { + DropTableTask { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: table_name.to_string(), + table_id, + drop_if_exists, + } +} + +#[tokio::test] +async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { + let cluster_id = 1; + + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let ddl_context = new_ddl_context_with_kv_backend(datanode_manager, kv_backend); + + let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await; + let logical_table_id = + create_logical_table(ddl_context.clone(), cluster_id, physical_table_id, "s").await; + + let inner_test = |task: DropTableTask| async { + let context = new_test_procedure_context(); + let mut procedure = 
DropTableProcedure::new(cluster_id, task, ddl_context.clone()); + while !matches!( + procedure.execute(&context).await.unwrap(), + Status::Done { .. } + ) { + if procedure.data.state == DropTableState::InvalidateTableCache { + break; + } + } + + // Ensure that after running to the state `InvalidateTableCache`(just past `DeleteMetadata`), + // the dropping regions should be recorded: + let guards = &procedure.dropping_regions; + assert_eq!(guards.len(), 1); + let (datanode_id, region_id) = (0, RegionId::new(physical_table_id, 0)); + assert_eq!(guards[0].info(), (datanode_id, region_id)); + assert!(ddl_context + .memory_region_keeper + .contains(datanode_id, region_id)); + + execute_procedure_until_done(&mut procedure).await; + + // Ensure that when run to the end, the dropping regions should be cleared: + let guards = &procedure.dropping_regions; + assert!(guards.is_empty()); + assert!(!ddl_context + .memory_region_keeper + .contains(datanode_id, region_id)); + }; + + inner_test(new_drop_table_task("s", logical_table_id, false)).await; + inner_test(new_drop_table_task("t", physical_table_id, false)).await; +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 7346ae780d23..fc70bd7e07f5 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -258,7 +258,7 @@ pub enum Error { error: Utf8Error, }, - #[snafu(display("Table nod found, table: {}", table_name))] + #[snafu(display("Table not found: '{}'", table_name))] TableNotFound { table_name: String, location: Location, diff --git a/src/common/procedure-test/src/lib.rs b/src/common/procedure-test/src/lib.rs index fb759b16a2fc..7b8c2fb230e7 100644 --- a/src/common/procedure-test/src/lib.rs +++ b/src/common/procedure-test/src/lib.rs @@ -114,3 +114,10 @@ pub async fn execute_until_suspended_or_done( None } + +pub fn new_test_procedure_context() -> Context { + Context { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + } +}
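
The heart of this patch is the added self.dropping_regions.clear() in on_delete_metadata_tombstone: the region guards held by the procedure are released the moment it returns Status::done(), instead of living as long as the procedure object itself. A minimal, self-contained Rust sketch of that pattern follows; every name in it is invented for illustration and is not the GreptimeDB API.

    // Hypothetical miniature of the guard-clearing pattern applied by this patch.
    struct OperatingGuard(u64); // stands in for an "operating region" guard

    struct DropProcedure {
        // Guards registered while the procedure runs; holding one keeps the
        // region marked as "being dropped" in a shared keeper.
        dropping_regions: Vec<OperatingGuard>,
    }

    impl DropProcedure {
        // Final step: finish the work, then drop the guards explicitly so they
        // do not linger until the procedure value itself is dropped.
        fn on_final_step(&mut self) {
            self.dropping_regions.clear();
        }
    }

    fn main() {
        let mut procedure = DropProcedure { dropping_regions: vec![OperatingGuard(1)] };
        procedure.on_final_step();
        assert!(procedure.dropping_regions.is_empty());
    }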