diff --git a/src/common/src/monitor/connection.rs b/src/common/src/monitor/connection.rs index 1b65461927df6..ef51eb6260be0 100644 --- a/src/common/src/monitor/connection.rs +++ b/src/common/src/monitor/connection.rs @@ -16,6 +16,7 @@ use std::any::type_name; use std::cmp::Ordering; use std::future::Future; use std::io::{Error, IoSlice}; +use std::net::SocketAddr; use std::pin::Pin; use std::sync::LazyLock; use std::task::{Context, Poll}; @@ -23,9 +24,11 @@ use std::time::Duration; use futures::FutureExt; use http::Uri; +use hyper::client::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name}; use hyper::client::connect::Connection; use hyper::client::HttpConnector; use hyper::service::Service; +use itertools::Itertools; use pin_project_lite::pin_project; use prometheus::{ register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounter, @@ -33,7 +36,7 @@ use prometheus::{ }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tonic::transport::{Channel, Endpoint}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::metrics::LabelGuardedIntCounterVec; use crate::monitor::GLOBAL_METRICS_REGISTRY; @@ -388,6 +391,92 @@ pub fn monitor_connector( MonitoredConnection::new(connector, MonitorNewConnectionImpl { connection_type }) } +pub struct MonitoredGaiAddrs { + inner: Vec, + pos: usize, +} + +impl From for MonitoredGaiAddrs { + fn from(value: GaiAddrs) -> Self { + Self { + inner: value.collect_vec(), + pos: 0, + } + } +} + +impl Iterator for MonitoredGaiAddrs { + type Item = SocketAddr; + + fn next(&mut self) -> Option { + let res = self.inner.get(self.pos).cloned(); + self.pos += 1; + res + } +} + +pub struct MonitoredGaiFuture { + name: Name, + inner: GaiFuture, +} + +impl std::fmt::Debug for MonitoredGaiFuture { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.pad("MonitoredGaiFuture") + } +} + +impl Future for MonitoredGaiFuture { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.inner).poll(cx).map(|res| match res { + Ok(addrs) => { + let addrs: MonitoredGaiAddrs = addrs.into(); + debug!("resolve {} => {:?}", self.name, addrs.inner); + Ok(addrs) + } + Err(err) => Err(err), + }) + } +} + +#[derive(Clone)] +pub struct MonitoredGaiResolver { + inner: GaiResolver, +} + +impl std::fmt::Debug for MonitoredGaiResolver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.pad("MonitoredGaiResolver") + } +} + +impl Default for MonitoredGaiResolver { + fn default() -> Self { + Self { + inner: GaiResolver::new(), + } + } +} + +impl Service for MonitoredGaiResolver { + type Error = Error; + type Future = MonitoredGaiFuture; + type Response = MonitoredGaiAddrs; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Name) -> Self::Future { + MonitoredGaiFuture { + name: req.clone(), + inner: self.inner.call(req), + } + } +} + #[easy_ext::ext(EndpointExt)] impl Endpoint { pub async fn monitored_connect( @@ -397,7 +486,9 @@ impl Endpoint { ) -> Result { #[cfg(not(madsim))] { - let mut http = HttpConnector::new(); + let resolver = MonitoredGaiResolver::default(); + let mut http = HttpConnector::new_with_resolver(resolver); + http.enforce_http(false); http.set_nodelay(config.tcp_nodelay); http.set_keepalive(config.keepalive_duration);