diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 734b4dfb708a..e6cc3d6318c0 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -2239,3 +2239,14 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf={'spark.rapids.sql.agg.forceSinglePassPartialSort': True, 'spark.rapids.sql.agg.singlePassPartialSortEnabled': True}) + +# TODO assert approximate result between CPU and GPU +@pytest.mark.parametrize('data_gen', all_basic_gens_no_null, ids=idfn) +def test_hyper_log_log_plus_plus(data_gen): + with_gpu_session( + lambda spark : binary_op_df(spark, data_gen).groupby('a').agg(f.approx_count_distinct('b')).collect()) + +@pytest.mark.parametrize('data_gen', all_basic_gens_no_null, ids=idfn) +def test_hyper_log_log_plus_plus_reduce(data_gen): + with_gpu_session( + lambda spark : binary_op_df(spark, data_gen).agg(f.approx_count_distinct('b')).collect()) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 97440388c9f8..7e1170bde100 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3918,6 +3918,16 @@ object GpuOverrides extends Logging { GpuDynamicPruningExpression(child) } }), + expr[HyperLogLogPlusPlus]( + "Aggregation approximate count distinct", + ExprChecks.reductionAndGroupByAgg(TypeSig.LONG, TypeSig.LONG, + Seq(ParamCheck("input", TypeSig.cpuAtomics, TypeSig.all))), + (a, conf, p, r) => new UnaryExprMeta[HyperLogLogPlusPlus](a, conf, p, r) { + override def convertToGpu(child: Expression): GpuExpression = { + GpuHyperLogLogPlusPlus(child, a.relativeSD) + } + } + ), SparkShimImpl.ansiCastRule ).collect { case r if r != null => (r.getClassFor.asSubclass(classOf[Expression]), r)}.toMap diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHyperLogLogPlusPlus.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHyperLogLogPlusPlus.scala new file mode 100644 index 000000000000..fe801955535f --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHyperLogLogPlusPlus.scala @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.rapids.aggregate + +import scala.collection.immutable.Seq + +import ai.rapids.cudf +import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation} +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.Arm.withResourceIfAllowed +import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression +import com.nvidia.spark.rapids.jni.HLLPP +import com.nvidia.spark.rapids.shims.ShimExpression + +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.rapids.{GpuCreateNamedStruct, GpuGetStructField} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +case class CudfHLLPP(override val dataType: DataType, + precision: Int) extends CudfAggregate { + override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = + (input: cudf.ColumnVector) => input.reduce( + ReductionAggregation.HLLPP(precision), DType.STRUCT) + override lazy val groupByAggregate: GroupByAggregation = + GroupByAggregation.HLLPP(precision) + override val name: String = "CudfHyperLogLogPlusPlus" +} + +case class CudfMergeHLLPP(override val dataType: DataType, + precision: Int) + extends CudfAggregate { + override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = + (input: cudf.ColumnVector) => + input.reduce(ReductionAggregation.mergeHLL(precision), DType.STRUCT) + override lazy val groupByAggregate: GroupByAggregation = + GroupByAggregation.mergeHLL(precision) + override val name: String = "CudfMergeHyperLogLogPlusPlus" +} + +/** + * Perform the final evaluation step to compute approximate count distinct from sketches. + * Input is long columns, first construct struct of long then feed to cuDF + */ +case class GpuHyperLogLogPlusPlusEvaluation(childExpr: Expression, + precision: Int) + extends GpuExpression with ShimExpression { + override def dataType: DataType = LongType + + override def nullable: Boolean = false + + override def prettyName: String = "HyperLogLogPlusPlus_evaluation" + + override def children: scala.Seq[Expression] = Seq(childExpr) + + override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { + withResourceIfAllowed(childExpr.columnarEval(batch)) { sketches => + val distinctValues = HLLPP.estimateDistinctValueFromSketches( + sketches.getBase, precision) + GpuColumnVector.from(distinctValues, LongType) + } + } +} + +/** + * Gpu version of HyperLogLogPlusPlus + * Spark APPROX_COUNT_DISTINCT on NULLs returns zero + */ +case class GpuHyperLogLogPlusPlus(childExpr: Expression, relativeSD: Double) + extends GpuAggregateFunction with Serializable { + + // Consistent with Spark + private lazy val precision: Int = + Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d)).toInt; + + private lazy val numRegistersPerSketch: Int = 1 << precision; + + // Each long contains 10 register(max 6 bits) + private lazy val numWords = numRegistersPerSketch / 10 + 1 + + // Spark agg buffer type: long array + private lazy val sparkAggBufferAttributes: Seq[AttributeReference] = { + Seq.tabulate(numWords) { i => + AttributeReference(s"MS[$i]", LongType)() + } + } + + /** + * Spark uses long columns to save agg buffer, e.g.: long[52] + * Each long compacts multiple registers to save memory + */ + override val aggBufferAttributes: Seq[AttributeReference] = sparkAggBufferAttributes + + /** + * init long array with all zero + */ + override lazy val initialValues: Seq[Expression] = Seq.tabulate(numWords) { _ => + GpuLiteral(0L, LongType) + } + + override lazy val inputProjection: Seq[Expression] = Seq(childExpr) + + /** + * cuDF HLLPP sketch type: struct + */ + private lazy val cuDFBufferType: DataType = StructType.fromAttributes(aggBufferAttributes) + + /** + * cuDF uses Struct column to do aggregate + */ + override lazy val updateAggregates: Seq[CudfAggregate] = + Seq(CudfHLLPP(cuDFBufferType, precision)) + + /** + * Convert long columns to Struct column + */ + private def genStruct: Seq[Expression] = { + val names = Seq.tabulate(numWords) { i => GpuLiteral(s"MS[$i]", StringType) } + Seq(GpuCreateNamedStruct(names.zip(aggBufferAttributes).flatten { case (a, b) => List(a, b) })) + } + + /** + * Convert Struct column to long columns + */ + override lazy val postUpdate: Seq[Expression] = Seq.tabulate(numWords) { + i => GpuGetStructField(postUpdateAttr.head, i) + } + + /** + * convert to Struct + */ + override lazy val preMerge: Seq[Expression] = genStruct + + override lazy val mergeAggregates: Seq[CudfAggregate] = + Seq(CudfMergeHLLPP(cuDFBufferType, precision)) + + /** + * Convert Struct column to long columns + */ + override lazy val postMerge: Seq[Expression] = Seq.tabulate(numWords) { + i => GpuGetStructField(postMergeAttr.head, i) + } + + override lazy val evaluateExpression: Expression = + GpuHyperLogLogPlusPlusEvaluation(genStruct.head, precision) + + override def dataType: DataType = LongType + + // Spark APPROX_COUNT_DISTINCT on NULLs returns zero + override def nullable: Boolean = false + + override def prettyName: String = "approx_count_distinct" + + override def children: Seq[Expression] = Seq(childExpr) +}