Skip to content

Commit

Permalink
feat: logs query endpoint (GreptimeTeam#5202)
Browse files Browse the repository at this point in the history
* define endpoint

Signed-off-by: Ruihang Xia <[email protected]>

* planner

Signed-off-by: Ruihang Xia <[email protected]>

* update lock file

Signed-off-by: Ruihang Xia <[email protected]>

* add unit test

Signed-off-by: Ruihang Xia <[email protected]>

* fix toml format

Signed-off-by: Ruihang Xia <[email protected]>

* revert metric change

Signed-off-by: Ruihang Xia <[email protected]>

* Update src/query/src/log_query/planner.rs

Co-authored-by: Copilot <[email protected]>

* fix compile

Signed-off-by: Ruihang Xia <[email protected]>

* refactor and tests

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Copilot <[email protected]>
  • Loading branch information
2 people authored and evenyag committed Jan 3, 2025
1 parent f11fb40 commit 8973995
Show file tree
Hide file tree
Showing 25 changed files with 827 additions and 25 deletions.
16 changes: 10 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ file-engine = { path = "src/file-engine" }
flow = { path = "src/flow" }
frontend = { path = "src/frontend", default-features = false }
index = { path = "src/index" }
log-query = { path = "src/log-query" }
log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
Expand Down
1 change: 1 addition & 0 deletions src/auth/src/permission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub enum PermissionReq<'a> {
GrpcRequest(&'a Request),
SqlStatement(&'a Statement),
PromQuery,
LogQuery,
Opentsdb,
LineProtocol,
PromStoreWrite,
Expand Down
1 change: 1 addition & 0 deletions src/common/function/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ pub mod function_registry;
pub mod handlers;
pub mod helper;
pub mod state;
pub mod utils;
12 changes: 1 addition & 11 deletions src/common/function/src/scalars/matches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,10 @@ impl PatternAst {
fn convert_literal(column: &str, pattern: &str) -> Expr {
logical_expr::col(column).like(logical_expr::lit(format!(
"%{}%",
Self::escape_pattern(pattern)
crate::utils::escape_like_pattern(pattern)
)))
}

fn escape_pattern(pattern: &str) -> String {
pattern
.chars()
.flat_map(|c| match c {
'\\' | '%' | '_' => vec!['\\', c],
_ => vec![c],
})
.collect::<String>()
}

/// Transform this AST with preset rules to make it correct.
fn transform_ast(self) -> Result<Self> {
self.transform_up(Self::collapse_binary_branch_fn)
Expand Down
58 changes: 58 additions & 0 deletions src/common/function/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Escapes special characters in the provided pattern string for `LIKE`.
///
/// Specifically, it prefixes the backslash (`\`), percent (`%`), and underscore (`_`)
/// characters with an additional backslash to ensure they are treated literally.
///
/// # Examples
///
/// ```rust
/// let escaped = escape_pattern("100%_some\\path");
/// assert_eq!(escaped, "100\\%\\_some\\\\path");
/// ```
pub fn escape_like_pattern(pattern: &str) -> String {
pattern
.chars()
.flat_map(|c| match c {
'\\' | '%' | '_' => vec!['\\', c],
_ => vec![c],
})
.collect::<String>()
}
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_escape_like_pattern() {
assert_eq!(
escape_like_pattern("100%_some\\path"),
"100\\%\\_some\\\\path"
);
assert_eq!(escape_like_pattern(""), "");
assert_eq!(escape_like_pattern("hello"), "hello");
assert_eq!(escape_like_pattern("\\%_"), "\\\\\\%\\_");
assert_eq!(escape_like_pattern("%%__\\\\"), "\\%\\%\\_\\_\\\\\\\\");
assert_eq!(escape_like_pattern("abc123"), "abc123");
assert_eq!(escape_like_pattern("%_\\"), "\\%\\_\\\\");
assert_eq!(
escape_like_pattern("%%__\\\\another%string"),
"\\%\\%\\_\\_\\\\\\\\another\\%string"
);
assert_eq!(escape_like_pattern("foo%bar_"), "foo\\%bar\\_");
assert_eq!(escape_like_pattern("\\_\\%"), "\\\\\\_\\\\\\%");
}
}
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ datafusion-expr.workspace = true
datanode.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
log-query.workspace = true
log-store.workspace = true
meta-client.workspace = true
opentelemetry-proto.workspace = true
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod builder;
mod grpc;
mod influxdb;
mod log_handler;
mod logs;
mod opentsdb;
mod otlp;
mod prom_store;
Expand Down Expand Up @@ -64,8 +65,8 @@ use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PipelineHandler, PromStoreProtocolHandler, ScriptHandler,
InfluxdbLineProtocolHandler, LogQueryHandler, OpenTelemetryProtocolHandler,
OpentsdbProtocolHandler, PipelineHandler, PromStoreProtocolHandler, ScriptHandler,
};
use servers::server::ServerHandlers;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -99,6 +100,7 @@ pub trait FrontendInstance:
+ ScriptHandler
+ PrometheusHandler
+ PipelineHandler
+ LogQueryHandler
+ Send
+ Sync
+ 'static
Expand Down
67 changes: 67 additions & 0 deletions src/frontend/src/instance/logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use log_query::LogQuery;
use server_error::Result as ServerResult;
use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu};
use servers::interceptor::{LogQueryInterceptor, LogQueryInterceptorRef};
use servers::query_handler::LogQueryHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use tonic::async_trait;

use super::Instance;

#[async_trait]
impl LogQueryHandler for Instance {
async fn query(&self, mut request: LogQuery, ctx: QueryContextRef) -> ServerResult<Output> {
let interceptor = self
.plugins
.get::<LogQueryInterceptorRef<server_error::Error>>();

self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::LogQuery)
.context(AuthSnafu)?;

interceptor.as_ref().pre_query(&request, ctx.clone())?;

request
.time_filter
.canonicalize()
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;

let plan = self
.query_engine
.planner()
.plan_logs_query(request, ctx.clone())
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;

let output = self
.statement_executor
.exec_plan(plan, ctx.clone())
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;

Ok(interceptor.as_ref().post_query(output, ctx.clone())?)
}
}
1 change: 1 addition & 0 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ where
let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
builder =
builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
builder = builder.with_logs_handler(self.instance.clone());

if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
builder = builder.with_user_provider(user_provider);
Expand Down
1 change: 1 addition & 0 deletions src/log-query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ workspace = true
chrono.workspace = true
common-error.workspace = true
common-macro.workspace = true
serde.workspace = true
snafu.workspace = true
table.workspace = true
10 changes: 10 additions & 0 deletions src/log-query/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::any::Any;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::Snafu;

Expand All @@ -41,6 +42,15 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}

fn status_code(&self) -> StatusCode {
match self {
Error::InvalidTimeFilter { .. }
| Error::InvalidDateFormat { .. }
| Error::InvalidSpanFormat { .. }
| Error::EndBeforeStart { .. } => StatusCode::InvalidArguments,
}
}
}

pub type Result<T> = std::result::Result<T, Error>;
Loading

0 comments on commit 8973995

Please sign in to comment.