From 71c1c7ca240fad3fb2828fc7162b03628905c45a Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 7 May 2024 17:07:16 +0800 Subject: [PATCH] fix: return metric name instead of query in Prometheus /series API (#3864) * fix: return metric name instead of query in Prometheus /series API Signed-off-by: Ruihang Xia * omit non-tag columns from result Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/servers/src/error.rs | 26 +++++-- src/servers/src/http/prometheus.rs | 110 ++++++++++++++++++++++++++--- tests-integration/tests/http.rs | 5 +- 3 files changed, 125 insertions(+), 16 deletions(-) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 303819c12a82..2c1da548a505 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -305,10 +305,25 @@ pub enum Error { }, #[snafu(display("Error accessing catalog"))] - CatalogError { source: catalog::error::Error }, + CatalogError { + source: catalog::error::Error, + location: Location, + }, #[snafu(display("Cannot find requested database: {}-{}", catalog, schema))] - DatabaseNotFound { catalog: String, schema: String }, + DatabaseNotFound { + catalog: String, + schema: String, + location: Location, + }, + + #[snafu(display("Cannot find requested table: {}-{}-{}", catalog, schema, table))] + TableNotFound { + catalog: String, + schema: String, + table: String, + location: Location, + }, #[cfg(feature = "mem-prof")] #[snafu(display("Failed to dump profile data"))] @@ -318,10 +333,10 @@ pub enum Error { }, #[snafu(display("Invalid prepare statement: {}", err_msg))] - InvalidPrepareStatement { err_msg: String }, + InvalidPrepareStatement { err_msg: String, location: Location }, #[snafu(display("Invalid flush argument: {}", err_msg))] - InvalidFlushArgument { err_msg: String }, + InvalidFlushArgument { err_msg: String, location: Location }, #[snafu(display("Failed to build gRPC reflection service"))] GrpcReflectionService { @@ -538,6 +553,9 @@ impl ErrorExt for Error { | InvalidAuthHeaderInvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader, DatabaseNotFound { .. } => StatusCode::DatabaseNotFound, + + TableNotFound { .. } => StatusCode::TableNotFound, + #[cfg(feature = "mem-prof")] DumpProfileData { source, .. } => source.status_code(), diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 0aa3b8abd6e2..c6440cc6504d 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -41,11 +41,12 @@ use serde::de::{self, MapAccess, Visitor}; use serde::{Deserialize, Serialize}; use serde_json::Value; use session::context::{QueryContext, QueryContextBuilder, QueryContextRef}; -use snafu::{Location, ResultExt}; +use snafu::{Location, OptionExt, ResultExt}; pub use super::prometheus_resp::PrometheusJsonResponse; use crate::error::{ - CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, UnexpectedResultSnafu, + CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, TableNotFoundSnafu, + UnexpectedResultSnafu, }; use crate::http::header::collect_plan_metrics; use crate::prom_store::METRIC_NAME_LABEL; @@ -414,17 +415,41 @@ async fn get_all_column_names( async fn retrieve_series_from_query_result( result: Result, series: &mut Vec>, + query_ctx: &QueryContext, table_name: &str, + manager: &CatalogManagerRef, metrics: &mut HashMap, ) -> Result<()> { let result = result?; + + // fetch tag list + let table = manager + .table( + query_ctx.current_catalog(), + query_ctx.current_schema(), + table_name, + ) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + catalog: query_ctx.current_catalog(), + schema: query_ctx.current_schema(), + table: table_name, + })?; + let tag_columns = table + .primary_key_columns() + .map(|c| c.name) + .collect::>(); + match result.data { - OutputData::RecordBatches(batches) => record_batches_to_series(batches, series, table_name), + OutputData::RecordBatches(batches) => { + record_batches_to_series(batches, series, table_name, &tag_columns) + } OutputData::Stream(stream) => { let batches = RecordBatches::try_collect(stream) .await .context(CollectRecordbatchSnafu)?; - record_batches_to_series(batches, series, table_name) + record_batches_to_series(batches, series, table_name, &tag_columns) } OutputData::AffectedRows(_) => Err(Error::UnexpectedResult { reason: "expected data result, but got affected rows".to_string(), @@ -468,8 +493,27 @@ fn record_batches_to_series( batches: RecordBatches, series: &mut Vec>, table_name: &str, + tag_columns: &HashSet, ) -> Result<()> { for batch in batches.iter() { + // project record batch to only contains tag columns + let projection = batch + .schema + .column_schemas() + .iter() + .enumerate() + .filter_map(|(idx, col)| { + if tag_columns.contains(&col.name) { + Some(idx) + } else { + None + } + }) + .collect::>(); + let batch = batch + .try_project(&projection) + .context(CollectRecordbatchSnafu)?; + for row in batch.rows() { let mut element: HashMap = row .iter() @@ -763,6 +807,50 @@ async fn retrieve_label_values_from_record_batch( Ok(()) } +/// Try to parse and extract the name of referenced metric from the promql query. +/// +/// Returns the metric name if a single metric is referenced, otherwise None. +fn retrieve_metric_name_from_promql(query: &str) -> Option { + let promql_expr = promql_parser::parser::parse(query).ok()?; + // promql_expr_to_metric_name(&promql_expr) + + struct MetricNameVisitor { + metric_name: Option, + } + + impl promql_parser::util::ExprVisitor for MetricNameVisitor { + type Error = (); + + fn pre_visit(&mut self, plan: &PromqlExpr) -> std::result::Result { + let query_metric_name = match plan { + PromqlExpr::VectorSelector(vs) => vs + .matchers + .find_matcher(promql_parser::label::METRIC_NAME) + .or_else(|| vs.name.clone()), + PromqlExpr::MatrixSelector(ms) => ms + .vs + .matchers + .find_matcher(promql_parser::label::METRIC_NAME) + .or_else(|| ms.vs.name.clone()), + _ => return Ok(true), + }; + + // set it to empty string if multiple metrics are referenced. + if self.metric_name.is_some() && query_metric_name.is_some() { + self.metric_name = Some(String::new()); + } else { + self.metric_name = query_metric_name.or_else(|| self.metric_name.clone()); + } + + Ok(true) + } + } + + let mut visitor = MetricNameVisitor { metric_name: None }; + promql_parser::util::walk_expr(&mut visitor, &promql_expr).ok()?; + visitor.metric_name +} + #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct SeriesQuery { start: Option, @@ -813,7 +901,7 @@ pub async fn series_query( let mut series = Vec::new(); let mut merge_map = HashMap::new(); for query in queries { - let table_name = query.clone(); + let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default(); let prom_query = PromQuery { query, start: start.clone(), @@ -824,9 +912,15 @@ pub async fn series_query( }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; - if let Err(err) = - retrieve_series_from_query_result(result, &mut series, &table_name, &mut merge_map) - .await + if let Err(err) = retrieve_series_from_query_result( + result, + &mut series, + &query_ctx, + &table_name, + &handler.catalog_manager(), + &mut merge_map, + ) + .await { return PrometheusJsonResponse::error(err.status_code().to_string(), err.output_msg()); } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index fe70b938abbd..1f2febc68576 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -492,7 +492,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { // series let res = client - .get("/v1/prometheus/api/v1/series?match[]=demo&start=0&end=0") + .get("/v1/prometheus/api/v1/series?match[]=demo{}&start=0&end=0") .send() .await; assert_eq!(res.status(), StatusCode::OK); @@ -508,10 +508,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { .collect::>(); let expected = BTreeMap::from([ ("__name__".to_string(), "demo".to_string()), - ("ts".to_string(), "1970-01-01 00:00:00+0000".to_string()), - ("cpu".to_string(), "1.1".to_string()), ("host".to_string(), "host1".to_string()), - ("memory".to_string(), "2.2".to_string()), ]); assert_eq!(actual, expected);