Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(metrics): replace hyper server implementation with axum #16145

Merged
merged 2 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 4 additions & 28 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ aws-smithy-types = { version = "1", default-features = false, features = [
] }
aws-endpoint = "0.60"
aws-types = "1"
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
etcd-client = { package = "madsim-etcd-client", version = "0.4" }
futures-async-stream = "0.2.9"
hytra = "0.1"
Expand Down
3 changes: 2 additions & 1 deletion src/common/common_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ normal = ["workspace-hack"]

[dependencies]
async-trait = "0.1"
axum = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14" # required by tonic
prometheus = { version = "0.13" }
Expand All @@ -27,7 +28,7 @@ thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] }
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
tower-http = { version = "0.4", features = ["add-extension", "cors"] }
tower-http = { version = "0.5", features = ["add-extension"] }
tracing = "0.1"

[target.'cfg(not(madsim))'.dependencies]
Expand Down
62 changes: 30 additions & 32 deletions src/common/common_service/src/metrics_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::OnceLock;

use hyper::{Body, Request, Response};
use axum::body::Body;
use axum::response::{IntoResponse, Response};
use axum::{Extension, Router};
use prometheus::{Encoder, Registry, TextEncoder};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use thiserror_ext::AsReport;
use tower::make::Shared;
use tower::ServiceBuilder;
use tokio::net::TcpListener;
use tower_http::add_extension::AddExtensionLayer;
use tracing::{error, info, warn};

Expand All @@ -31,28 +31,28 @@ impl MetricsManager {
pub fn boot_metrics_service(listen_addr: String) {
static METRICS_SERVICE_LISTEN_ADDR: OnceLock<String> = OnceLock::new();
let new_listen_addr = listen_addr.clone();
let current_listen_addr = METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| {
let listen_addr_clone = listen_addr.clone();
tokio::spawn(async move {
info!(
"Prometheus listener for Prometheus is set up on http://{}",
listen_addr
);
let listen_socket_addr: SocketAddr = listen_addr.parse().unwrap();
let service = ServiceBuilder::new()
.layer(AddExtensionLayer::new(
GLOBAL_METRICS_REGISTRY.deref().clone(),
))
.service_fn(Self::metrics_service);
// TODO: use axum server
let serve_future =
hyper::Server::bind(&listen_socket_addr).serve(Shared::new(service));
if let Err(err) = serve_future.await {
error!(error = %err.as_report(), "metrics service exited with error");
}
let current_listen_addr =
METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| {
let listen_addr_clone = listen_addr.clone();
#[cfg(not(madsim))] // no need in simulation test
tokio::spawn(async move {
info!(
"Prometheus listener for Prometheus is set up on http://{}",
listen_addr
);

let service = Router::new().fallback(Self::metrics_service).layer(
AddExtensionLayer::new(GLOBAL_METRICS_REGISTRY.deref().clone()),
);

let serve_future =
axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service);
if let Err(err) = serve_future.await {
error!(error = %err.as_report(), "metrics service exited with error");
}
});
listen_addr_clone
});
listen_addr_clone
});
if new_listen_addr != *current_listen_addr {
warn!(
"unable to listen port {} for metrics service. Currently listening on {}",
Expand All @@ -62,17 +62,15 @@ impl MetricsManager {
}

#[expect(clippy::unused_async, reason = "required by service_fn")]
async fn metrics_service(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
let registry = req.extensions().get::<Registry>().unwrap();
async fn metrics_service(Extension(registry): Extension<Registry>) -> impl IntoResponse {
let encoder = TextEncoder::new();
let mut buffer = vec![];
let mf = registry.gather();
encoder.encode(&mf, &mut buffer).unwrap();
let response = Response::builder()
.header(hyper::header::CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap();

Ok(response)
Response::builder()
.header(axum::http::header::CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
}
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ url = "2"
uuid = { version = "1", features = ["v4"] }

[target.'cfg(not(madsim))'.dependencies]
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
axum = { workspace = true }
tower-http = { version = "0.5", features = [
"add-extension",
"cors",
Expand Down
3 changes: 1 addition & 2 deletions src/meta/dashboard/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ repository = { workspace = true }

[dependencies]
anyhow = "1"
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
axum = { workspace = true }
axum-embed = "0.1"
bytes = "1"
hyper = "1"
mime_guess = "2"
reqwest = "0.12.2"
rust-embed = { version = "8", features = ["interpolate-folder-path", "mime-guess"] }
Expand Down
20 changes: 9 additions & 11 deletions src/meta/dashboard/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use anyhow::anyhow;
use axum::http::StatusCode;
use axum::http::{header, HeaderMap, StatusCode, Uri};
use axum::response::{IntoResponse, Response};
use axum::Router;
use bytes::Bytes;
use hyper::header::CONTENT_TYPE;
use hyper::{HeaderMap, Uri};
use thiserror_ext::AsReport as _;
use url::Url;

Expand All @@ -37,21 +35,21 @@ impl IntoResponse for CachedResponse {
fn into_response(self) -> Response {
let guess = mime_guess::from_path(self.uri.path());
let mut headers = HeaderMap::new();
if let Some(x) = self.headers.get(hyper::header::ETAG) {
headers.insert(hyper::header::ETAG, x.clone());
if let Some(x) = self.headers.get(header::ETAG) {
headers.insert(header::ETAG, x.clone());
}
if let Some(x) = self.headers.get(hyper::header::CACHE_CONTROL) {
headers.insert(hyper::header::CACHE_CONTROL, x.clone());
if let Some(x) = self.headers.get(header::CACHE_CONTROL) {
headers.insert(header::CACHE_CONTROL, x.clone());
}
if let Some(x) = self.headers.get(hyper::header::EXPIRES) {
headers.insert(hyper::header::EXPIRES, x.clone());
if let Some(x) = self.headers.get(header::EXPIRES) {
headers.insert(header::EXPIRES, x.clone());
}
if let Some(x) = guess.first() {
if x.type_() == "image" && x.subtype() == "svg" {
headers.insert(CONTENT_TYPE, "image/svg+xml".parse().unwrap());
headers.insert(header::CONTENT_TYPE, "image/svg+xml".parse().unwrap());
} else {
headers.insert(
CONTENT_TYPE,
header::CONTENT_TYPE,
format!("{}/{}", x.type_(), x.subtype()).parse().unwrap(),
);
}
Expand Down
Loading