[WIP] Add support for HyperLogLogPlusPlus (HLL++) #11638
base: branch-25.02
Conversation
Force-pushed from d42d80a to 1945192
```scala
case class GpuHLL(childExpr: Expression, relativeSD: Double)
```
Let's call it by its full name, like GpuHyperLogLogPlusPlus, to better reflect the CPU version.
Done
```scala
  ReductionAggregation.HLL(numRegistersPerSketch), DType.STRUCT)
override lazy val groupByAggregate: GroupByAggregation =
  GroupByAggregation.HLL(numRegistersPerSketch)
override val name: String = "CudfHLL"
```
Not sure if "PlusPlus" is necessary.

Suggested change:

```diff
-override val name: String = "CudfHLL"
+override val name: String = "CudfHyperLogLogPlusPlus"
```
Done.
Force-pushed from 0a4939f to eb00c2b
Signed-off-by: Chong Gao <[email protected]>
Ready for review except test cases.
Looks good
```scala
expr[HyperLogLogPlusPlus](
  "Aggregation approximate count distinct",
  ExprChecks.reductionAndGroupByAgg(TypeSig.LONG, TypeSig.LONG,
    Seq(ParamCheck("input", TypeSig.cpuAtomics, TypeSig.all))),
```
nit: Using `cpuAtomics` for a GPU field gets to be kind of confusing. Could you please create a `gpuAtomics` instead?
Will update to support map, array, and list types now that NVIDIA/spark-rapids-jni#2575 is merged.
Explanation for HLL++: 6 bits are enough to store a register value.
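To make the 6-bit point concrete: an HLL++ register stores the maximum "leading zeros plus one" count observed in the non-index bits of a 64-bit hash, so with precision p >= 4 its value is at most 64 - p + 1 <= 61, which fits in 6 bits. A standalone sketch of the packing (illustrative helper, not spark-rapids code):

```java
// Sketch (hypothetical helper): pack 6-bit HLL++ registers into longs.
// With 64-bit hashes and precision p >= 4, a register value is at most
// 64 - p + 1 <= 61, so 6 bits per register suffice.
public class HllRegisterPacking {
  static final int BITS = 6;
  static final int PER_LONG = Long.SIZE / BITS; // 10 registers per long

  static long[] pack(int[] registers) {
    long[] words = new long[(registers.length + PER_LONG - 1) / PER_LONG];
    for (int i = 0; i < registers.length; i++) {
      words[i / PER_LONG] |= ((long) registers[i] & 0x3F) << (BITS * (i % PER_LONG));
    }
    return words;
  }

  static int get(long[] words, int i) {
    return (int) ((words[i / PER_LONG] >>> (BITS * (i % PER_LONG))) & 0x3F);
  }

  public static void main(String[] args) {
    int[] regs = new int[32]; // p = 5 -> 2^5 = 32 registers
    regs[0] = 61; regs[7] = 13; regs[31] = 1;
    long[] packed = pack(regs);
    System.out.println(packed.length);   // 4 longs hold 32 six-bit registers
    System.out.println(get(packed, 0));  // 61
    System.out.println(get(packed, 7));  // 13
    System.out.println(get(packed, 31)); // 1
  }
}
```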
TODO: @revans2 could you have a look first?
@ttnghia @revans2

```scala
case class CudfMergeHLLPP(override val dataType: DataType,
    precision: Int)
  override lazy val groupByAggregate: GroupByAggregation = {
    val hll = new HyperLogLogPlusPlusHostUDF(AggregationType.GroupByMerge, precision)
    // here hll is memory-leaked; if we close it, it causes a core dump
    GroupByAggregation.hostUDF(hll)
  }
}
```
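For context on what a GroupByMerge of sketches computes: merging two HLL++ sketches with the same precision reduces to an element-wise max over their register arrays. A minimal illustrative sketch (plain Java, not the JNI implementation):

```java
// Illustrative only: merging two HLL++ sketches with the same precision
// is an element-wise max over their register arrays.
public class HllMerge {
  static int[] merge(int[] a, int[] b) {
    if (a.length != b.length) {
      throw new IllegalArgumentException("precision mismatch");
    }
    int[] out = new int[a.length];
    for (int i = 0; i < a.length; i++) {
      out[i] = Math.max(a[i], b[i]);
    }
    return out;
  }

  public static void main(String[] args) {
    int[] a = {3, 0, 7, 2};
    int[] b = {1, 5, 6, 2};
    // merged registers: [3, 5, 7, 2]
    System.out.println(java.util.Arrays.toString(merge(a, b)));
  }
}
```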
```java
static final class HostUDFAggregation extends Aggregation {
  // this refers to a `HyperLogLogPlusPlusHostUDF`
  private final HostUDFWrapper wrapper;

  private HostUDFAggregation(HostUDFWrapper wrapper) {
    super(Kind.HOST_UDF);
    this.wrapper = wrapper;
  }

  @Override
  long createNativeInstance() {
    return Aggregation.createHostUDFAgg(wrapper.udfNativeHandle);
  }
}
```

We can not close the native object here. Please discuss to find a solution.
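One possible shape for a fix, sketched with hypothetical names (not the cudf API): tie the native handle to an AutoCloseable wrapper so try-with-resources releases it deterministically instead of leaking:

```java
// Sketch with hypothetical names: tying a native handle's lifetime to
// try-with-resources so the JNI resource is released after the aggregation
// completes instead of leaking.
public class NativeUdfHandle implements AutoCloseable {
  private long nativeHandle;

  public NativeUdfHandle() {
    this.nativeHandle = createNative(); // stand-in for the JNI allocation
  }

  public long handle() {
    if (nativeHandle == 0) throw new IllegalStateException("already closed");
    return nativeHandle;
  }

  @Override
  public void close() {
    if (nativeHandle != 0) {
      destroyNative(nativeHandle); // stand-in for the JNI free
      nativeHandle = 0;            // idempotent: a second close() is a no-op
    }
  }

  // Stand-ins so the sketch is runnable without a real native library.
  private static long createNative() { return 42L; }
  private static void destroyNative(long h) { /* free native memory here */ }

  public static void main(String[] args) {
    try (NativeUdfHandle udf = new NativeUdfHandle()) {
      System.out.println(udf.handle() != 0); // true while in scope
    } // close() runs here, exactly once
  }
}
```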
HLLPP test cases passed, but there is a memory leak; refer to the comment above.
The ideal solution is to not trigger creating a native UDF from the Java side, and to put all the resource management into the C++ layer. Not sure if that's doable.
How about this?
I don't think this is that big of an issue. It will require a little bit of work though. There are two places we need to worry about this in a CudfAggregate (sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateBase.scala, lines 285 to 286 in 03b85b1). The first is reductionAggregate, which is a function that takes an input and returns a Scalar, so we can modify the function to close the HostUDF when it is done internally. The second is groupByAggregate (sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala, lines 569 to 591 in 03b85b1). The main thing here is that we need to make sure that we have a way to create the HostUDF when we need it and then close things when the aggregation operation is done. https://github.com/NVIDIA/spark-rapids/blob/03b85b184e141849e414e90a83a4b589fe7e57fc/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala#L578C59-L578C67 is the place that I think we need to create it. Then, after the table has been processed (GpuAggregateExec.scala, line 588 in 03b85b1), we could have cudfAgg values that are also auto-closable and then we close them. The big thing with all of this is to document how all of it works.
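The create-when-needed / close-when-done lifecycle described above could look roughly like this (illustrative names only, not the plugin's actual types): the aggregate carries a factory rather than a live handle, the handle is created right before the aggregation runs, and it is closed once the table has been processed:

```java
import java.util.function.Supplier;

// Sketch of the lifecycle: create the UDF lazily from a factory, use it,
// and close it deterministically when the aggregation is done.
public class LazyUdfLifecycle {
  interface CloseableUdf extends AutoCloseable {
    long process(long input);
    @Override void close(); // no checked exception, to keep the sketch simple
  }

  static long runAggregation(Supplier<CloseableUdf> udfFactory, long input) {
    // Create the UDF only when the aggregation actually runs...
    try (CloseableUdf udf = udfFactory.get()) {
      return udf.process(input);
    } // ...and close it when the table has been processed.
  }

  public static void main(String[] args) {
    final boolean[] closed = {false};
    Supplier<CloseableUdf> factory = () -> new CloseableUdf() {
      @Override public long process(long input) { return input + 1; }
      @Override public void close() { closed[0] = true; }
    };
    long result = runAggregation(factory, 41L);
    System.out.println(result);     // 42
    System.out.println(closed[0]);  // true: the UDF was closed after use
  }
}
```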
closes #5199

Description

Spark approx_count_distinct (description link): Spark accepts one column (which can be a nested column) and a double literal relativeSD.

Depends on JNI PR: NVIDIA/spark-rapids-jni#2522
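For reference, the mapping from relativeSD (the allowed relative standard deviation) to the HLL++ precision p, and hence 2^p registers per sketch, is, as far as I can tell, the formula Spark's HyperLogLogPlusPlus uses; please verify against the Spark source:

```java
// Hedged sketch: map relativeSD to the HLL++ precision p (2^p registers).
// This mirrors the formula I believe Spark's HyperLogLogPlusPlus uses.
public class HllPrecision {
  static int precision(double relativeSD) {
    return (int) Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d));
  }

  public static void main(String[] args) {
    int p = precision(0.05); // Spark's default relativeSD
    System.out.println(p);       // 9
    System.out.println(1 << p);  // 512 registers per sketch
  }
}
```

A tighter relativeSD costs memory quickly: halving the error roughly quadruples the register count.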
TODO
- Perf test

Correctness

The results are identical between CPU and GPU.
Signed-off-by: Chong Gao [email protected]