diff --git a/src/client/src/client_manager.rs b/src/client/src/client_manager.rs index f219dd264eb6..528b86fe1fec 100644 --- a/src/client/src/client_manager.rs +++ b/src/client/src/client_manager.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::datanode_manager::{DatanodeRef, FlownodeRef, NodeManager}; +use common_meta::node_manager::{DatanodeRef, FlownodeRef, NodeManager}; use common_meta::peer::Peer; use moka::future::{Cache, CacheBuilder}; diff --git a/src/client/src/region.rs b/src/client/src/region.rs index a401fa434803..e6c6e4af81a4 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -24,8 +24,8 @@ use async_trait::async_trait; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_grpc::flight::{FlightDecoder, FlightMessage}; -use common_meta::datanode_manager::Datanode; use common_meta::error::{self as meta_error, Result as MetaResult}; +use common_meta::node_manager::Datanode; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::error; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 7148606a0943..061c4f98e2bd 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -21,12 +21,12 @@ use clap::Parser; use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator}; -use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::ProcedureExecutorRef; use common_meta::ddl_manager::DdlManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; +use common_meta::node_manager::NodeManagerRef; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef}; @@ -408,7 +408,7 @@ impl StartCommand { DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone()); let datanode = builder.build().await.context(StartDatanodeSnafu)?; - let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); + let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); let table_id_sequence = Arc::new( SequenceBuilder::new("table_id", kv_backend.clone()) @@ -432,22 +432,18 @@ impl StartCommand { let ddl_task_executor = Self::create_ddl_task_executor( table_metadata_manager, procedure_manager.clone(), - datanode_manager.clone(), + node_manager.clone(), multi_cache_invalidator, table_meta_allocator, ) .await?; - let mut frontend = FrontendBuilder::new( - kv_backend, - catalog_manager, - datanode_manager, - ddl_task_executor, - ) - .with_plugin(fe_plugins.clone()) - .try_build() - .await - .context(StartFrontendSnafu)?; + let mut frontend = + FrontendBuilder::new(kv_backend, catalog_manager, node_manager, ddl_task_executor) + .with_plugin(fe_plugins.clone()) + .try_build() + .await + .context(StartFrontendSnafu)?; let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins) .build() @@ -468,14 +464,14 @@ impl StartCommand { pub async fn create_ddl_task_executor( table_metadata_manager: TableMetadataManagerRef, procedure_manager: ProcedureManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_meta_allocator: TableMetadataAllocatorRef, ) -> Result { let procedure_executor: ProcedureExecutorRef = Arc::new( DdlManager::try_new( procedure_manager, - datanode_manager, + node_manager, cache_invalidator, table_metadata_manager, table_meta_allocator, diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index c7abac1d6553..3feea55253ef 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -20,10 +20,10 @@ use store_api::storage::{RegionNumber, TableId}; use self::table_meta::TableMetadataAllocatorRef; use crate::cache_invalidator::CacheInvalidatorRef; -use crate::datanode_manager::NodeManagerRef; use crate::error::Result; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::TableMetadataManagerRef; +use crate::node_manager::NodeManagerRef; use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; @@ -95,7 +95,7 @@ pub struct TableMetadata { #[derive(Clone)] pub struct DdlContext { - pub datanode_manager: NodeManagerRef, + pub node_manager: NodeManagerRef, pub cache_invalidator: CacheInvalidatorRef, pub table_metadata_manager: TableMetadataManagerRef, pub memory_region_keeper: MemoryRegionKeeperRef, diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index 6819f18941ee..abec47764780 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -116,7 +116,7 @@ impl AlterLogicalTablesProcedure { let mut alter_region_tasks = Vec::with_capacity(leaders.len()); for peer in leaders { - let requester = self.context.datanode_manager.datanode(&peer).await; + let requester = self.context.node_manager.datanode(&peer).await; let request = self.make_request(&peer, &physical_table_route.region_routes)?; alter_region_tasks.push(async move { diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index f286db47cd7a..31a4fd4af1eb 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -105,7 +105,7 @@ impl AlterTableProcedure { let mut alter_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { - let requester = self.context.datanode_manager.datanode(&datanode).await; + let requester = self.context.node_manager.datanode(&datanode).await; let regions = find_leader_regions(&physical_table_route.region_routes, &datanode); for region in regions { diff --git a/src/common/meta/src/ddl/alter_table/region_request.rs b/src/common/meta/src/ddl/alter_table/region_request.rs index ce82ef831dd5..265466b69442 100644 --- a/src/common/meta/src/ddl/alter_table/region_request.rs +++ b/src/common/meta/src/ddl/alter_table/region_request.rs @@ -138,8 +138,8 @@ mod tests { #[tokio::test] async fn test_make_alter_region_request() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let table_id = 1024; let region_id = RegionId::new(table_id, 1); diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index d050e7e3e465..5095b7c32e1a 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -142,7 +142,7 @@ impl CreateLogicalTablesProcedure { let mut create_region_tasks = Vec::with_capacity(leaders.len()); for peer in leaders { - let requester = self.context.datanode_manager.datanode(&peer).await; + let requester = self.context.node_manager.datanode(&peer).await; let request = self.make_request(&peer, region_routes)?; create_region_tasks.push(async move { diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 6204f168a55d..044715b32381 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -222,7 +222,7 @@ impl CreateTableProcedure { let mut create_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { - let requester = self.context.datanode_manager.datanode(&datanode).await; + let requester = self.context.node_manager.datanode(&datanode).await; let regions = find_leader_regions(region_routes, &datanode); let mut requests = Vec::with_capacity(regions.len()); diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index ed21902e7508..7e1cb05bb98d 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -163,8 +163,8 @@ mod tests { #[tokio::test] async fn test_next_without_logical_tables() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); create_physical_table(&ddl_context, 0, "phy").await; // It always starts from Logical let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); @@ -197,8 +197,8 @@ mod tests { #[tokio::test] async fn test_next_with_logical_tables() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await; create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric_0").await; // It always starts from Logical @@ -228,8 +228,8 @@ mod tests { #[tokio::test] async fn test_reach_the_end() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let mut state = DropDatabaseCursor::new(DropTableTarget::Physical); let mut ctx = DropDatabaseContext { catalog: DEFAULT_CATALOG_NAME.to_string(), diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index e3bcf0c004d6..acc2d6333156 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -159,8 +159,8 @@ mod tests { #[tokio::test] async fn test_next_with_physical_table() { - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await; let (_, table_route) = ddl_context .table_metadata_manager @@ -209,8 +209,8 @@ mod tests { #[tokio::test] async fn test_next_logical_table() { - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await; create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric").await; let logical_table_id = physical_table_id + 1; @@ -313,8 +313,8 @@ mod tests { #[tokio::test] async fn test_next_retryable_err() { - let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await; let (_, table_route) = ddl_context .table_metadata_manager diff --git a/src/common/meta/src/ddl/drop_database/metadata.rs b/src/common/meta/src/ddl/drop_database/metadata.rs index f06c51963a78..a9a64f9eca95 100644 --- a/src/common/meta/src/ddl/drop_database/metadata.rs +++ b/src/common/meta/src/ddl/drop_database/metadata.rs @@ -108,8 +108,8 @@ mod tests { #[tokio::test] async fn test_next() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); ddl_context .table_metadata_manager .schema_manager() diff --git a/src/common/meta/src/ddl/drop_database/start.rs b/src/common/meta/src/ddl/drop_database/start.rs index 7d71d1972d6b..792eeac8dda1 100644 --- a/src/common/meta/src/ddl/drop_database/start.rs +++ b/src/common/meta/src/ddl/drop_database/start.rs @@ -85,8 +85,8 @@ mod tests { #[tokio::test] async fn test_schema_not_exists_err() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let mut step = DropDatabaseStart; let mut ctx = DropDatabaseContext { catalog: "foo".to_string(), @@ -100,8 +100,8 @@ mod tests { #[tokio::test] async fn test_schema_not_exists() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let mut state = DropDatabaseStart; let mut ctx = DropDatabaseContext { catalog: "foo".to_string(), @@ -116,8 +116,8 @@ mod tests { #[tokio::test] async fn test_next() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); ddl_context .table_metadata_manager .schema_manager() diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 6659ee238f61..0c0f2ddc9cb9 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -175,7 +175,7 @@ impl DropTableExecutor { let table_id = self.table_id; for datanode in leaders { - let requester = ctx.datanode_manager.datanode(&datanode).await; + let requester = ctx.node_manager.datanode(&datanode).await; let regions = find_leader_regions(region_routes, &datanode); let region_ids = regions .iter() @@ -271,8 +271,8 @@ mod tests { #[tokio::test] async fn test_on_prepare() { // Drops if exists - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ctx = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ctx = new_ddl_context(node_manager); let executor = DropTableExecutor::new( TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"), 1024, diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index b970d62a0ebf..41de5ef4b10b 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -83,8 +83,8 @@ fn make_alter_logical_table_rename_task( #[tokio::test] async fn test_on_prepare_check_schema() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let tasks = vec![ make_alter_logical_table_add_column_task( @@ -107,8 +107,8 @@ async fn test_on_prepare_check_schema() { #[tokio::test] async fn test_on_prepare_check_alter_kind() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let tasks = vec![make_alter_logical_table_rename_task( "schema1", @@ -125,8 +125,8 @@ async fn test_on_prepare_check_alter_kind() { #[tokio::test] async fn test_on_prepare_different_physical_table() { let cluster_id = 1; - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let phy1_id = create_physical_table(&ddl_context, cluster_id, "phy1").await; create_logical_table(ddl_context.clone(), cluster_id, phy1_id, "table1").await; @@ -146,8 +146,8 @@ async fn test_on_prepare_different_physical_table() { #[tokio::test] async fn test_on_prepare_logical_table_not_exists() { let cluster_id = 1; - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); // Creates physical table let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; @@ -168,8 +168,8 @@ async fn test_on_prepare_logical_table_not_exists() { #[tokio::test] async fn test_on_prepare() { let cluster_id = 1; - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); // Creates physical table let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; @@ -192,8 +192,8 @@ async fn test_on_prepare() { #[tokio::test] async fn test_on_update_metadata() { let cluster_id = 1; - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); // Creates physical table let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; @@ -229,8 +229,8 @@ async fn test_on_update_metadata() { #[tokio::test] async fn test_on_part_duplicate_alter_request() { let cluster_id = 1; - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); // Creates physical table let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index 2342aaf44e0d..06654cfe0f3d 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs @@ -55,8 +55,8 @@ fn test_rename_alter_table_task(table_name: &str, new_table_name: &str) -> Alter #[tokio::test] async fn test_on_prepare_table_exists_err() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let task = test_create_table_task("foo", 1024); // Puts a value to table name key. @@ -78,8 +78,8 @@ async fn test_on_prepare_table_exists_err() { #[tokio::test] async fn test_on_prepare_table_not_exists_err() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let task = test_rename_alter_table_task("non-exists", "foo"); let mut procedure = AlterTableProcedure::new(cluster_id, 1024, task, ddl_context).unwrap(); @@ -91,8 +91,8 @@ async fn test_on_prepare_table_not_exists_err() { async fn test_on_submit_alter_request() { let (tx, mut rx) = mpsc::channel(8); let datanode_handler = DatanodeWatcher(tx); - let datanode_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let table_id = 1024; let table_name = "foo"; @@ -175,10 +175,10 @@ async fn test_on_submit_alter_request() { #[tokio::test] async fn test_on_submit_alter_request_with_outdated_request() { - let datanode_manager = Arc::new(MockDatanodeManager::new( + let node_manager = Arc::new(MockDatanodeManager::new( RequestOutdatedErrorDatanodeHandler, )); - let ddl_context = new_ddl_context(datanode_manager); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let table_id = 1024; let table_name = "foo"; @@ -236,8 +236,8 @@ async fn test_on_submit_alter_request_with_outdated_request() { #[tokio::test] async fn test_on_update_metadata_rename() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let table_name = "foo"; let new_table_name = "bar"; @@ -287,8 +287,8 @@ async fn test_on_update_metadata_rename() { #[tokio::test] async fn test_on_update_metadata_add_columns() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let table_name = "foo"; let table_id = 1024; diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs index 74000cb557c3..c4f65bcac449 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -33,8 +33,8 @@ use crate::test_util::{new_ddl_context, MockDatanodeManager}; #[tokio::test] async fn test_on_prepare_physical_table_not_found() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let tasks = vec![test_create_logical_table_task("foo")]; let physical_table_id = 1024u32; @@ -46,8 +46,8 @@ async fn test_on_prepare_physical_table_not_found() { #[tokio::test] async fn test_on_prepare() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; // Prepares physical table metadata. let mut create_physical_table_task = test_create_physical_table_task("phy_table"); @@ -81,8 +81,8 @@ async fn test_on_prepare() { #[tokio::test] async fn test_on_prepare_logical_table_exists_err() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; // Prepares physical table metadata. let mut create_physical_table_task = test_create_physical_table_task("phy_table"); @@ -127,8 +127,8 @@ async fn test_on_prepare_logical_table_exists_err() { #[tokio::test] async fn test_on_prepare_with_create_if_table_exists() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; // Prepares physical table metadata. let mut create_physical_table_task = test_create_physical_table_task("phy_table"); @@ -175,8 +175,8 @@ async fn test_on_prepare_with_create_if_table_exists() { #[tokio::test] async fn test_on_prepare_part_logical_tables_exist() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; // Prepares physical table metadata. let mut create_physical_table_task = test_create_physical_table_task("phy_table"); @@ -227,8 +227,8 @@ async fn test_on_prepare_part_logical_tables_exist() { #[tokio::test] async fn test_on_create_metadata() { - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; // Prepares physical table metadata. let mut create_physical_table_task = test_create_physical_table_task("phy_table"); @@ -277,8 +277,8 @@ async fn test_on_create_metadata() { #[tokio::test] async fn test_on_create_metadata_part_logical_tables_exist() { - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; // Prepares physical table metadata. let mut create_physical_table_task = test_create_physical_table_task("phy_table"); @@ -338,8 +338,8 @@ async fn test_on_create_metadata_part_logical_tables_exist() { #[tokio::test] async fn test_on_create_metadata_err() { - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; // Prepares physical table metadata. let mut create_physical_table_task = test_create_physical_table_task("phy_table"); diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index 33e5fd55d594..f9c464be0ab0 100644 --- a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -85,8 +85,8 @@ fn test_create_table_task(name: &str) -> CreateTableTask { #[tokio::test] async fn test_on_prepare_table_exists_err() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let task = test_create_table_task("foo"); assert!(!task.create_table.create_if_not_exists); @@ -108,8 +108,8 @@ async fn test_on_prepare_table_exists_err() { #[tokio::test] async fn test_on_prepare_with_create_if_table_exists() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let mut task = test_create_table_task("foo"); task.create_table.create_if_not_exists = true; @@ -133,8 +133,8 @@ async fn test_on_prepare_with_create_if_table_exists() { #[tokio::test] async fn test_on_prepare_without_create_if_table_exists() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let mut task = test_create_table_task("foo"); task.create_table.create_if_not_exists = true; @@ -146,8 +146,8 @@ async fn test_on_prepare_without_create_if_table_exists() { #[tokio::test] async fn test_on_prepare_with_no_partition_err() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let mut task = test_create_table_task("foo"); task.partitions = vec![]; @@ -163,8 +163,8 @@ async fn test_on_prepare_with_no_partition_err() { #[tokio::test] async fn test_on_datanode_create_regions_should_retry() { common_telemetry::init_default_ut_logging(); - let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let task = test_create_table_task("foo"); assert!(!task.create_table.create_if_not_exists); @@ -181,8 +181,8 @@ async fn test_on_datanode_create_regions_should_retry() { #[tokio::test] async fn test_on_datanode_create_regions_should_not_retry() { common_telemetry::init_default_ut_logging(); - let datanode_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let task = test_create_table_task("foo"); assert!(!task.create_table.create_if_not_exists); @@ -199,8 +199,8 @@ async fn test_on_datanode_create_regions_should_not_retry() { #[tokio::test] async fn test_on_create_metadata_error() { common_telemetry::init_default_ut_logging(); - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let task = test_create_table_task("foo"); assert!(!task.create_table.create_if_not_exists); @@ -231,8 +231,8 @@ async fn test_on_create_metadata_error() { #[tokio::test] async fn test_on_create_metadata() { common_telemetry::init_default_ut_logging(); - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let task = test_create_table_task("foo"); assert!(!task.create_table.create_if_not_exists); @@ -253,9 +253,9 @@ async fn test_on_create_metadata() { async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { let cluster_id = 1; - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); let kv_backend = Arc::new(MemoryKvBackend::new()); - let ddl_context = new_ddl_context_with_kv_backend(datanode_manager, kv_backend); + let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend); let task = test_create_table_task("foo"); let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context.clone()); diff --git a/src/common/meta/src/ddl/tests/drop_database.rs b/src/common/meta/src/ddl/tests/drop_database.rs index d4469195c8b6..656e6eb914e8 100644 --- a/src/common/meta/src/ddl/tests/drop_database.rs +++ b/src/common/meta/src/ddl/tests/drop_database.rs @@ -29,8 +29,8 @@ use crate::test_util::{new_ddl_context, MockDatanodeManager}; async fn test_drop_database_with_logical_tables() { common_telemetry::init_default_ut_logging(); let cluster_id = 1; - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); ddl_context .table_metadata_manager .schema_manager() @@ -78,8 +78,8 @@ async fn test_drop_database_with_logical_tables() { async fn test_drop_database_retryable_error() { common_telemetry::init_default_ut_logging(); let cluster_id = 1; - let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); ddl_context .table_metadata_manager .schema_manager() diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index 26ad4580339d..20034fa06f97 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -45,8 +45,8 @@ use crate::test_util::{new_ddl_context, new_ddl_context_with_kv_backend, MockDat #[tokio::test] async fn test_on_prepare_table_not_exists_err() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let table_name = "foo"; let table_id = 1024; @@ -70,8 +70,8 @@ async fn test_on_prepare_table_not_exists_err() { #[tokio::test] async fn test_on_prepare_table() { - let datanode_manager = Arc::new(MockDatanodeManager::new(())); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let table_name = "foo"; let table_id = 1024; @@ -102,8 +102,8 @@ async fn test_on_prepare_table() { async fn test_on_datanode_drop_regions() { let (tx, mut rx) = mpsc::channel(8); let datanode_handler = DatanodeWatcher(tx); - let datanode_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); - let ddl_context = new_ddl_context(datanode_manager); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + let ddl_context = new_ddl_context(node_manager); let cluster_id = 1; let table_id = 1024; let table_name = "foo"; @@ -175,9 +175,9 @@ async fn test_on_datanode_drop_regions() { #[tokio::test] async fn test_on_rollback() { - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); let kv_backend = Arc::new(MemoryKvBackend::new()); - let ddl_context = new_ddl_context_with_kv_backend(datanode_manager, kv_backend.clone()); + let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend.clone()); let cluster_id = 1; // Prepares physical table metadata. let mut create_physical_table_task = test_create_physical_table_task("phy_table"); @@ -258,9 +258,9 @@ fn new_drop_table_task(table_name: &str, table_id: TableId, drop_if_exists: bool async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { let cluster_id = 1; - let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); let kv_backend = Arc::new(MemoryKvBackend::new()); - let ddl_context = new_ddl_context_with_kv_backend(datanode_manager, kv_backend); + let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend); let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await; let logical_table_id = diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index 7890f70fbf99..de0316de5179 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -143,7 +143,7 @@ impl TruncateTableProcedure { let mut truncate_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { - let requester = self.context.datanode_manager.datanode(&datanode).await; + let requester = self.context.node_manager.datanode(&datanode).await; let regions = find_leader_regions(region_routes, &datanode); for region in regions { diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 7201f9602d3a..8db6198bd609 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -23,7 +23,6 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::TableId; use crate::cache_invalidator::CacheInvalidatorRef; -use crate::datanode_manager::NodeManagerRef; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_database::CreateDatabaseProcedure; @@ -43,6 +42,7 @@ use crate::error::{ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use crate::node_manager::NodeManagerRef; use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::DdlTask::{ AlterLogicalTables, AlterTable, CreateDatabase, CreateLogicalTables, CreateTable, DropDatabase, @@ -64,7 +64,7 @@ pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoade /// The [DdlManager] provides the ability to execute Ddl. pub struct DdlManager { procedure_manager: ProcedureManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, @@ -84,7 +84,7 @@ impl DdlManager { ) -> Result { let manager = Self { procedure_manager, - datanode_manager: datanode_clients, + node_manager: datanode_clients, cache_invalidator, table_metadata_manager, table_metadata_allocator, @@ -104,7 +104,7 @@ impl DdlManager { /// Returns the [DdlContext] pub fn create_context(&self) -> DdlContext { DdlContext { - datanode_manager: self.datanode_manager.clone(), + node_manager: self.node_manager.clone(), cache_invalidator: self.cache_invalidator.clone(), table_metadata_manager: self.table_metadata_manager.clone(), memory_region_keeper: self.memory_region_keeper.clone(), @@ -716,7 +716,6 @@ mod tests { use super::DdlManager; use crate::cache_invalidator::DummyCacheInvalidator; - use crate::datanode_manager::{DatanodeRef, FlownodeRef, NodeManager}; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; @@ -724,6 +723,7 @@ mod tests { use crate::ddl::truncate_table::TruncateTableProcedure; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; + use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager}; use crate::peer::Peer; use crate::region_keeper::MemoryRegionKeeper; use crate::sequence::SequenceBuilder; diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 8aa8c8abecc4..385f46818a47 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -20,7 +20,6 @@ pub mod cache_invalidator; pub mod cluster; -pub mod datanode_manager; pub mod ddl; pub mod ddl_manager; pub mod distributed_time_constants; @@ -31,6 +30,7 @@ pub mod key; pub mod kv_backend; pub mod lock_key; pub mod metrics; +pub mod node_manager; pub mod peer; pub mod range_stream; pub mod region_keeper; diff --git a/src/common/meta/src/datanode_manager.rs b/src/common/meta/src/node_manager.rs similarity index 100% rename from src/common/meta/src/datanode_manager.rs rename to src/common/meta/src/node_manager.rs diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index c4c64445d796..3d282e8caff3 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -20,13 +20,13 @@ pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; -use crate::datanode_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager, NodeManagerRef}; use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::DdlContext; use crate::error::Result; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::KvBackendRef; +use crate::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager, NodeManagerRef}; use crate::peer::Peer; use crate::region_keeper::MemoryRegionKeeper; use crate::sequence::SequenceBuilder; @@ -88,20 +88,20 @@ impl NodeManager for MockDatanodeManager { } /// Returns a test purpose [DdlContext]. -pub fn new_ddl_context(datanode_manager: NodeManagerRef) -> DdlContext { +pub fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext { let kv_backend = Arc::new(MemoryKvBackend::new()); - new_ddl_context_with_kv_backend(datanode_manager, kv_backend) + new_ddl_context_with_kv_backend(node_manager, kv_backend) } /// Returns a test purpose [DdlContext] with a specified [KvBackendRef]. pub fn new_ddl_context_with_kv_backend( - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, kv_backend: KvBackendRef, ) -> DdlContext { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); DdlContext { - datanode_manager, + node_manager, cache_invalidator: Arc::new(DummyCacheInvalidator), memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), table_metadata_allocator: Arc::new(TableMetadataAllocator::new( diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 2f39a0dd6795..719c5f33cc64 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -17,10 +17,10 @@ use std::sync::Arc; use catalog::CatalogManagerRef; use common_base::Plugins; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; -use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; +use common_meta::node_manager::NodeManagerRef; use operator::delete::Deleter; use operator::insert::Inserter; use operator::procedure::ProcedureServiceOperator; @@ -42,7 +42,7 @@ pub struct FrontendBuilder { kv_backend: KvBackendRef, cache_invalidator: Option, catalog_manager: CatalogManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, plugins: Option, procedure_executor: ProcedureExecutorRef, heartbeat_task: Option, @@ -52,14 +52,14 @@ impl FrontendBuilder { pub fn new( kv_backend: KvBackendRef, catalog_manager: CatalogManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, procedure_executor: ProcedureExecutorRef, ) -> Self { Self { kv_backend, cache_invalidator: None, catalog_manager, - datanode_manager, + node_manager, plugins: None, procedure_executor, heartbeat_task: None, @@ -89,7 +89,7 @@ impl FrontendBuilder { pub async fn try_build(self) -> Result { let kv_backend = self.kv_backend; - let datanode_manager = self.datanode_manager; + let node_manager = self.node_manager; let plugins = self.plugins.unwrap_or_default(); let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); @@ -99,22 +99,22 @@ impl FrontendBuilder { .unwrap_or_else(|| Arc::new(DummyCacheInvalidator)); let region_query_handler = - FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone()); + FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone()); let inserter = Arc::new(Inserter::new( self.catalog_manager.clone(), partition_manager.clone(), - datanode_manager.clone(), + node_manager.clone(), )); let deleter = Arc::new(Deleter::new( self.catalog_manager.clone(), partition_manager.clone(), - datanode_manager.clone(), + node_manager.clone(), )); let requester = Arc::new(Requester::new( self.catalog_manager.clone(), partition_manager, - datanode_manager.clone(), + node_manager.clone(), )); let table_mutation_handler = Arc::new(TableMutationOperator::new( inserter.clone(), diff --git a/src/frontend/src/instance/region_query.rs b/src/frontend/src/instance/region_query.rs index a6c21e35030a..3cbd07e75905 100644 --- a/src/frontend/src/instance/region_query.rs +++ b/src/frontend/src/instance/region_query.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use api::v1::region::QueryRequest; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_meta::datanode_manager::NodeManagerRef; +use common_meta::node_manager::NodeManagerRef; use common_recordbatch::SendableRecordBatchStream; use partition::manager::PartitionRuleManagerRef; use query::error::{RegionQuerySnafu, Result as QueryResult}; @@ -29,17 +29,17 @@ use crate::error::{FindTableRouteSnafu, RequestQuerySnafu, Result}; pub(crate) struct FrontendRegionQueryHandler { partition_manager: PartitionRuleManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, } impl FrontendRegionQueryHandler { pub fn arc( partition_manager: PartitionRuleManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, ) -> Arc { Arc::new(Self { partition_manager, - datanode_manager, + node_manager, }) } } @@ -66,7 +66,7 @@ impl FrontendRegionQueryHandler { table_id: region_id.table_id(), })?; - let client = self.datanode_manager.datanode(peer).await; + let client = self.node_manager.datanode(peer).await; client .handle_query(request) diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index fcb1bd61e42f..911c7fd30b11 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -19,8 +19,8 @@ use api::v1::region::{QueryRequest, RegionRequest, RegionResponse as RegionRespo use async_trait::async_trait; use client::region::check_response_header; use common_error::ext::BoxedError; -use common_meta::datanode_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager}; use common_meta::error::{self as meta_error, Result as MetaResult}; +use common_meta::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager}; use common_meta::peer::Peer; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ea8613db4f0d..223ccf11d147 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -20,13 +20,13 @@ use client::client_manager::DatanodeClients; use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; use common_grpc::channel_manager::ChannelConfig; -use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::distributed_time_constants; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; +use common_meta::node_manager::NodeManagerRef; use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; @@ -79,7 +79,7 @@ pub struct MetasrvBuilder { election: Option, meta_peer_client: Option, lock: Option, - datanode_manager: Option, + node_manager: Option, plugins: Option, table_metadata_allocator: Option, } @@ -95,7 +95,7 @@ impl MetasrvBuilder { election: None, options: None, lock: None, - datanode_manager: None, + node_manager: None, plugins: None, table_metadata_allocator: None, } @@ -141,8 +141,8 @@ impl MetasrvBuilder { self } - pub fn datanode_manager(mut self, datanode_manager: NodeManagerRef) -> Self { - self.datanode_manager = Some(datanode_manager); + pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self { + self.node_manager = Some(node_manager); self } @@ -171,7 +171,7 @@ impl MetasrvBuilder { selector, handler_group, lock, - datanode_manager, + node_manager, plugins, table_metadata_allocator, } = self; @@ -236,7 +236,7 @@ impl MetasrvBuilder { let ddl_manager = build_ddl_manager( &options, - datanode_manager, + node_manager, &procedure_manager, &mailbox, &table_metadata_manager, diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index 53e22ce6d4af..e9a3b58c8e63 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -70,7 +70,7 @@ pub async fn mock( }; let builder = match datanode_clients { - Some(clients) => builder.datanode_manager(clients), + Some(clients) => builder.node_manager(clients), None => builder, }; diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index ce2e5cda4d9d..028df5411091 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -21,7 +21,6 @@ use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef}; use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType}; use client::client_manager::DatanodeClients; use common_catalog::consts::MITO2_ENGINE; -use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState}; use common_meta::ddl::create_table::*; use common_meta::ddl::test_util::columns::TestColumnDefBuilder; @@ -29,6 +28,7 @@ use common_meta::ddl::test_util::create_table::{ build_raw_table_info_from_expr, TestCreateTableExprBuilder, }; use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue}; +use common_meta::node_manager::NodeManagerRef; use common_meta::rpc::ddl::CreateTableTask; use common_meta::rpc::router::{find_leaders, RegionRoute}; use common_procedure::Status; @@ -170,7 +170,7 @@ fn test_region_request_builder() { assert_eq!(template.template(), &expected); } -async fn new_datanode_manager( +async fn new_node_manager( region_server: &EchoRegionServer, region_routes: &[RegionRoute], ) -> NodeManagerRef { @@ -189,12 +189,12 @@ async fn new_datanode_manager( async fn test_on_datanode_create_regions() { let (region_server, mut rx) = EchoRegionServer::new(); let region_routes = test_data::new_region_routes(); - let datanode_manager = new_datanode_manager(®ion_server, ®ion_routes).await; + let node_manager = new_node_manager(®ion_server, ®ion_routes).await; let mut procedure = CreateTableProcedure::new( 1, create_table_task(None), - test_data::new_ddl_context(datanode_manager), + test_data::new_ddl_context(node_manager), ); procedure.set_allocated_metadata( @@ -241,7 +241,7 @@ async fn test_on_datanode_create_regions() { async fn test_on_datanode_create_logical_regions() { let (region_server, mut rx) = EchoRegionServer::new(); let region_routes = test_data::new_region_routes(); - let datanode_manager = new_datanode_manager(®ion_server, ®ion_routes).await; + let node_manager = new_node_manager(®ion_server, ®ion_routes).await; let physical_table_route = TableRouteValue::physical(region_routes); let physical_table_id = 1; @@ -249,7 +249,7 @@ async fn test_on_datanode_create_logical_regions() { let task2 = create_table_task(Some("my_table2")); let task3 = create_table_task(Some("my_table3")); - let ctx = test_data::new_ddl_context(datanode_manager); + let ctx = test_data::new_ddl_context(node_manager); let kv_backend = ctx.table_metadata_manager.kv_backend(); let physical_route_txn = ctx .table_metadata_manager diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index a79138e7a7c8..f614f33b00d8 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -105,11 +105,11 @@ pub mod test_data { use chrono::DateTime; use common_catalog::consts::MITO2_ENGINE; - use common_meta::datanode_manager::NodeManagerRef; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::DdlContext; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::node_manager::NodeManagerRef; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::RegionRoute; @@ -188,7 +188,7 @@ pub mod test_data { } } - pub(crate) fn new_ddl_context(datanode_manager: NodeManagerRef) -> DdlContext { + pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext { let kv_backend = Arc::new(MemoryKvBackend::new()); let mailbox_sequence = @@ -197,7 +197,7 @@ pub mod test_data { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); DdlContext { - datanode_manager, + node_manager, cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( mailbox, MetasrvInfo { diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index 309edf4146b0..ecd3ec23c573 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -19,7 +19,7 @@ use std::{iter, mem}; use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader}; use api::v1::{DeleteRequests, RowDeleteRequests}; use catalog::CatalogManagerRef; -use common_meta::datanode_manager::{AffectedRows, NodeManagerRef}; +use common_meta::node_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_query::Output; use common_telemetry::tracing_context::TracingContext; @@ -40,7 +40,7 @@ use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion}; pub struct Deleter { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, } pub type DeleterRef = Arc; @@ -49,12 +49,12 @@ impl Deleter { pub fn new( catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, ) -> Self { Self { catalog_manager, partition_manager, - datanode_manager, + node_manager, } } @@ -133,9 +133,9 @@ impl Deleter { .into_iter() .map(|(peer, deletes)| { let request = request_factory.build_delete(deletes); - let datanode_manager = self.datanode_manager.clone(); + let node_manager = self.node_manager.clone(); common_runtime::spawn_write(async move { - datanode_manager + node_manager .datanode(&peer) .await .handle(request) diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 95ff84f13a1b..5b2ac304d96e 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -25,7 +25,7 @@ use catalog::CatalogManagerRef; use client::{OutputData, OutputMeta}; use common_catalog::consts::default_engine; use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; -use common_meta::datanode_manager::{AffectedRows, NodeManagerRef}; +use common_meta::node_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::Output; @@ -57,7 +57,7 @@ use crate::statement::StatementExecutor; pub struct Inserter { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, } pub type InserterRef = Arc; @@ -66,12 +66,12 @@ impl Inserter { pub fn new( catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, ) -> Self { Self { catalog_manager, partition_manager, - datanode_manager, + node_manager, } } @@ -205,9 +205,9 @@ impl Inserter { .into_iter() .map(|(peer, inserts)| { let request = request_factory.build_insert(inserts); - let datanode_manager = self.datanode_manager.clone(); + let node_manager = self.node_manager.clone(); common_runtime::spawn_write(async move { - datanode_manager + node_manager .datanode(&peer) .await .handle(request) diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs index 768a2850aac3..b25228b5d068 100644 --- a/src/operator/src/request.rs +++ b/src/operator/src/request.rs @@ -18,7 +18,7 @@ use api::v1::region::region_request::Body as RegionRequestBody; use api::v1::region::{CompactRequest, FlushRequest, RegionRequestHeader}; use catalog::CatalogManagerRef; use common_catalog::build_db_string; -use common_meta::datanode_manager::{AffectedRows, NodeManagerRef}; +use common_meta::node_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_telemetry::logging::{error, info}; use common_telemetry::tracing_context::TracingContext; @@ -39,7 +39,7 @@ use crate::region_req_factory::RegionRequestFactory; pub struct Requester { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, } pub type RequesterRef = Arc; @@ -48,12 +48,12 @@ impl Requester { pub fn new( catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, - datanode_manager: NodeManagerRef, + node_manager: NodeManagerRef, ) -> Self { Self { catalog_manager, partition_manager, - datanode_manager, + node_manager, } } @@ -168,11 +168,11 @@ impl Requester { let tasks = requests.into_iter().map(|req_body| { let request = request_factory.build_request(req_body.clone()); let partition_manager = self.partition_manager.clone(); - let datanode_manager = self.datanode_manager.clone(); + let node_manager = self.node_manager.clone(); common_runtime::spawn_write(async move { let peer = Self::find_region_leader_by_request(partition_manager, &req_body).await?; - datanode_manager + node_manager .datanode(&peer) .await .handle(request) diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 72b36dad5da1..79fbef604e24 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -130,7 +130,7 @@ impl GreptimeDbStandaloneBuilder { let catalog_manager = KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await; - let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); + let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); let table_id_sequence = Arc::new( SequenceBuilder::new("table_id", kv_backend.clone()) @@ -150,7 +150,7 @@ impl GreptimeDbStandaloneBuilder { let ddl_task_executor = Arc::new( DdlManager::try_new( procedure_manager.clone(), - datanode_manager.clone(), + node_manager.clone(), multi_cache_invalidator, table_metadata_manager, table_meta_allocator, @@ -163,7 +163,7 @@ impl GreptimeDbStandaloneBuilder { let instance = FrontendBuilder::new( kv_backend.clone(), catalog_manager, - datanode_manager, + node_manager, ddl_task_executor, ) .with_plugin(plugins)