Skip to content

Commit

Permalink
feat: export runtime metric to promethues (#2985)
Browse files Browse the repository at this point in the history
* feat: export runtime metric to promethues

* fix: export tokio metric should enable tokio_unstable

* chore: tokio-metric export add more check info
  • Loading branch information
SSebo authored Dec 25, 2023
1 parent 1641fd5 commit cf561df
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 1 deletion.
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/common/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ once_cell.workspace = true
paste.workspace = true
prometheus.workspace = true
snafu.workspace = true
tokio-metrics = "0.3"
tokio-metrics-collector = "0.2"
tokio-util.workspace = true
tokio.workspace = true

Expand Down
21 changes: 20 additions & 1 deletion src/common/runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,19 @@ impl Builder {
.build()
.context(BuildRuntimeSnafu)?;

let name = self.runtime_name.clone();
let handle = runtime.handle().clone();
let (send_stop, recv_stop) = oneshot::channel();
// Block the runtime to shutdown.
let _ = thread::Builder::new()
.name(format!("{}-blocker", self.thread_name))
.spawn(move || runtime.block_on(recv_stop));

#[cfg(tokio_unstable)]
register_collector(name.clone(), &handle);

Ok(Runtime {
name: self.runtime_name.clone(),
name,
handle,
_dropper: Arc::new(Dropper {
close: Some(send_stop),
Expand All @@ -169,6 +173,14 @@ impl Builder {
}
}

#[cfg(tokio_unstable)]
pub fn register_collector(name: String, handle: &Handle) {
let name = name.replace("-", "_");
let monitor = tokio_metrics::RuntimeMonitor::new(handle);
let collector = tokio_metrics_collector::RuntimeCollector::new(monitor, name);
let _ = prometheus::register(Box::new(collector));
}

fn on_thread_start(thread_name: String) -> impl Fn() + 'static {
move || {
METRIC_RUNTIME_THREADS_ALIVE
Expand Down Expand Up @@ -241,6 +253,13 @@ mod tests {

assert!(metric_text.contains("runtime_threads_idle{thread_name=\"test_runtime_metric\"}"));
assert!(metric_text.contains("runtime_threads_alive{thread_name=\"test_runtime_metric\"}"));

#[cfg(tokio_unstable)]
{
assert!(metric_text.contains("runtime_0_tokio_budget_forced_yield_count 0"));
assert!(metric_text.contains("runtime_0_tokio_injection_queue_depth 0"));
assert!(metric_text.contains("runtime_0_tokio_workers_count 5"));
}
}

#[test]
Expand Down

0 comments on commit cf561df

Please sign in to comment.