Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce LogicalPlan invariants, begin automatically checking them #13651

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6d43dc2
minor(13525): perform LP validation before and after each possible mu…
wiedld Dec 4, 2024
a855811
minor(13525): validate unique field names on query and subquery schem…
wiedld Dec 4, 2024
0163a40
minor(13525): validate union after each optimizer passes
wiedld Dec 4, 2024
bee7e92
refactor: make explicit what is an invariant of the logical plan, ver…
wiedld Dec 16, 2024
4eee9c4
chore: add link to invariant docs
wiedld Dec 16, 2024
a7d9770
fix: add new invariants module
wiedld Dec 16, 2024
72718ad
Merge branch 'main' into 13525/invariant-checking-for-implicit-LP-cha…
wiedld Dec 17, 2024
2002b1a
refactor: move all LP invariant checking into LP, delineate executabl…
wiedld Dec 17, 2024
fbc9c46
test: update test for slight error message change
wiedld Dec 17, 2024
e52187e
fix: push_down_filter optimization pass can push a IN(<subquery>) int…
wiedld Dec 17, 2024
ba26f13
Merge branch 'main' into 13525/invariant-checking-for-implicit-LP-cha…
wiedld Dec 23, 2024
ad1a1f8
refactor: move collect_subquery_cols() to common utils crate
wiedld Dec 23, 2024
1164a7b
refactor: clarify the purpose of assert_valid_optimization(), runs af…
wiedld Dec 23, 2024
7ad0b74
refactor: based upon performance tests, run the maximum number of che…
wiedld Dec 24, 2024
911d4b8
chore: update error naming and terminology used in code comments
wiedld Dec 24, 2024
810246d
refactor: use proper error methods
wiedld Dec 24, 2024
9842d19
chore: more cleanup of error messages
wiedld Dec 24, 2024
00700ae
Merge branch 'main' into 13525/invariant-checking-for-implicit-LP-cha…
wiedld Dec 25, 2024
9bca470
chore: handle option trailer to error message
wiedld Dec 25, 2024
529ac3e
test: update sqllogictests tests to not use multiline
wiedld Dec 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,98 @@
// specific language governing permissions and limitations
// under the License.

use crate::analyzer::check_plan;
use crate::utils::collect_subquery_cols;
use datafusion_common::{
internal_err, plan_err,
tree_node::{TreeNode, TreeNodeRecursion},
DFSchemaRef, Result,
};

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{plan_err, Result};
use datafusion_expr::expr_rewriter::strip_outer_reference;
use datafusion_expr::utils::split_conjunction;
use datafusion_expr::{Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window};
use crate::{
expr::{Exists, InSubquery},
expr_rewriter::strip_outer_reference,
utils::{collect_subquery_cols, split_conjunction},
Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window,
};

/// Selects which set of `LogicalPlan` invariants should be verified.
///
/// Pass a level to `LogicalPlan::check_invariants`. The `Executable` checks
/// run the `Always` checks first and then the semantic checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvariantLevel {
    /// Invariants that are always true in DataFusion `LogicalPlan`s
    /// such as the number of expected children and no duplicated output fields
    Always,
    /// Invariants that must hold true for the plan to be "executable"
    /// such as the type and number of function arguments are correct and
    /// that wildcards have been expanded
    ///
    /// To ensure a LogicalPlan satisfies the `Executable` invariants, run the
    /// `Analyzer`
    Executable,
}

/// Verifies the invariants that every `LogicalPlan` must satisfy.
///
/// At present the only check performed is that field names are unique.
///
/// # Errors
/// Returns the error produced by the failing check.
pub fn assert_always_invariants(plan: &LogicalPlan) -> Result<()> {
    // Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#relation-name-tuples-in-logical-fields-and-logical-columns-are-unique>
    assert_unique_field_names(plan)
}

