Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP(iox-10578): patched df upgrade 202-04-27 #17

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,9 @@ config_namespace! {
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
pub default_filter_selectivity: u8, default = 20

/// When set to true, the optimizer will not attempt to convert Union to Interleave
pub prefer_existing_union: bool, default = false
}
}

Expand Down
90 changes: 83 additions & 7 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,11 @@ fn ensure_distribution(
.collect::<Result<Vec<_>>>()?;

let children_plans = children.iter().map(|c| c.plan.clone()).collect::<Vec<_>>();
plan = if plan.as_any().is::<UnionExec>() && can_interleave(children_plans.iter()) {

plan = if plan.as_any().is::<UnionExec>()
&& !config.optimizer.prefer_existing_union
&& can_interleave(children_plans.iter())
{
// Add a special case for [`UnionExec`] since we want to "bubble up"
// hash-partitioned data. So instead of
//
Expand Down Expand Up @@ -1721,23 +1725,33 @@ pub(crate) mod tests {
/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to
/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans
/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition
/// * `PREFER_EXISTING_UNION` (optional) - if true, will not attempt to convert Union to Interleave
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024);
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024, false);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024);
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, false);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $PREFER_EXISTING_UNION: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, $PREFER_EXISTING_UNION);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, false);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION;

// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
// because they were written prior to the separation of `BasicEnforcement` into
Expand Down Expand Up @@ -3097,7 +3111,67 @@ pub(crate) mod tests {
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
assert_optimized!(expected, plan.clone(), false);

Ok(())
}

#[test]
fn union_not_to_interleave() -> Result<()> {
// group by (a as a1)
let left = aggregate_exec_with_alias(
parquet_exec(),
vec![("a".to_string(), "a1".to_string())],
);
// group by (a as a2)
let right = aggregate_exec_with_alias(
parquet_exec(),
vec![("a".to_string(), "a1".to_string())],
);

// Union
let plan = Arc::new(UnionExec::new(vec![left, right]));

// final agg
let plan =
aggregate_exec_with_alias(plan, vec![("a1".to_string(), "a2".to_string())]);

// Only two RepartitionExecs added, no final RepartitionExec required
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
"RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=20",
"AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
"UnionExec",
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
// no sort in the plan but since we need it as a parameter, make it default false
let prefer_existing_sort = false;
let first_enforce_distribution = true;
let prefer_existing_union = true;

assert_optimized!(
expected,
plan.clone(),
first_enforce_distribution,
prefer_existing_sort,
prefer_existing_union
);
assert_optimized!(
expected,
plan,
!first_enforce_distribution,
prefer_existing_sort,
prefer_existing_union
);

Ok(())
}
Expand Down Expand Up @@ -3651,7 +3725,8 @@ pub(crate) mod tests {
true,
target_partitions,
true,
repartition_size
repartition_size,
false
);

let expected = [
Expand All @@ -3668,7 +3743,8 @@ pub(crate) mod tests {
true,
target_partitions,
true,
repartition_size
repartition_size,
false
);

Ok(())
Expand Down Expand Up @@ -3731,7 +3807,7 @@ pub(crate) mod tests {
)),
vec![("a".to_string(), "a".to_string())],
);
assert_optimized!(expected, plan, true, false, 2, true, 10);
assert_optimized!(expected, plan, true, false, 2, true, 10, false);
}
Ok(())
}
Expand Down
8 changes: 8 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,14 @@ impl SessionConfig {
self
}

/// Prefer existing union (true). See [prefer_existing_union] for more details
///
/// [prefer_existing_union]: datafusion_common::config::OptimizerOptions::prefer_existing_union
pub fn with_prefer_existing_union(mut self, enabled: bool) -> Self {
self.options.optimizer.prefer_existing_union = enabled;
self
}

/// Enables or disables the use of pruning predicate for parquet readers to skip row groups
pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
self.options.execution.parquet.pruning = enabled;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
datafusion.optimizer.max_passes 3
datafusion.optimizer.prefer_existing_sort false
datafusion.optimizer.prefer_existing_union false
datafusion.optimizer.prefer_hash_join true
datafusion.optimizer.repartition_aggregations true
datafusion.optimizer.repartition_file_min_size 10485760
Expand Down Expand Up @@ -294,6 +295,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum es
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level
datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning.
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
Expand Down