diff --git a/scylla/src/transport/connection_pool.rs b/scylla/src/transport/connection_pool.rs index 2af7e2636..e3385333a 100644 --- a/scylla/src/transport/connection_pool.rs +++ b/scylla/src/transport/connection_pool.rs @@ -867,6 +867,7 @@ impl PoolRefiller { mut endpoint: UntranslatedEndpoint, ) -> impl Future { let cloud_config = self.pool_config.connection_config.cloud_config.clone(); + let hostname_resolution_timeout = self.pool_config.hostname_resolution_timeout; async move { if let Some(cloud_config) = cloud_config { // If we operate in the serverless Cloud, then we substitute every node's address @@ -881,7 +882,9 @@ impl PoolRefiller { if let Some(dc) = datacenter.as_deref() { if let Some(dc_config) = cloud_config.get_datacenters().get(dc) { let hostname = dc_config.get_server(); - if let Ok(resolved) = resolve_hostname(hostname).await { + if let Ok(resolved) = + resolve_hostname(hostname, hostname_resolution_timeout).await + { *address = NodeAddr::Untranslatable(resolved) } else { warn!( diff --git a/scylla/src/transport/node.rs b/scylla/src/transport/node.rs index ba2b3d9f4..20f9ade41 100644 --- a/scylla/src/transport/node.rs +++ b/scylla/src/transport/node.rs @@ -1,5 +1,6 @@ use itertools::Itertools; -use tokio::net::lookup_host; +use thiserror::Error; +use tokio::net::{lookup_host, ToSocketAddrs}; use tracing::warn; use uuid::Uuid; @@ -13,6 +14,7 @@ use crate::transport::errors::{ConnectionPoolError, QueryError}; use std::fmt::Display; use std::io; use std::net::IpAddr; +use std::time::Duration; use std::{ hash::{Hash, Hasher}, net::SocketAddr, @@ -267,27 +269,53 @@ pub(crate) struct ResolvedContactPoint { pub(crate) datacenter: Option, } +#[derive(Error, Debug)] +pub(crate) enum DnsLookupError { + #[error("Failed to perform DNS lookup within {0}ms")] + Timeout(u128), + #[error("Empty address list returned by DNS for {0}")] + EmptyAddressListForHost(String), + #[error(transparent)] + IoError(#[from] io::Error), +} + +/// Performs a DNS lookup with provided optional timeout. +async fn lookup_host_with_timeout( + host: T, + hostname_resolution_timeout: Option, +) -> Result, DnsLookupError> { + if let Some(timeout) = hostname_resolution_timeout { + match tokio::time::timeout(timeout, lookup_host(host)).await { + Ok(res) => res.map_err(Into::into), + // Elapsed error from tokio library does not provide any context. + Err(_) => Err(DnsLookupError::Timeout(timeout.as_millis())), + } + } else { + lookup_host(host).await.map_err(Into::into) + } +} + // Resolve the given hostname using a DNS lookup if necessary. // The resolution may return multiple IPs and the function returns one of them. // It prefers to return IPv4s first, and only if there are none, IPv6s. -pub(crate) async fn resolve_hostname(hostname: &str) -> Result { - let addrs = match lookup_host(hostname).await { +pub(crate) async fn resolve_hostname( + hostname: &str, + hostname_resolution_timeout: Option, +) -> Result { + let addrs = match lookup_host_with_timeout(hostname, hostname_resolution_timeout).await { Ok(addrs) => itertools::Either::Left(addrs), // Use a default port in case of error, but propagate the original error on failure Err(e) => { - let addrs = lookup_host((hostname, 9042)).await.or(Err(e))?; + let addrs = lookup_host_with_timeout((hostname, 9042), hostname_resolution_timeout) + .await + .or(Err(e))?; itertools::Either::Right(addrs) } }; addrs .find_or_last(|addr| matches!(addr, SocketAddr::V4(_))) - .ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - format!("Empty address list returned by DNS for {}", hostname), - ) - }) + .ok_or_else(|| DnsLookupError::EmptyAddressListForHost(hostname.to_owned())) } /// Transforms the given [`InternalKnownNode`]s into [`ContactPoint`]s. @@ -296,6 +324,7 @@ pub(crate) async fn resolve_hostname(hostname: &str) -> Result, ) -> (Vec, Vec) { // Find IP addresses of all known nodes passed in the config let mut initial_peers: Vec = Vec::with_capacity(known_nodes.len()); @@ -323,7 +352,7 @@ pub(crate) async fn resolve_contact_points( let resolve_futures = to_resolve .into_iter() .map(|(hostname, datacenter)| async move { - match resolve_hostname(hostname).await { + match resolve_hostname(hostname, hostname_resolution_timeout).await { Ok(address) => Some(ResolvedContactPoint { address, datacenter, diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index 260e3e98c..e123975df 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -481,7 +481,7 @@ impl MetadataReader { host_filter: &Option>, ) -> Result { let (initial_peers, resolved_hostnames) = - resolve_contact_points(&initial_known_nodes).await; + resolve_contact_points(&initial_known_nodes, hostname_resolution_timeout).await; // Ensure there is at least one resolved node if initial_peers.is_empty() { return Err(NewSessionError::FailedToResolveAnyHostname( @@ -574,8 +574,11 @@ impl MetadataReader { // If no known peer is reachable, try falling back to initial contact points, in hope that // there are some hostnames there which will resolve to reachable new addresses. warn!("Failed to establish control connection and fetch metadata on all known peers. Falling back to initial contact points."); - let (initial_peers, _hostnames) = - resolve_contact_points(&self.initial_known_nodes).await; + let (initial_peers, _hostnames) = resolve_contact_points( + &self.initial_known_nodes, + self.hostname_resolution_timeout, + ) + .await; result = self .retry_fetch_metadata_on_nodes( initial,