/// Verifies the invariants required for a plan to be "executable".
///
/// This is a superset of the always-true invariants: those are checked first,
/// followed by the semantic validity checks.
///
/// # Errors
/// Returns the error of the first check that fails.
pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> {
    // An executable plan must first be a structurally valid plan.
    assert_always_invariants(plan)?;
    assert_valid_semantic_plan(plan)
}

/// Returns an error if the plan's schema fails its name check, i.e. its
/// fields are not unique.
///
/// This invariant is subject to change.
/// refer: <https://github.com/apache/datafusion/issues/13525#issuecomment-2494046463>
fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> {
    let schema = plan.schema();
    schema.check_names()
}

/// Returns an error if the plan is not semantically valid.
///
/// Currently the only semantic check is that subqueries are valid.
fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> {
    assert_subqueries_are_valid(plan)
}

/// Checks that `plan` still produces the expected `schema`.
/// Ignores metadata and nullability.
///
/// # Errors
/// Returns an internal error, containing the debug output of both schemas,
/// when the names and types are not equivalent.
pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) -> Result<()> {
    let actual = plan.schema();

    // Happy path first: nothing to report when the schemas line up.
    if actual.equivalent_names_and_types(schema) {
        return Ok(());
    }

    internal_err!(
        "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}",
        schema,
        actual
    )
}

/// Asserts that the subqueries are structured properly with valid node placement.
///
/// Visits the plan (including its subqueries), every expression of each
/// visited node, and every sub-expression of those expressions. Each
/// `EXISTS`, `IN (<subquery>)` or scalar subquery expression found is handed
/// to [`check_subquery_expr`] together with the node that contains it.
///
/// Refer to [`check_subquery_expr`] for more details.
fn assert_subqueries_are_valid(plan: &LogicalPlan) -> Result<()> {
    plan.apply_with_subqueries(|outer_plan: &LogicalPlan| {
        outer_plan.apply_expressions(|top_level_expr| {
            // subqueries may be nested anywhere inside the expression tree
            top_level_expr.apply(|node| {
                if let Expr::Exists(Exists { subquery, .. })
                | Expr::InSubquery(InSubquery { subquery, .. })
                | Expr::ScalarSubquery(subquery) = node
                {
                    check_subquery_expr(outer_plan, &subquery.subquery, node)?;
                }
                Ok(TreeNodeRecursion::Continue)
            })
        })
    })?;

    Ok(())
}

