From 29a6aaff3b6defc76bdbae9511273c00b7870025 Mon Sep 17 00:00:00 2001 From: SSebo Date: Sat, 23 Dec 2023 18:22:39 +0800 Subject: [PATCH] feat: export runtime metric to promethues --- Cargo.lock | 27 +++++++++++++++++++++++++++ src/common/runtime/Cargo.toml | 3 +++ src/common/runtime/src/runtime.rs | 16 ++++++++++++++-- 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee7bde92e7b6..3fd60d1528f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1925,6 +1925,8 @@ dependencies = [ "prometheus", "snafu", "tokio", + "tokio-metrics", + "tokio-metrics-collector", "tokio-test", "tokio-util", ] @@ -9644,6 +9646,31 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "tokio-metrics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", + "tokio-stream", +] + +[[package]] +name = "tokio-metrics-collector" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767da47381602cc481653456823b3ebb600e83d5dd4e0293da9b5566c6c00f0" +dependencies = [ + "lazy_static", + "parking_lot 0.12.1", + "prometheus", + "tokio", + "tokio-metrics", +] + [[package]] name = "tokio-postgres" version = "0.7.10" diff --git a/src/common/runtime/Cargo.toml b/src/common/runtime/Cargo.toml index a4d1460f349a..ac6733b57764 100644 --- a/src/common/runtime/Cargo.toml +++ b/src/common/runtime/Cargo.toml @@ -17,5 +17,8 @@ snafu.workspace = true tokio-util.workspace = true tokio.workspace = true +tokio-metrics = "0.3" +tokio-metrics-collector = { version = "0.2" } + [dev-dependencies] tokio-test = "0.4" diff --git a/src/common/runtime/src/runtime.rs b/src/common/runtime/src/runtime.rs index 6a776af25465..907f44fa5c6e 100644 --- a/src/common/runtime/src/runtime.rs +++ b/src/common/runtime/src/runtime.rs @@ -95,7 +95,7 @@ pub struct Builder { impl Default for Builder { fn default() -> Self { Self { - runtime_name: format!("runtime-{}", RUNTIME_ID.fetch_add(1, Ordering::Relaxed)), + runtime_name: format!("runtime_{}", RUNTIME_ID.fetch_add(1, Ordering::Relaxed)), thread_name: "default-worker".to_string(), builder: RuntimeBuilder::new_multi_thread(), } @@ -152,6 +152,7 @@ impl Builder { .build() .context(BuildRuntimeSnafu)?; + let name = self.runtime_name.clone(); let handle = runtime.handle().clone(); let (send_stop, recv_stop) = oneshot::channel(); // Block the runtime to shutdown. @@ -159,8 +160,10 @@ impl Builder { .name(format!("{}-blocker", self.thread_name)) .spawn(move || runtime.block_on(recv_stop)); + register_collector(name.clone(), &handle); + Ok(Runtime { - name: self.runtime_name.clone(), + name, handle, _dropper: Arc::new(Dropper { close: Some(send_stop), @@ -169,6 +172,12 @@ impl Builder { } } +pub fn register_collector(name: String, handle: &Handle) { + let monitor = tokio_metrics::RuntimeMonitor::new(handle); + let collector = tokio_metrics_collector::RuntimeCollector::new(monitor, name.clone()); + let _ = prometheus::register(Box::new(collector)); +} + fn on_thread_start(thread_name: String) -> impl Fn() + 'static { move || { METRIC_RUNTIME_THREADS_ALIVE @@ -241,6 +250,9 @@ mod tests { assert!(metric_text.contains("runtime_threads_idle{thread_name=\"test_runtime_metric\"}")); assert!(metric_text.contains("runtime_threads_alive{thread_name=\"test_runtime_metric\"}")); + assert!(metric_text.contains("runtime_0_tokio_budget_forced_yield_count")); + assert!(metric_text.contains("runtime_0_tokio_injection_queue_depth")); + assert!(metric_text.contains("runtime_0_tokio_workers_count")); } #[test]