Skip to content

Commit

Permalink
refactor: clarify the purpose of assert_valid_optimization(), runs af…
Browse files Browse the repository at this point in the history
…ter all optimizer passes, except in debug mode it runs after each pass.
  • Loading branch information
wiedld committed Dec 23, 2024
1 parent ad1a1f8 commit 1164a7b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 28 deletions.
14 changes: 3 additions & 11 deletions datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,15 @@ fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> {

/// Returns an error if the plan does not have the expected schema.
/// Ignores metadata and nullability.
pub fn assert_expected_schema(
rule_name: &str,
schema: &DFSchemaRef,
plan: &LogicalPlan,
) -> Result<()> {
pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) -> Result<()> {
let equivalent = plan.schema().equivalent_names_and_types(schema);

if !equivalent {
let e = DataFusionError::Internal(format!(
Err(DataFusionError::Internal(format!(
"Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}",
schema,
plan.schema()
));
Err(DataFusionError::Context(
String::from(rule_name),
Box::new(e),
))
)))
} else {
Ok(())
}
Expand Down
52 changes: 35 additions & 17 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl Optimizer {
plan.check_invariants(InvariantLevel::Executable)
.map_err(|e| {
DataFusionError::Context(
"check_plan_before_optimizers".to_string(),
"check_plan_is_executable before optimizers".to_string(),
Box::new(e),
)
})?;
Expand All @@ -372,6 +372,8 @@ impl Optimizer {
let mut previous_plans = HashSet::with_capacity(16);
previous_plans.insert(LogicalPlanSignature::new(&new_plan));

let starting_schema = Arc::clone(new_plan.schema());

let mut i = 0;
while i < options.optimizer.max_passes {
log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
Expand All @@ -384,6 +386,7 @@ impl Optimizer {
.skip_failed_rules
.then(|| new_plan.clone());

#[cfg(debug_assertions)]
let starting_schema = Arc::clone(new_plan.schema());

let result = match rule.apply_order() {
Expand All @@ -395,14 +398,23 @@ impl Optimizer {
None => optimize_plan_node(new_plan, rule.as_ref(), config),
}
.and_then(|tnr| {
// verify after each optimizer pass.
assert_valid_optimization(rule.name(), &tnr.data, &starting_schema)
// in debug mode, run checks are each optimer pass
#[cfg(debug_assertions)]
assert_valid_optimization(&tnr.data, &starting_schema)
.map_err(|e| {
DataFusionError::Context(
format!("check_optimizer_specific_invariants after optimizer pass: {}", rule.name()),
Box::new(e),
)
})?;
#[cfg(debug_assertions)]
tnr.data.check_invariants(InvariantLevel::Executable)
.map_err(|e| {
DataFusionError::Context(
"check_optimized_plan".to_string(),
Box::new(e),
)
})?;
DataFusionError::Context(
format!("check_plan_is_executable after optimizer pass: {}", rule.name()),
Box::new(e),
)
})?;

Ok(tnr)
});
Expand Down Expand Up @@ -463,12 +475,20 @@ impl Optimizer {
i += 1;
}

// verify that the optimizer passes only mutated what was permitted.
assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
DataFusionError::Context(
"check_optimizer_specific_invariants after all passes".to_string(),
Box::new(e),
)
})?;

// verify LP is valid, after the last optimizer pass.
new_plan
.check_invariants(InvariantLevel::Executable)
.map_err(|e| {
DataFusionError::Context(
"check_plan_after_optimizers".to_string(),
"check_plan_is_executable after optimizers".to_string(),
Box::new(e),
)
})?;
Expand All @@ -479,19 +499,17 @@ impl Optimizer {
}
}

/// These are invariants which should hold true before and after each optimization.
/// These are invariants which should hold true before and after [`LogicalPlan`] optimization.
///
/// This differs from [`LogicalPlan::check_invariants`], which addresses if a singular
/// LogicalPlan is valid. Instead this address if the optimization (before and after)
/// is valid based upon permitted changes.
/// LogicalPlan is valid. Instead this address if the optimization was valid based upon permitted changes.
fn assert_valid_optimization(
rule_name: &str,
plan: &LogicalPlan,
prev_schema: &Arc<DFSchema>,
) -> Result<()> {
// verify invariant: optimizer rule didn't change the schema
// verify invariant: optimizer passes should not change the schema
// Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
assert_expected_schema(rule_name, prev_schema, plan)?;
assert_expected_schema(prev_schema, plan)?;

Ok(())
}
Expand Down Expand Up @@ -549,8 +567,8 @@ mod tests {
let err = opt.optimize(plan, &config, &observe).unwrap_err();
assert_eq!(
"Optimizer rule 'get table_scan rule' failed\n\
caused by\ncheck_optimized_plan\n\
caused by\nget table_scan rule\n\
caused by\n\
check_optimizer_specific_invariants after optimizer pass: get table_scan rule\n\
caused by\n\
Internal error: Failed due to a difference in schemas, \
original schema: DFSchema { inner: Schema { \
Expand Down

0 comments on commit 1164a7b

Please sign in to comment.