From c3e54a86d74c5cbfd0536364a80a873eb9a2db1b Mon Sep 17 00:00:00 2001 From: WUJingdi Date: Tue, 19 Dec 2023 18:36:21 +0800 Subject: [PATCH 1/4] feat: remote write metric task --- Cargo.lock | 2 + config/standalone.example.toml | 12 + src/cmd/src/error.rs | 7 + src/cmd/src/frontend.rs | 4 + src/cmd/src/options.rs | 2 + src/cmd/src/standalone.rs | 21 +- src/common/telemetry/Cargo.toml | 1 + src/common/telemetry/src/metric.rs | 371 ++++++++++++++++++++++++++- src/datanode/src/config.rs | 3 + src/datanode/src/datanode.rs | 11 + src/frontend/src/frontend.rs | 3 + src/frontend/src/instance.rs | 15 +- src/frontend/src/instance/builder.rs | 1 + src/meta-srv/src/bootstrap.rs | 12 + src/meta-srv/src/error.rs | 6 + src/meta-srv/src/metasrv.rs | 3 + src/servers/Cargo.toml | 1 + src/servers/src/error.rs | 12 + src/servers/src/lib.rs | 1 + src/servers/src/remote_writer.rs | 121 +++++++++ tests-integration/src/standalone.rs | 2 + tests-integration/tests/http.rs | 14 +- 22 files changed, 619 insertions(+), 6 deletions(-) create mode 100644 src/servers/src/remote_writer.rs diff --git a/Cargo.lock b/Cargo.lock index 91a040543474..f07fb08a25ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1939,6 +1939,7 @@ dependencies = [ "backtrace", "common-error", "console-subscriber", + "greptime-proto", "lazy_static", "once_cell", "opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -8417,6 +8418,7 @@ dependencies = [ "query", "rand", "regex", + "reqwest", "rust-embed", "rustls 0.22.1", "rustls-pemfile 2.0.0", diff --git a/config/standalone.example.toml b/config/standalone.example.toml index fd6965ab3946..1521aac44ebf 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -177,3 +177,15 @@ parallel_scan_channel_size = 32 # otlp_endpoint = "localhost:4317" # The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0 # tracing_sample_ratio = 1.0 + +# standalone/frontend/datanode/metasrv export the metrics generated by itself +# send metrics to Prometheus remote-write compatible receiver (e.g. `greptimedb`) +# [remote_write] +# whether enable export remote_write, default is false +# enable = false +# The url of remote write endpoint. +# Taking greptimedb as an example, for `standalone` deployed under the default configuration. +# The user can create a database called `system` in the db and export the metric to `http://127.0.0.1:4000/v1/prometheus/write?db=system` +# endpoint = "http://127.0.0.1:4000/v1/prometheus/write?db=system" +# The interval of export metric, +# write_interval = "30s" diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 91d556ff3861..d7c99504e35b 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -140,6 +140,12 @@ pub enum Error { source: common_recordbatch::error::Error, }, + #[snafu(display("Failed to init remote write metric task"))] + InitRemoteWriteMetricTask { + location: Location, + source: servers::error::Error, + }, + #[snafu(display("Failed to pretty print Recordbatches"))] PrettyPrintRecordBatches { location: Location, @@ -266,6 +272,7 @@ impl ErrorExt for Error { | Error::NotDataFromOutput { .. } | Error::CreateDir { .. } | Error::EmptyResult { .. } + | Error::InitRemoteWriteMetricTask { .. } | Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments, Error::StartProcedureManager { source, .. 
} diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index f863b41863eb..beebf834f900 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -249,6 +249,10 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; + instance + .build_remote_write_metric_task(&opts.remote_write) + .context(StartFrontendSnafu)?; + instance .build_servers(opts) .await diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 39a3d94e6de3..9f0ab8fac6bf 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -21,6 +21,7 @@ use frontend::error::{Result as FeResult, TomlFormatSnafu}; use frontend::frontend::{FrontendOptions, TomlSerializable}; use meta_srv::metasrv::MetaSrvOptions; use serde::{Deserialize, Serialize}; +use servers::remote_writer::RemoteWriteOptions; use snafu::ResultExt; use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu}; @@ -37,6 +38,7 @@ pub struct MixOptions { pub frontend: FrontendOptions, pub datanode: DatanodeOptions, pub logging: LoggingOptions, + pub remote_write: RemoteWriteOptions, } impl From for FrontendOptions { diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 55ed1d9817a5..7589846cf9c1 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -45,14 +45,15 @@ use frontend::service_config::{ use mito2::config::MitoConfig; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; +use servers::remote_writer::{RemoteWriteMetricTask, RemoteWriteOptions}; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; use crate::error::{ - CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, Result, - ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, - StartProcedureManagerSnafu, StopProcedureManagerSnafu, + CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, + InitRemoteWriteMetricTaskSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, + StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; use crate::options::{CliOptions, MixOptions, Options}; use crate::App; @@ -112,6 +113,7 @@ pub struct StandaloneOptions { pub user_provider: Option, /// Options for different store engines. 
pub region_engine: Vec, + pub remote_write: RemoteWriteOptions, } impl Default for StandaloneOptions { @@ -131,6 +133,7 @@ impl Default for StandaloneOptions { metadata_store: KvBackendConfig::default(), procedure: ProcedureConfig::default(), logging: LoggingOptions::default(), + remote_write: RemoteWriteOptions::default(), user_provider: None, region_engine: vec![ RegionEngineConfig::Mito(MitoConfig::default()), @@ -175,6 +178,7 @@ pub struct Instance { datanode: Datanode, frontend: FeInstance, procedure_manager: ProcedureManagerRef, + remote_write_metric_task: Option, } #[async_trait] @@ -191,6 +195,10 @@ impl App for Instance { .await .context(StartProcedureManagerSnafu)?; + if let Some(t) = self.remote_write_metric_task.as_ref() { + t.start() + } + self.frontend.start().await.context(StartFrontendSnafu)?; Ok(()) } @@ -320,6 +328,7 @@ impl StartCommand { let procedure = opts.procedure.clone(); let frontend = opts.clone().frontend_options(); let logging = opts.logging.clone(); + let remote_write = opts.remote_write.clone(); let datanode = opts.datanode_options(); Ok(Options::Standalone(Box::new(MixOptions { @@ -329,6 +338,7 @@ impl StartCommand { frontend, datanode, logging, + remote_write, }))) } @@ -393,6 +403,10 @@ impl StartCommand { ) .await?; + let remote_write_metric_task = + RemoteWriteMetricTask::try_new(&opts.remote_write, Some(&fe_plugins)) + .context(InitRemoteWriteMetricTaskSnafu)?; + let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor) .with_plugin(fe_plugins) .try_build() @@ -408,6 +422,7 @@ impl StartCommand { datanode, frontend, procedure_manager, + remote_write_metric_task, }) } diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index 6d5e7a96ffcf..2d03aa45d1fb 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -12,6 +12,7 @@ deadlock_detection = ["parking_lot/deadlock_detection"] backtrace = "0.3" common-error.workspace = true console-subscriber = { version = "0.1", optional = true } +greptime-proto.workspace = true lazy_static.workspace = true once_cell.workspace = true opentelemetry = { version = "0.21.0", default-features = false, features = [ diff --git a/src/common/telemetry/src/metric.rs b/src/common/telemetry/src/metric.rs index 239d4244c282..9dc5f60171e9 100644 --- a/src/common/telemetry/src/metric.rs +++ b/src/common/telemetry/src/metric.rs @@ -12,8 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -// metric stuffs, inspired by databend +use std::sync::Arc; +// metric stuffs, inspired by databend +use greptime_proto::prometheus::remote::{Sample, TimeSeries}; +use greptime_proto::prometheus::*; +use prometheus::proto::{LabelPair, MetricFamily, MetricType}; use prometheus::{Encoder, TextEncoder}; pub fn dump_metrics() -> Result { @@ -25,3 +29,368 @@ pub fn dump_metrics() -> Result { .map_err(|_| "Encode metrics failed".to_string())?; String::from_utf8(buffer).map_err(|e| e.to_string()) } + +/// `MetricFilter` used in `report_metric_task`. 
+/// for metric user don't want collect, return a `false`, else return a `true` +#[derive(Clone)] +pub struct MetricFilter { + inner: Arc bool + Send + Sync>, +} + +impl MetricFilter { + pub fn new(inner: Arc bool + Send + Sync>) -> Self { + Self { inner } + } + pub fn filter(&self, mf: &MetricFamily) -> bool { + (self.inner)(mf) + } +} + +pub fn convert_metric_to_write_request( + metric_families: Vec, + metric_filter: Option<&MetricFilter>, + default_timestamp: i64, +) -> remote::WriteRequest { + let mut timeseries: Vec = Vec::with_capacity(metric_families.len()); + for mf in metric_families { + if !metric_filter.map(|f| f.filter(&mf)).unwrap_or(true) { + continue; + } + let mf_type = mf.get_field_type(); + let mf_name = mf.get_name(); + for m in mf.get_metric() { + let timestamp = if m.get_timestamp_ms() == 0 { + default_timestamp + } else { + m.get_timestamp_ms() + }; + match mf_type { + MetricType::COUNTER => timeseries.push(TimeSeries { + labels: convert_label(m.get_label(), mf_name, None), + samples: vec![Sample { + value: m.get_counter().get_value(), + timestamp, + }], + exemplars: vec![], + }), + MetricType::GAUGE => timeseries.push(TimeSeries { + labels: convert_label(m.get_label(), mf_name, None), + samples: vec![Sample { + value: m.get_gauge().get_value(), + timestamp, + }], + exemplars: vec![], + }), + MetricType::HISTOGRAM => { + let h = m.get_histogram(); + let mut inf_seen = false; + let metric_name = format!("{}_bucket", mf_name); + for b in h.get_bucket() { + let upper_bound = b.get_upper_bound(); + timeseries.push(TimeSeries { + labels: convert_label( + m.get_label(), + metric_name.as_str(), + Some(("le", upper_bound.to_string())), + ), + samples: vec![Sample { + value: b.get_cumulative_count() as f64, + timestamp, + }], + exemplars: vec![], + }); + if upper_bound.is_sign_positive() && upper_bound.is_infinite() { + inf_seen = true; + } + } + if !inf_seen { + timeseries.push(TimeSeries { + labels: convert_label( + m.get_label(), + metric_name.as_str(), + Some(("le", "+Inf".to_string())), + ), + samples: vec![Sample { + value: h.get_sample_count() as f64, + timestamp, + }], + exemplars: vec![], + }); + } + timeseries.push(TimeSeries { + labels: convert_label( + m.get_label(), + format!("{}_sum", mf_name).as_str(), + None, + ), + samples: vec![Sample { + value: h.get_sample_sum(), + timestamp, + }], + exemplars: vec![], + }); + timeseries.push(TimeSeries { + labels: convert_label( + m.get_label(), + format!("{}_count", mf_name).as_str(), + None, + ), + samples: vec![Sample { + value: h.get_sample_count() as f64, + timestamp, + }], + exemplars: vec![], + }); + } + MetricType::SUMMARY => { + let s = m.get_summary(); + for q in s.get_quantile() { + timeseries.push(TimeSeries { + labels: convert_label( + m.get_label(), + mf_name, + Some(("quantile", q.get_quantile().to_string())), + ), + samples: vec![Sample { + value: q.get_value(), + timestamp, + }], + exemplars: vec![], + }); + } + timeseries.push(TimeSeries { + labels: convert_label( + m.get_label(), + format!("{}_sum", mf_name).as_str(), + None, + ), + samples: vec![Sample { + value: s.get_sample_sum(), + timestamp, + }], + exemplars: vec![], + }); + timeseries.push(TimeSeries { + labels: convert_label( + m.get_label(), + format!("{}_count", mf_name).as_str(), + None, + ), + samples: vec![Sample { + value: s.get_sample_count() as f64, + timestamp, + }], + exemplars: vec![], + }); + } + MetricType::UNTYPED => { + // `TextEncoder` `MetricType::UNTYPED` unimplemented + // To keep the implementation consistent and not cause 
unexpected panics, we do nothing here. + } + }; + } + } + remote::WriteRequest { + timeseries, + metadata: vec![], + } +} + +fn convert_label( + pairs: &[LabelPair], + name: &str, + addon: Option<(&'static str, String)>, +) -> Vec { + let mut labels = Vec::with_capacity(pairs.len() + 1 + if addon.is_some() { 1 } else { 0 }); + for label in pairs { + labels.push(remote::Label { + name: label.get_name().to_string(), + value: label.get_value().to_string(), + }); + } + labels.push(remote::Label { + name: "__name__".to_string(), + value: name.to_string(), + }); + if let Some(addon) = addon { + labels.push(remote::Label { + name: addon.0.to_string(), + value: addon.1, + }); + } + // Remote write protocol need label names sorted in lexicographical order. + labels.sort_unstable_by(|a, b| a.name.cmp(&b.name)); + labels +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use prometheus::core::Collector; + use prometheus::proto::{LabelPair, MetricFamily, MetricType}; + use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts}; + + use super::convert_label; + use crate::metric::{convert_metric_to_write_request, MetricFilter}; + + #[test] + fn test_convert_label() { + let pairs = vec![ + { + let mut pair = LabelPair::new(); + pair.set_name(String::from("a")); + pair.set_value(String::from("b")); + pair + }, + { + let mut pair = LabelPair::new(); + pair.set_name(String::from("e")); + pair.set_value(String::from("g")); + pair + }, + ]; + let label1 = convert_label(&pairs, "label1", None); + assert_eq!( + format!("{:?}", label1), + r#"[Label { name: "__name__", value: "label1" }, Label { name: "a", value: "b" }, Label { name: "e", value: "g" }]"# + ); + let label2 = convert_label(&pairs, "label2", Some(("c", "c".to_string()))); + assert_eq!( + format!("{:?}", label2), + r#"[Label { name: "__name__", value: "label2" }, Label { name: "a", value: "b" }, Label { name: "c", value: "c" }, Label { name: "e", value: "g" }]"# + ); + } + + #[test] + fn test_write_request_encoder() { + let counter_opts = Opts::new("test_counter", "test help") + .const_label("a", "1") + .const_label("b", "2"); + let counter = Counter::with_opts(counter_opts).unwrap(); + counter.inc(); + + let mf = counter.collect(); + let write_quest = convert_metric_to_write_request(mf, None, 0); + + assert_eq!( + format!("{:?}", write_quest.timeseries), + r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }]"# + ); + + let gauge_opts = Opts::new("test_gauge", "test help") + .const_label("a", "1") + .const_label("b", "2"); + let gauge = Gauge::with_opts(gauge_opts).unwrap(); + gauge.inc(); + gauge.set(42.0); + + let mf = gauge.collect(); + let write_quest = convert_metric_to_write_request(mf, None, 0); + assert_eq!( + format!("{:?}", write_quest.timeseries), + r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [] }]"# + ); + } + + #[test] + fn test_write_request_histogram() { + let opts = HistogramOpts::new("test_histogram", "test help").const_label("a", "1"); + let histogram = Histogram::with_opts(opts).unwrap(); + histogram.observe(0.25); + + let mf = histogram.collect(); + let write_quest = convert_metric_to_write_request(mf, None, 0); + let write_quest_str: Vec<_> = write_quest + .timeseries + .iter() + .map(|x| format!("{:?}", x)) + 
.collect(); + let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }"#; + assert_eq!(write_quest_str.join("\n"), ans); + } + + #[test] + fn test_write_request_summary() { + use prometheus::proto::{Metric, Quantile, Summary}; + + let mut metric_family = MetricFamily::default(); + metric_family.set_name("test_summary".to_string()); + metric_family.set_help("This is a test summary statistic".to_string()); + metric_family.set_field_type(MetricType::SUMMARY); + + let mut summary = Summary::default(); + summary.set_sample_count(5.0 as u64); + summary.set_sample_sum(15.0); + + let mut quantile1 = Quantile::default(); + 
quantile1.set_quantile(50.0); + quantile1.set_value(3.0); + + let mut quantile2 = Quantile::default(); + quantile2.set_quantile(100.0); + quantile2.set_value(5.0); + + summary.set_quantile(vec![quantile1, quantile2].into()); + + let mut metric = Metric::default(); + metric.set_summary(summary); + metric_family.set_metric(vec![metric].into()); + + let write_quest = convert_metric_to_write_request(vec![metric_family], None, 20); + let write_quest_str: Vec<_> = write_quest + .timeseries + .iter() + .map(|x| format!("{:?}", x)) + .collect(); + let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [] } +TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }"#; + assert_eq!(write_quest_str.join("\n"), ans); + } + + #[test] + fn test_metric_filter() { + let counter_opts = Opts::new("filter_counter", "test help") + .const_label("a", "1") + .const_label("b", "2"); + let counter_1 = Counter::with_opts(counter_opts).unwrap(); + counter_1.inc_by(1.0); + let counter_opts = Opts::new("test_counter", "test help") + .const_label("a", "1") + .const_label("b", "2"); + let counter_2 = Counter::with_opts(counter_opts).unwrap(); + counter_2.inc_by(2.0); + + let mut mf = counter_1.collect(); + mf.append(&mut counter_2.collect()); + + let filter = MetricFilter::new(Arc::new(|mf: &MetricFamily| { + !mf.get_name().starts_with("filter") + })); + let write_quest1 = convert_metric_to_write_request(mf.clone(), None, 0); + let write_quest2 = convert_metric_to_write_request(mf, Some(&filter), 0); + assert_eq!( + format!("{:?}", write_quest1.timeseries), + r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"# + ); + assert_eq!( + format!("{:?}", write_quest2.timeseries), + r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"# + ); + } +} diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index f26b0828c873..12ebc71a2d52 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -30,6 +30,7 @@ use secrecy::SecretString; use serde::{Deserialize, Serialize}; use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; +use servers::remote_writer::RemoteWriteOptions; use servers::Mode; pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::mb(256); @@ -241,6 +242,7 @@ pub struct DatanodeOptions { pub region_engine: Vec, pub logging: LoggingOptions, pub enable_telemetry: bool, + pub remote_write: RemoteWriteOptions, } impl Default for DatanodeOptions { @@ -265,6 +267,7 @@ impl Default for 
DatanodeOptions { logging: LoggingOptions::default(), heartbeat: HeartbeatOptions::datanode_default(), enable_telemetry: true, + remote_write: RemoteWriteOptions::default(), } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 6ea9288debb5..2da4ee3e98f3 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -42,6 +42,7 @@ use query::QueryEngineFactory; use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::HttpServerBuilder; use servers::metrics_handler::MetricsHandler; +use servers::remote_writer::RemoteWriteMetricTask; use servers::server::{start_server, ServerHandler, ServerHandlers}; use servers::Mode; use snafu::{OptionExt, ResultExt}; @@ -81,6 +82,7 @@ pub struct Datanode { greptimedb_telemetry_task: Arc, leases_notifier: Option>, plugins: Plugins, + remote_write_metric_task: Option, } impl Datanode { @@ -92,6 +94,10 @@ impl Datanode { self.start_telemetry(); + if let Some(t) = self.remote_write_metric_task.as_ref() { + t.start() + } + self.start_services().await } @@ -259,6 +265,10 @@ impl DatanodeBuilder { None }; + let remote_write_metric_task = + RemoteWriteMetricTask::try_new(&self.opts.remote_write, Some(&self.plugins)) + .context(StartServerSnafu)?; + Ok(Datanode { services, heartbeat_task, @@ -267,6 +277,7 @@ impl DatanodeBuilder { region_event_receiver, leases_notifier, plugins: self.plugins.clone(), + remote_write_metric_task, }) } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index d43e3816fbf5..b1928df87819 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -17,6 +17,7 @@ use meta_client::MetaClientOptions; use serde::{Deserialize, Serialize}; use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; +use servers::remote_writer::RemoteWriteOptions; use servers::Mode; use snafu::prelude::*; @@ -44,6 +45,7 @@ pub struct FrontendOptions { pub logging: LoggingOptions, pub datanode: DatanodeOptions, pub user_provider: Option, + pub remote_write: RemoteWriteOptions, } impl Default for FrontendOptions { @@ -64,6 +66,7 @@ impl Default for FrontendOptions { logging: LoggingOptions::default(), datanode: DatanodeOptions::default(), user_provider: None, + remote_write: RemoteWriteOptions::default(), } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 8d17db6de81e..e76dee112daa 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -65,6 +65,7 @@ use servers::query_handler::{ InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, PromStoreProtocolHandler, ScriptHandler, }; +use servers::remote_writer::{RemoteWriteMetricTask, RemoteWriteOptions}; use servers::server::{start_server, ServerHandlers}; use session::context::QueryContextRef; use snafu::prelude::*; @@ -77,7 +78,8 @@ pub use standalone::StandaloneDatanodeManager; use crate::error::{ self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu, - PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, TableOperationSnafu, + PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu, + TableOperationSnafu, }; use crate::frontend::{FrontendOptions, TomlSerializable}; use crate::heartbeat::HeartbeatTask; @@ -116,6 +118,7 @@ pub struct Instance { heartbeat_task: Option, inserter: InserterRef, deleter: DeleterRef, + remote_write_metric_task: Option, } impl Instance { @@ -193,6 +196,12 @@ impl Instance { Ok(()) } + pub fn 
build_remote_write_metric_task(&mut self, opts: &RemoteWriteOptions) -> Result<()> { + self.remote_write_metric_task = + RemoteWriteMetricTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?; + Ok(()) + } + pub fn catalog_manager(&self) -> &CatalogManagerRef { &self.catalog_manager } @@ -222,6 +231,10 @@ impl FrontendInstance for Instance { self.script_executor.start(self)?; + if let Some(t) = self.remote_write_metric_task.as_ref() { + t.start() + } + futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move { info!("Starting service: {name}"); start_server(handler).await diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 1a3147be8f19..550f6fc4251b 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -144,6 +144,7 @@ impl FrontendBuilder { heartbeat_task: self.heartbeat_task, inserter, deleter, + remote_write_metric_task: None, }) } } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 076d25c77598..2a8f47803461 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -28,6 +28,7 @@ use etcd_client::Client; use servers::configurator::ConfiguratorRef; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; +use servers::remote_writer::RemoteWriteMetricTask; use servers::server::Server; use snafu::ResultExt; use tokio::net::TcpListener; @@ -36,6 +37,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; +use crate::error::InitRemoteWriteMetricTaskSnafu; use crate::lock::etcd::EtcdLock; use crate::lock::memory::MemLock; use crate::metasrv::builder::MetaSrvBuilder; @@ -57,6 +59,8 @@ pub struct MetaSrvInstance { signal_sender: Option>, plugins: Plugins, + + remote_write_metric_task: Option, } impl MetaSrvInstance { @@ -73,18 +77,26 @@ impl MetaSrvInstance { ); // put meta_srv into plugins for later use plugins.insert::>(Arc::new(meta_srv.clone())); + let remote_write_metric_task = + RemoteWriteMetricTask::try_new(&opts.remote_write, Some(&plugins)) + .context(InitRemoteWriteMetricTaskSnafu)?; Ok(MetaSrvInstance { meta_srv, http_srv, opts, signal_sender: None, plugins, + remote_write_metric_task, }) } pub async fn start(&mut self) -> Result<()> { self.meta_srv.try_start().await?; + if let Some(t) = self.remote_write_metric_task.as_ref() { + t.start() + } + let (tx, rx) = mpsc::channel::<()>(1); self.signal_sender = Some(tx); diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index c081f0a94e1b..a973bbb25e33 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -196,6 +196,11 @@ pub enum Error { location: Location, source: servers::error::Error, }, + #[snafu(display("Failed to init remote write metric task"))] + InitRemoteWriteMetricTask { + location: Location, + source: servers::error::Error, + }, #[snafu(display("Failed to parse address {}", addr))] ParseAddr { addr: String, @@ -651,6 +656,7 @@ impl ErrorExt for Error { | Error::ParseNum { .. } | Error::UnsupportedSelectorType { .. } | Error::InvalidArguments { .. } + | Error::InitRemoteWriteMetricTask { .. } | Error::InvalidHeartbeatRequest { .. } | Error::TooManyPartitions { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. 
} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 906d9847c617..44eff939f101 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -33,6 +33,7 @@ use common_telemetry::logging::LoggingOptions; use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; +use servers::remote_writer::RemoteWriteOptions; use snafu::ResultExt; use table::metadata::TableId; use tokio::sync::broadcast::error::RecvError; @@ -72,6 +73,7 @@ pub struct MetaSrvOptions { pub enable_telemetry: bool, pub data_home: String, pub wal: WalConfig, + pub remote_write: RemoteWriteOptions, } impl Default for MetaSrvOptions { @@ -97,6 +99,7 @@ impl Default for MetaSrvOptions { enable_telemetry: true, data_home: METASRV_HOME.to_string(), wal: WalConfig::default(), + remote_write: RemoteWriteOptions::default(), } } } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 448549fdf609..45621a26e170 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -72,6 +72,7 @@ prost.workspace = true query.workspace = true rand.workspace = true regex.workspace = true +reqwest.workspace = true rust-embed = { version = "6.6", features = ["debug-embed"] } rustls = "0.22" rustls-pemfile = "2.0" diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 4c21b9832693..f25ffd582d6a 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -214,6 +214,16 @@ pub enum Error { error: snap::Error, }, + #[snafu(display("Failed to send prometheus remote request"))] + SendPromRemoteRequest { + location: Location, + #[snafu(source)] + error: reqwest::Error, + }, + + #[snafu(display("Invalid prometheus remote write config, msg: {}", msg))] + InvalidRemoteWriteConfig { msg: String, location: Location }, + #[snafu(display("Failed to compress prometheus remote request"))] CompressPromRemoteRequest { location: Location, @@ -427,6 +437,7 @@ impl ErrorExt for Error { | AlreadyStarted { .. } | InvalidPromRemoteReadQueryResult { .. } | TcpBind { .. } + | SendPromRemoteRequest { .. } | TcpIncoming { .. } | CatalogError { .. } | GrpcReflectionService { .. } @@ -457,6 +468,7 @@ impl ErrorExt for Error { | CompressPromRemoteRequest { .. } | DecompressPromRemoteRequest { .. } | InvalidPromRemoteRequest { .. } + | InvalidRemoteWriteConfig { .. } | InvalidFlightTicket { .. } | InvalidPrepareStatement { .. } | DataFrame { .. } diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 2bb142977d4f..c08003b4481d 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -37,6 +37,7 @@ pub mod postgres; pub mod prom_store; pub mod prometheus_handler; pub mod query_handler; +pub mod remote_writer; mod row_writer; pub mod server; mod shutdown; diff --git a/src/servers/src/remote_writer.rs b/src/servers/src/remote_writer.rs new file mode 100644 index 000000000000..8c564e7c3af5 --- /dev/null +++ b/src/servers/src/remote_writer.rs @@ -0,0 +1,121 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use common_base::Plugins; +use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter}; +use common_telemetry::{error, info}; +use common_time::Timestamp; +use prost::Message; +use reqwest::Response; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use tokio::time; + +use crate::error::{InvalidRemoteWriteConfigSnafu, Result, SendPromRemoteRequestSnafu}; +use crate::prom_store::snappy_compress; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct RemoteWriteOptions { + pub enable: bool, + pub endpoint: String, + #[serde(with = "humantime_serde")] + pub write_interval: Duration, +} + +impl Default for RemoteWriteOptions { + fn default() -> Self { + Self { + enable: false, + endpoint: String::new(), + write_interval: Duration::from_secs(30), + } + } +} + +#[derive(Default, Clone)] +pub struct RemoteWriteMetricTask { + config: RemoteWriteOptions, + filter: Option, +} + +impl RemoteWriteMetricTask { + pub fn try_new(config: &RemoteWriteOptions, plugins: Option<&Plugins>) -> Result> { + if !config.enable { + return Ok(None); + } + let filter = plugins.map(|p| p.get::()).unwrap_or(None); + ensure!( + config.write_interval.as_secs() != 0, + InvalidRemoteWriteConfigSnafu { + msg: "Expected Remote write write_interval greater than zero" + } + ); + Ok(Some(Self { + config: config.clone(), + filter, + })) + } + pub fn start(&self) { + if !self.config.enable { + return; + } + let mut interval = time::interval(self.config.write_interval); + let sec = self.config.write_interval.as_secs(); + let endpoint = self.config.endpoint.clone(); + let filter = self.filter.clone(); + let _handle = common_runtime::spawn_bg(async move { + info!( + "Start remote write metric task to endpoint: {}, interval: {}s", + endpoint, sec + ); + loop { + interval.tick().await; + match report_metric(&endpoint, filter.as_ref()).await { + Ok(resp) => { + if !resp.status().is_success() { + error!("report metric in remote write error, msg: {:#?}", resp); + } + } + Err(e) => error!("report metric in remote write failed, error {}", e), + }; + } + }); + } +} + +/// Export the collected metrics, encode metrics into [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/), +/// and send metrics to Prometheus remote-write compatible receiver (e.g. `greptimedb`) specified by `url`. 
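As an illustrative aside (not part of the patch): with a reachable Prometheus remote-write receiver, the `report_metric` function defined just below could be driven directly as in the hedged sketch here; the endpoint value mirrors the one suggested in `config/standalone.example.toml`.

```rust
// Hypothetical one-shot export; `report_metric` comes from this patch, and the
// endpoint is the example value from config/standalone.example.toml.
async fn report_once() -> servers::error::Result<()> {
    let endpoint = "http://127.0.0.1:4000/v1/prometheus/write?db=system";
    // Passing `None` means no `MetricFilter` is applied, so every gathered
    // metric family is converted and sent.
    let resp = servers::remote_writer::report_metric(endpoint, None).await?;
    if !resp.status().is_success() {
        common_telemetry::error!("failed to write metrics remotely, response: {:?}", resp);
    }
    Ok(())
}
```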
+/// User could use `MetricFilter` to filter metric they don't want collect +pub async fn report_metric(url: &str, filter: Option<&MetricFilter>) -> Result { + let metric_families = prometheus::gather(); + let request = convert_metric_to_write_request( + metric_families, + filter, + Timestamp::current_millis().value(), + ); + // RemoteWrite format require compress by snappy + let content = snappy_compress(&request.encode_to_vec())?; + let client = reqwest::Client::new(); + client + .post(url) + .header("X-Prometheus-Remote-Write-Version", "0.1.0") + .header("Content-Type", "application/x-protobuf") + .body(content) + .send() + .await + .context(SendPromRemoteRequestSnafu) +} diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 0f561ca1512b..922955e6220c 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -32,6 +32,7 @@ use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::standalone::StandaloneTableMetadataAllocator; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; +use servers::remote_writer::RemoteWriteOptions; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -162,6 +163,7 @@ impl GreptimeDbStandaloneBuilder { frontend: FrontendOptions::default(), datanode: opts, logging: LoggingOptions::default(), + remote_write: RemoteWriteOptions::default(), }, guard, } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c26e840f20c0..7287e9797705 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -753,6 +753,10 @@ timeout = "10s" connect_timeout = "1s" tcp_nodelay = true +[frontend.remote_write] +enable = false +write_interval = "30s" + [datanode] mode = "standalone" node_id = 0 @@ -807,8 +811,16 @@ parallel_scan_channel_size = 32 [datanode.logging] enable_otlp_tracing = false +[datanode.remote_write] +enable = false +write_interval = "30s" + [logging] -enable_otlp_tracing = false"#, +enable_otlp_tracing = false + +[remote_write] +enable = false +write_interval = "30s""#, store_type, ); let body_text = drop_lines_with_inconsistent_results(res_get.text().await); From 854b055388189d4c81a5fc4ad7326e230282bd10 Mon Sep 17 00:00:00 2001 From: WUJingdi Date: Wed, 20 Dec 2023 09:41:51 +0800 Subject: [PATCH 2/4] chore: pass stanalone task to frontend --- src/cmd/src/error.rs | 7 ------- src/cmd/src/options.rs | 2 -- src/cmd/src/standalone.rs | 26 ++++++++++---------------- src/servers/src/remote_writer.rs | 2 ++ tests-integration/src/standalone.rs | 2 -- tests-integration/tests/http.rs | 6 +----- 6 files changed, 13 insertions(+), 32 deletions(-) diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index d7c99504e35b..91d556ff3861 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -140,12 +140,6 @@ pub enum Error { source: common_recordbatch::error::Error, }, - #[snafu(display("Failed to init remote write metric task"))] - InitRemoteWriteMetricTask { - location: Location, - source: servers::error::Error, - }, - #[snafu(display("Failed to pretty print Recordbatches"))] PrettyPrintRecordBatches { location: Location, @@ -272,7 +266,6 @@ impl ErrorExt for Error { | Error::NotDataFromOutput { .. } | Error::CreateDir { .. } | Error::EmptyResult { .. } - | Error::InitRemoteWriteMetricTask { .. } | Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments, Error::StartProcedureManager { source, .. 
} diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 9f0ab8fac6bf..39a3d94e6de3 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -21,7 +21,6 @@ use frontend::error::{Result as FeResult, TomlFormatSnafu}; use frontend::frontend::{FrontendOptions, TomlSerializable}; use meta_srv::metasrv::MetaSrvOptions; use serde::{Deserialize, Serialize}; -use servers::remote_writer::RemoteWriteOptions; use snafu::ResultExt; use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu}; @@ -38,7 +37,6 @@ pub struct MixOptions { pub frontend: FrontendOptions, pub datanode: DatanodeOptions, pub logging: LoggingOptions, - pub remote_write: RemoteWriteOptions, } impl From for FrontendOptions { diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 7589846cf9c1..91e546d67526 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -45,15 +45,15 @@ use frontend::service_config::{ use mito2::config::MitoConfig; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; -use servers::remote_writer::{RemoteWriteMetricTask, RemoteWriteOptions}; +use servers::remote_writer::RemoteWriteOptions; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; use crate::error::{ - CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, - InitRemoteWriteMetricTaskSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, - StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu, + CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, Result, + ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, + StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; use crate::options::{CliOptions, MixOptions, Options}; use crate::App; @@ -157,6 +157,8 @@ impl StandaloneOptions { meta_client: None, logging: self.logging, user_provider: self.user_provider, + // Handle the remote write metric task run by standalone to frontend for execution + remote_write: self.remote_write, ..Default::default() } } @@ -178,7 +180,6 @@ pub struct Instance { datanode: Datanode, frontend: FeInstance, procedure_manager: ProcedureManagerRef, - remote_write_metric_task: Option, } #[async_trait] @@ -195,10 +196,6 @@ impl App for Instance { .await .context(StartProcedureManagerSnafu)?; - if let Some(t) = self.remote_write_metric_task.as_ref() { - t.start() - } - self.frontend.start().await.context(StartFrontendSnafu)?; Ok(()) } @@ -328,7 +325,6 @@ impl StartCommand { let procedure = opts.procedure.clone(); let frontend = opts.clone().frontend_options(); let logging = opts.logging.clone(); - let remote_write = opts.remote_write.clone(); let datanode = opts.datanode_options(); Ok(Options::Standalone(Box::new(MixOptions { @@ -338,7 +334,6 @@ impl StartCommand { frontend, datanode, logging, - remote_write, }))) } @@ -403,16 +398,16 @@ impl StartCommand { ) .await?; - let remote_write_metric_task = - RemoteWriteMetricTask::try_new(&opts.remote_write, Some(&fe_plugins)) - .context(InitRemoteWriteMetricTaskSnafu)?; - let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor) .with_plugin(fe_plugins) .try_build() .await .context(StartFrontendSnafu)?; + frontend + .build_remote_write_metric_task(&opts.frontend.remote_write) + .context(StartFrontendSnafu)?; + frontend .build_servers(opts) .await @@ -422,7 +417,6 @@ impl StartCommand { datanode, frontend, procedure_manager, - remote_write_metric_task, 
}) } diff --git a/src/servers/src/remote_writer.rs b/src/servers/src/remote_writer.rs index 8c564e7c3af5..79dac3742133 100644 --- a/src/servers/src/remote_writer.rs +++ b/src/servers/src/remote_writer.rs @@ -82,6 +82,8 @@ impl RemoteWriteMetricTask { "Start remote write metric task to endpoint: {}, interval: {}s", endpoint, sec ); + // Pass the first tick. Because the first tick completes immediately. + interval.tick().await; loop { interval.tick().await; match report_metric(&endpoint, filter.as_ref()).await { diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 922955e6220c..0f561ca1512b 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -32,7 +32,6 @@ use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::standalone::StandaloneTableMetadataAllocator; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; -use servers::remote_writer::RemoteWriteOptions; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -163,7 +162,6 @@ impl GreptimeDbStandaloneBuilder { frontend: FrontendOptions::default(), datanode: opts, logging: LoggingOptions::default(), - remote_write: RemoteWriteOptions::default(), }, guard, } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7287e9797705..ef2f6bb823e4 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -816,11 +816,7 @@ enable = false write_interval = "30s" [logging] -enable_otlp_tracing = false - -[remote_write] -enable = false -write_interval = "30s""#, +enable_otlp_tracing = false"#, store_type, ); let body_text = drop_lines_with_inconsistent_results(res_get.text().await); From 70c2b45aebda86e2796a76bfda12a035c511e382 Mon Sep 17 00:00:00 2001 From: WUJingdi Date: Thu, 21 Dec 2023 10:22:04 +0800 Subject: [PATCH 3/4] chore: change name to system metric --- config/datanode.example.toml | 14 +++++++ config/frontend.example.toml | 14 +++++++ config/metasrv.example.toml | 14 +++++++ config/standalone.example.toml | 20 +++++----- src/cmd/src/frontend.rs | 2 +- src/cmd/src/standalone.rs | 12 +++--- src/common/telemetry/src/metric.rs | 1 - src/datanode/src/config.rs | 6 +-- src/datanode/src/datanode.rs | 12 +++--- src/frontend/src/frontend.rs | 6 +-- src/frontend/src/instance.rs | 12 +++--- src/frontend/src/instance/builder.rs | 2 +- src/meta-srv/src/bootstrap.rs | 13 +++---- src/meta-srv/src/metasrv.rs | 6 +-- src/servers/src/lib.rs | 2 +- .../{remote_writer.rs => system_metric.rs} | 39 ++++++++++++------- tests-integration/tests/http.rs | 6 ++- 17 files changed, 118 insertions(+), 63 deletions(-) rename src/servers/src/{remote_writer.rs => system_metric.rs} (75%) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index a0cc3601906e..db7777af0fcf 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -101,3 +101,17 @@ parallel_scan_channel_size = 32 # [logging] # dir = "/tmp/greptimedb/logs" # level = "info" + +# datanode export the metrics generated by itself +# encoded to Prometheus remote-write format +# and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) +# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric. 
+# [system_metric] +# whether enable export system_metric, default is false +# enable = false +# The url of metric export endpoint, default is `greptimedb` default frontend endpoint +# endpoint = "127.0.0.1:4000" +# The database name of exported metrics stores, user needs to specify a valid database +# db = "" +# The interval of export metric +# write_interval = "30s" diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 566ed42f9ecf..e828cee4f74e 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -77,3 +77,17 @@ tcp_nodelay = true timeout = "10s" connect_timeout = "10s" tcp_nodelay = true + +# frontend export the metrics generated by itself +# encoded to Prometheus remote-write format +# and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) +# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric. +# [system_metric] +# whether enable export system_metric, default is false +# enable = false +# The url of metric export endpoint, default is `greptimedb` default frontend endpoint +# endpoint = "127.0.0.1:4000" +# The database name of exported metrics stores, user needs to specify a valid database +# db = "" +# The interval of export metric +# write_interval = "30s" diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index aad33ce1afcf..3fd388390072 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -66,3 +66,17 @@ provider = "raft_engine" # num_partitions = 1 # Expected number of replicas of each partition. # replication_factor = 3 + +# metasrv export the metrics generated by itself +# encoded to Prometheus remote-write format +# and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) +# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric. +# [system_metric] +# whether enable export system_metric, default is false +# enable = false +# The url of metric export endpoint, default is `greptimedb` default frontend endpoint +# endpoint = "127.0.0.1:4000" +# The database name of exported metrics stores, user needs to specify a valid database +# db = "" +# The interval of export metric +# write_interval = "30s" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 1521aac44ebf..0f235059d7d8 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -178,14 +178,16 @@ parallel_scan_channel_size = 32 # The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0 # tracing_sample_ratio = 1.0 -# standalone/frontend/datanode/metasrv export the metrics generated by itself -# send metrics to Prometheus remote-write compatible receiver (e.g. `greptimedb`) -# [remote_write] -# whether enable export remote_write, default is false +# standalone export the metrics generated by itself +# encoded to Prometheus remote-write format +# and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) +# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric. +# [system_metric] +# whether enable export system_metric, default is false # enable = false -# The url of remote write endpoint. 
-# Taking greptimedb as an example, for `standalone` deployed under the default configuration. -# The user can create a database called `system` in the db and export the metric to `http://127.0.0.1:4000/v1/prometheus/write?db=system` -# endpoint = "http://127.0.0.1:4000/v1/prometheus/write?db=system" -# The interval of export metric, +# The url of metric export endpoint, default is `greptimedb` default frontend endpoint +# endpoint = "127.0.0.1:4000" +# The database name of exported metrics stores, user needs to specify a valid database +# db = "" +# The interval of export metric # write_interval = "30s" diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index beebf834f900..d6b1210104b1 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -250,7 +250,7 @@ impl StartCommand { .context(StartFrontendSnafu)?; instance - .build_remote_write_metric_task(&opts.remote_write) + .build_system_metric_task(&opts.system_metric) .context(StartFrontendSnafu)?; instance diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 91e546d67526..f88991377d21 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -45,7 +45,7 @@ use frontend::service_config::{ use mito2::config::MitoConfig; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; -use servers::remote_writer::RemoteWriteOptions; +use servers::system_metric::SystemMetricOption; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; @@ -113,7 +113,7 @@ pub struct StandaloneOptions { pub user_provider: Option, /// Options for different store engines. pub region_engine: Vec, - pub remote_write: RemoteWriteOptions, + pub system_metric: SystemMetricOption, } impl Default for StandaloneOptions { @@ -133,7 +133,7 @@ impl Default for StandaloneOptions { metadata_store: KvBackendConfig::default(), procedure: ProcedureConfig::default(), logging: LoggingOptions::default(), - remote_write: RemoteWriteOptions::default(), + system_metric: SystemMetricOption::default(), user_provider: None, region_engine: vec![ RegionEngineConfig::Mito(MitoConfig::default()), @@ -157,8 +157,8 @@ impl StandaloneOptions { meta_client: None, logging: self.logging, user_provider: self.user_provider, - // Handle the remote write metric task run by standalone to frontend for execution - remote_write: self.remote_write, + // Handle the system metric task run by standalone to frontend for execution + system_metric: self.system_metric, ..Default::default() } } @@ -405,7 +405,7 @@ impl StartCommand { .context(StartFrontendSnafu)?; frontend - .build_remote_write_metric_task(&opts.frontend.remote_write) + .build_system_metric_task(&opts.frontend.system_metric) .context(StartFrontendSnafu)?; frontend diff --git a/src/common/telemetry/src/metric.rs b/src/common/telemetry/src/metric.rs index 9dc5f60171e9..666b0546503f 100644 --- a/src/common/telemetry/src/metric.rs +++ b/src/common/telemetry/src/metric.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -// metric stuffs, inspired by databend use greptime_proto::prometheus::remote::{Sample, TimeSeries}; use greptime_proto::prometheus::*; use prometheus::proto::{LabelPair, MetricFamily, MetricType}; diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 12ebc71a2d52..44fa5e7eb813 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -30,7 +30,7 @@ use secrecy::SecretString; use serde::{Deserialize, Serialize}; use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; -use 
servers::remote_writer::RemoteWriteOptions; +use servers::system_metric::SystemMetricOption; use servers::Mode; pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::mb(256); @@ -242,7 +242,7 @@ pub struct DatanodeOptions { pub region_engine: Vec, pub logging: LoggingOptions, pub enable_telemetry: bool, - pub remote_write: RemoteWriteOptions, + pub system_metric: SystemMetricOption, } impl Default for DatanodeOptions { @@ -267,7 +267,7 @@ impl Default for DatanodeOptions { logging: LoggingOptions::default(), heartbeat: HeartbeatOptions::datanode_default(), enable_telemetry: true, - remote_write: RemoteWriteOptions::default(), + system_metric: SystemMetricOption::default(), } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 2da4ee3e98f3..be7e5b7d416f 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -42,8 +42,8 @@ use query::QueryEngineFactory; use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::HttpServerBuilder; use servers::metrics_handler::MetricsHandler; -use servers::remote_writer::RemoteWriteMetricTask; use servers::server::{start_server, ServerHandler, ServerHandlers}; +use servers::system_metric::SystemMetricTask; use servers::Mode; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; @@ -82,7 +82,7 @@ pub struct Datanode { greptimedb_telemetry_task: Arc, leases_notifier: Option>, plugins: Plugins, - remote_write_metric_task: Option, + system_metric_task: Option, } impl Datanode { @@ -94,7 +94,7 @@ impl Datanode { self.start_telemetry(); - if let Some(t) = self.remote_write_metric_task.as_ref() { + if let Some(t) = self.system_metric_task.as_ref() { t.start() } @@ -265,8 +265,8 @@ impl DatanodeBuilder { None }; - let remote_write_metric_task = - RemoteWriteMetricTask::try_new(&self.opts.remote_write, Some(&self.plugins)) + let system_metric_task = + SystemMetricTask::try_new(&self.opts.system_metric, Some(&self.plugins)) .context(StartServerSnafu)?; Ok(Datanode { @@ -277,7 +277,7 @@ impl DatanodeBuilder { region_event_receiver, leases_notifier, plugins: self.plugins.clone(), - remote_write_metric_task, + system_metric_task, }) } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index b1928df87819..d0dd0391a4eb 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -17,7 +17,7 @@ use meta_client::MetaClientOptions; use serde::{Deserialize, Serialize}; use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; -use servers::remote_writer::RemoteWriteOptions; +use servers::system_metric::SystemMetricOption; use servers::Mode; use snafu::prelude::*; @@ -45,7 +45,7 @@ pub struct FrontendOptions { pub logging: LoggingOptions, pub datanode: DatanodeOptions, pub user_provider: Option, - pub remote_write: RemoteWriteOptions, + pub system_metric: SystemMetricOption, } impl Default for FrontendOptions { @@ -66,7 +66,7 @@ impl Default for FrontendOptions { logging: LoggingOptions::default(), datanode: DatanodeOptions::default(), user_provider: None, - remote_write: RemoteWriteOptions::default(), + system_metric: SystemMetricOption::default(), } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index e76dee112daa..23acc9de7f7a 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -65,8 +65,8 @@ use servers::query_handler::{ InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, PromStoreProtocolHandler, ScriptHandler, }; -use 
servers::remote_writer::{RemoteWriteMetricTask, RemoteWriteOptions}; use servers::server::{start_server, ServerHandlers}; +use servers::system_metric::{SystemMetricOption, SystemMetricTask}; use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::Dialect; @@ -118,7 +118,7 @@ pub struct Instance { heartbeat_task: Option, inserter: InserterRef, deleter: DeleterRef, - remote_write_metric_task: Option, + system_metric_task: Option, } impl Instance { @@ -196,9 +196,9 @@ impl Instance { Ok(()) } - pub fn build_remote_write_metric_task(&mut self, opts: &RemoteWriteOptions) -> Result<()> { - self.remote_write_metric_task = - RemoteWriteMetricTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?; + pub fn build_system_metric_task(&mut self, opts: &SystemMetricOption) -> Result<()> { + self.system_metric_task = + SystemMetricTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?; Ok(()) } @@ -231,7 +231,7 @@ impl FrontendInstance for Instance { self.script_executor.start(self)?; - if let Some(t) = self.remote_write_metric_task.as_ref() { + if let Some(t) = self.system_metric_task.as_ref() { t.start() } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 550f6fc4251b..afdee140f25d 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -144,7 +144,7 @@ impl FrontendBuilder { heartbeat_task: self.heartbeat_task, inserter, deleter, - remote_write_metric_task: None, + system_metric_task: None, }) } } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 2a8f47803461..7a354e67ae3e 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -28,8 +28,8 @@ use etcd_client::Client; use servers::configurator::ConfiguratorRef; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; -use servers::remote_writer::RemoteWriteMetricTask; use servers::server::Server; +use servers::system_metric::SystemMetricTask; use snafu::ResultExt; use tokio::net::TcpListener; use tokio::select; @@ -60,7 +60,7 @@ pub struct MetaSrvInstance { plugins: Plugins, - remote_write_metric_task: Option, + system_metric_task: Option, } impl MetaSrvInstance { @@ -77,23 +77,22 @@ impl MetaSrvInstance { ); // put meta_srv into plugins for later use plugins.insert::>(Arc::new(meta_srv.clone())); - let remote_write_metric_task = - RemoteWriteMetricTask::try_new(&opts.remote_write, Some(&plugins)) - .context(InitRemoteWriteMetricTaskSnafu)?; + let system_metric_task = SystemMetricTask::try_new(&opts.system_metric, Some(&plugins)) + .context(InitRemoteWriteMetricTaskSnafu)?; Ok(MetaSrvInstance { meta_srv, http_srv, opts, signal_sender: None, plugins, - remote_write_metric_task, + system_metric_task, }) } pub async fn start(&mut self) -> Result<()> { self.meta_srv.try_start().await?; - if let Some(t) = self.remote_write_metric_task.as_ref() { + if let Some(t) = self.system_metric_task.as_ref() { t.start() } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 44eff939f101..2a136a6729bb 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -33,7 +33,7 @@ use common_telemetry::logging::LoggingOptions; use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; -use servers::remote_writer::RemoteWriteOptions; +use servers::system_metric::SystemMetricOption; use snafu::ResultExt; use table::metadata::TableId; use 
tokio::sync::broadcast::error::RecvError; @@ -73,7 +73,7 @@ pub struct MetaSrvOptions { pub enable_telemetry: bool, pub data_home: String, pub wal: WalConfig, - pub remote_write: RemoteWriteOptions, + pub system_metric: SystemMetricOption, } impl Default for MetaSrvOptions { @@ -99,7 +99,7 @@ impl Default for MetaSrvOptions { enable_telemetry: true, data_home: METASRV_HOME.to_string(), wal: WalConfig::default(), - remote_write: RemoteWriteOptions::default(), + system_metric: SystemMetricOption::default(), } } } diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index c08003b4481d..7397164b5842 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -37,10 +37,10 @@ pub mod postgres; pub mod prom_store; pub mod prometheus_handler; pub mod query_handler; -pub mod remote_writer; mod row_writer; pub mod server; mod shutdown; +pub mod system_metric; pub mod tls; #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] diff --git a/src/servers/src/remote_writer.rs b/src/servers/src/system_metric.rs similarity index 75% rename from src/servers/src/remote_writer.rs rename to src/servers/src/system_metric.rs index 79dac3742133..f7006af2f2c2 100644 --- a/src/servers/src/remote_writer.rs +++ b/src/servers/src/system_metric.rs @@ -29,31 +29,33 @@ use crate::prom_store::snappy_compress; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] -pub struct RemoteWriteOptions { +pub struct SystemMetricOption { pub enable: bool, pub endpoint: String, + pub db: String, #[serde(with = "humantime_serde")] pub write_interval: Duration, } -impl Default for RemoteWriteOptions { +impl Default for SystemMetricOption { fn default() -> Self { Self { enable: false, - endpoint: String::new(), + endpoint: "127.0.0.1:4000".to_string(), + db: String::new(), write_interval: Duration::from_secs(30), } } } #[derive(Default, Clone)] -pub struct RemoteWriteMetricTask { - config: RemoteWriteOptions, +pub struct SystemMetricTask { + config: SystemMetricOption, filter: Option, } -impl RemoteWriteMetricTask { - pub fn try_new(config: &RemoteWriteOptions, plugins: Option<&Plugins>) -> Result> { +impl SystemMetricTask { + pub fn try_new(config: &SystemMetricOption, plugins: Option<&Plugins>) -> Result> { if !config.enable { return Ok(None); } @@ -61,7 +63,13 @@ impl RemoteWriteMetricTask { ensure!( config.write_interval.as_secs() != 0, InvalidRemoteWriteConfigSnafu { - msg: "Expected Remote write write_interval greater than zero" + msg: "Expected System metric write_interval greater than zero" + } + ); + ensure!( + !config.db.is_empty(), + InvalidRemoteWriteConfigSnafu { + msg: "Expected System metric db not empty" } ); Ok(Some(Self { @@ -75,24 +83,27 @@ impl RemoteWriteMetricTask { } let mut interval = time::interval(self.config.write_interval); let sec = self.config.write_interval.as_secs(); - let endpoint = self.config.endpoint.clone(); + let endpoint = format!( + "http://{}/v1/prometheus/write?db={}", + self.config.endpoint, self.config.db + ); let filter = self.filter.clone(); let _handle = common_runtime::spawn_bg(async move { info!( - "Start remote write metric task to endpoint: {}, interval: {}s", + "Start system metric task to endpoint: {}, interval: {}s", endpoint, sec ); // Pass the first tick. Because the first tick completes immediately. 
interval.tick().await; loop { interval.tick().await; - match report_metric(&endpoint, filter.as_ref()).await { + match write_system_metric(&endpoint, filter.as_ref()).await { Ok(resp) => { if !resp.status().is_success() { - error!("report metric in remote write error, msg: {:#?}", resp); + error!("report system metric error, msg: {:#?}", resp); } } - Err(e) => error!("report metric in remote write failed, error {}", e), + Err(e) => error!("report system metric failed, error {}", e), }; } }); @@ -102,7 +113,7 @@ impl RemoteWriteMetricTask { /// Export the collected metrics, encode metrics into [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/), /// and send metrics to Prometheus remote-write compatible receiver (e.g. `greptimedb`) specified by `url`. /// User could use `MetricFilter` to filter metric they don't want collect -pub async fn report_metric(url: &str, filter: Option<&MetricFilter>) -> Result { +pub async fn write_system_metric(url: &str, filter: Option<&MetricFilter>) -> Result { let metric_families = prometheus::gather(); let request = convert_metric_to_write_request( metric_families, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index ef2f6bb823e4..c8ffc91efc5b 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -753,8 +753,9 @@ timeout = "10s" connect_timeout = "1s" tcp_nodelay = true -[frontend.remote_write] +[frontend.system_metric] enable = false +db = "" write_interval = "30s" [datanode] @@ -811,8 +812,9 @@ parallel_scan_channel_size = 32 [datanode.logging] enable_otlp_tracing = false -[datanode.remote_write] +[datanode.system_metric] enable = false +db = "" write_interval = "30s" [logging] From 308c5fb8eda5a047049a9f5329d4c4d963cf757f Mon Sep 17 00:00:00 2001 From: WUJingdi Date: Thu, 21 Dec 2023 18:26:43 +0800 Subject: [PATCH 4/4] fix: add header and rename to export metrics --- config/datanode.example.toml | 14 +-- config/frontend.example.toml | 14 +-- config/metasrv.example.toml | 14 +-- config/standalone.example.toml | 14 +-- src/cmd/src/frontend.rs | 2 +- src/cmd/src/standalone.rs | 12 +-- src/datanode/src/config.rs | 6 +- src/datanode/src/datanode.rs | 12 +-- src/frontend/src/frontend.rs | 6 +- src/frontend/src/instance.rs | 12 +-- src/frontend/src/instance/builder.rs | 2 +- src/meta-srv/src/bootstrap.rs | 14 +-- src/meta-srv/src/error.rs | 6 +- src/meta-srv/src/metasrv.rs | 6 +- src/servers/src/error.rs | 6 +- .../{system_metric.rs => export_metrics.rs} | 85 ++++++++++++++----- src/servers/src/lib.rs | 2 +- tests-integration/tests/http.rs | 8 +- 18 files changed, 146 insertions(+), 89 deletions(-) rename src/servers/src/{system_metric.rs => export_metrics.rs} (55%) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index db7777af0fcf..8c845778072f 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -102,16 +102,18 @@ parallel_scan_channel_size = 32 # dir = "/tmp/greptimedb/logs" # level = "info" -# datanode export the metrics generated by itself +# Datanode export the metrics generated by itself # encoded to Prometheus remote-write format # and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) -# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric. -# [system_metric] -# whether enable export system_metric, default is false +# This is only used for `greptimedb` to export its own metrics internally. 
It's different from Prometheus scraping.
+# [export_metrics]
+# Whether to enable exporting metrics, default is false
 # enable = false
-# The url of metric export endpoint, default is `greptimedb` default frontend endpoint
+# The URL of the metrics export endpoint, default is the `frontend` default HTTP endpoint.
 # endpoint = "127.0.0.1:4000"
 # The database name of exported metrics stores, user needs to specify a valid database
 # db = ""
-# The interval of export metric
+# The interval at which metrics are exported
 # write_interval = "30s"
+# Additional HTTP headers to attach to the Prometheus remote-write requests
+# headers = {}
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index e828cee4f74e..37a31ba8799d 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -78,16 +78,18 @@ timeout = "10s"
 connect_timeout = "10s"
 tcp_nodelay = true
 
-# frontend export the metrics generated by itself
+# Frontend export the metrics generated by itself
 # encoded to Prometheus remote-write format
 # and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself)
-# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric.
-# [system_metric]
-# whether enable export system_metric, default is false
+# This is only used for `greptimedb` to export its own metrics internally. It's different from Prometheus scraping.
+# [export_metrics]
+# Whether to enable exporting metrics, default is false
 # enable = false
-# The url of metric export endpoint, default is `greptimedb` default frontend endpoint
+# The URL of the metrics export endpoint, default is the `frontend` default HTTP endpoint.
 # endpoint = "127.0.0.1:4000"
 # The database name of exported metrics stores, user needs to specify a valid database
 # db = ""
-# The interval of export metric
+# The interval at which metrics are exported
 # write_interval = "30s"
+# Additional HTTP headers to attach to the Prometheus remote-write requests
+# headers = {}
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 3fd388390072..a6584e134871 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -67,16 +67,18 @@ provider = "raft_engine"
 # Expected number of replicas of each partition.
 # replication_factor = 3
 
-# metasrv export the metrics generated by itself
+# Metasrv export the metrics generated by itself
 # encoded to Prometheus remote-write format
 # and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself)
-# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric.
+# This is only used for `greptimedb` to export its own metrics internally. It's different from Prometheus scraping.
-# [system_metric]
-# whether enable export system_metric, default is false
+# [export_metrics]
+# Whether to enable exporting metrics, default is false
 # enable = false
-# The url of metric export endpoint, default is `greptimedb` default frontend endpoint
+# The URL of the metrics export endpoint, default is the `frontend` default HTTP endpoint.
# endpoint = "127.0.0.1:4000" # The database name of exported metrics stores, user needs to specify a valid database # db = "" -# The interval of export metric +# The interval of export metrics # write_interval = "30s" +# HTTP headers of Prometheus remote-write carry +# headers = {} diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 0f235059d7d8..cdbf7b10cf82 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -178,16 +178,18 @@ parallel_scan_channel_size = 32 # The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0 # tracing_sample_ratio = 1.0 -# standalone export the metrics generated by itself +# Standalone export the metrics generated by itself # encoded to Prometheus remote-write format # and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) -# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric. -# [system_metric] -# whether enable export system_metric, default is false +# This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. +# [export_metrics] +# whether enable export metrics, default is false # enable = false -# The url of metric export endpoint, default is `greptimedb` default frontend endpoint +# The url of metrics export endpoint, default is `frontend` default HTTP endpoint. # endpoint = "127.0.0.1:4000" # The database name of exported metrics stores, user needs to specify a valid database # db = "" -# The interval of export metric +# The interval of export metrics # write_interval = "30s" +# HTTP headers of Prometheus remote-write carry +# headers = {} diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index d6b1210104b1..b1d12e8844bc 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -250,7 +250,7 @@ impl StartCommand { .context(StartFrontendSnafu)?; instance - .build_system_metric_task(&opts.system_metric) + .build_export_metrics_task(&opts.export_metrics) .context(StartFrontendSnafu)?; instance diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index f88991377d21..ba0ab2705a51 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -44,8 +44,8 @@ use frontend::service_config::{ }; use mito2::config::MitoConfig; use serde::{Deserialize, Serialize}; +use servers::export_metrics::ExportMetricsOption; use servers::http::HttpOptions; -use servers::system_metric::SystemMetricOption; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; @@ -113,7 +113,7 @@ pub struct StandaloneOptions { pub user_provider: Option, /// Options for different store engines. 
pub region_engine: Vec, - pub system_metric: SystemMetricOption, + pub export_metrics: ExportMetricsOption, } impl Default for StandaloneOptions { @@ -133,7 +133,7 @@ impl Default for StandaloneOptions { metadata_store: KvBackendConfig::default(), procedure: ProcedureConfig::default(), logging: LoggingOptions::default(), - system_metric: SystemMetricOption::default(), + export_metrics: ExportMetricsOption::default(), user_provider: None, region_engine: vec![ RegionEngineConfig::Mito(MitoConfig::default()), @@ -157,8 +157,8 @@ impl StandaloneOptions { meta_client: None, logging: self.logging, user_provider: self.user_provider, - // Handle the system metric task run by standalone to frontend for execution - system_metric: self.system_metric, + // Handle the export metrics task run by standalone to frontend for execution + export_metrics: self.export_metrics, ..Default::default() } } @@ -405,7 +405,7 @@ impl StartCommand { .context(StartFrontendSnafu)?; frontend - .build_system_metric_task(&opts.frontend.system_metric) + .build_export_metrics_task(&opts.frontend.export_metrics) .context(StartFrontendSnafu)?; frontend diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 44fa5e7eb813..086c1da30726 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -28,9 +28,9 @@ use meta_client::MetaClientOptions; use mito2::config::MitoConfig; use secrecy::SecretString; use serde::{Deserialize, Serialize}; +use servers::export_metrics::ExportMetricsOption; use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; -use servers::system_metric::SystemMetricOption; use servers::Mode; pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::mb(256); @@ -242,7 +242,7 @@ pub struct DatanodeOptions { pub region_engine: Vec, pub logging: LoggingOptions, pub enable_telemetry: bool, - pub system_metric: SystemMetricOption, + pub export_metrics: ExportMetricsOption, } impl Default for DatanodeOptions { @@ -267,7 +267,7 @@ impl Default for DatanodeOptions { logging: LoggingOptions::default(), heartbeat: HeartbeatOptions::datanode_default(), enable_telemetry: true, - system_metric: SystemMetricOption::default(), + export_metrics: ExportMetricsOption::default(), } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index be7e5b7d416f..f19450485d8d 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -39,11 +39,11 @@ use mito2::engine::MitoEngine; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; use query::QueryEngineFactory; +use servers::export_metrics::ExportMetricsTask; use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::HttpServerBuilder; use servers::metrics_handler::MetricsHandler; use servers::server::{start_server, ServerHandler, ServerHandlers}; -use servers::system_metric::SystemMetricTask; use servers::Mode; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; @@ -82,7 +82,7 @@ pub struct Datanode { greptimedb_telemetry_task: Arc, leases_notifier: Option>, plugins: Plugins, - system_metric_task: Option, + export_metrics_task: Option, } impl Datanode { @@ -94,7 +94,7 @@ impl Datanode { self.start_telemetry(); - if let Some(t) = self.system_metric_task.as_ref() { + if let Some(t) = self.export_metrics_task.as_ref() { t.start() } @@ -265,8 +265,8 @@ impl DatanodeBuilder { None }; - let system_metric_task = - SystemMetricTask::try_new(&self.opts.system_metric, Some(&self.plugins)) + let 
export_metrics_task = + ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins)) .context(StartServerSnafu)?; Ok(Datanode { @@ -277,7 +277,7 @@ impl DatanodeBuilder { region_event_receiver, leases_notifier, plugins: self.plugins.clone(), - system_metric_task, + export_metrics_task, }) } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index d0dd0391a4eb..eddd0e73a1b6 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -15,9 +15,9 @@ use common_telemetry::logging::LoggingOptions; use meta_client::MetaClientOptions; use serde::{Deserialize, Serialize}; +use servers::export_metrics::ExportMetricsOption; use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; -use servers::system_metric::SystemMetricOption; use servers::Mode; use snafu::prelude::*; @@ -45,7 +45,7 @@ pub struct FrontendOptions { pub logging: LoggingOptions, pub datanode: DatanodeOptions, pub user_provider: Option, - pub system_metric: SystemMetricOption, + pub export_metrics: ExportMetricsOption, } impl Default for FrontendOptions { @@ -66,7 +66,7 @@ impl Default for FrontendOptions { logging: LoggingOptions::default(), datanode: DatanodeOptions::default(), user_provider: None, - system_metric: SystemMetricOption::default(), + export_metrics: ExportMetricsOption::default(), } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 23acc9de7f7a..59e6a7b4b2df 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -55,6 +55,7 @@ use query::QueryEngineRef; use raft_engine::{Config, ReadableSize, RecoveryMode}; use servers::error as server_error; use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu}; +use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask}; use servers::interceptor::{ PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef, }; @@ -66,7 +67,6 @@ use servers::query_handler::{ PromStoreProtocolHandler, ScriptHandler, }; use servers::server::{start_server, ServerHandlers}; -use servers::system_metric::{SystemMetricOption, SystemMetricTask}; use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::Dialect; @@ -118,7 +118,7 @@ pub struct Instance { heartbeat_task: Option, inserter: InserterRef, deleter: DeleterRef, - system_metric_task: Option, + export_metrics_task: Option, } impl Instance { @@ -196,9 +196,9 @@ impl Instance { Ok(()) } - pub fn build_system_metric_task(&mut self, opts: &SystemMetricOption) -> Result<()> { - self.system_metric_task = - SystemMetricTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?; + pub fn build_export_metrics_task(&mut self, opts: &ExportMetricsOption) -> Result<()> { + self.export_metrics_task = + ExportMetricsTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?; Ok(()) } @@ -231,7 +231,7 @@ impl FrontendInstance for Instance { self.script_executor.start(self)?; - if let Some(t) = self.system_metric_task.as_ref() { + if let Some(t) = self.export_metrics_task.as_ref() { t.start() } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index afdee140f25d..15711f9a7b19 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -144,7 +144,7 @@ impl FrontendBuilder { heartbeat_task: self.heartbeat_task, inserter, deleter, - system_metric_task: None, + export_metrics_task: None, }) } } diff --git a/src/meta-srv/src/bootstrap.rs 
b/src/meta-srv/src/bootstrap.rs index 7a354e67ae3e..5e5361bf6a76 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -26,10 +26,10 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_telemetry::info; use etcd_client::Client; use servers::configurator::ConfiguratorRef; +use servers::export_metrics::ExportMetricsTask; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; -use servers::system_metric::SystemMetricTask; use snafu::ResultExt; use tokio::net::TcpListener; use tokio::select; @@ -37,7 +37,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; -use crate::error::InitRemoteWriteMetricTaskSnafu; +use crate::error::InitExportMetricsTaskSnafu; use crate::lock::etcd::EtcdLock; use crate::lock::memory::MemLock; use crate::metasrv::builder::MetaSrvBuilder; @@ -60,7 +60,7 @@ pub struct MetaSrvInstance { plugins: Plugins, - system_metric_task: Option, + export_metrics_task: Option, } impl MetaSrvInstance { @@ -77,22 +77,22 @@ impl MetaSrvInstance { ); // put meta_srv into plugins for later use plugins.insert::>(Arc::new(meta_srv.clone())); - let system_metric_task = SystemMetricTask::try_new(&opts.system_metric, Some(&plugins)) - .context(InitRemoteWriteMetricTaskSnafu)?; + let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins)) + .context(InitExportMetricsTaskSnafu)?; Ok(MetaSrvInstance { meta_srv, http_srv, opts, signal_sender: None, plugins, - system_metric_task, + export_metrics_task, }) } pub async fn start(&mut self) -> Result<()> { self.meta_srv.try_start().await?; - if let Some(t) = self.system_metric_task.as_ref() { + if let Some(t) = self.export_metrics_task.as_ref() { t.start() } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index a973bbb25e33..7fa9f7d217c3 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -196,8 +196,8 @@ pub enum Error { location: Location, source: servers::error::Error, }, - #[snafu(display("Failed to init remote write metric task"))] - InitRemoteWriteMetricTask { + #[snafu(display("Failed to init export metrics task"))] + InitExportMetricsTask { location: Location, source: servers::error::Error, }, @@ -656,7 +656,7 @@ impl ErrorExt for Error { | Error::ParseNum { .. } | Error::UnsupportedSelectorType { .. } | Error::InvalidArguments { .. } - | Error::InitRemoteWriteMetricTask { .. } + | Error::InitExportMetricsTask { .. } | Error::InvalidHeartbeatRequest { .. } | Error::TooManyPartitions { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. 
} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 2a136a6729bb..6b5bb8d7336f 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -32,8 +32,8 @@ use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; +use servers::export_metrics::ExportMetricsOption; use servers::http::HttpOptions; -use servers::system_metric::SystemMetricOption; use snafu::ResultExt; use table::metadata::TableId; use tokio::sync::broadcast::error::RecvError; @@ -73,7 +73,7 @@ pub struct MetaSrvOptions { pub enable_telemetry: bool, pub data_home: String, pub wal: WalConfig, - pub system_metric: SystemMetricOption, + pub export_metrics: ExportMetricsOption, } impl Default for MetaSrvOptions { @@ -99,7 +99,7 @@ impl Default for MetaSrvOptions { enable_telemetry: true, data_home: METASRV_HOME.to_string(), wal: WalConfig::default(), - system_metric: SystemMetricOption::default(), + export_metrics: ExportMetricsOption::default(), } } } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index f25ffd582d6a..7a3be28de055 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -221,8 +221,8 @@ pub enum Error { error: reqwest::Error, }, - #[snafu(display("Invalid prometheus remote write config, msg: {}", msg))] - InvalidRemoteWriteConfig { msg: String, location: Location }, + #[snafu(display("Invalid export metrics config, msg: {}", msg))] + InvalidExportMetricsConfig { msg: String, location: Location }, #[snafu(display("Failed to compress prometheus remote request"))] CompressPromRemoteRequest { @@ -468,7 +468,7 @@ impl ErrorExt for Error { | CompressPromRemoteRequest { .. } | DecompressPromRemoteRequest { .. } | InvalidPromRemoteRequest { .. } - | InvalidRemoteWriteConfig { .. } + | InvalidExportMetricsConfig { .. } | InvalidFlightTicket { .. } | InvalidPrepareStatement { .. } | DataFrame { .. } diff --git a/src/servers/src/system_metric.rs b/src/servers/src/export_metrics.rs similarity index 55% rename from src/servers/src/system_metric.rs rename to src/servers/src/export_metrics.rs index f7006af2f2c2..5a08f0a079e4 100644 --- a/src/servers/src/system_metric.rs +++ b/src/servers/src/export_metrics.rs @@ -12,71 +12,108 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::time::Duration; +use axum::http::HeaderValue; use common_base::Plugins; use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter}; use common_telemetry::{error, info}; use common_time::Timestamp; +use hyper::HeaderMap; use prost::Message; -use reqwest::Response; +use reqwest::header::HeaderName; +use reqwest::{Client, Response}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use tokio::time; -use crate::error::{InvalidRemoteWriteConfigSnafu, Result, SendPromRemoteRequestSnafu}; +use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu}; use crate::prom_store::snappy_compress; +/// Use to export the metrics generated by greptimedb, encoded to Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/), +/// and send to Prometheus remote-write compatible receiver (e.g. 
send to `greptimedb` itself) #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] -pub struct SystemMetricOption { +pub struct ExportMetricsOption { pub enable: bool, pub endpoint: String, pub db: String, #[serde(with = "humantime_serde")] pub write_interval: Duration, + pub headers: HashMap, } -impl Default for SystemMetricOption { +impl Default for ExportMetricsOption { fn default() -> Self { Self { enable: false, endpoint: "127.0.0.1:4000".to_string(), db: String::new(), write_interval: Duration::from_secs(30), + headers: HashMap::new(), } } } #[derive(Default, Clone)] -pub struct SystemMetricTask { - config: SystemMetricOption, +pub struct ExportMetricsTask { + config: ExportMetricsOption, filter: Option, + headers: HeaderMap, } -impl SystemMetricTask { - pub fn try_new(config: &SystemMetricOption, plugins: Option<&Plugins>) -> Result> { +impl ExportMetricsTask { + pub fn try_new( + config: &ExportMetricsOption, + plugins: Option<&Plugins>, + ) -> Result> { if !config.enable { return Ok(None); } let filter = plugins.map(|p| p.get::()).unwrap_or(None); ensure!( config.write_interval.as_secs() != 0, - InvalidRemoteWriteConfigSnafu { - msg: "Expected System metric write_interval greater than zero" + InvalidExportMetricsConfigSnafu { + msg: "Expected export metrics write_interval greater than zero" } ); ensure!( !config.db.is_empty(), - InvalidRemoteWriteConfigSnafu { - msg: "Expected System metric db not empty" + InvalidExportMetricsConfigSnafu { + msg: "Expected export metrics db not empty" } ); + // construct http header + let mut headers = reqwest::header::HeaderMap::with_capacity(config.headers.len()); + config.headers.iter().try_for_each(|(k, v)| { + let header = match TryInto::::try_into(k) { + Ok(header) => header, + Err(_) => { + return InvalidExportMetricsConfigSnafu { + msg: format!("Export metrics: invalid HTTP header name: {}", k), + } + .fail() + } + }; + match TryInto::::try_into(v) { + Ok(value) => headers.insert(header, value), + Err(_) => { + return InvalidExportMetricsConfigSnafu { + msg: format!("Export metrics: invalid HTTP header value: {}", v), + } + .fail() + } + }; + Ok(()) + })?; Ok(Some(Self { config: config.clone(), filter, + headers, })) } + pub fn start(&self) { if !self.config.enable { return; @@ -88,22 +125,26 @@ impl SystemMetricTask { self.config.endpoint, self.config.db ); let filter = self.filter.clone(); + let headers = self.headers.clone(); let _handle = common_runtime::spawn_bg(async move { info!( - "Start system metric task to endpoint: {}, interval: {}s", + "Start export metrics task to endpoint: {}, interval: {}s", endpoint, sec ); // Pass the first tick. Because the first tick completes immediately. interval.tick().await; + let client = reqwest::Client::new(); loop { interval.tick().await; - match write_system_metric(&endpoint, filter.as_ref()).await { + match write_system_metric(&client, &endpoint, filter.as_ref(), headers.clone()) + .await + { Ok(resp) => { if !resp.status().is_success() { - error!("report system metric error, msg: {:#?}", resp); + error!("report export metrics error, msg: {:#?}", resp); } } - Err(e) => error!("report system metric failed, error {}", e), + Err(e) => error!("report export metrics failed, error {}", e), }; } }); @@ -113,7 +154,12 @@ impl SystemMetricTask { /// Export the collected metrics, encode metrics into [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/), /// and send metrics to Prometheus remote-write compatible receiver (e.g. 
`greptimedb`) specified by `url`. /// User could use `MetricFilter` to filter metric they don't want collect -pub async fn write_system_metric(url: &str, filter: Option<&MetricFilter>) -> Result { +pub async fn write_system_metric( + client: &Client, + url: &str, + filter: Option<&MetricFilter>, + headers: HeaderMap, +) -> Result { let metric_families = prometheus::gather(); let request = convert_metric_to_write_request( metric_families, @@ -121,13 +167,12 @@ pub async fn write_system_metric(url: &str, filter: Option<&MetricFilter>) -> Re Timestamp::current_millis().value(), ); // RemoteWrite format require compress by snappy - let content = snappy_compress(&request.encode_to_vec())?; - let client = reqwest::Client::new(); client .post(url) .header("X-Prometheus-Remote-Write-Version", "0.1.0") .header("Content-Type", "application/x-protobuf") - .body(content) + .headers(headers) + .body(snappy_compress(&request.encode_to_vec())?) .send() .await .context(SendPromRemoteRequestSnafu) diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 7397164b5842..0d3ff0f85998 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; pub mod configurator; pub mod error; +pub mod export_metrics; pub mod grpc; pub mod heartbeat_options; pub mod http; @@ -40,7 +41,6 @@ pub mod query_handler; mod row_writer; pub mod server; mod shutdown; -pub mod system_metric; pub mod tls; #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c8ffc91efc5b..35a86bf854e8 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -753,11 +753,13 @@ timeout = "10s" connect_timeout = "1s" tcp_nodelay = true -[frontend.system_metric] +[frontend.export_metrics] enable = false db = "" write_interval = "30s" +[frontend.export_metrics.headers] + [datanode] mode = "standalone" node_id = 0 @@ -812,11 +814,13 @@ parallel_scan_channel_size = 32 [datanode.logging] enable_otlp_tracing = false -[datanode.system_metric] +[datanode.export_metrics] enable = false db = "" write_interval = "30s" +[datanode.export_metrics.headers] + [logging] enable_otlp_tracing = false"#, store_type,