From 0f4ef3e571b0f1eda1df1fa71439ea4c030717e3 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Mon, 25 Dec 2023 17:05:34 +0800
Subject: [PATCH] fix: resolve review conversations

---
 config/datanode.example.toml              | 3 ---
 config/standalone.example.toml            | 6 ------
 src/common/config/src/wal.rs              | 6 ------
 src/common/config/src/wal/kafka.rs        | 9 ---------
 src/common/meta/src/error.rs              | 6 +++---
 src/log-store/src/error.rs                | 4 ++--
 src/log-store/src/kafka/client_manager.rs | 2 +-
 7 files changed, 6 insertions(+), 30 deletions(-)

diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 7343fbe47bf5..342e10bfe19f 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -51,9 +51,6 @@ sync_write = false
 
 # Kafka wal options, see `standalone.example.toml`.
 # broker_endpoints = ["127.0.0.1:9090"]
-# num_topics = 64
-# topic_name_prefix = "greptimedb_wal_topic"
-# num_partitions = 1
 # max_batch_size = "4MB"
 # linger = "200ms"
 # max_wait_time = "100ms"
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 7e12a6288ae0..713f8ef79edb 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -90,12 +90,6 @@ provider = "raft_engine"
 # Kafka wal options.
 # The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
 # broker_endpoints = ["127.0.0.1:9090"]
-# Number of topics shall be created beforehand.
-# num_topics = 64
-# Topic name prefix.
-# topic_name_prefix = "greptimedb_wal_topic"
-# Number of partitions per topic.
-# num_partitions = 1
 # The maximum log size a kafka batch producer could buffer.
 # max_batch_size = "4MB"
 # The linger duration of a kafka batch producer.
diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs
index 6c4955f4e63c..60128d14b35e 100644
--- a/src/common/config/src/wal.rs
+++ b/src/common/config/src/wal.rs
@@ -70,9 +70,6 @@ mod tests {
     fn test_serde_kafka_config() {
         let toml_str = r#"
             broker_endpoints = ["127.0.0.1:9090"]
-            num_topics = 32
-            topic_name_prefix = "greptimedb_wal_topic"
-            num_partitions = 1
             max_batch_size = "4MB"
             linger = "200ms"
             max_wait_time = "100ms"
@@ -84,9 +81,6 @@
         let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
         let expected = KafkaConfig {
             broker_endpoints: vec!["127.0.0.1:9090".to_string()],
-            num_topics: 32,
-            topic_name_prefix: "greptimedb_wal_topic".to_string(),
-            num_partitions: 1,
             compression: RsKafkaCompression::default(),
             max_batch_size: ReadableSize::mb(4),
             linger: Duration::from_millis(200),
diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs
index dc31bcec9e6c..eb6795054141 100644
--- a/src/common/config/src/wal/kafka.rs
+++ b/src/common/config/src/wal/kafka.rs
@@ -29,12 +29,6 @@ pub type Topic = String;
 pub struct KafkaConfig {
     /// The broker endpoints of the Kafka cluster.
     pub broker_endpoints: Vec<String>,
-    /// Number of topics shall be created beforehand.
-    pub num_topics: usize,
-    /// Topic name prefix.
-    pub topic_name_prefix: String,
-    /// Number of partitions per topic.
-    pub num_partitions: i32,
     /// The compression algorithm used to compress log entries.
     #[serde(skip)]
     #[serde(default)]
@@ -66,9 +60,6 @@ impl Default for KafkaConfig {
     fn default() -> Self {
         Self {
             broker_endpoints: vec!["127.0.0.1:9090".to_string()],
-            num_topics: 64,
-            topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
-            num_partitions: 1,
             compression: RsKafkaCompression::NoCompression,
             max_batch_size: ReadableSize::mb(4),
             linger: Duration::from_millis(200),
diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index 79c3e9316c59..519d8ec7a1af 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -304,7 +304,7 @@
     },
 
     #[snafu(display(
-        "Failed to build a kafka client, broker endpoints: {:?}",
+        "Failed to build a Kafka client, broker endpoints: {:?}",
         broker_endpoints
     ))]
     BuildKafkaClient {
@@ -314,14 +314,14 @@
         error: rskafka::client::error::Error,
     },
 
-    #[snafu(display("Failed to build a kafka controller client"))]
+    #[snafu(display("Failed to build a Kafka controller client"))]
     BuildKafkaCtrlClient {
         location: Location,
         #[snafu(source)]
         error: rskafka::client::error::Error,
     },
 
-    #[snafu(display("Failed to create a kafka wal topic"))]
+    #[snafu(display("Failed to create a Kafka wal topic"))]
     CreateKafkaWalTopic {
         location: Location,
         #[snafu(source)]
diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs
index ea54853dea09..1ee344046adc 100644
--- a/src/log-store/src/error.rs
+++ b/src/log-store/src/error.rs
@@ -87,7 +87,7 @@
     },
 
     #[snafu(display(
-        "Failed to build a kafka client, broker endpoints: {:?}",
+        "Failed to build a Kafka client, broker endpoints: {:?}",
         broker_endpoints
     ))]
     BuildClient {
@@ -98,7 +98,7 @@
     },
 
     #[snafu(display(
-        "Failed to build a kafka partition client, topic: {}, partition: {}",
+        "Failed to build a Kafka partition client, topic: {}, partition: {}",
         topic,
         partition
     ))]
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 56cdadabeccb..9aa27bf1b3fd 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -91,7 +91,7 @@ impl ClientManager {
         Ok(Self {
             config: config.clone(),
             client_factory: client,
-            client_pool: DashMap::with_capacity(config.num_topics),
+            client_pool: DashMap::new(),
         })
     }
 