diff --git a/Cargo.lock b/Cargo.lock
index def2104e34be..bae24333c169 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1807,7 +1807,9 @@ dependencies = [
  "lazy_static",
  "prometheus",
  "prost 0.12.2",
+ "rand",
  "regex",
+ "rskafka",
  "serde",
  "serde_json",
  "snafu",
@@ -1815,6 +1817,7 @@ dependencies = [
  "strum 0.25.0",
  "table",
  "tokio",
+ "toml 0.7.8",
  "tonic 0.10.2",
 ]
 
@@ -2128,6 +2131,15 @@ version = "2.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
 
+[[package]]
+name = "crc32c"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d8f48d60e5b4d2c53d5c2b1d8a58c849a70ae5e5509b08a48d047e3b65714a74"
+dependencies = [
+ "rustc_version",
+]
+
 [[package]]
 name = "crc32fast"
 version = "1.3.2"
@@ -3988,6 +4000,12 @@ version = "3.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
 
+[[package]]
+name = "integer-encoding"
+version = "4.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "924df4f0e24e2e7f9cdd90babb0b96f93b20f3ecfa949ea9e6613756b8c8e1bf"
+
 [[package]]
 name = "inventory"
 version = "0.3.13"
@@ -7232,6 +7250,30 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "rskafka"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "132ecfa3cd9c3825208524a80881f115337762904ad3f0174e87975b2d79162c"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "chrono",
+ "crc32c",
+ "flate2",
+ "futures",
+ "integer-encoding 4.0.0",
+ "lz4",
+ "parking_lot 0.12.1",
+ "pin-project-lite",
+ "rand",
+ "snap",
+ "thiserror",
+ "tokio",
+ "tracing",
+ "zstd 0.12.4",
+]
+
 [[package]]
 name = "rstest"
 version = "0.17.0"
@@ -9311,7 +9353,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09"
 dependencies = [
  "byteorder",
- "integer-encoding",
+ "integer-encoding 3.0.4",
  "ordered-float 2.10.1",
 ]
 
diff --git a/Cargo.toml b/Cargo.toml
index ba46247cf922..328f4d307890 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -110,6 +110,7 @@ reqwest = { version = "0.11", default-features = false, features = [
     "rustls-tls-native-roots",
     "stream",
 ] }
+rskafka = "0.5"
 rust_decimal = "1.33"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index bd39701d4e33..06202f9d5f02 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -42,3 +42,27 @@ first_heartbeat_estimate = "1000ms"
 # timeout = "10s"
 # connect_timeout = "10s"
 # tcp_nodelay = true
+
+# Wal options.
+[wal]
+# Available wal providers:
+# - "RaftEngine" (default)
+# - "Kafka"
+provider = "Kafka"
+
+# Kafka wal options.
+[wal.kafka]
+# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
+broker_endpoints = ["127.0.0.1:9090"]
+# Number of topics to be created upon start.
+num_topics = 64
+# Topic selector type.
+# Available selector types:
+# - "RoundRobin" (default)
+selector_type = "RoundRobin"
+# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
+topic_name_prefix = "greptime_wal"
+# Number of partitions per topic.
+num_partitions = 1
+# Expected number of replicas of each partition.
+replication_factor = 3
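For quick reference, a minimal sketch (not part of the patch) of the topic names the defaults above yield; it mirrors the `format!("{}_{topic_id}", opts.topic_name_prefix)` naming used by `build_topic_pool` in the topic manager further down:

```rust
fn main() {
    // Defaults taken from the config above.
    let topic_name_prefix = "greptime_wal";
    let num_topics = 64;

    // Topic names are `<topic_name_prefix>_<topic_id>`.
    let topics: Vec<String> = (0..num_topics)
        .map(|topic_id| format!("{topic_name_prefix}_{topic_id}"))
        .collect();

    assert_eq!(topics.first().unwrap(), "greptime_wal_0");
    assert_eq!(topics.last().unwrap(), "greptime_wal_63");
}
```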
diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs
index 9127f4502698..56412a63b243 100644
--- a/src/cmd/src/options.rs
+++ b/src/cmd/src/options.rs
@@ -209,7 +209,7 @@ mod tests {
                 Some("mybucket"),
             ),
             (
-                // wal.dir = /other/wal/dir
+                // wal.raft_engine_opts.dir = /other/wal/dir
                 [
                     env_prefix.to_string(),
                     "wal".to_uppercase(),
diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml
index b002bdd8370e..ece2a68cec67 100644
--- a/src/common/meta/Cargo.toml
+++ b/src/common/meta/Cargo.toml
@@ -30,7 +30,9 @@ humantime-serde.workspace = true
 lazy_static.workspace = true
 prometheus.workspace = true
 prost.workspace = true
+rand.workspace = true
 regex.workspace = true
+rskafka.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 snafu.workspace = true
@@ -38,6 +40,7 @@ store-api.workspace = true
 strum.workspace = true
 table.workspace = true
 tokio.workspace = true
+toml.workspace = true
 tonic.workspace = true
 
 [dev-dependencies]
diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs
index 0c3327167037..979ed036586f 100644
--- a/src/common/meta/src/ddl.rs
+++ b/src/common/meta/src/ddl.rs
@@ -25,6 +25,7 @@ use crate::error::Result;
 use crate::key::TableMetadataManagerRef;
 use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
 use crate::rpc::router::RegionRoute;
+use crate::wal::kafka::KafkaTopic;
 
 pub mod alter_table;
 pub mod create_table;
@@ -53,6 +54,12 @@ pub struct TableMetadataAllocatorContext {
     pub cluster_id: u64,
 }
 
+pub struct TableMetadata {
+    pub table_id: TableId,
+    pub region_routes: Vec<RegionRoute>,
+    pub region_topics: Vec<KafkaTopic>,
+}
+
 #[async_trait::async_trait]
 pub trait TableMetadataAllocator: Send + Sync {
     async fn create(
@@ -60,7 +67,7 @@
         ctx: &TableMetadataAllocatorContext,
         table_info: &mut RawTableInfo,
         partitions: &[Partition],
-    ) -> Result<(TableId, Vec<RegionRoute>)>;
+    ) -> Result<TableMetadata>;
 }
 
 pub type TableMetadataAllocatorRef = Arc<dyn TableMetadataAllocator>;
diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs
index 54632e9b3f05..ba082a7add5b 100644
--- a/src/common/meta/src/ddl/create_table.rs
+++ b/src/common/meta/src/ddl/create_table.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use api::v1::region::region_request::Body as PbRegionRequest;
 use api::v1::region::{
     CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
@@ -32,11 +34,15 @@ use table::metadata::{RawTableInfo, TableId};
 
 use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
 use crate::ddl::DdlContext;
-use crate::error::{self, Result};
+use crate::error::{self, Result, UnexpectedNumRegionTopicsSnafu};
 use crate::key::table_name::TableNameKey;
 use crate::metrics;
 use crate::rpc::ddl::CreateTableTask;
 use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
+use crate::wal::kafka::KafkaTopic;
+
+// TODO(niebayes): Maybe move `TOPIC_KEY` into a more appropriate crate.
+pub const TOPIC_KEY: &str = "kafka_topic";
 
 pub struct CreateTableProcedure {
     pub context: DdlContext,
@@ -50,11 +56,12 @@ impl CreateTableProcedure {
         cluster_id: u64,
         task: CreateTableTask,
         region_routes: Vec<RegionRoute>,
+        region_topics: Vec<KafkaTopic>,
         context: DdlContext,
     ) -> Self {
         Self {
             context,
-            creator: TableCreator::new(cluster_id, task, region_routes),
+            creator: TableCreator::new(cluster_id, task, region_routes, region_topics),
         }
     }
 
@@ -170,6 +177,27 @@ impl CreateTableProcedure {
     pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
         let create_table_data = &self.creator.data;
         let region_routes = &create_table_data.region_routes;
+        let region_topics = &create_table_data.region_topics;
+
+        // This check is redundant: the wal meta allocator ensures that the allocated
+        // region topics are of the same length as the region routes.
+        if !region_topics.is_empty() {
+            let num_region_routes = region_routes.len();
+            let num_region_topics = region_topics.len();
+            ensure!(
+                num_region_routes == num_region_topics,
+                UnexpectedNumRegionTopicsSnafu {
+                    num_region_topics,
+                    num_region_routes
+                }
+            );
+        }
+
+        let region_topic_map: HashMap<_, _> = region_routes
+            .iter()
+            .map(|route| route.region.id.region_number())
+            .zip(region_topics)
+            .collect();
 
         let create_table_expr = &create_table_data.task.create_table;
         let catalog = &create_table_expr.catalog_name;
@@ -193,6 +221,12 @@
                     let mut create_region_request = request_template.clone();
                     create_region_request.region_id = region_id.as_u64();
                     create_region_request.path = storage_path.clone();
+                    region_topic_map.get(region_number).and_then(|topic| {
+                        create_region_request
+                            .options
+                            .insert(TOPIC_KEY.to_string(), topic.to_string())
+                    });
+
                     PbRegionRequest::Create(create_region_request)
                 })
                 .collect::<Vec<_>>();
@@ -285,13 +319,19 @@ pub struct TableCreator {
 }
 
 impl TableCreator {
-    pub fn new(cluster_id: u64, task: CreateTableTask, region_routes: Vec<RegionRoute>) -> Self {
+    pub fn new(
+        cluster_id: u64,
+        task: CreateTableTask,
+        region_routes: Vec<RegionRoute>,
+        region_topics: Vec<KafkaTopic>,
+    ) -> Self {
         Self {
             data: CreateTableData {
                 state: CreateTableState::Prepare,
                 cluster_id,
                 task,
                 region_routes,
+                region_topics,
             },
         }
     }
@@ -311,8 +351,9 @@ pub enum CreateTableState {
 pub struct CreateTableData {
     pub state: CreateTableState,
     pub task: CreateTableTask,
-    pub region_routes: Vec<RegionRoute>,
     pub cluster_id: u64,
+    pub region_routes: Vec<RegionRoute>,
+    pub region_topics: Vec<KafkaTopic>,
 }
 
 impl CreateTableData {
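A minimal sketch (illustration only, hypothetical values) of the region-to-topic assignment performed in `on_datanode_create_regions` above: region numbers are zipped with the allocated topics, and each create-region request records its topic under `TOPIC_KEY` in the region options:

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical allocation: three regions, topics chosen by the selector.
    let region_numbers: Vec<u32> = vec![0, 1, 2];
    let region_topics = vec!["greptime_wal_5", "greptime_wal_6", "greptime_wal_5"];

    // Mirrors the zip above: region number -> wal topic.
    let region_topic_map: HashMap<_, _> = region_numbers.iter().zip(&region_topics).collect();

    // Mirrors the insertion into create_region_request.options.
    let mut options: HashMap<String, String> = HashMap::new();
    if let Some(topic) = region_topic_map.get(&1) {
        options.insert("kafka_topic".to_string(), topic.to_string());
    }
    assert_eq!(options["kafka_topic"], "greptime_wal_6");
}
```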
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index 3d3cd5ae1c1f..0cb8f5a5052a 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -26,7 +26,7 @@ use crate::ddl::create_table::CreateTableProcedure;
 use crate::ddl::drop_table::DropTableProcedure;
 use crate::ddl::truncate_table::TruncateTableProcedure;
 use crate::ddl::{
-    DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadataAllocatorContext,
+    DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, TableMetadataAllocatorContext,
     TableMetadataAllocatorRef,
 };
 use crate::error::{
@@ -43,6 +43,7 @@ use crate::rpc::ddl::{
     TruncateTableTask,
 };
 use crate::rpc::router::RegionRoute;
+use crate::wal::kafka::KafkaTopic;
 
 pub type DdlManagerRef = Arc<DdlManager>;
 
@@ -164,11 +165,17 @@ impl DdlManager {
         cluster_id: u64,
         create_table_task: CreateTableTask,
         region_routes: Vec<RegionRoute>,
+        region_topics: Vec<KafkaTopic>,
     ) -> Result<ProcedureId> {
         let context = self.create_context();
 
-        let procedure =
-            CreateTableProcedure::new(cluster_id, create_table_task, region_routes, context);
+        let procedure = CreateTableProcedure::new(
+            cluster_id,
+            create_table_task,
+            region_routes,
+            region_topics,
+            context,
+        );
 
         let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
 
@@ -360,7 +367,7 @@ async fn handle_create_table_task(
     cluster_id: u64,
     mut create_table_task: CreateTableTask,
 ) -> Result<SubmitDdlTaskResponse> {
-    let (table_id, region_routes) = ddl_manager
+    let table_metadata = ddl_manager
         .table_meta_allocator
         .create(
             &TableMetadataAllocatorContext { cluster_id },
@@ -369,8 +376,14 @@
         )
         .await?;
 
+    let TableMetadata {
+        table_id,
+        region_routes,
+        region_topics,
+    } = table_metadata;
+
     let id = ddl_manager
-        .submit_create_table_task(cluster_id, create_table_task, region_routes)
+        .submit_create_table_task(cluster_id, create_table_task, region_routes, region_topics)
         .await?;
 
     info!("Table: {table_id:?} is created via procedure_id {id:?}");
diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index 4e3c712082d5..f8dc6a57d99e 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -17,6 +17,7 @@ use std::str::Utf8Error;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
+use rskafka::client::error::Error as RsKafkaError;
 use serde_json::error::Error as JsonError;
 use snafu::{Location, Snafu};
 use store_api::storage::RegionNumber;
@@ -266,6 +267,77 @@ pub enum Error {
 
     #[snafu(display("Retry later"))]
     RetryLater { source: BoxedError },
+
+    #[snafu(display("Missing required Kafka options"))]
+    MissingKafkaOpts { location: Location },
+
+    #[snafu(display("Invalid number of topics {}", num_topics))]
+    InvalidNumTopics {
+        num_topics: usize,
+        location: Location,
+    },
+
+    #[snafu(display("Failed to deserialize Kafka topics"))]
+    DeserKafkaTopics {
+        location: Location,
+        #[snafu(source)]
+        error: JsonError,
+    },
+
+    #[snafu(display("Failed to serialize Kafka topics"))]
+    SerKafkaTopics {
+        location: Location,
+        #[snafu(source)]
+        error: JsonError,
+    },
+
+    #[snafu(display(
+        "Failed to build a rskafka client, broker endpoints: {:?}",
+        broker_endpoints
+    ))]
+    BuildKafkaClient {
+        broker_endpoints: Vec<String>,
+        location: Location,
+        #[snafu(source)]
+        error: RsKafkaError,
+    },
+
+    #[snafu(display("Failed to build a rskafka controller client"))]
+    BuildKafkaCtrlClient {
+        location: Location,
+        #[snafu(source)]
+        error: RsKafkaError,
+    },
+
+    #[snafu(display(
+        "Too many created Kafka topics, num_created_topics: {}",
+        num_created_topics
+    ))]
+    TooManyCreatedKafkaTopics {
+        num_created_topics: usize,
+        location: Location,
+    },
+
+    #[snafu(display("Failed to create a Kafka topic"))]
+    CreateKafkaTopic {
+        location: Location,
+        #[snafu(source)]
+        error: RsKafkaError,
+    },
+
+    #[snafu(display("Missing required Kafka topic manager"))]
+    MissingKafkaTopicManager { location: Location },
+
+    #[snafu(display(
+        "Unexpected number of region topics, num_region_topics: {}, num_region_routes: {}",
+        num_region_topics,
+        num_region_routes,
+    ))]
+    UnexpectedNumRegionTopics {
+        num_region_topics: usize,
+        num_region_routes: usize,
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -323,6 +395,17 @@
             RetryLater { source, .. } => source.status_code(),
             InvalidCatalogValue { source, .. } => source.status_code(),
             ConvertAlterTableRequest { source, .. } => source.status_code(),
+
+            Error::DeserKafkaTopics { .. }
+            | Error::SerKafkaTopics { .. }
+            | Error::InvalidNumTopics { .. }
+            | Error::BuildKafkaClient { .. }
+            | Error::BuildKafkaCtrlClient { .. }
+            | Error::TooManyCreatedKafkaTopics { .. }
+            | Error::CreateKafkaTopic { .. }
+            | Error::MissingKafkaOpts { .. }
+            | Error::MissingKafkaTopicManager { .. }
+            | Error::UnexpectedNumRegionTopics { .. } => StatusCode::Unexpected,
         }
     }
diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs
index 996f793b10de..a60833be6415 100644
--- a/src/common/meta/src/lib.rs
+++ b/src/common/meta/src/lib.rs
@@ -34,6 +34,7 @@ pub mod sequence;
 pub mod state_store;
 pub mod table_name;
 pub mod util;
+pub mod wal;
 
 pub type ClusterId = u64;
 pub type DatanodeId = u64;
diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs
new file mode 100644
index 000000000000..b33320fc7c53
--- /dev/null
+++ b/src/common/meta/src/wal.rs
@@ -0,0 +1,66 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod kafka;
+pub mod meta;
+
+use serde::{Deserialize, Serialize};
+
+use crate::wal::kafka::KafkaOptions;
+
+#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default)]
+pub enum WalProvider {
+    #[default]
+    RaftEngine,
+    Kafka,
+}
+
+// Options of different wal providers are integrated into `WalOptions`.
+// There are no raft-engine-related options here since the raft engine is a local wal.
+#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
+pub struct WalOptions {
+    pub provider: WalProvider,
+    pub kafka_opts: Option<KafkaOptions>,
+}
+
+impl Default for WalOptions {
+    fn default() -> Self {
+        Self {
+            provider: WalProvider::RaftEngine,
+            kafka_opts: None,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_toml_none_kafka_opts() {
+        let opts = WalOptions::default();
+        let toml_string = toml::to_string(&opts).unwrap();
+        let _parsed: WalOptions = toml::from_str(&toml_string).unwrap();
+    }
+
+    #[test]
+    fn test_toml_some_kafka_opts() {
+        let opts = WalOptions {
+            provider: WalProvider::Kafka,
+            kafka_opts: Some(KafkaOptions::default()),
+        };
+        let toml_string = toml::to_string(&opts).unwrap();
+        let _parsed: WalOptions = toml::from_str(&toml_string).unwrap();
+    }
+}
diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs
new file mode 100644
index 000000000000..f31c6fa2b477
--- /dev/null
+++ b/src/common/meta/src/wal/kafka.rs
@@ -0,0 +1,50 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod topic_manager;
+mod topic_selector;
+
+use serde::{Deserialize, Serialize};
+pub use topic_manager::{Topic as KafkaTopic, TopicManager as KafkaTopicManager};
+
+use crate::wal::kafka::topic_selector::TopicSelectorType;
+
+#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
+pub struct KafkaOptions {
+    /// The broker endpoints of the Kafka cluster.
+    broker_endpoints: Vec<String>,
+    /// Number of topics to be created upon start.
+    num_topics: usize,
+    /// The type of the topic selector with which to select a topic for a region.
+    selector_type: TopicSelectorType,
+    /// Topic name prefix.
+    topic_name_prefix: String,
+    /// Number of partitions per topic.
+    num_partitions: i32,
+    /// The replication factor of each topic.
+    replication_factor: i16,
+}
+
+impl Default for KafkaOptions {
+    fn default() -> Self {
+        Self {
+            broker_endpoints: vec!["127.0.0.1:9090".to_string()],
+            num_topics: 64,
+            selector_type: TopicSelectorType::RoundRobin,
+            topic_name_prefix: "greptime_wal".to_string(),
+            num_partitions: 1,
+            replication_factor: 3,
+        }
+    }
+}
diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs
new file mode 100644
index 000000000000..684db9c6e63f
--- /dev/null
+++ b/src/common/meta/src/wal/kafka/topic_manager.rs
@@ -0,0 +1,147 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashSet;
+
+use rand::seq::SliceRandom;
+use rskafka::client::ClientBuilder;
+use snafu::{ensure, OptionExt, ResultExt};
+
+use crate::error::{
+    BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaTopicSnafu, DeserKafkaTopicsSnafu,
+    InvalidNumTopicsSnafu, MissingKafkaOptsSnafu, Result, SerKafkaTopicsSnafu,
+    TooManyCreatedKafkaTopicsSnafu,
+};
+use crate::kv_backend::KvBackendRef;
+use crate::rpc::store::PutRequest;
+use crate::wal::kafka::topic_selector::{build_topic_selector, TopicSelectorRef};
+use crate::wal::kafka::{KafkaOptions, TopicSelectorType};
+
+pub type Topic = String;
+
+const METASRV_CREATED_TOPICS_KEY: &str = "metasrv_created_topics";
+const CREATE_TOPIC_TIMEOUT: i32 = 5_000; // 5,000 ms.
+
+pub struct TopicManager {
+    topic_pool: Vec<Topic>,
+    topic_selector: TopicSelectorRef,
+}
+
+impl TopicManager {
+    pub async fn try_new(
+        kafka_opts: Option<&KafkaOptions>,
+        kv_backend: &KvBackendRef,
+    ) -> Result<Self> {
+        let opts = kafka_opts.context(MissingKafkaOptsSnafu)?;
+        let topic_pool = build_topic_pool(opts, kv_backend).await?;
+        let topic_selector = build_topic_selector(&opts.selector_type);
+
+        // The cursor in the round-robin selector is not persisted, which may break the
+        // round-robin strategy across restarts. Shuffling the topic pool helps mitigate this issue.
+        let topic_pool = match opts.selector_type {
+            TopicSelectorType::RoundRobin => shuffle_topic_pool(topic_pool),
+        };
+
+        Ok(Self {
+            topic_pool,
+            topic_selector,
+        })
+    }
+
+    pub fn select_topics(&self, num_topics: usize) -> Vec<Topic> {
+        (0..num_topics)
+            .map(|_| self.topic_selector.select(&self.topic_pool))
+            .collect()
+    }
+}
+
+async fn build_topic_pool(opts: &KafkaOptions, kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
+    let num_topics = opts.num_topics;
+    ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
+
+    let broker_endpoints = opts.broker_endpoints.clone();
+    let kafka_client = ClientBuilder::new(broker_endpoints.clone())
+        .build()
+        .await
+        .context(BuildKafkaClientSnafu { broker_endpoints })?;
+
+    let kafka_ctrl_client = kafka_client
+        .controller_client()
+        .context(BuildKafkaCtrlClientSnafu)?;
+
+    let topics = (0..num_topics)
+        .map(|topic_id| format!("{}_{topic_id}", opts.topic_name_prefix))
+        .collect::<Vec<_>>();
+
+    let created_topics = restore_created_topics(kv_backend)
+        .await?
+        .into_iter()
+        .collect::<HashSet<_>>();
+
+    let num_created_topics = created_topics.len();
+    ensure!(
+        num_created_topics <= num_topics,
+        TooManyCreatedKafkaTopicsSnafu { num_created_topics }
+    );
+
+    let create_topic_tasks = topics
+        .iter()
+        .filter_map(|topic| {
+            if created_topics.contains(topic) {
+                return None;
+            }
+
+            // TODO(niebayes): Determine how rskafka handles an already-existing topic. Check whether an error would be raised.
+            Some(kafka_ctrl_client.create_topic(
+                topic,
+                opts.num_partitions,
+                opts.replication_factor,
+                CREATE_TOPIC_TIMEOUT,
+            ))
+        })
+        .collect::<Vec<_>>();
+
+    futures::future::try_join_all(create_topic_tasks)
+        .await
+        .context(CreateKafkaTopicSnafu)?;
+
+    persist_created_topics(&topics, kv_backend).await?;
+
+    Ok(topics)
+}
+
+fn shuffle_topic_pool(mut topic_pool: Vec<Topic>) -> Vec<Topic> {
+    topic_pool.shuffle(&mut rand::thread_rng());
+    topic_pool
+}
+
+async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
+    kv_backend
+        .get(METASRV_CREATED_TOPICS_KEY.as_bytes())
+        .await?
+        .map(|key_value| serde_json::from_slice(&key_value.value).context(DeserKafkaTopicsSnafu))
+        .unwrap_or_else(|| Ok(vec![]))
+}
+
+async fn persist_created_topics(topics: &[Topic], kv_backend: &KvBackendRef) -> Result<()> {
+    let raw_topics = serde_json::to_string(topics).context(SerKafkaTopicsSnafu)?;
+    kv_backend
+        .put(PutRequest {
+            key: METASRV_CREATED_TOPICS_KEY.as_bytes().to_vec(),
+            value: raw_topics.into_bytes(),
+            prev_kv: false,
+        })
+        .await
+        .map(|_| ())
+}
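For reference, a sketch of the bookkeeping record this manager keeps in the kv backend: `persist_created_topics` stores the whole topic list as a JSON array under `metasrv_created_topics`, and `restore_created_topics` reads it back on restart so already-created topics are not re-created. A standalone illustration (only `serde_json` assumed):

```rust
fn main() {
    // Mirrors persist_created_topics: the stored value is a JSON array of names.
    let topics = vec!["greptime_wal_0".to_string(), "greptime_wal_1".to_string()];
    let raw = serde_json::to_string(&topics).unwrap();
    assert_eq!(raw, r#"["greptime_wal_0","greptime_wal_1"]"#);

    // Mirrors restore_created_topics: read the bytes back into a topic list.
    let restored: Vec<String> = serde_json::from_slice(raw.as_bytes()).unwrap();
    assert_eq!(restored, topics);
}
```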
diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs
new file mode 100644
index 000000000000..635615299364
--- /dev/null
+++ b/src/common/meta/src/wal/kafka/topic_selector.rs
@@ -0,0 +1,67 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+
+use crate::wal::kafka::topic_manager::Topic;
+
+#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
+pub enum TopicSelectorType {
+    RoundRobin,
+}
+
+pub trait TopicSelector: Send + Sync {
+    fn select(&self, topic_pool: &[Topic]) -> Topic;
+}
+
+pub type TopicSelectorRef = Arc<dyn TopicSelector>;
+
+#[derive(Default)]
+pub struct RoundRobinTopicSelector {
+    cursor: AtomicUsize,
+}
+
+impl TopicSelector for RoundRobinTopicSelector {
+    fn select(&self, topic_pool: &[Topic]) -> Topic {
+        // Safety: the caller ensures the topic pool is not empty and hence the modulo operation is safe.
+        let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len();
+        // Safety: the modulo operation ensures the indexing is safe.
+        topic_pool[which].clone()
+    }
+}
+
+pub fn build_topic_selector(selector_type: &TopicSelectorType) -> TopicSelectorRef {
+    match selector_type {
+        TopicSelectorType::RoundRobin => Arc::new(RoundRobinTopicSelector::default()),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_round_robin_topic_selector() {
+        let topic_pool: Vec<_> = [0, 1, 2].into_iter().map(|v| v.to_string()).collect();
+        let selector = RoundRobinTopicSelector::default();
+
+        assert_eq!(selector.select(&topic_pool), "0");
+        assert_eq!(selector.select(&topic_pool), "1");
+        assert_eq!(selector.select(&topic_pool), "2");
+        assert_eq!(selector.select(&topic_pool), "0");
+    }
+}
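One behavior worth noting: the selector's cursor lives in the `TopicManager` for the lifetime of the metasrv, so consecutive tables continue the rotation instead of each starting at index 0. A hypothetical extra test sketch (assumed to sit in the `tests` module above, alongside the existing one):

```rust
#[test]
fn test_round_robin_cursor_continues_across_tables() {
    let topic_pool: Vec<_> = (0..3).map(|i| format!("greptime_wal_{i}")).collect();
    let selector = RoundRobinTopicSelector::default();

    // First table: two regions.
    let table_1: Vec<_> = (0..2).map(|_| selector.select(&topic_pool)).collect();
    // Second table: two regions; selection resumes at the cursor, not at 0.
    let table_2: Vec<_> = (0..2).map(|_| selector.select(&topic_pool)).collect();

    assert_eq!(table_1, ["greptime_wal_0", "greptime_wal_1"]);
    assert_eq!(table_2, ["greptime_wal_2", "greptime_wal_0"]);
}
```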
diff --git a/src/common/meta/src/wal/meta.rs b/src/common/meta/src/wal/meta.rs
new file mode 100644
index 000000000000..8f35c8fcdca8
--- /dev/null
+++ b/src/common/meta/src/wal/meta.rs
@@ -0,0 +1,60 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use snafu::OptionExt;
+
+use crate::error::{MissingKafkaTopicManagerSnafu, Result};
+use crate::kv_backend::KvBackendRef;
+use crate::wal::kafka::{KafkaTopic as Topic, KafkaTopicManager as TopicManager};
+use crate::wal::{WalOptions, WalProvider};
+
+/// The allocator responsible for allocating wal metadata for a table.
+#[derive(Default)]
+pub struct WalMetaAllocator {
+    wal_provider: WalProvider,
+    topic_manager: Option<TopicManager>,
+}
+
+impl WalMetaAllocator {
+    pub async fn try_new(wal_opts: &WalOptions, kv_backend: &KvBackendRef) -> Result<Self> {
+        let mut this = Self {
+            wal_provider: wal_opts.provider.clone(),
+            ..Default::default()
+        };
+
+        match this.wal_provider {
+            WalProvider::RaftEngine => {}
+            WalProvider::Kafka => {
+                let topic_manager =
+                    TopicManager::try_new(wal_opts.kafka_opts.as_ref(), kv_backend).await?;
+                this.topic_manager = Some(topic_manager);
+            }
+        }
+
+        Ok(this)
+    }
+
+    pub fn wal_provider(&self) -> &WalProvider {
+        &self.wal_provider
+    }
+
+    pub async fn try_alloc_topics(&self, num_topics: usize) -> Result<Vec<Topic>> {
+        let topics = self
+            .topic_manager
+            .as_ref()
+            .context(MissingKafkaTopicManagerSnafu)?
+            .select_topics(num_topics);
+        Ok(topics)
+    }
+}
diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs
index 29ce275b2cb8..0c9458257b68 100644
--- a/src/frontend/src/instance/standalone.rs
+++ b/src/frontend/src/instance/standalone.rs
@@ -20,7 +20,7 @@ use async_trait::async_trait;
 use client::region::check_response_header;
 use common_error::ext::BoxedError;
 use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
-use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
+use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
 use common_meta::error::{self as meta_error, Result as MetaResult};
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::peer::Peer;
@@ -32,7 +32,7 @@ use common_telemetry::tracing_context::{FutureExt, TracingContext};
 use datanode::region_server::RegionServer;
 use servers::grpc::region_server::RegionServerHandler;
 use snafu::{OptionExt, ResultExt};
-use store_api::storage::{RegionId, TableId};
+use store_api::storage::RegionId;
 use table::metadata::RawTableInfo;
 
 use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
@@ -126,7 +126,7 @@
         _ctx: &TableMetadataAllocatorContext,
         raw_table_info: &mut RawTableInfo,
         partitions: &[Partition],
-    ) -> MetaResult<(TableId, Vec<RegionRoute>)> {
+    ) -> MetaResult<TableMetadata> {
         let table_id = self.table_id_sequence.next().await? as u32;
         raw_table_info.ident.table_id = table_id;
         let region_routes = partitions
@@ -149,6 +149,10 @@
             })
             .collect::<Vec<_>>();
 
-        Ok((table_id, region_routes))
+        Ok(TableMetadata {
+            table_id,
+            region_routes,
+            region_topics: vec![],
+        })
     }
 }
diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index 600c188e3a91..ca57ab0628bb 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -573,6 +573,18 @@ pub enum Error {
 
     #[snafu(display("Weight array is not set"))]
     NotSetWeightArray { location: Location },
+
+    #[snafu(display("Failed to build a Kafka topic manager"))]
+    BuildKafkaTopicManager {
+        source: common_meta::error::Error,
+        location: Location,
+    },
+
+    #[snafu(display("Failed to build a wal meta allocator"))]
+    BuildWalMetaAllocator {
+        source: common_meta::error::Error,
+        location: Location,
+    },
 }
 
 impl Error {
@@ -687,6 +699,9 @@
 
             Error::InitMetadata { source, .. } => source.status_code(),
 
+            Error::BuildKafkaTopicManager { source, .. }
+            | Error::BuildWalMetaAllocator { source, .. } => source.status_code(),
+
             Error::Other { source, .. } => source.status_code(),
         }
     }
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 9821d628b718..5a8ba41201db 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -26,6 +26,7 @@ use common_meta::ddl::DdlTaskExecutorRef;
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
 use common_meta::sequence::SequenceRef;
+use common_meta::wal::WalOptions;
 use common_procedure::options::ProcedureConfig;
 use common_procedure::ProcedureManagerRef;
 use common_telemetry::logging::LoggingOptions;
@@ -71,6 +72,7 @@ pub struct MetaSrvOptions {
     pub datanode: DatanodeOptions,
     pub enable_telemetry: bool,
     pub data_home: String,
+    pub wal: WalOptions,
 }
 
 impl Default for MetaSrvOptions {
@@ -95,6 +97,7 @@ impl Default for MetaSrvOptions {
             datanode: DatanodeOptions::default(),
             enable_telemetry: true,
             data_home: METASRV_HOME.to_string(),
+            wal: WalOptions::default(),
         }
     }
 }
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 92b259c10cdd..6ba7706a9fc2 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -19,19 +19,22 @@ use std::time::Duration;
 use client::client_manager::DatanodeClients;
 use common_base::Plugins;
 use common_grpc::channel_manager::ChannelConfig;
+use common_meta::ddl::TableMetadataAllocatorRef;
 use common_meta::ddl_manager::{DdlManager, DdlManagerRef};
 use common_meta::distributed_time_constants;
 use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::memory::MemoryKvBackend;
 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
-use common_meta::sequence::{Sequence, SequenceRef};
+use common_meta::sequence::Sequence;
 use common_meta::state_store::KvStateStore;
+use common_meta::wal::meta::WalMetaAllocator;
 use common_procedure::local::{LocalManager, ManagerConfig};
 use common_procedure::ProcedureManagerRef;
+use snafu::ResultExt;
 
 use crate::cache_invalidator::MetasrvCacheInvalidator;
 use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
-use crate::error::Result;
+use crate::error::{BuildWalMetaAllocatorSnafu, Result};
 use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
 use crate::handler::check_leader_handler::CheckLeaderHandler;
 use crate::handler::collect_stats_handler::CollectStatsHandler;
@@ -188,14 +191,24 @@ impl MetaSrvBuilder {
             meta_peer_client: meta_peer_client.clone(),
             table_id: None,
         };
+
+        let wal_meta_allocator = WalMetaAllocator::try_new(&options.wal, &kv_backend)
+            .await
+            .context(BuildWalMetaAllocatorSnafu)?;
+        let table_meta_allocator = Arc::new(MetaSrvTableMetadataAllocator::new(
+            selector_ctx.clone(),
+            selector.clone(),
+            table_id_sequence.clone(),
+            wal_meta_allocator,
+        ));
+
         let ddl_manager = build_ddl_manager(
             &options,
             datanode_clients,
             &procedure_manager,
             &mailbox,
             &table_metadata_manager,
-            (&selector, &selector_ctx),
-            &table_id_sequence,
+            table_meta_allocator,
         );
         let _ = ddl_manager.try_start();
         let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());
@@ -328,8 +341,7 @@ fn build_ddl_manager(
     procedure_manager: &ProcedureManagerRef,
     mailbox: &MailboxRef,
     table_metadata_manager: &TableMetadataManagerRef,
-    (selector, selector_ctx): (&SelectorRef, &SelectorContext),
-    table_id_sequence: &SequenceRef,
+    table_meta_allocator: TableMetadataAllocatorRef,
 ) -> DdlManagerRef {
     let datanode_clients = datanode_clients.unwrap_or_else(|| {
        let datanode_client_channel_config = ChannelConfig::new()
@@ -349,12 +361,6 @@ fn build_ddl_manager(
         },
     ));
 
-    let table_meta_allocator = Arc::new(MetaSrvTableMetadataAllocator::new(
-        selector_ctx.clone(),
-        selector.clone(),
-        table_id_sequence.clone(),
-    ));
-
     Arc::new(DdlManager::new(
         procedure_manager.clone(),
         datanode_clients,
diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs
index d9ca7a7d31bc..e1a91e36ae14 100644
--- a/src/meta-srv/src/procedure/tests.rs
+++ b/src/meta-srv/src/procedure/tests.rs
@@ -97,6 +97,7 @@ fn test_create_region_request_template() {
         1,
         create_table_task(),
         test_data::new_region_routes(),
+        vec![],
         test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
     );
 
@@ -183,6 +184,7 @@ async fn test_on_datanode_create_regions() {
         1,
         create_table_task(),
         region_routes,
+        vec![],
         test_data::new_ddl_context(datanode_manager),
     );
 
diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs
index 395104a9fc78..a1ebfd7fe77b 100644
--- a/src/meta-srv/src/table_meta_alloc.rs
+++ b/src/meta-srv/src/table_meta_alloc.rs
@@ -15,10 +15,12 @@
 use api::v1::meta::Partition;
 use common_catalog::format_full_table_name;
 use common_error::ext::BoxedError;
-use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
+use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
 use common_meta::error::{self as meta_error, Result as MetaResult};
 use common_meta::rpc::router::{Region, RegionRoute};
 use common_meta::sequence::SequenceRef;
+use common_meta::wal::meta::WalMetaAllocator;
+use common_meta::wal::WalProvider;
 use common_telemetry::warn;
 use snafu::{ensure, ResultExt};
 use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ};
@@ -32,6 +34,7 @@ pub struct MetaSrvTableMetadataAllocator {
     ctx: SelectorContext,
     selector: SelectorRef,
     table_id_sequence: SequenceRef,
+    wal_meta_allocator: WalMetaAllocator,
 }
 
 impl MetaSrvTableMetadataAllocator {
@@ -39,11 +42,13 @@ impl MetaSrvTableMetadataAllocator {
         ctx: SelectorContext,
         selector: SelectorRef,
         table_id_sequence: SequenceRef,
+        wal_meta_allocator: WalMetaAllocator,
     ) -> Self {
         Self {
             ctx,
             selector,
             table_id_sequence,
+            wal_meta_allocator,
         }
     }
 }
@@ -55,8 +60,8 @@
         ctx: &TableMetadataAllocatorContext,
         raw_table_info: &mut RawTableInfo,
         partitions: &[Partition],
-    ) -> MetaResult<(TableId, Vec<RegionRoute>)> {
-        handle_create_region_routes(
+    ) -> MetaResult<TableMetadata> {
+        let (table_id, region_routes) = handle_create_region_routes(
             ctx.cluster_id,
             raw_table_info,
             partitions,
@@ -66,7 +71,23 @@
         )
         .await
         .map_err(BoxedError::new)
-        .context(meta_error::ExternalSnafu)
+        .context(meta_error::ExternalSnafu)?;
+
+        let region_topics = match self.wal_meta_allocator.wal_provider() {
+            WalProvider::RaftEngine => vec![],
+            WalProvider::Kafka => {
+                // Each region gets assigned one topic.
+                self.wal_meta_allocator
+                    .try_alloc_topics(region_routes.len())
+                    .await?
+            }
+        };
+
+        Ok(TableMetadata {
+            table_id,
+            region_routes,
+            region_topics,
+        })
     }
 }