From ddb4facb043db180728990cee322b4a3464b91bb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 30 Sep 2024 10:58:12 -0400 Subject: [PATCH] Refactor PrimitiveGroupValueBuilder to use `MaybeNullBufferBuilder` (#12623) * Refactor PrimitiveGroupValueBuilder to use BooleanBuilder * Refactor boolean buffer builder out * tweaks * tweak * simplify * Add specializations for null / non null --- .../src/aggregates/group_values/column.rs | 79 ++++------ .../aggregates/group_values/group_column.rs | 148 +++++++++++------- .../src/aggregates/group_values/mod.rs | 1 + .../aggregates/group_values/null_builder.rs | 115 ++++++++++++++ 4 files changed, 240 insertions(+), 103 deletions(-) create mode 100644 datafusion/physical-plan/src/aggregates/group_values/null_builder.rs diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 907e453cf606..91d87302ce99 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -16,7 +16,8 @@ // under the License. use crate::aggregates::group_values::group_column::{ - ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder, + ByteGroupValueBuilder, GroupColumn, NonNullPrimitiveGroupValueBuilder, + PrimitiveGroupValueBuilder, }; use crate::aggregates::group_values::GroupValues; use ahash::RandomState; @@ -123,6 +124,26 @@ impl GroupValuesColumn { } } +/// instantiates a [`PrimitiveGroupValueBuilder`] or +/// [`NonNullPrimitiveGroupValueBuilder`] and pushes it into $v +/// +/// Arguments: +/// `$v`: the vector to push the new builder into +/// `$nullable`: whether the input can contains nulls +/// `$t`: the primitive type of the builder +/// +macro_rules! instantiate_primitive { + ($v:expr, $nullable:expr, $t:ty) => { + if $nullable { + let b = PrimitiveGroupValueBuilder::<$t>::new(); + $v.push(Box::new(b) as _) + } else { + let b = NonNullPrimitiveGroupValueBuilder::<$t>::new(); + $v.push(Box::new(b) as _) + } + }; +} + impl GroupValues for GroupValuesColumn { fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { let n_rows = cols[0].len(); @@ -133,54 +154,22 @@ impl GroupValues for GroupValuesColumn { for f in self.schema.fields().iter() { let nullable = f.is_nullable(); match f.data_type() { - &DataType::Int8 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::Int16 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::Int32 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::Int64 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::UInt8 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::UInt16 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::UInt32 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::UInt64 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } + &DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type), + &DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type), + &DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type), + &DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type), + &DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type), + &DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type), + &DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type), + &DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type), &DataType::Float32 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) + instantiate_primitive!(v, nullable, Float32Type) } &DataType::Float64 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::Date32 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) - } - &DataType::Date64 => { - let b = PrimitiveGroupValueBuilder::::new(nullable); - v.push(Box::new(b) as _) + instantiate_primitive!(v, nullable, Float64Type) } + &DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type), + &DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type), &DataType::Utf8 => { let b = ByteGroupValueBuilder::::new(OutputType::Utf8); v.push(Box::new(b) as _) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 53b13243b755..7409f5c214b9 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -32,11 +32,11 @@ use arrow::datatypes::GenericBinaryType; use arrow::datatypes::GenericStringType; use datafusion_common::utils::proxy::VecAllocExt; +use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; +use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; use std::sync::Arc; use std::vec; -use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; - /// Trait for storing a single column of group values in [`GroupValuesColumn`] /// /// Implementations of this trait store an in-progress collection of group values @@ -47,6 +47,8 @@ use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAP pub trait GroupColumn: Send + Sync { /// Returns equal if the row stored in this builder at `lhs_row` is equal to /// the row in `array` at `rhs_row` + /// + /// Note that this comparison returns true if both elements are NULL fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool; /// Appends the row at `row` in `array` to this builder fn append_val(&mut self, array: &ArrayRef, row: usize); @@ -61,61 +63,96 @@ pub trait GroupColumn: Send + Sync { fn take_n(&mut self, n: usize) -> ArrayRef; } -/// An implementation of [`GroupColumn`] for primitive types. -pub struct PrimitiveGroupValueBuilder { +/// An implementation of [`GroupColumn`] for primitive values which are known to have no nulls +#[derive(Debug)] +pub struct NonNullPrimitiveGroupValueBuilder { group_values: Vec, - nulls: Vec, - /// whether the array contains at least one null, for fast non-null path - has_null: bool, - /// Can the input array contain nulls? - nullable: bool, } -impl PrimitiveGroupValueBuilder +impl NonNullPrimitiveGroupValueBuilder where T: ArrowPrimitiveType, { - pub fn new(nullable: bool) -> Self { + pub fn new() -> Self { Self { group_values: vec![], - nulls: vec![], - has_null: false, - nullable, } } } -impl GroupColumn for PrimitiveGroupValueBuilder { +impl GroupColumn for NonNullPrimitiveGroupValueBuilder { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { - // non-null fast path - // both non-null - if !self.nullable { - return self.group_values[lhs_row] - == array.as_primitive::().value(rhs_row); - } + // know input has no nulls + self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) + } - // lhs is non-null - if self.nulls[lhs_row] { - if array.is_null(rhs_row) { - return false; - } + fn append_val(&mut self, array: &ArrayRef, row: usize) { + // input can't possibly have nulls, so don't worry about them + self.group_values.push(array.as_primitive::().value(row)) + } + + fn len(&self) -> usize { + self.group_values.len() + } + + fn size(&self) -> usize { + self.group_values.allocated_size() + } + + fn build(self: Box) -> ArrayRef { + let Self { group_values } = *self; - return self.group_values[lhs_row] - == array.as_primitive::().value(rhs_row); + let nulls = None; + + Arc::new(PrimitiveArray::::new( + ScalarBuffer::from(group_values), + nulls, + )) + } + + fn take_n(&mut self, n: usize) -> ArrayRef { + let first_n = self.group_values.drain(0..n).collect::>(); + let first_n_nulls = None; + + Arc::new(PrimitiveArray::::new( + ScalarBuffer::from(first_n), + first_n_nulls, + )) + } +} + +/// An implementation of [`GroupColumn`] for primitive values which may have nulls +#[derive(Debug)] +pub struct PrimitiveGroupValueBuilder { + group_values: Vec, + nulls: MaybeNullBufferBuilder, +} + +impl PrimitiveGroupValueBuilder +where + T: ArrowPrimitiveType, +{ + pub fn new() -> Self { + Self { + group_values: vec![], + nulls: MaybeNullBufferBuilder::new(), } + } +} - array.is_null(rhs_row) +impl GroupColumn for PrimitiveGroupValueBuilder { + fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + self.nulls.is_null(lhs_row) == array.is_null(rhs_row) + && self.group_values[lhs_row] == array.as_primitive::().value(rhs_row) } fn append_val(&mut self, array: &ArrayRef, row: usize) { - if self.nullable && array.is_null(row) { + if array.is_null(row) { + self.nulls.append(true); self.group_values.push(T::default_value()); - self.nulls.push(false); - self.has_null = true; } else { - let elem = array.as_primitive::().value(row); - self.group_values.push(elem); - self.nulls.push(true); + self.nulls.append(false); + self.group_values.push(array.as_primitive::().value(row)); } } @@ -128,32 +165,27 @@ impl GroupColumn for PrimitiveGroupValueBuilder { } fn build(self: Box) -> ArrayRef { - if self.has_null { - Arc::new(PrimitiveArray::::new( - ScalarBuffer::from(self.group_values), - Some(NullBuffer::from(self.nulls)), - )) - } else { - Arc::new(PrimitiveArray::::new( - ScalarBuffer::from(self.group_values), - None, - )) - } + let Self { + group_values, + nulls, + } = *self; + + let nulls = nulls.build(); + + Arc::new(PrimitiveArray::::new( + ScalarBuffer::from(group_values), + nulls, + )) } fn take_n(&mut self, n: usize) -> ArrayRef { - if self.has_null { - let first_n = self.group_values.drain(0..n).collect::>(); - let first_n_nulls = self.nulls.drain(0..n).collect::>(); - Arc::new(PrimitiveArray::::new( - ScalarBuffer::from(first_n), - Some(NullBuffer::from(first_n_nulls)), - )) - } else { - let first_n = self.group_values.drain(0..n).collect::>(); - self.nulls.truncate(self.nulls.len() - n); - Arc::new(PrimitiveArray::::new(ScalarBuffer::from(first_n), None)) - } + let first_n = self.group_values.drain(0..n).collect::>(); + let first_n_nulls = self.nulls.take_n(n); + + Arc::new(PrimitiveArray::::new( + ScalarBuffer::from(first_n), + first_n_nulls, + )) } } diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 5f13f5ca3259..fb7b66775092 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -38,6 +38,7 @@ use bytes::GroupValuesByes; use datafusion_physical_expr::binary_map::OutputType; mod group_column; +mod null_builder; /// Stores the group values during hash aggregation. /// diff --git a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs new file mode 100644 index 000000000000..0249390f38cd --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; + +/// Builder for an (optional) null mask +/// +/// Optimized for avoid creating the bitmask when all values are non-null +#[derive(Debug)] +pub(crate) enum MaybeNullBufferBuilder { + /// seen `row_count` rows but no nulls yet + NoNulls { row_count: usize }, + /// have at least one null value + /// + /// Note this is an Arrow *VALIDITY* buffer (so it is false for nulls, true + /// for non-nulls) + Nulls(BooleanBufferBuilder), +} + +impl MaybeNullBufferBuilder { + /// Create a new builder + pub fn new() -> Self { + Self::NoNulls { row_count: 0 } + } + + /// Return true if the row at index `row` is null + pub fn is_null(&self, row: usize) -> bool { + match self { + Self::NoNulls { .. } => false, + // validity mask means a unset bit is NULL + Self::Nulls(builder) => !builder.get_bit(row), + } + } + + /// Set the nullness of the next row to `is_null` + /// + /// num_values is the current length of the rows being tracked + /// + /// If `value` is true, the row is null. + /// If `value` is false, the row is non null + pub fn append(&mut self, is_null: bool) { + match self { + Self::NoNulls { row_count } if is_null => { + // have seen no nulls so far, this is the first null, + // need to create the nulls buffer for all currently valid values + // alloc 2x the need given we push a new but immediately + let mut nulls = BooleanBufferBuilder::new(*row_count * 2); + nulls.append_n(*row_count, true); + nulls.append(false); + *self = Self::Nulls(nulls); + } + Self::NoNulls { row_count } => { + *row_count += 1; + } + Self::Nulls(builder) => builder.append(!is_null), + } + } + + /// return the number of heap allocated bytes used by this structure to store boolean values + pub fn allocated_size(&self) -> usize { + match self { + Self::NoNulls { .. } => 0, + // BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes) + Self::Nulls(builder) => builder.capacity() / 8, + } + } + + /// Return a NullBuffer representing the accumulated nulls so far + pub fn build(self) -> Option { + match self { + Self::NoNulls { .. } => None, + Self::Nulls(mut builder) => Some(NullBuffer::from(builder.finish())), + } + } + + /// Returns a NullBuffer representing the first `n` rows accumulated so far + /// shifting any remaining down by `n` + pub fn take_n(&mut self, n: usize) -> Option { + match self { + Self::NoNulls { row_count } => { + *row_count -= n; + None + } + Self::Nulls(builder) => { + // Copy over the values at n..len-1 values to the start of a + // new builder and leave it in self + // + // TODO: it would be great to use something like `set_bits` from arrow here. + let mut new_builder = BooleanBufferBuilder::new(builder.len()); + for i in n..builder.len() { + new_builder.append(builder.get_bit(i)); + } + std::mem::swap(&mut new_builder, builder); + + // take only first n values from the original builder + new_builder.truncate(n); + Some(NullBuffer::from(new_builder.finish())) + } + } + } +}