Skip to content

Commit

Permalink
fix: consider both db param and extended db header in Prometheus HTTP…
Browse files Browse the repository at this point in the history
… API (#3776)

* fix: consider both db param and extended db header in Prometheus HTTP API

Signed-off-by: Ruihang Xia <[email protected]>

* remove debug code

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Apr 23, 2024
1 parent 19a9035 commit f764fd5
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 10 deletions.
64 changes: 55 additions & 9 deletions src/servers/src/http/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::collections::{HashMap, HashSet};
use axum::extract::{Path, Query, State};
use axum::{Extension, Form};
use catalog::CatalogManagerRef;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
Expand All @@ -41,7 +40,7 @@ use schemars::JsonSchema;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use snafu::{Location, ResultExt};

pub use super::prometheus_resp::PrometheusJsonResponse;
Expand Down Expand Up @@ -166,7 +165,7 @@ pub struct InstantQuery {
pub async fn instant_query(
State(handler): State<PrometheusHandlerRef>,
Query(params): Query<InstantQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Extension(mut query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
// Extract time from query string, or use current server time if not specified.
Expand All @@ -185,6 +184,12 @@ pub async fn instant_query(
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
};

// update catalog and schema in query context if necessary
if let Some(db) = &params.db {
let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
query_ctx = try_update_catalog_schema(query_ctx, &catalog, &schema);
}

let result = handler.do_query(&prom_query, query_ctx).await;
let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
Expand Down Expand Up @@ -214,7 +219,7 @@ pub struct RangeQuery {
pub async fn range_query(
State(handler): State<PrometheusHandlerRef>,
Query(params): Query<RangeQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Extension(mut query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
let prom_query = PromQuery {
Expand All @@ -228,6 +233,12 @@ pub async fn range_query(
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
};

// update catalog and schema in query context if necessary
if let Some(db) = &params.db {
let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
query_ctx = try_update_catalog_schema(query_ctx, &catalog, &schema);
}

let result = handler.do_query(&prom_query, query_ctx).await;
let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
Err(err) => {
Expand Down Expand Up @@ -294,8 +305,8 @@ pub async fn labels_query(
Extension(query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<LabelsQuery>,
) -> PrometheusJsonResponse {
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
let query_ctx = try_update_catalog_schema(query_ctx, &catalog, &schema);

let mut queries = params.matches.0;
if queries.is_empty() {
Expand Down Expand Up @@ -534,6 +545,35 @@ pub(crate) fn retrieve_metric_name_and_result_type(
Ok((metric_name, result_type))
}

/// Tries to get catalog and schema from an optional db param. And retrieves
/// them from [QueryContext] if they don't present.
pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
if let Some(db) = db {
parse_catalog_and_schema_from_db_string(db)
} else {
(
ctx.current_catalog().to_string(),
ctx.current_schema().to_string(),
)
}
}

/// Update catalog and schema in [QueryContext] if necessary.
pub(crate) fn try_update_catalog_schema(
ctx: QueryContextRef,
catalog: &str,
schema: &str,
) -> QueryContextRef {
if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
QueryContextBuilder::from_existing(&ctx)
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.build()
} else {
ctx
}
}

fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
match expr {
PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(expr),
Expand Down Expand Up @@ -580,8 +620,8 @@ pub async fn label_values_query(
Extension(query_ctx): Extension<QueryContextRef>,
Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
let query_ctx = try_update_catalog_schema(query_ctx, &catalog, &schema);

if label_name == METRIC_NAME_LABEL {
let mut table_names = match handler
Expand Down Expand Up @@ -731,7 +771,7 @@ pub struct SeriesQuery {
pub async fn series_query(
State(handler): State<PrometheusHandlerRef>,
Query(params): Query<SeriesQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Extension(mut query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
let mut queries: Vec<String> = params.matches.0;
Expand All @@ -754,6 +794,12 @@ pub async fn series_query(
.or(form_params.lookback)
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

// update catalog and schema in query context if necessary
if let Some(db) = &params.db {
let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
query_ctx = try_update_catalog_schema(query_ctx, &catalog, &schema);
}

let mut series = Vec::new();
let mut merge_map = HashMap::new();
for query in queries {
Expand Down
12 changes: 12 additions & 0 deletions src/session/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,18 @@ impl QueryContextBuilder {
.insert(key, value);
self
}

pub fn from_existing(context: &QueryContext) -> QueryContextBuilder {
QueryContextBuilder {
current_catalog: Some(context.current_catalog.clone()),
current_schema: Some(context.current_schema.clone()),
current_user: Some(context.current_user.load().clone().into()),
timezone: Some(context.timezone.load().clone().into()),
sql_dialect: Some(context.sql_dialect.clone()),
extension: Some(context.extension.clone()),
configuration_parameter: Some(context.configuration_parameter.clone()),
}
}
}

#[derive(Debug)]
Expand Down
30 changes: 29 additions & 1 deletion tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use serde_json::json;
use servers::http::error_result::ErrorResponse;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::handler::HealthResponse;
use servers::http::header::GREPTIME_TIMEZONE_HEADER_NAME;
use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use servers::http::test_helpers::TestClient;
Expand Down Expand Up @@ -543,6 +543,34 @@ pub async fn test_prom_http_api(store_type: StorageType) {
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);

// query an empty database should return nothing
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=demo&start=0&end=600")
.header(GREPTIME_DB_HEADER_NAME.clone(), "nonexistent")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!([])).unwrap()
);

// db header will be overrode by `db` in param
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=demo&start=0&end=600&db=public")
.header(GREPTIME_DB_HEADER_NAME.clone(), "nonexistent")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);

// multiple match[]
let res = client
.get("/v1/prometheus/api/v1/label/instance/values?match[]=up&match[]=system_metrics")
Expand Down

0 comments on commit f764fd5

Please sign in to comment.