Skip to content

Commit

Permalink
Avoid redundant pass-by-value in physical optimizer (apache#12261)
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi authored Sep 1, 2024
1 parent a8bca75 commit 5272007
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ fn adjust_input_keys_ordering(
return reorder_partitioned_join_keys(
requirements,
on,
vec![],
&[],
&join_constructor,
)
.map(Transformed::yes);
Expand Down Expand Up @@ -373,7 +373,7 @@ fn adjust_input_keys_ordering(
return reorder_partitioned_join_keys(
requirements,
on,
sort_options.clone(),
sort_options,
&join_constructor,
)
.map(Transformed::yes);
Expand Down Expand Up @@ -421,7 +421,7 @@ fn adjust_input_keys_ordering(
fn reorder_partitioned_join_keys<F>(
mut join_plan: PlanWithKeyRequirements,
on: &[(PhysicalExprRef, PhysicalExprRef)],
sort_options: Vec<SortOptions>,
sort_options: &[SortOptions],
join_constructor: &F,
) -> Result<PlanWithKeyRequirements>
where
Expand Down
24 changes: 12 additions & 12 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ fn try_pushdown_through_hash_join(

if !join_allows_pushdown(
&projection_as_columns,
hash_join.schema(),
&hash_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
Expand Down Expand Up @@ -662,7 +662,7 @@ fn try_pushdown_through_hash_join(
};

let (new_left, new_right) = new_join_children(
projection_as_columns,
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
hash_join.left(),
Expand Down Expand Up @@ -700,15 +700,15 @@ fn try_swapping_with_cross_join(

if !join_allows_pushdown(
&projection_as_columns,
cross_join.schema(),
&cross_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
return Ok(None);
}

let (new_left, new_right) = new_join_children(
projection_as_columns,
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
cross_join.left(),
Expand Down Expand Up @@ -740,7 +740,7 @@ fn try_swapping_with_nested_loop_join(

if !join_allows_pushdown(
&projection_as_columns,
nl_join.schema(),
&nl_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
Expand All @@ -762,7 +762,7 @@ fn try_swapping_with_nested_loop_join(
};

let (new_left, new_right) = new_join_children(
projection_as_columns,
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
nl_join.left(),
Expand Down Expand Up @@ -796,7 +796,7 @@ fn try_swapping_with_sort_merge_join(

if !join_allows_pushdown(
&projection_as_columns,
sm_join.schema(),
&sm_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
Expand All @@ -813,7 +813,7 @@ fn try_swapping_with_sort_merge_join(
};

let (new_left, new_right) = new_join_children(
projection_as_columns,
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
sm_join.children()[0],
Expand Down Expand Up @@ -850,7 +850,7 @@ fn try_swapping_with_sym_hash_join(

if !join_allows_pushdown(
&projection_as_columns,
sym_join.schema(),
&sym_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
Expand Down Expand Up @@ -881,7 +881,7 @@ fn try_swapping_with_sym_hash_join(
};

let (new_left, new_right) = new_join_children(
projection_as_columns,
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
sym_join.left(),
Expand Down Expand Up @@ -1243,7 +1243,7 @@ fn new_indices_for_join_filter(
/// - Left or right table is not lost after the projection.
fn join_allows_pushdown(
projection_as_columns: &[(Column, String)],
join_schema: SchemaRef,
join_schema: &SchemaRef,
far_right_left_col_ind: i32,
far_left_right_col_ind: i32,
) -> bool {
Expand All @@ -1260,7 +1260,7 @@ fn join_allows_pushdown(
/// this function constructs the new [`ProjectionExec`]s that will come on top
/// of the original children of the join.
fn new_join_children(
projection_as_columns: Vec<(Column, String)>,
projection_as_columns: &[(Column, String)],
far_right_left_col_ind: i32,
far_left_right_col_ind: i32,
left_child: &Arc<dyn ExecutionPlan>,
Expand Down
26 changes: 14 additions & 12 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ fn pushdown_requirement_to_children(
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
let request_child = required_input_ordering[0].as_deref().unwrap_or(&[]);
let child_plan = plan.children().swap_remove(0).clone();
let child_plan = plan.children().swap_remove(0);
match determine_children_requirement(parent_required, request_child, child_plan) {
RequirementsCompatibility::Satisfy => {
let req = (!request_child.is_empty()).then(|| request_child.to_vec());
Expand Down Expand Up @@ -225,7 +225,7 @@ fn pushdown_requirement_to_children(
Some(JoinSide::Left) => try_pushdown_requirements_to_join(
smj,
parent_required,
parent_required_expr,
&parent_required_expr,
JoinSide::Left,
),
Some(JoinSide::Right) => {
Expand All @@ -238,7 +238,7 @@ fn pushdown_requirement_to_children(
try_pushdown_requirements_to_join(
smj,
parent_required,
new_right_required_expr,
&new_right_required_expr,
JoinSide::Right,
)
}
Expand Down Expand Up @@ -321,7 +321,7 @@ fn pushdown_would_violate_requirements(
fn determine_children_requirement(
parent_required: LexRequirementRef,
request_child: LexRequirementRef,
child_plan: Arc<dyn ExecutionPlan>,
child_plan: &Arc<dyn ExecutionPlan>,
) -> RequirementsCompatibility {
if child_plan
.equivalence_properties()
Expand All @@ -344,7 +344,7 @@ fn determine_children_requirement(
fn try_pushdown_requirements_to_join(
smj: &SortMergeJoinExec,
parent_required: LexRequirementRef,
sort_expr: Vec<PhysicalSortExpr>,
sort_expr: &[PhysicalSortExpr],
push_side: JoinSide,
) -> Result<Option<Vec<Option<LexRequirement>>>> {
let left_eq_properties = smj.left().equivalence_properties();
Expand All @@ -356,25 +356,27 @@ fn try_pushdown_requirements_to_join(
let right_ordering = smj.right().output_ordering().unwrap_or(&[]);
let (new_left_ordering, new_right_ordering) = match push_side {
JoinSide::Left => {
let left_eq_properties =
left_eq_properties.clone().with_reorder(sort_expr.clone());
let left_eq_properties = left_eq_properties
.clone()
.with_reorder(Vec::from(sort_expr));
if left_eq_properties
.ordering_satisfy_requirement(&left_requirement.unwrap_or_default())
{
// After re-ordering requirement is still satisfied
(sort_expr.as_slice(), right_ordering)
(sort_expr, right_ordering)
} else {
return Ok(None);
}
}
JoinSide::Right => {
let right_eq_properties =
right_eq_properties.clone().with_reorder(sort_expr.clone());
let right_eq_properties = right_eq_properties
.clone()
.with_reorder(Vec::from(sort_expr));
if right_eq_properties
.ordering_satisfy_requirement(&right_requirement.unwrap_or_default())
{
// After re-ordering requirement is still satisfied
(left_ordering, sort_expr.as_slice())
(left_ordering, sort_expr)
} else {
return Ok(None);
}
Expand All @@ -397,7 +399,7 @@ fn try_pushdown_requirements_to_join(
let should_pushdown = smj_eqs.ordering_satisfy_requirement(parent_required);
Ok(should_pushdown.then(|| {
let mut required_input_ordering = smj.required_input_ordering();
let new_req = Some(PhysicalSortRequirement::from_sort_exprs(&sort_expr));
let new_req = Some(PhysicalSortRequirement::from_sort_exprs(sort_expr));
match push_side {
JoinSide::Left => {
required_input_ordering[0] = new_req;
Expand Down
14 changes: 6 additions & 8 deletions datafusion/core/src/physical_optimizer/topk_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl TopKAggregation {
Some(Arc::new(new_aggr))
}

fn transform_sort(plan: Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
fn transform_sort(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
let sort = plan.as_any().downcast_ref::<SortExec>()?;

let children = sort.children();
Expand Down Expand Up @@ -142,13 +142,11 @@ impl PhysicalOptimizerRule for TopKAggregation {
) -> Result<Arc<dyn ExecutionPlan>> {
if config.optimizer.enable_topk_aggregation {
plan.transform_down(|plan| {
Ok(
if let Some(plan) = TopKAggregation::transform_sort(plan.clone()) {
Transformed::yes(plan)
} else {
Transformed::no(plan)
},
)
Ok(if let Some(plan) = TopKAggregation::transform_sort(&plan) {
Transformed::yes(plan)
} else {
Transformed::no(plan)
})
})
.data()
} else {
Expand Down

0 comments on commit 5272007

Please sign in to comment.