Skip to content

Commit

Permalink
test: add crdb/limit.slt (#216)
Browse files Browse the repository at this point in the history
* test: add `crdb/limit.slt`

* ci: add args `--all` on cargo test
  • Loading branch information
KKould authored Aug 27, 2024
1 parent 88c68b5 commit 5cd67ca
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 96 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --all
# 2
fmt:
name: Rust fmt
Expand Down
21 changes: 11 additions & 10 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,22 +201,21 @@ impl<S: Storage> Database<S> {
],
)
.batch(
"Combine Operators".to_string(),
"Limit Pushdown".to_string(),
HepBatchStrategy::fix_point_topdown(10),
vec![
NormalizationRuleImpl::CollapseProject,
NormalizationRuleImpl::CollapseGroupByAgg,
NormalizationRuleImpl::CombineFilter,
NormalizationRuleImpl::LimitProjectTranspose,
NormalizationRuleImpl::PushLimitThroughJoin,
NormalizationRuleImpl::PushLimitIntoTableScan,
],
)
.batch(
"Limit Pushdown".to_string(),
"Combine Operators".to_string(),
HepBatchStrategy::fix_point_topdown(10),
vec![
NormalizationRuleImpl::LimitProjectTranspose,
NormalizationRuleImpl::PushLimitThroughJoin,
NormalizationRuleImpl::PushLimitIntoTableScan,
NormalizationRuleImpl::EliminateLimits,
NormalizationRuleImpl::CollapseProject,
NormalizationRuleImpl::CollapseGroupByAgg,
NormalizationRuleImpl::CombineFilter,
],
)
.batch(
Expand Down Expand Up @@ -364,7 +363,9 @@ mod test {
fn test_udtf() -> Result<(), DatabaseError> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let fnck_sql = DataBaseBuilder::path(temp_dir.path()).build()?;
let (schema, tuples) = fnck_sql.run("select number from table(numbers(10))")?;
let (schema, tuples) = fnck_sql.run(
"SELECT * FROM (select * from table(numbers(10)) a ORDER BY number LIMIT 5) OFFSET 3",
)?;
println!("{}", create_table(&schema, &tuples));

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/execution/dql/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Limit {
}

let offset_val = offset.unwrap_or(0);
let offset_limit = offset_val + limit.unwrap_or(1) - 1;
let offset_limit = offset_val.saturating_add(limit.unwrap_or(usize::MAX)) - 1;

let mut i = 0;
let mut coroutine = build_read(input, cache, transaction);
Expand Down
5 changes: 1 addition & 4 deletions src/optimizer/rule/normalization/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::optimizer::rule::normalization::compilation_in_advance::{
EvaluatorBind, ExpressionRemapper,
};
use crate::optimizer::rule::normalization::pushdown_limit::{
EliminateLimits, LimitProjectTranspose, PushLimitIntoScan, PushLimitThroughJoin,
LimitProjectTranspose, PushLimitIntoScan, PushLimitThroughJoin,
};
use crate::optimizer::rule::normalization::pushdown_predicates::PushPredicateIntoScan;
use crate::optimizer::rule::normalization::pushdown_predicates::PushPredicateThroughJoin;
Expand All @@ -34,7 +34,6 @@ pub enum NormalizationRuleImpl {
CombineFilter,
// PushDown limit
LimitProjectTranspose,
EliminateLimits,
PushLimitThroughJoin,
PushLimitIntoTableScan,
// PushDown predicates
Expand All @@ -57,7 +56,6 @@ impl MatchPattern for NormalizationRuleImpl {
NormalizationRuleImpl::CollapseGroupByAgg => CollapseGroupByAgg.pattern(),
NormalizationRuleImpl::CombineFilter => CombineFilter.pattern(),
NormalizationRuleImpl::LimitProjectTranspose => LimitProjectTranspose.pattern(),
NormalizationRuleImpl::EliminateLimits => EliminateLimits.pattern(),
NormalizationRuleImpl::PushLimitThroughJoin => PushLimitThroughJoin.pattern(),
NormalizationRuleImpl::PushLimitIntoTableScan => PushLimitIntoScan.pattern(),
NormalizationRuleImpl::PushPredicateThroughJoin => PushPredicateThroughJoin.pattern(),
Expand All @@ -80,7 +78,6 @@ impl NormalizationRule for NormalizationRuleImpl {
NormalizationRuleImpl::LimitProjectTranspose => {
LimitProjectTranspose.apply(node_id, graph)
}
NormalizationRuleImpl::EliminateLimits => EliminateLimits.apply(node_id, graph),
NormalizationRuleImpl::PushLimitThroughJoin => {
PushLimitThroughJoin.apply(node_id, graph)
}
Expand Down
82 changes: 1 addition & 81 deletions src/optimizer/rule/normalization/pushdown_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ use crate::optimizer::core::pattern::PatternChildrenPredicate;
use crate::optimizer::core::rule::{MatchPattern, NormalizationRule};
use crate::optimizer::heuristic::graph::{HepGraph, HepNodeId};
use crate::planner::operator::join::JoinType;
use crate::planner::operator::limit::LimitOperator;
use crate::planner::operator::Operator;
use itertools::Itertools;
use lazy_static::lazy_static;
use std::cmp;

lazy_static! {
static ref LIMIT_PROJECT_TRANSPOSE_RULE: Pattern = {
Pattern {
Expand Down Expand Up @@ -66,51 +65,6 @@ impl NormalizationRule for LimitProjectTranspose {
}
}

/// Combines two adjacent `Limit` operators into a single `Limit`, merging
/// their offsets and row counts into one equivalent operator.
pub struct EliminateLimits;

impl MatchPattern for EliminateLimits {
    // The match pattern is declared in ELIMINATE_LIMITS_RULE elsewhere in this
    // file; presumably it matches a Limit node with a Limit child — confirm
    // against the lazy_static block above.
    fn pattern(&self) -> &Pattern {
        &ELIMINATE_LIMITS_RULE
    }
}

impl NormalizationRule for EliminateLimits {
    /// Merges a `Limit` node with an immediate `Limit` child into one node:
    /// offsets are summed, the smaller of the two limits is kept, and the
    /// child node is removed from the plan graph. Non-Limit nodes (or Limit
    /// nodes without a Limit child) are left untouched.
    fn apply(&self, node_id: HepNodeId, graph: &mut HepGraph) -> Result<(), DatabaseError> {
        if let Operator::Limit(op) = graph.operator(node_id) {
            if let Some(child_id) = graph.eldest_child_at(node_id) {
                if let Operator::Limit(child_op) = graph.operator(child_id) {
                    // Offsets compose additively across nested limits.
                    let offset = Self::binary_options(op.offset, child_op.offset, |a, b| a + b);
                    // NOTE(review): min(outer, inner) ignores how the outer
                    // offset consumes rows of the inner limit — confirm this
                    // is the intended nested LIMIT/OFFSET semantics.
                    let limit = Self::binary_options(op.limit, child_op.limit, cmp::min);

                    let new_limit_op = LimitOperator { offset, limit };

                    // Drop the child Limit, then overwrite this node with the
                    // merged operator.
                    graph.remove_node(child_id, false);
                    graph.replace_node(node_id, Operator::Limit(new_limit_op));
                }
            }
        }

        Ok(())
    }
}

impl EliminateLimits {
    /// Combines two optional values with `merge` when both are present;
    /// otherwise returns whichever value exists, or `None` if neither does.
    ///
    /// Used to fold the offsets (addition) and limits (minimum) of two
    /// adjacent `Limit` operators into one.
    fn binary_options<F: Fn(usize, usize) -> usize>(
        a: Option<usize>,
        b: Option<usize>,
        // Renamed from `_fn`: a leading underscore conventionally marks an
        // UNUSED binding, but this closure is called below.
        merge: F,
    ) -> Option<usize> {
        match (a, b) {
            (Some(a), Some(b)) => Some(merge(a, b)),
            // A single present value passes through; `None` means "unset".
            (Some(v), None) | (None, Some(v)) => Some(v),
            (None, None) => None,
        }
    }
}

/// Add extra limits below JOIN:
/// 1. For LEFT OUTER and RIGHT OUTER JOIN, we push limits to the left and right sides,
/// respectively.
Expand Down Expand Up @@ -190,7 +144,6 @@ mod tests {
use crate::optimizer::heuristic::batch::HepBatchStrategy;
use crate::optimizer::heuristic::optimizer::HepOptimizer;
use crate::optimizer::rule::normalization::NormalizationRuleImpl;
use crate::planner::operator::limit::LimitOperator;
use crate::planner::operator::Operator;
use crate::storage::rocksdb::RocksTransaction;

Expand Down Expand Up @@ -219,39 +172,6 @@ mod tests {
Ok(())
}

#[test]
fn test_eliminate_limits() -> Result<(), DatabaseError> {
    let plan = select_sql_run("select c1, c2 from t1 limit 1 offset 1")?;

    // Run only the EliminateLimits rule, once, top-down.
    let mut optimizer = HepOptimizer::new(plan.clone()).batch(
        "test_eliminate_limits".to_string(),
        HepBatchStrategy::once_topdown(),
        vec![NormalizationRuleImpl::EliminateLimits],
    );

    // Stack an extra Limit on top of the plan's existing Limit so the rule
    // has two adjacent Limit nodes to merge.
    let new_limit_op = LimitOperator {
        offset: Some(2),
        limit: Some(1),
    };

    optimizer.graph.add_root(Operator::Limit(new_limit_op));

    let best_plan = optimizer.find_best::<RocksTransaction>(None)?;

    // Merged node: offset 1 + 2 = 3, limit min(1, 1) = 1.
    if let Operator::Limit(op) = &best_plan.operator {
        assert_eq!(op.limit, Some(1));
        assert_eq!(op.offset, Some(3));
    } else {
        // Fixed message: this branch asserts the root is a LIMIT operator,
        // not a project operator as the old message claimed.
        unreachable!("Should be a limit operator")
    }

    // The child Limit must have been eliminated by the merge.
    if let Operator::Limit(_) = &best_plan.childrens[0].operator {
        unreachable!("Should not be a limit operator")
    }

    Ok(())
}

#[test]
fn test_push_limit_through_join() -> Result<(), DatabaseError> {
let plan = select_sql_run("select * from t1 left join t2 on c1 = c3 limit 1")?;
Expand Down
Loading

0 comments on commit 5cd67ca

Please sign in to comment.