diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 990182c38738..41fb7cc5ff8a 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -28,7 +28,7 @@ use datafusion_expr::{ use std::sync::Arc; /// Optimization rule that tries pushes down LIMIT n -/// where applicable to reduce the amount of scanned / processed data +/// where applicable to reduce the amount of scanned / processed data. #[derive(Default)] pub struct LimitPushDown {} @@ -39,29 +39,67 @@ impl LimitPushDown { } } +/// Ancestor indicates the current ancestor in the LogicalPlan tree +/// when traversing down related to "limit push down". +enum Ancestor { + /// Limit + FromLimit, + /// Offset + FromOffset, + /// Other nodes that don't affect the adjustment of "Limit" + NotRelevant, +} + +/// +/// When doing limit push down with "offset" and "limit" during traversal, +/// the "limit" should be adjusted. +/// limit_push_down is a recursive function that tracks three important information +/// to make the adjustment. +/// +/// 1. ancestor: the kind of Ancestor. +/// 2. ancestor_offset: ancestor's offset value +/// 3. ancestor_limit: ancestor's limit value +/// +/// (ancestor_offset, ancestor_limit) is updated in the following cases +/// 1. Ancestor_Limit(n1) -> .. -> Current_Limit(n2) +/// When the ancestor is a "Limit" and the current node is a "Limit", +/// it is updated to (None, min(n1, n2))). +/// 2. Ancestor_Limit(n1) -> .. -> Current_Offset(m1) +/// it is updated to (m1, n1 + m1). +/// 3. Ancestor_Offset(m1) -> .. -> Current_Offset(m2) +/// it is updated to (m2, None). +/// 4. Ancestor_Offset(m1) -> .. -> Current_Limit(n1) +/// it is updated to (None, n1). Note that this won't happen when we +/// generate the plan from SQL, it can happen when we build the plan +/// using LogicalPlanBuilder. fn limit_push_down( _optimizer: &LimitPushDown, - upper_limit: Option, + ancestor: Ancestor, + ancestor_offset: Option, + ancestor_limit: Option, plan: &LogicalPlan, _optimizer_config: &OptimizerConfig, - is_offset: bool, ) -> Result { - match (plan, upper_limit) { - (LogicalPlan::Limit(Limit { n, input }), upper_limit) => { - let new_limit: usize = if is_offset { - *n + upper_limit.unwrap_or(0) - } else { - upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n) + match (plan, ancestor_limit) { + (LogicalPlan::Limit(Limit { n, input }), ancestor_limit) => { + let (new_ancestor_offset, new_ancestor_limit) = match ancestor { + Ancestor::FromLimit | Ancestor::FromOffset => ( + None, + Some(ancestor_limit.map_or(*n, |x| std::cmp::min(x, *n))), + ), + Ancestor::NotRelevant => (None, Some(*n)), }; + Ok(LogicalPlan::Limit(Limit { - n: new_limit, + n: new_ancestor_limit.unwrap_or(*n), // push down limit to plan (minimum of upper limit and current limit) input: Arc::new(limit_push_down( _optimizer, - Some(new_limit), + Ancestor::FromLimit, + new_ancestor_offset, + new_ancestor_limit, input.as_ref(), _optimizer_config, - false, )?), })) } @@ -74,15 +112,15 @@ fn limit_push_down( limit, projected_schema, }), - Some(upper_limit), + Some(ancestor_limit), ) => Ok(LogicalPlan::TableScan(TableScan { table_name: table_name.clone(), source: source.clone(), projection: projection.clone(), filters: filters.clone(), limit: limit - .map(|x| std::cmp::min(x, upper_limit)) - .or(Some(upper_limit)), + .map(|x| std::cmp::min(x, ancestor_limit)) + .or(Some(ancestor_limit)), projected_schema: projected_schema.clone(), })), ( @@ -92,17 +130,18 @@ fn limit_push_down( schema, alias, }), - upper_limit, + ancestor_limit, ) => { // Push down limit directly (projection doesn't change number of rows) Ok(LogicalPlan::Projection(Projection { expr: expr.clone(), input: Arc::new(limit_push_down( _optimizer, - upper_limit, + ancestor, + ancestor_offset, + ancestor_limit, input.as_ref(), _optimizer_config, - false, )?), schema: schema.clone(), alias: alias.clone(), @@ -114,20 +153,21 @@ fn limit_push_down( alias, schema, }), - Some(upper_limit), + Some(ancestor_limit), ) => { // Push down limit through UNION let new_inputs = inputs .iter() .map(|x| { Ok(LogicalPlan::Limit(Limit { - n: upper_limit, + n: ancestor_limit, input: Arc::new(limit_push_down( _optimizer, - Some(upper_limit), + Ancestor::FromLimit, + None, + Some(ancestor_limit), x, _optimizer_config, - false, )?), })) }) @@ -139,21 +179,24 @@ fn limit_push_down( })) } // offset 5 limit 10 then push limit 15 (5 + 10) - // Limit should always be Offset's input - (LogicalPlan::Offset(Offset { offset, input }), upper_limit) => { - let new_limit = if let Some(ul) = upper_limit { - ul + *offset - } else { - *offset + (LogicalPlan::Offset(Offset { offset, input }), ancestor_limit) => { + let (new_ancestor_offset, new_ancestor_limit) = match ancestor { + Ancestor::FromLimit => { + (Some(*offset), ancestor_limit.map(|x| x + *offset)) + } + Ancestor::FromOffset => (Some(*offset), None), + Ancestor::NotRelevant => (Some(*offset), None), }; + Ok(LogicalPlan::Offset(Offset { offset: *offset, input: Arc::new(limit_push_down( _optimizer, - Some(new_limit), + Ancestor::FromOffset, + new_ancestor_offset, + new_ancestor_limit, input.as_ref(), _optimizer_config, - true, )?), })) } @@ -208,17 +251,19 @@ fn generate_push_down_join( return Ok(LogicalPlan::Join(Join { left: Arc::new(limit_push_down( _optimizer, + Ancestor::FromLimit, + None, left_limit, left.as_ref(), _optimizer_config, - true, )?), right: Arc::new(limit_push_down( _optimizer, + Ancestor::FromLimit, + None, right_limit, right.as_ref(), _optimizer_config, - true, )?), on: on.clone(), filter: filter.clone(), @@ -246,7 +291,16 @@ fn push_down_children_limit( let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| limit_push_down(_optimizer, None, plan, _optimizer_config, false)) + .map(|plan| { + limit_push_down( + _optimizer, + Ancestor::NotRelevant, + None, + None, + plan, + _optimizer_config, + ) + }) .collect::>>()?; from_plan(plan, &expr, &new_inputs) @@ -258,7 +312,14 @@ impl OptimizerRule for LimitPushDown { plan: &LogicalPlan, optimizer_config: &OptimizerConfig, ) -> Result { - limit_push_down(self, None, plan, optimizer_config, false) + limit_push_down( + self, + Ancestor::NotRelevant, + None, + None, + plan, + optimizer_config, + ) } fn name(&self) -> &str { @@ -387,6 +448,20 @@ mod test { Ok(()) } + #[test] + fn limit_pushdown_should_not_pushdown_limit_with_offset_only() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan).offset(10)?.build()?; + + // Should not push any limit down to table provider + // When it has a select + let expected = "Offset: 10\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + #[test] fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> { let table_scan = test_table_scan()?; @@ -420,7 +495,27 @@ mod test { .build()?; let expected = "Offset: 10\ - \n Limit: 1010\ + \n Limit: 1000\ + \n Projection: #test.a\ + \n TableScan: test projection=None, limit=1000"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_pushdown_with_limit_after_offset() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a")])? + .offset(10)? + .limit(1000)? + .build()?; + + let expected = "Limit: 1000\ + \n Offset: 10\ \n Projection: #test.a\ \n TableScan: test projection=None, limit=1010"; @@ -498,7 +593,7 @@ mod test { } #[test] - fn limit_should_not_push_down_with_offset_join() -> Result<()> { + fn limit_offset_should_not_push_down_with_offset_join() -> Result<()> { let table_scan_1 = test_table_scan()?; let table_scan_2 = test_table_scan_with_name("test2")?; @@ -515,7 +610,35 @@ mod test { // Limit pushdown Not supported in Join let expected = "Offset: 10\ - \n Limit: 1010\ + \n Limit: 1000\ + \n Inner Join: #test.a = #test2.a\ + \n TableScan: test projection=None\ + \n TableScan: test2 projection=None"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn offset_limit_should_not_push_down_with_offset_join() -> Result<()> { + let table_scan_1 = test_table_scan()?; + let table_scan_2 = test_table_scan_with_name("test2")?; + + let plan = LogicalPlanBuilder::from(table_scan_1) + .join( + &LogicalPlanBuilder::from(table_scan_2).build()?, + JoinType::Inner, + (vec!["a"], vec!["a"]), + None, + )? + .offset(10)? + .limit(1000)? + .build()?; + + // Limit pushdown Not supported in Join + let expected = "Limit: 1000\ + \n Offset: 10\ \n Inner Join: #test.a = #test2.a\ \n TableScan: test projection=None\ \n TableScan: test2 projection=None"; @@ -526,7 +649,7 @@ mod test { } #[test] - fn limit_should_not_push_down_with_offset_sub_query() -> Result<()> { + fn limit_offset_should_not_push_down_with_offset_sub_query() -> Result<()> { let table_scan_1 = test_table_scan_with_name("test1")?; let table_scan_2 = test_table_scan_with_name("test2")?; @@ -544,7 +667,38 @@ mod test { // Limit pushdown Not supported in sub_query let expected = "Offset: 10\ - \n Limit: 110\ + \n Limit: 100\ + \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\ + \n Projection: #test1.a\ + \n TableScan: test1 projection=None)\ + \n Projection: #test2.a\ + \n TableScan: test2 projection=None"; + + assert_optimized_plan_eq(&outer_query, expected); + + Ok(()) + } + + #[test] + fn offset_limit_should_not_push_down_with_offset_sub_query() -> Result<()> { + let table_scan_1 = test_table_scan_with_name("test1")?; + let table_scan_2 = test_table_scan_with_name("test2")?; + + let subquery = LogicalPlanBuilder::from(table_scan_1) + .project(vec![col("a")])? + .filter(col("a").eq(col("test1.a")))? + .build()?; + + let outer_query = LogicalPlanBuilder::from(table_scan_2) + .project(vec![col("a")])? + .filter(exists(Arc::new(subquery)))? + .offset(10)? + .limit(100)? + .build()?; + + // Limit pushdown Not supported in sub_query + let expected = "Limit: 100\ + \n Offset: 10\ \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\ \n Projection: #test1.a\ \n TableScan: test1 projection=None)\ @@ -582,6 +736,34 @@ mod test { Ok(()) } + #[test] + fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> { + let table_scan_1 = test_table_scan()?; + let table_scan_2 = test_table_scan_with_name("test2")?; + + let plan = LogicalPlanBuilder::from(table_scan_1) + .join( + &LogicalPlanBuilder::from(table_scan_2).build()?, + JoinType::Left, + (vec!["a"], vec!["a"]), + None, + )? + .offset(10)? + .limit(1000)? + .build()?; + + // Limit pushdown Not supported in Join + let expected = "Limit: 1000\ + \n Offset: 10\ + \n Left Join: #test.a = #test2.a\ + \n TableScan: test projection=None, limit=1010\ + \n TableScan: test2 projection=None"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + #[test] fn limit_should_push_down_right_outer_join() -> Result<()> { let table_scan_1 = test_table_scan()?; @@ -607,4 +789,32 @@ mod test { Ok(()) } + + #[test] + fn limit_should_push_down_right_outer_join_with_offset() -> Result<()> { + let table_scan_1 = test_table_scan()?; + let table_scan_2 = test_table_scan_with_name("test2")?; + + let plan = LogicalPlanBuilder::from(table_scan_1) + .join( + &LogicalPlanBuilder::from(table_scan_2).build()?, + JoinType::Right, + (vec!["a"], vec!["a"]), + None, + )? + .offset(10)? + .limit(1000)? + .build()?; + + // Limit pushdown with offset supported in right outer join + let expected = "Limit: 1000\ + \n Offset: 10\ + \n Right Join: #test.a = #test2.a\ + \n TableScan: test projection=None\ + \n TableScan: test2 projection=None, limit=1010"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index b21550567f53..238cf7dbeb7a 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -297,10 +297,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let plan = self.order_by(plan, query.order_by)?; - let plan: LogicalPlan = self.limit(plan, query.limit)?; - - //make limit as offset's input will enable limit push down simply - self.offset(plan, query.offset) + // Offset is the parent of Limit. + // If both OFFSET and LIMIT appear, + // then OFFSET rows are skipped before starting to count the LIMIT rows that are returned. + // see https://www.postgresql.org/docs/current/queries-limit.html + let plan = self.offset(plan, query.offset)?; + self.limit(plan, query.limit) } fn set_expr_to_plan( @@ -4878,8 +4880,8 @@ mod tests { #[test] fn test_zero_offset_with_limit() { let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;"; - let expected = "Offset: 0\ - \n Limit: 5\ + let expected = "Limit: 5\ + \n Offset: 0\ \n Projection: #person.id\ \n Filter: #person.id > Int64(100)\ \n TableScan: person projection=None"; @@ -4903,8 +4905,8 @@ mod tests { #[test] fn test_offset_after_limit() { let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;"; - let expected = "Offset: 3\ - \n Limit: 5\ + let expected = "Limit: 5\ + \n Offset: 3\ \n Projection: #person.id\ \n Filter: #person.id > Int64(100)\ \n TableScan: person projection=None"; @@ -4914,8 +4916,8 @@ mod tests { #[test] fn test_offset_before_limit() { let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;"; - let expected = "Offset: 3\ - \n Limit: 5\ + let expected = "Limit: 5\ + \n Offset: 3\ \n Projection: #person.id\ \n Filter: #person.id > Int64(100)\ \n TableScan: person projection=None";