Skip to content

Commit

Permalink
refactor(metrics): replace hyper implementation with axum
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao committed Apr 4, 2024
1 parent dfe5f31 commit 500fdb0
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 62 deletions.
31 changes: 4 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ aws-smithy-types = { version = "1", default-features = false, features = [
] }
aws-endpoint = "0.60"
aws-types = "1"
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
etcd-client = { package = "madsim-etcd-client", version = "0.4" }
futures-async-stream = "0.2.9"
hytra = "0.1"
Expand Down
3 changes: 2 additions & 1 deletion src/common/common_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ normal = ["workspace-hack"]

[dependencies]
async-trait = "0.1"
axum = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14" # required by tonic
prometheus = { version = "0.13" }
Expand All @@ -27,7 +28,7 @@ thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] }
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
tower-http = { version = "0.4", features = ["add-extension", "cors"] }
tower-http = { version = "0.5", features = ["add-extension"] }
tracing = "0.1"

[target.'cfg(not(madsim))'.dependencies]
Expand Down
61 changes: 29 additions & 32 deletions src/common/common_service/src/metrics_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::OnceLock;

use hyper::{Body, Request, Response};
use axum::body::Body;
use axum::response::{IntoResponse, Response};
use axum::{Extension, Router};
use prometheus::{Encoder, Registry, TextEncoder};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use thiserror_ext::AsReport;
use tower::make::Shared;
use tower::ServiceBuilder;
use tokio::net::TcpListener;
use tower_http::add_extension::AddExtensionLayer;
use tracing::{error, info, warn};

Expand All @@ -31,28 +31,27 @@ impl MetricsManager {
pub fn boot_metrics_service(listen_addr: String) {
static METRICS_SERVICE_LISTEN_ADDR: OnceLock<String> = OnceLock::new();
let new_listen_addr = listen_addr.clone();
let current_listen_addr = METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| {
let listen_addr_clone = listen_addr.clone();
tokio::spawn(async move {
info!(
"Prometheus listener for Prometheus is set up on http://{}",
listen_addr
);
let listen_socket_addr: SocketAddr = listen_addr.parse().unwrap();
let service = ServiceBuilder::new()
.layer(AddExtensionLayer::new(
GLOBAL_METRICS_REGISTRY.deref().clone(),
))
.service_fn(Self::metrics_service);
// TODO: use axum server
let serve_future =
hyper::Server::bind(&listen_socket_addr).serve(Shared::new(service));
if let Err(err) = serve_future.await {
error!(error = %err.as_report(), "metrics service exited with error");
}
let current_listen_addr =
METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| {
let listen_addr_clone = listen_addr.clone();
tokio::spawn(async move {
info!(
"Prometheus listener for Prometheus is set up on http://{}",
listen_addr
);

let service = Router::new().fallback(Self::metrics_service).layer(
AddExtensionLayer::new(GLOBAL_METRICS_REGISTRY.deref().clone()),
);

let serve_future =
axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service);
if let Err(err) = serve_future.await {
error!(error = %err.as_report(), "metrics service exited with error");
}
});
listen_addr_clone
});
listen_addr_clone
});
if new_listen_addr != *current_listen_addr {
warn!(
"unable to listen port {} for metrics service. Currently listening on {}",
Expand All @@ -62,17 +61,15 @@ impl MetricsManager {
}

#[expect(clippy::unused_async, reason = "required by service_fn")]
async fn metrics_service(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
let registry = req.extensions().get::<Registry>().unwrap();
async fn metrics_service(Extension(registry): Extension<Registry>) -> impl IntoResponse {
let encoder = TextEncoder::new();
let mut buffer = vec![];
let mf = registry.gather();
encoder.encode(&mf, &mut buffer).unwrap();
let response = Response::builder()
.header(hyper::header::CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap();

Ok(response)
Response::builder()
.header(axum::http::header::CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
}
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ url = "2"
uuid = { version = "1", features = ["v4"] }

[target.'cfg(not(madsim))'.dependencies]
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
axum = { workspace = true }
tower-http = { version = "0.5", features = [
"add-extension",
"cors",
Expand Down
2 changes: 1 addition & 1 deletion src/meta/dashboard/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ repository = { workspace = true }

[dependencies]
anyhow = "1"
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
axum = { workspace = true }
axum-embed = "0.1"
bytes = "1"
hyper = "1"
Expand Down

0 comments on commit 500fdb0

Please sign in to comment.