Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: logs query endpoint #5202

Merged
merged 11 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ file-engine = { path = "src/file-engine" }
flow = { path = "src/flow" }
frontend = { path = "src/frontend", default-features = false }
index = { path = "src/index" }
log-query = { path = "src/log-query" }
log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
Expand Down
1 change: 1 addition & 0 deletions src/auth/src/permission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub enum PermissionReq<'a> {
GrpcRequest(&'a Request),
SqlStatement(&'a Statement),
PromQuery,
LogQuery,
Opentsdb,
LineProtocol,
PromStoreWrite,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ datafusion-expr.workspace = true
datanode.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
log-query.workspace = true
log-store.workspace = true
meta-client.workspace = true
opentelemetry-proto.workspace = true
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod builder;
mod grpc;
mod influxdb;
mod log_handler;
mod logs;
mod opentsdb;
mod otlp;
mod prom_store;
Expand Down Expand Up @@ -64,8 +65,8 @@ use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PipelineHandler, PromStoreProtocolHandler, ScriptHandler,
InfluxdbLineProtocolHandler, LogQueryHandler, OpenTelemetryProtocolHandler,
OpentsdbProtocolHandler, PipelineHandler, PromStoreProtocolHandler, ScriptHandler,
};
use servers::server::ServerHandlers;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -99,6 +100,7 @@ pub trait FrontendInstance:
+ ScriptHandler
+ PrometheusHandler
+ PipelineHandler
+ LogQueryHandler
+ Send
+ Sync
+ 'static
Expand Down
73 changes: 73 additions & 0 deletions src/frontend/src/instance/logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use log_query::LogQuery;
use server_error::Result as ServerResult;
use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu};
use servers::interceptor::LogQueryInterceptorRef;
use servers::query_handler::LogQueryHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use tonic::async_trait;

use super::Instance;

#[async_trait]
impl LogQueryHandler for Instance {
async fn query(&self, mut request: LogQuery, ctx: QueryContextRef) -> ServerResult<Output> {
let interceptor = self
.plugins
.get::<LogQueryInterceptorRef<server_error::Error>>();

self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::LogQuery)
.context(AuthSnafu)?;

if let Some(interceptor) = &interceptor {
interceptor.pre_query(&request, ctx.clone())?;
}
waynexia marked this conversation as resolved.
Show resolved Hide resolved

request
.time_filter
.canonicalize()
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;

let plan = self
.query_engine
.planner()
.plan_logs_query(request, ctx.clone())
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;

let output = self
.statement_executor
.exec_plan(plan, ctx.clone())
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;

if let Some(interceptor) = &interceptor {
Ok(interceptor.post_query(output, ctx.clone())?)
} else {
Ok(output)
}
waynexia marked this conversation as resolved.
Show resolved Hide resolved
}
}
1 change: 1 addition & 0 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ where
let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
builder =
builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
builder = builder.with_logs_handler(self.instance.clone());

if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
builder = builder.with_user_provider(user_provider);
Expand Down
1 change: 1 addition & 0 deletions src/log-query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ workspace = true
chrono.workspace = true
common-error.workspace = true
common-macro.workspace = true
serde.workspace = true
snafu.workspace = true
table.workspace = true
10 changes: 10 additions & 0 deletions src/log-query/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::any::Any;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::Snafu;

Expand All @@ -41,6 +42,15 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}

fn status_code(&self) -> StatusCode {
match self {
Error::InvalidTimeFilter { .. }
| Error::InvalidDateFormat { .. }
| Error::InvalidSpanFormat { .. }
| Error::EndBeforeStart { .. } => StatusCode::InvalidArguments,
}
}
}

pub type Result<T> = std::result::Result<T, Error>;
26 changes: 22 additions & 4 deletions src/log-query/src/log_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use chrono::{DateTime, Datelike, Duration, NaiveDate, NaiveTime, TimeZone, Utc};
use serde::{Deserialize, Serialize};
use table::table_name::TableName;

use crate::error::{
Expand All @@ -21,9 +22,10 @@ use crate::error::{
};

/// GreptimeDB's log query request.
#[derive(Debug, Serialize, Deserialize)]
pub struct LogQuery {
/// A fully qualified table name to query logs from.
pub table_name: TableName,
pub table: TableName,
/// Specifies the time range for the log query. See [`TimeFilter`] for more details.
pub time_filter: TimeFilter,
/// Columns with filters to query.
Expand All @@ -34,6 +36,18 @@ pub struct LogQuery {
pub context: Context,
}

impl Default for LogQuery {
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
fn default() -> Self {
Self {
table: TableName::new("", "", ""),
time_filter: Default::default(),
columns: vec![],
limit: None,
context: Default::default(),
}
}
}

shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
/// Represents a time range for log query.
///
/// This struct allows various formats to express a time range from the user side
Expand All @@ -58,7 +72,7 @@ pub struct LogQuery {
///
/// This struct doesn't require a timezone to be presented. When the timezone is not
/// provided, it will fill the default timezone with the same rules akin to other queries.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TimeFilter {
pub start: Option<String>,
pub end: Option<String>,
Expand All @@ -69,8 +83,7 @@ impl TimeFilter {
/// Validate and canonicalize the time filter.
///
/// This function will try to fill the missing fields and convert all dates to timestamps
// false positive
#[allow(unused_assignments)]
#[allow(unused_assignments)] // false positive
pub fn canonicalize(&mut self) -> Result<()> {
let mut start_dt = None;
let mut end_dt = None;
Expand Down Expand Up @@ -209,13 +222,15 @@ impl TimeFilter {
}

/// Represents a column with filters to query.
#[derive(Debug, Serialize, Deserialize)]
pub struct ColumnFilters {
/// Case-sensitive column name to query.
pub column_name: String,
/// Filters to apply to the column. Can be empty.
pub filters: Vec<ContentFilter>,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum ContentFilter {
/// Only match the exact content.
///
Expand All @@ -234,13 +249,16 @@ pub enum ContentFilter {
Compound(Vec<ContentFilter>, BinaryOperator),
}

#[derive(Debug, Serialize, Deserialize)]
pub enum BinaryOperator {
And,
Or,
}

/// Controls how many adjacent lines to return.
#[derive(Debug, Default, Serialize, Deserialize)]
pub enum Context {
#[default]
None,
/// Specify the number of lines before and after the matched line separately.
Lines(usize, usize),
Expand Down
1 change: 1 addition & 0 deletions src/query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ greptime-proto.workspace = true
humantime.workspace = true
itertools.workspace = true
lazy_static.workspace = true
log-query.workspace = true
meter-core.workspace = true
meter-macros.workspace = true
object-store.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions src/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#![feature(trait_upcasting)]
#![feature(try_blocks)]
#![feature(stmt_expr_attributes)]
#![feature(iterator_try_collect)]

mod analyze;
pub mod dataframe;
Expand All @@ -25,6 +26,7 @@ pub mod dist_plan;
pub mod dummy_catalog;
pub mod error;
pub mod executor;
pub mod log_query;
pub mod metrics;
mod optimizer;
pub mod parser;
Expand Down
16 changes: 16 additions & 0 deletions src/query/src/log_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod error;
pub mod planner;
Loading
Loading