From 963bd1c877f56f5620cc922cf6df54fd149bb43b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 7 Feb 2024 10:50:51 +0000 Subject: [PATCH] test(create_logical_tables): add tests for on_create_metadata --- Cargo.lock | 1 + src/common/meta/Cargo.toml | 1 + .../meta/src/ddl/create_logical_tables.rs | 4 + .../src/ddl/tests/create_logical_tables.rs | 194 +++++++++++++++++- 4 files changed, 199 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index b9e1c20227cd..7915f0f60763 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1919,6 +1919,7 @@ dependencies = [ "common-grpc-expr", "common-macro", "common-procedure", + "common-procedure-test", "common-recordbatch", "common-runtime", "common-telemetry", diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 8a6942ba12ca..61414e80497d 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -19,6 +19,7 @@ common-catalog.workspace = true common-error.workspace = true common-grpc-expr.workspace = true common-macro.workspace = true +common-procedure-test.workspace = true common-procedure.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index c74801559fc3..fa1b1f39d149 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -163,6 +163,10 @@ impl CreateLogicalTablesProcedure { self.create_regions(region_routes).await } + /// Creates table metadata + /// + /// Abort(not-retry): + /// - Failed to create table metadata. pub async fn on_create_metadata(&self) -> Result { let manager = &self.context.table_metadata_manager; let physical_table_id = self.creator.data.physical_table_id(); diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs index ce72b57ba20e..4733d5a1c4ec 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -21,8 +21,10 @@ use api::v1::region::{QueryRequest, RegionRequest}; use api::v1::{ColumnDataType, SemanticType}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_procedure::Status; +use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status}; +use common_procedure_test::MockContextProvider; use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::debug; use store_api::storage::RegionId; use table::metadata::RawTableInfo; @@ -339,3 +341,193 @@ async fn test_on_prepare_part_logical_tables_exist() { let status = procedure.on_prepare().await.unwrap(); assert_matches!(status, Status::Executing { persist: true }); } + +#[derive(Clone)] +pub struct NaiveDatanodeHandler; + +#[async_trait::async_trait] +impl MockDatanodeHandler for NaiveDatanodeHandler { + async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result { + debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}"); + Ok(0) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +#[tokio::test] +async fn test_on_create_metadata() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + // The create logical table procedure. + let physical_table_id = table_id; + // Creates the logical table metadata. + let task = test_create_logical_table_task("foo"); + let yet_another_task = test_create_logical_table_task("bar"); + let mut procedure = CreateLogicalTablesProcedure::new( + cluster_id, + vec![task, yet_another_task], + physical_table_id, + ddl_context, + ); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.execute(&ctx).await.unwrap(); + // Triggers procedure to create table metadata + let status = procedure.execute(&ctx).await.unwrap(); + let table_ids = status.downcast_output_ref::>().unwrap(); + assert_eq!(*table_ids, vec![1025, 1026]); +} + +#[tokio::test] +async fn test_on_create_metadata_part_logical_tables_exist() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + // Creates the logical table metadata. + let mut task = test_create_logical_table_task("exists"); + task.set_table_id(8192); + ddl_context + .table_metadata_manager + .create_logic_tables_metadata(vec![( + task.table_info.clone(), + TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]), + )]) + .await + .unwrap(); + // The create logical table procedure. + let physical_table_id = table_id; + // Sets `create_if_not_exists` + task.create_table.create_if_not_exists = true; + let non_exist_task = test_create_logical_table_task("non_exists"); + let mut procedure = CreateLogicalTablesProcedure::new( + cluster_id, + vec![task, non_exist_task], + physical_table_id, + ddl_context, + ); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.execute(&ctx).await.unwrap(); + // Triggers procedure to create table metadata + let status = procedure.execute(&ctx).await.unwrap(); + let table_ids = status.downcast_output_ref::>().unwrap(); + assert_eq!(*table_ids, vec![8192, 1025]); +} + +#[tokio::test] +async fn test_on_create_metadata_err() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + // The create logical table procedure. + let physical_table_id = table_id; + // Creates the logical table metadata. + let task = test_create_logical_table_task("foo"); + let yet_another_task = test_create_logical_table_task("bar"); + let mut procedure = CreateLogicalTablesProcedure::new( + cluster_id, + vec![task.clone(), yet_another_task], + physical_table_id, + ddl_context.clone(), + ); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.execute(&ctx).await.unwrap(); + // Creates logical table metadata(different with the task) + let mut task = task.clone(); + task.table_info.ident.table_id = 1025; + ddl_context + .table_metadata_manager + .create_logic_tables_metadata(vec![( + task.table_info, + TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]), + )]) + .await + .unwrap(); + // Triggers procedure to create table metadata + let error = procedure.execute(&ctx).await.unwrap_err(); + assert!(!error.is_retry_later()); +}