Skip to content

Commit

Permalink
feat!: switch prom remote write to metric engine (GreptimeTeam#3198)
Browse files Browse the repository at this point in the history
* feat: switch prom remote write to metric engine

Signed-off-by: Ruihang Xia <[email protected]>

* Apply suggestions from code review

Co-authored-by: dennis zhuang <[email protected]>

* fix compile

Signed-off-by: Ruihang Xia <[email protected]>

* read physical table name from url

Signed-off-by: Ruihang Xia <[email protected]>

* remove physical table from header

Signed-off-by: Ruihang Xia <[email protected]>

* fix merge error

Signed-off-by: Ruihang Xia <[email protected]>

* fix format

Signed-off-by: Ruihang Xia <[email protected]>

* add with_metric_engine option to config remote write behavior

Signed-off-by: Ruihang Xia <[email protected]>

* check parameter

Signed-off-by: Ruihang Xia <[email protected]>

* add specific config param

Signed-off-by: Ruihang Xia <[email protected]>

* default with_metric_engine to true

Signed-off-by: Ruihang Xia <[email protected]>

* update UT

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: dennis zhuang <[email protected]>
  • Loading branch information
waynexia and killme2008 authored Jan 22, 2024
1 parent 6a12c27 commit 31787f4
Show file tree
Hide file tree
Showing 19 changed files with 380 additions and 53 deletions.
3 changes: 3 additions & 0 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ enable = true
# Prometheus remote storage options, see `standalone.example.toml`.
[prom_store]
enable = true
# Whether to store the data from Prometheus remote write in metric engine.
# true by default
with_metric_engine = true

# Metasrv client options, see `datanode.example.toml`.
[meta_client]
Expand Down
3 changes: 3 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ enable = true
[prom_store]
# Whether to enable Prometheus remote write and read in HTTP API, true by default.
enable = true
# Whether to store the data from Prometheus remote write in metric engine.
# true by default
with_metric_engine = true

[wal]
# Available wal providers:
Expand Down
2 changes: 2 additions & 0 deletions src/common/query/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ pub const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
pub const GREPTIME_VALUE: &str = "greptime_value";
/// Default counter column name for OTLP metrics.
pub const GREPTIME_COUNT: &str = "greptime_count";
/// Default physical table name
pub const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";
12 changes: 12 additions & 0 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,18 @@ impl Instance {
.context(TableOperationSnafu)
}

pub async fn handle_metric_row_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
physical_table: String,
) -> Result<Output> {
self.inserter
.handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
.await
.context(TableOperationSnafu)
}

