Skip to content

Commit

Permalink
Support HyperLogLog++
Browse files Browse the repository at this point in the history
Signed-off-by: Chong Gao <[email protected]>
  • Loading branch information
Chong Gao committed Oct 31, 2024
1 parent ed4c878 commit 0fc310f
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 0 deletions.
6 changes: 6 additions & 0 deletions integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2239,3 +2239,9 @@ def do_it(spark):
assert_gpu_and_cpu_are_equal_collect(do_it,
conf={'spark.rapids.sql.agg.forceSinglePassPartialSort': True,
'spark.rapids.sql.agg.singlePassPartialSortEnabled': True})

# TODO assert approximate result between CPU and GPU
@pytest.mark.parametrize('data_gen', all_basic_gens_no_null, ids=idfn)
def test_hyper_log_log_plus_plus(data_gen):
with_gpu_session(
lambda spark : binary_op_df(spark, data_gen).groupby('a').agg(f.approx_count_distinct('a'))).collect()
Original file line number Diff line number Diff line change
Expand Up @@ -3918,6 +3918,16 @@ object GpuOverrides extends Logging {
GpuDynamicPruningExpression(child)
}
}),
expr[HyperLogLogPlusPlus](
"Aggregation approximate count distinct",
ExprChecks.reductionAndGroupByAgg(TypeSig.LONG, TypeSig.LONG,
Seq(ParamCheck("input", TypeSig.cpuAtomics, TypeSig.all))),
(a, conf, p, r) => new UnaryExprMeta[HyperLogLogPlusPlus](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression = {
GpuHyperLogLogPlusPlus(child, a.relativeSD)
}
}
),
SparkShimImpl.ansiCastRule
).collect { case r if r != null => (r.getClassFor.asSubclass(classOf[Expression]), r)}.toMap

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.aggregate

import scala.collection.immutable.Seq

import ai.rapids.cudf
import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResourceIfAllowed
import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression
import com.nvidia.spark.rapids.jni.HLLPP
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.rapids.{GpuCreateNamedStruct, GpuGetStructField}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

case class CudfHLLPP(override val dataType: DataType,
precision: Int) extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(input: cudf.ColumnVector) => input.reduce(
ReductionAggregation.HLLPP(precision), DType.STRUCT)
override lazy val groupByAggregate: GroupByAggregation =
GroupByAggregation.HLLPP(precision)
override val name: String = "CudfHyperLogLogPlusPlus"
}

case class CudfMergeHLLPP(override val dataType: DataType,
precision: Int)
extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(input: cudf.ColumnVector) =>
input.reduce(ReductionAggregation.mergeHLL(precision), DType.STRUCT)
override lazy val groupByAggregate: GroupByAggregation =
GroupByAggregation.mergeHLL(precision)
override val name: String = "CudfMergeHyperLogLogPlusPlus"
}

/**
* Perform the final evaluation step to compute approximate count distinct from sketches.
* Input is long columns, first construct struct of long then feed to cuDF
*/
case class GpuHyperLogLogPlusPlusEvaluation(childExpr: Expression,
precision: Int)
extends GpuExpression with ShimExpression {
override def dataType: DataType = LongType

override def nullable: Boolean = false

override def prettyName: String = "HyperLogLogPlusPlus_evaluation"

override def children: scala.Seq[Expression] = Seq(childExpr)

override def columnarEval(batch: ColumnarBatch): GpuColumnVector = {
withResourceIfAllowed(childExpr.columnarEval(batch)) { sketches =>
val distinctValues = HLLPP.estimateDistinctValueFromSketches(
sketches.getBase, precision)
GpuColumnVector.from(distinctValues, LongType)
}
}
}

/**
* Gpu version of HyperLogLogPlusPlus
* Spark APPROX_COUNT_DISTINCT on NULLs returns zero
*/
case class GpuHyperLogLogPlusPlus(childExpr: Expression, relativeSD: Double)
extends GpuAggregateFunction with Serializable {

// Consistent with Spark
private lazy val precision: Int =
Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d)).toInt;

private lazy val numRegistersPerSketch: Int = 1 << precision;

// Each long contains 10 register(max 6 bits)
private lazy val numWords = numRegistersPerSketch / 10 + 1

// Spark agg buffer type: long array
private lazy val sparkAggBufferAttributes: Seq[AttributeReference] = {
Seq.tabulate(numWords) { i =>
AttributeReference(s"MS[$i]", LongType)()
}
}

/**
* Spark uses long columns to save agg buffer, e.g.: long[52]
* Each long compacts multiple registers to save memory
*/
override val aggBufferAttributes: Seq[AttributeReference] = sparkAggBufferAttributes

/**
* init long array with all zero
*/
override lazy val initialValues: Seq[Expression] = Seq.tabulate(numWords) { _ =>
GpuLiteral(0L, LongType)
}

override lazy val inputProjection: Seq[Expression] = Seq(childExpr)

/**
* cuDF HLLPP sketch type: struct<long, ..., long>
*/
private lazy val cuDFBufferType: DataType = StructType.fromAttributes(aggBufferAttributes)

/**
* cuDF uses Struct<long, ..., long> column to do aggregate
*/
override lazy val updateAggregates: Seq[CudfAggregate] =
Seq(CudfHLLPP(cuDFBufferType, precision))

/**
* Convert long columns to Struct<long, ..., long> column
*/
private def genStruct: Seq[Expression] = {
val names = Seq.tabulate(numWords) { i => GpuLiteral(s"MS[$i]", StringType) }
Seq(GpuCreateNamedStruct(names.zip(aggBufferAttributes).flatten { case (a, b) => List(a, b) }))
}

/**
* Convert Struct<long, ..., long> column to long columns
*/
override lazy val postUpdate: Seq[Expression] = Seq.tabulate(numWords) {
i => GpuGetStructField(postUpdateAttr.head, i)
}

/**
* convert to Struct<long, ..., long>
*/
override lazy val preMerge: Seq[Expression] = genStruct

override lazy val mergeAggregates: Seq[CudfAggregate] =
Seq(CudfMergeHLLPP(cuDFBufferType, precision))

/**
* Convert Struct<long, ..., long> column to long columns
*/
override lazy val postMerge: Seq[Expression] = Seq.tabulate(numWords) {
i => GpuGetStructField(postMergeAttr.head, i)
}

override lazy val evaluateExpression: Expression =
GpuHyperLogLogPlusPlusEvaluation(genStruct.head, precision)

override def dataType: DataType = LongType

// Spark APPROX_COUNT_DISTINCT on NULLs returns zero
override def nullable: Boolean = false

override def prettyName: String = "approx_count_distinct"

override def children: Seq[Expression] = Seq(childExpr)
}

0 comments on commit 0fc310f

Please sign in to comment.