Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): add process collector #15591

Merged
merged 8 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/common/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ tikv-jemalloc-sys = "0.5.2"
tokio = { workspace = true }
uuid = { workspace = true }

[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "^0.16" }

[dev-dependencies]
anyerror = { workspace = true }
anyhow = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions src/common/base/src/runtime/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod family;
mod family_metrics;
mod gauge;
mod histogram;
mod process_collector;
mod registry;
mod sample;

Expand All @@ -25,6 +26,7 @@ pub use gauge::Gauge;
pub use histogram::Histogram;
pub use histogram::BUCKET_MILLISECONDS;
pub use histogram::BUCKET_SECONDS;
pub use process_collector::dump_process_stat;
pub use registry::register_counter;
pub use registry::register_counter_family;
pub use registry::register_gauge;
Expand Down
181 changes: 181 additions & 0 deletions src/common/base/src/runtime/metrics/process_collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus_client::collector::Collector;
use prometheus_client::encoding::EncodeMetric;
use prometheus_client::metrics::counter::ConstCounter;
use prometheus_client::metrics::gauge::ConstGauge;

#[derive(Debug)]
pub struct ProcessCollector {}

impl ProcessCollector {
pub fn new() -> Box<Self> {
Box::new(ProcessCollector {})
}
}

impl Collector for ProcessCollector {
fn encode(
&self,
mut encoder: prometheus_client::encoding::DescriptorEncoder,
) -> Result<(), std::fmt::Error> {
let stat = match dump_process_stat() {
Some(stat) => stat,
None => return Ok(()),
};

let cpu_secs = ConstCounter::new(stat.cpu_secs);
let cpu_secs_encoder = encoder.encode_descriptor(
"process_cpu_seconds_total",
"Total user and system CPU time spent in seconds.",
None,
cpu_secs.metric_type(),
)?;
cpu_secs.encode(cpu_secs_encoder)?;

let open_fds = ConstGauge::new(stat.open_fds as f64);
let open_fds_encoder = encoder.encode_descriptor(
"process_open_fds",
"Number of open file descriptors.",
None,
open_fds.metric_type(),
)?;
open_fds.encode(open_fds_encoder)?;

let max_fds = ConstGauge::new(stat.max_fds as f64);
let max_fds_encoder = encoder.encode_descriptor(
"process_max_fds",
"Maximum number of open file descriptors.",
None,
max_fds.metric_type(),
)?;
max_fds.encode(max_fds_encoder)?;

let vsize = ConstGauge::new(stat.vsize as f64);
let vsize_encoder = encoder.encode_descriptor(
"process_virtual_memory_bytes",
"Virtual memory size in bytes.",
None,
vsize.metric_type(),
)?;
vsize.encode(vsize_encoder)?;

let rss = ConstGauge::new(stat.rss as f64);
let rss_encoder = encoder.encode_descriptor(
"process_resident_memory_bytes",
"Resident memory size in bytes.",
None,
rss.metric_type(),
)?;
rss.encode(rss_encoder)?;

let start_time = ConstGauge::new(stat.start_time as f64);
let start_time_encoder = encoder.encode_descriptor(
"process_start_time_seconds",
"Start time of the process since unix epoch in seconds.",
None,
start_time.metric_type(),
)?;
start_time.encode(start_time_encoder)?;

let threads_num = ConstGauge::new(stat.threads_num as f64);
let threads_num_encoder = encoder.encode_descriptor(
"process_threads",
"Number of OS threads in the process.",
None,
threads_num.metric_type(),
)?;
threads_num.encode(threads_num_encoder)?;

Ok(())
}
}

#[derive(Clone, Default)]
pub struct ProcessStat {
pub cpu_secs: u64,
pub open_fds: u64,
pub max_fds: u64,
pub vsize: u64,
pub rss: u64,
pub start_time: i64,
pub threads_num: usize,
}

pub fn dump_process_stat() -> Option<ProcessStat> {
#[cfg(target_os = "linux")]
{
dump_linux_process_stat()
}

#[cfg(not(target_os = "linux"))]
{
None
}
}

#[cfg(target_os = "linux")]
fn dump_linux_process_stat() -> Option<ProcessStat> {
let proc = match procfs::process::Process::myself() {
Ok(p) => p,
Err(_) => {
return None;
}
};
let stat = match proc.stat() {
Ok(stat) => stat,
Err(_) => {
return None;
}
};

// constants
let clk_tck: i64 = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
let page_size: i64 = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };

// fds
let open_fds = proc.fd_count().unwrap_or(0) as u64;
let max_fds = if let Ok(limits) = proc.limits() {
match limits.max_open_files.soft_limit {
procfs::process::LimitValue::Value(v) => v,
procfs::process::LimitValue::Unlimited => 0,
}
} else {
0
};

// memory
let vsize = stat.vsize;
let rss = stat.rss * (page_size as u64);

// cpu time
let cpu_secs = (stat.utime + stat.stime) / clk_tck as u64;

// start time
let start_time = stat.starttime as i64 * clk_tck;

// threads
let threads_num = stat.num_threads as usize;

Some(ProcessStat {
open_fds,
max_fds,
vsize,
rss,
cpu_secs,
start_time,
threads_num,
})
}
5 changes: 4 additions & 1 deletion src/common/base/src/runtime/metrics/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::runtime::metrics::gauge::Gauge;
use crate::runtime::metrics::histogram::Histogram;
use crate::runtime::metrics::histogram::BUCKET_MILLISECONDS;
use crate::runtime::metrics::histogram::BUCKET_SECONDS;
use crate::runtime::metrics::process_collector::ProcessCollector;
use crate::runtime::metrics::sample::MetricSample;
use crate::runtime::ThreadTracker;

Expand Down Expand Up @@ -85,10 +86,12 @@ unsafe impl Sync for GlobalRegistry {}

impl GlobalRegistry {
pub fn create() -> GlobalRegistry {
let mut registry = Registry::with_prefix("databend");
registry.register_collector(ProcessCollector::new());
GlobalRegistry {
inner: Mutex::new(GlobalRegistryInner {
metrics: vec![],
registry: Registry::with_prefix("databend"),
registry,
}),
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/query/service/tests/it/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::net::SocketAddr;

use databend_common_base::base::tokio;
use databend_common_base::runtime::metrics::dump_process_stat;
use databend_common_base::runtime::metrics::register_counter;
use databend_query::servers::metrics::MetricService;
use databend_query::servers::Server;
Expand Down Expand Up @@ -47,3 +48,14 @@ async fn test_metric_server() -> databend_common_exception::Result<()> {

Ok(())
}

#[cfg(target_os = "linux")]
#[test]
fn test_process_collector() {
let stat = dump_process_stat().unwrap();

assert!(stat.max_fds > 0);
assert!(stat.vsize > 0);
assert!(stat.rss > 0);
assert!(stat.threads_num > 0);
}
Loading