diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 3cbe14cb558eb..509287b060cfd 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -321,6 +321,15 @@ config_namespace! { /// Should DataFusion keep the columns used for partition_by in the output RecordBatches pub keep_partition_by_columns: bool, default = false + + /// Aggregation rate (number of distinct groups / number of input rows) + /// threshold for skipping partial aggregation. If the value is greater + /// then partial aggregation will skip aggregation for further input + pub skip_partial_aggregation_probe_rate_threshold: f64, default = 0.8 + + /// Number of input rows partial aggregation partition should process, before + /// aggregation rate check and trying to switch to skipping aggregation mode + pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000 } } diff --git a/datafusion/expr/src/groups_accumulator.rs b/datafusion/expr/src/groups_accumulator.rs index 2ffbfb266e9ca..da4c28b251f08 100644 --- a/datafusion/expr/src/groups_accumulator.rs +++ b/datafusion/expr/src/groups_accumulator.rs @@ -18,7 +18,7 @@ //! Vectorized [`GroupsAccumulator`] use arrow_array::{ArrayRef, BooleanArray}; -use datafusion_common::Result; +use datafusion_common::{not_impl_err, Result}; /// Describes how many rows should be emitted during grouping. #[derive(Debug, Clone, Copy)] @@ -158,6 +158,24 @@ pub trait GroupsAccumulator: Send { total_num_groups: usize, ) -> Result<()>; + /// Converts input batch to intermediate aggregate state, + /// without grouping (each input row considered as a separate + /// group). + fn convert_to_state( + &self, + _values: &[ArrayRef], + _opt_filter: Option<&BooleanArray>, + ) -> Result> { + not_impl_err!("Input batch conversion to state not implemented") + } + + /// Returns `true` is groups accumulator supports input batch + /// to intermediate aggregate state conversion (`convert_to_state` + /// method is implemented). + fn convert_to_state_supported(&self) -> bool { + false + } + /// Amount of memory used to store the state of this accumulator, /// in bytes. This function is called once per batch, so it should /// be `O(n)` to compute, not `O(num_groups)` diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 0ead22e90a163..1b6acb282f949 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -33,7 +33,7 @@ use arrow::{ }; use arrow::{ - array::{Array, BooleanArray, Int64Array, PrimitiveArray}, + array::{Array, BooleanArray, Int64Array, Int64Builder, PrimitiveArray}, buffer::BooleanBuffer, }; use datafusion_common::{ @@ -432,6 +432,49 @@ impl GroupsAccumulator for CountGroupsAccumulator { Ok(vec![Arc::new(counts) as ArrayRef]) } + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + let values = &values[0]; + + let state_array = match (values.logical_nulls(), opt_filter) { + (Some(nulls), None) => { + let mut builder = Int64Builder::with_capacity(values.len()); + nulls + .into_iter() + .for_each(|is_valid| builder.append_value(is_valid as i64)); + builder.finish() + } + (Some(nulls), Some(filter)) => { + let mut builder = Int64Builder::with_capacity(values.len()); + nulls.into_iter().zip(filter.iter()).for_each( + |(is_valid, filter_value)| { + builder.append_value( + (is_valid && filter_value.is_some_and(|val| val)) as i64, + ) + }, + ); + builder.finish() + } + (None, Some(filter)) => { + let mut builder = Int64Builder::with_capacity(values.len()); + filter.into_iter().for_each(|filter_value| { + builder.append_value(filter_value.is_some_and(|val| val) as i64) + }); + builder.finish() + } + (None, None) => Int64Array::from_value(1, values.len()), + }; + + Ok(vec![Arc::new(state_array)]) + } + + fn convert_to_state_supported(&self) -> bool { + true + } + fn size(&self) -> usize { self.counts.capacity() * std::mem::size_of::() } diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs index debb36852b224..5aad5486dc30c 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}; +use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, PrimitiveBuilder}; use arrow::datatypes::ArrowPrimitiveType; use arrow::datatypes::DataType; use datafusion_common::Result; @@ -134,6 +134,46 @@ where self.update_batch(values, group_indices, opt_filter, total_num_groups) } + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + let values = values[0].as_primitive::(); + let mut state = PrimitiveBuilder::::with_capacity(values.len()) + .with_data_type(self.data_type.clone()); + + match opt_filter { + Some(filter) => { + values + .iter() + .zip(filter.iter()) + .for_each(|(val, filter_val)| match (val, filter_val) { + (Some(val), Some(true)) => { + let mut state_val = self.starting_value; + (self.prim_fn)(&mut state_val, val); + state.append_value(state_val); + } + (_, _) => state.append_null(), + }) + } + None => values.iter().for_each(|val| match val { + Some(val) => { + let mut state_val = self.starting_value; + (self.prim_fn)(&mut state_val, val); + state.append_value(state_val); + } + None => state.append_null(), + }), + }; + + Ok(vec![Arc::new(state.finish())]) + } + + fn convert_to_state_supported(&self) -> bool { + true + } + fn size(&self) -> usize { self.values.capacity() * std::mem::size_of::() + self.null_state.size() } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index e7cd5cb2725be..831e1c6d03cf7 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -2442,4 +2442,185 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_skip_aggregation_after_first_batch() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("key", DataType::Int32, true), + Field::new("val", DataType::Int32, true), + ])); + + let group_by = + PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]); + + let aggr_expr: Vec> = vec![create_aggregate_expr( + &count_udaf(), + &[col("val", &schema)?], + &[datafusion_expr::col("val")], + &[], + &[], + &schema, + "COUNT(val)", + false, + false, + false, + )?]; + + let input_data = vec![ + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![2, 3, 4])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + ]; + + let input = Arc::new(MemoryExec::try_new( + &[input_data], + Arc::clone(&schema), + None, + )?); + let aggregate_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + aggr_expr, + vec![None], + Arc::clone(&input) as Arc, + schema, + )?); + + let mut session_config = SessionConfig::default(); + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_probe_rows_threshold", + ScalarValue::Int64(Some(2)), + ); + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_probe_rate_threshold", + ScalarValue::Float64(Some(0.1)), + ); + + let ctx = TaskContext::default().with_session_config(session_config); + let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?; + + let expected = [ + "+-----+-------------------+", + "| key | COUNT(val)[count] |", + "+-----+-------------------+", + "| 1 | 1 |", + "| 2 | 1 |", + "| 3 | 1 |", + "| 2 | 1 |", + "| 3 | 1 |", + "| 4 | 1 |", + "+-----+-------------------+", + ]; + assert_batches_eq!(expected, &output); + + Ok(()) + } + + #[tokio::test] + async fn test_skip_aggregation_after_threshold() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("key", DataType::Int32, true), + Field::new("val", DataType::Int32, true), + ])); + + let group_by = + PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]); + + let aggr_expr: Vec> = vec![create_aggregate_expr( + &count_udaf(), + &[col("val", &schema)?], + &[datafusion_expr::col("val")], + &[], + &[], + &schema, + "COUNT(val)", + false, + false, + false, + )?]; + + let input_data = vec![ + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![2, 3, 4])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![2, 3, 4])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + ]; + + let input = Arc::new(MemoryExec::try_new( + &[input_data], + Arc::clone(&schema), + None, + )?); + let aggregate_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + aggr_expr, + vec![None], + Arc::clone(&input) as Arc, + schema, + )?); + + let mut session_config = SessionConfig::default(); + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_probe_rows_threshold", + ScalarValue::Int64(Some(5)), + ); + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_probe_rate_threshold", + ScalarValue::Float64(Some(0.1)), + ); + + let ctx = TaskContext::default().with_session_config(session_config); + let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?; + + let expected = [ + "+-----+-------------------+", + "| key | COUNT(val)[count] |", + "+-----+-------------------+", + "| 1 | 1 |", + "| 2 | 2 |", + "| 3 | 2 |", + "| 4 | 1 |", + "| 2 | 1 |", + "| 3 | 1 |", + "| 4 | 1 |", + "+-----+-------------------+", + ]; + assert_batches_eq!(expected, &output); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 167ca72407503..523a0e2ac7f4a 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -39,7 +39,7 @@ use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; use arrow::datatypes::SchemaRef; use arrow_schema::SortOptions; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{internal_datafusion_err, DataFusionError, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -62,6 +62,10 @@ pub(crate) enum ExecutionState { /// When producing output, the remaining rows to output are stored /// here and are sliced off as needed in batch_size chunks ProducingOutput(RecordBatch), + /// Indicates that GroupedHashAggregateStream should produce + /// intermediate aggregate state for each input rows without + /// their aggregation + SkippingAggregation, Done, } @@ -90,6 +94,69 @@ struct SpillState { merging_group_by: PhysicalGroupBy, } +struct SkipAggregationProbe { + /// Number of processed input rows + input_rows: usize, + /// Number of total group values for `input_rows` + num_groups: usize, + + /// Aggregation rate check should be performed only when the + /// number of input rows exceeds this threshold + probe_rows_threshold: usize, + /// Maximum allowed value of `input_rows` / `num_groups` to + /// continue aggregation + probe_rate_threshold: f64, + + /// Flag indicating that further data aggregation mey be skipped + should_skip: bool, + /// Flag indicating that further updates of `SkipAggregationProbe` + /// state won't make any effect + is_locked: bool, +} + +impl SkipAggregationProbe { + fn new(probe_rows_threshold: usize, probe_rate_threshold: f64) -> Self { + Self { + input_rows: 0, + num_groups: 0, + probe_rows_threshold, + probe_rate_threshold, + should_skip: false, + is_locked: false, + } + } + + /// Updates `SkipAggregationProbe` state: + /// - increments the number of input rows + /// - replaces the number of groups with the new value + /// - on `probe_rows_threshold` exceeded calculates aggregation rate + /// and sets `should_skip` flag + /// - if `should_skip` is set, locks further state updates + fn update_state(&mut self, input_rows: usize, num_groups: usize) { + if self.is_locked { + return; + } + self.input_rows += input_rows; + self.num_groups = num_groups; + if self.input_rows >= self.probe_rows_threshold { + self.should_skip = self.num_groups as f64 / self.input_rows as f64 + >= self.probe_rate_threshold; + self.is_locked = true; + } + } + + fn should_skip(&self) -> bool { + self.should_skip + } + + /// Provides an ability to externally set `should_skip` flag + /// to `false` and prohibit further state updates + fn forbid_skipping(&mut self) { + self.should_skip = false; + self.is_locked = true; + } +} + /// HashTable based Grouping Aggregator /// /// # Design Goals @@ -275,6 +342,10 @@ pub(crate) struct GroupedHashAggregateStream { /// the `GroupedHashAggregateStream` operation immediately switches to /// output mode and emits all groups. group_values_soft_limit: Option, + + /// Optional probe for skipping data aggregation, if supported by + /// current stream + skip_aggregation_probe: Option, } impl GroupedHashAggregateStream { @@ -365,6 +436,36 @@ impl GroupedHashAggregateStream { merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()), }; + // Skip aggregation is supported if: + // - aggregation mode is Partial + // - input is not ordered by GROUP BY expressions, + // since Final mode expects unique group values as its input + // - all accumulators support input batch to intermediate + // aggregate state conversion + // - there is only one GROUP BY expressions set + let skip_aggregation_probe = if agg.mode == AggregateMode::Partial + && matches!(group_ordering, GroupOrdering::None) + && accumulators + .iter() + .all(|acc| acc.convert_to_state_supported()) + && agg_group_by.is_single() + { + Some(SkipAggregationProbe::new( + context + .session_config() + .options() + .execution + .skip_partial_aggregation_probe_rows_threshold, + context + .session_config() + .options() + .execution + .skip_partial_aggregation_probe_rate_threshold, + )) + } else { + None + }; + Ok(GroupedHashAggregateStream { schema: agg_schema, input, @@ -384,6 +485,7 @@ impl GroupedHashAggregateStream { runtime: context.runtime_env(), spill_state, group_values_soft_limit: agg.limit, + skip_aggregation_probe, }) } } @@ -434,12 +536,16 @@ impl Stream for GroupedHashAggregateStream { // new batch to aggregate Some(Ok(batch)) => { let timer = elapsed_compute.timer(); + let input_rows = batch.num_rows(); + // Make sure we have enough capacity for `batch`, otherwise spill extract_ok!(self.spill_previous_if_necessary(&batch)); // Do the grouping extract_ok!(self.group_aggregate_batch(batch)); + self.update_skip_aggregation_probe(input_rows); + // If we can begin emitting rows, do so, // otherwise keep consuming input assert!(!self.input_done); @@ -463,6 +569,8 @@ impl Stream for GroupedHashAggregateStream { extract_ok!(self.emit_early_if_necessary()); + extract_ok!(self.switch_to_skip_aggregation()); + timer.done(); } Some(Err(e)) => { @@ -476,6 +584,26 @@ impl Stream for GroupedHashAggregateStream { } } + ExecutionState::SkippingAggregation => { + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let _timer = elapsed_compute.timer(); + let states = self.transform_to_states(batch)?; + return Poll::Ready(Some(Ok( + states.record_output(&self.baseline_metrics) + ))); + } + Some(Err(e)) => { + // inner had error, return to caller + return Poll::Ready(Some(Err(e))); + } + None => { + // inner is done, switching to `Done` state + self.exec_state = ExecutionState::Done; + } + } + } + ExecutionState::ProducingOutput(batch) => { // slice off a part of the batch, if needed let output_batch; @@ -484,6 +612,12 @@ impl Stream for GroupedHashAggregateStream { ( if self.input_done { ExecutionState::Done + } else if self + .skip_aggregation_probe + .as_ref() + .is_some_and(|probe| probe.should_skip()) + { + ExecutionState::SkippingAggregation } else { ExecutionState::ReadingInput }, @@ -797,4 +931,59 @@ impl GroupedHashAggregateStream { timer.done(); Ok(()) } + + // Updates skip aggregation probe state. + // In case stream has any spills, the probe is forcefully set to + // forbid aggregation skipping, and locked, since spilling resets + // total number of unique groups. + // + // Note: currently spilling is not supported for Partial aggregation + fn update_skip_aggregation_probe(&mut self, input_rows: usize) { + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + if !self.spill_state.spills.is_empty() { + probe.forbid_skipping(); + } else { + probe.update_state(input_rows, self.group_values.len()); + } + }; + } + + // In case the probe indicates that aggregation may be + // skipped, forces stream to produce currently accumulated output. + fn switch_to_skip_aggregation(&mut self) -> Result<()> { + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + if probe.should_skip() { + let batch = self.emit(EmitTo::All, false)?; + self.exec_state = ExecutionState::ProducingOutput(batch); + } + } + + Ok(()) + } + + // Transforms input batch to intermediate aggregate state, without grouping it + fn transform_to_states(&self, batch: RecordBatch) -> Result { + let group_values = evaluate_group_by(&self.group_by, &batch)?; + let input_values = evaluate_many(&self.aggregate_arguments, &batch)?; + let filter_values = evaluate_optional(&self.filter_expressions, &batch)?; + + let mut output = group_values.first().cloned().ok_or_else(|| { + internal_datafusion_err!("group_values expected to have at least one element") + })?; + + let iter = self + .accumulators + .iter() + .zip(input_values.iter()) + .zip(filter_values.iter()); + + for ((acc, values), opt_filter) in iter { + let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean()); + output.extend(acc.convert_to_state(values, opt_filter)?); + } + + let states_batch = RecordBatch::try_new(self.schema(), output)?; + + Ok(states_batch) + } } diff --git a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt new file mode 100644 index 0000000000000..814943c935c4e --- /dev/null +++ b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt @@ -0,0 +1,177 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# The main goal of these tests is to verify correctness of transforming +# input values to state by accumulators, supporting `convert_to_state`. + + +# Setup test data table +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL +) +STORED AS CSV +LOCATION '../../testing/data/csv/aggregate_test_100.csv' +OPTIONS ('format.has_header' 'true'); + +# Create table for nullable aggregations test +statement ok +CREATE TABLE aggregate_test_100_null ( + c2 TINYINT NOT NULL, + c3 SMALLINT, + c11 FLOAT +); + +# Prepare settings to always skip aggregation after couple of batches +statement ok +set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 10; + +statement ok +set datafusion.execution.skip_partial_aggregation_probe_rate_threshold = 0.0; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4; + +# Inserting into nullable table with batch_size specified above +# to prevent creation on single in-memory batch +statement ok +INSERT INTO aggregate_test_100_null +SELECT + c2, + CASE WHEN c1 = 'e' THEN NULL ELSE c3 END as c3, + CASE WHEN c1 = 'a' THEN NULL ELSE c11 END as c11 +FROM aggregate_test_100; + +# Test count varchar / int / float +query IIII +SELECT c2, count(c1), count(c5), count(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 22 22 22 +2 22 22 22 +3 19 19 19 +4 23 23 23 +5 14 14 14 + +# Test min / max for int / float +query IIIRR +SELECT c2, min(c5), max(c5), min(c11), max(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 -1991133944 2143473091 0.064453244 0.89651865 +2 -2138770630 2053379412 0.055064857 0.8315913 +3 -2141999138 2030965207 0.034291923 0.9488028 +4 -1885422396 2064155045 0.028003037 0.7459874 +5 -2117946883 2025611582 0.12559289 0.87989986 + +# Test sum for int / float +query IIR +SELECT c2, sum(c5), sum(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 -438598674 12.153253793716 +2 -8259865364 9.577824473381 +3 1956035476 9.590891361237 +4 16155718643 9.531112968922 +5 6449337880 7.074412226677 + +# Enabling PG dialect for filtered aggregates tests +statement ok +set datafusion.sql_parser.dialect = 'Postgres'; + +# Test count with filter +query III +SELECT + c2, + count(c3) FILTER (WHERE c3 > 0), + count(c3) FILTER (WHERE c11 > 10) +FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 13 0 +2 13 0 +3 13 0 +4 13 0 +5 5 0 + +# Test min / max with filter +query III +SELECT + c2, + min(c3) FILTER (WHERE c3 > 0), + max(c3) FILTER (WHERE c3 < 0) +FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 12 -5 +2 1 -29 +3 13 -2 +4 3 -38 +5 36 -5 + +# Test sum with filter +query II +SELECT + c2, + sum(c3) FILTER (WHERE c1 != 'e' AND c3 > 0) +FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 612 +2 565 +3 466 +4 417 +5 284 + +# Test count with nullable fields +query III +SELECT c2, count(c3), count(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2; +---- +1 19 17 +2 17 19 +3 15 13 +4 16 19 +5 12 11 + +# Test min / max with nullable fields +query IIIRR +SELECT c2, min(c3), max(c3), min(c11), max(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2; +---- +1 -99 125 0.064453244 0.89651865 +2 -117 122 0.09683716 0.8315913 +3 -101 123 0.034291923 0.94669616 +4 -117 123 0.028003037 0.7085086 +5 -101 118 0.12559289 0.87989986 + +# Test sum with nullable fields +query IIR +SELECT c2, sum(c3), sum(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +---- +1 367 12.153253793716 +2 184 9.577824473381 +3 395 9.590891361237 +4 29 9.531112968922 +5 -194 7.074412226677 diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 5e5de016e375e..e566c84e2d4f2 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -88,6 +88,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | +| datafusion.execution.skip_partial_aggregation_probe_rate_threshold | 0.8 | Aggregation rate (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input | +| datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation rate check and trying to switch to skipping aggregation mode | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |