diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 9127f4502698..56412a63b243 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -209,7 +209,7 @@ mod tests { Some("mybucket"), ), ( - // wal.dir = /other/wal/dir + // wal.raft_engine_opts.dir = /other/wal/dir [ env_prefix.to_string(), "wal".to_uppercase(), diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 652fe309e9bf..af19ce0eef63 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -25,6 +25,7 @@ use crate::error::Result; use crate::key::TableMetadataManagerRef; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::router::RegionRoute; +use crate::wal::meta::WalMeta; pub mod alter_table; pub mod create_table; @@ -32,8 +33,6 @@ pub mod drop_table; pub mod truncate_table; pub mod utils; -use crate::wal::kafka::KafkaTopic; - #[derive(Debug, Default)] pub struct ExecutorContext { pub cluster_id: Option, @@ -58,7 +57,7 @@ pub struct TableMetadataAllocatorContext { pub struct TableMetadata { pub table_id: TableId, pub region_routes: Vec, - pub region_topics: Option>, + pub wal_meta: WalMeta, } #[async_trait::async_trait] diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 4599783cc12a..837c00a69f73 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -37,9 +37,11 @@ use crate::key::table_name::TableNameKey; use crate::metrics; use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; -use crate::wal::kafka::KafkaTopic; +use crate::wal::meta::WalMeta; -const TOPIC_KEY: &str = "kafka_topic"; +// TODO(niebayes): remove `WAL_PROVIDER_KEY` since there's no need to store wal provider in the region options any more. +pub const WAL_PROVIDER_KEY: &str = "wal_provider"; +pub const TOPIC_KEY: &str = "kafka_topic"; pub struct CreateTableProcedure { pub context: DdlContext, @@ -53,12 +55,12 @@ impl CreateTableProcedure { cluster_id: u64, task: CreateTableTask, region_routes: Vec, - region_topics: Option>, + wal_meta: WalMeta, context: DdlContext, ) -> Self { Self { context, - creator: TableCreator::new(cluster_id, task, region_routes, region_topics), + creator: TableCreator::new(cluster_id, task, region_routes, wal_meta), } } @@ -174,7 +176,7 @@ impl CreateTableProcedure { pub async fn on_datanode_create_regions(&mut self) -> Result { let create_table_data = &self.creator.data; let region_routes = &create_table_data.region_routes; - let region_topics = create_table_data.region_topics.as_ref(); + let region_topics = &create_table_data.wal_meta.region_topics; let create_table_expr = &create_table_data.task.create_table; let catalog = &create_table_expr.catalog_name; @@ -190,6 +192,11 @@ impl CreateTableProcedure { let requester = self.context.datanode_manager.datanode(&datanode).await; let regions = find_leader_regions(region_routes, &datanode); + // Safety: `TableMetadataAllocator` ensures the region routes and topics are of the same length. + // Besides, `find_leader_regions` may filter out some regions. Therefore, the following condition must be met + // and the indexing on `region_topics` is safe definitely. + assert!(regions.len() <= region_topics.len()); + let requests = regions .iter() .enumerate() @@ -200,11 +207,7 @@ impl CreateTableProcedure { create_region_request.region_id = region_id.as_u64(); create_region_request.path = storage_path.clone(); - if let Some(region_topics) = region_topics { - // Safety: `TableMetadataAllocator` ensures the region routes and topics are of the same length. - // and hence the following indexing operation is safe. - assert_eq!(region_routes.len(), region_topics.len()); - + if !region_topics.is_empty() { create_region_request .options .insert(TOPIC_KEY.to_string(), region_topics[i].clone()); @@ -306,7 +309,7 @@ impl TableCreator { cluster_id: u64, task: CreateTableTask, region_routes: Vec, - region_topics: Option>, + wal_meta: WalMeta, ) -> Self { Self { data: CreateTableData { @@ -314,7 +317,7 @@ impl TableCreator { cluster_id, task, region_routes, - region_topics, + wal_meta, }, } } @@ -336,7 +339,7 @@ pub struct CreateTableData { pub task: CreateTableTask, pub cluster_id: u64, pub region_routes: Vec, - pub region_topics: Option>, + pub wal_meta: WalMeta, } impl CreateTableData { diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index ad9bd061415f..acd83c61af0a 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -43,7 +43,7 @@ use crate::rpc::ddl::{ TruncateTableTask, }; use crate::rpc::router::RegionRoute; -use crate::wal::kafka::KafkaTopic; +use crate::wal::meta::WalMeta; pub type DdlManagerRef = Arc; @@ -165,7 +165,7 @@ impl DdlManager { cluster_id: u64, create_table_task: CreateTableTask, region_routes: Vec, - region_topics: Option>, + wal_meta: WalMeta, ) -> Result { let context = self.create_context(); @@ -173,7 +173,7 @@ impl DdlManager { cluster_id, create_table_task, region_routes, - region_topics, + wal_meta, context, ); @@ -379,11 +379,11 @@ async fn handle_create_table_task( let TableMetadata { table_id, region_routes, - region_topics, + wal_meta, } = table_metadata; let id = ddl_manager - .submit_create_table_task(cluster_id, create_table_task, region_routes, region_topics) + .submit_create_table_task(cluster_id, create_table_task, region_routes, wal_meta) .await?; info!("Table: {table_id:?} is created via procedure_id {id:?}"); diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 264357ce6504..7650b00c6f4a 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -324,6 +324,9 @@ pub enum Error { #[snafu(source)] error: RsKafkaError, }, + + #[snafu(display("Missing required Kafka topic manager"))] + MissingKafkaTopicManager { location: Location }, } pub type Result = std::result::Result; @@ -382,14 +385,15 @@ impl ErrorExt for Error { InvalidCatalogValue { source, .. } => source.status_code(), ConvertAlterTableRequest { source, .. } => source.status_code(), - Error::MissingKafkaOpts { .. } - | Error::DeserKafkaTopics { .. } + Error::DeserKafkaTopics { .. } | Error::SerKafkaTopics { .. } | Error::InvalidNumTopics { .. } | Error::BuildKafkaClient { .. } | Error::BuildKafkaCtrlClient { .. } | Error::TooManyCreatedKafkaTopics { .. } - | Error::CreateKafkaTopic { .. } => StatusCode::Unexpected, + | Error::CreateKafkaTopic { .. } + | Error::MissingKafkaOpts { .. } + | Error::MissingKafkaTopicManager { .. } => StatusCode::Unexpected, } } diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index aa07fd0ba641..1f55d5165df8 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -13,13 +13,15 @@ // limitations under the License. pub mod kafka; +pub mod meta; use serde::{Deserialize, Serialize}; use crate::wal::kafka::KafkaOptions; -#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default)] pub enum WalProvider { + #[default] RaftEngine, Kafka, } diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index f654fed22295..684db9c6e63f 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -59,8 +59,8 @@ impl TopicManager { }) } - pub fn select_topics(&self, num_regions: usize) -> Vec { - (0..num_regions) + pub fn select_topics(&self, num_topics: usize) -> Vec { + (0..num_topics) .map(|_| self.topic_selector.select(&self.topic_pool)) .collect() } diff --git a/src/common/meta/src/wal/meta.rs b/src/common/meta/src/wal/meta.rs new file mode 100644 index 000000000000..f3e6f0580ed4 --- /dev/null +++ b/src/common/meta/src/wal/meta.rs @@ -0,0 +1,82 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod demand_builder; + +pub use demand_builder::WalMetaDemandBuilder; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; + +use crate::error::{MissingKafkaTopicManagerSnafu, Result}; +use crate::kv_backend::KvBackendRef; +use crate::wal::kafka::{KafkaTopic as Topic, KafkaTopicManager as TopicManager}; +use crate::wal::{WalOptions, WalProvider}; + +/// Wal metadata allocated to a table. +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct WalMeta { + pub region_topics: Vec, +} + +/// The allocator user shall state what wal meta it demands. +pub struct WalMetaDemand { + pub num_topics: Option, +} + +/// The allocator responsible for allocating wal metadata for a table. +#[derive(Default)] +pub struct WalMetaAllocator { + wal_provider: WalProvider, + topic_manager: Option, +} + +impl WalMetaAllocator { + pub async fn try_new(wal_opts: &WalOptions, kv_backend: &KvBackendRef) -> Result { + let mut this = Self { + wal_provider: wal_opts.provider.clone(), + ..Default::default() + }; + + match this.wal_provider { + WalProvider::RaftEngine => {} + WalProvider::Kafka => { + let topic_manager = + TopicManager::try_new(wal_opts.kafka_opts.as_ref(), kv_backend).await?; + this.topic_manager = Some(topic_manager); + } + } + + Ok(this) + } + + /// Allocate wal meta according to the wal provider. + pub async fn try_alloc(&self, demand: WalMetaDemand) -> Result { + let mut allocated = WalMeta::default(); + + if let Some(num_topics) = demand.num_topics.as_ref() { + allocated.region_topics = self.try_alloc_topics(*num_topics).await?; + } + + Ok(allocated) + } + + pub async fn try_alloc_topics(&self, num_topics: usize) -> Result> { + let topics = self + .topic_manager + .as_ref() + .context(MissingKafkaTopicManagerSnafu)? + .select_topics(num_topics); + Ok(topics) + } +} diff --git a/src/common/meta/src/wal/meta/demand_builder.rs b/src/common/meta/src/wal/meta/demand_builder.rs new file mode 100644 index 000000000000..243bc3cf709e --- /dev/null +++ b/src/common/meta/src/wal/meta/demand_builder.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::wal::meta::WalMetaDemand; + +#[derive(Default)] +pub struct WalMetaDemandBuilder { + num_topics: Option, +} + +impl WalMetaDemandBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn with_num_topics(&mut self, num_topics: usize) -> &mut Self { + self.num_topics = Some(num_topics); + self + } + + pub fn build(&self) -> WalMetaDemand { + WalMetaDemand { + num_topics: self.num_topics, + } + } +} diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index c2e7d86aeda8..6d11b5d544d5 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -26,6 +26,7 @@ use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::{Sequence, SequenceRef}; +use common_meta::wal::meta::WalMeta; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing; use common_telemetry::tracing_context::{FutureExt, TracingContext}; @@ -152,7 +153,7 @@ impl TableMetadataAllocator for StandaloneTableMetadataCreator { Ok(TableMetadata { table_id, region_routes, - region_topics: None, + wal_meta: WalMeta::default(), }) } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 17eb63979925..ca57ab0628bb 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -579,6 +579,12 @@ pub enum Error { source: common_meta::error::Error, location: Location, }, + + #[snafu(display("Failed to build a wal meta allocator"))] + BuildWalMetaAllocator { + source: common_meta::error::Error, + location: Location, + }, } impl Error { @@ -693,7 +699,8 @@ impl ErrorExt for Error { Error::InitMetadata { source, .. } => source.status_code(), - Error::BuildKafkaTopicManager { source, .. } => source.status_code(), + Error::BuildKafkaTopicManager { source, .. } + | Error::BuildWalMetaAllocator { source, .. } => source.status_code(), Error::Other { source, .. } => source.status_code(), } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index a32733aead5a..6ba7706a9fc2 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -27,15 +27,14 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::sequence::Sequence; use common_meta::state_store::KvStateStore; -use common_meta::wal::kafka::topic_manager::TopicManager as KafkaTopicManager; -use common_meta::wal::WalProvider; +use common_meta::wal::meta::WalMetaAllocator; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; -use crate::error::{BuildKafkaTopicManagerSnafu, Result}; +use crate::error::{BuildWalMetaAllocatorSnafu, Result}; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::check_leader_handler::CheckLeaderHandler; use crate::handler::collect_stats_handler::CollectStatsHandler; @@ -193,20 +192,14 @@ impl MetaSrvBuilder { table_id: None, }; - let kafka_topic_manager = match options.wal.provider { - WalProvider::Kafka => Some( - KafkaTopicManager::try_new(options.wal.kafka_opts.as_ref(), &kv_backend) - .await - .context(BuildKafkaTopicManagerSnafu)?, - ), - WalProvider::RaftEngine => None, - }; - + let wal_meta_allocator = WalMetaAllocator::try_new(&options.wal, &kv_backend) + .await + .context(BuildWalMetaAllocatorSnafu)?; let table_meta_allocator = Arc::new(MetaSrvTableMetadataAllocator::new( selector_ctx.clone(), selector.clone(), table_id_sequence.clone(), - kafka_topic_manager, + wal_meta_allocator, )); let ddl_manager = build_ddl_manager( diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index ce790ead8ce9..1f147f8e0ca8 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -34,6 +34,7 @@ use common_meta::key::table_route::TableRouteValue; use common_meta::key::DeserializedValueWithBytes; use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DropTableTask}; use common_meta::rpc::router::{find_leaders, RegionRoute}; +use common_meta::wal::meta::WalMeta; use common_procedure::Status; use store_api::storage::RegionId; @@ -97,7 +98,7 @@ fn test_create_region_request_template() { 1, create_table_task(), test_data::new_region_routes(), - None, + WalMeta::default(), test_data::new_ddl_context(Arc::new(DatanodeClients::default())), ); @@ -184,7 +185,7 @@ async fn test_on_datanode_create_regions() { 1, create_table_task(), region_routes, - None, + WalMeta::default(), test_data::new_ddl_context(datanode_manager), ); diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index bd64d24311b1..9ce37a813610 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -19,7 +19,7 @@ use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAlloc use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; -use common_meta::wal::kafka::topic_manager::TopicManager as KafkaTopicManager; +use common_meta::wal::meta::{WalMetaAllocator, WalMetaDemandBuilder}; use common_telemetry::warn; use snafu::{ensure, ResultExt}; use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ}; @@ -33,7 +33,7 @@ pub struct MetaSrvTableMetadataAllocator { ctx: SelectorContext, selector: SelectorRef, table_id_sequence: SequenceRef, - kafka_topic_manager: Option, + wal_meta_allocator: WalMetaAllocator, } impl MetaSrvTableMetadataAllocator { @@ -41,13 +41,13 @@ impl MetaSrvTableMetadataAllocator { ctx: SelectorContext, selector: SelectorRef, table_id_sequence: SequenceRef, - kafka_topic_manager: Option, + wal_meta_allocator: WalMetaAllocator, ) -> Self { Self { ctx, selector, table_id_sequence, - kafka_topic_manager, + wal_meta_allocator, } } } @@ -72,15 +72,16 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator { .map_err(BoxedError::new) .context(meta_error::ExternalSnafu)?; - let region_topics = self - .kafka_topic_manager - .as_ref() - .map(|kafka_topic_manager| kafka_topic_manager.select_topics(region_routes.len())); + // Each region gets assigned one topic. + let demand = WalMetaDemandBuilder::new() + .with_num_topics(region_routes.len()) + .build(); + let wal_meta = self.wal_meta_allocator.try_alloc(demand).await?; Ok(TableMetadata { table_id, region_routes, - region_topics, + wal_meta, }) } }