Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support not-equal matcher for PromQL metric names #5385

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ common-time.workspace = true
common-version.workspace = true
datafusion-expr.workspace = true
datanode.workspace = true
datatypes.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
log-query.workspace = true
Expand All @@ -47,6 +48,7 @@ operator.workspace = true
partition.workspace = true
pipeline.workspace = true
prometheus.workspace = true
promql-parser.workspace = true
prost.workspace = true
query.workspace = true
raft-engine.workspace = true
Expand Down
17 changes: 17 additions & 0 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to collect recordbatch"))]
CollectRecordbatch {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},

#[snafu(display("Failed to plan statement"))]
PlanStatement {
#[snafu(implicit)]
Expand Down Expand Up @@ -224,6 +231,13 @@ pub enum Error {
source: servers::error::Error,
},

#[snafu(display("Failed to create logical plan for prometheus metric names query"))]
PrometheusMetricNamesQueryPlan {
#[snafu(implicit)]
location: Location,
source: servers::error::Error,
},

#[snafu(display("Failed to describe schema for given statement"))]
DescribeStatement {
#[snafu(implicit)]
Expand Down Expand Up @@ -349,8 +363,11 @@ impl ErrorExt for Error {
Error::HandleHeartbeatResponse { source, .. } => source.status_code(),

Error::PromStoreRemoteQueryPlan { source, .. }
| Error::PrometheusMetricNamesQueryPlan { source, .. }
| Error::ExecutePromql { source, .. } => source.status_code(),

Error::CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,

Error::SqlExecIntercepted { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
Error::ShutdownServer { source, .. } => source.status_code(),
Expand Down
13 changes: 13 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod logs;
mod opentsdb;
mod otlp;
mod prom_store;
mod promql;
mod region_query;
pub mod standalone;

Expand Down Expand Up @@ -47,6 +48,7 @@ use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
Expand Down Expand Up @@ -450,6 +452,17 @@ impl PrometheusHandler for Instance {
Ok(interceptor.post_execute(output, query_ctx)?)
}

async fn query_metric_names(
&self,
matchers: Vec<Matcher>,
ctx: &QueryContextRef,
) -> server_error::Result<Vec<String>> {
self.handle_query_metric_names(matchers, ctx)
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)
}

fn catalog_manager(&self) -> CatalogManagerRef {
self.catalog_manager.clone()
}
Expand Down
99 changes: 99 additions & 0 deletions src/frontend/src/instance/promql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use catalog::information_schema::TABLES;
use client::OutputData;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
use common_recordbatch::util;
use common_telemetry::tracing;
use datatypes::prelude::Value;
use promql_parser::label::Matcher;
use servers::prometheus;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};

use crate::error::{
CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu, Result, TableNotFoundSnafu,
};
use crate::instance::Instance;

impl Instance {
/// Handles metric names query request, returns the names.
#[tracing::instrument(skip_all)]
pub(crate) async fn handle_query_metric_names(
&self,
matchers: Vec<Matcher>,
ctx: &QueryContextRef,
) -> Result<Vec<String>> {
let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED
.with_label_values(&[ctx.get_db_string().as_str()])
.start_timer();

let table = self
.catalog_manager
.table(
DEFAULT_CATALOG_NAME,
INFORMATION_SCHEMA_NAME,
TABLES,
Some(ctx),
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: "greptime.information_schema.tables",
})?;

let dataframe = self
.query_engine
.read_table(table)
.with_context(|_| ReadTableSnafu {
table_name: "greptime.information_schema.tables",
})?;

let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx)
.context(PrometheusMetricNamesQueryPlanSnafu)?;

let results = self
.query_engine
.execute(logical_plan, ctx.clone())
.await
.context(ExecLogicalPlanSnafu)?;

let batches = match results.data {
OutputData::Stream(stream) => util::collect(stream)
.await
.context(CollectRecordbatchSnafu)?,
OutputData::RecordBatches(rbs) => rbs.take(),
_ => unreachable!("should not happen"),
};
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());

for batch in batches {
// Only one column the results, ensured by `prometheus::metric_name_matchers_to_plan`.
let names = batch.column(0);

for i in 0..names.len() {
let Value::String(name) = names.get(i) else {
unreachable!();
};
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

results.push(name.into_string());
}
}

Ok(results)
}
}
7 changes: 7 additions & 0 deletions src/frontend/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ lazy_static! {
pub static ref GRPC_HANDLE_PROMQL_ELAPSED: Histogram = GRPC_HANDLE_QUERY_ELAPSED
.with_label_values(&["promql"]);

pub static ref PROMQL_QUERY_METRICS_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_frontend_promql_query_metrics_elapsed",
"frontend promql query metric names elapsed",
&["db"]
)
.unwrap();

/// The number of OpenTelemetry metrics send by frontend node.
pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!(
"greptime_frontend_otlp_metrics_rows",
Expand Down
1 change: 1 addition & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ session.workspace = true
snafu.workspace = true
snap = "1"
sql.workspace = true
store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio.workspace = true
Expand Down
Loading
Loading