From 3f50c6048f162b4be2699b62adcd873dfeb818f9 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 9 Apr 2024 16:17:29 +0800 Subject: [PATCH] refactor(metrics): replace hyper server implementation with axum (#16145) Signed-off-by: Bugen Zhao --- Cargo.lock | 32 ++-------- Cargo.toml | 1 + src/common/common_service/Cargo.toml | 3 +- .../common_service/src/metrics_manager.rs | 62 +++++++++---------- src/meta/Cargo.toml | 2 +- src/meta/dashboard/Cargo.toml | 3 +- src/meta/dashboard/src/proxy.rs | 20 +++--- 7 files changed, 48 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 82422098e0605..8a1c5bca7d3f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5208,12 +5208,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-range-header" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" - [[package]] name = "http-range-header" version = "0.4.0" @@ -9420,6 +9414,7 @@ name = "risingwave_common_service" version = "1.7.0-alpha" dependencies = [ "async-trait", + "axum 0.7.4", "futures", "hyper 0.14.27", "madsim-tokio", @@ -9431,7 +9426,7 @@ dependencies = [ "thiserror", "thiserror-ext", "tower", - "tower-http 0.4.4", + "tower-http", "tracing", "workspace-hack", ] @@ -10137,7 +10132,7 @@ dependencies = [ "tokio-retry", "tokio-stream", "tower", - "tower-http 0.5.2", + "tower-http", "tracing", "url", "uuid", @@ -10154,7 +10149,6 @@ dependencies = [ "bytes", "cargo-emit", "dircpy", - "hyper 1.2.0", "mime_guess", "npm_rs", "reqwest 0.12.2", @@ -13148,24 +13142,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tower-http" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" -dependencies = [ - "bitflags 2.5.0", - "bytes", - "futures-core", - "futures-util", - "http 0.2.9", - "http-body 0.4.5", - "http-range-header 0.3.1", - "pin-project-lite", - "tower-layer", - "tower-service", -] - [[package]] name = "tower-http" version = "0.5.2" @@ -13180,7 +13156,7 @@ dependencies = [ "http 1.0.0", "http-body 1.0.0", "http-body-util", - "http-range-header 0.4.0", + "http-range-header", "httpdate", "mime", "mime_guess", diff --git a/Cargo.toml b/Cargo.toml index 6128eabfeddeb..67aa014821134 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,6 +113,7 @@ aws-smithy-types = { version = "1", default-features = false, features = [ ] } aws-endpoint = "0.60" aws-types = "1" +axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain etcd-client = { package = "madsim-etcd-client", version = "0.4" } futures-async-stream = "0.2.9" hytra = "0.1" diff --git a/src/common/common_service/Cargo.toml b/src/common/common_service/Cargo.toml index 4f39105be207a..f2fa832eaad55 100644 --- a/src/common/common_service/Cargo.toml +++ b/src/common/common_service/Cargo.toml @@ -16,6 +16,7 @@ normal = ["workspace-hack"] [dependencies] async-trait = "0.1" +axum = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } hyper = "0.14" # required by tonic prometheus = { version = "0.13" } @@ -27,7 +28,7 @@ thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] } tonic = { workspace = true } tower = { version = "0.4", features = ["util", "load-shed"] } -tower-http = { version = "0.4", features = ["add-extension", "cors"] } +tower-http = { version = "0.5", features = ["add-extension"] } tracing = "0.1" [target.'cfg(not(madsim))'.dependencies] diff --git a/src/common/common_service/src/metrics_manager.rs b/src/common/common_service/src/metrics_manager.rs index 2a284294cf7df..f6d31c98ef31b 100644 --- a/src/common/common_service/src/metrics_manager.rs +++ b/src/common/common_service/src/metrics_manager.rs @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::net::SocketAddr; use std::ops::Deref; use std::sync::OnceLock; -use hyper::{Body, Request, Response}; +use axum::body::Body; +use axum::response::{IntoResponse, Response}; +use axum::{Extension, Router}; use prometheus::{Encoder, Registry, TextEncoder}; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use thiserror_ext::AsReport; -use tower::make::Shared; -use tower::ServiceBuilder; +use tokio::net::TcpListener; use tower_http::add_extension::AddExtensionLayer; use tracing::{error, info, warn}; @@ -31,28 +31,28 @@ impl MetricsManager { pub fn boot_metrics_service(listen_addr: String) { static METRICS_SERVICE_LISTEN_ADDR: OnceLock = OnceLock::new(); let new_listen_addr = listen_addr.clone(); - let current_listen_addr = METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| { - let listen_addr_clone = listen_addr.clone(); - tokio::spawn(async move { - info!( - "Prometheus listener for Prometheus is set up on http://{}", - listen_addr - ); - let listen_socket_addr: SocketAddr = listen_addr.parse().unwrap(); - let service = ServiceBuilder::new() - .layer(AddExtensionLayer::new( - GLOBAL_METRICS_REGISTRY.deref().clone(), - )) - .service_fn(Self::metrics_service); - // TODO: use axum server - let serve_future = - hyper::Server::bind(&listen_socket_addr).serve(Shared::new(service)); - if let Err(err) = serve_future.await { - error!(error = %err.as_report(), "metrics service exited with error"); - } + let current_listen_addr = + METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| { + let listen_addr_clone = listen_addr.clone(); + #[cfg(not(madsim))] // no need in simulation test + tokio::spawn(async move { + info!( + "Prometheus listener for Prometheus is set up on http://{}", + listen_addr + ); + + let service = Router::new().fallback(Self::metrics_service).layer( + AddExtensionLayer::new(GLOBAL_METRICS_REGISTRY.deref().clone()), + ); + + let serve_future = + axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service); + if let Err(err) = serve_future.await { + error!(error = %err.as_report(), "metrics service exited with error"); + } + }); + listen_addr_clone }); - listen_addr_clone - }); if new_listen_addr != *current_listen_addr { warn!( "unable to listen port {} for metrics service. Currently listening on {}", @@ -62,17 +62,15 @@ impl MetricsManager { } #[expect(clippy::unused_async, reason = "required by service_fn")] - async fn metrics_service(req: Request) -> Result, hyper::Error> { - let registry = req.extensions().get::().unwrap(); + async fn metrics_service(Extension(registry): Extension) -> impl IntoResponse { let encoder = TextEncoder::new(); let mut buffer = vec![]; let mf = registry.gather(); encoder.encode(&mf, &mut buffer).unwrap(); - let response = Response::builder() - .header(hyper::header::CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) - .unwrap(); - Ok(response) + Response::builder() + .header(axum::http::header::CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() } } diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 29c1c9520cf52..fb757462adbc5 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -85,7 +85,7 @@ url = "2" uuid = { version = "1", features = ["v4"] } [target.'cfg(not(madsim))'.dependencies] -axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain +axum = { workspace = true } tower-http = { version = "0.5", features = [ "add-extension", "cors", diff --git a/src/meta/dashboard/Cargo.toml b/src/meta/dashboard/Cargo.toml index e848c8015cbb1..a0bd8e96b0b85 100644 --- a/src/meta/dashboard/Cargo.toml +++ b/src/meta/dashboard/Cargo.toml @@ -10,10 +10,9 @@ repository = { workspace = true } [dependencies] anyhow = "1" -axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain +axum = { workspace = true } axum-embed = "0.1" bytes = "1" -hyper = "1" mime_guess = "2" reqwest = "0.12.2" rust-embed = { version = "8", features = ["interpolate-folder-path", "mime-guess"] } diff --git a/src/meta/dashboard/src/proxy.rs b/src/meta/dashboard/src/proxy.rs index f63a4f63c5fe3..3dc7ac2625241 100644 --- a/src/meta/dashboard/src/proxy.rs +++ b/src/meta/dashboard/src/proxy.rs @@ -16,12 +16,10 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use anyhow::anyhow; -use axum::http::StatusCode; +use axum::http::{header, HeaderMap, StatusCode, Uri}; use axum::response::{IntoResponse, Response}; use axum::Router; use bytes::Bytes; -use hyper::header::CONTENT_TYPE; -use hyper::{HeaderMap, Uri}; use thiserror_ext::AsReport as _; use url::Url; @@ -37,21 +35,21 @@ impl IntoResponse for CachedResponse { fn into_response(self) -> Response { let guess = mime_guess::from_path(self.uri.path()); let mut headers = HeaderMap::new(); - if let Some(x) = self.headers.get(hyper::header::ETAG) { - headers.insert(hyper::header::ETAG, x.clone()); + if let Some(x) = self.headers.get(header::ETAG) { + headers.insert(header::ETAG, x.clone()); } - if let Some(x) = self.headers.get(hyper::header::CACHE_CONTROL) { - headers.insert(hyper::header::CACHE_CONTROL, x.clone()); + if let Some(x) = self.headers.get(header::CACHE_CONTROL) { + headers.insert(header::CACHE_CONTROL, x.clone()); } - if let Some(x) = self.headers.get(hyper::header::EXPIRES) { - headers.insert(hyper::header::EXPIRES, x.clone()); + if let Some(x) = self.headers.get(header::EXPIRES) { + headers.insert(header::EXPIRES, x.clone()); } if let Some(x) = guess.first() { if x.type_() == "image" && x.subtype() == "svg" { - headers.insert(CONTENT_TYPE, "image/svg+xml".parse().unwrap()); + headers.insert(header::CONTENT_TYPE, "image/svg+xml".parse().unwrap()); } else { headers.insert( - CONTENT_TYPE, + header::CONTENT_TYPE, format!("{}/{}", x.type_(), x.subtype()).parse().unwrap(), ); }