Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Chong Gao committed Oct 23, 2024
1 parent 35518d8 commit 1945192
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 15 deletions.
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/arithmetic_ops_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def test_hll():
assert_gpu_and_cpu_are_equal_sql(
lambda spark : spark.read.parquet("/home/chongg/a"),
"tab",
"select c1, APPROX_COUNT_DISTINCT(c1) from tab group by c1"
"select c1, APPROX_COUNT_DISTINCT(c2) from tab group by c1"
)

@pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.apache.spark.sql.rapids.aggregate

import scala.collection.immutable.Seq

import ai.rapids.cudf
import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation}
import com.nvidia.spark.rapids._
Expand All @@ -25,24 +27,29 @@ import com.nvidia.spark.rapids.jni.HLL
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.catalyst.util.{GenericArrayData, HyperLogLogPlusPlusHelper}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

// NOTE(review): this span is a rendered diff with the +/- markers stripped, so what look like
// the pre-change and post-change lines of CudfHLL appear side by side and the text is not
// compilable as shown. Which lines are "old" vs "new" is inferred below — confirm against the
// actual commit before relying on it.
//
// CudfHLL is a CudfAggregate whose reduction path calls `input.reduce` with
// `ReductionAggregation.HLL(...)` and whose group-by path is `GroupByAggregation.HLL(...)`.

// Presumably the removed header: dataType was the only constructor parameter.
case class CudfHLL(override val dataType: DataType) extends CudfAggregate {
// Presumably the added header: numRegistersPerSketch is a second constructor parameter that
// is forwarded to both HLL aggregations below.
case class CudfHLL(override val dataType: DataType,
numRegistersPerSketch: Int) extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
// Presumably removed: no-argument HLL reduction with a DType.LIST output type.
(input: cudf.ColumnVector) => input.reduce(ReductionAggregation.HLL(), DType.LIST)
// Presumably removed: group-by aggregation with a hard-coded 32 * 1024 argument.
override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.HLL(32 * 1024)
// Presumably added: the reduction now passes numRegistersPerSketch and requests DType.STRUCT.
(input: cudf.ColumnVector) => input.reduce(
ReductionAggregation.HLL(numRegistersPerSketch), DType.STRUCT)
// Presumably added: the group-by aggregation takes the same numRegistersPerSketch value
// instead of the literal.
override lazy val groupByAggregate: GroupByAggregation =
GroupByAggregation.HLL(numRegistersPerSketch)
override val name: String = "CudfHLL"
}

// NOTE(review): this span is a rendered diff with the +/- markers stripped, so what look like
// the pre-change and post-change lines of CudfMergeHLL appear side by side and the text is not
// compilable as shown. Which lines are "old" vs "new" is inferred below — confirm against the
// actual commit before relying on it.
//
// CudfMergeHLL is a CudfAggregate whose reduction path calls `input.reduce` with
// `ReductionAggregation.mergeHLL(...)` and whose group-by path is
// `GroupByAggregation.mergeHLL(...)`.

// Presumably the removed header line: dataType was the only constructor parameter.
case class CudfMergeHLL(override val dataType: DataType)
// Presumably the added header lines: numRegistersPerSketch is a second constructor parameter.
case class CudfMergeHLL(override val dataType: DataType,
numRegistersPerSketch: Int)
extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(input: cudf.ColumnVector) =>
// Presumably removed: no-argument merge reduction with a DType.LIST output type.
input.reduce(ReductionAggregation.mergeHLL(), DType.LIST)
// Presumably added: the merge reduction passes numRegistersPerSketch and requests DType.STRUCT.
input.reduce(ReductionAggregation.mergeHLL(numRegistersPerSketch), DType.STRUCT)

// Presumably removed: no-argument group-by merge.
override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.mergeHLL()
// Presumably added: the group-by merge takes the same numRegistersPerSketch value.
override lazy val groupByAggregate: GroupByAggregation =
GroupByAggregation.mergeHLL(numRegistersPerSketch)
override val name: String = "CudfMergeHLL"
}

Expand All @@ -67,27 +74,42 @@ case class GpuHLLEvaluation(childExpr: Expression, precision: Int)
}
}

case class GpuHLL(childExpr: Expression, precision: Int)
case class GpuHLL(childExpr: Expression, relativeSD: Double)
extends GpuAggregateFunction with Serializable {

// specify the HLL sketch type: list<byte>
private lazy val hllBufferType: DataType = ArrayType(ByteType, containsNull = false)
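// Consistent with Spark's HyperLogLogPlusPlusHelper: precision p = ceil(2 * log2(1.106 / relativeSD)),
// and the number of registers per sketch is 2^p.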
private lazy val numRegistersPerSketch: Int =
1 << Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d)).toInt

// specify the HLL sketch type: struct<long, ..., long>
private lazy val hllBufferType: DataType = StructType.fromAttributes(aggBufferAttributes)

private lazy val hllBufferAttribute: AttributeReference =
AttributeReference("hllAttr", hllBufferType)()

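// TODO: the initial value below is still a byte-array literal; it should be built from longs
// to match the struct<long, ..., long> sketch type.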
override lazy val initialValues: Seq[Expression] =
Seq(GpuLiteral.create(new GenericArrayData(Array.ofDim[Byte](32 * 1024)), hllBufferType))

override lazy val inputProjection: Seq[Expression] = Seq(childExpr)

override lazy val updateAggregates: Seq[CudfAggregate] = Seq(CudfHLL(hllBufferType))
override lazy val updateAggregates: Seq[CudfAggregate] =
Seq(CudfHLL(hllBufferType, numRegistersPerSketch))

override lazy val mergeAggregates: Seq[CudfAggregate] =
Seq(CudfMergeHLL(hllBufferType, numRegistersPerSketch))

override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(CudfMergeHLL(hllBufferType))
override lazy val evaluateExpression: Expression =
GpuHLLEvaluation(hllBufferAttribute, numRegistersPerSketch)

override lazy val evaluateExpression: Expression = GpuHLLEvaluation(hllBufferAttribute, precision)
private val hllppHelper = new HyperLogLogPlusPlusHelper(relativeSD)

override def aggBufferAttributes: Seq[AttributeReference] = hllBufferAttribute :: Nil
/** Allocate enough words to store all registers. */
override val aggBufferAttributes: Seq[AttributeReference] = {
Seq.tabulate(hllppHelper.numWords) { i =>
AttributeReference(s"MS[$i]", LongType)()
}
}

override def dataType: DataType = hllBufferType

Expand Down

0 comments on commit 1945192

Please sign in to comment.