Skip to content

Commit

Permalink
refactor: move metrics, resource_util out of common (#15647)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Mar 15, 2024
1 parent 6143406 commit e2a17d2
Show file tree
Hide file tree
Showing 21 changed files with 231 additions and 81 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go/bin/
src/**/target
src/**/target_tarpaulin
target/
target-bisector-*/
src/proto/
src/prost/src/*.rs
src/prost/src/sim/*.rs
Expand Down
57 changes: 53 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"src/common/common_service",
"src/common/fields-derive",
"src/common/heap_profiling",
"src/common/metrics",
"src/compute",
"src/connector",
"src/connector/with_options",
Expand Down Expand Up @@ -54,8 +55,10 @@ members = [
"src/tests/state_cleaning_test",
"src/utils/delta_btree_map",
"src/utils/futures_util",
"src/utils/iter_util",
"src/utils/local_stats_alloc",
"src/utils/pgwire",
"src/utils/resource_util",
"src/utils/runtime",
"src/utils/sync-point",
"src/utils/variables",
Expand Down Expand Up @@ -166,6 +169,7 @@ risingwave_cmd = { path = "./src/cmd" }
risingwave_common = { path = "./src/common" }
risingwave_common_service = { path = "./src/common/common_service" }
risingwave_common_heap_profiling = { path = "./src/common/heap_profiling" }
risingwave_common_metrics = { path = "./src/common/metrics" }
risingwave_compactor = { path = "./src/storage/compactor" }
risingwave_compute = { path = "./src/compute" }
risingwave_ctl = { path = "./src/ctl" }
Expand Down Expand Up @@ -199,6 +203,8 @@ risingwave_variables = { path = "./src/utils/variables" }
risingwave_java_binding = { path = "./src/java_binding" }
risingwave_jni_core = { path = "src/jni_core" }
rw_futures_util = { path = "src/utils/futures_util" }
rw_resource_util = { path = "src/utils/resource_util" }
rw_iter_util = { path = "src/utils/iter_util" }

[workspace.lints.rust]
# `forbid` will also prevent the misuse of `#[allow(unused)]`
Expand Down
9 changes: 4 additions & 5 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@ enum-as-inner = "0.6"
enumflags2 = { version = "0.7.8" }
ethnum = { version = "1", features = ["serde"] }
fixedbitset = { version = "0.5", features = ["std"] }
fs-err = "2"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
hex = "0.4.3"
http = "0.2"
humantime = "2.1"
hyper = "0.14"
hytra = { workspace = true }
itertools = "0.12"
itoa = "1.0"
Expand Down Expand Up @@ -79,11 +77,13 @@ rand = "0.8"
regex = "1"
reqwest = { version = "0.11", features = ["json"] }
risingwave-fields-derive = { path = "./fields-derive" }
risingwave_common_metrics = { path = "./metrics" }
risingwave_common_proc_macro = { path = "./proc_macro" }
risingwave_error = { workspace = true }
risingwave_pb = { workspace = true }
rust_decimal = { version = "1", features = ["db-postgres", "maths"] }
rw_futures_util = { workspace = true }
rw_iter_util = { workspace = true }
rw_resource_util = { workspace = true }
ryu = "1.0"
serde = { version = "1", features = ["derive"] }
serde_bytes = "0.11"
Expand All @@ -108,7 +108,6 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"signal",
] }
toml = "0.8"
tonic = { workspace = true }
tracing = "0.1"
tracing-futures = { version = "0.2", features = ["futures-03"] }
tracing-opentelemetry = { workspace = true }
Expand All @@ -129,7 +128,7 @@ libc = "0.2"

[target.'cfg(target_os = "macos")'.dependencies]
darwin-libproc = { git = "https://github.com/risingwavelabs/darwin-libproc.git", rev = "a502be24bd0971463f5bcbfe035a248d8ba503b7" }
libc = "0.2.148"
libc = "0.2"
mach2 = "0.4"

[dev-dependencies]
Expand Down
52 changes: 52 additions & 0 deletions src/common/metrics/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[package]
name = "risingwave_common_metrics"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[package.metadata.cargo-machete]
ignored = ["workspace-hack"]

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack"]

[dependencies]
bytes = "1"
clap = { version = "4", features = ["derive"] }
easy-ext = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
http = "0.2"
hyper = { version = "0.14", features = ["client"] }
hytra = { workspace = true }
itertools = "0.12"
parking_lot = "0.12"
pin-project-lite = "0.2"
prometheus = { version = "0.13" }
rw_iter_util = { workspace = true }
rw_resource_util = { workspace = true }
serde = { version = "1", features = ["derive"] }
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio" }
tonic = { workspace = true }
tracing = "0.1"
tracing-subscriber = "0.3.17"

[target.'cfg(not(madsim))'.dependencies]
http-body = "0.4.5"
tower-layer = "0.3.2"
tower-service = "0.3.2"
[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.16", default-features = false }
libc = "0.2"

[target.'cfg(target_os = "macos")'.dependencies]
darwin-libproc = { git = "https://github.com/risingwavelabs/darwin-libproc.git", rev = "a502be24bd0971463f5bcbfe035a248d8ba503b7" }
libc = "0.2"
mach2 = "0.4"

[lints]
workspace = true
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use parking_lot::Mutex;
use prometheus::core::{Collector, Desc};
use prometheus::proto::{Gauge, LabelPair, Metric, MetricFamily};
use prometheus::Registry;
use rw_iter_util::ZipEqFast;

use crate::monitor::GLOBAL_METRICS_REGISTRY;
use crate::util::iter_util::ZipEqFast;

pub struct ErrorMetric<const N: usize> {
payload: Arc<Mutex<HashMap<[String; N], u32>>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ macro_rules! register_guarded_histogram_vec_with_registry {
($HOPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
let inner = prometheus::HistogramVec::new($HOPTS, $LABELS_NAMES);
inner.and_then(|inner| {
let inner = $crate::metrics::__extract_histogram_builder(inner);
let label_guarded =
$crate::metrics::LabelGuardedHistogramVec::new(inner, { $LABELS_NAMES });
let inner = $crate::__extract_histogram_builder(inner);
let label_guarded = $crate::LabelGuardedHistogramVec::new(inner, { $LABELS_NAMES });
let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
result.map(move |()| label_guarded)
})
Expand All @@ -72,9 +71,8 @@ macro_rules! register_guarded_gauge_vec_with_registry {
($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
let inner = prometheus::GaugeVec::new(prometheus::opts!($NAME, $HELP), $LABELS_NAMES);
inner.and_then(|inner| {
let inner = $crate::metrics::__extract_gauge_builder(inner);
let label_guarded =
$crate::metrics::LabelGuardedGaugeVec::new(inner, { $LABELS_NAMES });
let inner = $crate::__extract_gauge_builder(inner);
let label_guarded = $crate::LabelGuardedGaugeVec::new(inner, { $LABELS_NAMES });
let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
result.map(move |()| label_guarded)
})
Expand All @@ -86,9 +84,8 @@ macro_rules! register_guarded_int_gauge_vec_with_registry {
($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
let inner = prometheus::IntGaugeVec::new(prometheus::opts!($NAME, $HELP), $LABELS_NAMES);
inner.and_then(|inner| {
let inner = $crate::metrics::__extract_gauge_builder(inner);
let label_guarded =
$crate::metrics::LabelGuardedIntGaugeVec::new(inner, { $LABELS_NAMES });
let inner = $crate::__extract_gauge_builder(inner);
let label_guarded = $crate::LabelGuardedIntGaugeVec::new(inner, { $LABELS_NAMES });
let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
result.map(move |()| label_guarded)
})
Expand All @@ -100,9 +97,8 @@ macro_rules! register_guarded_int_counter_vec_with_registry {
($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
let inner = prometheus::IntCounterVec::new(prometheus::opts!($NAME, $HELP), $LABELS_NAMES);
inner.and_then(|inner| {
let inner = $crate::metrics::__extract_counter_builder(inner);
let label_guarded =
$crate::metrics::LabelGuardedIntCounterVec::new(inner, { $LABELS_NAMES });
let inner = $crate::__extract_counter_builder(inner);
let label_guarded = $crate::LabelGuardedIntCounterVec::new(inner, { $LABELS_NAMES });
let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
result.map(move |()| label_guarded)
})
Expand Down Expand Up @@ -396,7 +392,7 @@ impl<T: MetricWithLocal, const N: usize> LabelGuardedMetric<T, N> {
mod tests {
use prometheus::core::Collector;

use crate::metrics::LabelGuardedIntCounterVec;
use crate::LabelGuardedIntCounterVec;

#[test]
fn test_label_guarded_metrics_drop() {
Expand Down
45 changes: 42 additions & 3 deletions src/common/src/metrics.rs → src/common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lazy_cell)]
#![feature(type_alias_impl_trait)]
#![feature(impl_trait_in_assoc_type)]
#![feature(array_methods)]
use std::ops::Deref;
use std::sync::LazyLock;

Expand All @@ -23,10 +27,9 @@ use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;

use crate::monitor::GLOBAL_METRICS_REGISTRY;

mod error_metrics;
mod guarded_metrics;
pub mod monitor;
mod relabeled_metric;

pub use error_metrics::*;
Expand Down Expand Up @@ -87,7 +90,7 @@ impl MetricsLayer {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
static AWS_SDK_RETRY_COUNTS: LazyLock<GenericCounter<AtomicU64>> = LazyLock::new(|| {
let registry = GLOBAL_METRICS_REGISTRY.deref();
let registry = crate::monitor::GLOBAL_METRICS_REGISTRY.deref();
register_int_counter_with_registry!(
"aws_sdk_retry_counts",
"Total number of aws sdk retry happens",
Expand All @@ -101,3 +104,39 @@ impl MetricsLayer {
}
}
}

#[derive(Debug, Default, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub enum MetricLevel {
#[default]
Disabled = 0,
Critical = 1,
Info = 2,
Debug = 3,
}

impl clap::ValueEnum for MetricLevel {
fn value_variants<'a>() -> &'a [Self] {
&[Self::Disabled, Self::Critical, Self::Info, Self::Debug]
}

fn to_possible_value<'a>(&self) -> ::std::option::Option<clap::builder::PossibleValue> {
match self {
Self::Disabled => Some(clap::builder::PossibleValue::new("disabled").alias("0")),
Self::Critical => Some(clap::builder::PossibleValue::new("critical")),
Self::Info => Some(clap::builder::PossibleValue::new("info").alias("1")),
Self::Debug => Some(clap::builder::PossibleValue::new("debug")),
}
}
}

impl PartialEq<Self> for MetricLevel {
fn eq(&self, other: &Self) -> bool {
(*self as u8).eq(&(*other as u8))
}
}

impl PartialOrd for MetricLevel {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
(*self as u8).partial_cmp(&(*other as u8))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::{Channel, Endpoint};
use tracing::{debug, info, warn};

use crate::metrics::LabelGuardedIntCounterVec;
use crate::monitor::GLOBAL_METRICS_REGISTRY;
use crate::register_guarded_int_counter_vec_with_registry;
use crate::{register_guarded_int_counter_vec_with_registry, LabelGuardedIntCounterVec};

pub trait MonitorAsyncReadWrite {
fn on_read(&mut self, _size: usize) {}
Expand Down
File renamed without changes.
Loading

0 comments on commit e2a17d2

Please sign in to comment.