/// Do necessary check on subquery expressions and fail the invalid plan
/// 1) Check whether the outer plan is in the allowed outer plans list to use subquery expressions,
Expand All @@ -36,7 +120,7 @@ pub fn check_subquery_expr(
inner_plan: &LogicalPlan,
expr: &Expr,
) -> Result<()> {
check_plan(inner_plan)?;
assert_subqueries_are_valid(inner_plan)?;
if let Expr::ScalarSubquery(subquery) = expr {
// Scalar subquery should only return one column
if subquery.subquery.schema().fields().len() > 1 {
Expand Down Expand Up @@ -108,12 +192,13 @@ pub fn check_subquery_expr(
match outer_plan {
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_)
| LogicalPlan::TableScan(_)
| LogicalPlan::Window(_)
| LogicalPlan::Aggregate(_)
| LogicalPlan::Join(_) => Ok(()),
_ => plan_err!(
"In/Exist subquery can only be used in \
Projection, Filter, Window functions, Aggregate and Join plan nodes, \
Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, \
wiedld marked this conversation as resolved.
Show resolved Hide resolved
but was used in [{}]",
outer_plan.display()
),
Expand Down Expand Up @@ -285,8 +370,8 @@ mod test {
use std::cmp::Ordering;
use std::sync::Arc;

use crate::{Extension, UserDefinedLogicalNodeCore};
use datafusion_common::{DFSchema, DFSchemaRef};
use datafusion_expr::{Extension, UserDefinedLogicalNodeCore};

use super::*;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mod ddl;
pub mod display;
pub mod dml;
mod extension;
pub(crate) mod invariants;
wiedld marked this conversation as resolved.
Show resolved Hide resolved
pub use invariants::{assert_expected_schema, check_subquery_expr, InvariantLevel};
mod plan;
mod statement;
pub mod tree_node;
Expand Down
11 changes: 11 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use std::hash::{Hash, Hasher};
use std::sync::{Arc, LazyLock};

use super::dml::CopyTo;
use super::invariants::{
assert_always_invariants, assert_executable_invariants, InvariantLevel,
};
use super::DdlStatement;
use crate::builder::{change_redundant_column, unnest_with_options};
use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction};
Expand Down Expand Up @@ -1127,6 +1130,14 @@ impl LogicalPlan {
}
}

/// checks that the plan conforms to the listed invariant level, returning an Error if not
pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
match check {
InvariantLevel::Always => assert_always_invariants(self),
InvariantLevel::Executable => assert_executable_invariants(self),
}
}

/// Helper for [Self::with_new_exprs] to use when no expressions are expected.
#[inline]
#[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
Expand Down
20 changes: 19 additions & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Expression utilities

use std::cmp::Ordering;
use std::collections::HashSet;
use std::collections::{BTreeSet, HashSet};
use std::ops::Deref;
use std::sync::Arc;

Expand Down Expand Up @@ -1402,6 +1402,24 @@ pub fn format_state_name(name: &str, state_name: &str) -> String {
format!("{name}[{state_name}]")
}

/// Determine the set of [`Column`]s produced by the subquery.
wiedld marked this conversation as resolved.
Show resolved Hide resolved
pub fn collect_subquery_cols(
    exprs: &[Expr],
    subquery_schema: &DFSchema,
) -> Result<BTreeSet<Column>> {
    // Gather the columns referenced by every expression, keep only those the
    // subquery schema knows about, and let the `BTreeSet` de-duplicate columns
    // that are referenced more than once.
    let subquery_cols: BTreeSet<Column> = exprs
        .iter()
        .flat_map(|expr| expr.column_refs())
        .filter(|col| subquery_schema.has_column(col))
        .cloned()
        .collect();

    Ok(subquery_cols)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
60 changes: 24 additions & 36 deletions datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@ use log::debug;

use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr::Exists;
use datafusion_expr::expr::InSubquery;
use datafusion_common::Result;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::{Expr, LogicalPlan};
use datafusion_expr::{InvariantLevel, LogicalPlan};

use crate::analyzer::count_wildcard_rule::CountWildcardRule;
use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction;
use crate::analyzer::subquery::check_subquery_expr;
use crate::analyzer::type_coercion::TypeCoercion;
use crate::utils::log_plan;

Expand All @@ -46,17 +42,24 @@ pub mod expand_wildcard_rule;
pub mod function_rewrite;
pub mod inline_table_scan;
pub mod resolve_grouping_function;
pub mod subquery;
pub mod type_coercion;

/// Compatibility shim for the former `analyzer::subquery` module path.
///
/// It contains nothing but a re-export of `check_subquery_expr` from
/// `datafusion_expr`, flagged as deprecated since 44.0.0.
pub mod subquery {
wiedld marked this conversation as resolved.
Show resolved Hide resolved
// NOTE(review): `#[deprecated]` placed on a `pub use` re-export has
// historically not produced a warning at call sites in rustc -- confirm that
// users of the old path actually see this deprecation notice.
#[deprecated(
since = "44.0.0",
note = "please use `datafusion_expr::check_subquery_expr` instead"
)]
pub use datafusion_expr::check_subquery_expr;
}

/// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make
/// the plan valid prior to the rest of the DataFusion optimization process.
///
/// `AnalyzerRule`s are different than an [`OptimizerRule`](crate::OptimizerRule)s
/// which must preserve the semantics of the `LogicalPlan`, while computing
/// results in a more optimal way.
///
/// For example, an `AnalyzerRule` may resolve [`Expr`]s into more specific
/// For example, an `AnalyzerRule` may resolve [`Expr`](datafusion_expr::Expr)s into more specific
/// forms such as a subquery reference, or do type coercion to ensure the types
/// of operands are correct.
///
Expand Down Expand Up @@ -140,6 +143,10 @@ impl Analyzer {
where
F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
// verify the logical plan's required invariants at the start, before running the analyzer
plan.check_invariants(InvariantLevel::Always)
.map_err(|e| e.context("Invalid input plan passed to Analyzer"))?;

let start_time = Instant::now();
let mut new_plan = plan;

Expand All @@ -161,39 +168,20 @@ impl Analyzer {

// TODO add common rule executor for Analyzer and Optimizer
for rule in rules {
new_plan = rule.analyze(new_plan, config).map_err(|e| {
DataFusionError::Context(rule.name().to_string(), Box::new(e))
})?;
new_plan = rule
.analyze(new_plan, config)
.map_err(|e| e.context(rule.name()))?;
log_plan(rule.name(), &new_plan);
observer(&new_plan, rule.as_ref());
}
// for easier display in explain output
check_plan(&new_plan).map_err(|e| {
DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e))
})?;

// verify at the end, after the last LP analyzer pass, that the plan is executable.
new_plan
.check_invariants(InvariantLevel::Executable)
.map_err(|e| e.context("Invalid (non-executable) plan after Analyzer"))?;

log_plan("Final analyzed plan", &new_plan);
debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}
}

