diff --git a/Cargo.lock b/Cargo.lock index fcb2e7a20db6d..b5fda53e843ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5213,12 +5213,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-range-header" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" - [[package]] name = "http-range-header" version = "0.4.0" @@ -9424,6 +9418,7 @@ name = "risingwave_common_service" version = "1.7.0-alpha" dependencies = [ "async-trait", + "axum 0.7.4", "futures", "hyper 0.14.27", "madsim-tokio", @@ -9435,7 +9430,7 @@ dependencies = [ "thiserror", "thiserror-ext", "tower", - "tower-http 0.4.4", + "tower-http", "tracing", "workspace-hack", ] @@ -10139,7 +10134,7 @@ dependencies = [ "tokio-retry", "tokio-stream", "tower", - "tower-http 0.5.2", + "tower-http", "tracing", "url", "uuid", @@ -13147,24 +13142,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tower-http" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" -dependencies = [ - "bitflags 2.5.0", - "bytes", - "futures-core", - "futures-util", - "http 0.2.9", - "http-body 0.4.5", - "http-range-header 0.3.1", - "pin-project-lite", - "tower-layer", - "tower-service", -] - [[package]] name = "tower-http" version = "0.5.2" @@ -13179,7 +13156,7 @@ dependencies = [ "http 1.0.0", "http-body 1.0.0", "http-body-util", - "http-range-header 0.4.0", + "http-range-header", "httpdate", "mime", "mime_guess", diff --git a/Cargo.toml b/Cargo.toml index f931f3524a8f6..c9e8cc532fd5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,6 +113,7 @@ aws-smithy-types = { version = "1", default-features = false, features = [ ] } aws-endpoint = "0.60" aws-types = "1" +axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain etcd-client = { package = "madsim-etcd-client", version = "0.4" } futures-async-stream = "0.2.9" hytra = "0.1" diff --git a/src/common/common_service/Cargo.toml b/src/common/common_service/Cargo.toml index 4f39105be207a..f2fa832eaad55 100644 --- a/src/common/common_service/Cargo.toml +++ b/src/common/common_service/Cargo.toml @@ -16,6 +16,7 @@ normal = ["workspace-hack"] [dependencies] async-trait = "0.1" +axum = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } hyper = "0.14" # required by tonic prometheus = { version = "0.13" } @@ -27,7 +28,7 @@ thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] } tonic = { workspace = true } tower = { version = "0.4", features = ["util", "load-shed"] } -tower-http = { version = "0.4", features = ["add-extension", "cors"] } +tower-http = { version = "0.5", features = ["add-extension"] } tracing = "0.1" [target.'cfg(not(madsim))'.dependencies] diff --git a/src/common/common_service/src/metrics_manager.rs b/src/common/common_service/src/metrics_manager.rs index 2a284294cf7df..3bf632211aefd 100644 --- a/src/common/common_service/src/metrics_manager.rs +++ b/src/common/common_service/src/metrics_manager.rs @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::net::SocketAddr; use std::ops::Deref; use std::sync::OnceLock; -use hyper::{Body, Request, Response}; +use axum::body::Body; +use axum::response::{IntoResponse, Response}; +use axum::{Extension, Router}; use prometheus::{Encoder, Registry, TextEncoder}; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use thiserror_ext::AsReport; -use tower::make::Shared; -use tower::ServiceBuilder; +use tokio::net::TcpListener; use tower_http::add_extension::AddExtensionLayer; use tracing::{error, info, warn}; @@ -31,28 +31,27 @@ impl MetricsManager { pub fn boot_metrics_service(listen_addr: String) { static METRICS_SERVICE_LISTEN_ADDR: OnceLock = OnceLock::new(); let new_listen_addr = listen_addr.clone(); - let current_listen_addr = METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| { - let listen_addr_clone = listen_addr.clone(); - tokio::spawn(async move { - info!( - "Prometheus listener for Prometheus is set up on http://{}", - listen_addr - ); - let listen_socket_addr: SocketAddr = listen_addr.parse().unwrap(); - let service = ServiceBuilder::new() - .layer(AddExtensionLayer::new( - GLOBAL_METRICS_REGISTRY.deref().clone(), - )) - .service_fn(Self::metrics_service); - // TODO: use axum server - let serve_future = - hyper::Server::bind(&listen_socket_addr).serve(Shared::new(service)); - if let Err(err) = serve_future.await { - error!(error = %err.as_report(), "metrics service exited with error"); - } + let current_listen_addr = + METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| { + let listen_addr_clone = listen_addr.clone(); + tokio::spawn(async move { + info!( + "Prometheus listener for Prometheus is set up on http://{}", + listen_addr + ); + + let service = Router::new().fallback(Self::metrics_service).layer( + AddExtensionLayer::new(GLOBAL_METRICS_REGISTRY.deref().clone()), + ); + + let serve_future = + axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service); + if let Err(err) = serve_future.await { + error!(error = %err.as_report(), "metrics service exited with error"); + } + }); + listen_addr_clone }); - listen_addr_clone - }); if new_listen_addr != *current_listen_addr { warn!( "unable to listen port {} for metrics service. Currently listening on {}", @@ -62,17 +61,15 @@ impl MetricsManager { } #[expect(clippy::unused_async, reason = "required by service_fn")] - async fn metrics_service(req: Request) -> Result, hyper::Error> { - let registry = req.extensions().get::().unwrap(); + async fn metrics_service(Extension(registry): Extension) -> impl IntoResponse { let encoder = TextEncoder::new(); let mut buffer = vec![]; let mf = registry.gather(); encoder.encode(&mf, &mut buffer).unwrap(); - let response = Response::builder() - .header(hyper::header::CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) - .unwrap(); - Ok(response) + Response::builder() + .header(axum::http::header::CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() } } diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 29c1c9520cf52..fb757462adbc5 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -85,7 +85,7 @@ url = "2" uuid = { version = "1", features = ["v4"] } [target.'cfg(not(madsim))'.dependencies] -axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain +axum = { workspace = true } tower-http = { version = "0.5", features = [ "add-extension", "cors", diff --git a/src/meta/dashboard/Cargo.toml b/src/meta/dashboard/Cargo.toml index e848c8015cbb1..9c7e59fab315c 100644 --- a/src/meta/dashboard/Cargo.toml +++ b/src/meta/dashboard/Cargo.toml @@ -10,7 +10,7 @@ repository = { workspace = true } [dependencies] anyhow = "1" -axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain +axum = { workspace = true } axum-embed = "0.1" bytes = "1" hyper = "1"