Hyper log log plus plus (HLL++) #2522
base: branch-25.02
Conversation
Force-pushed b6f5cf5 to 526a61f
src/main/cpp/src/HLLPP.cu
Outdated
auto input_cols = std::vector<int64_t const*>(input_iter, input_iter + input.num_children());
auto d_inputs = cudf::detail::make_device_uvector_async(input_cols, stream, mr);
auto result = cudf::make_numeric_column(
  cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::ALL_VALID, stream);
Do we need such an all-valid null mask? How about cudf::mask_state::UNALLOCATED?
Tested Spark behavior: for approx_count_distinct(null), Spark returns 0. So the values in the result column are always non-null.
I meant, if all rows are valid, we don't need to allocate a null mask.
BTW, we need to pass mr to the returning column (but do not pass it to the intermediate vector/column).
Suggested change:
- cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::ALL_VALID, stream);
+ cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::UNALLOCATED, stream, mr);
Done.
src/main/cpp/src/HLLPP.cu
Outdated
auto result = cudf::make_numeric_column(
  cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::ALL_VALID, stream);
// evaluate from struct<long, ..., long>
thrust::for_each_n(rmm::exec_policy(stream),
Try to use exec_policy_nosync as much as possible.
Suggested change:
- thrust::for_each_n(rmm::exec_policy(stream),
+ thrust::for_each_n(rmm::exec_policy_nosync(stream),
Done.
* The input sketch values must be given in the format `LIST<INT8>`.
*
* @param input The sketch column which contains `LIST<INT8>` values.
INT8 or INT64?
In addition, in estimate_from_hll_sketches I see that the input is STRUCT<LONG, LONG, ....> instead of LIST<>. Why?
It's STRUCT<LONG, LONG, ....>, consistent with Spark. The input is columnar data, e.g. sketch 0 is composed of all the data of the children at index 0.
Updated the function comments, refer to commit
Signed-off-by: Chong Gao <[email protected]>
Ready to review except test cases.
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
 * Estimate count distinct values for the input which contains
Contain what?
done
/**
 * Compute hash codes for the input, generate HyperLogLogPlusPlus (HLLPP)
 * sketches from hash codes, and merge the sketches in the same group. Output is
 * a struct column with multiple long columns, which is consistent with Spark.
 */
Nit: Add @brief to C++ docs in this file.
done
namespace {

struct hllpp_agg_udf : cudf::groupby_host_udf { |
Since we have hllpp_reduce_udf, let's call this hllpp_groupby_udf.
Suggested change:
- struct hllpp_agg_udf : cudf::groupby_host_udf {
+ struct hllpp_groupby_udf : cudf::groupby_host_udf {
done
rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override
{
  // groupby
  auto const& group_values = get_grouped_values();
The get_ functions return rvalues, thus we should not bind the output to a reference.
Suggested change:
- auto const& group_values = get_grouped_values();
+ auto const group_values = get_grouped_values();
done
[[nodiscard]] std::unique_ptr<cudf::column> operator()(
  rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override
{
  // groupby
Suggested change:
- // groupby
done
int precision;
bool is_merge;
Suggested change:
- int precision;
- bool is_merge;
+ private:
+ int precision;
+ bool is_merge;
done
hllpp_reduct_udf(int precision_, bool is_merge_) : precision(precision_), is_merge(is_merge_) {}

/**
 * Perform the main reduce computation for HLLPP UDF
Suggested change:
- * Perform the main reduce computation for HLLPP UDF
+ * @brief Perform the main reduce computation for HLLPP UDF.
done
CUDF_EXPECTS(input.size() > 0,
             "Hyper Log Log Plus Plus reduction requires input is not empty!");
Why? Can we return an empty output instead?
Done, added a function get_empty_scalar.
int precision;
bool is_merge;
Suggested change:
- int precision;
- bool is_merge;
+ private:
+ int precision;
+ bool is_merge;
done
std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_host_udf(int precision);

std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_merge_host_udf(int precision);

std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_host_udf(int precision);

std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_merge_host_udf(int precision);
Hmm, I think these functions can return the raw pointer directly, because we will call .release() on them immediately in the JNI function. Doing this will reduce the overhead of using smart pointers.
Suggested change:
- std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_host_udf(int precision);
- std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_merge_host_udf(int precision);
- std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_host_udf(int precision);
- std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_merge_host_udf(int precision);
+ cudf::host_udf_base* create_hllpp_reduction_host_udf(int precision);
+ cudf::host_udf_base* create_hllpp_reduction_merge_host_udf(int precision);
+ cudf::host_udf_base* create_hllpp_groupby_host_udf(int precision);
+ cudf::host_udf_base* create_hllpp_groupby_merge_host_udf(int precision);
done
std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_host_udf(int precision)
{
  return std::make_unique<hllpp_reduct_udf>(precision, /*is_merge*/ false);
}

std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_merge_host_udf(int precision)
{
  return std::make_unique<hllpp_reduct_udf>(precision, /*is_merge*/ true);
}

std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_host_udf(int precision)
{
  return std::make_unique<hllpp_agg_udf>(precision, /*is_merge*/ false);
}

std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_merge_host_udf(int precision)
{
  return std::make_unique<hllpp_agg_udf>(precision, /*is_merge*/ true);
}
Suggested change:
- std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_host_udf(int precision)
- {
-   return std::make_unique<hllpp_reduct_udf>(precision, /*is_merge*/ false);
- }
- std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_merge_host_udf(int precision)
- {
-   return std::make_unique<hllpp_reduct_udf>(precision, /*is_merge*/ true);
- }
- std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_host_udf(int precision)
- {
-   return std::make_unique<hllpp_agg_udf>(precision, /*is_merge*/ false);
- }
- std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_merge_host_udf(int precision)
- {
-   return std::make_unique<hllpp_agg_udf>(precision, /*is_merge*/ true);
- }
+ cudf::host_udf_base* create_hllpp_reduction_host_udf(int precision)
+ {
+   return new hllpp_reduct_udf(precision, /*is_merge*/ false);
+ }
+ cudf::host_udf_base* create_hllpp_reduction_merge_host_udf(int precision)
+ {
+   return new hllpp_reduct_udf(precision, /*is_merge*/ true);
+ }
+ cudf::host_udf_base* create_hllpp_groupby_host_udf(int precision)
+ {
+   return new hllpp_agg_udf(precision, /*is_merge*/ false);
+ }
+ cudf::host_udf_base* create_hllpp_groupby_merge_host_udf(int precision)
+ {
+   return new hllpp_agg_udf(precision, /*is_merge*/ true);
+ }
done
}();
CUDF_EXPECTS(udf_ptr != nullptr, "Invalid HyperLogLogPlusPlus(HLLPP) UDF instance.");

return reinterpret_cast<jlong>(udf_ptr.release());
udf_ptr is a raw pointer.
Suggested change:
- return reinterpret_cast<jlong>(udf_ptr.release());
+ return reinterpret_cast<jlong>(udf_ptr);
done
/**
 * TODO: move this to cuDF HostUDFWrapper
 */
@Override
public void close() throws Exception {
  close(udfNativeHandle);
}

/**
 * TODO: move this to cuDF HostUDFWrapper
 */
static native void close(long ptr);
Oh, moving these functions to cudf JNI is possible. However, HOST UDF implementations can vary significantly, and some of them may have extra data members to close beyond the native UDF pointer. Thus, I think it is better to have the close() method implemented here (in the derived class) instead of in the base HostUDFWrapper class.
OK to me, but I have concerns:
If we have more derived classes in the future, every derived class would need a JNI close implementation.
udfNativeHandle is in HostUDFWrapper, so it seems reasonable to close udfNativeHandle in HostUDFWrapper. But udfNativeHandle is created in the derived class. I'm not sure which place is better.
We have some options:
- If we close udfNativeHandle in the base class HostUDFWrapper in some method like closeHandle(), the derived class still needs to have a close() method to close its resources. This close() method will call closeHandle() in the base class.
- If we close udfNativeHandle in the derived class through its close(), every derived class will have to do this same operation. Since every derived class has to implement a native method to create its native handle anyway, each UDF has a pair of create() and close().
- We will implement close() in the base class HostUDFWrapper and the derived Java class will override it only if it has more resources to close. This seems the best option. @res-life do you have time to refactor that, after this PR?
 * sketch. Input is a struct column with multiple long columns which is
 * consistent with Spark. Output is a struct scalar with multiple long values.
 */
ReductionMERGE(1),
Suggested change:
- ReductionMERGE(1),
+ ReductionMerge(1),
done
 * The value of num_registers_per_sketch is 2^precision.
 * The number of children of this Struct is num_registers_per_sketch / 10 + 1;
 * here 10 means an INT64 contains 10 register values,
 * each register value being 6 bits.
 * A register value is the number of leading zero bits in the xxhash64 hash code.
 * The xxhash64 hash code is 64 bits and a register value is 6 bits,
 * which is enough to hold the max value 64.
 *
 * @param input The sketch column which contains Struct<INT64, INT64, ...>
 * values.
 * @param precision The number of bits for HLLPP register addressing.
 * @return An INT64 column where each value indicates the approximate count
 * of distinct values.
Need to reformat + polish docs.
done
 * `reduce_by_key` uses a num_rows_input-sized intermediate cache:
 * https://github.com/NVIDIA/thrust/blob/2.1.0/thrust/system/detail/generic/reduce_by_key.inl#L112
 *
 * // scan the values by flag
 * thrust::detail::temporary_array<ValueType,ExecutionPolicy>
 *   scanned_values(exec, n);
How do these APIs affect us?
Tried to use reduce_by_key, but it uses too much memory, so gave up on using reduce_by_key.
And updated the comments to:
* Tried to use `reduce_by_key`, but it uses too much memory, so gave up on using `reduce_by_key`.
* More details:
* `reduce_by_key` uses a num_rows_input-sized intermediate cache:
* https://github.com/NVIDIA/thrust/blob/2.1.0/thrust/system/detail/generic/reduce_by_key.inl#L112
* // scan the values by flag
* thrust::detail::temporary_array<ValueType,ExecutionPolicy>
*   scanned_values(exec, n);
* Each sketch contains multiple integers, by default 512 integers (precision is
* 9), and num_rows_input * 512 is huge.
auto d_results = [&] {
  auto host_results_pointer_iter =
    thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
      return results_column->mutable_view().template data<int64_t>();
    });
  auto host_results_pointers =
    std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
  return cudf::detail::make_device_uvector_async(
    host_results_pointers, stream, cudf::get_current_device_resource_ref());
}();
host_results_pointers can be destroyed before the data is copied to the device. Since we don't like stream sync, just move it out of the code block:
Suggested change:
- auto d_results = [&] {
-   auto host_results_pointer_iter =
-     thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
-       return results_column->mutable_view().template data<int64_t>();
-     });
-   auto host_results_pointers =
-     std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
-   return cudf::detail::make_device_uvector_async(
-     host_results_pointers, stream, cudf::get_current_device_resource_ref());
- }();
+ auto host_results_pointer_iter =
+   thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
+     return results_column->mutable_view().template data<int64_t>();
+   });
+ auto host_results_pointers =
+   std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
+ auto d_results = cudf::detail::make_device_uvector_async(
+   host_results_pointers, stream, cudf::get_current_device_resource_ref());
done
auto d_results = [&] {
  auto host_results_pointer_iter =
    thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
      return results_column->mutable_view().template data<int64_t>();
    });
  auto host_results_pointers =
    std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
  return cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
}();
Similarly, host_results_pointers can be destroyed before data is copied to the device. Just remove this code block.
Suggested change:
- auto d_results = [&] {
-   auto host_results_pointer_iter =
-     thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
-       return results_column->mutable_view().template data<int64_t>();
-     });
-   auto host_results_pointers =
-     std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
-   return cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
- }();
+ auto host_results_pointer_iter =
+   thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
+     return results_column->mutable_view().template data<int64_t>();
+   });
+ auto host_results_pointers =
+   std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
+ auto d_results = cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
done
auto d_sketches_output = [&] {
  auto host_results_pointer_iter =
    thrust::make_transform_iterator(results.begin(), [](auto const& results_column) {
      return results_column->mutable_view().template data<int64_t>();
    });
  auto host_results_pointers =
    std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + results.size());
  return cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
}();
Similarly, host_results_pointers can be destroyed before data is copied to the device. Move the code out of the block.
done
auto d_results = [&] {
  auto host_results_pointer_iter =
    thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
      return results_column->mutable_view().template data<int64_t>();
    });
  auto host_results_pointers =
    std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
  return cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
}();
Similar issue: host_results_pointers is destroyed before data is copied to the device.
done
build
Since NVIDIA/spark-rapids#11638 passed all the HLLPP cases, and Nghia will be on vacation, could we put the Java/C++ test cases into a follow-up PR? One issue we need to discuss, refer to link. @ttnghia Thanks for your dedicated review.
auto host_results_pointers =
  std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
auto d_results = cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
Sorry, I was still wrong. host_results_pointers is destroyed when this function ends, but the kernel may not have executed on the device yet. So we have to have a stream sync in this function anyhow. Probably we sync here:
Suggested change:
- auto host_results_pointers =
-   std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
- auto d_results = cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
+ auto host_results_pointers =
+   std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
+ auto d_results = cudf::detail::make_device_uvector_sync(host_results_pointers, stream, mr);
Similar for all other places. We need to sync the stream so that when the functions end we will not access the released host buffer.
done.
build
Add support for Hyper log log plus plus (HLL++)
Depends on:
- HOST_UDF aggregation for groupby rapidsai/cudf#17592
- HOST_UDF aggregation for reduction and segmented reduction rapidsai/cudf#17645
Signed-off-by: Chong Gao [email protected]