From 675767c02314176fbf52c050fa8b240edd5ae592 Mon Sep 17 00:00:00 2001
From: WU Jingdi
Date: Fri, 22 Dec 2023 10:09:12 +0800
Subject: [PATCH] feat: Support export metric in remote write format (#2928)

* feat: remote write metric task

* chore: pass standalone task to frontend

* chore: change name to system metric

* fix: add header and rename to export metrics
---
 Cargo.lock                           |   2 +
 config/datanode.example.toml         |  16 ++
 config/frontend.example.toml         |  16 ++
 config/metasrv.example.toml          |  16 ++
 config/standalone.example.toml       |  16 ++
 src/cmd/src/frontend.rs              |   4 +
 src/cmd/src/standalone.rs            |   9 +
 src/common/telemetry/Cargo.toml      |   1 +
 src/common/telemetry/src/metric.rs   | 370 ++++++++++++++++++++++++++-
 src/datanode/src/config.rs           |   3 +
 src/datanode/src/datanode.rs         |  11 +
 src/frontend/src/frontend.rs         |   3 +
 src/frontend/src/instance.rs         |  15 +-
 src/frontend/src/instance/builder.rs |   1 +
 src/meta-srv/src/bootstrap.rs        |  11 +
 src/meta-srv/src/error.rs            |   6 +
 src/meta-srv/src/metasrv.rs          |   3 +
 src/servers/Cargo.toml               |   1 +
 src/servers/src/error.rs             |  12 +
 src/servers/src/export_metrics.rs    | 179 +++++++++++++
 src/servers/src/lib.rs               |   1 +
 tests-integration/tests/http.rs      |  14 +
 22 files changed, 708 insertions(+), 2 deletions(-)
 create mode 100644 src/servers/src/export_metrics.rs

diff --git a/Cargo.lock b/Cargo.lock
index d3fd84a9493a..c26186b8dfc0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1945,6 +1945,7 @@ dependencies = [
  "backtrace",
  "common-error",
  "console-subscriber",
+ "greptime-proto",
  "lazy_static",
  "once_cell",
  "opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -8463,6 +8464,7 @@ dependencies = [
  "query",
  "rand",
  "regex",
+ "reqwest",
  "rust-embed",
  "rustls 0.22.1",
  "rustls-pemfile 2.0.0",
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 591246737d9b..bab945190b2c 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -127,3 +127,19 @@ parallel_scan_channel_size = 32
 # [logging]
 # dir = "/tmp/greptimedb/logs"
 # level = "info"
+
+# The datanode can export the metrics it generates,
+# encoded in the Prometheus remote-write format,
+# to a Prometheus remote-write compatible receiver (e.g. `greptimedb` itself).
+# This is only used by `greptimedb` to export its own metrics internally; it is different from a Prometheus scrape.
+# [export_metrics]
+# Whether to enable exporting metrics, default is false.
+# enable = false
+# The URL of the metrics export endpoint; defaults to the `frontend` default HTTP endpoint.
+# endpoint = "127.0.0.1:4000"
+# The database that exported metrics are written to; the user must specify a valid database.
+# db = ""
+# The interval at which metrics are exported.
+# write_interval = "30s"
+# HTTP headers carried by the Prometheus remote-write requests.
+# headers = {}
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index 566ed42f9ecf..37a31ba8799d 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -77,3 +77,19 @@ tcp_nodelay = true
 timeout = "10s"
 connect_timeout = "10s"
 tcp_nodelay = true
+
+# The frontend can export the metrics it generates,
+# encoded in the Prometheus remote-write format,
+# to a Prometheus remote-write compatible receiver (e.g. `greptimedb` itself).
+# This is only used by `greptimedb` to export its own metrics internally; it is different from a Prometheus scrape.
+# [export_metrics]
+# Whether to enable exporting metrics, default is false.
+# enable = false
+# The URL of the metrics export endpoint; defaults to the `frontend` default HTTP endpoint.
+# endpoint = "127.0.0.1:4000"
+# The database that exported metrics are written to; the user must specify a valid database.
+# db = ""
+# The interval at which metrics are exported.
+# write_interval = "30s"
+# HTTP headers carried by the Prometheus remote-write requests.
+# headers = {}
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index fff978f8c15d..b6db3fa6df7a 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -66,3 +66,19 @@ provider = "raft_engine"
 # num_partitions = 1
 # Expected number of replicas of each partition.
 # replication_factor = 3
+
+# The metasrv can export the metrics it generates,
+# encoded in the Prometheus remote-write format,
+# to a Prometheus remote-write compatible receiver (e.g. `greptimedb` itself).
+# This is only used by `greptimedb` to export its own metrics internally; it is different from a Prometheus scrape.
+# [export_metrics]
+# Whether to enable exporting metrics, default is false.
+# enable = false
+# The URL of the metrics export endpoint; defaults to the `frontend` default HTTP endpoint.
+# endpoint = "127.0.0.1:4000"
+# The database that exported metrics are written to; the user must specify a valid database.
+# db = ""
+# The interval at which metrics are exported.
+# write_interval = "30s"
+# HTTP headers carried by the Prometheus remote-write requests.
+# headers = {}
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 305043d95c41..c8930a9646ca 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -184,3 +184,19 @@ parallel_scan_channel_size = 32
 # otlp_endpoint = "localhost:4317"
 # The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0
 # tracing_sample_ratio = 1.0
+
+# A standalone instance can export the metrics it generates,
+# encoded in the Prometheus remote-write format,
+# to a Prometheus remote-write compatible receiver (e.g. `greptimedb` itself).
+# This is only used by `greptimedb` to export its own metrics internally; it is different from a Prometheus scrape.
+# [export_metrics]
+# Whether to enable exporting metrics, default is false.
+# enable = false
+# The URL of the metrics export endpoint; defaults to the `frontend` default HTTP endpoint.
+# endpoint = "127.0.0.1:4000"
+# The database that exported metrics are written to; the user must specify a valid database.
+# db = ""
+# The interval at which metrics are exported.
+# write_interval = "30s"
+# HTTP headers carried by the Prometheus remote-write requests.
+# headers = {}
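Note: the four `[export_metrics]` blocks above are identical apart from the component name. For quick reference, a minimal *enabled* configuration might look like the sketch below. The keys and defaults come from this patch; the `db` value and the custom header are illustrative assumptions (the target database must already exist):

    [export_metrics]
    enable = true
    endpoint = "127.0.0.1:4000"
    db = "public"
    write_interval = "30s"
    [export_metrics.headers]
    x-example-header = "demo"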
diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs
index f863b41863eb..b1d12e8844bc 100644
--- a/src/cmd/src/frontend.rs
+++ b/src/cmd/src/frontend.rs
@@ -249,6 +249,10 @@ impl StartCommand {
             .await
             .context(StartFrontendSnafu)?;
 
+        instance
+            .build_export_metrics_task(&opts.export_metrics)
+            .context(StartFrontendSnafu)?;
+
         instance
             .build_servers(opts)
             .await
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index bf15c6b6eb30..85dcc51b70ca 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -44,6 +44,7 @@ use frontend::service_config::{
 };
 use mito2::config::MitoConfig;
 use serde::{Deserialize, Serialize};
+use servers::export_metrics::ExportMetricsOption;
 use servers::http::HttpOptions;
 use servers::tls::{TlsMode, TlsOption};
 use servers::Mode;
@@ -112,6 +113,7 @@ pub struct StandaloneOptions {
     pub user_provider: Option<String>,
     /// Options for different store engines.
     pub region_engine: Vec<RegionEngineConfig>,
+    pub export_metrics: ExportMetricsOption,
 }
 
 impl Default for StandaloneOptions {
@@ -131,6 +133,7 @@ impl Default for StandaloneOptions {
             metadata_store: KvBackendConfig::default(),
             procedure: ProcedureConfig::default(),
             logging: LoggingOptions::default(),
+            export_metrics: ExportMetricsOption::default(),
             user_provider: None,
             region_engine: vec![
                 RegionEngineConfig::Mito(MitoConfig::default()),
@@ -154,6 +157,8 @@ impl StandaloneOptions {
             meta_client: None,
             logging: self.logging,
             user_provider: self.user_provider,
+            // In standalone mode, the export metrics task is handed to the frontend for execution.
+            export_metrics: self.export_metrics,
             ..Default::default()
         }
     }
@@ -407,6 +412,10 @@ impl StartCommand {
             .await
             .context(StartFrontendSnafu)?;
 
+        frontend
+            .build_export_metrics_task(&opts.frontend.export_metrics)
+            .context(StartFrontendSnafu)?;
+
         frontend
             .build_servers(opts)
             .await
diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml
index 6d5e7a96ffcf..2d03aa45d1fb 100644
--- a/src/common/telemetry/Cargo.toml
+++ b/src/common/telemetry/Cargo.toml
@@ -12,6 +12,7 @@ deadlock_detection = ["parking_lot/deadlock_detection"]
 backtrace = "0.3"
 common-error.workspace = true
 console-subscriber = { version = "0.1", optional = true }
+greptime-proto.workspace = true
 lazy_static.workspace = true
 once_cell.workspace = true
 opentelemetry = { version = "0.21.0", default-features = false, features = [
diff --git a/src/common/telemetry/src/metric.rs b/src/common/telemetry/src/metric.rs
index 239d4244c282..666b0546503f 100644
--- a/src/common/telemetry/src/metric.rs
+++ b/src/common/telemetry/src/metric.rs
@@ -12,8 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// metric stuffs, inspired by databend
+use std::sync::Arc;
 
+use greptime_proto::prometheus::remote::{Sample, TimeSeries};
+use greptime_proto::prometheus::*;
+use prometheus::proto::{LabelPair, MetricFamily, MetricType};
 use prometheus::{Encoder, TextEncoder};
 
 pub fn dump_metrics() -> Result<String, String> {
@@ -25,3 +28,368 @@ pub fn dump_metrics() -> Result<String, String> {
         .map_err(|_| "Encode metrics failed".to_string())?;
     String::from_utf8(buffer).map_err(|e| e.to_string())
 }
+
+/// `MetricFilter` is used in `report_metric_task`.
+/// It returns `false` for metrics the user doesn't want to collect, and `true` otherwise.
+#[derive(Clone)]
+pub struct MetricFilter {
+    inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>,
+}
+
+impl MetricFilter {
+    pub fn new(inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>) -> Self {
+        Self { inner }
+    }
+    pub fn filter(&self, mf: &MetricFamily) -> bool {
+        (self.inner)(mf)
+    }
+}
+
+pub fn convert_metric_to_write_request(
+    metric_families: Vec<MetricFamily>,
+    metric_filter: Option<&MetricFilter>,
+    default_timestamp: i64,
+) -> remote::WriteRequest {
+    let mut timeseries: Vec<TimeSeries> = Vec::with_capacity(metric_families.len());
+    for mf in metric_families {
+        if !metric_filter.map(|f| f.filter(&mf)).unwrap_or(true) {
+            continue;
+        }
+        let mf_type = mf.get_field_type();
+        let mf_name = mf.get_name();
+        for m in mf.get_metric() {
+            let timestamp = if m.get_timestamp_ms() == 0 {
+                default_timestamp
+            } else {
+                m.get_timestamp_ms()
+            };
+            match mf_type {
+                MetricType::COUNTER => timeseries.push(TimeSeries {
+                    labels: convert_label(m.get_label(), mf_name, None),
+                    samples: vec![Sample {
+                        value: m.get_counter().get_value(),
+                        timestamp,
+                    }],
+                    exemplars: vec![],
+                }),
+                MetricType::GAUGE => timeseries.push(TimeSeries {
+                    labels: convert_label(m.get_label(), mf_name, None),
+                    samples: vec![Sample {
+                        value: m.get_gauge().get_value(),
+                        timestamp,
+                    }],
+                    exemplars: vec![],
+                }),
+                MetricType::HISTOGRAM => {
+                    let h = m.get_histogram();
+                    let mut inf_seen = false;
+                    let metric_name = format!("{}_bucket", mf_name);
+                    for b in h.get_bucket() {
+                        let upper_bound = b.get_upper_bound();
+                        timeseries.push(TimeSeries {
+                            labels: convert_label(
+                                m.get_label(),
+                                metric_name.as_str(),
+                                Some(("le", upper_bound.to_string())),
+                            ),
+                            samples: vec![Sample {
+                                value: b.get_cumulative_count() as f64,
+                                timestamp,
+                            }],
+                            exemplars: vec![],
+                        });
+                        if upper_bound.is_sign_positive() && upper_bound.is_infinite() {
+                            inf_seen = true;
+                        }
+                    }
+                    if !inf_seen {
+                        timeseries.push(TimeSeries {
+                            labels: convert_label(
+                                m.get_label(),
+                                metric_name.as_str(),
+                                Some(("le", "+Inf".to_string())),
+                            ),
+                            samples: vec![Sample {
+                                value: h.get_sample_count() as f64,
+                                timestamp,
+                            }],
+                            exemplars: vec![],
+                        });
+                    }
+                    timeseries.push(TimeSeries {
+                        labels: convert_label(
+                            m.get_label(),
+                            format!("{}_sum", mf_name).as_str(),
+                            None,
+                        ),
+                        samples: vec![Sample {
+                            value: h.get_sample_sum(),
+                            timestamp,
+                        }],
+                        exemplars: vec![],
+                    });
+                    timeseries.push(TimeSeries {
+                        labels: convert_label(
+                            m.get_label(),
+                            format!("{}_count", mf_name).as_str(),
+                            None,
+                        ),
+                        samples: vec![Sample {
+                            value: h.get_sample_count() as f64,
+                            timestamp,
+                        }],
+                        exemplars: vec![],
+                    });
+                }
+                MetricType::SUMMARY => {
+                    let s = m.get_summary();
+                    for q in s.get_quantile() {
+                        timeseries.push(TimeSeries {
+                            labels: convert_label(
+                                m.get_label(),
+                                mf_name,
+                                Some(("quantile", q.get_quantile().to_string())),
+                            ),
+                            samples: vec![Sample {
+                                value: q.get_value(),
+                                timestamp,
+                            }],
+                            exemplars: vec![],
+                        });
+                    }
+                    timeseries.push(TimeSeries {
+                        labels: convert_label(
+                            m.get_label(),
+                            format!("{}_sum", mf_name).as_str(),
+                            None,
+                        ),
+                        samples: vec![Sample {
+                            value: s.get_sample_sum(),
+                            timestamp,
+                        }],
+                        exemplars: vec![],
+                    });
+                    timeseries.push(TimeSeries {
+                        labels: convert_label(
+                            m.get_label(),
+                            format!("{}_count", mf_name).as_str(),
+                            None,
+                        ),
+                        samples: vec![Sample {
+                            value: s.get_sample_count() as f64,
+                            timestamp,
+                        }],
+                        exemplars: vec![],
+                    });
+                }
+                MetricType::UNTYPED => {
+                    // `TextEncoder` leaves `MetricType::UNTYPED` unimplemented as well.
+                    // To keep the implementation consistent and avoid unexpected panics, we do nothing here.
+                }
+            };
+        }
+    }
+    remote::WriteRequest {
+        timeseries,
+        metadata: vec![],
+    }
+}
+
+fn convert_label(
+    pairs: &[LabelPair],
+    name: &str,
+    addon: Option<(&'static str, String)>,
+) -> Vec<remote::Label> {
+    let mut labels = Vec::with_capacity(pairs.len() + 1 + if addon.is_some() { 1 } else { 0 });
+    for label in pairs {
+        labels.push(remote::Label {
+            name: label.get_name().to_string(),
+            value: label.get_value().to_string(),
+        });
+    }
+    labels.push(remote::Label {
+        name: "__name__".to_string(),
+        value: name.to_string(),
+    });
+    if let Some(addon) = addon {
+        labels.push(remote::Label {
+            name: addon.0.to_string(),
+            value: addon.1,
+        });
+    }
+    // The remote-write protocol requires label names to be sorted in lexicographical order.
+    labels.sort_unstable_by(|a, b| a.name.cmp(&b.name));
+    labels
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use prometheus::core::Collector;
+    use prometheus::proto::{LabelPair, MetricFamily, MetricType};
+    use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts};
+
+    use super::convert_label;
+    use crate::metric::{convert_metric_to_write_request, MetricFilter};
+
+    #[test]
+    fn test_convert_label() {
+        let pairs = vec![
+            {
+                let mut pair = LabelPair::new();
+                pair.set_name(String::from("a"));
+                pair.set_value(String::from("b"));
+                pair
+            },
+            {
+                let mut pair = LabelPair::new();
+                pair.set_name(String::from("e"));
+                pair.set_value(String::from("g"));
+                pair
+            },
+        ];
+        let label1 = convert_label(&pairs, "label1", None);
+        assert_eq!(
+            format!("{:?}", label1),
+            r#"[Label { name: "__name__", value: "label1" }, Label { name: "a", value: "b" }, Label { name: "e", value: "g" }]"#
+        );
+        let label2 = convert_label(&pairs, "label2", Some(("c", "c".to_string())));
+        assert_eq!(
+            format!("{:?}", label2),
+            r#"[Label { name: "__name__", value: "label2" }, Label { name: "a", value: "b" }, Label { name: "c", value: "c" }, Label { name: "e", value: "g" }]"#
+        );
+    }
+
+    #[test]
+    fn test_write_request_encoder() {
+        let counter_opts = Opts::new("test_counter", "test help")
+            .const_label("a", "1")
+            .const_label("b", "2");
+        let counter = Counter::with_opts(counter_opts).unwrap();
+        counter.inc();
+
+        let mf = counter.collect();
+        let write_quest = convert_metric_to_write_request(mf, None, 0);
+
+        assert_eq!(
+            format!("{:?}", write_quest.timeseries),
+            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }]"#
+        );
+
+        let gauge_opts = Opts::new("test_gauge", "test help")
+            .const_label("a", "1")
+            .const_label("b", "2");
+        let gauge = Gauge::with_opts(gauge_opts).unwrap();
+        gauge.inc();
+        gauge.set(42.0);
+
+        let mf = gauge.collect();
+        let write_quest = convert_metric_to_write_request(mf, None, 0);
+        assert_eq!(
+            format!("{:?}", write_quest.timeseries),
+            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [] }]"#
+        );
+    }
+
+    #[test]
+    fn test_write_request_histogram() {
+        let opts = HistogramOpts::new("test_histogram", "test help").const_label("a", "1");
+        let histogram = Histogram::with_opts(opts).unwrap();
+        histogram.observe(0.25);
+
+        let mf = histogram.collect();
+        let write_quest = convert_metric_to_write_request(mf, None, 0);
+        let write_quest_str: Vec<_> = write_quest
+            .timeseries
+            .iter()
+            .map(|x| format!("{:?}", x))
+            .collect();
+        let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }"#;
+        assert_eq!(write_quest_str.join("\n"), ans);
+    }
+
+    #[test]
+    fn test_write_request_summary() {
+        use prometheus::proto::{Metric, Quantile, Summary};
+
+        let mut metric_family = MetricFamily::default();
+        metric_family.set_name("test_summary".to_string());
+        metric_family.set_help("This is a test summary statistic".to_string());
+        metric_family.set_field_type(MetricType::SUMMARY);
+
+        let mut summary = Summary::default();
+        summary.set_sample_count(5.0 as u64);
+        summary.set_sample_sum(15.0);
+
+        let mut quantile1 = Quantile::default();
+        quantile1.set_quantile(50.0);
+        quantile1.set_value(3.0);
+
+        let mut quantile2 = Quantile::default();
+        quantile2.set_quantile(100.0);
+        quantile2.set_value(5.0);
+
+        summary.set_quantile(vec![quantile1, quantile2].into());
+
+        let mut metric = Metric::default();
+        metric.set_summary(summary);
+        metric_family.set_metric(vec![metric].into());
+
+        let write_quest = convert_metric_to_write_request(vec![metric_family], None, 20);
+        let write_quest_str: Vec<_> = write_quest
+            .timeseries
+            .iter()
+            .map(|x| format!("{:?}", x))
+            .collect();
+        let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [] }
+TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }"#;
+        assert_eq!(write_quest_str.join("\n"), ans);
+    }
+
+    #[test]
+    fn test_metric_filter() {
+        let counter_opts = Opts::new("filter_counter", "test help")
+            .const_label("a", "1")
+            .const_label("b", "2");
+        let counter_1 = Counter::with_opts(counter_opts).unwrap();
+        counter_1.inc_by(1.0);
+        let counter_opts = Opts::new("test_counter", "test help")
+            .const_label("a", "1")
+            .const_label("b", "2");
+        let counter_2 = Counter::with_opts(counter_opts).unwrap();
+        counter_2.inc_by(2.0);
+
+        let mut mf = counter_1.collect();
+        mf.append(&mut counter_2.collect());
+
+        let filter = MetricFilter::new(Arc::new(|mf: &MetricFamily| {
+            !mf.get_name().starts_with("filter")
+        }));
+        let write_quest1 = convert_metric_to_write_request(mf.clone(), None, 0);
+        let write_quest2 = convert_metric_to_write_request(mf, Some(&filter), 0);
+        assert_eq!(
+            format!("{:?}", write_quest1.timeseries),
+            r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
+        );
+        assert_eq!(
+            format!("{:?}", write_quest2.timeseries),
+            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
+        );
+    }
+}
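Note: the conversion above, together with `MetricFilter`, forms the gather-filter-encode pipeline that the export task added later in this patch drives on a timer. A minimal sketch of calling it directly, mirroring the tests above (the filter predicate and the `0` fallback timestamp are illustrative):

    use std::sync::Arc;

    use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
    use greptime_proto::prometheus::remote::WriteRequest;
    use prometheus::proto::MetricFamily;

    fn gather_write_request() -> WriteRequest {
        // Illustrative: drop every metric family whose name starts with "process_".
        let filter = MetricFilter::new(Arc::new(|mf: &MetricFamily| {
            !mf.get_name().starts_with("process_")
        }));
        // Gather everything registered in the default prometheus registry and
        // encode it; `0` is the fallback timestamp for samples without one.
        convert_metric_to_write_request(prometheus::gather(), Some(&filter), 0)
    }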
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 772033c2d35f..3437a8ca6a4a 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -28,6 +28,7 @@ use meta_client::MetaClientOptions;
 use mito2::config::MitoConfig;
 use secrecy::SecretString;
 use serde::{Deserialize, Serialize};
+use servers::export_metrics::ExportMetricsOption;
 use servers::heartbeat_options::HeartbeatOptions;
 use servers::http::HttpOptions;
 use servers::Mode;
@@ -242,6 +243,7 @@ pub struct DatanodeOptions {
     pub region_engine: Vec<RegionEngineConfig>,
     pub logging: LoggingOptions,
     pub enable_telemetry: bool,
+    pub export_metrics: ExportMetricsOption,
 }
 
 impl Default for DatanodeOptions {
@@ -267,6 +269,7 @@ impl Default for DatanodeOptions {
             logging: LoggingOptions::default(),
             heartbeat: HeartbeatOptions::datanode_default(),
             enable_telemetry: true,
+            export_metrics: ExportMetricsOption::default(),
         }
     }
 }
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 604146a5d4a3..97001b8b5408 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -43,6 +43,7 @@ use mito2::engine::MitoEngine;
 use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
 use object_store::util::normalize_dir;
 use query::QueryEngineFactory;
+use servers::export_metrics::ExportMetricsTask;
 use servers::grpc::{GrpcServer, GrpcServerConfig};
 use servers::http::HttpServerBuilder;
 use servers::metrics_handler::MetricsHandler;
@@ -84,6 +85,7 @@ pub struct Datanode {
     greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
     leases_notifier: Option<Arc<Notify>>,
     plugins: Plugins,
+    export_metrics_task: Option<ExportMetricsTask>,
 }
 
 impl Datanode {
@@ -95,6 +97,10 @@ impl Datanode {
 
         self.start_telemetry();
 
+        if let Some(t) = self.export_metrics_task.as_ref() {
+            t.start()
+        }
+
         self.start_services().await
     }
 
@@ -277,6 +283,10 @@ impl DatanodeBuilder {
             None
         };
 
+        let export_metrics_task =
+            ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
+                .context(StartServerSnafu)?;
+
         Ok(Datanode {
             services,
             heartbeat_task,
@@ -285,6 +295,7 @@ impl DatanodeBuilder {
             region_event_receiver,
             leases_notifier,
             plugins: self.plugins.clone(),
+            export_metrics_task,
         })
     }
diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs
index d43e3816fbf5..eddd0e73a1b6 100644
--- a/src/frontend/src/frontend.rs
+++ b/src/frontend/src/frontend.rs
@@ -15,6 +15,7 @@
 use common_telemetry::logging::LoggingOptions;
 use meta_client::MetaClientOptions;
 use serde::{Deserialize, Serialize};
+use servers::export_metrics::ExportMetricsOption;
 use servers::heartbeat_options::HeartbeatOptions;
 use servers::http::HttpOptions;
 use servers::Mode;
@@ -44,6 +45,7 @@ pub struct FrontendOptions {
     pub logging: LoggingOptions,
     pub datanode: DatanodeOptions,
     pub user_provider: Option<String>,
+    pub export_metrics: ExportMetricsOption,
 }
 
 impl Default for FrontendOptions {
@@ -64,6 +66,7 @@ impl Default for FrontendOptions {
             logging: LoggingOptions::default(),
             datanode: DatanodeOptions::default(),
             user_provider: None,
+            export_metrics: ExportMetricsOption::default(),
         }
     }
 }
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 8d17db6de81e..59e6a7b4b2df 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -55,6 +55,7 @@ use query::QueryEngineRef;
 use raft_engine::{Config, ReadableSize, RecoveryMode};
 use servers::error as server_error;
 use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
+use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
 use servers::interceptor::{
     PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
 };
@@ -77,7 +78,8 @@ pub use standalone::StandaloneDatanodeManager;
 
 use crate::error::{
     self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu,
-    PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, TableOperationSnafu,
+    PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu,
+    TableOperationSnafu,
 };
 use crate::frontend::{FrontendOptions, TomlSerializable};
 use crate::heartbeat::HeartbeatTask;
@@ -116,6 +118,7 @@ pub struct Instance {
     heartbeat_task: Option<HeartbeatTask>,
     inserter: InserterRef,
     deleter: DeleterRef,
+    export_metrics_task: Option<ExportMetricsTask>,
 }
 
 impl Instance {
@@ -193,6 +196,12 @@ impl Instance {
         Ok(())
     }
 
+    pub fn build_export_metrics_task(&mut self, opts: &ExportMetricsOption) -> Result<()> {
+        self.export_metrics_task =
+            ExportMetricsTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?;
+        Ok(())
+    }
+
     pub fn catalog_manager(&self) -> &CatalogManagerRef {
         &self.catalog_manager
     }
@@ -222,6 +231,10 @@ impl FrontendInstance for Instance {
 
         self.script_executor.start(self)?;
 
+        if let Some(t) = self.export_metrics_task.as_ref() {
+            t.start()
+        }
+
         futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move {
             info!("Starting service: {name}");
             start_server(handler).await
diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs
index 1a3147be8f19..15711f9a7b19 100644
--- a/src/frontend/src/instance/builder.rs
+++ b/src/frontend/src/instance/builder.rs
@@ -144,6 +144,7 @@ impl FrontendBuilder {
             heartbeat_task: self.heartbeat_task,
             inserter,
             deleter,
+            export_metrics_task: None,
         })
     }
 }
diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs
index 076d25c77598..5e5361bf6a76 100644
--- a/src/meta-srv/src/bootstrap.rs
+++ b/src/meta-srv/src/bootstrap.rs
@@ -26,6 +26,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
 use common_telemetry::info;
 use etcd_client::Client;
 use servers::configurator::ConfiguratorRef;
+use servers::export_metrics::ExportMetricsTask;
 use servers::http::{HttpServer, HttpServerBuilder};
 use servers::metrics_handler::MetricsHandler;
 use servers::server::Server;
@@ -36,6 +37,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
 use tonic::transport::server::{Router, TcpIncoming};
 
 use crate::election::etcd::EtcdElection;
+use crate::error::InitExportMetricsTaskSnafu;
 use crate::lock::etcd::EtcdLock;
 use crate::lock::memory::MemLock;
 use crate::metasrv::builder::MetaSrvBuilder;
@@ -57,6 +59,8 @@ pub struct MetaSrvInstance {
     signal_sender: Option<Sender<()>>,
 
     plugins: Plugins,
+
+    export_metrics_task: Option<ExportMetricsTask>,
 }
 
 impl MetaSrvInstance {
@@ -73,18 +77,25 @@ impl MetaSrvInstance {
         );
         // put meta_srv into plugins for later use
         plugins.insert::<Arc<MetaSrv>>(Arc::new(meta_srv.clone()));
+        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
+            .context(InitExportMetricsTaskSnafu)?;
         Ok(MetaSrvInstance {
             meta_srv,
             http_srv,
             opts,
             signal_sender: None,
             plugins,
+            export_metrics_task,
         })
     }
 
     pub async fn start(&mut self) -> Result<()> {
         self.meta_srv.try_start().await?;
 
+        if let Some(t) = self.export_metrics_task.as_ref() {
+            t.start()
+        }
+
         let (tx, rx) = mpsc::channel::<()>(1);
 
         self.signal_sender = Some(tx);
diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index c081f0a94e1b..7fa9f7d217c3 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -196,6 +196,11 @@ pub enum Error {
         location: Location,
         source: servers::error::Error,
     },
+    #[snafu(display("Failed to init export metrics task"))]
+    InitExportMetricsTask {
+        location: Location,
+        source: servers::error::Error,
+    },
 
     #[snafu(display("Failed to parse address {}", addr))]
     ParseAddr {
         addr: String,
@@ -651,6 +656,7 @@ impl ErrorExt for Error {
             | Error::ParseNum { .. }
             | Error::UnsupportedSelectorType { .. }
             | Error::InvalidArguments { .. }
+            | Error::InitExportMetricsTask { .. }
             | Error::InvalidHeartbeatRequest { .. }
             | Error::TooManyPartitions { .. } => StatusCode::InvalidArguments,
 
             Error::LeaseKeyFromUtf8 { .. }
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 906d9847c617..6b5bb8d7336f 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -32,6 +32,7 @@ use common_procedure::ProcedureManagerRef;
 use common_telemetry::logging::LoggingOptions;
 use common_telemetry::{error, info, warn};
 use serde::{Deserialize, Serialize};
+use servers::export_metrics::ExportMetricsOption;
 use servers::http::HttpOptions;
 use snafu::ResultExt;
 use table::metadata::TableId;
@@ -72,6 +73,7 @@ pub struct MetaSrvOptions {
     pub enable_telemetry: bool,
     pub data_home: String,
     pub wal: WalConfig,
+    pub export_metrics: ExportMetricsOption,
 }
 
 impl Default for MetaSrvOptions {
@@ -97,6 +99,7 @@ impl Default for MetaSrvOptions {
             enable_telemetry: true,
             data_home: METASRV_HOME.to_string(),
             wal: WalConfig::default(),
+            export_metrics: ExportMetricsOption::default(),
         }
     }
 }
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 448549fdf609..45621a26e170 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -72,6 +72,7 @@ prost.workspace = true
 query.workspace = true
 rand.workspace = true
 regex.workspace = true
+reqwest.workspace = true
 rust-embed = { version = "6.6", features = ["debug-embed"] }
 rustls = "0.22"
 rustls-pemfile = "2.0"
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 72819a90de9b..d73fcbe91397 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -214,6 +214,16 @@ pub enum Error {
         error: snap::Error,
     },
 
+    #[snafu(display("Failed to send prometheus remote request"))]
+    SendPromRemoteRequest {
+        location: Location,
+        #[snafu(source)]
+        error: reqwest::Error,
+    },
+
+    #[snafu(display("Invalid export metrics config, msg: {}", msg))]
+    InvalidExportMetricsConfig { msg: String, location: Location },
+
     #[snafu(display("Failed to compress prometheus remote request"))]
     CompressPromRemoteRequest {
         location: Location,
@@ -427,6 +437,7 @@ impl ErrorExt for Error {
             | AlreadyStarted { .. }
             | InvalidPromRemoteReadQueryResult { .. }
             | TcpBind { .. }
+            | SendPromRemoteRequest { .. }
            | TcpIncoming { .. }
             | CatalogError { .. }
             | GrpcReflectionService { .. }
@@ -457,6 +468,7 @@ impl ErrorExt for Error {
             | CompressPromRemoteRequest { .. }
             | DecompressPromRemoteRequest { .. }
             | InvalidPromRemoteRequest { .. }
+            | InvalidExportMetricsConfig { .. }
             | InvalidFlightTicket { .. }
             | InvalidPrepareStatement { .. }
             | DataFrame { .. }
diff --git a/src/servers/src/export_metrics.rs b/src/servers/src/export_metrics.rs
new file mode 100644
index 000000000000..5a08f0a079e4
--- /dev/null
+++ b/src/servers/src/export_metrics.rs
@@ -0,0 +1,179 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::time::Duration;
+
+use axum::http::HeaderValue;
+use common_base::Plugins;
+use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
+use common_telemetry::{error, info};
+use common_time::Timestamp;
+use hyper::HeaderMap;
+use prost::Message;
+use reqwest::header::HeaderName;
+use reqwest::{Client, Response};
+use serde::{Deserialize, Serialize};
+use snafu::{ensure, ResultExt};
+use tokio::time;
+
+use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu};
+use crate::prom_store::snappy_compress;
+
+/// Options for exporting the metrics generated by greptimedb itself, encoded in the Prometheus [remote-write format](https://prometheus.io/docs/concepts/remote_write_spec/)
+/// and sent to a Prometheus remote-write compatible receiver (e.g. `greptimedb` itself).
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct ExportMetricsOption {
+    pub enable: bool,
+    pub endpoint: String,
+    pub db: String,
+    #[serde(with = "humantime_serde")]
+    pub write_interval: Duration,
+    pub headers: HashMap<String, String>,
+}
+
+impl Default for ExportMetricsOption {
+    fn default() -> Self {
+        Self {
+            enable: false,
+            endpoint: "127.0.0.1:4000".to_string(),
+            db: String::new(),
+            write_interval: Duration::from_secs(30),
+            headers: HashMap::new(),
+        }
+    }
+}
+
+#[derive(Default, Clone)]
+pub struct ExportMetricsTask {
+    config: ExportMetricsOption,
+    filter: Option<MetricFilter>,
+    headers: HeaderMap,
+}
+
+impl ExportMetricsTask {
+    pub fn try_new(
+        config: &ExportMetricsOption,
+        plugins: Option<&Plugins>,
+    ) -> Result<Option<Self>> {
+        if !config.enable {
+            return Ok(None);
+        }
+        let filter = plugins.map(|p| p.get::<MetricFilter>()).unwrap_or(None);
+        ensure!(
+            config.write_interval.as_secs() != 0,
+            InvalidExportMetricsConfigSnafu {
+                msg: "Expected export metrics write_interval greater than zero"
+            }
+        );
+        ensure!(
+            !config.db.is_empty(),
+            InvalidExportMetricsConfigSnafu {
+                msg: "Expected export metrics db not empty"
+            }
+        );
+        // Construct the HTTP headers attached to every remote-write request.
+        let mut headers = reqwest::header::HeaderMap::with_capacity(config.headers.len());
+        config.headers.iter().try_for_each(|(k, v)| {
+            let header = match TryInto::<HeaderName>::try_into(k) {
+                Ok(header) => header,
+                Err(_) => {
+                    return InvalidExportMetricsConfigSnafu {
+                        msg: format!("Export metrics: invalid HTTP header name: {}", k),
+                    }
+                    .fail()
+                }
+            };
+            match TryInto::<HeaderValue>::try_into(v) {
+                Ok(value) => headers.insert(header, value),
+                Err(_) => {
+                    return InvalidExportMetricsConfigSnafu {
+                        msg: format!("Export metrics: invalid HTTP header value: {}", v),
+                    }
+                    .fail()
+                }
+            };
+            Ok(())
+        })?;
+        Ok(Some(Self {
+            config: config.clone(),
+            filter,
+            headers,
+        }))
+    }
+
+    pub fn start(&self) {
+        if !self.config.enable {
+            return;
+        }
+        let mut interval = time::interval(self.config.write_interval);
+        let sec = self.config.write_interval.as_secs();
+        let endpoint = format!(
+            "http://{}/v1/prometheus/write?db={}",
+            self.config.endpoint, self.config.db
+        );
+        let filter = self.filter.clone();
+        let headers = self.headers.clone();
+        let _handle = common_runtime::spawn_bg(async move {
+            info!(
+                "Starting export metrics task to endpoint: {}, interval: {}s",
+                endpoint, sec
+            );
+            // Skip the first tick, since it completes immediately.
+            interval.tick().await;
+            let client = reqwest::Client::new();
+            loop {
+                interval.tick().await;
+                match write_system_metric(&client, &endpoint, filter.as_ref(), headers.clone())
+                    .await
+                {
+                    Ok(resp) => {
+                        if !resp.status().is_success() {
+                            error!("Export metrics write failed, response: {:#?}", resp);
+                        }
+                    }
+                    Err(e) => error!("Export metrics write failed, error: {}", e),
+                };
+            }
+        });
+    }
+}
+
+/// Exports the collected metrics: encodes them in the Prometheus [remote-write format](https://prometheus.io/docs/concepts/remote_write_spec/)
+/// and sends them to the Prometheus remote-write compatible receiver (e.g. `greptimedb`) specified by `url`.
+/// Users can supply a `MetricFilter` to drop the metrics they don't want to collect.
+pub async fn write_system_metric(
+    client: &Client,
+    url: &str,
+    filter: Option<&MetricFilter>,
+    headers: HeaderMap,
+) -> Result<Response> {
+    let metric_families = prometheus::gather();
+    let request = convert_metric_to_write_request(
+        metric_families,
+        filter,
+        Timestamp::current_millis().value(),
+    );
+    // The remote-write format requires the payload to be snappy-compressed.
+    client
+        .post(url)
+        .header("X-Prometheus-Remote-Write-Version", "0.1.0")
+        .header("Content-Type", "application/x-protobuf")
+        .headers(headers)
+        .body(snappy_compress(&request.encode_to_vec())?)
+        .send()
+        .await
+        .context(SendPromRemoteRequestSnafu)
+}
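Note: wiring this into a component follows the same pattern everywhere in this patch (datanode, frontend, metasrv): build the optional task once, then start it alongside the other services. A condensed sketch using only the APIs introduced above (the free function name is illustrative; the `Result` alias is assumed to be `servers::error::Result`):

    use common_base::Plugins;
    use servers::error::Result;
    use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};

    fn start_export_metrics(opts: &ExportMetricsOption, plugins: &Plugins) -> Result<()> {
        // `try_new` validates `write_interval` and `db`, picks up an optional
        // `MetricFilter` from the plugins, and returns Ok(None) when `enable` is false.
        if let Some(task) = ExportMetricsTask::try_new(opts, Some(plugins))? {
            // Spawns the background loop that gathers, encodes, and POSTs the metrics.
            task.start();
        }
        Ok(())
    }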
diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs
index 2bb142977d4f..0d3ff0f85998 100644
--- a/src/servers/src/lib.rs
+++ b/src/servers/src/lib.rs
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
 
 pub mod configurator;
 pub mod error;
+pub mod export_metrics;
 pub mod grpc;
 pub mod heartbeat_options;
 pub mod http;
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 44153d365108..5a8f2aa4aaf7 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -753,6 +753,13 @@ timeout = "10s"
 connect_timeout = "1s"
 tcp_nodelay = true
 
+[frontend.export_metrics]
+enable = false
+db = ""
+write_interval = "30s"
+
+[frontend.export_metrics.headers]
+
 [datanode]
 mode = "standalone"
 node_id = 0
@@ -809,6 +816,13 @@ parallel_scan_channel_size = 32
 [datanode.logging]
 enable_otlp_tracing = false
 
+[datanode.export_metrics]
+enable = false
+db = ""
+write_interval = "30s"
+
+[datanode.export_metrics.headers]
+
 [logging]
 enable_otlp_tracing = false"#,
         store_type,