From a8a0f7e3ea870dde831d07192bde40c35d10cd7f Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 23 Nov 2023 12:07:32 +0800 Subject: [PATCH] fix(remote_wal): resolve review conversations --- Cargo.lock | 1 + config/metasrv.example.toml | 2 +- src/common/meta/Cargo.toml | 1 + src/common/meta/src/error.rs | 4 - src/common/meta/src/wal/kafka.rs | 2 +- .../meta/src/wal/kafka/topic_manager.rs | 80 +++++++++---------- src/meta-srv/src/metasrv/builder.rs | 2 +- 7 files changed, 45 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e4725f3e713..d0e6da85b81d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1816,6 +1816,7 @@ dependencies = [ "lazy_static", "prometheus", "prost 0.12.1", + "rand", "regex", "rskafka", "serde", diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 8e12c1fb6022..696ed0810bb8 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -59,6 +59,6 @@ num_topics = 64 # Available selector types: # - "RoundRobin" (default) selector_type = "RoundRobin" -topic_name_prefix = "gt_kafka_topic" +topic_name_prefix = "greptime_wal" num_partitions = 1 replication_factor = 3 diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index b4bd1b6c93f5..ece2a68cec67 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -30,6 +30,7 @@ humantime-serde.workspace = true lazy_static.workspace = true prometheus.workspace = true prost.workspace = true +rand.workspace = true regex.workspace = true rskafka.workspace = true serde.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 5741b703f1b3..264357ce6504 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -291,9 +291,6 @@ pub enum Error { error: JsonError, }, - #[snafu(display("Failed to persist Kafka topics"))] - PersistKafkaTopics { location: Location }, - #[snafu(display( "Failed to build a rskafka client, broker endpoints: {:?}", broker_endpoints @@ -388,7 +385,6 @@ impl ErrorExt for Error { Error::MissingKafkaOpts { .. } | Error::DeserKafkaTopics { .. } | Error::SerKafkaTopics { .. } - | Error::PersistKafkaTopics { .. } | Error::InvalidNumTopics { .. } | Error::BuildKafkaClient { .. } | Error::BuildKafkaCtrlClient { .. } diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index 6368145ced34..f31c6fa2b477 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -42,7 +42,7 @@ impl Default for KafkaOptions { broker_endpoints: vec!["127.0.0.1:9090".to_string()], num_topics: 64, selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "gt_kafka_topic".to_string(), + topic_name_prefix: "greptime_wal".to_string(), num_partitions: 1, replication_factor: 3, } diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 2fab9d5da05e..f654fed22295 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -14,21 +14,23 @@ use std::collections::HashSet; +use rand::seq::SliceRandom; use rskafka::client::ClientBuilder; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaTopicSnafu, DeserKafkaTopicsSnafu, - InvalidNumTopicsSnafu, MissingKafkaOptsSnafu, PersistKafkaTopicsSnafu, Result, - SerKafkaTopicsSnafu, TooManyCreatedKafkaTopicsSnafu, + InvalidNumTopicsSnafu, MissingKafkaOptsSnafu, Result, SerKafkaTopicsSnafu, + TooManyCreatedKafkaTopicsSnafu, }; use crate::kv_backend::KvBackendRef; +use crate::rpc::store::PutRequest; use crate::wal::kafka::topic_selector::{build_topic_selector, TopicSelectorRef}; -use crate::wal::kafka::KafkaOptions; +use crate::wal::kafka::{KafkaOptions, TopicSelectorType}; pub type Topic = String; -const TOPICS_KEY: &str = "gt_kafka_topics"; +const METASRV_CREATED_TOPICS_KEY: &str = "metasrv_created_topics"; const CREATE_TOPIC_TIMEOUT: i32 = 5_000; // 5,000 ms. pub struct TopicManager { @@ -38,14 +40,22 @@ pub struct TopicManager { impl TopicManager { pub async fn try_new( - kafka_opts: &Option, + kafka_opts: Option<&KafkaOptions>, kv_backend: &KvBackendRef, ) -> Result { - let opts = kafka_opts.as_ref().context(MissingKafkaOptsSnafu)?; + let opts = kafka_opts.context(MissingKafkaOptsSnafu)?; + let topic_pool = build_topic_pool(opts, kv_backend).await?; + let topic_selector = build_topic_selector(&opts.selector_type); + + // The cursor in the round-robin selector is not persisted which may break the round-robin strategy cross crashes. + // Introduces a shuffling may help mitigate this issue. + let topic_pool = match opts.selector_type { + TopicSelectorType::RoundRobin => shuffle_topic_pool(topic_pool), + }; Ok(Self { - topic_pool: build_topic_pool(opts, kv_backend).await?, - topic_selector: build_topic_selector(&opts.selector_type), + topic_pool, + topic_selector, }) } @@ -57,17 +67,10 @@ impl TopicManager { } async fn build_topic_pool(opts: &KafkaOptions, kv_backend: &KvBackendRef) -> Result> { - let KafkaOptions { - broker_endpoints, - num_topics, - topic_name_prefix, - num_partitions, - replication_factor, - .. - } = opts.clone(); - + let num_topics = opts.num_topics; ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); + let broker_endpoints = opts.broker_endpoints.clone(); let kafka_client = ClientBuilder::new(broker_endpoints.clone()) .build() .await @@ -78,7 +81,7 @@ async fn build_topic_pool(opts: &KafkaOptions, kv_backend: &KvBackendRef) -> Res .context(BuildKafkaCtrlClientSnafu)?; let topics = (0..num_topics) - .map(|topic_id| format!("{topic_name_prefix}_{topic_id}")) + .map(|topic_id| format!("{}_{topic_id}", opts.topic_name_prefix)) .collect::>(); let created_topics = restore_created_topics(kv_backend) @@ -99,49 +102,46 @@ async fn build_topic_pool(opts: &KafkaOptions, kv_backend: &KvBackendRef) -> Res return None; } + // TODO(niebayes): Determine how rskafka handles an already-exist topic. Check if an error would be raised. Some(kafka_ctrl_client.create_topic( topic, - num_partitions, - replication_factor, + opts.num_partitions, + opts.replication_factor, CREATE_TOPIC_TIMEOUT, )) }) .collect::>(); - let _ = futures::future::try_join_all(create_topic_tasks) + futures::future::try_join_all(create_topic_tasks) .await .context(CreateKafkaTopicSnafu)?; - // FIXME(niebayes): current persistence strategy is all-or-none. Maybe we should increase the granularity. persist_created_topics(&topics, kv_backend).await?; Ok(topics) } +fn shuffle_topic_pool(mut topic_pool: Vec) -> Vec { + topic_pool.shuffle(&mut rand::thread_rng()); + topic_pool +} + async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result> { - let raw_topics = kv_backend - .get(TOPICS_KEY.as_bytes()) + kv_backend + .get(METASRV_CREATED_TOPICS_KEY.as_bytes()) .await? - .map(|key_value| key_value.value) - .unwrap_or_default(); - - serde_json::from_slice(&raw_topics).context(DeserKafkaTopicsSnafu) + .map(|key_value| serde_json::from_slice(&key_value.value).context(DeserKafkaTopicsSnafu)) + .unwrap_or_else(|| Ok(vec![])) } async fn persist_created_topics(topics: &[Topic], kv_backend: &KvBackendRef) -> Result<()> { let raw_topics = serde_json::to_string(topics).context(SerKafkaTopicsSnafu)?; kv_backend - .put_conditionally( - TOPICS_KEY.as_bytes().to_vec(), - raw_topics.into_bytes(), - false, - ) - .await - .and_then(|persisted| { - if !persisted { - PersistKafkaTopicsSnafu.fail() - } else { - Ok(()) - } + .put(PutRequest { + key: METASRV_CREATED_TOPICS_KEY.as_bytes().to_vec(), + value: raw_topics.into_bytes(), + prev_kv: false, }) + .await + .map(|_| ()) } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index b1e0afa20133..c5473201b2a6 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -194,7 +194,7 @@ impl MetaSrvBuilder { let kafka_topic_manager = match options.wal.provider { WalProvider::Kafka => Some( - KafkaTopicManager::try_new(&options.wal.kafka_opts, &kv_backend) + KafkaTopicManager::try_new(options.wal.kafka_opts.as_ref(), &kv_backend) .await .context(BuildKafkaTopicManagerSnafu)?, ),