diff --git a/Cargo.lock b/Cargo.lock index db8b5ef8ce4c..ce314d70b51e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2083,6 +2083,8 @@ name = "common-version" version = "0.7.1" dependencies = [ "build-data", + "schemars", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 54354000b451..5140c5729d71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -134,6 +134,7 @@ reqwest = { version = "0.11", default-features = false, features = [ ] } rskafka = "0.5" rust_decimal = "1.33" +schemars = "0.8" serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = ["float_roundtrip"] } serde_with = "3" diff --git a/src/common/version/Cargo.toml b/src/common/version/Cargo.toml index a766329843bf..1f7444e22b39 100644 --- a/src/common/version/Cargo.toml +++ b/src/common/version/Cargo.toml @@ -7,5 +7,10 @@ license.workspace = true [lints] workspace = true +[features] +codec = ["dep:serde", "dep:schemars"] + [dependencies] build-data = "0.1.4" +schemars = { workspace = true, optional = true } +serde = { workspace = true, optional = true } diff --git a/src/common/version/src/lib.rs b/src/common/version/src/lib.rs index b3cb124ffcb5..42bbeeb1e353 100644 --- a/src/common/version/src/lib.rs +++ b/src/common/version/src/lib.rs @@ -18,6 +18,11 @@ use std::sync::OnceLock; const UNKNOWN: &str = "unknown"; +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr( + feature = "codec", + derive(serde::Serialize, serde::Deserialize, schemars::JsonSchema) +)] pub struct BuildInfo { pub branch: Cow<'static, str>, pub commit: Cow<'static, str>, diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 5cfbdd546221..19f4a9f09d7b 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -41,6 +41,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true +common-version = { workspace = true, features = ["codec"] } dashmap.workspace = true datafusion.workspace = true datafusion-common.workspace = true @@ -83,7 +84,7 @@ rust-embed = { version = "6.6", features = ["debug-embed"] } rustls = "0.22" rustls-pemfile = "2.0" rustls-pki-types = "1.0" -schemars = "0.8" +schemars.workspace = true secrecy = { version = "0.8", features = ["serde", "alloc"] } serde.workspace = true serde_json.workspace = true diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index ef36dcf501ec..67debfd376dc 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -59,7 +59,8 @@ use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; use crate::http::influxdb_result_v1::InfluxdbV1Response; use crate::http::prometheus::{ - format_query, instant_query, label_values_query, labels_query, range_query, series_query, + build_info_query, format_query, instant_query, label_values_query, labels_query, range_query, + series_query, }; use crate::metrics::http_metrics_layer; use crate::metrics_handler::MetricsHandler; @@ -682,6 +683,7 @@ impl HttpServer { "/format_query", routing::post(format_query).get(format_query), ) + .route("/status/buildinfo", routing::get(build_info_query)) .route("/query", routing::post(instant_query).get(instant_query)) .route("/query_range", routing::post(range_query).get(range_query)) .route("/labels", routing::post(labels_query).get(labels_query)) diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 21e5b4c2ccd0..bb208fb1cee3 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -27,6 +27,7 @@ use common_query::{Output, OutputData}; use common_recordbatch::RecordBatches; use common_telemetry::tracing; use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; +use common_version::BuildInfo; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; use datatypes::vectors::{Float64Vector, StringVector}; @@ -51,21 +52,42 @@ use crate::http::header::collect_plan_metrics; use crate::prom_store::METRIC_NAME_LABEL; use crate::prometheus_handler::PrometheusHandlerRef; +/// For [ValueType::Vector] result type #[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] -pub struct PromSeries { +pub struct PromSeriesVector { pub metric: HashMap, - /// For [ValueType::Matrix] result type - pub values: Vec<(f64, String)>, - /// For [ValueType::Vector] result type #[serde(skip_serializing_if = "Option::is_none")] pub value: Option<(f64, String)>, } +/// For [ValueType::Matrix] result type +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] +pub struct PromSeriesMatrix { + pub metric: HashMap, + pub values: Vec<(f64, String)>, +} + +/// Variants corresponding to [ValueType] +#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq)] +#[serde(untagged)] +pub enum PromQueryResult { + Matrix(Vec), + Vector(Vec), + Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>), + String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>), +} + +impl Default for PromQueryResult { + fn default() -> Self { + PromQueryResult::Matrix(Default::default()) + } +} + #[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] pub struct PromData { #[serde(rename = "resultType")] pub result_type: String, - pub result: Vec, + pub result: PromQueryResult, } #[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq)] @@ -76,6 +98,7 @@ pub enum PrometheusResponse { Series(Vec>), LabelValues(Vec), FormatQuery(String), + BuildInfo(BuildInfo), } impl Default for PrometheusResponse { @@ -113,6 +136,19 @@ pub async fn format_query( } } +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct BuildInfoQuery {} + +#[axum_macros::debug_handler] +#[tracing::instrument( + skip_all, + fields(protocol = "prometheus", request_type = "build_info_query") +)] +pub async fn build_info_query() -> PrometheusJsonResponse { + let build_info = common_version::build_info().clone(); + PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info)) +} + #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct InstantQuery { query: Option, diff --git a/src/servers/src/http/prometheus_resp.rs b/src/servers/src/http/prometheus_resp.rs index 975fdc3bfc13..94d57ed65b5e 100644 --- a/src/servers/src/http/prometheus_resp.rs +++ b/src/servers/src/http/prometheus_resp.rs @@ -33,7 +33,9 @@ use serde_json::Value; use snafu::{OptionExt, ResultExt}; use super::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS}; -use super::prometheus::{PromData, PromSeries, PrometheusResponse}; +use super::prometheus::{ + PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse, +}; use crate::error::{CollectRecordbatchSnafu, InternalSnafu, Result}; #[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] @@ -256,24 +258,35 @@ impl PrometheusJsonResponse { } } - let result = buffer - .into_iter() - .map(|(tags, mut values)| { - let metric = tags.into_iter().collect(); - match result_type { - ValueType::Vector | ValueType::Scalar | ValueType::String => Ok(PromSeries { + // initialize result to return + let mut result = match result_type { + ValueType::Vector => PromQueryResult::Vector(vec![]), + ValueType::Matrix => PromQueryResult::Matrix(vec![]), + ValueType::Scalar => PromQueryResult::Scalar(None), + ValueType::String => PromQueryResult::String(None), + }; + + // accumulate data into result + buffer.into_iter().for_each(|(tags, mut values)| { + let metric = tags.into_iter().collect(); + match result { + PromQueryResult::Vector(ref mut v) => { + v.push(PromSeriesVector { metric, value: values.pop(), - ..Default::default() - }), - ValueType::Matrix => Ok(PromSeries { - metric, - values, - ..Default::default() - }), + }); } - }) - .collect::>>()?; + PromQueryResult::Matrix(ref mut v) => { + v.push(PromSeriesMatrix { metric, values }); + } + PromQueryResult::Scalar(ref mut v) => { + *v = values.pop(); + } + PromQueryResult::String(ref mut _v) => { + // TODO(ruihang): Not supported yet + } + } + }); let result_type_string = result_type.to_string(); let data = PrometheusResponse::PromData(PromData { diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 0d11456e5925..3298ead4e4a9 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -25,7 +25,10 @@ use common_catalog::consts::MITO_ENGINE; use common_query::Output; use common_recordbatch::RecordBatches; use servers::grpc::GrpcServerConfig; -use servers::http::prometheus::{PromData, PromSeries, PrometheusJsonResponse, PrometheusResponse}; +use servers::http::prometheus::{ + PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusJsonResponse, + PrometheusResponse, +}; use servers::server::Server; use tests_integration::test_util::{ setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, StorageType, @@ -465,6 +468,8 @@ pub async fn test_health_check(store_type: StorageType) { } pub async fn test_prom_gateway_query(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + // prepare connection let (addr, mut guard, fe_grpc_server) = setup_grpc_server(store_type, "prom_gateway").await; let grpc_client = Client::with_urls(vec![addr]); @@ -516,8 +521,8 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { status: "success".to_string(), data: PrometheusResponse::PromData(PromData { result_type: "vector".to_string(), - result: vec![ - PromSeries { + result: PromQueryResult::Vector(vec![ + PromSeriesVector { metric: [ ("k".to_string(), "a".to_string()), ("__name__".to_string(), "test".to_string()), @@ -525,9 +530,8 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { .into_iter() .collect(), value: Some((5.0, "2".to_string())), - ..Default::default() }, - PromSeries { + PromSeriesVector { metric: [ ("__name__".to_string(), "test".to_string()), ("k".to_string(), "b".to_string()), @@ -535,9 +539,8 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { .into_iter() .collect(), value: Some((5.0, "1".to_string())), - ..Default::default() }, - ], + ]), }), error: None, error_type: None, @@ -568,8 +571,8 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { status: "success".to_string(), data: PrometheusResponse::PromData(PromData { result_type: "matrix".to_string(), - result: vec![ - PromSeries { + result: PromQueryResult::Matrix(vec![ + PromSeriesMatrix { metric: [ ("__name__".to_string(), "test".to_string()), ("k".to_string(), "a".to_string()), @@ -577,9 +580,8 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { .into_iter() .collect(), values: vec![(5.0, "2".to_string()), (10.0, "2".to_string())], - ..Default::default() }, - PromSeries { + PromSeriesMatrix { metric: [ ("__name__".to_string(), "test".to_string()), ("k".to_string(), "b".to_string()), @@ -587,9 +589,8 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { .into_iter() .collect(), values: vec![(5.0, "1".to_string()), (10.0, "1".to_string())], - ..Default::default() }, - ], + ]), }), error: None, error_type: None, @@ -620,7 +621,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { status: "success".to_string(), data: PrometheusResponse::PromData(PromData { result_type: "matrix".to_string(), - result: vec![], + result: PromQueryResult::Matrix(vec![]), }), error: None, error_type: None, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 689eeeab8c41..b2b3230ca456 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -403,6 +403,22 @@ pub async fn test_prom_http_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); + // instant query 1+1 + let res = client + .get("/v1/prometheus/api/v1/query?query=1%2B1&time=1") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::( + json!({"resultType":"scalar","result":[1.0,"2"]}) + ) + .unwrap() + ); + // range query let res = client .get("/v1/prometheus/api/v1/query_range?query=up&start=1&end=100&step=5") @@ -539,6 +555,15 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert!(prom_resp.error.is_none()); assert!(prom_resp.error_type.is_none()); + // buildinfo + let res = client + .get("/v1/prometheus/api/v1/status/buildinfo") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + guard.remove_all().await; }