Skip to content

Commit

Permalink
Dynamically optimize aggregate based on shuffle stats (#919)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan authored Nov 28, 2023
1 parent b8bd8fc commit e474e34
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::iter::FromIterator;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
Expand Down Expand Up @@ -348,9 +349,13 @@ impl UnresolvedStage {
&input_locations,
)?;

// Optimize join order based on new resolved statistics
// Optimize join order and aggregates based on the newly resolved statistics
let optimize_join = JoinSelection::new();
let plan = optimize_join.optimize(plan, SessionConfig::default().options())?;
let config = SessionConfig::default();
let plan = optimize_join.optimize(plan, config.options())?;
let optimize_aggregate = AggregateStatistics::new();
let plan =
optimize_aggregate.optimize(plan, SessionConfig::default().options())?;

Ok(ResolvedStage::new(
self.stage_id,
Expand Down

0 comments on commit e474e34

Please sign in to comment.