From 663d1297de97bdff581d2a890c5a1eb5217bc468 Mon Sep 17 00:00:00 2001 From: WUJingdi Date: Thu, 21 Dec 2023 17:01:13 +0800 Subject: [PATCH] fix: add header and rename to export metrics --- config/datanode.example.toml | 14 ++-- config/frontend.example.toml | 14 ++-- config/metasrv.example.toml | 14 ++-- config/standalone.example.toml | 14 ++-- src/cmd/src/frontend.rs | 2 +- src/cmd/src/standalone.rs | 12 +-- src/datanode/src/config.rs | 6 +- src/datanode/src/datanode.rs | 12 +-- src/frontend/src/frontend.rs | 6 +- src/frontend/src/instance.rs | 12 +-- src/frontend/src/instance/builder.rs | 2 +- src/meta-srv/src/bootstrap.rs | 14 ++-- src/meta-srv/src/error.rs | 6 +- src/meta-srv/src/metasrv.rs | 6 +- src/servers/src/error.rs | 6 +- .../{system_metric.rs => export_metrics.rs} | 83 ++++++++++++++----- src/servers/src/lib.rs | 2 +- tests-integration/tests/http.rs | 8 +- 18 files changed, 144 insertions(+), 89 deletions(-) rename src/servers/src/{system_metric.rs => export_metrics.rs} (57%) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index db7777af0fcf..bab6c4893fac 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -102,16 +102,18 @@ parallel_scan_channel_size = 32 # dir = "/tmp/greptimedb/logs" # level = "info" -# datanode export the metrics generated by itself +# Datanode export the metrics generated by itself # encoded to Prometheus remote-write format # and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) -# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric. -# [system_metric] -# whether enable export system_metric, default is false +# This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. 
+# [export_metrics] +# whether enable export metrics, default is false # enable = false -# The url of metric export endpoint, default is `greptimedb` default frontend endpoint +# The url of metrics export endpoint, default is `frontend` default HTTP endpoint. # endpoint = "127.0.0.1:4000" # The database name of exported metrics stores, user needs to specify a valid database # db = "" -# The interval of export metric +# The interval of export metrics # write_interval = "30s" +# HTTP headers to send with Prometheus remote-write requests +# headers = {} diff --git a/config/frontend.example.toml b/config/frontend.example.toml index e828cee4f74e..2f535551369e 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -78,16 +78,18 @@ timeout = "10s" connect_timeout = "10s" tcp_nodelay = true -# frontend export the metrics generated by itself +# Frontend export the metrics generated by itself # encoded to Prometheus remote-write format # and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) -# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric. -# [system_metric] -# whether enable export system_metric, default is false +# This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. +# [export_metrics] +# whether enable export metrics, default is false # enable = false -# The url of metric export endpoint, default is `greptimedb` default frontend endpoint +# The url of metrics export endpoint, default is `frontend` default HTTP endpoint. 
# endpoint = "127.0.0.1:4000" # The database name of exported metrics stores, user needs to specify a valid database # db = "" -# The interval of export metric +# The interval of export metrics # write_interval = "30s" +# HTTP headers to send with Prometheus remote-write requests +# headers = {} diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 3fd388390072..104cf2406d1f 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -67,16 +67,18 @@ provider = "raft_engine" # Expected number of replicas of each partition. # replication_factor = 3 -# metasrv export the metrics generated by itself +# Metasrv export the metrics generated by itself # encoded to Prometheus remote-write format # and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) -# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric. -# [system_metric] -# whether enable export system_metric, default is false +# This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. +# [export_metrics] +# whether enable export metrics, default is false # enable = false -# The url of metric export endpoint, default is `greptimedb` default frontend endpoint +# The url of metrics export endpoint, default is `frontend` default HTTP endpoint. # endpoint = "127.0.0.1:4000" # The database name of exported metrics stores, user needs to specify a valid database # db = "" -# The interval of export metric +# The interval of export metrics # write_interval = "30s" +# HTTP headers to send with Prometheus remote-write requests +# headers = {} diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 0f235059d7d8..324d1cb150c4 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -178,16 +178,18 @@ parallel_scan_channel_size = 32 # The percentage of tracing will be sampled and exported. 
Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0 # tracing_sample_ratio = 1.0 -# standalone export the metrics generated by itself +# Standalone export the metrics generated by itself # encoded to Prometheus remote-write format # and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself) -# This is only used for `greptimedb` to export its own metric internally. Please see `logging` option for normal export of metric. -# [system_metric] -# whether enable export system_metric, default is false +# This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. +# [export_metrics] +# whether enable export metrics, default is false # enable = false -# The url of metric export endpoint, default is `greptimedb` default frontend endpoint +# The url of metrics export endpoint, default is `frontend` default HTTP endpoint. 
# endpoint = "127.0.0.1:4000" # The database name of exported metrics stores, user needs to specify a valid database # db = "" -# The interval of export metric +# The interval of export metrics # write_interval = "30s" +# HTTP headers to send with Prometheus remote-write requests +# headers = {} diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index d6b1210104b1..b1d12e8844bc 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -250,7 +250,7 @@ impl StartCommand { .context(StartFrontendSnafu)?; instance - .build_system_metric_task(&opts.system_metric) + .build_export_metrics_task(&opts.export_metrics) .context(StartFrontendSnafu)?; instance diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index f88991377d21..ba0ab2705a51 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -44,8 +44,8 @@ use frontend::service_config::{ }; use mito2::config::MitoConfig; use serde::{Deserialize, Serialize}; +use servers::export_metrics::ExportMetricsOption; use servers::http::HttpOptions; -use servers::system_metric::SystemMetricOption; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; @@ -113,7 +113,7 @@ pub struct StandaloneOptions { pub user_provider: Option, /// Options for different store engines. 
pub region_engine: Vec, - pub system_metric: SystemMetricOption, + pub export_metrics: ExportMetricsOption, } impl Default for StandaloneOptions { @@ -133,7 +133,7 @@ impl Default for StandaloneOptions { metadata_store: KvBackendConfig::default(), procedure: ProcedureConfig::default(), logging: LoggingOptions::default(), - system_metric: SystemMetricOption::default(), + export_metrics: ExportMetricsOption::default(), user_provider: None, region_engine: vec![ RegionEngineConfig::Mito(MitoConfig::default()), @@ -157,8 +157,8 @@ impl StandaloneOptions { meta_client: None, logging: self.logging, user_provider: self.user_provider, - // Handle the system metric task run by standalone to frontend for execution - system_metric: self.system_metric, + // Handle the export metrics task run by standalone to frontend for execution + export_metrics: self.export_metrics, ..Default::default() } } @@ -405,7 +405,7 @@ impl StartCommand { .context(StartFrontendSnafu)?; frontend - .build_system_metric_task(&opts.frontend.system_metric) + .build_export_metrics_task(&opts.frontend.export_metrics) .context(StartFrontendSnafu)?; frontend diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 44fa5e7eb813..086c1da30726 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -28,9 +28,9 @@ use meta_client::MetaClientOptions; use mito2::config::MitoConfig; use secrecy::SecretString; use serde::{Deserialize, Serialize}; +use servers::export_metrics::ExportMetricsOption; use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; -use servers::system_metric::SystemMetricOption; use servers::Mode; pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::mb(256); @@ -242,7 +242,7 @@ pub struct DatanodeOptions { pub region_engine: Vec, pub logging: LoggingOptions, pub enable_telemetry: bool, - pub system_metric: SystemMetricOption, + pub export_metrics: ExportMetricsOption, } impl Default for DatanodeOptions { @@ -267,7 
+267,7 @@ impl Default for DatanodeOptions { logging: LoggingOptions::default(), heartbeat: HeartbeatOptions::datanode_default(), enable_telemetry: true, - system_metric: SystemMetricOption::default(), + export_metrics: ExportMetricsOption::default(), } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index be7e5b7d416f..f19450485d8d 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -39,11 +39,11 @@ use mito2::engine::MitoEngine; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; use query::QueryEngineFactory; +use servers::export_metrics::ExportMetricsTask; use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::HttpServerBuilder; use servers::metrics_handler::MetricsHandler; use servers::server::{start_server, ServerHandler, ServerHandlers}; -use servers::system_metric::SystemMetricTask; use servers::Mode; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; @@ -82,7 +82,7 @@ pub struct Datanode { greptimedb_telemetry_task: Arc, leases_notifier: Option>, plugins: Plugins, - system_metric_task: Option, + export_metrics_task: Option, } impl Datanode { @@ -94,7 +94,7 @@ impl Datanode { self.start_telemetry(); - if let Some(t) = self.system_metric_task.as_ref() { + if let Some(t) = self.export_metrics_task.as_ref() { t.start() } @@ -265,8 +265,8 @@ impl DatanodeBuilder { None }; - let system_metric_task = - SystemMetricTask::try_new(&self.opts.system_metric, Some(&self.plugins)) + let export_metrics_task = + ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins)) .context(StartServerSnafu)?; Ok(Datanode { @@ -277,7 +277,7 @@ impl DatanodeBuilder { region_event_receiver, leases_notifier, plugins: self.plugins.clone(), - system_metric_task, + export_metrics_task, }) } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index d0dd0391a4eb..eddd0e73a1b6 100644 --- 
a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -15,9 +15,9 @@ use common_telemetry::logging::LoggingOptions; use meta_client::MetaClientOptions; use serde::{Deserialize, Serialize}; +use servers::export_metrics::ExportMetricsOption; use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; -use servers::system_metric::SystemMetricOption; use servers::Mode; use snafu::prelude::*; @@ -45,7 +45,7 @@ pub struct FrontendOptions { pub logging: LoggingOptions, pub datanode: DatanodeOptions, pub user_provider: Option, - pub system_metric: SystemMetricOption, + pub export_metrics: ExportMetricsOption, } impl Default for FrontendOptions { @@ -66,7 +66,7 @@ impl Default for FrontendOptions { logging: LoggingOptions::default(), datanode: DatanodeOptions::default(), user_provider: None, - system_metric: SystemMetricOption::default(), + export_metrics: ExportMetricsOption::default(), } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 23acc9de7f7a..59e6a7b4b2df 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -55,6 +55,7 @@ use query::QueryEngineRef; use raft_engine::{Config, ReadableSize, RecoveryMode}; use servers::error as server_error; use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu}; +use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask}; use servers::interceptor::{ PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef, }; @@ -66,7 +67,6 @@ use servers::query_handler::{ PromStoreProtocolHandler, ScriptHandler, }; use servers::server::{start_server, ServerHandlers}; -use servers::system_metric::{SystemMetricOption, SystemMetricTask}; use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::Dialect; @@ -118,7 +118,7 @@ pub struct Instance { heartbeat_task: Option, inserter: InserterRef, deleter: DeleterRef, - system_metric_task: Option, + export_metrics_task: Option, } impl 
Instance { @@ -196,9 +196,9 @@ impl Instance { Ok(()) } - pub fn build_system_metric_task(&mut self, opts: &SystemMetricOption) -> Result<()> { - self.system_metric_task = - SystemMetricTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?; + pub fn build_export_metrics_task(&mut self, opts: &ExportMetricsOption) -> Result<()> { + self.export_metrics_task = + ExportMetricsTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?; Ok(()) } @@ -231,7 +231,7 @@ impl FrontendInstance for Instance { self.script_executor.start(self)?; - if let Some(t) = self.system_metric_task.as_ref() { + if let Some(t) = self.export_metrics_task.as_ref() { t.start() } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index afdee140f25d..15711f9a7b19 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -144,7 +144,7 @@ impl FrontendBuilder { heartbeat_task: self.heartbeat_task, inserter, deleter, - system_metric_task: None, + export_metrics_task: None, }) } } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 7a354e67ae3e..5e5361bf6a76 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -26,10 +26,10 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_telemetry::info; use etcd_client::Client; use servers::configurator::ConfiguratorRef; +use servers::export_metrics::ExportMetricsTask; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; -use servers::system_metric::SystemMetricTask; use snafu::ResultExt; use tokio::net::TcpListener; use tokio::select; @@ -37,7 +37,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; -use crate::error::InitRemoteWriteMetricTaskSnafu; +use crate::error::InitExportMetricsTaskSnafu; use 
crate::lock::etcd::EtcdLock; use crate::lock::memory::MemLock; use crate::metasrv::builder::MetaSrvBuilder; @@ -60,7 +60,7 @@ pub struct MetaSrvInstance { plugins: Plugins, - system_metric_task: Option, + export_metrics_task: Option, } impl MetaSrvInstance { @@ -77,22 +77,22 @@ impl MetaSrvInstance { ); // put meta_srv into plugins for later use plugins.insert::>(Arc::new(meta_srv.clone())); - let system_metric_task = SystemMetricTask::try_new(&opts.system_metric, Some(&plugins)) - .context(InitRemoteWriteMetricTaskSnafu)?; + let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins)) + .context(InitExportMetricsTaskSnafu)?; Ok(MetaSrvInstance { meta_srv, http_srv, opts, signal_sender: None, plugins, - system_metric_task, + export_metrics_task, }) } pub async fn start(&mut self) -> Result<()> { self.meta_srv.try_start().await?; - if let Some(t) = self.system_metric_task.as_ref() { + if let Some(t) = self.export_metrics_task.as_ref() { t.start() } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index a973bbb25e33..7fa9f7d217c3 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -196,8 +196,8 @@ pub enum Error { location: Location, source: servers::error::Error, }, - #[snafu(display("Failed to init remote write metric task"))] - InitRemoteWriteMetricTask { + #[snafu(display("Failed to init export metrics task"))] + InitExportMetricsTask { location: Location, source: servers::error::Error, }, @@ -656,7 +656,7 @@ impl ErrorExt for Error { | Error::ParseNum { .. } | Error::UnsupportedSelectorType { .. } | Error::InvalidArguments { .. } - | Error::InitRemoteWriteMetricTask { .. } + | Error::InitExportMetricsTask { .. } | Error::InvalidHeartbeatRequest { .. } | Error::TooManyPartitions { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. 
} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 2a136a6729bb..6b5bb8d7336f 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -32,8 +32,8 @@ use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; +use servers::export_metrics::ExportMetricsOption; use servers::http::HttpOptions; -use servers::system_metric::SystemMetricOption; use snafu::ResultExt; use table::metadata::TableId; use tokio::sync::broadcast::error::RecvError; @@ -73,7 +73,7 @@ pub struct MetaSrvOptions { pub enable_telemetry: bool, pub data_home: String, pub wal: WalConfig, - pub system_metric: SystemMetricOption, + pub export_metrics: ExportMetricsOption, } impl Default for MetaSrvOptions { @@ -99,7 +99,7 @@ impl Default for MetaSrvOptions { enable_telemetry: true, data_home: METASRV_HOME.to_string(), wal: WalConfig::default(), - system_metric: SystemMetricOption::default(), + export_metrics: ExportMetricsOption::default(), } } } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index f25ffd582d6a..7a3be28de055 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -221,8 +221,8 @@ pub enum Error { error: reqwest::Error, }, - #[snafu(display("Invalid prometheus remote write config, msg: {}", msg))] - InvalidRemoteWriteConfig { msg: String, location: Location }, + #[snafu(display("Invalid export metrics config, msg: {}", msg))] + InvalidExportMetricsConfig { msg: String, location: Location }, #[snafu(display("Failed to compress prometheus remote request"))] CompressPromRemoteRequest { @@ -468,7 +468,7 @@ impl ErrorExt for Error { | CompressPromRemoteRequest { .. } | DecompressPromRemoteRequest { .. } | InvalidPromRemoteRequest { .. } - | InvalidRemoteWriteConfig { .. } + | InvalidExportMetricsConfig { .. } | InvalidFlightTicket { .. } | InvalidPrepareStatement { .. } | DataFrame { .. 
} diff --git a/src/servers/src/system_metric.rs b/src/servers/src/export_metrics.rs similarity index 57% rename from src/servers/src/system_metric.rs rename to src/servers/src/export_metrics.rs index f7006af2f2c2..6e330b5d4872 100644 --- a/src/servers/src/system_metric.rs +++ b/src/servers/src/export_metrics.rs @@ -12,71 +12,106 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::time::Duration; +use axum::http::HeaderValue; use common_base::Plugins; use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter}; use common_telemetry::{error, info}; use common_time::Timestamp; +use hyper::HeaderMap; use prost::Message; -use reqwest::Response; +use reqwest::header::HeaderName; +use reqwest::{Client, Response}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use tokio::time; -use crate::error::{InvalidRemoteWriteConfigSnafu, Result, SendPromRemoteRequestSnafu}; +use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu}; use crate::prom_store::snappy_compress; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] -pub struct SystemMetricOption { +pub struct ExportMetricsOption { pub enable: bool, pub endpoint: String, pub db: String, #[serde(with = "humantime_serde")] pub write_interval: Duration, + pub headers: HashMap, } -impl Default for SystemMetricOption { +impl Default for ExportMetricsOption { fn default() -> Self { Self { enable: false, endpoint: "127.0.0.1:4000".to_string(), db: String::new(), write_interval: Duration::from_secs(30), + headers: HashMap::new(), } } } #[derive(Default, Clone)] -pub struct SystemMetricTask { - config: SystemMetricOption, +pub struct ExportMetricsTask { + config: ExportMetricsOption, filter: Option, + headers: HeaderMap, } -impl SystemMetricTask { - pub fn try_new(config: &SystemMetricOption, plugins: Option<&Plugins>) -> Result> { +impl 
ExportMetricsTask { + pub fn try_new( + config: &ExportMetricsOption, + plugins: Option<&Plugins>, + ) -> Result> { if !config.enable { return Ok(None); } let filter = plugins.map(|p| p.get::()).unwrap_or(None); ensure!( config.write_interval.as_secs() != 0, - InvalidRemoteWriteConfigSnafu { - msg: "Expected System metric write_interval greater than zero" + InvalidExportMetricsConfigSnafu { + msg: "Expected export metrics write_interval greater than zero" } ); ensure!( !config.db.is_empty(), - InvalidRemoteWriteConfigSnafu { - msg: "Expected System metric db not empty" + InvalidExportMetricsConfigSnafu { + msg: "Expected export metrics db not empty" } ); + // construct http header + let mut headers = reqwest::header::HeaderMap::with_capacity(config.headers.len()); + config.headers.iter().try_for_each(|(k, v)| { + let header = match TryInto::::try_into(k) { + Ok(header) => header, + Err(_) => { + return InvalidExportMetricsConfigSnafu { + msg: format!("Export metrics: invalid HTTP header name: {}", k), + } + .fail() + } + }; + match TryInto::::try_into(v) { + Ok(value) => headers.insert(header, value), + Err(_) => { + return InvalidExportMetricsConfigSnafu { + msg: format!("Export metrics: invalid HTTP header value: {}", v), + } + .fail() + } + }; + Ok(()) + })?; Ok(Some(Self { config: config.clone(), filter, + headers, })) } + pub fn start(&self) { if !self.config.enable { return; @@ -88,22 +123,26 @@ impl SystemMetricTask { self.config.endpoint, self.config.db ); let filter = self.filter.clone(); + let headers = self.headers.clone(); let _handle = common_runtime::spawn_bg(async move { info!( - "Start system metric task to endpoint: {}, interval: {}s", + "Start export metrics task to endpoint: {}, interval: {}s", endpoint, sec ); // Pass the first tick. Because the first tick completes immediately. 
interval.tick().await; + let client = reqwest::Client::new(); loop { interval.tick().await; - match write_system_metric(&endpoint, filter.as_ref()).await { + match write_system_metric(&client, &endpoint, filter.as_ref(), headers.clone()) + .await + { Ok(resp) => { if !resp.status().is_success() { - error!("report system metric error, msg: {:#?}", resp); + error!("report export metrics error, msg: {:#?}", resp); } } - Err(e) => error!("report system metric failed, error {}", e), + Err(e) => error!("report export metrics failed, error {}", e), }; } }); @@ -113,7 +152,12 @@ impl SystemMetricTask { /// Export the collected metrics, encode metrics into [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/), /// and send metrics to Prometheus remote-write compatible receiver (e.g. `greptimedb`) specified by `url`. /// User could use `MetricFilter` to filter metric they don't want collect -pub async fn write_system_metric(url: &str, filter: Option<&MetricFilter>) -> Result { +pub async fn write_system_metric( + client: &Client, + url: &str, + filter: Option<&MetricFilter>, + headers: HeaderMap, +) -> Result { let metric_families = prometheus::gather(); let request = convert_metric_to_write_request( metric_families, @@ -121,13 +165,12 @@ pub async fn write_system_metric(url: &str, filter: Option<&MetricFilter>) -> Re Timestamp::current_millis().value(), ); // RemoteWrite format require compress by snappy - let content = snappy_compress(&request.encode_to_vec())?; - let client = reqwest::Client::new(); client .post(url) .header("X-Prometheus-Remote-Write-Version", "0.1.0") .header("Content-Type", "application/x-protobuf") - .body(content) + .headers(headers) + .body(snappy_compress(&request.encode_to_vec())?) 
.send() .await .context(SendPromRemoteRequestSnafu) diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 7397164b5842..0d3ff0f85998 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; pub mod configurator; pub mod error; +pub mod export_metrics; pub mod grpc; pub mod heartbeat_options; pub mod http; @@ -40,7 +41,6 @@ pub mod query_handler; mod row_writer; pub mod server; mod shutdown; -pub mod system_metric; pub mod tls; #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c8ffc91efc5b..35a86bf854e8 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -753,11 +753,13 @@ timeout = "10s" connect_timeout = "1s" tcp_nodelay = true -[frontend.system_metric] +[frontend.export_metrics] enable = false db = "" write_interval = "30s" +[frontend.export_metrics.headers] + [datanode] mode = "standalone" node_id = 0 @@ -812,11 +814,13 @@ parallel_scan_channel_size = 32 [datanode.logging] enable_otlp_tracing = false -[datanode.system_metric] +[datanode.export_metrics] enable = false db = "" write_interval = "30s" +[datanode.export_metrics.headers] + [logging] enable_otlp_tracing = false"#, store_type,