Skip to content

Commit

Permalink
fix: make config independent
Browse files Browse the repository at this point in the history
  • Loading branch information
Taylor-lagrange committed Dec 19, 2023
1 parent 5860d72 commit 338173f
Show file tree
Hide file tree
Showing 23 changed files with 251 additions and 108 deletions.
5 changes: 1 addition & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ parallel_scan_channel_size = 32

# standalone/frontend/datanode/metasrv export the metrics generated by itself
# send metrics to Prometheus remote-write compatible receiver (e.g. `greptimedb`)
# [logging.remote_write]
# [remote_write]
# whether enable export remote_write, default is false
# enable = false
# The url of remote write endpoint.
Expand Down
7 changes: 7 additions & 0 deletions src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ pub enum Error {
source: common_recordbatch::error::Error,
},

#[snafu(display("Failed to init remote write metric task"))]
InitRemoteWriteMetricTask {
location: Location,
source: servers::error::Error,
},

#[snafu(display("Failed to pretty print Recordbatches"))]
PrettyPrintRecordBatches {
location: Location,
Expand Down Expand Up @@ -260,6 +266,7 @@ impl ErrorExt for Error {
| Error::NotDataFromOutput { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. }
| Error::InitRemoteWriteMetricTask { .. }
| Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments,

Error::StartProcedureManager { source, .. }
Expand Down
4 changes: 4 additions & 0 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

instance
.build_remote_write_metric_task(&opts.remote_write)
.context(StartFrontendSnafu)?;

instance
.build_servers(opts)
.await
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use frontend::error::{Result as FeResult, TomlFormatSnafu};
use frontend::frontend::{FrontendOptions, TomlSerializable};
use meta_srv::metasrv::MetaSrvOptions;
use serde::{Deserialize, Serialize};
use servers::remote_writer::RemoteWriteOptions;
use snafu::ResultExt;

use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu};
Expand All @@ -36,6 +37,7 @@ pub struct MixOptions {
pub frontend: FrontendOptions,
pub datanode: DatanodeOptions,
pub logging: LoggingOptions,
pub remote_write: RemoteWriteOptions,
}

impl From<MixOptions> for FrontendOptions {
Expand Down
21 changes: 18 additions & 3 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ use frontend::service_config::{
use mito2::config::MitoConfig;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::remote_writer::{RemoteWriteMetricTask, RemoteWriteOptions};
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;

use crate::error::{
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StopProcedureManagerSnafu,
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu,
InitRemoteWriteMetricTaskSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
use crate::options::{MixOptions, Options, TopLevelOptions};

Expand Down Expand Up @@ -106,6 +107,7 @@ pub struct StandaloneOptions {
pub user_provider: Option<String>,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub remote_write: RemoteWriteOptions,
}

impl Default for StandaloneOptions {
Expand All @@ -125,6 +127,7 @@ impl Default for StandaloneOptions {
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
logging: LoggingOptions::default(),
remote_write: RemoteWriteOptions::default(),
user_provider: None,
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
Expand Down Expand Up @@ -169,6 +172,7 @@ pub struct Instance {
datanode: Datanode,
frontend: FeInstance,
procedure_manager: ProcedureManagerRef,
remote_write_metric_task: Option<RemoteWriteMetricTask>,
}

impl Instance {
Expand All @@ -180,6 +184,10 @@ impl Instance {
.await
.context(StartProcedureManagerSnafu)?;

if let Some(t) = self.remote_write_metric_task.as_ref() {
t.start()
}

self.frontend.start().await.context(StartFrontendSnafu)?;
Ok(())
}
Expand Down Expand Up @@ -309,6 +317,7 @@ impl StartCommand {
let procedure = opts.procedure.clone();
let frontend = opts.clone().frontend_options();
let logging = opts.logging.clone();
let remote_write = opts.remote_write.clone();
let datanode = opts.datanode_options();

Ok(Options::Standalone(Box::new(MixOptions {
Expand All @@ -318,6 +327,7 @@ impl StartCommand {
frontend,
datanode,
logging,
remote_write,
})))
}

Expand Down Expand Up @@ -364,6 +374,10 @@ impl StartCommand {
)
.await?;

let remote_write_metric_task =
RemoteWriteMetricTask::try_new(&opts.remote_write, Some(&fe_plugins))
.context(InitRemoteWriteMetricTaskSnafu)?;

let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
.with_plugin(fe_plugins)
.try_build()
Expand All @@ -379,6 +393,7 @@ impl StartCommand {
datanode,
frontend,
procedure_manager,
remote_write_metric_task,
})
}

Expand Down
4 changes: 0 additions & 4 deletions src/common/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ deadlock_detection = ["parking_lot/deadlock_detection"]
[dependencies]
backtrace = "0.3"
common-error.workspace = true
common-time.workspace = true
console-subscriber = { version = "0.1", optional = true }
greptime-proto.workspace = true
humantime-serde.workspace = true
Expand All @@ -25,10 +24,7 @@ opentelemetry-semantic-conventions = "0.13.0"
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
parking_lot = { version = "0.12" }
prometheus.workspace = true
prost.workspace = true
reqwest.workspace = true
serde.workspace = true
snap.workspace = true
tokio.workspace = true
tracing = "0.1"
tracing-appender = "0.2"
Expand Down
26 changes: 0 additions & 26 deletions src/common/telemetry/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//! logging stuffs, inspired by databend
use std::env;
use std::sync::{Arc, Mutex, Once};
use std::time::Duration;

use once_cell::sync::Lazy;
use opentelemetry::{global, KeyValue};
Expand All @@ -32,7 +31,6 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter, EnvFilter, Registry};

use crate::metric::report_metric_task;
pub use crate::{debug, error, info, trace, warn};

const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
Expand All @@ -45,7 +43,6 @@ pub struct LoggingOptions {
pub enable_otlp_tracing: bool,
pub otlp_endpoint: Option<String>,
pub tracing_sample_ratio: Option<f64>,
pub remote_write: RemoteWriteOptions,
}

impl PartialEq for LoggingOptions {
Expand All @@ -68,26 +65,6 @@ impl Default for LoggingOptions {
enable_otlp_tracing: false,
otlp_endpoint: None,
tracing_sample_ratio: None,
remote_write: RemoteWriteOptions::default(),
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RemoteWriteOptions {
pub enable: bool,
pub endpoint: String,
#[serde(with = "humantime_serde")]
pub write_interval: Duration,
}

impl Default for RemoteWriteOptions {
fn default() -> Self {
Self {
enable: false,
endpoint: String::new(),
write_interval: Duration::from_secs(30),
}
}
}
Expand Down Expand Up @@ -265,8 +242,5 @@ pub fn init_global_logging(
.expect("error setting global tracing subscriber");
}

// report metric base on config remote_write
report_metric_task(opts.remote_write.clone());

guards
}
83 changes: 20 additions & 63 deletions src/common/telemetry/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// metric stuffs, inspired by databend
use std::sync::Arc;

use common_time::Timestamp;
// metric stuffs, inspired by databend
use greptime_proto::prometheus::remote::{Sample, TimeSeries};
use greptime_proto::prometheus::*;
use prometheus::proto::{LabelPair, MetricFamily, MetricType};
use prometheus::{Encoder, TextEncoder};
use prost::Message;
use reqwest::blocking::Response;

use crate::logging::RemoteWriteOptions;

pub fn dump_metrics() -> Result<String, String> {
let mut buffer = Vec::new();
Expand All @@ -36,72 +32,35 @@ pub fn dump_metrics() -> Result<String, String> {

/// `MetricFilter` used in `report_metric_task`.
/// for metric user don't want collect, return a `false`, else return a `true`
pub type MetricFilter = Box<dyn Fn(&MetricFamily) -> bool>;
#[derive(Clone)]
pub struct MetricFilter {
inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>,
}

pub fn report_metric_task(config: RemoteWriteOptions) {
if !config.enable {
return;
impl MetricFilter {
pub fn new(inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>) -> Self {
Self { inner }
}
if config.write_interval.as_secs() == 0 {
panic!("Expected Remote write write_interval greater than zero")
pub fn filter(&self, mf: &MetricFamily) -> bool {
(self.inner)(mf)
}
let _ = std::thread::spawn(move || loop {
std::thread::sleep(config.write_interval);
match report_metric(&config.endpoint, None) {
Ok(resp) => {
if !resp.status().is_success() {
tracing::error!("report metric in remte write error, msg: {:#?}", resp);
}
}
Err(e) => tracing::error!("report metric in remte write failed, error {:#?}", e),
};
});
}

/// Export the collected metrics, encode metrics into [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/),
/// and send metrics to Prometheus remote-write compatible receiver (e.g. `greptimedb`) specified by `url`.
/// User could use `MetricFilter` to filter metric they don't want collect
/// This function write in sync way, **don't use in async function**
pub fn report_metric(url: &str, filter: Option<&MetricFilter>) -> Result<Response, String> {
let mut buffer = Vec::new();
let metric_families = prometheus::gather();
let request = convert_metric_to_write_request(
metric_families,
filter,
Timestamp::current_millis().value(),
);
request.encode(&mut buffer).map_err(|e| e.to_string())?;
// RemoteWrite format require compress by snappy
let mut compressor = snap::raw::Encoder::new();
let content = compressor
.compress_vec(&buffer)
.map_err(|e| e.to_string())?;
let client = reqwest::blocking::Client::new();
let resp = client
.post(url)
.header("X-Prometheus-Remote-Write-Version", "0.1.0")
.header("Content-Type", "application/x-protobuf")
.body(content)
.send()
.map_err(|e| e.to_string())?;
Ok(resp)
}

fn convert_metric_to_write_request(
pub fn convert_metric_to_write_request(
metric_families: Vec<MetricFamily>,
metric_filter: Option<&MetricFilter>,
default_timstamp: i64,
default_timestamp: i64,
) -> remote::WriteRequest {
let mut timeseries: Vec<TimeSeries> = Vec::with_capacity(metric_families.len());
for mf in metric_families {
if !metric_filter.map(|f| f(&mf)).unwrap_or(true) {
if !metric_filter.map(|f| f.filter(&mf)).unwrap_or(true) {
continue;
}
let mf_type = mf.get_field_type();
let mf_name = mf.get_name();
for m in mf.get_metric() {
let timestamp = if m.get_timestamp_ms() == 0 {
default_timstamp
default_timestamp
} else {
m.get_timestamp_ms()
};
Expand Down Expand Up @@ -266,6 +225,8 @@ fn convert_label(

#[cfg(test)]
mod test {
use std::sync::Arc;

use prometheus::core::Collector;
use prometheus::proto::{LabelPair, MetricFamily, MetricType};
use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts};
Expand Down Expand Up @@ -418,13 +379,9 @@ TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }],
let mut mf = counter_1.collect();
mf.append(&mut counter_2.collect());

let filter: MetricFilter = Box::new(|mf: &MetricFamily| {
if mf.get_name().starts_with("filter") {
false
} else {
true
}
});
let filter = MetricFilter::new(Arc::new(|mf: &MetricFamily| {
!mf.get_name().starts_with("filter")
}));
let write_quest1 = convert_metric_to_write_request(mf.clone(), None, 0);
let write_quest2 = convert_metric_to_write_request(mf, Some(&filter), 0);
assert_eq!(
Expand Down
Loading

0 comments on commit 338173f

Please sign in to comment.