
Bug: applying EnforceDistribution multiple times generates an invalid plan #14150

Open
xudong963 opened this issue Jan 16, 2025 · 4 comments
Labels: bug (Something isn't working)

@xudong963
Member

Describe the bug

For a top-k SQL query such as `select * from aggregate_test_100 ORDER BY c13 limit 5;`, applying EnforceDistribution twice generates an invalid plan and produces a wrong result.

The root cause is that the limit's `fetch` is lost during the second application of EnforceDistribution.
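
To illustrate the mechanism (a hedged sketch, not the actual rule code; the method names come from SortPreservingMergeExec's public API and may differ slightly across versions): the top-k limit is stored as a `fetch` on the operator, so any rewrite that rebuilds the operator must copy it over explicitly, otherwise the limit silently disappears.

use std::sync::Arc;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical helper, for illustration only: a rule that reconstructs a
// SortPreservingMergeExec (e.g. while adjusting partitioning) must carry
// the fetch along explicitly.
fn rebuild(merge: &SortPreservingMergeExec) -> Arc<dyn ExecutionPlan> {
    Arc::new(
        SortPreservingMergeExec::new(merge.expr().clone(), merge.input().clone())
            // Dropping this line reproduces the symptom: the plan returns
            // all matching rows instead of only the top 5.
            .with_fetch(merge.fetch()),
    )
}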

To Reproduce

Here is an example to reproduce it:

use std::sync::Arc;
use futures::StreamExt;
use datafusion::prelude::*;
use datafusion::physical_optimizer::{
    coalesce_batches::CoalesceBatches,
    enforce_distribution::EnforceDistribution,
    output_requirements::OutputRequirements,
    PhysicalOptimizerRule,
};
use datafusion::error::Result;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion::physical_optimizer::limit_pushdown::LimitPushdown;
use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown;
use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};

#[tokio::main]
async fn main() -> Result<()> {
    // Create a configuration
    let config = SessionConfig::new();
    let ctx = SessionContext::new_with_config(config);

    // Create table schema and data
    // To reproduce the bug: the LOCATION should contain more than one aggregate_test_100.csv
    let sql = "CREATE EXTERNAL TABLE aggregate_test_100 (
        c1  VARCHAR NOT NULL,
        c2  TINYINT NOT NULL,
        c3  SMALLINT NOT NULL,
        c4  SMALLINT,
        c5  INT,
        c6  BIGINT NOT NULL,
        c7  SMALLINT NOT NULL,
        c8  INT NOT NULL,
        c9  BIGINT UNSIGNED NOT NULL,
        c10 VARCHAR NOT NULL,
        c11 FLOAT NOT NULL,
        c12 DOUBLE NOT NULL,
        c13 VARCHAR NOT NULL
    )
    STORED AS CSV
    LOCATION './testing/data/csv/' 
    OPTIONS ('format.has_header' 'true')";

    ctx.sql(sql).await?;

    let df = ctx.sql("SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT 5").await?;
    let logical_plan = df.logical_plan().clone();
    let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
        logical_plan,
        ctx.state().config_options(),
        |_, _| (),
    )?;

    let optimized_logical_plan = ctx.state().optimizer().optimize(
        analyzed_logical_plan,
        &ctx.state(),
        |_, _| (),
    )?;

    let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
        // If there is an output requirement of the query, make sure that
        // this information is not lost across different rules during optimization.
        Arc::new(OutputRequirements::new_add_mode()),
        // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
        // requirements. Please make sure that the whole plan tree is determined before this rule.
        // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
        // least one of the operators in the plan benefits from increased parallelism.
        Arc::new(EnforceDistribution::new()),
        Arc::new(EnforceSorting::new()),
        // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be blocked by CoalesceBatches, so add it before CoalesceBatches. Maybe optimize this in the future.
        Arc::new(ProjectionPushdown::new()),
        // The CoalesceBatches rule will not influence the distribution and ordering of the
        // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
        Arc::new(CoalesceBatches::new()),
        Arc::new(EnforceDistribution::new()), // Apply the EnforceDistribution rule a second time
        // Remove the ancillary output requirement operator since we are done with the planning
        // phase.
        Arc::new(OutputRequirements::new_remove_mode()),
        Arc::new(ProjectionPushdown::new()),
        // The LimitPushdown rule tries to push limits down as far as possible,
        // replacing operators with fetching variants, or adding limits
        // past operators that support limit pushdown.
        Arc::new(LimitPushdown::new()),
        // The SanityCheckPlan rule checks whether the order and
        // distribution requirements of each node in the plan
        // is satisfied. It will also reject non-runnable query
        // plans that use pipeline-breaking operators on infinite
        // input(s). The rule generates a diagnostic error
        // message for invalid plans. It makes no changes to the
        // given query plan; i.e. it only acts as a final
        // gatekeeping rule.
        Arc::new(SanityCheckPlan::new()),
    ];

    // Generate the physical plan using the custom optimizer rules above
    let planner = DefaultPhysicalPlanner::default();
    let session_state = SessionStateBuilder::new()
        .with_config(ctx.copied_config())
        .with_default_features()
        .with_physical_optimizer_rules(optimizers)
        .build();
    let optimized_physical_plan = planner.create_physical_plan(&optimized_logical_plan, &session_state).await?;

    let mut results = optimized_physical_plan.execute(0, ctx.task_ctx().clone()).unwrap();

    let batch = results.next().await.unwrap()?;
    dbg!(batch.num_rows()); // prints 10 rows instead of the expected 5
    Ok(())
}
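
A quicker way to see the broken invariant (a sketch meant to live inside the same main; `plan` is a stand-in for any physical plan produced for the query above): an optimizer rule should be a fixpoint, so applying EnforceDistribution to its own output must not change the plan.

use datafusion::physical_plan::displayable;

// Hedged sketch: compare the rendered plan after one and two applications
// of the rule. With this bug, the second pass drops the fetch, so the
// assertion fails.
let rule = EnforceDistribution::new();
let once = rule.optimize(plan.clone(), ctx.state().config_options())?;
let twice = rule.optimize(once.clone(), ctx.state().config_options())?;
assert_eq!(
    displayable(once.as_ref()).indent(true).to_string(),
    displayable(twice.as_ref()).indent(true).to_string(),
);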

Expected behavior

A valid plan and a correct result, as documented here: https://github.com/apache/datafusion/blob/main/datafusion/core/src/physical_optimizer/enforce_distribution.rs#L159-L168

Additional context

No response

xudong963 added the bug label on Jan 16, 2025
@xudong963
Member Author

Any thoughts? @alamb @andygrove @Dandandan @akurmustafa

If you also think it's a bug, I'd be happy to provide a fix.

@alamb
Contributor

alamb commented Jan 16, 2025

I agree that this sounds like a bug.

Thank you for the report @xudong963, and for the offer to fix it.

@akurmustafa
Contributor

> Any thoughts? @alamb @andygrove @Dandandan @akurmustafa
>
> If you also think it's a bug, I'd be happy to provide a fix.

This is indeed a bug. The resulting plan should be correct regardless of how many times the rules are applied. Thanks @xudong963 for reporting this.

@xudong963
Copy link
Member Author

Thanks for your reply, I will provide a fix next week.

xudong963 self-assigned this on Jan 17, 2025