Refactor; Update cases
Chong Gao committed Dec 20, 2024
1 parent 98a4b08 commit 9d1bac2
Showing 4 changed files with 25 additions and 13 deletions.
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/hashing_test.py
@@ -36,7 +36,7 @@
 
 _struct_of_xxhash_gens = StructGen([(f"c{i}", g) for i, g in enumerate(_atomic_gens)])
 
-# will be used by HyperLogLogPlusPlus(approx_count_distinct)
+# This is also used by HyperLogLogPlusPlus(approx_count_distinct)
 xxhash_gens = (_atomic_gens + [_struct_of_xxhash_gens] + single_level_array_gens
                + nested_array_gens_sample + [
     all_basic_struct_gen,
16 changes: 11 additions & 5 deletions integration_tests/src/main/python/hyper_log_log_plus_plus_test.py
@@ -16,17 +16,23 @@
 
 from asserts import assert_gpu_and_cpu_are_equal_sql
 from data_gen import *
+from hashing_test import xxhash_gens
+from marks import ignore_order
 
-@pytest.mark.parametrize('data_gen', all_basic_gens + decimal_gens, ids=idfn)
+
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', xxhash_gens, ids=idfn)
 def test_hllpp_groupby(data_gen):
     assert_gpu_and_cpu_are_equal_sql(
-        lambda spark : gen_df(spark, [("c1", int_gen), ("c2", data_gen)]),
+        lambda spark: gen_df(spark, [("c1", int_gen), ("c2", data_gen)]),
         "tab",
         "select c1, APPROX_COUNT_DISTINCT(c2) from tab group by c1")
 
-@pytest.mark.parametrize('data_gen', all_basic_gens + decimal_gens, ids=idfn)
+
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', xxhash_gens, ids=idfn)
 def test_hllpp_reduction(data_gen):
     assert_gpu_and_cpu_are_equal_sql(
-        lambda spark : unary_op_df(spark, data_gen),
+        lambda spark: unary_op_df(spark, data_gen),
         "tab",
-        "select APPROX_COUNT_DISTINCT(a) from tab")
+        "select APPROX_COUNT_DISTINCT(a) from tab")
4 changes: 3 additions & 1 deletion sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3991,7 +3991,9 @@ object GpuOverrides extends Logging {
     expr[HyperLogLogPlusPlus](
       "Aggregation approximate count distinct",
       ExprChecks.reductionAndGroupByAgg(TypeSig.LONG, TypeSig.LONG,
-        Seq(ParamCheck("input", TypeSig.cpuAtomics, TypeSig.all))),
+        // HyperLogLogPlusPlus depends on XxHash64, so it supports
+        // all the types that XxHash64 supports.
+        Seq(ParamCheck("input", XxHash64Shims.supportedTypes, TypeSig.all))),
       (a, conf, p, r) => new UnaryExprMeta[HyperLogLogPlusPlus](a, conf, p, r) {
         override def convertToGpu(child: Expression): GpuExpression = {
           GpuHyperLogLogPlusPlus(child, a.relativeSD)
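The narrowed ParamCheck above means the GPU path now accepts exactly the input types XxHash64 can hash, since the GPU sketch is built over XxHash64 values; convertToGpu also forwards the aggregate's relativeSD. As a hedged illustration (plain PySpark, not plugin code), the rsd argument below is where that relativeSD value originates:

# Hedged illustration: rsd is the relativeSD that convertToGpu forwards
# to GpuHyperLogLogPlusPlus; Spark's default is 0.05, and a smaller value
# trades sketch memory for accuracy.
from pyspark.sql import SparkSession
from pyspark.sql.functions import approx_count_distinct

spark = SparkSession.builder.getOrCreate()
df = spark.range(100000).toDF("a")
df.agg(approx_count_distinct("a", rsd=0.01).alias("acd")).show()
spark.stop()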
16 changes: 10 additions & 6 deletions
@@ -23,7 +23,7 @@ import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation}
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.Arm.withResourceIfAllowed
 import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression
-import com.nvidia.spark.rapids.jni.HLLPPHostUDF
+import com.nvidia.spark.rapids.jni.HyperLogLogPlusPlusHostUDF
 import com.nvidia.spark.rapids.shims.ShimExpression
 
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
@@ -36,11 +36,13 @@ case class CudfHLLPP(override val dataType: DataType,
   override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
     (input: cudf.ColumnVector) => input.reduce(
       ReductionAggregation.hostUDF(
-        HLLPPHostUDF.createHLLPPHostUDF(HLLPPHostUDF.AggregationType.Reduction, precision)),
+        HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF(
+          HyperLogLogPlusPlusHostUDF.AggregationType.Reduction, precision)),
       DType.STRUCT)
   override lazy val groupByAggregate: GroupByAggregation =
     GroupByAggregation.hostUDF(
-      HLLPPHostUDF.createHLLPPHostUDF(HLLPPHostUDF.AggregationType.GroupBy, precision)
+      HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF(
+        HyperLogLogPlusPlusHostUDF.AggregationType.GroupBy, precision)
     )
   override val name: String = "CudfHyperLogLogPlusPlus"
 }
@@ -51,11 +53,13 @@ case class CudfMergeHLLPP(override val dataType: DataType,
   override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
     (input: cudf.ColumnVector) => input.reduce(
       ReductionAggregation.hostUDF(
-        HLLPPHostUDF.createHLLPPHostUDF(HLLPPHostUDF.AggregationType.Reduction_MERGE, precision)),
+        HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF(
+          HyperLogLogPlusPlusHostUDF.AggregationType.Reduction_MERGE, precision)),
       DType.STRUCT)
   override lazy val groupByAggregate: GroupByAggregation =
     GroupByAggregation.hostUDF(
-      HLLPPHostUDF.createHLLPPHostUDF(HLLPPHostUDF.AggregationType.GroupByMerge, precision)
+      HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF(
+        HyperLogLogPlusPlusHostUDF.AggregationType.GroupByMerge, precision)
     )
   override val name: String = "CudfMergeHyperLogLogPlusPlus"
 }
@@ -77,7 +81,7 @@ case class GpuHyperLogLogPlusPlusEvaluation(childExpr: Expression,
 
   override def columnarEval(batch: ColumnarBatch): GpuColumnVector = {
     withResourceIfAllowed(childExpr.columnarEval(batch)) { sketches =>
-      val distinctValues = HLLPPHostUDF.estimateDistinctValueFromSketches(
+      val distinctValues = HyperLogLogPlusPlusHostUDF.estimateDistinctValueFromSketches(
         sketches.getBase, precision)
       GpuColumnVector.from(distinctValues, LongType)
     }
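The renamed HyperLogLogPlusPlusHostUDF is created in four variants above (Reduction, Reduction_MERGE, GroupBy, GroupByMerge), and estimateDistinctValueFromSketches turns merged sketches into counts. A toy Python sketch of that update/merge/estimate contract, purely illustrative and not the cuDF/JNI implementation: it stands in SHA-256 where the plugin hashes with XxHash64, and it omits HLL++'s bias correction and sparse mode.

# Toy HyperLogLog: per-partition update builds a sketch, merge is a
# register-wise max (what the *_MERGE variants do), and estimate turns
# the merged sketch into an approximate distinct count.
import hashlib

P = 9            # precision: 2**P registers, like the `precision` above
M = 1 << P

def _hash64(value) -> int:
    # Stand-in 64-bit hash; the plugin uses XxHash64 on the GPU.
    return int.from_bytes(hashlib.sha256(repr(value).encode()).digest()[:8], "big")

def update(sketch, value):
    h = _hash64(value)
    idx = h & (M - 1)                        # low P bits pick a register
    rest = h >> P                            # remaining 64 - P bits
    rank = (64 - P) - rest.bit_length() + 1  # leading zeros + 1
    sketch[idx] = max(sketch[idx], rank)

def merge(a, b):
    return [max(x, y) for x, y in zip(a, b)]

def estimate(sketch) -> float:
    # Raw HLL estimator; real HLL++ adds bias correction.
    alpha = 0.7213 / (1 + 1.079 / M)
    return alpha * M * M / sum(2.0 ** -r for r in sketch)

part1, part2 = [0] * M, [0] * M              # per-partition partial sketches
for v in range(6000):
    update(part1 if v % 2 else part2, v)
print(round(estimate(merge(part1, part2))))  # ~6000, within a few percent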
