Skip to content

Commit

Permalink
chore(monitor): add debug log for client dns resolve (#14229) (#14236)
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions[bot] authored Dec 27, 2023
1 parent 0e59c2d commit 393f17f
Showing 1 changed file with 93 additions and 2 deletions.
95 changes: 93 additions & 2 deletions src/common/src/monitor/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,27 @@ use std::any::type_name;
use std::cmp::Ordering;
use std::future::Future;
use std::io::{Error, IoSlice};
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::LazyLock;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::FutureExt;
use http::Uri;
use hyper::client::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name};
use hyper::client::connect::Connection;
use hyper::client::HttpConnector;
use hyper::service::Service;
use itertools::Itertools;
use pin_project_lite::pin_project;
use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::{Channel, Endpoint};
use tracing::{info, warn};
use tracing::{debug, info, warn};

use crate::metrics::LabelGuardedIntCounterVec;
use crate::monitor::GLOBAL_METRICS_REGISTRY;
Expand Down Expand Up @@ -388,6 +391,92 @@ pub fn monitor_connector<C>(
MonitoredConnection::new(connector, MonitorNewConnectionImpl { connection_type })
}

pub struct MonitoredGaiAddrs {
inner: Vec<SocketAddr>,
pos: usize,
}

impl From<GaiAddrs> for MonitoredGaiAddrs {
fn from(value: GaiAddrs) -> Self {
Self {
inner: value.collect_vec(),
pos: 0,
}
}
}

impl Iterator for MonitoredGaiAddrs {
type Item = SocketAddr;

fn next(&mut self) -> Option<Self::Item> {
let res = self.inner.get(self.pos).cloned();
self.pos += 1;
res
}
}

pub struct MonitoredGaiFuture {
name: Name,
inner: GaiFuture,
}

impl std::fmt::Debug for MonitoredGaiFuture {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.pad("MonitoredGaiFuture")
}
}

impl Future for MonitoredGaiFuture {
type Output = Result<MonitoredGaiAddrs, Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.inner).poll(cx).map(|res| match res {
Ok(addrs) => {
let addrs: MonitoredGaiAddrs = addrs.into();
debug!("resolve {} => {:?}", self.name, addrs.inner);
Ok(addrs)
}
Err(err) => Err(err),
})
}
}

#[derive(Clone)]
pub struct MonitoredGaiResolver {
inner: GaiResolver,
}

impl std::fmt::Debug for MonitoredGaiResolver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.pad("MonitoredGaiResolver")
}
}

impl Default for MonitoredGaiResolver {
fn default() -> Self {
Self {
inner: GaiResolver::new(),
}
}
}

impl Service<Name> for MonitoredGaiResolver {
type Error = Error;
type Future = MonitoredGaiFuture;
type Response = MonitoredGaiAddrs;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Name) -> Self::Future {
MonitoredGaiFuture {
name: req.clone(),
inner: self.inner.call(req),
}
}
}

#[easy_ext::ext(EndpointExt)]
impl Endpoint {
pub async fn monitored_connect(
Expand All @@ -397,7 +486,9 @@ impl Endpoint {
) -> Result<Channel, tonic::transport::Error> {
#[cfg(not(madsim))]
{
let mut http = HttpConnector::new();
let resolver = MonitoredGaiResolver::default();
let mut http = HttpConnector::new_with_resolver(resolver);

http.enforce_http(false);
http.set_nodelay(config.tcp_nodelay);
http.set_keepalive(config.keepalive_duration);
Expand Down

0 comments on commit 393f17f

Please sign in to comment.