diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 27cc19fd6655..0ce6e72bb6f2 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -22,7 +22,8 @@ use common_config::wal::StandaloneWalConfig; use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode_manager::DatanodeManagerRef; -use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef}; +use common_meta::ddl::table_meta::TableMetadataAllocator; +use common_meta::ddl::DdlTaskExecutorRef; use common_meta::ddl_manager::DdlManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; @@ -38,7 +39,6 @@ use datanode::datanode::{Datanode, DatanodeBuilder}; use file_engine::config::EngineConfig as FileEngineConfig; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; -use frontend::instance::standalone::StandaloneTableMetadataAllocator; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; use frontend::service_config::{ GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, @@ -406,10 +406,8 @@ impl StartCommand { opts.wal_meta.clone(), kv_backend.clone(), )); - let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new( - table_id_sequence, - wal_options_allocator.clone(), - )); + let table_meta_allocator = + TableMetadataAllocator::new(table_id_sequence, wal_options_allocator.clone()); let ddl_task_executor = Self::create_ddl_task_executor( kv_backend.clone(), @@ -446,7 +444,7 @@ impl StartCommand { kv_backend: KvBackendRef, procedure_manager: ProcedureManagerRef, datanode_manager: DatanodeManagerRef, - table_meta_allocator: TableMetadataAllocatorRef, + table_meta_allocator: TableMetadataAllocator, ) -> Result { let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index bb5220724ab6..05e076f8ae47 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -24,11 +24,12 @@ use crate::error::Result; use crate::key::table_route::TableRouteValue; use crate::key::TableMetadataManagerRef; use crate::region_keeper::MemoryRegionKeeperRef; -use crate::rpc::ddl::{CreateTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; pub mod alter_table; pub mod create_table; pub mod drop_table; +pub mod table_meta; pub mod truncate_table; pub mod utils; @@ -64,17 +65,6 @@ pub struct TableMetadata { pub region_wal_options: HashMap, } -#[async_trait::async_trait] -pub trait TableMetadataAllocator: Send + Sync { - async fn create( - &self, - ctx: &TableMetadataAllocatorContext, - task: &CreateTableTask, - ) -> Result; -} - -pub type TableMetadataAllocatorRef = Arc; - #[derive(Clone)] pub struct DdlContext { pub datanode_manager: DatanodeManagerRef, diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs new file mode 100644 index 000000000000..8bd460d98a74 --- /dev/null +++ b/src/common/meta/src/ddl/table_meta.rs @@ -0,0 +1,190 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use common_catalog::consts::METRIC_ENGINE; +use common_telemetry::{debug, info}; +use snafu::ensure; +use store_api::storage::{RegionId, RegionNumber, TableId}; + +use crate::ddl::{TableMetadata, TableMetadataAllocatorContext}; +use crate::error::{Result, UnsupportedSnafu}; +use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue}; +use crate::peer::Peer; +use crate::rpc::ddl::CreateTableTask; +use crate::rpc::router::{Region, RegionRoute}; +use crate::sequence::SequenceRef; +use crate::wal::{allocate_region_wal_options, WalOptionsAllocatorRef}; + +pub struct TableMetadataAllocator { + table_id_sequence: SequenceRef, + wal_options_allocator: WalOptionsAllocatorRef, + peer_allocator: PeerAllocatorRef, +} + +impl TableMetadataAllocator { + pub fn new( + table_id_sequence: SequenceRef, + wal_options_allocator: WalOptionsAllocatorRef, + ) -> Self { + Self::with_peer_allocator( + table_id_sequence, + wal_options_allocator, + Arc::new(NoopPeerAllocator), + ) + } + + pub fn with_peer_allocator( + table_id_sequence: SequenceRef, + wal_options_allocator: WalOptionsAllocatorRef, + peer_allocator: PeerAllocatorRef, + ) -> Self { + Self { + table_id_sequence, + wal_options_allocator, + peer_allocator, + } + } + + async fn allocate_table_id(&self, task: &CreateTableTask) -> Result { + let table_id = if let Some(table_id) = &task.create_table.table_id { + let table_id = table_id.id; + + ensure!( + !self + .table_id_sequence + .min_max() + .await + .contains(&(table_id as u64)), + UnsupportedSnafu { + operation: format!( + "create table by id {} that is reserved in this node", + table_id + ) + } + ); + + info!( + "Received explicitly allocated table id {}, will use it directly.", + table_id + ); + + table_id + } else { + self.table_id_sequence.next().await? as TableId + }; + Ok(table_id) + } + + fn create_wal_options( + &self, + table_route: &TableRouteValue, + ) -> Result> { + match table_route { + TableRouteValue::Physical(x) => { + let region_numbers = x + .region_routes + .iter() + .map(|route| route.region.id.region_number()) + .collect(); + allocate_region_wal_options(region_numbers, &self.wal_options_allocator) + } + TableRouteValue::Logical(_) => Ok(HashMap::new()), + } + } + + async fn create_table_route( + &self, + ctx: &TableMetadataAllocatorContext, + table_id: TableId, + task: &CreateTableTask, + ) -> Result { + let regions = task.partitions.len(); + + let table_route = if task.create_table.engine == METRIC_ENGINE { + TableRouteValue::Logical(LogicalTableRouteValue {}) + } else { + let peers = self.peer_allocator.alloc(ctx, regions).await?; + + let region_routes = task + .partitions + .iter() + .enumerate() + .map(|(i, partition)| { + let region = Region { + id: RegionId::new(table_id, i as u32), + partition: Some(partition.clone().into()), + ..Default::default() + }; + + let peer = peers[i % peers.len()].clone(); + + RegionRoute { + region, + leader_peer: Some(peer), + ..Default::default() + } + }) + .collect::>(); + TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes)) + }; + Ok(table_route) + } + pub async fn create( + &self, + ctx: &TableMetadataAllocatorContext, + task: &CreateTableTask, + ) -> Result { + let table_id = self.allocate_table_id(task).await?; + let table_route = self.create_table_route(ctx, table_id, task).await?; + let region_wal_options = self.create_wal_options(&table_route)?; + + debug!( + "Allocated region wal options {:?} for table {}", + region_wal_options, table_id + ); + + Ok(TableMetadata { + table_id, + table_route, + region_wal_options, + }) + } +} + +pub type PeerAllocatorRef = Arc; + +/// [PeerAllocator] allocates [Peer]s for creating regions. +#[async_trait] +pub trait PeerAllocator: Send + Sync { + /// Allocates `regions` size [Peer]s. + async fn alloc(&self, ctx: &TableMetadataAllocatorContext, regions: usize) + -> Result>; +} + +struct NoopPeerAllocator; + +#[async_trait] +impl PeerAllocator for NoopPeerAllocator { + async fn alloc( + &self, + _ctx: &TableMetadataAllocatorContext, + regions: usize, + ) -> Result> { + Ok(vec![Peer::default(); regions]) + } +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 6b1e4bf94f38..af669797f4d4 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -26,10 +26,10 @@ use crate::datanode_manager::DatanodeManagerRef; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; +use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{ DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, TableMetadataAllocatorContext, - TableMetadataAllocatorRef, }; use crate::error::{ self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu, @@ -54,7 +54,7 @@ pub struct DdlManager { datanode_manager: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocatorRef, + table_metadata_allocator: TableMetadataAllocator, memory_region_keeper: MemoryRegionKeeperRef, } @@ -65,7 +65,7 @@ impl DdlManager { datanode_clients: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocatorRef, + table_metadata_allocator: TableMetadataAllocator, memory_region_keeper: MemoryRegionKeeperRef, ) -> Result { let manager = Self { @@ -461,15 +461,15 @@ mod tests { use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; + use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; - use crate::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext}; - use crate::error::Result; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; use crate::region_keeper::MemoryRegionKeeper; - use crate::rpc::ddl::CreateTableTask; + use crate::sequence::SequenceBuilder; use crate::state_store::KvStateStore; + use crate::wal::WalOptionsAllocator; /// A dummy implemented [DatanodeManager]. pub struct DummyDatanodeManager; @@ -481,26 +481,12 @@ mod tests { } } - /// A dummy implemented [TableMetadataAllocator]. - pub struct DummyTableMetadataAllocator; - - #[async_trait::async_trait] - impl TableMetadataAllocator for DummyTableMetadataAllocator { - async fn create( - &self, - _ctx: &TableMetadataAllocatorContext, - _task: &CreateTableTask, - ) -> Result { - unimplemented!() - } - } - #[test] fn test_try_new() { let kv_backend = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - let state_store = Arc::new(KvStateStore::new(kv_backend)); + let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store)); let _ = DdlManager::try_new( @@ -508,7 +494,10 @@ mod tests { Arc::new(DummyDatanodeManager), Arc::new(DummyCacheInvalidator), table_metadata_manager, - Arc::new(DummyTableMetadataAllocator), + TableMetadataAllocator::new( + Arc::new(SequenceBuilder::new("test", kv_backend).build()), + Arc::new(WalOptionsAllocator::default()), + ), Arc::new(MemoryRegionKeeper::default()), ); diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 21496e28edc5..b7a067d49c16 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -12,33 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; use api::v1::region::{QueryRequest, RegionRequest, RegionResponse}; use async_trait::async_trait; use client::region::check_response_header; -use common_catalog::consts::METRIC_ENGINE; use common_error::ext::BoxedError; use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef}; -use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext}; -use common_meta::error::{self as meta_error, Result as MetaResult, UnsupportedSnafu}; -use common_meta::key::table_route::{ - LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue, -}; +use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::peer::Peer; -use common_meta::rpc::ddl::CreateTableTask; -use common_meta::rpc::router::{Region, RegionRoute}; -use common_meta::sequence::SequenceRef; -use common_meta::wal::options_allocator::allocate_region_wal_options; -use common_meta::wal::WalOptionsAllocatorRef; use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::tracing; use common_telemetry::tracing_context::{FutureExt, TracingContext}; -use common_telemetry::{debug, info, tracing}; use datanode::region_server::RegionServer; use servers::grpc::region_server::RegionServerHandler; -use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::{RegionId, RegionNumber, TableId}; +use snafu::{OptionExt, ResultExt}; use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result}; @@ -109,121 +97,3 @@ impl Datanode for RegionInvoker { .context(meta_error::ExternalSnafu) } } - -pub struct StandaloneTableMetadataAllocator { - table_id_sequence: SequenceRef, - wal_options_allocator: WalOptionsAllocatorRef, -} - -impl StandaloneTableMetadataAllocator { - pub fn new( - table_id_sequence: SequenceRef, - wal_options_allocator: WalOptionsAllocatorRef, - ) -> Self { - Self { - table_id_sequence, - wal_options_allocator, - } - } - - async fn allocate_table_id(&self, task: &CreateTableTask) -> MetaResult { - let table_id = if let Some(table_id) = &task.create_table.table_id { - let table_id = table_id.id; - - ensure!( - !self - .table_id_sequence - .min_max() - .await - .contains(&(table_id as u64)), - UnsupportedSnafu { - operation: format!( - "create table by id {} that is reserved in this node", - table_id - ) - } - ); - - info!( - "Received explicitly allocated table id {}, will use it directly.", - table_id - ); - - table_id - } else { - self.table_id_sequence.next().await? as TableId - }; - Ok(table_id) - } - - fn create_wal_options( - &self, - table_route: &TableRouteValue, - ) -> MetaResult> { - match table_route { - TableRouteValue::Physical(x) => { - let region_numbers = x - .region_routes - .iter() - .map(|route| route.region.id.region_number()) - .collect(); - allocate_region_wal_options(region_numbers, &self.wal_options_allocator) - } - TableRouteValue::Logical(_) => Ok(HashMap::new()), - } - } -} - -fn create_table_route(table_id: TableId, task: &CreateTableTask) -> TableRouteValue { - if task.create_table.engine == METRIC_ENGINE { - TableRouteValue::Logical(LogicalTableRouteValue {}) - } else { - let region_routes = task - .partitions - .iter() - .enumerate() - .map(|(i, partition)| { - let region = Region { - id: RegionId::new(table_id, i as u32), - partition: Some(partition.clone().into()), - ..Default::default() - }; - // It's only a placeholder. - let peer = Peer::default(); - RegionRoute { - region, - leader_peer: Some(peer), - follower_peers: vec![], - leader_status: None, - } - }) - .collect::>(); - TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes)) - } -} - -#[async_trait] -impl TableMetadataAllocator for StandaloneTableMetadataAllocator { - async fn create( - &self, - _ctx: &TableMetadataAllocatorContext, - task: &CreateTableTask, - ) -> MetaResult { - let table_id = self.allocate_table_id(task).await?; - - let table_route = create_table_route(table_id, task); - - let region_wal_options = self.create_wal_options(&table_route)?; - - debug!( - "Allocated region wal options {:?} for table {}", - region_wal_options, table_id - ); - - Ok(TableMetadata { - table_id, - table_route, - region_wal_options, - }) - } -} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 4d6782a2b630..c93a97cad20e 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -18,13 +18,13 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::Peer; use common_base::Plugins; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; use common_meta::ddl::DdlTaskExecutorRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; +use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::wal::options_allocator::WalOptionsAllocatorRef; use common_meta::wal::WalConfig; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 105e9dab0017..22c8040874f5 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -21,7 +21,7 @@ use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; use common_grpc::channel_manager::ChannelConfig; use common_meta::datanode_manager::DatanodeManagerRef; -use common_meta::ddl::TableMetadataAllocatorRef; +use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::distributed_time_constants; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; @@ -62,7 +62,7 @@ use crate::selector::lease_based::LeaseBasedSelector; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvBackend}; use crate::state::State; -use crate::table_meta_alloc::MetaSrvTableMetadataAllocator; +use crate::table_meta_alloc::MetasrvPeerAllocator; // TODO(fys): try use derive_builder macro pub struct MetaSrvBuilder { @@ -76,7 +76,7 @@ pub struct MetaSrvBuilder { lock: Option, datanode_manager: Option, plugins: Option, - table_metadata_allocator: Option, + table_metadata_allocator: Option, } impl MetaSrvBuilder { @@ -148,7 +148,7 @@ impl MetaSrvBuilder { pub fn table_metadata_allocator( mut self, - table_metadata_allocator: TableMetadataAllocatorRef, + table_metadata_allocator: TableMetadataAllocator, ) -> Self { self.table_metadata_allocator = Some(table_metadata_allocator); self @@ -216,12 +216,15 @@ impl MetaSrvBuilder { .step(10) .build(), ); - Arc::new(MetaSrvTableMetadataAllocator::new( + let peer_allocator = Arc::new(MetasrvPeerAllocator::new( selector_ctx.clone(), selector.clone(), - sequence.clone(), + )); + TableMetadataAllocator::with_peer_allocator( + sequence, wal_options_allocator.clone(), - )) + peer_allocator, + ) }); let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); @@ -368,7 +371,7 @@ fn build_ddl_manager( procedure_manager: &ProcedureManagerRef, mailbox: &MailboxRef, table_metadata_manager: &TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocatorRef, + table_metadata_allocator: TableMetadataAllocator, memory_region_keeper: &MemoryRegionKeeperRef, ) -> Result { let datanode_clients = datanode_clients.unwrap_or_else(|| { diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 7566ce7f3fec..50ab0e742307 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -383,12 +383,13 @@ mod tests { use std::sync::Mutex; use api::v1::meta::mailbox_message::Payload; - use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader}; + use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::ddl::utils::region_storage_path; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::peer::Peer; use common_meta::sequence::SequenceBuilder; use common_meta::DatanodeId; use common_procedure::{BoxedProcedure, ProcedureId}; diff --git a/src/meta-srv/src/procedure/region_failover/failover_start.rs b/src/meta-srv/src/procedure/region_failover/failover_start.rs index d3c2dea8f261..003cd7e4b9c8 100644 --- a/src/meta-srv/src/procedure/region_failover/failover_start.rs +++ b/src/meta-srv/src/procedure/region_failover/failover_start.rs @@ -59,7 +59,7 @@ impl RegionFailoverStart { .iter() .filter_map(|p| { if p.id != failed_region.datanode_id { - Some(p.clone().into()) + Some(p.clone()) } else { None } diff --git a/src/meta-srv/src/selector/common.rs b/src/meta-srv/src/selector/common.rs index 8d807b7eb26c..cccdcd391282 100644 --- a/src/meta-srv/src/selector/common.rs +++ b/src/meta-srv/src/selector/common.rs @@ -14,7 +14,7 @@ use std::collections::HashSet; -use api::v1::meta::Peer; +use common_meta::peer::Peer; use snafu::ensure; use super::weighted_choose::{WeightedChoose, WeightedItem}; @@ -92,7 +92,7 @@ where mod tests { use std::collections::HashSet; - use api::v1::meta::Peer; + use common_meta::peer::Peer; use crate::selector::common::choose_peers; use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem}; diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index 268371a003f7..bdfffacf0529 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::Peer; +use common_meta::peer::Peer; use crate::error::Result; use crate::lease; diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index a5f5beeacd35..e8b3dcdf9e97 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -14,8 +14,8 @@ use std::collections::HashMap; -use api::v1::meta::Peer; use common_meta::key::TableMetadataManager; +use common_meta::peer::Peer; use common_meta::rpc::router::find_leaders; use common_telemetry::{debug, info}; use parking_lot::RwLock; diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index 10e4b50035c4..c897ebf17412 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use api::v1::meta::Peer; +use common_meta::peer::Peer; use itertools::{Itertools, MinMaxResult}; use crate::keys::{StatKey, StatValue}; @@ -92,7 +92,7 @@ impl WeightCompute for RegionNumsBasedWeightCompute { mod tests { use std::collections::HashMap; - use api::v1::meta::Peer; + use common_meta::peer::Peer; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index 21e5778209f7..636db1b7d6b2 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -12,154 +12,72 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - -use common_catalog::consts::METRIC_ENGINE; +use async_trait::async_trait; use common_error::ext::BoxedError; -use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext}; +use common_meta::ddl::table_meta::PeerAllocator; +use common_meta::ddl::TableMetadataAllocatorContext; use common_meta::error::{ExternalSnafu, Result as MetaResult}; -use common_meta::key::table_route::{ - LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue, -}; -use common_meta::rpc::ddl::CreateTableTask; -use common_meta::rpc::router::{Region, RegionRoute}; -use common_meta::sequence::SequenceRef; -use common_meta::wal::{allocate_region_wal_options, WalOptionsAllocatorRef}; -use common_meta::ClusterId; -use common_telemetry::debug; +use common_meta::peer::Peer; use snafu::{ensure, ResultExt}; -use store_api::storage::{RegionId, RegionNumber, TableId, MAX_REGION_SEQ}; +use store_api::storage::MAX_REGION_SEQ; use crate::error::{self, Result, TooManyPartitionsSnafu}; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::selector::SelectorOptions; -pub struct MetaSrvTableMetadataAllocator { +pub struct MetasrvPeerAllocator { ctx: SelectorContext, selector: SelectorRef, - table_id_sequence: SequenceRef, - wal_options_allocator: WalOptionsAllocatorRef, } -impl MetaSrvTableMetadataAllocator { - pub fn new( - ctx: SelectorContext, - selector: SelectorRef, - table_id_sequence: SequenceRef, - wal_options_allocator: WalOptionsAllocatorRef, - ) -> Self { - Self { - ctx, - selector, - table_id_sequence, - wal_options_allocator, - } +impl MetasrvPeerAllocator { + pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self { + Self { ctx, selector } } - async fn create_table_route( + async fn alloc( &self, - cluster_id: ClusterId, - table_id: TableId, - task: &CreateTableTask, - ) -> Result { - let table_route = if task.create_table.engine == METRIC_ENGINE { - TableRouteValue::Logical(LogicalTableRouteValue {}) - } else { - let regions = task.partitions.len(); - - ensure!(regions <= MAX_REGION_SEQ as usize, TooManyPartitionsSnafu); - - let mut peers = self - .selector - .select( - cluster_id, - &self.ctx, - SelectorOptions { - min_required_items: regions, - allow_duplication: true, - }, - ) - .await?; - - ensure!( - peers.len() >= regions, - error::NoEnoughAvailableDatanodeSnafu { - required: regions, - available: peers.len(), - } - ); - - peers.truncate(regions); - - let region_routes = task - .partitions - .iter() - .enumerate() - .map(|(i, partition)| { - let region = Region { - id: RegionId::new(table_id, i as RegionNumber), - partition: Some(partition.clone().into()), - ..Default::default() - }; - - let peer = peers[i % peers.len()].clone(); + ctx: &TableMetadataAllocatorContext, + regions: usize, + ) -> Result> { + ensure!(regions <= MAX_REGION_SEQ as usize, TooManyPartitionsSnafu); + + let mut peers = self + .selector + .select( + ctx.cluster_id, + &self.ctx, + SelectorOptions { + min_required_items: regions, + allow_duplication: true, + }, + ) + .await?; + + ensure!( + peers.len() >= regions, + error::NoEnoughAvailableDatanodeSnafu { + required: regions, + available: peers.len(), + } + ); - RegionRoute { - region, - leader_peer: Some(peer.into()), - ..Default::default() - } - }) - .collect::>(); - TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes)) - }; - Ok(table_route) - } + peers.truncate(regions); - fn create_wal_options( - &self, - table_route: &TableRouteValue, - ) -> MetaResult> { - match table_route { - TableRouteValue::Physical(x) => { - let region_numbers = x - .region_routes - .iter() - .map(|route| route.region.id.region_number()) - .collect(); - allocate_region_wal_options(region_numbers, &self.wal_options_allocator) - } - TableRouteValue::Logical(_) => Ok(HashMap::new()), - } + Ok(peers) } } -#[async_trait::async_trait] -impl TableMetadataAllocator for MetaSrvTableMetadataAllocator { - async fn create( +#[async_trait] +impl PeerAllocator for MetasrvPeerAllocator { + async fn alloc( &self, ctx: &TableMetadataAllocatorContext, - task: &CreateTableTask, - ) -> MetaResult { - let table_id = self.table_id_sequence.next().await? as TableId; - - let table_route = self - .create_table_route(ctx.cluster_id, table_id, task) + regions: usize, + ) -> MetaResult> { + self.alloc(ctx, regions) .await .map_err(BoxedError::new) - .context(ExternalSnafu)?; - - let region_wal_options = self.create_wal_options(&table_route)?; - - debug!( - "Allocated region wal options {:?} for table {}", - region_wal_options, table_id - ); - - Ok(TableMetadata { - table_id, - table_route, - region_wal_options, - }) + .context(ExternalSnafu) } } diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 14afcb2ca19a..072ff2282099 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -19,6 +19,7 @@ use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::KvBackendConfig; use common_meta::cache_invalidator::DummyCacheInvalidator; +use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl_manager::DdlManager; use common_meta::key::TableMetadataManager; use common_meta::region_keeper::MemoryRegionKeeper; @@ -30,7 +31,6 @@ use datanode::config::DatanodeOptions; use datanode::datanode::DatanodeBuilder; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; -use frontend::instance::standalone::StandaloneTableMetadataAllocator; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -123,10 +123,8 @@ impl GreptimeDbStandaloneBuilder { wal_meta.clone(), kv_backend.clone(), )); - let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new( - table_id_sequence, - wal_options_allocator.clone(), - )); + let table_meta_allocator = + TableMetadataAllocator::new(table_id_sequence, wal_options_allocator.clone()); let ddl_task_executor = Arc::new( DdlManager::try_new( diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index be1ea3ba92f3..c8cfdc12796f 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -15,11 +15,11 @@ use std::sync::Arc; use std::time::Duration; -use api::v1::meta::Peer; use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::key::table_route::TableRouteKey; use common_meta::key::{RegionDistribution, TableMetaKey}; +use common_meta::peer::Peer; use common_meta::{distributed_time_constants, RegionIdent}; use common_procedure::{watcher, ProcedureWithId}; use common_query::Output;