Skip to content

Commit

Permalink
Introduce LogicalPlan invariants, begin automatically checking them (#…
Browse files Browse the repository at this point in the history
…13651)

* minor(13525): perform LP validation before and after each possible mutation

* minor(13525): validate unique field names on query and subquery schemas, after each optimizer pass

* minor(13525): validate union after each optimizer passes

* refactor: make explicit what is an invariant of the logical plan, versus assertions made after a given analyzer or optimizer pass

* chore: add link to invariant docs

* fix: add new invariants module

* refactor: move all LP invariant checking into LP, delineate executable (valid semantic plan) vs basic LP invariants

* test: update test for slight error message change

* fix: push_down_filter optimization pass can push a IN(<subquery>) into a TableScan's filter clause

* refactor: move collect_subquery_cols() to common utils crate

* refactor: clarify the purpose of assert_valid_optimization(), runs after all optimizer passes, except in debug mode it runs after each pass.

* refactor: based upon performance tests, run the maximum number of checks without impa ct:
* assert_valid_optimization can run each optimizer pass
* remove the recursive cehck_fields, which caused the performance regression
* the full LP Invariants::Executable can only run in debug

* chore: update error naming and terminology used in code comments

* refactor: use proper error methods

* chore: more cleanup of error messages

* chore: handle option trailer to error message

* test: update sqllogictests tests to not use multiline
  • Loading branch information
wiedld authored Dec 26, 2024
1 parent 5045bde commit cf8f2f8
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,98 @@
// specific language governing permissions and limitations
// under the License.

use crate::analyzer::check_plan;
use crate::utils::collect_subquery_cols;
use datafusion_common::{
internal_err, plan_err,
tree_node::{TreeNode, TreeNodeRecursion},
DFSchemaRef, Result,
};

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{plan_err, Result};
use datafusion_expr::expr_rewriter::strip_outer_reference;
use datafusion_expr::utils::split_conjunction;
use datafusion_expr::{Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window};
use crate::{
expr::{Exists, InSubquery},
expr_rewriter::strip_outer_reference,
utils::{collect_subquery_cols, split_conjunction},
Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window,
};

pub enum InvariantLevel {
/// Invariants that are always true in DataFusion `LogicalPlan`s
/// such as the number of expected children and no duplicated output fields
Always,
/// Invariants that must hold true for the plan to be "executable"
/// such as the type and number of function arguments are correct and
/// that wildcards have been expanded
///
/// To ensure a LogicalPlan satisfies the `Executable` invariants, run the
/// `Analyzer`
Executable,
}

pub fn assert_always_invariants(plan: &LogicalPlan) -> Result<()> {
// Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#relation-name-tuples-in-logical-fields-and-logical-columns-are-unique>
assert_unique_field_names(plan)?;

Ok(())
}

pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> {
assert_always_invariants(plan)?;
assert_valid_semantic_plan(plan)?;
Ok(())
}

/// Returns an error if plan, and subplans, do not have unique fields.
///
/// This invariant is subject to change.
/// refer: <https://github.com/apache/datafusion/issues/13525#issuecomment-2494046463>
fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> {
plan.schema().check_names()
}

/// Returns an error if the plan is not sematically valid.
fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> {
assert_subqueries_are_valid(plan)?;

Ok(())
}

/// Returns an error if the plan does not have the expected schema.
/// Ignores metadata and nullability.
pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) -> Result<()> {
let equivalent = plan.schema().equivalent_names_and_types(schema);

if !equivalent {
internal_err!(
"Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}",
schema,
plan.schema()
)
} else {
Ok(())
}
}

/// Asserts that the subqueries are structured properly with valid node placement.
///
/// Refer to [`check_subquery_expr`] for more details.
fn assert_subqueries_are_valid(plan: &LogicalPlan) -> Result<()> {
plan.apply_with_subqueries(|plan: &LogicalPlan| {
plan.apply_expressions(|expr| {
// recursively look for subqueries
expr.apply(|expr| {
match expr {
Expr::Exists(Exists { subquery, .. })
| Expr::InSubquery(InSubquery { subquery, .. })
| Expr::ScalarSubquery(subquery) => {
check_subquery_expr(plan, &subquery.subquery, expr)?;
}
_ => {}
};
Ok(TreeNodeRecursion::Continue)
})
})
})
.map(|_| ())
}

/// Do necessary check on subquery expressions and fail the invalid plan
/// 1) Check whether the outer plan is in the allowed outer plans list to use subquery expressions,
Expand All @@ -36,7 +120,7 @@ pub fn check_subquery_expr(
inner_plan: &LogicalPlan,
expr: &Expr,
) -> Result<()> {
check_plan(inner_plan)?;
assert_subqueries_are_valid(inner_plan)?;
if let Expr::ScalarSubquery(subquery) = expr {
// Scalar subquery should only return one column
if subquery.subquery.schema().fields().len() > 1 {
Expand Down Expand Up @@ -108,12 +192,13 @@ pub fn check_subquery_expr(
match outer_plan {
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_)
| LogicalPlan::TableScan(_)
| LogicalPlan::Window(_)
| LogicalPlan::Aggregate(_)
| LogicalPlan::Join(_) => Ok(()),
_ => plan_err!(
"In/Exist subquery can only be used in \
Projection, Filter, Window functions, Aggregate and Join plan nodes, \
Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, \
but was used in [{}]",
outer_plan.display()
),
Expand Down Expand Up @@ -285,8 +370,8 @@ mod test {
use std::cmp::Ordering;
use std::sync::Arc;

use crate::{Extension, UserDefinedLogicalNodeCore};
use datafusion_common::{DFSchema, DFSchemaRef};
use datafusion_expr::{Extension, UserDefinedLogicalNodeCore};

use super::*;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mod ddl;
pub mod display;
pub mod dml;
mod extension;
pub(crate) mod invariants;
pub use invariants::{assert_expected_schema, check_subquery_expr, InvariantLevel};
mod plan;
mod statement;
pub mod tree_node;
Expand Down
11 changes: 11 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use std::hash::{Hash, Hasher};
use std::sync::{Arc, LazyLock};

use super::dml::CopyTo;
use super::invariants::{
assert_always_invariants, assert_executable_invariants, InvariantLevel,
};
use super::DdlStatement;
use crate::builder::{change_redundant_column, unnest_with_options};
use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction};
Expand Down Expand Up @@ -1127,6 +1130,14 @@ impl LogicalPlan {
}
}

/// checks that the plan conforms to the listed invariant level, returning an Error if not
pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
match check {
InvariantLevel::Always => assert_always_invariants(self),
InvariantLevel::Executable => assert_executable_invariants(self),
}
}

/// Helper for [Self::with_new_exprs] to use when no expressions are expected.
#[inline]
#[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
Expand Down
20 changes: 19 additions & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Expression utilities
use std::cmp::Ordering;
use std::collections::HashSet;
use std::collections::{BTreeSet, HashSet};
use std::ops::Deref;
use std::sync::Arc;

Expand Down Expand Up @@ -1402,6 +1402,24 @@ pub fn format_state_name(name: &str, state_name: &str) -> String {
format!("{name}[{state_name}]")
}

/// Determine the set of [`Column`]s produced by the subquery.
pub fn collect_subquery_cols(
exprs: &[Expr],
subquery_schema: &DFSchema,
) -> Result<BTreeSet<Column>> {
exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
let mut using_cols: Vec<Column> = vec![];
for col in expr.column_refs().into_iter() {
if subquery_schema.has_column(col) {
using_cols.push(col.clone());
}
}

cols.extend(using_cols);
Result::<_>::Ok(cols)
})
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
60 changes: 24 additions & 36 deletions datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@ use log::debug;

use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr::Exists;
use datafusion_expr::expr::InSubquery;
use datafusion_common::Result;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::{Expr, LogicalPlan};
use datafusion_expr::{InvariantLevel, LogicalPlan};

use crate::analyzer::count_wildcard_rule::CountWildcardRule;
use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction;
use crate::analyzer::subquery::check_subquery_expr;
use crate::analyzer::type_coercion::TypeCoercion;
use crate::utils::log_plan;

Expand All @@ -46,17 +42,24 @@ pub mod expand_wildcard_rule;
pub mod function_rewrite;
pub mod inline_table_scan;
pub mod resolve_grouping_function;
pub mod subquery;
pub mod type_coercion;

pub mod subquery {
#[deprecated(
since = "44.0.0",
note = "please use `datafusion_expr::check_subquery_expr` instead"
)]
pub use datafusion_expr::check_subquery_expr;
}

/// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make
/// the plan valid prior to the rest of the DataFusion optimization process.
///
/// `AnalyzerRule`s are different than an [`OptimizerRule`](crate::OptimizerRule)s
/// which must preserve the semantics of the `LogicalPlan`, while computing
/// results in a more optimal way.
///
/// For example, an `AnalyzerRule` may resolve [`Expr`]s into more specific
/// For example, an `AnalyzerRule` may resolve [`Expr`](datafusion_expr::Expr)s into more specific
/// forms such as a subquery reference, or do type coercion to ensure the types
/// of operands are correct.
///
Expand Down Expand Up @@ -140,6 +143,10 @@ impl Analyzer {
where
F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
// verify the logical plan required invariants at the start, before analyzer
plan.check_invariants(InvariantLevel::Always)
.map_err(|e| e.context("Invalid input plan passed to Analyzer"))?;

let start_time = Instant::now();
let mut new_plan = plan;

Expand All @@ -161,39 +168,20 @@ impl Analyzer {

// TODO add common rule executor for Analyzer and Optimizer
for rule in rules {
new_plan = rule.analyze(new_plan, config).map_err(|e| {
DataFusionError::Context(rule.name().to_string(), Box::new(e))
})?;
new_plan = rule
.analyze(new_plan, config)
.map_err(|e| e.context(rule.name()))?;
log_plan(rule.name(), &new_plan);
observer(&new_plan, rule.as_ref());
}
// for easier display in explain output
check_plan(&new_plan).map_err(|e| {
DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e))
})?;

// verify at the end, after the last LP analyzer pass, that the plan is executable.
new_plan
.check_invariants(InvariantLevel::Executable)
.map_err(|e| e.context("Invalid (non-executable) plan after Analyzer"))?;

log_plan("Final analyzed plan", &new_plan);
debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}
}

/// Do necessary check and fail the invalid plan
fn check_plan(plan: &LogicalPlan) -> Result<()> {
plan.apply_with_subqueries(|plan: &LogicalPlan| {
plan.apply_expressions(|expr| {
// recursively look for subqueries
expr.apply(|expr| {
match expr {
Expr::Exists(Exists { subquery, .. })
| Expr::InSubquery(InSubquery { subquery, .. })
| Expr::ScalarSubquery(subquery) => {
check_subquery_expr(plan, &subquery.subquery, expr)?;
}
_ => {}
};
Ok(TreeNodeRecursion::Continue)
})
})
})
.map(|_| ())
}
5 changes: 3 additions & 2 deletions datafusion/optimizer/src/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ use std::ops::Deref;
use std::sync::Arc;

use crate::simplify_expressions::ExprSimplifier;
use crate::utils::collect_subquery_cols;

use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
};
use datafusion_common::{plan_err, Column, DFSchemaRef, HashMap, Result, ScalarValue};
use datafusion_expr::expr::Alias;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction};
use datafusion_expr::utils::{
collect_subquery_cols, conjunction, find_join_exprs, split_conjunction,
};
use datafusion_expr::{
expr, lit, BinaryExpr, Cast, EmptyRelation, Expr, FetchType, LogicalPlan,
LogicalPlanBuilder, Operator,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ mod tests {
.build()?;

// Maybe okay if the table only has a single column?
let expected = "check_analyzed_plan\
let expected = "Invalid (non-executable) plan after Analyzer\
\ncaused by\
\nError during planning: InSubquery should only return one column, but found 4";
assert_analyzer_check_err(vec![], plan, expected);
Expand Down Expand Up @@ -930,7 +930,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = "check_analyzed_plan\
let expected = "Invalid (non-executable) plan after Analyzer\
\ncaused by\
\nError during planning: InSubquery should only return one column";
assert_analyzer_check_err(vec![], plan, expected);
Expand Down
Loading

0 comments on commit cf8f2f8

Please sign in to comment.