From 5baed7b01d0202ffa1b860cdc425de93e18eff03 Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Sun, 25 Feb 2024 22:41:49 +0800 Subject: [PATCH 01/14] feat: Support automatic DNS lookup for kafka bootstrap servers --- Cargo.lock | 15 +++- Cargo.toml | 1 + src/common/wal/Cargo.toml | 1 + src/common/wal/src/config.rs | 79 +++++++++++++++++++ src/common/wal/src/config/kafka/common.rs | 35 +++++++- src/common/wal/src/config/kafka/datanode.rs | 1 + src/common/wal/src/config/kafka/metasrv.rs | 1 + src/common/wal/src/config/kafka/standalone.rs | 1 + 8 files changed, 132 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84d28103f312..4591bba5c5fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2106,6 +2106,7 @@ version = "0.6.0" dependencies = [ "common-base", "common-telemetry", + "dns-lookup 2.0.4", "futures-util", "humantime-serde", "rskafka", @@ -3073,6 +3074,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "dns-lookup" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "socket2 0.5.5", + "windows-sys 0.48.0", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -8370,7 +8383,7 @@ dependencies = [ "crossbeam-utils", "csv-core", "digest", - "dns-lookup", + "dns-lookup 1.0.8", "dyn-clone", "flate2", "gethostname", diff --git a/Cargo.toml b/Cargo.toml index 742681bb6a31..98a5b1466ef5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,6 +99,7 @@ datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.g datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } derive_builder = "0.12" +dns-lookup = "2.0" etcd-client = "0.12" fst = "0.4.7" futures 
= "0.3" diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index 3b84673bb1ee..ce10f40bd4eb 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] common-base.workspace = true common-telemetry.workspace = true +dns-lookup.workspace = true futures-util.workspace = true humantime-serde.workspace = true rskafka.workspace = true diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index a51335c199bb..4ae9594f09b9 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -147,6 +147,85 @@ mod tests { assert_eq!(datanode_wal_config, DatanodeWalConfig::RaftEngine(expected)); } + // TODO: find an way to merge with the test below. + #[test] + fn test_toml_kafka_with_dnslookup() { + let toml_str = r#" + provider = "kafka" + broker_endpoints = ["localhost:9092"] + num_topics = 32 + selector_type = "round_robin" + topic_name_prefix = "greptimedb_wal_topic" + replication_factor = 1 + create_topic_timeout = "30s" + max_batch_size = "1MB" + linger = "200ms" + consumer_wait_timeout = "100ms" + backoff_init = "500ms" + backoff_max = "10s" + backoff_base = 2 + backoff_deadline = "5mins" + "#; + + // Deserialized to MetaSrvWalConfig. + let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap(); + let expected = MetaSrvKafkaConfig { + broker_endpoints: vec!["127.0.0.1:9092".to_string()], + num_topics: 32, + selector_type: TopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_wal_topic".to_string(), + num_partitions: 1, + replication_factor: 1, + create_topic_timeout: Duration::from_secs(30), + backoff: BackoffConfig { + init: Duration::from_millis(500), + max: Duration::from_secs(10), + base: 2, + deadline: Some(Duration::from_secs(60 * 5)), + }, + }; + assert_eq!(metasrv_wal_config, MetaSrvWalConfig::Kafka(expected)); + + // Deserialized to DatanodeWalConfig. 
+ let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap(); + let expected = DatanodeKafkaConfig { + broker_endpoints: vec!["127.0.0.1:9092".to_string()], + compression: Compression::default(), + max_batch_size: ReadableSize::mb(1), + linger: Duration::from_millis(200), + consumer_wait_timeout: Duration::from_millis(100), + backoff: BackoffConfig { + init: Duration::from_millis(500), + max: Duration::from_secs(10), + base: 2, + deadline: Some(Duration::from_secs(60 * 5)), + }, + }; + assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); + + // Deserialized to StandaloneWalConfig. + let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap(); + let expected = StandaloneKafkaConfig { + broker_endpoints: vec!["127.0.0.1:9092".to_string()], + num_topics: 32, + selector_type: TopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_wal_topic".to_string(), + num_partitions: 1, + replication_factor: 1, + create_topic_timeout: Duration::from_secs(30), + compression: Compression::default(), + max_batch_size: ReadableSize::mb(1), + linger: Duration::from_millis(200), + consumer_wait_timeout: Duration::from_millis(100), + backoff: BackoffConfig { + init: Duration::from_millis(500), + max: Duration::from_secs(10), + base: 2, + deadline: Some(Duration::from_secs(60 * 5)), + }, + }; + assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected)); + } #[test] fn test_toml_kafka() { let toml_str = r#" diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index ea708d96159c..b48e6af45382 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -14,7 +14,7 @@ use std::time::Duration; -use serde::{Deserialize, Serialize}; +use serde::{de, Deserialize, Serialize}; use serde_with::with_prefix; with_prefix!(pub backoff_prefix "backoff_"); @@ -46,3 +46,36 @@ impl Default for BackoffConfig { } } } +fn 
lookup_endpoint<'de, D>(endpoint: &str) -> Result +where + D: serde::Deserializer<'de>, +{ + let mut iter = endpoint.split(':'); + let ip_or_domain: &str = iter.next().unwrap(); + let port: &str = iter.next().unwrap(); + if ip_or_domain.parse::().is_ok() { + return Ok(endpoint.to_string()); + } + let ips: Vec<_> = dns_lookup::lookup_host(ip_or_domain) + .map_err(de::Error::custom)? + .into_iter() + .filter(|addr| addr.is_ipv4()) + .collect(); + if ips.is_empty() { + return Err(de::Error::custom(format!( + "failed to resolve the domain name: {}", + ip_or_domain + ))); + } + Ok(format!("{}:{}", ips[0], port)) +} +pub fn deserialize_broker_endpoints<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + let mut broker_endpoints: Vec = Vec::deserialize(deserializer)?; + for endpoint in &mut broker_endpoints { + *endpoint = lookup_endpoint::(endpoint)?; + } + Ok(broker_endpoints) +} diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index b15d13dffc2a..dd35781973a3 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -26,6 +26,7 @@ use crate::BROKER_ENDPOINT; #[serde(default)] pub struct DatanodeKafkaConfig { /// The broker endpoints of the Kafka cluster. + #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")] pub broker_endpoints: Vec, /// The compression algorithm used to compress kafka records. #[serde(skip)] diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index a8989275f42b..5705dd71a6e6 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -24,6 +24,7 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; #[serde(default)] pub struct MetaSrvKafkaConfig { /// The broker endpoints of the Kafka cluster. 
+ #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")] pub broker_endpoints: Vec, /// The number of topics to be created upon start. pub num_topics: usize, diff --git a/src/common/wal/src/config/kafka/standalone.rs b/src/common/wal/src/config/kafka/standalone.rs index 3da8fa498092..bd34b919d860 100644 --- a/src/common/wal/src/config/kafka/standalone.rs +++ b/src/common/wal/src/config/kafka/standalone.rs @@ -26,6 +26,7 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; #[serde(default)] pub struct StandaloneKafkaConfig { /// The broker endpoints of the Kafka cluster. + #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")] pub broker_endpoints: Vec, /// Number of topics to be created upon start. pub num_topics: usize, From a2580418bf2ce8591e036f80ed8d8ec8e41539e5 Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Mon, 26 Feb 2024 14:45:03 +0800 Subject: [PATCH 02/14] Revert "feat: Support automatic DNS lookup for kafka bootstrap servers" This reverts commit 5baed7b01d0202ffa1b860cdc425de93e18eff03. 
--- Cargo.lock | 15 +--- Cargo.toml | 1 - src/common/wal/Cargo.toml | 1 - src/common/wal/src/config.rs | 79 ------------------- src/common/wal/src/config/kafka/common.rs | 35 +------- src/common/wal/src/config/kafka/datanode.rs | 1 - src/common/wal/src/config/kafka/metasrv.rs | 1 - src/common/wal/src/config/kafka/standalone.rs | 1 - 8 files changed, 2 insertions(+), 132 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4591bba5c5fb..84d28103f312 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2106,7 +2106,6 @@ version = "0.6.0" dependencies = [ "common-base", "common-telemetry", - "dns-lookup 2.0.4", "futures-util", "humantime-serde", "rskafka", @@ -3074,18 +3073,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "dns-lookup" -version = "2.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "socket2 0.5.5", - "windows-sys 0.48.0", -] - [[package]] name = "doc-comment" version = "0.3.3" @@ -8383,7 +8370,7 @@ dependencies = [ "crossbeam-utils", "csv-core", "digest", - "dns-lookup 1.0.8", + "dns-lookup", "dyn-clone", "flate2", "gethostname", diff --git a/Cargo.toml b/Cargo.toml index 98a5b1466ef5..742681bb6a31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,7 +99,6 @@ datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.g datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } derive_builder = "0.12" -dns-lookup = "2.0" etcd-client = "0.12" fst = "0.4.7" futures = "0.3" diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index ce10f40bd4eb..3b84673bb1ee 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -13,7 +13,6 @@ workspace = true [dependencies] 
common-base.workspace = true common-telemetry.workspace = true -dns-lookup.workspace = true futures-util.workspace = true humantime-serde.workspace = true rskafka.workspace = true diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 4ae9594f09b9..a51335c199bb 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -147,85 +147,6 @@ mod tests { assert_eq!(datanode_wal_config, DatanodeWalConfig::RaftEngine(expected)); } - // TODO: find an way to merge with the test below. - #[test] - fn test_toml_kafka_with_dnslookup() { - let toml_str = r#" - provider = "kafka" - broker_endpoints = ["localhost:9092"] - num_topics = 32 - selector_type = "round_robin" - topic_name_prefix = "greptimedb_wal_topic" - replication_factor = 1 - create_topic_timeout = "30s" - max_batch_size = "1MB" - linger = "200ms" - consumer_wait_timeout = "100ms" - backoff_init = "500ms" - backoff_max = "10s" - backoff_base = 2 - backoff_deadline = "5mins" - "#; - - // Deserialized to MetaSrvWalConfig. - let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap(); - let expected = MetaSrvKafkaConfig { - broker_endpoints: vec!["127.0.0.1:9092".to_string()], - num_topics: 32, - selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_topic".to_string(), - num_partitions: 1, - replication_factor: 1, - create_topic_timeout: Duration::from_secs(30), - backoff: BackoffConfig { - init: Duration::from_millis(500), - max: Duration::from_secs(10), - base: 2, - deadline: Some(Duration::from_secs(60 * 5)), - }, - }; - assert_eq!(metasrv_wal_config, MetaSrvWalConfig::Kafka(expected)); - - // Deserialized to DatanodeWalConfig. 
- let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap(); - let expected = DatanodeKafkaConfig { - broker_endpoints: vec!["127.0.0.1:9092".to_string()], - compression: Compression::default(), - max_batch_size: ReadableSize::mb(1), - linger: Duration::from_millis(200), - consumer_wait_timeout: Duration::from_millis(100), - backoff: BackoffConfig { - init: Duration::from_millis(500), - max: Duration::from_secs(10), - base: 2, - deadline: Some(Duration::from_secs(60 * 5)), - }, - }; - assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); - - // Deserialized to StandaloneWalConfig. - let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap(); - let expected = StandaloneKafkaConfig { - broker_endpoints: vec!["127.0.0.1:9092".to_string()], - num_topics: 32, - selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_topic".to_string(), - num_partitions: 1, - replication_factor: 1, - create_topic_timeout: Duration::from_secs(30), - compression: Compression::default(), - max_batch_size: ReadableSize::mb(1), - linger: Duration::from_millis(200), - consumer_wait_timeout: Duration::from_millis(100), - backoff: BackoffConfig { - init: Duration::from_millis(500), - max: Duration::from_secs(10), - base: 2, - deadline: Some(Duration::from_secs(60 * 5)), - }, - }; - assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected)); - } #[test] fn test_toml_kafka() { let toml_str = r#" diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index b48e6af45382..ea708d96159c 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -14,7 +14,7 @@ use std::time::Duration; -use serde::{de, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use serde_with::with_prefix; with_prefix!(pub backoff_prefix "backoff_"); @@ -46,36 +46,3 @@ impl Default for BackoffConfig { } } } -fn 
lookup_endpoint<'de, D>(endpoint: &str) -> Result -where - D: serde::Deserializer<'de>, -{ - let mut iter = endpoint.split(':'); - let ip_or_domain: &str = iter.next().unwrap(); - let port: &str = iter.next().unwrap(); - if ip_or_domain.parse::().is_ok() { - return Ok(endpoint.to_string()); - } - let ips: Vec<_> = dns_lookup::lookup_host(ip_or_domain) - .map_err(de::Error::custom)? - .into_iter() - .filter(|addr| addr.is_ipv4()) - .collect(); - if ips.is_empty() { - return Err(de::Error::custom(format!( - "failed to resolve the domain name: {}", - ip_or_domain - ))); - } - Ok(format!("{}:{}", ips[0], port)) -} -pub fn deserialize_broker_endpoints<'de, D>(deserializer: D) -> Result, D::Error> -where - D: serde::Deserializer<'de>, -{ - let mut broker_endpoints: Vec = Vec::deserialize(deserializer)?; - for endpoint in &mut broker_endpoints { - *endpoint = lookup_endpoint::(endpoint)?; - } - Ok(broker_endpoints) -} diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index dd35781973a3..b15d13dffc2a 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -26,7 +26,6 @@ use crate::BROKER_ENDPOINT; #[serde(default)] pub struct DatanodeKafkaConfig { /// The broker endpoints of the Kafka cluster. - #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")] pub broker_endpoints: Vec, /// The compression algorithm used to compress kafka records. #[serde(skip)] diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index 5705dd71a6e6..a8989275f42b 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -24,7 +24,6 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; #[serde(default)] pub struct MetaSrvKafkaConfig { /// The broker endpoints of the Kafka cluster. 
- #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")] pub broker_endpoints: Vec, /// The number of topics to be created upon start. pub num_topics: usize, diff --git a/src/common/wal/src/config/kafka/standalone.rs b/src/common/wal/src/config/kafka/standalone.rs index bd34b919d860..3da8fa498092 100644 --- a/src/common/wal/src/config/kafka/standalone.rs +++ b/src/common/wal/src/config/kafka/standalone.rs @@ -26,7 +26,6 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; #[serde(default)] pub struct StandaloneKafkaConfig { /// The broker endpoints of the Kafka cluster. - #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")] pub broker_endpoints: Vec, /// Number of topics to be created upon start. pub num_topics: usize, From 5c55325a0b6cc9b0e0b2991fa8fdd653e1b0f5f4 Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Mon, 26 Feb 2024 19:24:48 +0800 Subject: [PATCH 03/14] feat: Support automatic DNS lookup for Kafka broker --- src/common/meta/src/error.rs | 12 +++++++ .../kafka/topic_manager.rs | 31 +++++++++++++++++-- src/log-store/src/error.rs | 10 ++++++ src/log-store/src/kafka/client_manager.rs | 26 +++++++++++++++- 4 files changed, 75 insertions(+), 4 deletions(-) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 32af562e30f8..2f6f6c688f4c 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -340,6 +340,16 @@ pub enum Error { error: rskafka::client::error::Error, }, + #[snafu(display("Failed to resolve Kafka broker endpoint {:?}", broker_endpoint))] + ResolveKafkaEndpoint { + broker_endpoint: String, + #[snafu(source)] + error: std::io::Error, + }, + + #[snafu(display("Endpoint ip not found for broker endpoint: {:?}", broker_endpoint))] + EndpointIpNotFound { broker_endpoint: String }, + #[snafu(display("Failed to build a Kafka controller client"))] BuildKafkaCtrlClient { location: Location, @@ -425,6 +435,8 @@ impl ErrorExt for 
Error { | BuildKafkaClient { .. } | BuildKafkaCtrlClient { .. } | BuildKafkaPartitionClient { .. } + | ResolveKafkaEndpoint { .. } + | EndpointIpNotFound { .. } | ProduceRecord { .. } | CreateKafkaWalTopic { .. } | EmptyTopicPool { .. } diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index 63944be13c05..41204861bc07 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -26,11 +26,12 @@ use rskafka::client::{Client, ClientBuilder}; use rskafka::record::Record; use rskafka::BackoffConfig; use snafu::{ensure, ResultExt}; +use tokio::net; use crate::error::{ BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu, - CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu, - ProduceRecordSnafu, Result, + CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, EndpointIpNotFoundSnafu, + InvalidNumTopicsSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, }; use crate::kv_backend::KvBackendRef; use crate::rpc::store::PutRequest; @@ -108,6 +109,26 @@ impl TopicManager { Ok(()) } + async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { + let ips: Vec<_> = net::lookup_host(broker_endpoint) + .await + .with_context(|_| ResolveKafkaEndpointSnafu { + broker_endpoint: broker_endpoint.to_string(), + })? + .into_iter() + // Not sure if we should filter out ipv6 addresses + .filter(|addr| addr.is_ipv4()) + .collect(); + if ips.is_empty() { + return (|| { + EndpointIpNotFoundSnafu { + broker_endpoint: broker_endpoint.to_string(), + } + .fail() + })(); + } + Ok(ips[0].to_string()) + } /// Tries to create topics specified by indexes in `to_be_created`. async fn try_create_topics(&self, topics: &[String], to_be_created: &[usize]) -> Result<()> { // Builds an kafka controller client for creating topics. 
@@ -117,7 +138,11 @@ impl TopicManager { base: self.config.backoff.base as f64, deadline: self.config.backoff.deadline, }; - let client = ClientBuilder::new(self.config.broker_endpoints.clone()) + let mut broker_endpoints = Vec::with_capacity(self.config.broker_endpoints.len()); + for endpoint in &self.config.broker_endpoints { + broker_endpoints.push(Self::resolve_broker_endpoint(endpoint).await?); + } + let client = ClientBuilder::new(broker_endpoints) .backoff_config(backoff_config) .build() .await diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 0f5beaa16bb6..00484d9ab71c 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -108,6 +108,16 @@ pub enum Error { error: rskafka::client::error::Error, }, + #[snafu(display("Failed to resolve Kafka broker endpoint {:?}", broker_endpoint))] + ResolveKafkaEndpoint { + broker_endpoint: String, + #[snafu(source)] + error: std::io::Error, + }, + + #[snafu(display("Failed to find ip for broker endpoint: {:?}", broker_endpoint))] + EndpointIpNotFound { broker_endpoint: String }, + #[snafu(display( "Failed to build a Kafka partition client, topic: {}, partition: {}", topic, diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 9e082decb851..40a4ea1a0c06 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -22,9 +22,13 @@ use rskafka::client::producer::{BatchProducer, BatchProducerBuilder}; use rskafka::client::{Client as RsKafkaClient, ClientBuilder}; use rskafka::BackoffConfig; use snafu::ResultExt; +use tokio::net; use tokio::sync::RwLock; -use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result}; +use crate::error::{ + BuildClientSnafu, BuildPartitionClientSnafu, EndpointIpNotFoundSnafu, + ResolveKafkaEndpointSnafu, Result, +}; // Each topic only has one partition for now. // The `DEFAULT_PARTITION` refers to the index of the partition. 
@@ -94,6 +98,26 @@ impl ClientManager { client_pool: RwLock::new(HashMap::new()), }) } + async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { + let ips: Vec<_> = net::lookup_host(broker_endpoint) + .await + .with_context(|_| ResolveKafkaEndpointSnafu { + broker_endpoint: broker_endpoint.to_string(), + })? + .into_iter() + // Not sure if we should filter out ipv6 addresses + .filter(|addr| addr.is_ipv4()) + .collect(); + if ips.is_empty() { + return (|| { + EndpointIpNotFoundSnafu { + broker_endpoint: broker_endpoint.to_string(), + } + .fail() + })(); + } + Ok(ips[0].to_string()) + } /// Gets the client associated with the topic. If the client does not exist, a new one will /// be created and returned. From e98f5bd6724339f4c503616fd86f801a01d26a43 Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Mon, 26 Feb 2024 20:40:52 +0800 Subject: [PATCH 04/14] fix: resolve broker endpoint in client manager --- src/log-store/src/kafka/client_manager.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 40a4ea1a0c06..40cedbd40a90 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -84,7 +84,11 @@ impl ClientManager { base: config.backoff.base as f64, deadline: config.backoff.deadline, }; - let client = ClientBuilder::new(config.broker_endpoints.clone()) + let mut broker_endpoints = Vec::with_capacity(config.broker_endpoints.len()); + for endpoint in &config.broker_endpoints { + broker_endpoints.push(Self::resolve_broker_endpoint(endpoint).await?); + } + let client = ClientBuilder::new(broker_endpoints) .backoff_config(backoff_config) .build() .await From 357dcaf340d7ea6883e40f15f68851189f879a0b Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Mon, 26 Feb 2024 21:50:36 +0800 Subject: [PATCH 05/14] fix: apply clippy lints --- 
.../src/wal_options_allocator/kafka/topic_manager.rs | 11 ++++------- src/log-store/src/kafka/client_manager.rs | 11 ++++------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index 41204861bc07..14c23a57d9ec 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -115,17 +115,14 @@ impl TopicManager { .with_context(|_| ResolveKafkaEndpointSnafu { broker_endpoint: broker_endpoint.to_string(), })? - .into_iter() // Not sure if we should filter out ipv6 addresses .filter(|addr| addr.is_ipv4()) .collect(); if ips.is_empty() { - return (|| { - EndpointIpNotFoundSnafu { - broker_endpoint: broker_endpoint.to_string(), - } - .fail() - })(); + return EndpointIpNotFoundSnafu { + broker_endpoint: broker_endpoint.to_string(), + } + .fail(); } Ok(ips[0].to_string()) } diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 40cedbd40a90..b5f997bcc2bf 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -108,17 +108,14 @@ impl ClientManager { .with_context(|_| ResolveKafkaEndpointSnafu { broker_endpoint: broker_endpoint.to_string(), })? 
- .into_iter() // Not sure if we should filter out ipv6 addresses .filter(|addr| addr.is_ipv4()) .collect(); if ips.is_empty() { - return (|| { - EndpointIpNotFoundSnafu { - broker_endpoint: broker_endpoint.to_string(), - } - .fail() - })(); + return EndpointIpNotFoundSnafu { + broker_endpoint: broker_endpoint.to_string(), + } + .fail(); } Ok(ips[0].to_string()) } From c3014d82ff5bd687c75b2abf9e62f732f643bf4a Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Mon, 26 Feb 2024 22:42:50 +0800 Subject: [PATCH 06/14] refactor: simplify the code with clippy hint --- .../wal_options_allocator/kafka/topic_manager.rs | 16 ++++++---------- src/log-store/src/kafka/client_manager.rs | 16 ++++++---------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index 14c23a57d9ec..e4bf1cf4c3d5 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -25,7 +25,7 @@ use rskafka::client::partition::{Compression, UnknownTopicHandling}; use rskafka::client::{Client, ClientBuilder}; use rskafka::record::Record; use rskafka::BackoffConfig; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::net; use crate::error::{ @@ -110,21 +110,17 @@ impl TopicManager { } async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { - let ips: Vec<_> = net::lookup_host(broker_endpoint) + let ip = net::lookup_host(broker_endpoint) .await .with_context(|_| ResolveKafkaEndpointSnafu { broker_endpoint: broker_endpoint.to_string(), })?
// Not sure if we should filter out ipv6 addresses - .filter(|addr| addr.is_ipv4()) - .collect(); - if ips.is_empty() { - return EndpointIpNotFoundSnafu { + .find(|addr| addr.is_ipv4()) + .with_context(|| EndpointIpNotFoundSnafu { broker_endpoint: broker_endpoint.to_string(), - } - .fail(); - } - Ok(ips[0].to_string()) + })?; + Ok(ip.to_string()) } /// Tries to create topics specified by indexes in `to_be_created`. async fn try_create_topics(&self, topics: &[String], to_be_created: &[usize]) -> Result<()> { diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index b5f997bcc2bf..4af4e5597f19 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -21,7 +21,7 @@ use rskafka::client::producer::aggregator::RecordAggregator; use rskafka::client::producer::{BatchProducer, BatchProducerBuilder}; use rskafka::client::{Client as RsKafkaClient, ClientBuilder}; use rskafka::BackoffConfig; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use tokio::net; use tokio::sync::RwLock; @@ -103,21 +103,17 @@ impl ClientManager { }) } async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { - let ips: Vec<_> = net::lookup_host(broker_endpoint) + let ip = net::lookup_host(broker_endpoint) .await .with_context(|_| ResolveKafkaEndpointSnafu { broker_endpoint: broker_endpoint.to_string(), })? // Not sure if we should filter out ipv6 addresses - .filter(|addr| addr.is_ipv4()) - .collect(); - if ips.is_empty() { - return EndpointIpNotFoundSnafu { + .find(|addr| addr.is_ipv4()) + .with_context(|| EndpointIpNotFoundSnafu { broker_endpoint: broker_endpoint.to_string(), - } - .fail(); - } - Ok(ips[0].to_string()) + })?; + Ok(ip.to_string()) } /// Gets the client associated with the topic. 
If the client does not exist, a new one will From 6ed31f454f2442cc4be3f9bcb4ec6623e211eb93 Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Tue, 27 Feb 2024 14:29:15 +0800 Subject: [PATCH 07/14] refactor: move resolve_broker_endpoint to common/wal/src/lib.rs --- Cargo.lock | 4 +++ src/common/meta/src/error.rs | 12 ++----- .../kafka/topic_manager.rs | 33 +++++++------------ src/common/wal/Cargo.toml | 4 +++ src/common/wal/src/error.rs | 32 ++++++++++++++++++ src/common/wal/src/lib.rs | 17 ++++++++++ src/log-store/src/error.rs | 11 ++----- src/log-store/src/kafka/client_manager.rs | 32 +++++++----------- 8 files changed, 84 insertions(+), 61 deletions(-) create mode 100644 src/common/wal/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 84d28103f312..e8f293204c8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2105,6 +2105,8 @@ name = "common-wal" version = "0.6.0" dependencies = [ "common-base", + "common-error", + "common-macro", "common-telemetry", "futures-util", "humantime-serde", @@ -2112,6 +2114,8 @@ dependencies = [ "serde", "serde_json", "serde_with", + "snafu", + "tokio", "toml 0.8.8", ] diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 2f6f6c688f4c..cfe1cbf65602 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -340,15 +340,8 @@ pub enum Error { error: rskafka::client::error::Error, }, - #[snafu(display("Failed to resolve Kafka broker endpoint {:?}", broker_endpoint))] - ResolveKafkaEndpoint { - broker_endpoint: String, - #[snafu(source)] - error: std::io::Error, - }, - - #[snafu(display("Endpoint ip not found for broker endpoint: {:?}", broker_endpoint))] - EndpointIpNotFound { broker_endpoint: String }, + #[snafu(display("Failed to resolve Kafka broker endpoint."))] + ResolveKafkaEndpoint { source: common_wal::error::Error }, #[snafu(display("Failed to build a Kafka controller client"))] BuildKafkaCtrlClient { @@ -436,7 +429,6 @@ impl ErrorExt for Error { | 
BuildKafkaCtrlClient { .. } | BuildKafkaPartitionClient { .. } | ResolveKafkaEndpoint { .. } - | EndpointIpNotFound { .. } | ProduceRecord { .. } | CreateKafkaWalTopic { .. } | EmptyTopicPool { .. } diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index e4bf1cf4c3d5..745a22af91ae 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -25,13 +25,12 @@ use rskafka::client::partition::{Compression, UnknownTopicHandling}; use rskafka::client::{Client, ClientBuilder}; use rskafka::record::Record; use rskafka::BackoffConfig; -use snafu::{ensure, OptionExt, ResultExt}; -use tokio::net; +use snafu::{ensure, ResultExt}; use crate::error::{ BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu, - CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, EndpointIpNotFoundSnafu, - InvalidNumTopicsSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, + CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu, + ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, }; use crate::kv_backend::KvBackendRef; use crate::rpc::store::PutRequest; @@ -109,19 +108,6 @@ impl TopicManager { Ok(()) } - async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { - let ip = net::lookup_host(broker_endpoint) - .await - .with_context(|_| ResolveKafkaEndpointSnafu { - broker_endpoint: broker_endpoint.to_string(), - })? - // Not sure if we should filter out ipv6 addresses - .find(|addr| addr.is_ipv4()) - .with_context(|| EndpointIpNotFoundSnafu { - broker_endpoint: broker_endpoint.to_string(), - })?; - Ok(ip.to_string()) - } /// Tries to create topics specified by indexes in `to_be_created`. 
async fn try_create_topics(&self, topics: &[String], to_be_created: &[usize]) -> Result<()> { // Builds an kafka controller client for creating topics. @@ -131,10 +117,15 @@ impl TopicManager { base: self.config.backoff.base as f64, deadline: self.config.backoff.deadline, }; - let mut broker_endpoints = Vec::with_capacity(self.config.broker_endpoints.len()); - for endpoint in &self.config.broker_endpoints { - broker_endpoints.push(Self::resolve_broker_endpoint(endpoint).await?); - } + let broker_endpoints = + futures::future::try_join_all(self.config.broker_endpoints.clone().into_iter().map( + |endpoint| async move { + common_wal::resolve_broker_endpoint(&endpoint) + .await + .context(ResolveKafkaEndpointSnafu) + }, + )) + .await?; let client = ClientBuilder::new(broker_endpoints) .backoff_config(backoff_config) .build() diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index 3b84673bb1ee..a39baf438f19 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -12,12 +12,16 @@ workspace = true [dependencies] common-base.workspace = true +common-error.workspace = true +common-macro.workspace = true common-telemetry.workspace = true futures-util.workspace = true humantime-serde.workspace = true rskafka.workspace = true serde.workspace = true serde_with.workspace = true +snafu.workspace = true +tokio.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/src/common/wal/src/error.rs b/src/common/wal/src/error.rs new file mode 100644 index 000000000000..a61d44b72954 --- /dev/null +++ b/src/common/wal/src/error.rs @@ -0,0 +1,32 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_macro::stack_trace_debug; +use snafu::Snafu; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Failed to resolve endpoint {:?}", broker_endpoint))] + ResolveEndpoint { + broker_endpoint: String, + #[snafu(source)] + error: std::io::Error, + }, + #[snafu(display("Failed to find ipv4 endpoint: {:?}", broker_endpoint))] + EndpointIpNotFound { broker_endpoint: String }, +} + +pub type Result = std::result::Result; diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs index 88d67ee3e0bb..88c1193a3e1d 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use error::{EndpointIpNotFoundSnafu, ResolveEndpointSnafu, Result}; use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; pub mod config; +pub mod error; pub mod options; #[cfg(any(test, feature = "testing"))] pub mod test_util; @@ -30,3 +33,17 @@ pub enum TopicSelectorType { #[default] RoundRobin, } + +pub async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { + let ip = tokio::net::lookup_host(broker_endpoint) + .await + .with_context(|_| ResolveEndpointSnafu { + broker_endpoint: broker_endpoint.to_string(), + })? 
+ // only IPv4 addresses are valid + .find(|addr| addr.is_ipv4()) + .with_context(|| EndpointIpNotFoundSnafu { + broker_endpoint: broker_endpoint.to_string(), + })?; + Ok(ip.to_string()) +} diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 00484d9ab71c..edb06b42ca7f 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -108,15 +108,8 @@ pub enum Error { error: rskafka::client::error::Error, }, - #[snafu(display("Failed to resolve Kafka broker endpoint {:?}", broker_endpoint))] - ResolveKafkaEndpoint { - broker_endpoint: String, - #[snafu(source)] - error: std::io::Error, - }, - - #[snafu(display("Failed to find ip for broker endpoint: {:?}", broker_endpoint))] - EndpointIpNotFound { broker_endpoint: String }, + #[snafu(display("Failed to resolve Kafka broker endpoint."))] + ResolveKafkaEndpoint { source: common_wal::error::Error }, #[snafu(display( "Failed to build a Kafka partition client, topic: {}, partition: {}", diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 4af4e5597f19..f8e8acd9eae3 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -21,13 +21,11 @@ use rskafka::client::producer::aggregator::RecordAggregator; use rskafka::client::producer::{BatchProducer, BatchProducerBuilder}; use rskafka::client::{Client as RsKafkaClient, ClientBuilder}; use rskafka::BackoffConfig; -use snafu::{OptionExt, ResultExt}; -use tokio::net; +use snafu::ResultExt; use tokio::sync::RwLock; use crate::error::{ - BuildClientSnafu, BuildPartitionClientSnafu, EndpointIpNotFoundSnafu, - ResolveKafkaEndpointSnafu, Result, + BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, }; // Each topic only has one partition for now. 
@@ -84,10 +82,15 @@ impl ClientManager { base: config.backoff.base as f64, deadline: config.backoff.deadline, }; - let mut broker_endpoints = Vec::with_capacity(config.broker_endpoints.len()); - for endpoint in &config.broker_endpoints { - broker_endpoints.push(Self::resolve_broker_endpoint(endpoint).await?); - } + let broker_endpoints = + futures::future::try_join_all(config.broker_endpoints.clone().into_iter().map( + |endpoint| async move { + common_wal::resolve_broker_endpoint(&endpoint) + .await + .context(ResolveKafkaEndpointSnafu) + }, + )) + .await?; let client = ClientBuilder::new(broker_endpoints) .backoff_config(backoff_config) .build() @@ -102,19 +105,6 @@ impl ClientManager { client_pool: RwLock::new(HashMap::new()), }) } - async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { - let ip = net::lookup_host(broker_endpoint) - .await - .with_context(|_| ResolveKafkaEndpointSnafu { - broker_endpoint: broker_endpoint.to_string(), - })? - // Not sure if we should filter out ipv6 addresses - .find(|addr| addr.is_ipv4()) - .with_context(|| EndpointIpNotFoundSnafu { - broker_endpoint: broker_endpoint.to_string(), - })?; - Ok(ip.to_string()) - } /// Gets the client associated with the topic. If the client does not exist, a new one will /// be created and returned. 
From edd5fb82e5bebd4c4c2a418a45a8eb0df2b560de Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Tue, 27 Feb 2024 16:30:17 +0800 Subject: [PATCH 08/14] test: add mock test for resolver_broker_endpoint --- Cargo.lock | 2 + src/common/wal/Cargo.toml | 2 + src/common/wal/src/lib.rs | 87 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 89 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8f293204c8b..960135bb965d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2104,12 +2104,14 @@ dependencies = [ name = "common-wal" version = "0.6.0" dependencies = [ + "async-trait", "common-base", "common-error", "common-macro", "common-telemetry", "futures-util", "humantime-serde", + "mockall", "rskafka", "serde", "serde_json", diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index a39baf438f19..7dd2f15323d8 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -11,12 +11,14 @@ testing = [] workspace = true [dependencies] +async-trait.workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true common-telemetry.workspace = true futures-util.workspace = true humantime-serde.workspace = true +mockall.workspace = true rskafka.workspace = true serde.workspace = true serde_with.workspace = true diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs index 88c1193a3e1d..6f2836221efc 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -12,9 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::io; +use std::net::SocketAddr; + +use async_trait::async_trait; use error::{EndpointIpNotFoundSnafu, ResolveEndpointSnafu, Result}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; +use tokio::net; pub mod config; pub mod error; @@ -34,12 +39,31 @@ pub enum TopicSelectorType { RoundRobin, } -pub async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { - let ip = tokio::net::lookup_host(broker_endpoint) +#[mockall::automock] +#[async_trait] +trait DNSResolver { + async fn resolve_broker_endpoint(&self, broker_endpoint: &str) -> io::Result>; +} +struct TokioDnsResolver; +#[async_trait] +impl DNSResolver for TokioDnsResolver { + async fn resolve_broker_endpoint(&self, broker_endpoint: &str) -> io::Result> { + net::lookup_host(broker_endpoint) + .await + .map(|iter| iter.collect()) + } +} +async fn resolve_broker_endpoint_inner( + broker_endpoint: &str, + resolver: R, +) -> Result { + let ip = resolver + .resolve_broker_endpoint(broker_endpoint) .await .with_context(|_| ResolveEndpointSnafu { broker_endpoint: broker_endpoint.to_string(), })? 
+ .into_iter() // only IPv4 addresses are valid .find(|addr| addr.is_ipv4()) .with_context(|| EndpointIpNotFoundSnafu { @@ -47,3 +71,62 @@ pub async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { })?; Ok(ip.to_string()) } +pub async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { + resolve_broker_endpoint_inner(broker_endpoint, TokioDnsResolver).await +} + +#[cfg(test)] +mod tests { + use mockall::predicate::eq; + + use super::*; + use crate::error::Error; + // test for resolve_broker_endpoint + #[tokio::test] + async fn test_resolve_error_occur() { + let endpoint = "example.com:9092"; + let mut mock_resolver = MockDNSResolver::new(); + mock_resolver + .expect_resolve_broker_endpoint() + .with(eq("example.com:9092")) + .times(1) + .returning(|_| { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "mocked error", + )) + }); + let ip = resolve_broker_endpoint_inner(endpoint, mock_resolver).await; + assert!(ip.is_err_and(|err| { matches!(err, Error::ResolveEndpoint { .. }) })) + } + #[tokio::test] + async fn test_resolve_only_ipv6() { + let endpoint = "example.com:9092"; + let mut mock_resolver = MockDNSResolver::new(); + mock_resolver + .expect_resolve_broker_endpoint() + .with(eq("example.com:9092")) + .times(1) + .returning(|_| { + Ok(vec![SocketAddr::new( + std::net::IpAddr::V6(std::net::Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), + 9092, + )]) + }); + let ip = resolve_broker_endpoint_inner(endpoint, mock_resolver).await; + assert!(ip.is_err_and(|err| { matches!(err, Error::EndpointIpNotFound { .. 
}) })) + } + #[tokio::test] + async fn test_resolve_normal() { + let mut mock_resolver = MockDNSResolver::new(); + mock_resolver + .expect_resolve_broker_endpoint() + .with(eq("example.com:9092")) + .times(1) + .returning(|_| Ok(vec!["127.0.0.1:9092".parse().unwrap()])); + let ip = resolve_broker_endpoint_inner("example.com:9092", mock_resolver) + .await + .unwrap(); + assert_eq!(ip, "127.0.0.1:9092") + } +} From be83a89517da3b5ca1a71030f07a7d66c4e56c5f Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Tue, 27 Feb 2024 16:38:36 +0800 Subject: [PATCH 09/14] refactor: accept niebayes's advice --- src/common/wal/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs index 6f2836221efc..3d9e14f8decb 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -57,7 +57,7 @@ async fn resolve_broker_endpoint_inner( broker_endpoint: &str, resolver: R, ) -> Result { - let ip = resolver + resolver .resolve_broker_endpoint(broker_endpoint) .await .with_context(|_| ResolveEndpointSnafu { @@ -66,10 +66,10 @@ async fn resolve_broker_endpoint_inner( .into_iter() // only IPv4 addresses are valid .find(|addr| addr.is_ipv4()) + .map(|addr| addr.to_string()) .with_context(|| EndpointIpNotFoundSnafu { broker_endpoint: broker_endpoint.to_string(), - })?; - Ok(ip.to_string()) + }) } pub async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { resolve_broker_endpoint_inner(broker_endpoint, TokioDnsResolver).await From 93a351a5e1fb887402bdfcab2b85f284d65cf074 Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Tue, 27 Feb 2024 18:25:06 +0800 Subject: [PATCH 10/14] refactor: rename EndpointIpNotFound to EndpointIPV4NotFound --- src/common/wal/src/error.rs | 3 ++- src/common/wal/src/lib.rs | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/common/wal/src/error.rs b/src/common/wal/src/error.rs index 
a61d44b72954..147eeb293da1 100644 --- a/src/common/wal/src/error.rs +++ b/src/common/wal/src/error.rs @@ -25,8 +25,9 @@ pub enum Error { #[snafu(source)] error: std::io::Error, }, + #[snafu(display("Failed to find ipv4 endpoint: {:?}", broker_endpoint))] - EndpointIpNotFound { broker_endpoint: String }, + EndpointIPV4NotFound { broker_endpoint: String }, } pub type Result = std::result::Result; diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs index 3d9e14f8decb..8922b5912b40 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -16,7 +16,7 @@ use std::io; use std::net::SocketAddr; use async_trait::async_trait; -use error::{EndpointIpNotFoundSnafu, ResolveEndpointSnafu, Result}; +use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use tokio::net; @@ -67,7 +67,7 @@ async fn resolve_broker_endpoint_inner( // only IPv4 addresses are valid .find(|addr| addr.is_ipv4()) .map(|addr| addr.to_string()) - .with_context(|| EndpointIpNotFoundSnafu { + .with_context(|| EndpointIPV4NotFoundSnafu { broker_endpoint: broker_endpoint.to_string(), }) } @@ -114,7 +114,7 @@ mod tests { )]) }); let ip = resolve_broker_endpoint_inner(endpoint, mock_resolver).await; - assert!(ip.is_err_and(|err| { matches!(err, Error::EndpointIpNotFound { .. }) })) + assert!(ip.is_err_and(|err| { matches!(err, Error::EndpointIPV4NotFound { .. 
}) })) } #[tokio::test] async fn test_resolve_normal() { From 392255ee9b3d303c5cd646922de84b4cb3c928a6 Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Thu, 29 Feb 2024 10:47:33 +0800 Subject: [PATCH 11/14] refactor: remove mock test and simplify the implementation --- Cargo.lock | 1 - .../kafka/topic_manager.rs | 4 +- src/common/wal/Cargo.toml | 1 - src/common/wal/src/lib.rs | 100 ++++-------------- src/log-store/src/kafka/client_manager.rs | 17 ++- 5 files changed, 32 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 960135bb965d..eb8a0598fd7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2111,7 +2111,6 @@ dependencies = [ "common-telemetry", "futures-util", "humantime-serde", - "mockall", "rskafka", "serde", "serde_json", diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index 745a22af91ae..863c9f63f6f8 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -118,9 +118,9 @@ impl TopicManager { deadline: self.config.backoff.deadline, }; let broker_endpoints = - futures::future::try_join_all(self.config.broker_endpoints.clone().into_iter().map( + futures::future::try_join_all(self.config.broker_endpoints.iter().map( |endpoint| async move { - common_wal::resolve_broker_endpoint(&endpoint) + common_wal::resolve_to_ipv4(endpoint) .await .context(ResolveKafkaEndpointSnafu) }, diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index 7dd2f15323d8..6d058c757f1e 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -18,7 +18,6 @@ common-macro.workspace = true common-telemetry.workspace = true futures-util.workspace = true humantime-serde.workspace = true -mockall.workspace = true rskafka.workspace = true serde.workspace = true serde_with.workspace = true diff --git a/src/common/wal/src/lib.rs 
b/src/common/wal/src/lib.rs index 8922b5912b40..4aa2983478f4 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -12,10 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io; -use std::net::SocketAddr; - -use async_trait::async_trait; +#![feature(assert_matches)] use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -39,94 +36,41 @@ pub enum TopicSelectorType { RoundRobin, } -#[mockall::automock] -#[async_trait] -trait DNSResolver { - async fn resolve_broker_endpoint(&self, broker_endpoint: &str) -> io::Result>; -} -struct TokioDnsResolver; -#[async_trait] -impl DNSResolver for TokioDnsResolver { - async fn resolve_broker_endpoint(&self, broker_endpoint: &str) -> io::Result> { - net::lookup_host(broker_endpoint) - .await - .map(|iter| iter.collect()) - } -} -async fn resolve_broker_endpoint_inner( - broker_endpoint: &str, - resolver: R, -) -> Result { - resolver - .resolve_broker_endpoint(broker_endpoint) +pub async fn resolve_to_ipv4(broker_endpoint: &str) -> Result { + net::lookup_host(broker_endpoint) .await - .with_context(|_| ResolveEndpointSnafu { - broker_endpoint: broker_endpoint.to_string(), - })? - .into_iter() + .context(ResolveEndpointSnafu { broker_endpoint })? 
// only IPv4 addresses are valid - .find(|addr| addr.is_ipv4()) - .map(|addr| addr.to_string()) - .with_context(|| EndpointIPV4NotFoundSnafu { - broker_endpoint: broker_endpoint.to_string(), - }) -} -pub async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result { - resolve_broker_endpoint_inner(broker_endpoint, TokioDnsResolver).await + .find_map(|addr| addr.is_ipv4().then_some(addr.to_string())) + .context(EndpointIPV4NotFoundSnafu { broker_endpoint }) } #[cfg(test)] mod tests { - use mockall::predicate::eq; + use std::assert_matches::assert_matches; use super::*; use crate::error::Error; + // test for resolve_broker_endpoint #[tokio::test] - async fn test_resolve_error_occur() { - let endpoint = "example.com:9092"; - let mut mock_resolver = MockDNSResolver::new(); - mock_resolver - .expect_resolve_broker_endpoint() - .with(eq("example.com:9092")) - .times(1) - .returning(|_| { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "mocked error", - )) - }); - let ip = resolve_broker_endpoint_inner(endpoint, mock_resolver).await; - assert!(ip.is_err_and(|err| { matches!(err, Error::ResolveEndpoint { .. }) })) + async fn test_valid_host() { + let host = "localhost:9092"; + let got = resolve_to_ipv4(host).await; + assert_eq!(got.unwrap(), "127.0.0.1:9092"); } + #[tokio::test] - async fn test_resolve_only_ipv6() { - let endpoint = "example.com:9092"; - let mut mock_resolver = MockDNSResolver::new(); - mock_resolver - .expect_resolve_broker_endpoint() - .with(eq("example.com:9092")) - .times(1) - .returning(|_| { - Ok(vec![SocketAddr::new( - std::net::IpAddr::V6(std::net::Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), - 9092, - )]) - }); - let ip = resolve_broker_endpoint_inner(endpoint, mock_resolver).await; - assert!(ip.is_err_and(|err| { matches!(err, Error::EndpointIPV4NotFound { .. }) })) + async fn test_valid_host_ipv6() { + let host = "::1:9092"; + let got = resolve_to_ipv4(host).await; + assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. 
}); } + #[tokio::test] - async fn test_resolve_normal() { - let mut mock_resolver = MockDNSResolver::new(); - mock_resolver - .expect_resolve_broker_endpoint() - .with(eq("example.com:9092")) - .times(1) - .returning(|_| Ok(vec!["127.0.0.1:9092".parse().unwrap()])); - let ip = resolve_broker_endpoint_inner("example.com:9092", mock_resolver) - .await - .unwrap(); - assert_eq!(ip, "127.0.0.1:9092") + async fn test_invalid_host() { + let host = "non-exist-host:9092"; + let got = resolve_to_ipv4(host).await; + assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. }); } } diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index f8e8acd9eae3..e9575d6238ae 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -82,15 +82,14 @@ impl ClientManager { base: config.backoff.base as f64, deadline: config.backoff.deadline, }; - let broker_endpoints = - futures::future::try_join_all(config.broker_endpoints.clone().into_iter().map( - |endpoint| async move { - common_wal::resolve_broker_endpoint(&endpoint) - .await - .context(ResolveKafkaEndpointSnafu) - }, - )) - .await?; + let broker_endpoints = futures::future::try_join_all(config.broker_endpoints.iter().map( + |endpoint| async move { + common_wal::resolve_to_ipv4(endpoint) + .await + .context(ResolveKafkaEndpointSnafu) + }, + )) + .await?; let client = ClientBuilder::new(broker_endpoints) .backoff_config(backoff_config) .build() From 7e32b9fb99266fbffa080d6d31b3f3049026d6cc Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Thu, 29 Feb 2024 10:58:58 +0800 Subject: [PATCH 12/14] docs: add comments about test_vallid_host_ipv6 --- src/common/wal/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs index 4aa2983478f4..0ba4fadd2075 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -62,6 +62,7 @@ mod tests { #[tokio::test] 
async fn test_valid_host_ipv6() { + // the host is valid, it is an IPv6 address, but we only accept IPv4 addresses let host = "::1:9092"; let got = resolve_to_ipv4(host).await; assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. }); From 83f94efb7766f3c46a5ba25e5b45c46e5d8e35c8 Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 29 Feb 2024 14:53:19 +0800 Subject: [PATCH 13/14] Apply suggestions from code review Co-authored-by: niebayes --- src/common/wal/Cargo.toml | 1 - src/common/wal/src/lib.rs | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index 6d058c757f1e..a39baf438f19 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -11,7 +11,6 @@ testing = [] workspace = true [dependencies] -async-trait.workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs index 0ba4fadd2075..55b1df94c08d 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. 
#![feature(assert_matches)] + use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; From 3b9c26ac41e7e83cef7f1da942725c7be3428020 Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 29 Feb 2024 15:18:46 +0800 Subject: [PATCH 14/14] move more common code Signed-off-by: tison --- Cargo.lock | 1 - .../kafka/topic_manager.rs | 12 ++------ src/common/wal/src/lib.rs | 29 +++++++++++++------ src/log-store/src/kafka/client_manager.rs | 11 ++----- 4 files changed, 26 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb8a0598fd7b..e8f293204c8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2104,7 +2104,6 @@ dependencies = [ name = "common-wal" version = "0.6.0" dependencies = [ - "async-trait", "common-base", "common-error", "common-macro", diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index 863c9f63f6f8..d6ee3e774600 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -117,15 +117,9 @@ impl TopicManager { base: self.config.backoff.base as f64, deadline: self.config.backoff.deadline, }; - let broker_endpoints = - futures::future::try_join_all(self.config.broker_endpoints.iter().map( - |endpoint| async move { - common_wal::resolve_to_ipv4(endpoint) - .await - .context(ResolveKafkaEndpointSnafu) - }, - )) - .await?; + let broker_endpoints = common_wal::resolve_to_ipv4(&self.config.broker_endpoints) + .await + .context(ResolveKafkaEndpointSnafu)?; let client = ClientBuilder::new(broker_endpoints) .backoff_config(backoff_config) .build() diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs index 55b1df94c08d..086846ab3960 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -14,6 +14,8 @@ #![feature(assert_matches)] +use std::net::SocketAddr; + 
use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -37,13 +39,22 @@ pub enum TopicSelectorType { RoundRobin, } -pub async fn resolve_to_ipv4(broker_endpoint: &str) -> Result { - net::lookup_host(broker_endpoint) +pub async fn resolve_to_ipv4>(endpoints: &[T]) -> Result> { + futures_util::future::try_join_all(endpoints.iter().map(resolve_to_ipv4_one)).await +} + +async fn resolve_to_ipv4_one>(endpoint: T) -> Result { + let endpoint = endpoint.as_ref(); + net::lookup_host(endpoint) .await - .context(ResolveEndpointSnafu { broker_endpoint })? - // only IPv4 addresses are valid - .find_map(|addr| addr.is_ipv4().then_some(addr.to_string())) - .context(EndpointIPV4NotFoundSnafu { broker_endpoint }) + .context(ResolveEndpointSnafu { + broker_endpoint: endpoint, + })? + .find(SocketAddr::is_ipv4) + .map(|addr| addr.to_string()) + .context(EndpointIPV4NotFoundSnafu { + broker_endpoint: endpoint, + }) } #[cfg(test)] @@ -57,7 +68,7 @@ mod tests { #[tokio::test] async fn test_valid_host() { let host = "localhost:9092"; - let got = resolve_to_ipv4(host).await; + let got = resolve_to_ipv4_one(host).await; assert_eq!(got.unwrap(), "127.0.0.1:9092"); } @@ -65,14 +76,14 @@ mod tests { async fn test_valid_host_ipv6() { // the host is valid, it is an IPv6 address, but we only accept IPv4 addresses let host = "::1:9092"; - let got = resolve_to_ipv4(host).await; + let got = resolve_to_ipv4_one(host).await; assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. }); } #[tokio::test] async fn test_invalid_host() { let host = "non-exist-host:9092"; - let got = resolve_to_ipv4(host).await; + let got = resolve_to_ipv4_one(host).await; assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. 
}); } } diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index e9575d6238ae..1708efed1d09 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -82,14 +82,9 @@ impl ClientManager { base: config.backoff.base as f64, deadline: config.backoff.deadline, }; - let broker_endpoints = futures::future::try_join_all(config.broker_endpoints.iter().map( - |endpoint| async move { - common_wal::resolve_to_ipv4(endpoint) - .await - .context(ResolveKafkaEndpointSnafu) - }, - )) - .await?; + let broker_endpoints = common_wal::resolve_to_ipv4(&config.broker_endpoints) + .await + .context(ResolveKafkaEndpointSnafu)?; let client = ClientBuilder::new(broker_endpoints) .backoff_config(backoff_config) .build()