Commit
fix: return metric name instead of query in Prometheus /series API (#3864)

* fix: return metric name instead of query in Prometheus /series API

Signed-off-by: Ruihang Xia <[email protected]>

* omit non-tag columns from result

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
waynexia authored May 7, 2024
1 parent 1b58622 commit 71c1c7c
Showing 3 changed files with 125 additions and 16 deletions.
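For context, the heart of the change is to derive the metric (table) name from the parsed PromQL selector instead of reusing the raw match[] string, and to drop non-tag columns from the /series output. A minimal standalone sketch of the first idea, assuming the promql_parser crate's parse/Expr API as used in the diff below; the metric_name_of helper is hypothetical and for illustration only:

use promql_parser::parser::{self, Expr};

// Hypothetical helper: extract the metric name from a simple instant selector.
fn metric_name_of(query: &str) -> Option<String> {
    match parser::parse(query).ok()? {
        Expr::VectorSelector(vs) => vs.name,
        _ => None,
    }
}

fn main() {
    // Before this fix the raw query string (e.g. "demo{}") was used as the table
    // name; with it, the parsed metric name "demo" is used instead.
    assert_eq!(metric_name_of(r#"demo{host="host1"}"#), Some("demo".to_string()));
}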
26 changes: 22 additions & 4 deletions src/servers/src/error.rs
@@ -305,10 +305,25 @@ pub enum Error {
},

#[snafu(display("Error accessing catalog"))]
-CatalogError { source: catalog::error::Error },
+CatalogError {
+source: catalog::error::Error,
+location: Location,
+},

#[snafu(display("Cannot find requested database: {}-{}", catalog, schema))]
-DatabaseNotFound { catalog: String, schema: String },
+DatabaseNotFound {
+catalog: String,
+schema: String,
+location: Location,
+},

+#[snafu(display("Cannot find requested table: {}-{}-{}", catalog, schema, table))]
+TableNotFound {
+catalog: String,
+schema: String,
+table: String,
+location: Location,
+},

#[cfg(feature = "mem-prof")]
#[snafu(display("Failed to dump profile data"))]
@@ -318,10 +333,10 @@
},

#[snafu(display("Invalid prepare statement: {}", err_msg))]
-InvalidPrepareStatement { err_msg: String },
+InvalidPrepareStatement { err_msg: String, location: Location },

#[snafu(display("Invalid flush argument: {}", err_msg))]
-InvalidFlushArgument { err_msg: String },
+InvalidFlushArgument { err_msg: String, location: Location },

#[snafu(display("Failed to build gRPC reflection service"))]
GrpcReflectionService {
@@ -538,6 +553,9 @@ impl ErrorExt for Error {
| InvalidAuthHeaderInvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,

DatabaseNotFound { .. } => StatusCode::DatabaseNotFound,

TableNotFound { .. } => StatusCode::TableNotFound,

#[cfg(feature = "mem-prof")]
DumpProfileData { source, .. } => source.status_code(),

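The new TableNotFound variant is consumed through snafu's context-selector pattern (see the with_context call in prometheus.rs below). A self-contained sketch of that pattern with a stand-in struct error, assuming snafu's derive for struct errors; this is illustrative and not the servers crate's actual Error enum, which also carries a Location:

use snafu::{OptionExt, Snafu};

// Stand-in error type, for illustration only.
#[derive(Debug, Snafu)]
#[snafu(display("Cannot find requested table: {}-{}-{}", catalog, schema, table))]
struct TableNotFound {
    catalog: String,
    schema: String,
    table: String,
}

fn lookup(table: Option<&str>) -> Result<String, TableNotFound> {
    // with_context turns a None into the error, filling in the fields lazily.
    table.map(str::to_string).with_context(|| TableNotFoundSnafu {
        catalog: "greptime",
        schema: "public",
        table: "demo",
    })
}

fn main() {
    let err = lookup(None).unwrap_err();
    assert_eq!(err.to_string(), "Cannot find requested table: greptime-public-demo");
}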
110 changes: 102 additions & 8 deletions src/servers/src/http/prometheus.rs
@@ -41,11 +41,12 @@ use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
-use snafu::{Location, ResultExt};
+use snafu::{Location, OptionExt, ResultExt};

pub use super::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
-CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, UnexpectedResultSnafu,
+CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, TableNotFoundSnafu,
+UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::METRIC_NAME_LABEL;
@@ -414,17 +415,41 @@ async fn get_all_column_names(
async fn retrieve_series_from_query_result(
result: Result<Output>,
series: &mut Vec<HashMap<String, String>>,
query_ctx: &QueryContext,
table_name: &str,
manager: &CatalogManagerRef,
metrics: &mut HashMap<String, u64>,
) -> Result<()> {
let result = result?;

// fetch tag list
let table = manager
.table(
query_ctx.current_catalog(),
query_ctx.current_schema(),
table_name,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
catalog: query_ctx.current_catalog(),
schema: query_ctx.current_schema(),
table: table_name,
})?;
let tag_columns = table
.primary_key_columns()
.map(|c| c.name)
.collect::<HashSet<_>>();

match result.data {
-OutputData::RecordBatches(batches) => record_batches_to_series(batches, series, table_name),
+OutputData::RecordBatches(batches) => {
+record_batches_to_series(batches, series, table_name, &tag_columns)
+}
OutputData::Stream(stream) => {
let batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
-record_batches_to_series(batches, series, table_name)
+record_batches_to_series(batches, series, table_name, &tag_columns)
}
OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
reason: "expected data result, but got affected rows".to_string(),
@@ -468,8 +493,27 @@ fn record_batches_to_series(
batches: RecordBatches,
series: &mut Vec<HashMap<String, String>>,
table_name: &str,
tag_columns: &HashSet<String>,
) -> Result<()> {
for batch in batches.iter() {
// project record batch to contain only tag columns
let projection = batch
.schema
.column_schemas()
.iter()
.enumerate()
.filter_map(|(idx, col)| {
if tag_columns.contains(&col.name) {
Some(idx)
} else {
None
}
})
.collect::<Vec<_>>();
let batch = batch
.try_project(&projection)
.context(CollectRecordbatchSnafu)?;

for row in batch.rows() {
let mut element: HashMap<String, String> = row
.iter()
@@ -763,6 +807,50 @@ async fn retrieve_label_values_from_record_batch(
Ok(())
}

/// Try to parse and extract the name of referenced metric from the promql query.
///
/// Returns the metric name if a single metric is referenced, otherwise None.
fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
let promql_expr = promql_parser::parser::parse(query).ok()?;
// promql_expr_to_metric_name(&promql_expr)

struct MetricNameVisitor {
metric_name: Option<String>,
}

impl promql_parser::util::ExprVisitor for MetricNameVisitor {
type Error = ();

fn pre_visit(&mut self, plan: &PromqlExpr) -> std::result::Result<bool, Self::Error> {
let query_metric_name = match plan {
PromqlExpr::VectorSelector(vs) => vs
.matchers
.find_matcher(promql_parser::label::METRIC_NAME)
.or_else(|| vs.name.clone()),
PromqlExpr::MatrixSelector(ms) => ms
.vs
.matchers
.find_matcher(promql_parser::label::METRIC_NAME)
.or_else(|| ms.vs.name.clone()),
_ => return Ok(true),
};

// set it to empty string if multiple metrics are referenced.
if self.metric_name.is_some() && query_metric_name.is_some() {
self.metric_name = Some(String::new());
} else {
self.metric_name = query_metric_name.or_else(|| self.metric_name.clone());
}

Ok(true)
}
}

let mut visitor = MetricNameVisitor { metric_name: None };
promql_parser::util::walk_expr(&mut visitor, &promql_expr).ok()?;
visitor.metric_name
}
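A brief behavior sketch for retrieve_metric_name_from_promql, not part of the commit, assuming promql_parser parses these expressions as usual. Note that a query referencing two different metrics yields Some("") rather than None, so series_query below ends up with an empty table name instead of falling back to the raw query:

#[cfg(test)]
mod metric_name_sketch {
    use super::*;

    #[test]
    fn retrieve_metric_name_examples() {
        // A single instant selector yields its metric name.
        assert_eq!(
            retrieve_metric_name_from_promql(r#"demo{host="host1"}"#),
            Some("demo".to_string())
        );
        // A matrix selector inside a call is handled the same way.
        assert_eq!(
            retrieve_metric_name_from_promql("rate(demo[5m])"),
            Some("demo".to_string())
        );
        // Two different metrics collapse to an empty string.
        assert_eq!(
            retrieve_metric_name_from_promql("a + b"),
            Some(String::new())
        );
        // Queries without any metric reference (or unparsable ones) yield None.
        assert_eq!(retrieve_metric_name_from_promql("1 + 1"), None);
    }
}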

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct SeriesQuery {
start: Option<String>,
@@ -813,7 +901,7 @@ pub async fn series_query(
let mut series = Vec::new();
let mut merge_map = HashMap::new();
for query in queries {
-let table_name = query.clone();
+let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
let prom_query = PromQuery {
query,
start: start.clone(),
@@ -824,9 +912,15 @@
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;

-if let Err(err) =
-retrieve_series_from_query_result(result, &mut series, &table_name, &mut merge_map)
-.await
+if let Err(err) = retrieve_series_from_query_result(
+result,
+&mut series,
+&query_ctx,
+&table_name,
+&handler.catalog_manager(),
+&mut merge_map,
+)
+.await
{
return PrometheusJsonResponse::error(err.status_code().to_string(), err.output_msg());
}
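The other half of the fix, the projection in record_batches_to_series above, keeps only primary-key (tag) columns in each series. A self-contained sketch of the same index-selection idea in plain Rust, not the RecordBatch::try_project API itself; the tag_column_indices helper is hypothetical:

use std::collections::HashSet;

// Pick the indices of columns whose names are tag columns, mirroring the
// projection built before calling try_project.
fn tag_column_indices(column_names: &[&str], tag_columns: &HashSet<String>) -> Vec<usize> {
    column_names
        .iter()
        .enumerate()
        .filter_map(|(idx, name)| tag_columns.contains(*name).then_some(idx))
        .collect()
}

fn main() {
    let tags: HashSet<String> = ["host".to_string()].into_iter().collect();
    // Non-tag columns such as ts, cpu, and memory are dropped from the /series output.
    assert_eq!(tag_column_indices(&["ts", "cpu", "host", "memory"], &tags), vec![2]);
}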
5 changes: 1 addition & 4 deletions tests-integration/tests/http.rs
@@ -492,7 +492,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {

// series
let res = client
-.get("/v1/prometheus/api/v1/series?match[]=demo&start=0&end=0")
+.get("/v1/prometheus/api/v1/series?match[]=demo{}&start=0&end=0")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
@@ -508,10 +508,7 @@
.collect::<BTreeMap<String, String>>();
let expected = BTreeMap::from([
("__name__".to_string(), "demo".to_string()),
-("ts".to_string(), "1970-01-01 00:00:00+0000".to_string()),
-("cpu".to_string(), "1.1".to_string()),
-("host".to_string(), "host1".to_string()),
-("memory".to_string(), "2.2".to_string()),
]);
assert_eq!(actual, expected);