/// Runs the necessary checks on a plan and fails if it is invalid.
///
/// Visits the plan (including its subqueries), every expression of each
/// visited node, and every sub-expression of those expressions. Each
/// `EXISTS`, `IN (<subquery>)` or scalar subquery expression found is passed
/// to `check_subquery_expr` together with the node that contains it.
fn check_plan(plan: &LogicalPlan) -> Result<()> {
    plan.apply_with_subqueries(|outer_plan: &LogicalPlan| {
        outer_plan.apply_expressions(|top_level_expr| {
            // recursively look for subqueries
            top_level_expr.apply(|node| {
                // All three subquery-bearing variants wrap the same payload.
                let found = match node {
                    Expr::Exists(Exists { subquery, .. })
                    | Expr::InSubquery(InSubquery { subquery, .. })
                    | Expr::ScalarSubquery(subquery) => Some(subquery),
                    _ => None,
                };

                if let Some(subquery) = found {
                    check_subquery_expr(outer_plan, &subquery.subquery, node)?;
                }
                Ok(TreeNodeRecursion::Continue)
            })
        })
    })?;

    Ok(())
}
5 changes: 3 additions & 2 deletions datafusion/optimizer/src/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ use std::ops::Deref;
use std::sync::Arc;

use crate::simplify_expressions::ExprSimplifier;
use crate::utils::collect_subquery_cols;

use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
};
use datafusion_common::{plan_err, Column, DFSchemaRef, HashMap, Result, ScalarValue};
use datafusion_expr::expr::Alias;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction};
use datafusion_expr::utils::{
collect_subquery_cols, conjunction, find_join_exprs, split_conjunction,
};
use datafusion_expr::{
expr, lit, BinaryExpr, Cast, EmptyRelation, Expr, FetchType, LogicalPlan,
LogicalPlanBuilder, Operator,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ mod tests {
.build()?;

// Maybe okay if the table only has a single column?
let expected = "check_analyzed_plan\
let expected = "Invalid (non-executable) plan after Analyzer\
\ncaused by\
\nError during planning: InSubquery should only return one column, but found 4";
assert_analyzer_check_err(vec![], plan, expected);
Expand Down Expand Up @@ -930,7 +930,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = "check_analyzed_plan\
let expected = "Invalid (non-executable) plan after Analyzer\
\ncaused by\
\nError during planning: InSubquery should only return one column";
assert_analyzer_check_err(vec![], plan, expected);
Expand Down
Loading
Loading