diff --git a/Cargo.lock b/Cargo.lock index b938e3173462..12110a6c30f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4152,6 +4152,7 @@ dependencies = [ "futures", "humantime-serde", "lazy_static", + "log-query", "log-store", "meta-client", "opentelemetry-proto 0.5.0", @@ -6122,6 +6123,7 @@ dependencies = [ "chrono", "common-error", "common-macro", + "serde", "snafu 0.8.5", "table", ] @@ -8160,7 +8162,7 @@ dependencies = [ "rand", "ring 0.17.8", "rust_decimal", - "thiserror 2.0.4", + "thiserror 2.0.6", "tokio", "tokio-rustls 0.26.0", "tokio-util", @@ -9098,6 +9100,7 @@ dependencies = [ "humantime", "itertools 0.10.5", "lazy_static", + "log-query", "meter-core", "meter-macros", "num", @@ -10952,6 +10955,7 @@ dependencies = [ "json5", "jsonb", "lazy_static", + "log-query", "loki-api", "mime_guess", "mysql_async", @@ -12434,11 +12438,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.4" +version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f49a1853cf82743e3b7950f77e0f4d622ca36cf4317cba00c767838bac8d490" +checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47" dependencies = [ - "thiserror-impl 2.0.4", + "thiserror-impl 2.0.6", ] [[package]] @@ -12454,9 +12458,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.4" +version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8381894bb3efe0c4acac3ded651301ceee58a15d47c2e34885ed1908ad667061" +checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 7ab000c6bcdd..2dec3f7ecc60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -238,6 +238,7 @@ file-engine = { path = "src/file-engine" } flow = { path = "src/flow" } frontend = { path = "src/frontend", default-features = false } index = { path = "src/index" } +log-query = { path = "src/log-query" } log-store = { path = "src/log-store" } meta-client = { path = "src/meta-client" } meta-srv = { path = "src/meta-srv" } diff --git a/src/auth/src/permission.rs b/src/auth/src/permission.rs index cdd370c5cda8..6c33a766a6ae 100644 --- a/src/auth/src/permission.rs +++ b/src/auth/src/permission.rs @@ -25,6 +25,7 @@ pub enum PermissionReq<'a> { GrpcRequest(&'a Request), SqlStatement(&'a Statement), PromQuery, + LogQuery, Opentsdb, LineProtocol, PromStoreWrite, diff --git a/src/common/function/src/lib.rs b/src/common/function/src/lib.rs index 4a6a6844d548..1c718634dcee 100644 --- a/src/common/function/src/lib.rs +++ b/src/common/function/src/lib.rs @@ -26,3 +26,4 @@ pub mod function_registry; pub mod handlers; pub mod helper; pub mod state; +pub mod utils; diff --git a/src/common/function/src/scalars/matches.rs b/src/common/function/src/scalars/matches.rs index d3276f83a2a0..1bd9e8e1b5f1 100644 --- a/src/common/function/src/scalars/matches.rs +++ b/src/common/function/src/scalars/matches.rs @@ -204,20 +204,10 @@ impl PatternAst { fn convert_literal(column: &str, pattern: &str) -> Expr { logical_expr::col(column).like(logical_expr::lit(format!( "%{}%", - Self::escape_pattern(pattern) + crate::utils::escape_like_pattern(pattern) ))) } - fn escape_pattern(pattern: &str) -> String { - pattern - .chars() - .flat_map(|c| match c { - '\\' | '%' | '_' => vec!['\\', c], - _ => vec![c], - }) - .collect::() - } - /// Transform this AST with preset rules to make it correct. 
fn transform_ast(self) -> Result<Self> { self.transform_up(Self::collapse_binary_branch_fn) diff --git a/src/common/function/src/utils.rs b/src/common/function/src/utils.rs new file mode 100644 index 000000000000..f2c18d5f6c77 --- /dev/null +++ b/src/common/function/src/utils.rs @@ -0,0 +1,58 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Escapes special characters in the provided pattern string for `LIKE`. +/// +/// Specifically, it prefixes the backslash (`\`), percent (`%`), and underscore (`_`) +/// characters with an additional backslash to ensure they are treated literally. +/// +/// # Examples +/// +/// ```rust +/// let escaped = escape_like_pattern("100%_some\\path"); +/// assert_eq!(escaped, "100\\%\\_some\\\\path"); +/// ``` +pub fn escape_like_pattern(pattern: &str) -> String { + pattern + .chars() + .flat_map(|c| match c { + '\\' | '%' | '_' => vec!['\\', c], + _ => vec![c], + }) + .collect::<String>() +} +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_escape_like_pattern() { + assert_eq!( + escape_like_pattern("100%_some\\path"), + "100\\%\\_some\\\\path" + ); + assert_eq!(escape_like_pattern(""), ""); + assert_eq!(escape_like_pattern("hello"), "hello"); + assert_eq!(escape_like_pattern("\\%_"), "\\\\\\%\\_"); + assert_eq!(escape_like_pattern("%%__\\\\"), "\\%\\%\\_\\_\\\\\\\\"); + assert_eq!(escape_like_pattern("abc123"), "abc123"); + assert_eq!(escape_like_pattern("%_\\"), "\\%\\_\\\\"); + assert_eq!( + escape_like_pattern("%%__\\\\another%string"), + "\\%\\%\\_\\_\\\\\\\\another\\%string" + ); + assert_eq!(escape_like_pattern("foo%bar_"), "foo\\%bar\\_"); + assert_eq!(escape_like_pattern("\\_\\%"), "\\\\\\_\\\\\\%"); + } +} diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index e21819c568f2..5542c19b54e9 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -41,6 +41,7 @@ datafusion-expr.workspace = true datanode.workspace = true humantime-serde.workspace = true lazy_static.workspace = true +log-query.workspace = true log-store.workspace = true meta-client.workspace = true opentelemetry-proto.workspace = true diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index b22bde96e0ff..c304eece4206 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -16,6 +16,7 @@ pub mod builder; mod grpc; mod influxdb; mod log_handler; +mod logs; mod opentsdb; mod otlp; mod prom_store; @@ -64,8 +65,8 @@ use servers::prometheus_handler::PrometheusHandler; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{ - InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, - PipelineHandler, PromStoreProtocolHandler, ScriptHandler, + InfluxdbLineProtocolHandler, LogQueryHandler, OpenTelemetryProtocolHandler, + OpentsdbProtocolHandler, PipelineHandler, PromStoreProtocolHandler, ScriptHandler, }; use servers::server::ServerHandlers; use session::context::QueryContextRef; @@
-99,6 +100,7 @@ pub trait FrontendInstance: + ScriptHandler + PrometheusHandler + PipelineHandler + + LogQueryHandler + Send + Sync + 'static diff --git a/src/frontend/src/instance/logs.rs b/src/frontend/src/instance/logs.rs new file mode 100644 index 000000000000..f10ea168ff10 --- /dev/null +++ b/src/frontend/src/instance/logs.rs @@ -0,0 +1,67 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; +use client::Output; +use common_error::ext::BoxedError; +use log_query::LogQuery; +use server_error::Result as ServerResult; +use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu}; +use servers::interceptor::{LogQueryInterceptor, LogQueryInterceptorRef}; +use servers::query_handler::LogQueryHandler; +use session::context::QueryContextRef; +use snafu::ResultExt; +use tonic::async_trait; + +use super::Instance; + +#[async_trait] +impl LogQueryHandler for Instance { + async fn query(&self, mut request: LogQuery, ctx: QueryContextRef) -> ServerResult<Output> { + let interceptor = self + .plugins + .get::<LogQueryInterceptorRef<server_error::Error>>(); + + self.plugins + .get::<PermissionCheckerRef>() + .as_ref() + .check_permission(ctx.current_user(), PermissionReq::LogQuery) + .context(AuthSnafu)?; + + interceptor.as_ref().pre_query(&request, ctx.clone())?; + + request + .time_filter + .canonicalize() + .map_err(BoxedError::new) + .context(ExecuteQuerySnafu)?; + + let plan = self + .query_engine + .planner() + .plan_logs_query(request, ctx.clone()) + .await + .map_err(BoxedError::new) + .context(ExecuteQuerySnafu)?; + + let output = self + .statement_executor + .exec_plan(plan, ctx.clone()) + .await + .map_err(BoxedError::new) + .context(ExecuteQuerySnafu)?; + + Ok(interceptor.as_ref().post_query(output, ctx.clone())?)
+ } +} diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 115002c3aba9..ccc98593cadb 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -87,6 +87,7 @@ where let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<Error>>(); builder = builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor); + builder = builder.with_logs_handler(self.instance.clone()); if let Some(user_provider) = self.plugins.get::<UserProviderRef>() { builder = builder.with_user_provider(user_provider); diff --git a/src/log-query/Cargo.toml b/src/log-query/Cargo.toml index 9e503470149f..5e8db345b00e 100644 --- a/src/log-query/Cargo.toml +++ b/src/log-query/Cargo.toml @@ -11,5 +11,6 @@ workspace = true chrono.workspace = true common-error.workspace = true common-macro.workspace = true +serde.workspace = true snafu.workspace = true table.workspace = true diff --git a/src/log-query/src/error.rs b/src/log-query/src/error.rs index d8ec39a936eb..26554e478163 100644 --- a/src/log-query/src/error.rs +++ b/src/log-query/src/error.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::Snafu; @@ -41,6 +42,15 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn status_code(&self) -> StatusCode { + match self { + Error::InvalidTimeFilter { .. } + | Error::InvalidDateFormat { .. } + | Error::InvalidSpanFormat { .. } + | Error::EndBeforeStart { .. } => StatusCode::InvalidArguments, + } + } } pub type Result<T> = std::result::Result<T, Error>; diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs index c8719b125905..24ad3e622042 100644 --- a/src/log-query/src/log_query.rs +++ b/src/log-query/src/log_query.rs @@ -13,6 +13,7 @@ // limitations under the License. use chrono::{DateTime, Datelike, Duration, NaiveDate, NaiveTime, TimeZone, Utc}; +use serde::{Deserialize, Serialize}; use table::table_name::TableName; use crate::error::{ @@ -21,9 +22,10 @@ use crate::error::{ }; /// GreptimeDB's log query request. +#[derive(Debug, Serialize, Deserialize)] pub struct LogQuery { /// A fully qualified table name to query logs from. - pub table_name: TableName, + pub table: TableName, /// Specifies the time range for the log query. See [`TimeFilter`] for more details. pub time_filter: TimeFilter, /// Columns with filters to query. @@ -34,6 +36,18 @@ pub struct LogQuery { pub context: Context, } +impl Default for LogQuery { + fn default() -> Self { + Self { + table: TableName::new("", "", ""), + time_filter: Default::default(), + columns: vec![], + limit: None, + context: Default::default(), + } + } +} + /// Represents a time range for log query. /// /// This struct allows various formats to express a time range from the user side @@ -58,7 +72,7 @@ pub struct LogQuery { /// /// This struct doesn't require a timezone to be presented. When the timezone is not /// provided, it will fill the default timezone with the same rules akin to other queries. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct TimeFilter { pub start: Option<String>, pub end: Option<String>, @@ -69,8 +83,7 @@ impl TimeFilter { /// Validate and canonicalize the time filter. 
/// /// This function will try to fill the missing fields and convert all dates to timestamps - // false positive - #[allow(unused_assignments)] + #[allow(unused_assignments)] // false positive pub fn canonicalize(&mut self) -> Result<()> { let mut start_dt = None; let mut end_dt = None; @@ -209,6 +222,7 @@ impl TimeFilter { } /// Represents a column with filters to query. +#[derive(Debug, Serialize, Deserialize)] pub struct ColumnFilters { /// Case-sensitive column name to query. pub column_name: String, @@ -216,6 +230,7 @@ pub struct ColumnFilters { pub filters: Vec<ContentFilter>, } +#[derive(Debug, Serialize, Deserialize)] pub enum ContentFilter { /// Only match the exact content. /// @@ -234,13 +249,16 @@ pub enum ContentFilter { Compound(Vec<ContentFilter>, BinaryOperator), } +#[derive(Debug, Serialize, Deserialize)] pub enum BinaryOperator { And, Or, } /// Controls how many adjacent lines to return. +#[derive(Debug, Default, Serialize, Deserialize)] pub enum Context { + #[default] None, /// Specify the number of lines before and after the matched line separately. Lines(usize, usize), diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 8139ea3aafbb..130037fec562 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -46,6 +46,7 @@ greptime-proto.workspace = true humantime.workspace = true itertools.workspace = true lazy_static.workspace = true +log-query.workspace = true meter-core.workspace = true meter-macros.workspace = true object-store.workspace = true diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 435e9b4bcc9e..6e1fbfae0af8 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -17,6 +17,7 @@ #![feature(trait_upcasting)] #![feature(try_blocks)] #![feature(stmt_expr_attributes)] +#![feature(iterator_try_collect)] mod analyze; pub mod dataframe; @@ -25,6 +26,7 @@ pub mod dist_plan; pub mod dummy_catalog; pub mod error; pub mod executor; +pub mod log_query; pub mod metrics; mod optimizer; pub mod parser; diff --git a/src/query/src/log_query.rs b/src/query/src/log_query.rs new file mode 100644 index 000000000000..b44053b579d6 --- /dev/null +++ b/src/query/src/log_query.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod error; +pub mod planner; diff --git a/src/query/src/log_query/error.rs b/src/query/src/log_query/error.rs new file mode 100644 index 000000000000..9045d30b6805 --- /dev/null +++ b/src/query/src/log_query/error.rs @@ -0,0 +1,84 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use datafusion::error::DataFusionError; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("General catalog error"))] + Catalog { + #[snafu(implicit)] + location: Location, + source: catalog::error::Error, + }, + + #[snafu(display("Internal error during building DataFusion plan"))] + DataFusionPlanning { + #[snafu(source)] + error: datafusion::error::DataFusionError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unknown table type, downcast failed"))] + UnknownTable { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Cannot find time index column"))] + TimeIndexNotFound { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unimplemented feature: {}", feature))] + Unimplemented { + #[snafu(implicit)] + location: Location, + feature: String, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + match self { + Catalog { source, .. } => source.status_code(), + DataFusionPlanning { .. } => StatusCode::External, + UnknownTable { .. } | TimeIndexNotFound { .. } => StatusCode::Internal, + Unimplemented { .. } => StatusCode::Unsupported, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +pub type Result<T> = std::result::Result<T, Error>; + +impl From<Error> for DataFusionError { + fn from(err: Error) -> Self { + DataFusionError::External(Box::new(err)) + } +} diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs new file mode 100644 index 000000000000..b5d4e385fbcb --- /dev/null +++ b/src/query/src/log_query/planner.rs @@ -0,0 +1,371 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use catalog::table_source::DfTableSourceProvider; +use common_function::utils::escape_like_pattern; +use datafusion::datasource::DefaultTableSource; +use datafusion_common::ScalarValue; +use datafusion_expr::utils::conjunction; +use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder}; +use datafusion_sql::TableReference; +use datatypes::schema::Schema; +use log_query::{ColumnFilters, LogQuery, TimeFilter}; +use snafu::{OptionExt, ResultExt}; +use table::table::adapter::DfTableProviderAdapter; + +use crate::log_query::error::{ + CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnimplementedSnafu, + UnknownTableSnafu, +}; + +const DEFAULT_LIMIT: usize = 1000; + +pub struct LogQueryPlanner { + table_provider: DfTableSourceProvider, +} + +impl LogQueryPlanner { + pub fn new(table_provider: DfTableSourceProvider) -> Self { + Self { table_provider } + } + + pub async fn query_to_plan(&mut self, query: LogQuery) -> Result<LogicalPlan> { + // Resolve table + let table_ref: TableReference = query.table.table_ref().into(); + let table_source = self + .table_provider + .resolve_table(table_ref.clone()) + .await + .context(CatalogSnafu)?; + let schema = table_source + .as_any() + .downcast_ref::<DefaultTableSource>() + .context(UnknownTableSnafu)? + .table_provider + .as_any() + .downcast_ref::<DfTableProviderAdapter>() + .context(UnknownTableSnafu)? + .table() + .schema(); + + // Build the initial scan plan + let mut plan_builder = LogicalPlanBuilder::scan(table_ref, table_source, None) + .context(DataFusionPlanningSnafu)?; + + // Collect filter expressions + let mut filters = Vec::new(); + + // Time filter + filters.push(self.build_time_filter(&query.time_filter, &schema)?); + + // Column filters and projections + let mut projected_columns = Vec::new(); + for column_filter in &query.columns { + if let Some(expr) = self.build_column_filter(column_filter)? { + filters.push(expr); + } + projected_columns.push(col(&column_filter.column_name)); + } + + // Apply filters + if !filters.is_empty() { + let filter_expr = filters.into_iter().reduce(|a, b| a.and(b)).unwrap(); + plan_builder = plan_builder + .filter(filter_expr) + .context(DataFusionPlanningSnafu)?; + } + + // Apply projections + plan_builder = plan_builder + .project(projected_columns) + .context(DataFusionPlanningSnafu)?; + + // Apply limit + plan_builder = plan_builder + .limit(0, query.limit.or(Some(DEFAULT_LIMIT))) + .context(DataFusionPlanningSnafu)?; + + // Build the final plan + let plan = plan_builder.build().context(DataFusionPlanningSnafu)?; + + Ok(plan) + } + + fn build_time_filter(&self, time_filter: &TimeFilter, schema: &Schema) -> Result<Expr> { + let timestamp_col = schema + .timestamp_column() + .with_context(|| TimeIndexNotFoundSnafu {})? 
+ .name + .clone(); + + let start_time = ScalarValue::Utf8(time_filter.start.clone()); + let end_time = ScalarValue::Utf8( + time_filter + .end + .clone() + .or(Some("9999-12-31T23:59:59Z".to_string())), + ); + let expr = col(timestamp_col.clone()) + .gt_eq(lit(start_time)) + .and(col(timestamp_col).lt_eq(lit(end_time))); + + Ok(expr) + } + + /// Returns filter expressions + fn build_column_filter(&self, column_filter: &ColumnFilters) -> Result<Option<Expr>> { + if column_filter.filters.is_empty() { + return Ok(None); + } + + let exprs = column_filter + .filters + .iter() + .map(|filter| match filter { + log_query::ContentFilter::Exact(pattern) => Ok(col(&column_filter.column_name) + .like(lit(ScalarValue::Utf8(Some(escape_like_pattern(pattern)))))), + log_query::ContentFilter::Prefix(pattern) => Ok(col(&column_filter.column_name) + .like(lit(ScalarValue::Utf8(Some(format!( + "{}%", + escape_like_pattern(pattern) + )))))), + log_query::ContentFilter::Postfix(pattern) => Ok(col(&column_filter.column_name) + .like(lit(ScalarValue::Utf8(Some(format!( + "%{}", + escape_like_pattern(pattern) + )))))), + log_query::ContentFilter::Contains(pattern) => Ok(col(&column_filter.column_name) + .like(lit(ScalarValue::Utf8(Some(format!( + "%{}%", + escape_like_pattern(pattern) + )))))), + log_query::ContentFilter::Regex(..) => Err::<Expr, _>( + UnimplementedSnafu { + feature: "regex filter", + } + .build(), + ), + log_query::ContentFilter::Compound(..) => Err::<Expr, _>( + UnimplementedSnafu { + feature: "compound filter", + } + .build(), + ), + }) + .try_collect::<Vec<_>>()?; + + Ok(conjunction(exprs)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use catalog::memory::MemoryCatalogManager; + use catalog::RegisterTableRequest; + use common_catalog::consts::DEFAULT_CATALOG_NAME; + use common_query::test_util::DummyDecoder; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, SchemaRef}; + use log_query::{ContentFilter, Context}; + use session::context::QueryContext; + use table::metadata::{TableInfoBuilder, TableMetaBuilder}; + use table::table_name::TableName; + use table::test_util::EmptyTable; + + use super::*; + + fn mock_schema() -> SchemaRef { + let columns = vec![ + ColumnSchema::new( + "message".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + ColumnSchema::new( + "timestamp".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new( + "host".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + ]; + + Arc::new(Schema::new(columns)) + } + + /// Registers table under `greptime`, with `message` and `timestamp` and `host` columns. 
+ async fn build_test_table_provider( + table_name_tuples: &[(String, String)], + ) -> DfTableSourceProvider { + let catalog_list = MemoryCatalogManager::with_default_setup(); + for (schema_name, table_name) in table_name_tuples { + let schema = mock_schema(); + let table_meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![2]) + .value_indices(vec![0]) + .next_column_id(1024) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .name(table_name.to_string()) + .meta(table_meta) + .build() + .unwrap(); + let table = EmptyTable::from_table_info(&table_info); + + catalog_list + .register_table_sync(RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: schema_name.to_string(), + table_name: table_name.to_string(), + table_id: 1024, + table, + }) + .unwrap(); + } + + DfTableSourceProvider::new( + catalog_list, + false, + QueryContext::arc(), + DummyDecoder::arc(), + false, + ) + } + + #[tokio::test] + async fn test_query_to_plan() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let mut planner = LogQueryPlanner::new(table_provider); + + let log_query = LogQuery { + table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"), + time_filter: TimeFilter { + start: Some("2021-01-01T00:00:00Z".to_string()), + end: Some("2021-01-02T00:00:00Z".to_string()), + span: None, + }, + columns: vec![ColumnFilters { + column_name: "message".to_string(), + filters: vec![ContentFilter::Contains("error".to_string())], + }], + limit: Some(100), + context: Context::None, + }; + + let plan = planner.query_to_plan(log_query).await.unwrap(); + let expected = "Limit: skip=0, fetch=100 [message:Utf8]\ +\n Projection: greptime.public.test_table.message [message:Utf8]\ +\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ +\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } + + #[tokio::test] + async fn test_build_time_filter() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let planner = LogQueryPlanner::new(table_provider); + + let time_filter = TimeFilter { + start: Some("2021-01-01T00:00:00Z".to_string()), + end: Some("2021-01-02T00:00:00Z".to_string()), + span: None, + }; + + let expr = planner + .build_time_filter(&time_filter, &mock_schema()) + .unwrap(); + + let expected_expr = col("timestamp") + .gt_eq(lit(ScalarValue::Utf8(Some( + "2021-01-01T00:00:00Z".to_string(), + )))) + .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some( + "2021-01-02T00:00:00Z".to_string(), + ))))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + + #[tokio::test] + async fn test_build_time_filter_without_end() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let planner = LogQueryPlanner::new(table_provider); + + let time_filter = TimeFilter { + start: Some("2021-01-01T00:00:00Z".to_string()), + end: None, + span: None, + }; + + let expr = planner + .build_time_filter(&time_filter, &mock_schema()) + .unwrap(); + + let expected_expr = col("timestamp") + 
.gt_eq(lit(ScalarValue::Utf8(Some( + "2021-01-01T00:00:00Z".to_string(), + )))) + .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some( + "9999-12-31T23:59:59Z".to_string(), + ))))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + + #[tokio::test] + async fn test_build_column_filter() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let planner = LogQueryPlanner::new(table_provider); + + let column_filter = ColumnFilters { + column_name: "message".to_string(), + filters: vec![ + ContentFilter::Contains("error".to_string()), + ContentFilter::Prefix("WARN".to_string()), + ], + }; + + let expr_option = planner.build_column_filter(&column_filter).unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + + let expected_expr = col("message") + .like(lit(ScalarValue::Utf8(Some("%error%".to_string())))) + .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + + #[test] + fn test_escape_pattern() { + assert_eq!(escape_like_pattern("test"), "test"); + assert_eq!(escape_like_pattern("te%st"), "te\\%st"); + assert_eq!(escape_like_pattern("te_st"), "te\\_st"); + assert_eq!(escape_like_pattern("te\\st"), "te\\\\st"); + } +} diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index 29a0a364ea36..20377c67c034 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -24,6 +24,7 @@ use datafusion::execution::context::SessionState; use datafusion::sql::planner::PlannerContext; use datafusion_expr::{Expr as DfExpr, LogicalPlan}; use datafusion_sql::planner::{ParserOptions, SqlToRel}; +use log_query::LogQuery; use promql_parser::parser::EvalStmt; use session::context::QueryContextRef; use snafu::ResultExt; @@ -31,6 +32,7 @@ use sql::ast::Expr as SqlExpr; use sql::statements::statement::Statement; use crate::error::{DataFusionSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu}; +use crate::log_query::planner::LogQueryPlanner; use crate::parser::QueryStatement; use crate::promql::planner::PromPlanner; use crate::query_engine::{DefaultPlanDecoder, QueryEngineState}; @@ -41,6 +43,12 @@ use crate::{DfContextProviderAdapter, QueryEngineContext}; pub trait LogicalPlanner: Send + Sync { async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>; + async fn plan_logs_query( + &self, + query: LogQuery, + query_ctx: QueryContextRef, + ) -> Result<LogicalPlan>; + fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>; fn as_any(&self) -> &dyn Any; @@ -182,6 +190,34 @@ impl LogicalPlanner for DfLogicalPlanner { } } + async fn plan_logs_query( + &self, + query: LogQuery, + query_ctx: QueryContextRef, + ) -> Result<LogicalPlan> { + let plan_decoder = Arc::new(DefaultPlanDecoder::new( + self.session_state.clone(), + &query_ctx, + )?); + let table_provider = DfTableSourceProvider::new( + self.engine_state.catalog_manager().clone(), + self.engine_state.disallow_cross_catalog_query(), + query_ctx, + plan_decoder, + self.session_state + .config_options() + .sql_parser + .enable_ident_normalization, + ); + + let mut planner = LogQueryPlanner::new(table_provider); + planner + .query_to_plan(query) + .await + .map_err(BoxedError::new) + .context(QueryPlanSnafu) + } + fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> { self.optimize_logical_plan(plan) } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index a90fb880e20d..74adaffd5ea8 100644 --- a/src/servers/Cargo.toml +++ 
b/src/servers/Cargo.toml @@ -66,6 +66,7 @@ itertools.workspace = true json5 = "0.4" jsonb.workspace = true lazy_static.workspace = true +log-query.workspace = true loki-api = "0.1" mime_guess = "2.0" notify.workspace = true diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 9841f02d6ead..dd618f24a3f7 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -66,8 +66,8 @@ use crate::metrics_handler::MetricsHandler; use crate::prometheus_handler::PrometheusHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ - InfluxdbLineProtocolHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, - PipelineHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef, + InfluxdbLineProtocolHandlerRef, LogQueryHandlerRef, OpenTelemetryProtocolHandlerRef, + OpentsdbProtocolHandlerRef, PipelineHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef, }; use crate::server::Server; @@ -80,6 +80,7 @@ mod extractor; pub mod handler; pub mod header; pub mod influxdb; +pub mod logs; pub mod mem_prof; pub mod opentsdb; pub mod otlp; @@ -506,6 +507,17 @@ impl HttpServerBuilder { } } + pub fn with_logs_handler(self, logs_handler: LogQueryHandlerRef) -> Self { + let logs_router = HttpServer::route_logs(logs_handler); + + Self { + router: self + .router + .nest(&format!("/{HTTP_API_VERSION}"), logs_router), + ..self + } + } + pub fn with_opentsdb_handler(self, handler: OpentsdbProtocolHandlerRef) -> Self { Self { router: self.router.nest( @@ -770,6 +782,12 @@ impl HttpServer { .with_state(api_state) } + fn route_logs(log_handler: LogQueryHandlerRef) -> Router { + Router::new() + .route("/logs", routing::get(logs::logs).post(logs::logs)) + .with_state(log_handler) + } + /// Route Prometheus [HTTP API]. /// /// [HTTP API]: https://prometheus.io/docs/prometheus/latest/querying/api/ diff --git a/src/servers/src/http/logs.rs b/src/servers/src/http/logs.rs new file mode 100644 index 000000000000..0375865b31da --- /dev/null +++ b/src/servers/src/http/logs.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; +use std::time::Instant; + +use axum::extract::State; +use axum::response::{IntoResponse, Response}; +use axum::{Extension, Json}; +use common_telemetry::tracing; +use log_query::LogQuery; +use session::context::{Channel, QueryContext}; + +use crate::http::result::greptime_result_v1::GreptimedbV1Response; +use crate::query_handler::LogQueryHandlerRef; + +#[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "logs"))] +pub async fn logs( + State(handler): State<LogQueryHandlerRef>, + Extension(mut query_ctx): Extension<QueryContext>, + Json(params): Json<LogQuery>, +) -> Response { + let exec_start = Instant::now(); + let db = query_ctx.get_db_string(); + + query_ctx.set_channel(Channel::Http); + let query_ctx = Arc::new(query_ctx); + + let _timer = crate::metrics::METRIC_HTTP_LOGS_ELAPSED + .with_label_values(&[db.as_str()]) + .start_timer(); + + let output = handler.query(params, query_ctx).await; + let resp = GreptimedbV1Response::from_output(vec![output]).await; + + resp.with_execution_time(exec_start.elapsed().as_millis() as u64) + .into_response() +} diff --git a/src/servers/src/interceptor.rs b/src/servers/src/interceptor.rs index 241bbe1d0e66..a9f5a60765c9 100644 --- a/src/servers/src/interceptor.rs +++ b/src/servers/src/interceptor.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use common_error::ext::ErrorExt; use common_query::Output; use datafusion_expr::LogicalPlan; +use log_query::LogQuery; use query::parser::PromQuery; use serde_json::Value; use session::context::QueryContextRef; @@ -458,3 +459,54 @@ where } } } + +/// LogQueryInterceptor can track life cycle of a log query request +/// and customize or abort its execution at given point. +pub trait LogQueryInterceptor { + type Error: ErrorExt; + + /// Called before query is actually executed. + fn pre_query(&self, _query: &LogQuery, _query_ctx: QueryContextRef) -> Result<(), Self::Error> { + Ok(()) + } + + /// Called after execution finished. The implementation can modify the + /// output if needed. + fn post_query( + &self, + output: Output, + _query_ctx: QueryContextRef, + ) -> Result<Output, Self::Error> { + Ok(output) + } +} + +pub type LogQueryInterceptorRef<E> = + Arc<dyn LogQueryInterceptor<Error = E> + Send + Sync + 'static>; + +impl<E> LogQueryInterceptor for Option<&LogQueryInterceptorRef<E>> +where + E: ErrorExt, +{ + type Error = E; + + fn pre_query(&self, query: &LogQuery, query_ctx: QueryContextRef) -> Result<(), Self::Error> { + if let Some(this) = self { + this.pre_query(query, query_ctx) + } else { + Ok(()) + } + } + + fn post_query( + &self, + output: Output, + query_ctx: QueryContextRef, + ) -> Result<Output, Self::Error> { + if let Some(this) = self { + this.post_query(output, query_ctx) + } else { + Ok(output) + } + } +} diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index 87ab38dc8215..fe81fed6ced5 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -72,6 +72,14 @@ lazy_static! { vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); + /// Http logs query duration per database. 
+ pub static ref METRIC_HTTP_LOGS_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_servers_http_logs_elapsed", + "servers http logs elapsed", + &[METRIC_DB_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] + ) + .unwrap(); pub static ref METRIC_AUTH_FAILURE: IntCounterVec = register_int_counter_vec!( "greptime_servers_auth_failure_count", "servers auth failure count", diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index ff92d3c5d15b..171590d55e15 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -34,6 +34,7 @@ use api::v1::RowInsertRequests; use async_trait::async_trait; use common_query::Output; use headers::HeaderValue; +use log_query::LogQuery; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; @@ -52,6 +53,7 @@ pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>; pub type ScriptHandlerRef = Arc<dyn ScriptHandler + Send + Sync>; pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>; +pub type LogQueryHandlerRef = Arc<dyn LogQueryHandler + Send + Sync>; #[async_trait] pub trait ScriptHandler { @@ -174,3 +176,9 @@ pub trait PipelineHandler { /// Build a pipeline from a string. fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline<GreptimeTransformer>>; } + +/// Handle log query requests. +#[async_trait] +pub trait LogQueryHandler { + async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result<Output>; +}