Skip to content

Commit

Permalink
refactor(metrics): replace hyper server implementation with axum (#16145
Browse files Browse the repository at this point in the history
)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Apr 9, 2024
1 parent 0c8371a commit 3f50c60
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 75 deletions.
32 changes: 4 additions & 28 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ aws-smithy-types = { version = "1", default-features = false, features = [
] }
aws-endpoint = "0.60"
aws-types = "1"
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
etcd-client = { package = "madsim-etcd-client", version = "0.4" }
futures-async-stream = "0.2.9"
hytra = "0.1"
Expand Down
3 changes: 2 additions & 1 deletion src/common/common_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ normal = ["workspace-hack"]

[dependencies]
async-trait = "0.1"
axum = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14" # required by tonic
prometheus = { version = "0.13" }
Expand All @@ -27,7 +28,7 @@ thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] }
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
tower-http = { version = "0.4", features = ["add-extension", "cors"] }
tower-http = { version = "0.5", features = ["add-extension"] }
tracing = "0.1"

[target.'cfg(not(madsim))'.dependencies]
Expand Down
62 changes: 30 additions & 32 deletions src/common/common_service/src/metrics_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::OnceLock;

use hyper::{Body, Request, Response};
use axum::body::Body;
use axum::response::{IntoResponse, Response};
use axum::{Extension, Router};
use prometheus::{Encoder, Registry, TextEncoder};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use thiserror_ext::AsReport;
use tower::make::Shared;
use tower::ServiceBuilder;
use tokio::net::TcpListener;
use tower_http::add_extension::AddExtensionLayer;
use tracing::{error, info, warn};

Expand All @@ -31,28 +31,28 @@ impl MetricsManager {
pub fn boot_metrics_service(listen_addr: String) {
static METRICS_SERVICE_LISTEN_ADDR: OnceLock<String> = OnceLock::new();
let new_listen_addr = listen_addr.clone();
let current_listen_addr = METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| {
let listen_addr_clone = listen_addr.clone();
tokio::spawn(async move {
info!(
"Prometheus listener for Prometheus is set up on http://{}",
listen_addr
);
let listen_socket_addr: SocketAddr = listen_addr.parse().unwrap();
let service = ServiceBuilder::new()
.layer(AddExtensionLayer::new(
GLOBAL_METRICS_REGISTRY.deref().clone(),
))
.service_fn(Self::metrics_service);
// TODO: use axum server
let serve_future =
hyper::Server::bind(&listen_socket_addr).serve(Shared::new(service));
if let Err(err) = serve_future.await {
error!(error = %err.as_report(), "metrics service exited with error");
}
let current_listen_addr =
METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| {
let listen_addr_clone = listen_addr.clone();
#[cfg(not(madsim))] // no need in simulation test
tokio::spawn(async move {
info!(
"Prometheus listener for Prometheus is set up on http://{}",
listen_addr
);

let service = Router::new().fallback(Self::metrics_service).layer(
AddExtensionLayer::new(GLOBAL_METRICS_REGISTRY.deref().clone()),
);

let serve_future =
axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service);
if let Err(err) = serve_future.await {
error!(error = %err.as_report(), "metrics service exited with error");
}
});
listen_addr_clone
});
listen_addr_clone
});
if new_listen_addr != *current_listen_addr {
warn!(
"unable to listen port {} for metrics service. Currently listening on {}",
Expand All @@ -62,17 +62,15 @@ impl MetricsManager {
}

#[expect(clippy::unused_async, reason = "required by service_fn")]
async fn metrics_service(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
let registry = req.extensions().get::<Registry>().unwrap();
async fn metrics_service(Extension(registry): Extension<Registry>) -> impl IntoResponse {
let encoder = TextEncoder::new();
let mut buffer = vec![];
let mf = registry.gather();
encoder.encode(&mf, &mut buffer).unwrap();
let response = Response::builder()
.header(hyper::header::CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap();

Ok(response)
Response::builder()
.header(axum::http::header::CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
}
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ url = "2"
uuid = { version = "1", features = ["v4"] }

[target.'cfg(not(madsim))'.dependencies]
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
axum = { workspace = true }
tower-http = { version = "0.5", features = [
"add-extension",
"cors",
Expand Down
3 changes: 1 addition & 2 deletions src/meta/dashboard/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ repository = { workspace = true }

[dependencies]
anyhow = "1"
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
axum = { workspace = true }
axum-embed = "0.1"
bytes = "1"
hyper = "1"
mime_guess = "2"
reqwest = "0.12.2"
rust-embed = { version = "8", features = ["interpolate-folder-path", "mime-guess"] }
Expand Down
20 changes: 9 additions & 11 deletions src/meta/dashboard/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use anyhow::anyhow;
use axum::http::StatusCode;
use axum::http::{header, HeaderMap, StatusCode, Uri};
use axum::response::{IntoResponse, Response};
use axum::Router;
use bytes::Bytes;
use hyper::header::CONTENT_TYPE;
use hyper::{HeaderMap, Uri};
use thiserror_ext::AsReport as _;
use url::Url;

Expand All @@ -37,21 +35,21 @@ impl IntoResponse for CachedResponse {
fn into_response(self) -> Response {
let guess = mime_guess::from_path(self.uri.path());
let mut headers = HeaderMap::new();
if let Some(x) = self.headers.get(hyper::header::ETAG) {
headers.insert(hyper::header::ETAG, x.clone());
if let Some(x) = self.headers.get(header::ETAG) {
headers.insert(header::ETAG, x.clone());
}
if let Some(x) = self.headers.get(hyper::header::CACHE_CONTROL) {
headers.insert(hyper::header::CACHE_CONTROL, x.clone());
if let Some(x) = self.headers.get(header::CACHE_CONTROL) {
headers.insert(header::CACHE_CONTROL, x.clone());
}
if let Some(x) = self.headers.get(hyper::header::EXPIRES) {
headers.insert(hyper::header::EXPIRES, x.clone());
if let Some(x) = self.headers.get(header::EXPIRES) {
headers.insert(header::EXPIRES, x.clone());
}
if let Some(x) = guess.first() {
if x.type_() == "image" && x.subtype() == "svg" {
headers.insert(CONTENT_TYPE, "image/svg+xml".parse().unwrap());
headers.insert(header::CONTENT_TYPE, "image/svg+xml".parse().unwrap());
} else {
headers.insert(
CONTENT_TYPE,
header::CONTENT_TYPE,
format!("{}/{}", x.type_(), x.subtype()).parse().unwrap(),
);
}
Expand Down

0 comments on commit 3f50c60

Please sign in to comment.