pub async fn handle_deletes(
&self,
requests: DeleteRequests,
Expand Down
46 changes: 38 additions & 8 deletions src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::prom_store::{self, Metrics};
use servers::query_handler::{
PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
Expand Down Expand Up @@ -153,18 +155,36 @@ impl Instance {

#[async_trait]
impl PromStoreProtocolHandler for Instance {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> {
async fn write(
&self,
request: WriteRequest,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> ServerResult<()> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
.context(AuthSnafu)?;

let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?;
let _ = self
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
.to_string();
let _ = self
.handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
} else {
let _ = self
.handle_row_inserts(requests, ctx.clone())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
}

PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok(())
Expand Down Expand Up @@ -239,10 +259,20 @@ impl ExportMetricHandler {

#[async_trait]
impl PromStoreProtocolHandler for ExportMetricHandler {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> {
async fn write(
&self,
request: WriteRequest,
ctx: QueryContextRef,
_: bool,
) -> ServerResult<()> {
let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?;
self.inserter
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
.handle_metric_row_inserts(
requests,
ctx,
&self.statement_executor,
GREPTIME_PHYSICAL_TABLE.to_string(),
)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ impl Services {
let _ = http_server_builder
.with_prom_handler(instance.clone())
.with_prometheus_handler(instance.clone());
http_server_builder
.set_prom_store_with_metric_engine(opts.prom_store.with_metric_engine);
}

if opts.otlp.enable {
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/service_config/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PromStoreOptions {
pub enable: bool,
pub with_metric_engine: bool,
}

impl Default for PromStoreOptions {
fn default() -> Self {
Self { enable: true }
Self {
enable: true,
with_metric_engine: true,
}
}
}

Expand All @@ -33,5 +37,6 @@ mod tests {
fn test_prom_store_options() {
let default = PromStoreOptions::default();
assert!(default.enable);
assert!(default.with_metric_engine)
}
}
132 changes: 124 additions & 8 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
use api::v1::{
AlterExpr, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest, RowInsertRequests,
AlterExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest,
RowInsertRequests, SemanticType,
};
use catalog::CatalogManagerRef;
use common_catalog::consts::default_engine;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef};
use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info};
Expand All @@ -35,6 +37,9 @@ use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use table::requests::InsertRequest as TableInsertRequest;
use table::table_reference::TableReference;
use table::TableRef;
Expand Down Expand Up @@ -95,7 +100,7 @@ impl Inserter {
});
validate_column_count_match(&requests)?;

self.create_or_alter_tables_on_demand(&requests, &ctx, statement_executor)
self.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor)
.await?;
let inserts = RowToRegion::new(
self.catalog_manager.as_ref(),
Expand All @@ -109,6 +114,44 @@ impl Inserter {
Ok(Output::AffectedRows(affected_rows as _))
}

/// Handle row inserts request with metric engine.
pub async fn handle_metric_row_inserts(
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
physical_table: String,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
req.rows
.as_ref()
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_column_count_match(&requests)?;

// check and create physical table
self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
.await?;

// check and create logical tables
self.create_or_alter_tables_on_demand(
&requests,
&ctx,
Some(physical_table.to_string()),
statement_executor,
)
.await?;
let inserts =
RowToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, &ctx)
.convert(requests)
.await?;

let affected_rows = self.do_request(inserts, &ctx).await?;
Ok(Output::AffectedRows(affected_rows as _))
}

pub async fn handle_table_insert(
&self,
request: TableInsertRequest,
Expand Down Expand Up @@ -206,9 +249,10 @@ impl Inserter {
&self,
requests: &RowInsertRequests,
ctx: &QueryContextRef,
on_physical_table: Option<String>,
statement_executor: &StatementExecutor,
) -> Result<()> {
// TODO(jeremy): create and alter in batch?
// TODO(jeremy): create and alter in batch? (from `handle_metric_row_inserts`)
for req in &requests.inserts {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
Expand All @@ -219,13 +263,76 @@ impl Inserter {
self.alter_table_on_demand(req, table, ctx, statement_executor)
.await?
}
None => self.create_table(req, ctx, statement_executor).await?,
None => {
self.create_table(req, ctx, &on_physical_table, statement_executor)
.await?
}
}
}

Ok(())
}

async fn create_physical_table_on_demand(
&self,
ctx: &QueryContextRef,
physical_table: String,
statement_executor: &StatementExecutor,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();

// check if exist
if self
.get_table(catalog_name, schema_name, &physical_table)
.await?
.is_some()
{
return Ok(());
}

let table_reference = TableReference::full(catalog_name, schema_name, &physical_table);
info!("Physical metric table `{table_reference}` does not exist, try creating table");

// schema with timestamp and field column
let default_schema = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
},
ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
datatype: ColumnDataType::Float64 as _,
semantic_type: SemanticType::Field as _,
datatype_extension: None,
},
];
let create_table_expr = &mut build_create_table_expr(&table_reference, &default_schema)?;

create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr
.table_options
.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());

// create physical table
let res = statement_executor
.create_table_inner(create_table_expr, None)
.await;

match res {
Ok(_) => {
info!("Successfully created table {table_reference}",);
Ok(())
}
Err(err) => {
error!("Failed to create table {table_reference}: {err}",);
Err(err)
}
}
}

async fn get_table(
&self,
catalog: &str,
Expand Down Expand Up @@ -289,10 +396,14 @@ impl Inserter {
}
}

/// Create a table with schema from insert request.
///
/// To create a metric engine logical table, specify the `on_physical_table` parameter.
async fn create_table(
&self,
req: &RowInsertRequest,
ctx: &QueryContextRef,
on_physical_table: &Option<String>,
statement_executor: &StatementExecutor,
) -> Result<()> {
let table_ref =
Expand All @@ -301,10 +412,15 @@ impl Inserter {
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;

info!(
"Table {}.{}.{} does not exist, try create table",
table_ref.catalog, table_ref.schema, table_ref.table,
);
if let Some(physical_table) = on_physical_table {
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr.table_options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_table.clone(),
);
}

info!("Table `{table_ref}` does not exist, try creating table",);

// TODO(weny): multiple regions table.
let res = statement_executor
Expand Down
8 changes: 7 additions & 1 deletion src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ pub enum Error {

#[snafu(display("Missing query context"))]
MissingQueryContext { location: Location },

#[snafu(display(
"Invalid parameter, physical_table is not expected when metric engine is disabled"
))]
UnexpectedPhysicalTable { location: Location },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -488,7 +493,8 @@ impl ErrorExt for Error {
| UrlDecode { .. }
| IncompatibleSchema { .. }
| MissingQueryContext { .. }
| MysqlValueConversion { .. } => StatusCode::InvalidArguments,
| MysqlValueConversion { .. }
| UnexpectedPhysicalTable { .. } => StatusCode::InvalidArguments,

InfluxdbLinesWrite { source, .. }
| PromSeriesWrite { source, .. }
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/export_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ pub async fn write_system_metric_by_handler(
filter.as_ref(),
Timestamp::current_millis().value(),
);
if let Err(e) = handler.write(request, ctx.clone()).await {
if let Err(e) = handler.write(request, ctx.clone(), false).await {
error!("report export metrics by handler failed, error {}", e);
}
}
Expand Down
Loading

0 comments on commit 31787f4

Please sign in to comment.