From 231d5c070e74cd4e8fa7f91d15423fad3beabf04 Mon Sep 17 00:00:00 2001 From: niebayes Date: Wed, 20 Dec 2023 17:46:45 +0800 Subject: [PATCH 1/9] chore: implement wal options allocator --- .../meta/src/wal/kafka/topic_manager.rs | 7 +-- src/common/meta/src/wal/options_allocator.rs | 44 ++++++++++++------- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 43d829bdbcab..62c9d6524385 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -54,8 +54,9 @@ impl TopicManager { } /// Selects a batch of topics from the topic pool through the topic selector. - pub fn select_batch(&self, num_topics: usize) -> Vec { - // TODO(niebayes): calls `select` to select a collection of topics in a batching manner. - vec!["tmp_topic".to_string(); num_topics] + pub fn select_batch(&self, num_topics: usize) -> Vec<&Topic> { + (0..num_topics) + .map(|_| self.topic_selector.select(&self.topic_pool)) + .collect() } } diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs index be9913db1121..82a321be89bd 100644 --- a/src/common/meta/src/wal/options_allocator.rs +++ b/src/common/meta/src/wal/options_allocator.rs @@ -35,30 +35,45 @@ pub enum WalOptionsAllocator { impl WalOptionsAllocator { /// Creates a WalOptionsAllocator. pub fn new(config: &WalConfig, kv_backend: KvBackendRef) -> Self { - todo!() + match config { + WalConfig::RaftEngine => Self::RaftEngine, + WalConfig::Kafka(kafka_config) => { + Self::Kafka(KafkaTopicManager::new(kafka_config, kv_backend)) + } + } } /// Tries to initialize the allocator. - pub fn try_init(&self) -> Result<()> { - todo!() + pub async fn try_init(&mut self) -> Result<()> { + match self { + Self::RaftEngine => Ok(()), + Self::Kafka(kafka_topic_manager) => kafka_topic_manager.try_init().await, + } } /// Allocates a wal options for a region. pub fn alloc(&self) -> WalOptions { - todo!() + match self { + Self::RaftEngine => WalOptions::RaftEngine, + Self::Kafka(kafka_topic_manager) => WalOptions::Kafka(KafkaWalOptions { + topic: kafka_topic_manager.select().clone(), + }), + } } /// Allocates a batch of wal options where each wal options goes to a region. pub fn alloc_batch(&self, num_regions: usize) -> Vec { match self { WalOptionsAllocator::RaftEngine => vec![WalOptions::RaftEngine; num_regions], - WalOptionsAllocator::Kafka(topic_manager) => { - let topics = topic_manager.select_batch(num_regions); - topics - .into_iter() - .map(|topic| WalOptions::Kafka(KafkaWalOptions { topic })) - .collect() - } + WalOptionsAllocator::Kafka(topic_manager) => topic_manager + .select_batch(num_regions) + .into_iter() + .map(|topic| { + WalOptions::Kafka(KafkaWalOptions { + topic: topic.clone(), + }) + }) + .collect(), } } } @@ -80,12 +95,11 @@ pub fn build_region_wal_options( } /// Builds a wal options allocator. -// TODO(niebayes): implement. pub async fn build_wal_options_allocator( config: &WalConfig, kv_backend: &KvBackendRef, ) -> Result { - let _ = config; - let _ = kv_backend; - Ok(WalOptionsAllocator::default()) + let mut allocator = WalOptionsAllocator::new(config, kv_backend.clone()); + allocator.try_init().await?; + Ok(allocator) } From 6b6cfd24015fb82558edf4a3d284b476638d5528 Mon Sep 17 00:00:00 2001 From: niebayes Date: Wed, 20 Dec 2023 17:57:25 +0800 Subject: [PATCH 2/9] chore: implement round-robin topic selector --- .../meta/src/wal/kafka/topic_manager.rs | 2 +- .../meta/src/wal/kafka/topic_selector.rs | 35 ++++++++++++++----- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 62c9d6524385..b6a4b74f150f 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -31,7 +31,7 @@ impl TopicManager { /// Creates a new topic manager. pub fn new(config: &KafkaConfig, kv_backend: KvBackendRef) -> Self { let selector = match config.selector_type { - SelectorType::RoundRobin => RoundRobinTopicSelector::new(), + SelectorType::RoundRobin => RoundRobinTopicSelector::default(), }; Self { diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs index f29ac55b6ba8..3b84e4e6dcae 100644 --- a/src/common/meta/src/wal/kafka/topic_selector.rs +++ b/src/common/meta/src/wal/kafka/topic_selector.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use serde::{Deserialize, Serialize}; @@ -29,23 +30,39 @@ pub enum SelectorType { /// Controls topic selection. pub(super) trait TopicSelector: Send + Sync { /// Selects a topic from the topic pool. - fn select(&self, topic_pool: &[Topic]) -> &Topic; + fn select<'a>(&'a self, topic_pool: &'a [Topic]) -> &Topic; } +/// Arc wrapper of TopicSelector. pub(super) type TopicSelectorRef = Arc; /// A topic selector with the round-robin strategy, i.e. selects topics in a round-robin manner. -pub(super) struct RoundRobinTopicSelector; +#[derive(Default)] +pub(super) struct RoundRobinTopicSelector { + cursor: AtomicUsize, +} -impl RoundRobinTopicSelector { - /// Creates a new round-robin topic selector. - pub(super) fn new() -> Self { - todo!() +impl TopicSelector for RoundRobinTopicSelector { + fn select<'a>(&'a self, topic_pool: &'a [Topic]) -> &Topic { + // Safety: the caller ensures the topic pool is not empty and hence the modulo operation is safe. + let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len(); + // Safety: the modulo operation ensures the index operation is safe. + topic_pool.get(which).unwrap() } } -impl TopicSelector for RoundRobinTopicSelector { - fn select(&self, topic_pool: &[Topic]) -> &Topic { - todo!() +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_round_robin_topic_selector() { + let topic_pool: Vec<_> = [0, 1, 2].into_iter().map(|v| v.to_string()).collect(); + let selector = RoundRobinTopicSelector::default(); + + assert_eq!(selector.select(&topic_pool), "0"); + assert_eq!(selector.select(&topic_pool), "1"); + assert_eq!(selector.select(&topic_pool), "2"); + assert_eq!(selector.select(&topic_pool), "0"); } } From 6dbfea2ab8329a558f56a57d562d7f0c394400b5 Mon Sep 17 00:00:00 2001 From: niebayes Date: Wed, 20 Dec 2023 18:19:48 +0800 Subject: [PATCH 3/9] feat: add shuffle to round-robin topic selector --- Cargo.lock | 1 + src/common/meta/Cargo.toml | 1 + src/common/meta/src/wal/kafka/topic_manager.rs | 2 +- .../meta/src/wal/kafka/topic_selector.rs | 18 ++++++++++++++++++ 4 files changed, 21 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 78f7e5c7470e..0f83950cce11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1834,6 +1834,7 @@ dependencies = [ "lazy_static", "prometheus", "prost 0.12.2", + "rand", "regex", "serde", "serde_json", diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 7b0624748964..75192ab59025 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -33,6 +33,7 @@ lazy_static.workspace = true prometheus.workspace = true prost.workspace = true regex.workspace = true +rand.workspace = true serde.workspace = true serde_json.workspace = true serde_with = "3" diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index b6a4b74f150f..17b0c490489c 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -31,7 +31,7 @@ impl TopicManager { /// Creates a new topic manager. pub fn new(config: &KafkaConfig, kv_backend: KvBackendRef) -> Self { let selector = match config.selector_type { - SelectorType::RoundRobin => RoundRobinTopicSelector::default(), + SelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), }; Self { diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs index 3b84e4e6dcae..fffa5714b2c5 100644 --- a/src/common/meta/src/wal/kafka/topic_selector.rs +++ b/src/common/meta/src/wal/kafka/topic_selector.rs @@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use rand::Rng; use serde::{Deserialize, Serialize}; use crate::wal::kafka::topic::Topic; @@ -42,6 +43,18 @@ pub(super) struct RoundRobinTopicSelector { cursor: AtomicUsize, } +impl RoundRobinTopicSelector { + // The cursor in the round-robin selector is not persisted which may break the round-robin strategy cross crashes. + // Introducing a shuffling strategy may help mitigate this issue. + pub fn with_shuffle() -> Self { + let mut this = Self::default(); + let offset = rand::thread_rng().gen::() % usize::MAX; + // It's ok when an overflow happens since `fetch_add` automatically wraps around. + this.cursor.fetch_add(offset, Ordering::Relaxed); + this + } +} + impl TopicSelector for RoundRobinTopicSelector { fn select<'a>(&'a self, topic_pool: &'a [Topic]) -> &Topic { // Safety: the caller ensures the topic pool is not empty and hence the modulo operation is safe. @@ -64,5 +77,10 @@ mod tests { assert_eq!(selector.select(&topic_pool), "1"); assert_eq!(selector.select(&topic_pool), "2"); assert_eq!(selector.select(&topic_pool), "0"); + + // Creates a round-robin selector with shuffle. + let selector = RoundRobinTopicSelector::with_shuffle(); + let topic = selector.select(&topic_pool); + assert!(topic_pool.contains(&topic)); } } From 59b461e99b4195c73d410c8020fa5482a7900fcb Mon Sep 17 00:00:00 2001 From: niebayes Date: Wed, 20 Dec 2023 19:52:09 +0800 Subject: [PATCH 4/9] chore: implement kafka topic manager --- Cargo.lock | 1 + src/common/meta/Cargo.toml | 3 +- src/common/meta/src/error.rs | 38 ++++- .../meta/src/wal/kafka/topic_manager.rs | 134 +++++++++++++++++- .../meta/src/wal/kafka/topic_selector.rs | 2 +- src/common/meta/src/wal/options_allocator.rs | 6 +- 6 files changed, 174 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f83950cce11..8f3b46b0b468 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1836,6 +1836,7 @@ dependencies = [ "prost 0.12.2", "rand", "regex", + "rskafka", "serde", "serde_json", "serde_with", diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 75192ab59025..6f95109196d8 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -32,8 +32,9 @@ humantime-serde.workspace = true lazy_static.workspace = true prometheus.workspace = true prost.workspace = true -regex.workspace = true rand.workspace = true +regex.workspace = true +rskafka = "0.5" serde.workspace = true serde_json.workspace = true serde_with = "3" diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 1d8c0c3ecc26..91e1e5e04201 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -296,6 +296,37 @@ pub enum Error { error: serde_json::Error, location: Location, }, + + #[snafu(display("Invalid number of topics {}", num_topics))] + InvalidNumTopics { + num_topics: usize, + location: Location, + }, + + #[snafu(display( + "Failed to build an rskafka client, broker endpoints: {:?}", + broker_endpoints + ))] + BuildClient { + broker_endpoints: Vec, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + + #[snafu(display("Failed to build an rskafka controller client"))] + BuildCtrlClient { + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + + #[snafu(display("Failed to create a kafka wal topic through rskafka client"))] + CreateKafkaWalTopic { + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, } pub type Result = std::result::Result; @@ -331,7 +362,10 @@ impl ErrorExt for Error { | TableRouteNotFound { .. } | ConvertRawTableInfo { .. } | RegionOperatingRace { .. } - | EncodeWalOptionsToJson { .. } => StatusCode::Unexpected, + | EncodeWalOptionsToJson { .. } + | BuildClient { .. } + | BuildCtrlClient { .. } + | CreateKafkaWalTopic { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } @@ -356,6 +390,8 @@ impl ErrorExt for Error { RetryLater { source, .. } => source.status_code(), InvalidCatalogValue { source, .. } => source.status_code(), ConvertAlterTableRequest { source, .. } => source.status_code(), + + InvalidNumTopics { .. } => StatusCode::InvalidArguments, } } diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 17b0c490489c..fd007c7b7f71 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -12,16 +12,31 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; +use std::time::Duration; -use crate::error::Result; +use common_telemetry::debug; +use rskafka::client::ClientBuilder; +use rskafka::BackoffConfig; +use snafu::{ensure, ResultExt}; + +use crate::error::{ + BuildClientSnafu, BuildCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu, + EncodeJsonSnafu, InvalidNumTopicsSnafu, Result, +}; use crate::kv_backend::KvBackendRef; +use crate::rpc::store::PutRequest; use crate::wal::kafka::topic::Topic; use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, SelectorType, TopicSelectorRef}; use crate::wal::kafka::KafkaConfig; +const CREATE_TOPIC_TIMEOUT: i32 = 5_000; // 5,000 ms. +const CREATED_TOPICS_KEY: &str = "__created_kafka_wal_topics"; + /// Manages topic initialization and selection. pub struct TopicManager { + config: KafkaConfig, topic_pool: Vec, topic_selector: TopicSelectorRef, kv_backend: KvBackendRef, @@ -29,23 +44,81 @@ pub struct TopicManager { impl TopicManager { /// Creates a new topic manager. - pub fn new(config: &KafkaConfig, kv_backend: KvBackendRef) -> Self { + pub fn new(config: KafkaConfig, kv_backend: KvBackendRef) -> Self { let selector = match config.selector_type { SelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), }; Self { + config, topic_pool: Vec::new(), topic_selector: Arc::new(selector), kv_backend, } } - /// Tries to initialize the topic pool. + /// Tries to initialize the topic manager. /// The initializer first tries to restore persisted topics from the kv backend. - /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request more topics. + /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics. pub async fn try_init(&mut self) -> Result<()> { - todo!() + let num_topics = self.config.num_topics; + ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); + + // Builds an rskafka controller client for creating topics if necessary. + let broker_endpoints = self.config.broker_endpoints.clone(); + let backoff_config = BackoffConfig { + init_backoff: Duration::from_millis(500), + max_backoff: Duration::from_secs(10), + base: 2.0, + // Stop reconnecting if the total wait time reaches the deadline. + deadline: Some(Duration::from_secs(60 * 5)), // 5 mins. + }; + let client = ClientBuilder::new(broker_endpoints.clone()) + .backoff_config(backoff_config) + .build() + .await + .context(BuildClientSnafu { broker_endpoints })? + .controller_client() + .context(BuildCtrlClientSnafu)?; + + // Topics should be created. + let topics = (0..num_topics) + .map(|topic_id| format!("{}_{topic_id}", self.config.topic_name_prefix)) + .collect::>(); + // Topics already created. + // There may have extra topics created but it's okay since those topics won't break topic allocation. + let created_topics = Self::restore_created_topics(&self.kv_backend) + .await? + .into_iter() + .collect::>(); + + // Spawns tokio tasks for creating missing topics. + let tasks = topics + .iter() + .filter_map(|topic| { + if created_topics.contains(topic) { + debug!("Topic {} was created", topic); + return None; + } + + debug!("Tries to create topic {}", topic); + Some(client.create_topic( + topic, + self.config.num_partitions, + self.config.replication_factor, + CREATE_TOPIC_TIMEOUT, + )) + }) + .collect::>(); + // TODO(niebayes): Determine how rskafka handles an already-exist topic. Check if an error would be raised. + futures::future::try_join_all(tasks) + .await + .context(CreateKafkaWalTopicSnafu)?; + + // Persists created topics. + Self::persist_created_topics(&topics, &self.kv_backend).await?; + + Ok(()) } /// Selects one topic from the topic pool through the topic selector. @@ -59,4 +132,55 @@ impl TopicManager { .map(|_| self.topic_selector.select(&self.topic_pool)) .collect() } + + async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result> { + kv_backend + .get(CREATED_TOPICS_KEY.as_bytes()) + .await? + .map(|key_value| serde_json::from_slice(&key_value.value).context(DecodeJsonSnafu)) + .unwrap_or_else(|| Ok(vec![])) + } + + async fn persist_created_topics(topics: &[Topic], kv_backend: &KvBackendRef) -> Result<()> { + let raw_topics = serde_json::to_vec(topics).context(EncodeJsonSnafu)?; + kv_backend + .put(PutRequest { + key: CREATED_TOPICS_KEY.as_bytes().to_vec(), + value: raw_topics, + prev_kv: false, + }) + .await + .map(|_| ()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + + // Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend. + #[tokio::test] + async fn test_restore_persisted_topics() { + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + let topic_name_prefix = "__test_"; + let num_topics = 16; + + // Constructs mock topics. + let topics = (0..num_topics) + .map(|topic| format!("{topic_name_prefix}{topic}")) + .collect::>(); + + // Persists topics to kv backend. + TopicManager::persist_created_topics(&topics, &kv_backend) + .await + .unwrap(); + + // Restores topics from kv backend. + let restored_topics = TopicManager::restore_created_topics(&kv_backend) + .await + .unwrap(); + + assert_eq!(topics, restored_topics); + } } diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs index fffa5714b2c5..043727386a21 100644 --- a/src/common/meta/src/wal/kafka/topic_selector.rs +++ b/src/common/meta/src/wal/kafka/topic_selector.rs @@ -81,6 +81,6 @@ mod tests { // Creates a round-robin selector with shuffle. let selector = RoundRobinTopicSelector::with_shuffle(); let topic = selector.select(&topic_pool); - assert!(topic_pool.contains(&topic)); + assert!(topic_pool.contains(topic)); } } diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs index 82a321be89bd..6803f7b1bec3 100644 --- a/src/common/meta/src/wal/options_allocator.rs +++ b/src/common/meta/src/wal/options_allocator.rs @@ -34,7 +34,7 @@ pub enum WalOptionsAllocator { impl WalOptionsAllocator { /// Creates a WalOptionsAllocator. - pub fn new(config: &WalConfig, kv_backend: KvBackendRef) -> Self { + pub fn new(config: WalConfig, kv_backend: KvBackendRef) -> Self { match config { WalConfig::RaftEngine => Self::RaftEngine, WalConfig::Kafka(kafka_config) => { @@ -99,7 +99,9 @@ pub async fn build_wal_options_allocator( config: &WalConfig, kv_backend: &KvBackendRef, ) -> Result { - let mut allocator = WalOptionsAllocator::new(config, kv_backend.clone()); + let mut allocator = WalOptionsAllocator::new(config.clone(), kv_backend.clone()); allocator.try_init().await?; Ok(allocator) } + +// TODO(niebayes): add tests for the above utility functions. From 0d4d5e524d9495e0e52cb485a5296cbbfc4a9c2e Mon Sep 17 00:00:00 2001 From: niebayes Date: Wed, 20 Dec 2023 23:35:36 +0800 Subject: [PATCH 5/9] test: add tests for wal options allocator --- src/common/meta/src/wal/options_allocator.rs | 48 +++++++++++++++----- src/frontend/src/instance/standalone.rs | 4 +- src/meta-srv/src/table_meta_alloc.rs | 4 +- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs index 6803f7b1bec3..c8fd2eea963c 100644 --- a/src/common/meta/src/wal/options_allocator.rs +++ b/src/common/meta/src/wal/options_allocator.rs @@ -78,8 +78,18 @@ impl WalOptionsAllocator { } } +/// Creates and initializes a wal options allocator. +pub async fn build_wal_options_allocator( + config: &WalConfig, + kv_backend: &KvBackendRef, +) -> Result { + let mut allocator = WalOptionsAllocator::new(config.clone(), kv_backend.clone()); + allocator.try_init().await?; + Ok(allocator) +} + /// Allocates a wal options for each region. The allocated wal options is encoded immediately. -pub fn build_region_wal_options( +pub fn allocate_region_wal_options( regions: Vec, wal_options_allocator: &WalOptionsAllocator, ) -> Result> { @@ -94,14 +104,30 @@ pub fn build_region_wal_options( Ok(regions.into_iter().zip(wal_options).collect()) } -/// Builds a wal options allocator. -pub async fn build_wal_options_allocator( - config: &WalConfig, - kv_backend: &KvBackendRef, -) -> Result { - let mut allocator = WalOptionsAllocator::new(config.clone(), kv_backend.clone()); - allocator.try_init().await?; - Ok(allocator) -} +#[cfg(test)] +mod tests { + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + + // Tests the wal options allocator could successfully allocate raft-engine wal options. + // Note: tests for allocator with kafka are integration tests. + #[tokio::test] + async fn test_allocator_with_raft_engine() { + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + let wal_config = WalConfig::RaftEngine; + let allocator = build_wal_options_allocator(&wal_config, &kv_backend) + .await + .unwrap(); + + let num_regions = 32; + let regions = (0..num_regions).collect::>(); + let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap(); -// TODO(niebayes): add tests for the above utility functions. + let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap(); + let expected = regions + .into_iter() + .zip(vec![encoded_wal_options; num_regions as usize]) + .collect(); + assert_eq!(got, expected); + } +} diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 95229d343694..2bf0f066c18a 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -26,7 +26,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; -use common_meta::wal::options_allocator::build_region_wal_options; +use common_meta::wal::options_allocator::allocate_region_wal_options; use common_meta::wal::WalOptionsAllocator; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing_context::{FutureExt, TracingContext}; @@ -156,7 +156,7 @@ impl TableMetadataAllocator for StandaloneTableMetadataAllocator { .map(|route| route.region.id.region_number()) .collect(); let region_wal_options = - build_region_wal_options(region_numbers, &self.wal_options_allocator)?; + allocate_region_wal_options(region_numbers, &self.wal_options_allocator)?; debug!( "Allocated region wal options {:?} for table {}", diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index 7a4183e58a88..d5888305fb18 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -19,7 +19,7 @@ use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAlloc use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; -use common_meta::wal::options_allocator::build_region_wal_options; +use common_meta::wal::options_allocator::allocate_region_wal_options; use common_meta::wal::WalOptionsAllocator; use common_telemetry::{debug, warn}; use snafu::{ensure, ResultExt}; @@ -78,7 +78,7 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator { .map(|route| route.region.id.region_number()) .collect(); let region_wal_options = - build_region_wal_options(region_numbers, &self.wal_options_allocator)?; + allocate_region_wal_options(region_numbers, &self.wal_options_allocator)?; debug!( "Allocated region wal options {:?} for table {}", From 0f7466372b6b1874e5f6820beb03be2d7f3e5aba Mon Sep 17 00:00:00 2001 From: niebayes Date: Wed, 20 Dec 2023 23:42:14 +0800 Subject: [PATCH 6/9] test: add wal provider to test config files --- tests/conf/datanode-test.toml.template | 1 + tests/conf/standalone-test.toml.template | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template index bda5b8cb365e..55d0c2f1fe4c 100644 --- a/tests/conf/datanode-test.toml.template +++ b/tests/conf/datanode-test.toml.template @@ -6,6 +6,7 @@ rpc_hostname = '127.0.0.1' rpc_runtime_size = 8 [wal] +provider = "raft_engine" file_size = '1GB' purge_interval = '10m' purge_threshold = '10GB' diff --git a/tests/conf/standalone-test.toml.template b/tests/conf/standalone-test.toml.template index b0b20809dace..f0ddc38d048e 100644 --- a/tests/conf/standalone-test.toml.template +++ b/tests/conf/standalone-test.toml.template @@ -3,6 +3,7 @@ enable_memory_catalog = false require_lease_before_startup = true [wal] +provider = "raft_engine" file_size = '1GB' purge_interval = '10m' purge_threshold = '10GB' From f7d8fc398a1f10a0add86925dfb8a1f20bbd0f65 Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 00:02:27 +0800 Subject: [PATCH 7/9] test: leave todos for adding tests for remote wal --- .github/workflows/develop.yml | 1 + tests-integration/tests/main.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 438cb3336ba4..47cc7590041e 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -175,6 +175,7 @@ jobs: - name: Setup etcd server working-directory: tests-integration/fixtures/etcd run: docker compose -f docker-compose-standalone.yml up -d --wait + #TODO(niebaye) Add a step to setup kafka clusters. Maybe add a docker file for starting kafka clusters. - name: Run nextest cases run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard env: diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs index 8b1e064579c8..65dfcc8cf7c4 100644 --- a/tests-integration/tests/main.rs +++ b/tests-integration/tests/main.rs @@ -25,3 +25,5 @@ grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs); http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs); // region_failover_tests!(File, S3, S3WithCache, Oss, Azblob); sql_tests!(File); + +// TODO(niebayes): add integration tests for remote wal. From dafd3f507fb019ff14416955dc1509a516e440c0 Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 22 Dec 2023 15:54:42 +0800 Subject: [PATCH 8/9] fix: resolve review conversations --- config/metasrv.example.toml | 12 +- src/cmd/src/error.rs | 7 ++ src/cmd/src/standalone.rs | 25 ++-- src/common/meta/src/error.rs | 24 ++-- src/common/meta/src/wal.rs | 22 +++- src/common/meta/src/wal/kafka.rs | 24 +++- .../meta/src/wal/kafka/topic_manager.rs | 108 ++++++++++-------- .../meta/src/wal/kafka/topic_selector.rs | 36 +++--- src/common/meta/src/wal/options_allocator.rs | 66 ++++++----- src/frontend/src/instance/standalone.rs | 9 +- src/meta-srv/src/metasrv.rs | 9 ++ src/meta-srv/src/metasrv/builder.rs | 16 +-- src/meta-srv/src/table_meta_alloc.rs | 7 +- tests-integration/src/standalone.rs | 13 ++- 14 files changed, 235 insertions(+), 143 deletions(-) diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index fff978f8c15d..04267a5be1b6 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -61,8 +61,18 @@ provider = "raft_engine" # - "round_robin" (default) # selector_type = "round_robin" # A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. -# topic_name_prefix = "greptimedb_kafka_wal" +# topic_name_prefix = "greptimedb_wal_kafka" # Number of partitions per topic. # num_partitions = 1 # Expected number of replicas of each partition. # replication_factor = 3 +# Aboves which a topic creation operation will be cancelled. +# create_topic_timeout = "30s" +# The initial backoff for kafka clients. +# backoff_init = "500ms" +# The maximum backoff for kafka clients. +# backoff_max = "10s" +# Exponential backoff rate, i.e. next backoff = base * current backoff. +# backoff_base = 2.0 +# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. +# backoff_deadline = "5mins" diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 91d556ff3861..d90afaef2442 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -55,6 +55,12 @@ pub enum Error { source: common_procedure::error::Error, }, + #[snafu(display("Failed to start wal options allocator"))] + StartWalOptionsAllocator { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to start datanode"))] StartDatanode { location: Location, @@ -270,6 +276,7 @@ impl ErrorExt for Error { Error::StartProcedureManager { source, .. } | Error::StopProcedureManager { source, .. } => source.status_code(), + Error::StartWalOptionsAllocator { source, .. } => source.status_code(), Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal, Error::RequestDatabase { source, .. } => source.status_code(), Error::CollectRecordBatches { source, .. } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index fb0e23ebefd9..d6b844a55268 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -19,7 +19,6 @@ use async_trait::async_trait; use clap::Parser; use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::{metadata_store_dir, KvBackendConfig, WalConfig}; -use common_error::ext::BoxedError; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef}; @@ -28,7 +27,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; -use common_meta::wal::build_wal_options_allocator; +use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef}; use common_procedure::ProcedureManagerRef; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; @@ -50,9 +49,9 @@ use servers::Mode; use snafu::ResultExt; use crate::error::{ - CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, Result, + CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, - StartProcedureManagerSnafu, StopProcedureManagerSnafu, + StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; use crate::options::{CliOptions, MixOptions, Options}; use crate::App; @@ -175,6 +174,7 @@ pub struct Instance { datanode: Datanode, frontend: FeInstance, procedure_manager: ProcedureManagerRef, + wal_options_allocator: WalOptionsAllocatorRef, } #[async_trait] @@ -191,6 +191,11 @@ impl App for Instance { .await .context(StartProcedureManagerSnafu)?; + self.wal_options_allocator + .start() + .await + .context(StartWalOptionsAllocatorSnafu)?; + self.frontend.start().await.context(StartFrontendSnafu)?; Ok(()) } @@ -375,14 +380,13 @@ impl StartCommand { .build(), ); // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder. - let wal_options_allocator = - build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend) - .await - .map_err(BoxedError::new) - .context(OtherSnafu)?; + let wal_options_allocator = Arc::new(WalOptionsAllocator::new( + common_meta::wal::WalConfig::default(), + kv_backend.clone(), + )); let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new( table_id_sequence, - wal_options_allocator, + wal_options_allocator.clone(), )); let ddl_task_executor = Self::create_ddl_task_executor( @@ -408,6 +412,7 @@ impl StartCommand { datanode, frontend, procedure_manager, + wal_options_allocator, }) } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 91e1e5e04201..79c3e9316c59 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -290,7 +290,7 @@ pub enum Error { "Failed to encode a wal options to json string, wal_options: {:?}", wal_options ))] - EncodeWalOptionsToJson { + EncodeWalOptions { wal_options: WalOptions, #[snafu(source)] error: serde_json::Error, @@ -304,29 +304,32 @@ pub enum Error { }, #[snafu(display( - "Failed to build an rskafka client, broker endpoints: {:?}", + "Failed to build a kafka client, broker endpoints: {:?}", broker_endpoints ))] - BuildClient { + BuildKafkaClient { broker_endpoints: Vec, location: Location, #[snafu(source)] error: rskafka::client::error::Error, }, - #[snafu(display("Failed to build an rskafka controller client"))] - BuildCtrlClient { + #[snafu(display("Failed to build a kafka controller client"))] + BuildKafkaCtrlClient { location: Location, #[snafu(source)] error: rskafka::client::error::Error, }, - #[snafu(display("Failed to create a kafka wal topic through rskafka client"))] + #[snafu(display("Failed to create a kafka wal topic"))] CreateKafkaWalTopic { location: Location, #[snafu(source)] error: rskafka::client::error::Error, }, + + #[snafu(display("The topic pool is empty"))] + EmptyTopicPool { location: Location }, } pub type Result = std::result::Result; @@ -362,10 +365,11 @@ impl ErrorExt for Error { | TableRouteNotFound { .. } | ConvertRawTableInfo { .. } | RegionOperatingRace { .. } - | EncodeWalOptionsToJson { .. } - | BuildClient { .. } - | BuildCtrlClient { .. } - | CreateKafkaWalTopic { .. } => StatusCode::Unexpected, + | EncodeWalOptions { .. } + | BuildKafkaClient { .. } + | BuildKafkaCtrlClient { .. } + | CreateKafkaWalTopic { .. } + | EmptyTopicPool { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index 0ffaba6a22e0..0e1576c3645e 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -23,7 +23,9 @@ use serde_with::with_prefix; use crate::error::Result; use crate::wal::kafka::KafkaConfig; pub use crate::wal::kafka::Topic as KafkaWalTopic; -pub use crate::wal::options_allocator::{build_wal_options_allocator, WalOptionsAllocator}; +pub use crate::wal::options_allocator::{ + allocate_region_wal_options, WalOptionsAllocator, WalOptionsAllocatorRef, +}; /// Wal config for metasrv. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] @@ -38,6 +40,8 @@ pub enum WalConfig { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; use crate::wal::kafka::topic_selector::SelectorType as KafkaTopicSelectorType; @@ -65,19 +69,31 @@ mod tests { broker_endpoints = ["127.0.0.1:9090"] num_topics = 32 selector_type = "round_robin" - topic_name_prefix = "greptimedb_kafka_wal" + topic_name_prefix = "greptimedb_wal_kafka" num_partitions = 1 replication_factor = 3 + create_topic_timeout = "30s" + backoff_init = "500ms" + backoff_max = "10s" + backoff_base = 2.0 + backoff_deadline = "5mins" "#; let wal_config: WalConfig = toml::from_str(toml_str).unwrap(); let expected_kafka_config = KafkaConfig { broker_endpoints: vec!["127.0.0.1:9090".to_string()], num_topics: 32, selector_type: KafkaTopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_kafka_wal".to_string(), + topic_name_prefix: "greptimedb_wal_kafka".to_string(), num_partitions: 1, replication_factor: 3, + create_topic_timeout: Duration::from_secs(30), + backoff_init: Duration::from_millis(500), + backoff_max: Duration::from_secs(10), + backoff_base: 2.0, + backoff_deadline: Some(Duration::from_secs(60 * 5)), }; assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config)); } + + // TODO(niebayes): the integrate test needs to test that the example config file can be successfully parsed. } diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index 90de795d5a73..13ba0a4707c4 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -16,6 +16,8 @@ pub mod topic; pub mod topic_manager; pub mod topic_selector; +use std::time::Duration; + use serde::{Deserialize, Serialize}; pub use crate::wal::kafka::topic::Topic; @@ -37,6 +39,21 @@ pub struct KafkaConfig { pub num_partitions: i32, /// The replication factor of each topic. pub replication_factor: i16, + /// Aboves which a topic creation operation will be cancelled. + #[serde(with = "humantime_serde")] + pub create_topic_timeout: Duration, + /// The initial backoff for kafka clients. + #[serde(with = "humantime_serde")] + pub backoff_init: Duration, + /// The maximum backoff for kafka clients. + #[serde(with = "humantime_serde")] + pub backoff_max: Duration, + /// Exponential backoff rate, i.e. next backoff = base * current backoff. + pub backoff_base: f64, + /// Stop reconnecting if the total wait time reaches the deadline. + /// If it's None, the reconnecting won't terminate. + #[serde(with = "humantime_serde")] + pub backoff_deadline: Option, } impl Default for KafkaConfig { @@ -45,9 +62,14 @@ impl Default for KafkaConfig { broker_endpoints: vec!["127.0.0.1:9090".to_string()], num_topics: 64, selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_kafka_wal".to_string(), + topic_name_prefix: "greptimedb_wal_kafka".to_string(), num_partitions: 1, replication_factor: 3, + create_topic_timeout: Duration::from_secs(30), + backoff_init: Duration::from_millis(500), + backoff_max: Duration::from_secs(10), + backoff_base: 2.0, + backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins } } } diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index fd007c7b7f71..4773871ae134 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -22,7 +22,7 @@ use rskafka::BackoffConfig; use snafu::{ensure, ResultExt}; use crate::error::{ - BuildClientSnafu, BuildCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu, + BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu, Result, }; use crate::kv_backend::KvBackendRef; @@ -31,12 +31,12 @@ use crate::wal::kafka::topic::Topic; use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, SelectorType, TopicSelectorRef}; use crate::wal::kafka::KafkaConfig; -const CREATE_TOPIC_TIMEOUT: i32 = 5_000; // 5,000 ms. -const CREATED_TOPICS_KEY: &str = "__created_kafka_wal_topics"; +const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/"; /// Manages topic initialization and selection. pub struct TopicManager { config: KafkaConfig, + // TODO(niebayes): maybe add a guard to ensure all topics in the topic pool are created. topic_pool: Vec, topic_selector: TopicSelectorRef, kv_backend: KvBackendRef, @@ -45,13 +45,18 @@ pub struct TopicManager { impl TopicManager { /// Creates a new topic manager. pub fn new(config: KafkaConfig, kv_backend: KvBackendRef) -> Self { + // Topics should be created. + let topics = (0..config.num_topics) + .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix)) + .collect::>(); + let selector = match config.selector_type { SelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), }; Self { config, - topic_pool: Vec::new(), + topic_pool: topics, topic_selector: Arc::new(selector), kv_backend, } @@ -60,74 +65,85 @@ impl TopicManager { /// Tries to initialize the topic manager. /// The initializer first tries to restore persisted topics from the kv backend. /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics. - pub async fn try_init(&mut self) -> Result<()> { + pub async fn start(&self) -> Result<()> { let num_topics = self.config.num_topics; ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); - // Builds an rskafka controller client for creating topics if necessary. - let broker_endpoints = self.config.broker_endpoints.clone(); - let backoff_config = BackoffConfig { - init_backoff: Duration::from_millis(500), - max_backoff: Duration::from_secs(10), - base: 2.0, - // Stop reconnecting if the total wait time reaches the deadline. - deadline: Some(Duration::from_secs(60 * 5)), // 5 mins. - }; - let client = ClientBuilder::new(broker_endpoints.clone()) - .backoff_config(backoff_config) - .build() - .await - .context(BuildClientSnafu { broker_endpoints })? - .controller_client() - .context(BuildCtrlClientSnafu)?; - // Topics should be created. - let topics = (0..num_topics) - .map(|topic_id| format!("{}_{topic_id}", self.config.topic_name_prefix)) - .collect::>(); + let topics = &self.topic_pool; + // Topics already created. // There may have extra topics created but it's okay since those topics won't break topic allocation. let created_topics = Self::restore_created_topics(&self.kv_backend) .await? .into_iter() .collect::>(); + debug!("Restored {} topics", created_topics.len()); - // Spawns tokio tasks for creating missing topics. - let tasks = topics + // Creates missing topics. + let to_be_created = topics .iter() - .filter_map(|topic| { + .enumerate() + .filter_map(|(i, topic)| { if created_topics.contains(topic) { - debug!("Topic {} was created", topic); return None; } + Some(i) + }) + .collect::>(); + if !to_be_created.is_empty() { + self.try_create_topics(topics, &to_be_created).await?; + Self::persist_created_topics(topics, &self.kv_backend).await?; + debug!("Persisted {} topics", topics.len()); + } + Ok(()) + } + + /// Tries to create topics specified by indexes in `to_be_created`. + async fn try_create_topics(&self, topics: &[Topic], to_be_created: &[usize]) -> Result<()> { + // Builds an kafka controller client for creating topics. + let backoff_config = BackoffConfig { + init_backoff: self.config.backoff_init, + max_backoff: self.config.backoff_max, + base: self.config.backoff_base, + deadline: self.config.backoff_deadline, + }; + let client = ClientBuilder::new(self.config.broker_endpoints.clone()) + .backoff_config(backoff_config) + .build() + .await + .with_context(|_| BuildKafkaClientSnafu { + broker_endpoints: self.config.broker_endpoints.clone(), + })? + .controller_client() + .context(BuildKafkaCtrlClientSnafu)?; - debug!("Tries to create topic {}", topic); - Some(client.create_topic( - topic, + // Spawns tokio tasks for creating missing topics. + let tasks = to_be_created + .iter() + .map(|i| { + client.create_topic( + topics[*i].clone(), self.config.num_partitions, self.config.replication_factor, - CREATE_TOPIC_TIMEOUT, - )) + self.config.create_topic_timeout.as_millis() as i32, + ) }) .collect::>(); // TODO(niebayes): Determine how rskafka handles an already-exist topic. Check if an error would be raised. futures::future::try_join_all(tasks) .await - .context(CreateKafkaWalTopicSnafu)?; - - // Persists created topics. - Self::persist_created_topics(&topics, &self.kv_backend).await?; - - Ok(()) + .context(CreateKafkaWalTopicSnafu) + .map(|_| ()) } /// Selects one topic from the topic pool through the topic selector. - pub fn select(&self) -> &Topic { + pub fn select(&self) -> Result<&Topic> { self.topic_selector.select(&self.topic_pool) } /// Selects a batch of topics from the topic pool through the topic selector. - pub fn select_batch(&self, num_topics: usize) -> Vec<&Topic> { + pub fn select_batch(&self, num_topics: usize) -> Result> { (0..num_topics) .map(|_| self.topic_selector.select(&self.topic_pool)) .collect() @@ -137,8 +153,10 @@ impl TopicManager { kv_backend .get(CREATED_TOPICS_KEY.as_bytes()) .await? - .map(|key_value| serde_json::from_slice(&key_value.value).context(DecodeJsonSnafu)) - .unwrap_or_else(|| Ok(vec![])) + .map_or_else( + || Ok(vec![]), + |key_value| serde_json::from_slice(&key_value.value).context(DecodeJsonSnafu), + ) } async fn persist_created_topics(topics: &[Topic], kv_backend: &KvBackendRef) -> Result<()> { @@ -163,7 +181,7 @@ mod tests { #[tokio::test] async fn test_restore_persisted_topics() { let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let topic_name_prefix = "__test_"; + let topic_name_prefix = "greptimedb_wal_kafka"; let num_topics = 16; // Constructs mock topics. diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs index 043727386a21..6764cadcc990 100644 --- a/src/common/meta/src/wal/kafka/topic_selector.rs +++ b/src/common/meta/src/wal/kafka/topic_selector.rs @@ -17,7 +17,9 @@ use std::sync::Arc; use rand::Rng; use serde::{Deserialize, Serialize}; +use snafu::ensure; +use crate::error::{EmptyTopicPoolSnafu, Result}; use crate::wal::kafka::topic::Topic; /// The type of the topic selector, i.e. with which strategy to select a topic. @@ -29,17 +31,17 @@ pub enum SelectorType { } /// Controls topic selection. -pub(super) trait TopicSelector: Send + Sync { +pub(crate) trait TopicSelector: Send + Sync { /// Selects a topic from the topic pool. - fn select<'a>(&'a self, topic_pool: &'a [Topic]) -> &Topic; + fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic>; } /// Arc wrapper of TopicSelector. -pub(super) type TopicSelectorRef = Arc; +pub(crate) type TopicSelectorRef = Arc; /// A topic selector with the round-robin strategy, i.e. selects topics in a round-robin manner. #[derive(Default)] -pub(super) struct RoundRobinTopicSelector { +pub(crate) struct RoundRobinTopicSelector { cursor: AtomicUsize, } @@ -47,20 +49,18 @@ impl RoundRobinTopicSelector { // The cursor in the round-robin selector is not persisted which may break the round-robin strategy cross crashes. // Introducing a shuffling strategy may help mitigate this issue. pub fn with_shuffle() -> Self { - let mut this = Self::default(); - let offset = rand::thread_rng().gen::() % usize::MAX; - // It's ok when an overflow happens since `fetch_add` automatically wraps around. - this.cursor.fetch_add(offset, Ordering::Relaxed); - this + let offset = rand::thread_rng().gen_range(0..64); + Self { + cursor: AtomicUsize::new(offset), + } } } impl TopicSelector for RoundRobinTopicSelector { - fn select<'a>(&'a self, topic_pool: &'a [Topic]) -> &Topic { - // Safety: the caller ensures the topic pool is not empty and hence the modulo operation is safe. + fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic> { + ensure!(!topic_pool.is_empty(), EmptyTopicPoolSnafu); let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len(); - // Safety: the modulo operation ensures the index operation is safe. - topic_pool.get(which).unwrap() + Ok(&topic_pool[which]) } } @@ -73,14 +73,14 @@ mod tests { let topic_pool: Vec<_> = [0, 1, 2].into_iter().map(|v| v.to_string()).collect(); let selector = RoundRobinTopicSelector::default(); - assert_eq!(selector.select(&topic_pool), "0"); - assert_eq!(selector.select(&topic_pool), "1"); - assert_eq!(selector.select(&topic_pool), "2"); - assert_eq!(selector.select(&topic_pool), "0"); + assert_eq!(selector.select(&topic_pool).unwrap(), "0"); + assert_eq!(selector.select(&topic_pool).unwrap(), "1"); + assert_eq!(selector.select(&topic_pool).unwrap(), "2"); + assert_eq!(selector.select(&topic_pool).unwrap(), "0"); // Creates a round-robin selector with shuffle. let selector = RoundRobinTopicSelector::with_shuffle(); - let topic = selector.select(&topic_pool); + let topic = selector.select(&topic_pool).unwrap(); assert!(topic_pool.contains(topic)); } } diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs index c8fd2eea963c..6c2702053b87 100644 --- a/src/common/meta/src/wal/options_allocator.rs +++ b/src/common/meta/src/wal/options_allocator.rs @@ -19,7 +19,7 @@ use common_config::{KafkaWalOptions, WalOptions}; use snafu::ResultExt; use store_api::storage::RegionNumber; -use crate::error::{EncodeWalOptionsToJsonSnafu, Result}; +use crate::error::{EncodeWalOptionsSnafu, Result}; use crate::kv_backend::KvBackendRef; use crate::wal::kafka::TopicManager as KafkaTopicManager; use crate::wal::WalConfig; @@ -32,6 +32,9 @@ pub enum WalOptionsAllocator { Kafka(KafkaTopicManager), } +/// Arc wrapper of WalOptionsAllocator. +pub type WalOptionsAllocatorRef = Arc; + impl WalOptionsAllocator { /// Creates a WalOptionsAllocator. pub fn new(config: WalConfig, kv_backend: KvBackendRef) -> Self { @@ -43,61 +46,57 @@ impl WalOptionsAllocator { } } - /// Tries to initialize the allocator. - pub async fn try_init(&mut self) -> Result<()> { + /// Tries to start the allocator. + pub async fn start(&self) -> Result<()> { match self { Self::RaftEngine => Ok(()), - Self::Kafka(kafka_topic_manager) => kafka_topic_manager.try_init().await, + Self::Kafka(kafka_topic_manager) => kafka_topic_manager.start().await, } } /// Allocates a wal options for a region. - pub fn alloc(&self) -> WalOptions { + pub fn alloc(&self) -> Result { match self { - Self::RaftEngine => WalOptions::RaftEngine, - Self::Kafka(kafka_topic_manager) => WalOptions::Kafka(KafkaWalOptions { - topic: kafka_topic_manager.select().clone(), - }), + Self::RaftEngine => Ok(WalOptions::RaftEngine), + Self::Kafka(topic_manager) => { + let topic = topic_manager.select()?; + Ok(WalOptions::Kafka(KafkaWalOptions { + topic: topic.clone(), + })) + } } } /// Allocates a batch of wal options where each wal options goes to a region. - pub fn alloc_batch(&self, num_regions: usize) -> Vec { + pub fn alloc_batch(&self, num_regions: usize) -> Result> { match self { - WalOptionsAllocator::RaftEngine => vec![WalOptions::RaftEngine; num_regions], - WalOptionsAllocator::Kafka(topic_manager) => topic_manager - .select_batch(num_regions) - .into_iter() - .map(|topic| { - WalOptions::Kafka(KafkaWalOptions { - topic: topic.clone(), + WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]), + WalOptionsAllocator::Kafka(topic_manager) => { + let options_batch = topic_manager + .select_batch(num_regions)? + .into_iter() + .map(|topic| { + WalOptions::Kafka(KafkaWalOptions { + topic: topic.clone(), + }) }) - }) - .collect(), + .collect(); + Ok(options_batch) + } } } } -/// Creates and initializes a wal options allocator. -pub async fn build_wal_options_allocator( - config: &WalConfig, - kv_backend: &KvBackendRef, -) -> Result { - let mut allocator = WalOptionsAllocator::new(config.clone(), kv_backend.clone()); - allocator.try_init().await?; - Ok(allocator) -} - /// Allocates a wal options for each region. The allocated wal options is encoded immediately. pub fn allocate_region_wal_options( regions: Vec, wal_options_allocator: &WalOptionsAllocator, ) -> Result> { let wal_options = wal_options_allocator - .alloc_batch(regions.len()) + .alloc_batch(regions.len())? .into_iter() .map(|wal_options| { - serde_json::to_string(&wal_options).context(EncodeWalOptionsToJsonSnafu { wal_options }) + serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options }) }) .collect::>>()?; @@ -115,9 +114,8 @@ mod tests { async fn test_allocator_with_raft_engine() { let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; let wal_config = WalConfig::RaftEngine; - let allocator = build_wal_options_allocator(&wal_config, &kv_backend) - .await - .unwrap(); + let mut allocator = WalOptionsAllocator::new(wal_config, kv_backend); + allocator.start().await.unwrap(); let num_regions = 32; let regions = (0..num_regions).collect::>(); diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 2bf0f066c18a..ec569fe138c0 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -27,7 +27,7 @@ use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; use common_meta::wal::options_allocator::allocate_region_wal_options; -use common_meta::wal::WalOptionsAllocator; +use common_meta::wal::WalOptionsAllocatorRef; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{debug, tracing}; @@ -109,11 +109,14 @@ impl Datanode for RegionInvoker { pub struct StandaloneTableMetadataAllocator { table_id_sequence: SequenceRef, - wal_options_allocator: WalOptionsAllocator, + wal_options_allocator: WalOptionsAllocatorRef, } impl StandaloneTableMetadataAllocator { - pub fn new(table_id_sequence: SequenceRef, wal_options_allocator: WalOptionsAllocator) -> Self { + pub fn new( + table_id_sequence: SequenceRef, + wal_options_allocator: WalOptionsAllocatorRef, + ) -> Self { Self { table_id_sequence, wal_options_allocator, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 906d9847c617..9bf7c9693ae4 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -26,6 +26,7 @@ use common_meta::ddl::DdlTaskExecutorRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; use common_meta::region_keeper::MemoryRegionKeeperRef; +use common_meta::wal::options_allocator::WalOptionsAllocatorRef; use common_meta::wal::WalConfig; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; @@ -174,6 +175,7 @@ pub type ElectionRef = Arc>; pub struct MetaStateHandler { procedure_manager: ProcedureManagerRef, + wal_options_allocator: WalOptionsAllocatorRef, subscribe_manager: Option, greptimedb_telemetry_task: Arc, leader_cached_kv_backend: Arc, @@ -193,6 +195,11 @@ impl MetaStateHandler { if let Err(e) = self.procedure_manager.start().await { error!(e; "Failed to start procedure manager"); } + + if let Err(e) = self.wal_options_allocator.start().await { + error!(e; "Failed to start wal options allocator"); + } + self.greptimedb_telemetry_task.should_report(true); } @@ -233,6 +240,7 @@ pub struct MetaSrv { procedure_manager: ProcedureManagerRef, mailbox: MailboxRef, ddl_executor: DdlTaskExecutorRef, + wal_options_allocator: WalOptionsAllocatorRef, table_metadata_manager: TableMetadataManagerRef, memory_region_keeper: MemoryRegionKeeperRef, greptimedb_telemetry_task: Arc, @@ -267,6 +275,7 @@ impl MetaSrv { greptimedb_telemetry_task, subscribe_manager, procedure_manager, + wal_options_allocator: self.wal_options_allocator.clone(), state: self.state.clone(), leader_cached_kv_backend: leader_cached_kv_backend.clone(), }; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 0e168d522d62..105e9dab0017 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -19,7 +19,6 @@ use std::time::Duration; use client::client_manager::DatanodeClients; use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; -use common_error::ext::BoxedError; use common_grpc::channel_manager::ChannelConfig; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::TableMetadataAllocatorRef; @@ -31,14 +30,14 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; -use common_meta::wal::build_wal_options_allocator; +use common_meta::wal::WalOptionsAllocator; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; -use crate::error::{self, OtherSnafu, Result}; +use crate::error::{self, Result}; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::check_leader_handler::CheckLeaderHandler; use crate::handler::collect_stats_handler::CollectStatsHandler; @@ -206,10 +205,10 @@ impl MetaSrvBuilder { table_id: None, }; - let wal_options_allocator = build_wal_options_allocator(&options.wal, &kv_backend) - .await - .map_err(BoxedError::new) - .context(OtherSnafu)?; + let wal_options_allocator = Arc::new(WalOptionsAllocator::new( + options.wal.clone(), + kv_backend.clone(), + )); let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { let sequence = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) @@ -221,7 +220,7 @@ impl MetaSrvBuilder { selector_ctx.clone(), selector.clone(), sequence.clone(), - wal_options_allocator, + wal_options_allocator.clone(), )) }); @@ -314,6 +313,7 @@ impl MetaSrvBuilder { procedure_manager, mailbox, ddl_executor: ddl_manager, + wal_options_allocator, table_metadata_manager, greptimedb_telemetry_task: get_greptimedb_telemetry_task( Some(metasrv_home), diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index d5888305fb18..29a7f52bcdf6 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -19,8 +19,7 @@ use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAlloc use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; -use common_meta::wal::options_allocator::allocate_region_wal_options; -use common_meta::wal::WalOptionsAllocator; +use common_meta::wal::{allocate_region_wal_options, WalOptionsAllocatorRef}; use common_telemetry::{debug, warn}; use snafu::{ensure, ResultExt}; use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ}; @@ -34,7 +33,7 @@ pub struct MetaSrvTableMetadataAllocator { ctx: SelectorContext, selector: SelectorRef, table_id_sequence: SequenceRef, - wal_options_allocator: WalOptionsAllocator, + wal_options_allocator: WalOptionsAllocatorRef, } impl MetaSrvTableMetadataAllocator { @@ -42,7 +41,7 @@ impl MetaSrvTableMetadataAllocator { ctx: SelectorContext, selector: SelectorRef, table_id_sequence: SequenceRef, - wal_options_allocator: WalOptionsAllocator, + wal_options_allocator: WalOptionsAllocatorRef, ) -> Self { Self { ctx, diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 0f561ca1512b..8cbd70260cde 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -23,7 +23,7 @@ use common_meta::ddl_manager::DdlManager; use common_meta::key::TableMetadataManager; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; -use common_meta::wal::build_wal_options_allocator; +use common_meta::wal::WalOptionsAllocator; use common_procedure::options::ProcedureConfig; use common_telemetry::logging::LoggingOptions; use datanode::config::DatanodeOptions; @@ -119,13 +119,13 @@ impl GreptimeDbStandaloneBuilder { .build(), ); // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder. - let wal_options_allocator = - build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend) - .await - .unwrap(); + let wal_options_allocator = Arc::new(WalOptionsAllocator::new( + common_meta::wal::WalConfig::default(), + kv_backend.clone(), + )); let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new( table_id_sequence, - wal_options_allocator, + wal_options_allocator.clone(), )); let ddl_task_executor = Arc::new( @@ -147,6 +147,7 @@ impl GreptimeDbStandaloneBuilder { .unwrap(); procedure_manager.start().await.unwrap(); + wal_options_allocator.start().await.unwrap(); test_util::prepare_another_catalog_and_schema(&instance).await; From 43e514ce89a6f6cac3a6324fb9c1d61c8dd7131d Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 22 Dec 2023 15:59:45 +0800 Subject: [PATCH 9/9] fix: typo --- config/metasrv.example.toml | 2 +- src/common/meta/src/wal/kafka.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 74a0897af8d3..b057d7f6b677 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -66,7 +66,7 @@ provider = "raft_engine" # num_partitions = 1 # Expected number of replicas of each partition. # replication_factor = 3 -# Aboves which a topic creation operation will be cancelled. +# Above which a topic creation operation will be cancelled. # create_topic_timeout = "30s" # The initial backoff for kafka clients. # backoff_init = "500ms" diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index 13ba0a4707c4..709c5a4f9879 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -39,7 +39,7 @@ pub struct KafkaConfig { pub num_partitions: i32, /// The replication factor of each topic. pub replication_factor: i16, - /// Aboves which a topic creation operation will be cancelled. + /// Above which a topic creation operation will be cancelled. #[serde(with = "humantime_serde")] pub create_topic_timeout: Duration, /// The initial backoff for kafka clients.