Hyper log log plus plus(HLL++) #2522

Open: wants to merge 29 commits into base: branch-25.02


res-life
Collaborator

@res-life commented Oct 21, 2024

@res-life requested a review from ttnghia on October 21, 2024 12:45
@res-life force-pushed the hll branch 3 times, most recently from b6f5cf5 to 526a61f on October 31, 2024 11:34
@res-life changed the title from "[Do not review] Hyper log log plus plus(HLL++)" to "Hyper log log plus plus(HLL++)" on Oct 31, 2024
src/main/cpp/src/HLLPP.cu (outdated, resolved)
auto input_cols = std::vector<int64_t const*>(input_iter, input_iter + input.num_children());
auto d_inputs = cudf::detail::make_device_uvector_async(input_cols, stream, mr);
auto result = cudf::make_numeric_column(
cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::ALL_VALID, stream);
Collaborator

Do we need such an all-valid null mask? How about cudf::mask_state::UNALLOCATED?

Collaborator Author

I tested Spark's behavior: approx_count_distinct(null) returns 0.
So the values in the result column are always non-null.

Collaborator

I meant that if all rows are valid, we don't need to allocate a null mask.
BTW, we need to pass mr to the returned column (but not to the intermediate vectors/columns).

Suggested change
cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::ALL_VALID, stream);
cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::UNALLOCATED, stream, mr);

Collaborator Author

Done.

auto result = cudf::make_numeric_column(
cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::ALL_VALID, stream);
// evaluate from struct<long, ..., long>
thrust::for_each_n(rmm::exec_policy(stream),
Collaborator

Try to use exec_policy_nosync as much as possible.

Suggested change
thrust::for_each_n(rmm::exec_policy(stream),
thrust::for_each_n(rmm::exec_policy_nosync(stream),

Collaborator Author

Done.

Comment on lines 34 to 36
* The input sketch values must be given in the format `LIST<INT8>`.
*
* @param input The sketch column which contains `LIST<INT8>` values.
Collaborator

INT8 or INT64?

Collaborator

@ttnghia Nov 1, 2024

In addition, in estimate_from_hll_sketches I see that the input is STRUCT<LONG, LONG, ....> instead of LIST<>. Why?

Collaborator Author

It's STRUCT<LONG, LONG, ....>, consistent with Spark. The input is columnar data: for example, sketch 0 is composed of the values at index 0 of all the children.
I updated the function comments; refer to commit
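The following host-side sketch in plain C++ (no cuDF) makes the columnar layout concrete. It assumes each struct child is modeled as a `std::vector<int64_t>` with one entry per row, so sketch `i` consists of element `i` of every child, in child order. `struct_of_longs` and `gather_sketch` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical host model of a STRUCT<LONG, LONG, ...> column:
// children[k][row] is the k-th 64-bit word of the sketch stored in `row`.
using struct_of_longs = std::vector<std::vector<std::int64_t>>;

// Collect the words of one sketch (one row) from the column-major children.
std::vector<std::int64_t> gather_sketch(struct_of_longs const& children, std::size_t row)
{
  std::vector<std::int64_t> words;
  words.reserve(children.size());
  for (auto const& child : children) {
    assert(row < child.size());  // every child holds one entry per row
    words.push_back(child[row]);
  }
  return words;
}
```

With two children `{10, 11, 12}` and `{20, 21, 22}`, row 0 is the sketch `{10, 20}` and row 2 is `{12, 22}`.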

@res-life
Collaborator Author

Ready for review, except for the test cases.

src/main/cpp/src/HLLPP.cu (outdated, resolved)
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* Estimate count distinct values for the input which contains
Collaborator

Contain what?

Collaborator Author

done
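The sketch below shows how a count-distinct estimate is derived from the registers. It uses the classic HyperLogLog estimator with the linear-counting correction for small cardinalities. This is a simplification, not the PR's implementation: HLL++ (and Spark) also apply an empirical bias correction, which is omitted here, so mid-range results would differ from Spark. The function name and the use of unpacked `uint8_t` registers are assumptions.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <vector>

// Simplified HyperLogLog cardinality estimate (no HLL++ bias correction).
// `registers` holds one value per bucket; its size m is 2^precision (m >= 128 assumed).
double estimate_cardinality(std::vector<std::uint8_t> const& registers)
{
  double const m = static_cast<double>(registers.size());
  // alpha_m constant from the original HyperLogLog paper, valid for m >= 128.
  double const alpha = 0.7213 / (1.0 + 1.079 / m);

  double inverse_sum = 0.0;  // sum over buckets of 2^-register
  std::size_t zeros  = 0;    // number of empty buckets
  for (std::uint8_t r : registers) {
    inverse_sum += std::ldexp(1.0, -static_cast<int>(r));
    if (r == 0) { ++zeros; }
  }

  double const raw = alpha * m * m / inverse_sum;
  // Small-range correction: use linear counting while some buckets are still empty.
  if (raw <= 2.5 * m && zeros > 0) { return m * std::log(m / static_cast<double>(zeros)); }
  return raw;
}
```

An all-zero sketch (for example, precision 9, so 512 registers) estimates exactly 0. This is consistent with the Spark behavior noted in this thread, where approx_count_distinct(null) returns 0. Setting a single register to 1 gives an estimate close to 1.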

Comment on lines 27 to 31
/**
* Compute hash codes for the input, generate HyperLogLogPlusPlus(HLLPP)
* sketches from hash codes, and merge the sketches in the same group. Output is
* a struct column with multiple long columns which is consistent with Spark.
*/
Collaborator

Nit: Add @brief to C++ docs in this file.

Collaborator Author

done


namespace {

struct hllpp_agg_udf : cudf::groupby_host_udf {
Collaborator

Since we have hllpp_reduce_udf, let's call this hllp_groupby_udf.

Suggested change
struct hllpp_agg_udf : cudf::groupby_host_udf {
struct hllp_groupby_udf : cudf::groupby_host_udf {

Collaborator Author

done

rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override
{
// groupby
auto const& group_values = get_grouped_values();
Collaborator

The get_ functions return rvalues, so we should not bind the output to a reference.

Suggested change
auto const& group_values = get_grouped_values();
auto const group_values = get_grouped_values();

Collaborator Author

done
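Here is a small standalone illustration of why `auto const` costs nothing here. The `counted` type and `get_value` function are hypothetical stand-ins, not cuDF code. Since C++17, initializing a variable from a function that returns a prvalue performs no copy or move. By-value binding is therefore as cheap as `auto const&`, and it does not rely on temporary lifetime extension.

```cpp
#include <cassert>

// Counts copies and moves so the cost of each binding style is observable.
struct counted {
  static inline int copies = 0;
  static inline int moves  = 0;
  int payload              = 0;

  counted() = default;
  explicit counted(int p) : payload(p) {}
  counted(counted const& other) : payload(other.payload) { ++copies; }
  counted(counted&& other) noexcept : payload(other.payload) { ++moves; }
};

// Stand-in for a getter such as get_grouped_values() that returns by value.
counted get_value() { return counted{42}; }
```

Both `auto const by_value = get_value();` and `auto const& by_ref = get_value();` leave both counters at zero. The by-value form simply owns its object outright.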

[[nodiscard]] std::unique_ptr<cudf::column> operator()(
rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override
{
// groupby
Collaborator

Suggested change
// groupby

Collaborator Author

done

Comment on lines +104 to +105
int precision;
bool is_merge;
Collaborator

Suggested change
int precision;
bool is_merge;
private:
int precision;
bool is_merge;

Collaborator Author

done

hllpp_reduct_udf(int precision_, bool is_merge_) : precision(precision_), is_merge(is_merge_) {}

/**
* Perform the main reduce computation for HLLPP UDF
Collaborator

Suggested change
* Perform the main reduce computation for HLLPP UDF
* @brief Perform the main reduce computation for HLLPP UDF.

Collaborator Author

done

Comment on lines 121 to 122
CUDF_EXPECTS(input.size() > 0,
"Hyper Log Log Plus Plus reduction requires input is not empty!");
Collaborator

Why? Can we return an empty output instead?

Collaborator Author

Done; added a function get_empty_scalar.
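An empty input can yield a result instead of an error because merging HyperLogLog sketches is a register-wise maximum. The all-zero sketch is the identity element of that operation. The minimal sketch below assumes unpacked `uint8_t` registers (the PR packs them into INT64 words) and uses a hypothetical `merge_sketches` helper. It does not show how `get_empty_scalar` is actually implemented.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using sketch = std::vector<std::uint8_t>;  // one register per bucket, unpacked

// Reduce any number of sketches (including none) into one by taking the
// maximum of each register. With no input, the result is the all-zero sketch.
sketch merge_sketches(std::vector<sketch> const& inputs, int precision)
{
  sketch merged(std::size_t{1} << precision, 0);
  for (auto const& s : inputs) {
    assert(s.size() == merged.size());
    for (std::size_t i = 0; i < merged.size(); ++i) {
      merged[i] = std::max(merged[i], s[i]);
    }
  }
  return merged;
}
```

Because an all-zero sketch estimates a count of 0, returning it for an empty column agrees with the non-empty case.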

Comment on lines 146 to 148

int precision;
bool is_merge;
Collaborator

Suggested change
int precision;
bool is_merge;
private:
int precision;
bool is_merge;

Collaborator Author

done

Comment on lines 23 to 29
std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_host_udf(int precision);

std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_merge_host_udf(int precision);

std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_host_udf(int precision);

std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_merge_host_udf(int precision);
Collaborator

@ttnghia Jan 15, 2025

Hmm, I think these functions can return a raw pointer directly, because we call .release() on them immediately in the JNI function. Doing this reduces the overhead of using smart pointers.

Suggested change
std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_host_udf(int precision);
std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_merge_host_udf(int precision);
std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_host_udf(int precision);
std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_merge_host_udf(int precision);
cudf::host_udf_base* create_hllpp_reduction_host_udf(int precision);
cudf::host_udf_base* create_hllpp_reduction_merge_host_udf(int precision);
cudf::host_udf_base* create_hllpp_groupby_host_udf(int precision);
cudf::host_udf_base* create_hllpp_groupby_merge_host_udf(int precision);

Collaborator Author

done

Comment on lines 153 to 171
std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_host_udf(int precision)
{
return std::make_unique<hllpp_reduct_udf>(precision, /*is_merge*/ false);
}

std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_merge_host_udf(int precision)
{
return std::make_unique<hllpp_reduct_udf>(precision, /*is_merge*/ true);
}

std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_host_udf(int precision)
{
return std::make_unique<hllpp_agg_udf>(precision, /*is_merge*/ false);
}

std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_merge_host_udf(int precision)
{
return std::make_unique<hllpp_agg_udf>(precision, /*is_merge*/ true);
}
Collaborator

Suggested change
std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_host_udf(int precision)
{
return std::make_unique<hllpp_reduct_udf>(precision, /*is_merge*/ false);
}
std::unique_ptr<cudf::host_udf_base> create_hllpp_reduction_merge_host_udf(int precision)
{
return std::make_unique<hllpp_reduct_udf>(precision, /*is_merge*/ true);
}
std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_host_udf(int precision)
{
return std::make_unique<hllpp_agg_udf>(precision, /*is_merge*/ false);
}
std::unique_ptr<cudf::host_udf_base> create_hllpp_groupby_merge_host_udf(int precision)
{
return std::make_unique<hllpp_agg_udf>(precision, /*is_merge*/ true);
}
cudf::host_udf_base* create_hllpp_reduction_host_udf(int precision)
{
return new hllpp_reduct_udf(precision, /*is_merge*/ false);
}
cudf::host_udf_base* create_hllpp_reduction_merge_host_udf(int precision)
{
return new hllpp_reduct_udf(precision, /*is_merge*/ true);
}
cudf::host_udf_base* create_hllpp_groupby_host_udf(int precision)
{
return new hllpp_agg_udf(precision, /*is_merge*/ false);
}
cudf::host_udf_base* create_hllpp_groupby_merge_host_udf(int precision)
{
return new hllpp_agg_udf(precision, /*is_merge*/ true);
}

Collaborator Author

done

}();
CUDF_EXPECTS(udf_ptr != nullptr, "Invalid HyperLogLogPlusPlus(HLLPP) UDF instance.");

return reinterpret_cast<jlong>(udf_ptr.release());
Collaborator

udf_ptr is a raw pointer.

Suggested change
return reinterpret_cast<jlong>(udf_ptr.release());
return reinterpret_cast<jlong>(udf_ptr);

Collaborator Author

done
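The ownership hand-off discussed above can be illustrated with hypothetical types. `udf_base`, `demo_udf`, `create_udf`, `to_handle` and `close_handle` are not the PR's or cuDF's names. Returning `new T(...)` as a base pointer is equivalent to `std::make_unique<T>(...).release()`. The 64-bit handle (a `jlong` on the JNI side) then owns the object until the native close deletes it.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for cudf::host_udf_base and an HLLPP UDF.
struct udf_base {
  virtual ~udf_base() = default;  // virtual: the handle is deleted through the base type
};

struct demo_udf : udf_base {
  static inline int live = 0;  // number of live instances, for observation only
  explicit demo_udf(int p) : precision(p) { ++live; }
  ~demo_udf() override { --live; }
  int precision;
};

// Factory returning an owning raw pointer, as suggested in the review.
udf_base* create_udf(int precision) { return new demo_udf(precision); }

// Hand the pointer to Java as a 64-bit handle ...
std::int64_t to_handle(udf_base* ptr)
{
  static_assert(sizeof(void*) <= sizeof(std::int64_t), "pointer must fit in a jlong");
  return reinterpret_cast<std::int64_t>(ptr);
}

// ... and delete it through the base pointer when Java calls close().
void close_handle(std::int64_t handle) { delete reinterpret_cast<udf_base*>(handle); }
```

The virtual destructor matters here. `close_handle` deletes through `udf_base*`, and without a virtual destructor the derived destructor would not run.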

Comment on lines 130 to 141
/**
* TODO: move this to cuDF HostUDFWrapper
*/
@Override
public void close() throws Exception {
close(udfNativeHandle);
}

/**
* TODO: move this to cuDF HostUDFWrapper
*/
static native void close(long ptr);
Collaborator

@ttnghia Jan 15, 2025

Moving these functions to cudf JNI is possible. However, host UDF implementations can vary significantly, and some of them may have extra data members to close beyond the native UDF pointer. Thus, I think it is better to implement the close() method here (in the derived class) instead of in the base HostUDFWrapper class.

Collaborator Author

OK with me, but I have some concerns:
If we add more derived classes in the future, every derived class must have its own JNI close implementation.
udfNativeHandle is in HostUDFWrapper, so it seems reasonable to close udfNativeHandle in HostUDFWrapper. But udfNativeHandle is created in the derived class. I'm not sure which place is better.

Collaborator

We have some options:

  • If we close udfNativeHandle in the base class HostUDFWrapper in some method like closeHandle(), the derived class still needs to have a close() method to close its resources. This close() method will call closeHandle() in the base class.
  • If we close udfNativeHandle in the derived class through its close(), every derived class has to repeat the same operation. Since every derived class already has to implement a native method to create its native handle, each UDF would then have a pair of create() and close().
  • We implement close() in the base class HostUDFWrapper, and a derived Java class overrides it only if it has more resources to close. This seems to be the best option. @res-life, do you have time to refactor that after this PR?

* sketch. Input is a struct column with multiple long columns which is
* consistent with Spark. Output is a struct scalar with multiple long values.
*/
ReductionMERGE(1),
Collaborator

Suggested change
ReductionMERGE(1),
ReductionMerge(1),

Collaborator Author

done

Comment on lines 105 to 117
* The value of num_registers_per_sketch = 2^precision
* The children num of this Struct is: num_registers_per_sketch / 10 + 1,
* Here 10 means a INT64 contains 10 register values,
* each register value is 6 bits.
* Register value is the number of leading zero bits in xxhash64 hash code.
* xxhash64 hash code is 64 bits, Register value is 6 bits,
* 6 bits is enough to hold the max value 64.
*
* @param input The sketch column which contains Struct<INT64, INT64, ...>
* values.
* @param precision The num of bits for HLLPP register addressing.
* @return A INT64 column with each value indicates the approximate count
* distinct value.
Collaborator

@ttnghia Jan 15, 2025

Need to reformat + polish docs.

Collaborator Author

done
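The register layout described in the doc comment can be sketched on the host as follows. This is an illustration under stated assumptions, not the PR's CUDA code. To the best of my understanding it follows the scheme in Spark's `HyperLogLogPlusPlusHelper`:

- the register index comes from the top `precision` bits of the hash;
- the register value is the number of leading zeros of the remaining bits, plus one;
- ten 6-bit registers are packed into each 64-bit word.

All helper names are hypothetical. Note that 6 bits hold values 0 to 63. The stored value is bounded by `65 - precision`, which is why 6 bits suffice.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

constexpr int register_bits           = 6;                   // each register holds 0..63
constexpr int registers_per_word      = 64 / register_bits;  // 10 registers per INT64
constexpr std::uint64_t register_mask = (std::uint64_t{1} << register_bits) - 1;

// Number of INT64 words (struct children) needed for 2^precision registers.
std::size_t num_words(int precision)
{
  std::size_t const num_registers = std::size_t{1} << precision;
  return num_registers / registers_per_word + 1;
}

// Count leading zero bits of a 64-bit value (returns 64 for 0).
int leading_zeros(std::uint64_t x)
{
  int n = 0;
  for (std::uint64_t bit = std::uint64_t{1} << 63; bit != 0 && (x & bit) == 0; bit >>= 1) { ++n; }
  return n;
}

struct register_update {
  std::size_t index;  // which register: the top `precision` bits of the hash
  int value;          // leading zeros of the remaining bits, plus one
};

// The padding bit at position precision - 1 bounds `value` by 65 - precision,
// so it fits in 6 bits whenever precision >= 4.
register_update hash_to_register(std::uint64_t hash, int precision)
{
  std::uint64_t const w = (hash << precision) | (std::uint64_t{1} << (precision - 1));
  return {static_cast<std::size_t>(hash >> (64 - precision)), leading_zeros(w) + 1};
}

int get_register(std::vector<std::uint64_t> const& words, std::size_t index)
{
  int const shift = register_bits * static_cast<int>(index % registers_per_word);
  return static_cast<int>((words[index / registers_per_word] >> shift) & register_mask);
}

// Keep the maximum value seen for a register, as HLL updates do.
void update_register(std::vector<std::uint64_t>& words, std::size_t index, int value)
{
  if (value <= get_register(words, index)) { return; }
  int const shift = register_bits * static_cast<int>(index % registers_per_word);
  auto& word      = words[index / registers_per_word];
  word = (word & ~(register_mask << shift)) | (static_cast<std::uint64_t>(value) << shift);
}
```

For precision 9 this gives 512 registers in 52 words, and the last word is only partly used.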

Comment on lines 70 to 75
* `reduce_by_key` uses num_rows_input intermediate cache:
* https://github.com/NVIDIA/thrust/blob/2.1.0/thrust/system/detail/generic/reduce_by_key.inl#L112
*
* // scan the values by flag
* thrust::detail::temporary_array<ValueType,ExecutionPolicy>
* scanned_values(exec, n);
Collaborator

How do these APIs affect us?

Collaborator Author

I tried to use reduce_by_key, but it uses too much memory, so I gave up on it.
I updated the comments to:

 * Tried to use `reduce_by_key`, but it uses too much memory, so we gave up on `reduce_by_key`.
 * More details:
 * `reduce_by_key` uses an intermediate cache of num_rows_input elements:
 * https://github.com/NVIDIA/thrust/blob/2.1.0/thrust/system/detail/generic/reduce_by_key.inl#L112
 * // scan the values by flag
 * thrust::detail::temporary_array<ValueType,ExecutionPolicy>
 * scanned_values(exec, n);
 * Each sketch contains multiple integers, by default 512 integers (precision is
 * 9), so num_rows_input * 512 is huge.
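Per-group merging does not need a temporary of `num_rows_input * 512` values. The host-side sketch below folds rows that are already sorted by group key into one sketch per group, using a register-wise max. Its working memory is proportional to the number of groups rather than the number of rows. It assumes unpacked `uint8_t` registers and CSR-style group offsets; both are illustrative assumptions, and the PR's CUDA kernels are not reproduced here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using sketch = std::vector<std::uint8_t>;  // one register per bucket, unpacked

// rows:    one sketch per input row, already sorted by group key
// offsets: group g spans rows [offsets[g], offsets[g + 1])
// Returns one merged sketch per group; output size is num_groups * num_registers,
// independent of the number of input rows.
std::vector<sketch> merge_by_group(std::vector<sketch> const& rows,
                                   std::vector<std::size_t> const& offsets,
                                   std::size_t num_registers)
{
  assert(!offsets.empty() && offsets.back() == rows.size());
  std::size_t const num_groups = offsets.size() - 1;
  std::vector<sketch> out(num_groups, sketch(num_registers, 0));
  for (std::size_t g = 0; g < num_groups; ++g) {
    for (std::size_t r = offsets[g]; r < offsets[g + 1]; ++r) {
      for (std::size_t i = 0; i < num_registers; ++i) {
        out[g][i] = std::max(out[g][i], rows[r][i]);
      }
    }
  }
  return out;
}
```

For example, three rows with offsets `{0, 2, 3}` produce two merged sketches: the first combines rows 0 and 1, and the second is row 2 unchanged.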

Comment on lines 733 to 742
auto d_results = [&] {
auto host_results_pointer_iter =
thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
return results_column->mutable_view().template data<int64_t>();
});
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
return cudf::detail::make_device_uvector_async(
host_results_pointers, stream, cudf::get_current_device_resource_ref());
}();
Collaborator

host_results_pointers can be destroyed before the data is copied to the device. Since we want to avoid a stream sync, just move it out of the code block:

Suggested change
auto d_results = [&] {
auto host_results_pointer_iter =
thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
return results_column->mutable_view().template data<int64_t>();
});
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
return cudf::detail::make_device_uvector_async(
host_results_pointers, stream, cudf::get_current_device_resource_ref());
}();
auto host_results_pointer_iter =
thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
return results_column->mutable_view().template data<int64_t>();
});
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
auto d_results = cudf::detail::make_device_uvector_async(
host_results_pointers, stream, cudf::get_current_device_resource_ref());

Collaborator Author

done

Comment on lines 437 to 445
auto d_results = [&] {
auto host_results_pointer_iter =
thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
return results_column->mutable_view().template data<int64_t>();
});
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
return cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
}();
Collaborator

Similarly, host_results_pointers can be destroyed before the data is copied to the device. Just remove this code block.

Suggested change
auto d_results = [&] {
auto host_results_pointer_iter =
thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
return results_column->mutable_view().template data<int64_t>();
});
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
return cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
}();
auto host_results_pointer_iter =
thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
return results_column->mutable_view().template data<int64_t>();
});
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
auto d_results = cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);

Collaborator Author

done

Comment on lines 631 to 639
auto d_sketches_output = [&] {
auto host_results_pointer_iter =
thrust::make_transform_iterator(results.begin(), [](auto const& results_column) {
return results_column->mutable_view().template data<int64_t>();
});
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + results.size());
return cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
}();
Collaborator

Similarly, host_results_pointers can be destroyed before the data is copied to the device. Move the code out of the block.

Collaborator Author

done

Comment on lines 803 to 811
auto d_results = [&] {
auto host_results_pointer_iter =
thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
return results_column->mutable_view().template data<int64_t>();
});
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
return cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
}();
Collaborator

Same issue here: host_results_pointers may be destroyed before the data is copied to the device.

Collaborator Author

done

@res-life
Collaborator Author

build

@res-life
Collaborator Author

res-life commented Jan 16, 2025

Since NVIDIA/spark-rapids#11638 passed all the HLLPP test cases, and Nghia will be on vacation, could we move the Java/C++ test cases into a follow-up PR?

There is one issue we need to discuss; refer to link

@ttnghia Thanks for your dedicated review.

Comment on lines 450 to 452
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
auto d_results = cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
Collaborator

Sorry, I was still wrong. host_results_pointers is destroyed when this function ends, but the kernel may not have executed on the device yet. So we need a stream sync in this function anyway. We should probably sync here.

Suggested change
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
auto d_results = cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
auto host_results_pointers =
std::vector<int64_t*>(host_results_pointer_iter, host_results_pointer_iter + children.size());
auto d_results = cudf::detail::make_device_uvector_sync(host_results_pointers, stream, mr);

Collaborator

@ttnghia Jan 16, 2025

The same applies to all the other places. We need to sync the stream so that, when the functions return, nothing accesses the released host buffer.

Collaborator Author

done.
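The lifetime problem can be reproduced without a GPU. In the sketch below, `fake_stream` is a hypothetical stand-in for a CUDA stream: queued work runs only at `synchronize()`, just as an asynchronous copy may run after the host function has returned. The queued copy reads `host_ptrs` through a raw pointer, so the function must synchronize before `host_ptrs` goes out of scope. Switching to `cudf::detail::make_device_uvector_sync` provides that synchronization. `make_pointer_table` is likewise a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a CUDA stream: queued work only runs at synchronize().
class fake_stream {
 public:
  void enqueue(std::function<void()> task) { tasks_.push_back(std::move(task)); }
  void synchronize()
  {
    for (auto& task : tasks_) { task(); }
    tasks_.clear();
  }
  bool idle() const { return tasks_.empty(); }

 private:
  std::vector<std::function<void()>> tasks_;
};

// Builds a table of column data pointers in `dst` (standing in for device memory).
// The copy is only queued; it reads from `host_ptrs` when the stream runs it.
std::vector<std::int64_t*> make_pointer_table(std::vector<std::vector<std::int64_t>>& children,
                                              fake_stream& stream)
{
  std::vector<std::int64_t*> host_ptrs;
  for (auto& child : children) { host_ptrs.push_back(child.data()); }

  std::vector<std::int64_t*> dst(host_ptrs.size());
  stream.enqueue([src = host_ptrs.data(), n = host_ptrs.size(), out = dst.data()] {
    for (std::size_t i = 0; i < n; ++i) { out[i] = src[i]; }
  });

  // host_ptrs is destroyed when this function returns, but the queued copy still
  // points into it. Synchronizing here makes the copy finish first; without it,
  // a later synchronize() would read freed memory.
  stream.synchronize();
  return dst;
}
```

Avoiding the sync by hoisting the vector out of an inner lambda helps only within the function's scope. Once the function itself returns, the host buffer is gone, so a sync (or a host buffer that outlives the stream work) is required.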

@res-life
Collaborator Author

build

Labels: None yet
Projects: None yet
4 participants