Skip to content

Commit

Permalink
feat(log-query): implement pagination with limit and offset parameters (
Browse files Browse the repository at this point in the history
#5241)

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Dec 31, 2024
1 parent cc5b1d4 commit 5cf931c
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 6 deletions.
15 changes: 12 additions & 3 deletions src/log-query/src/log_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub struct LogQuery {
pub time_filter: TimeFilter,
/// Columns with filters to query.
pub columns: Vec<ColumnFilters>,
/// Maximum number of logs to return. If not provided, it will return all matched logs.
pub limit: Option<usize>,
/// Controls row skipping and fetch count for logs.
pub limit: Limit,
/// Adjacent lines to return.
pub context: Context,
}
Expand All @@ -42,7 +42,7 @@ impl Default for LogQuery {
table: TableName::new("", "", ""),
time_filter: Default::default(),
columns: vec![],
limit: None,
limit: Limit::default(),
context: Default::default(),
}
}
Expand Down Expand Up @@ -266,6 +266,15 @@ pub enum Context {
Seconds(usize, usize),
}

/// Represents limit and offset parameters for query pagination.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Limit {
/// Optional number of items to skip before starting to return results
pub skip: Option<usize>,
/// Optional number of items to return after skipping
pub fetch: Option<usize>,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
78 changes: 75 additions & 3 deletions src/query/src/log_query/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ impl LogQueryPlanner {

// Apply limit
plan_builder = plan_builder
.limit(0, query.limit.or(Some(DEFAULT_LIMIT)))
.limit(
query.limit.skip.unwrap_or(0),
Some(query.limit.fetch.unwrap_or(DEFAULT_LIMIT)),
)
.context(DataFusionPlanningSnafu)?;

// Build the final plan
Expand Down Expand Up @@ -179,7 +182,7 @@ mod tests {
use common_query::test_util::DummyDecoder;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use log_query::{ContentFilter, Context};
use log_query::{ContentFilter, Context, Limit};
use session::context::QueryContext;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::table_name::TableName;
Expand Down Expand Up @@ -268,7 +271,10 @@ mod tests {
column_name: "message".to_string(),
filters: vec![ContentFilter::Contains("error".to_string())],
}],
limit: Some(100),
limit: Limit {
skip: None,
fetch: Some(100),
},
context: Context::None,
};

Expand Down Expand Up @@ -361,6 +367,72 @@ mod tests {
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}

#[tokio::test]
async fn test_query_to_plan_with_only_skip() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let mut planner = LogQueryPlanner::new(table_provider);

let log_query = LogQuery {
table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
time_filter: TimeFilter {
start: Some("2021-01-01T00:00:00Z".to_string()),
end: Some("2021-01-02T00:00:00Z".to_string()),
span: None,
},
columns: vec![ColumnFilters {
column_name: "message".to_string(),
filters: vec![ContentFilter::Contains("error".to_string())],
}],
limit: Limit {
skip: Some(10),
fetch: None,
},
context: Context::None,
};

let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Limit: skip=10, fetch=1000 [message:Utf8]\
\n Projection: greptime.public.test_table.message [message:Utf8]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";

assert_eq!(plan.display_indent_schema().to_string(), expected);
}

#[tokio::test]
async fn test_query_to_plan_without_limit() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let mut planner = LogQueryPlanner::new(table_provider);

let log_query = LogQuery {
table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
time_filter: TimeFilter {
start: Some("2021-01-01T00:00:00Z".to_string()),
end: Some("2021-01-02T00:00:00Z".to_string()),
span: None,
},
columns: vec![ColumnFilters {
column_name: "message".to_string(),
filters: vec![ContentFilter::Contains("error".to_string())],
}],
limit: Limit {
skip: None,
fetch: None,
},
context: Context::None,
};

let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Limit: skip=0, fetch=1000 [message:Utf8]\
\n Projection: greptime.public.test_table.message [message:Utf8]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";

assert_eq!(plan.display_indent_schema().to_string(), expected);
}

#[test]
fn test_escape_pattern() {
assert_eq!(escape_like_pattern("test"), "test");
Expand Down

0 comments on commit 5cf931c

Please sign in to comment.