Skip to content

Commit

Permalink
Update Offsets Function
Browse files Browse the repository at this point in the history
  • Loading branch information
sdrp713 committed Sep 30, 2024
1 parent 928ca4f commit d50185c
Showing 4 changed files with 69 additions and 63 deletions.
49 changes: 32 additions & 17 deletions cpp/include/cudf/detail/sizes_to_offsets_iterator.cuh
Original file line number Diff line number Diff line change
@@ -226,25 +226,40 @@ static sizes_to_offsets_iterator<ScanIterator, LastType> make_sizes_to_offsets_i
return sizes_to_offsets_iterator<ScanIterator, LastType>(begin, end, last);
}


template <typename SizesIterator, typename OffsetsIterator>
auto sizes_to_offsets_batch(SizesIterator begin,
SizesIterator end,
OffsetsIterator result,
void sizes_to_offsets_batch(std::vector<cudf::device_span<thrust::pair<char const*, size_type> const>> strings_batch,
std::vector<std::unique_ptr<column>> offsets_columns,
rmm::device_uvector<int64_t> total_bytes,
rmm::cuda_stream_view stream)
{
using SizeType = typename thrust::iterator_traits<SizesIterator>::value_type;
static_assert(std::is_integral_v<SizeType>,
"Only numeric types are supported by sizes_to_offsets");

using LastType = std::conditional_t<std::is_signed_v<SizeType>, int64_t, uint64_t>;
auto last_element = rmm::device_scalar<LastType>(0, stream);
auto output_itr =
make_sizes_to_offsets_iterator(result, result + std::distance(begin, end), last_element.data());
// This function uses the type of the initialization parameter as the accumulator type
// when computing the individual scan output elements.
thrust::exclusive_scan(rmm::exec_policy(stream), begin, end, output_itr, LastType{0});
return last_element;
std::for_each (
thrust::make_zip_iterator(thrust::make_tuple(strings_batch.begin(), offsets_columns.begin(), thrust::make_counting_iterator(0))),
thrust::make_zip_iterator(thrust::make_tuple(strings_batch.end(), offsets_columns.end(), thrust::make_counting_iterator(strings_batch.size()))),
[total_bytes = total_bytes.data(), stream] (auto &elem) {
auto offsets_transformer =
cuda::proclaim_return_type<size_type>([] (auto item) -> size_type {
return (item.first != nullptr ? static_cast<size_type>(thrust::get<1>(item)) : size_type{0});
});

auto offsets_transformer_itr = thrust::make_transform_iterator(thrust::get<0>(elem), offsets_transformer);
auto d_offsets = thrust::get<1>(elem)->mutable_view().template data<int32_t>();
auto strings_count = thrust::get<1>(thrust::get<0>(elem));

auto map_fn = cuda::proclaim_return_type<size_type>(
[begin = offsets_transformer_itr, strings_count = strings_count] (size_type idx) -> size_type {
return idx < strings_count ? static_cast<size_type>(begin[idx]) : size_type{0};
}
);

auto input_itr = cudf::detail::make_counting_transform_iterator(0, map_fn);

auto output_itr =
make_sizes_to_offsets_iterator(d_offsets, d_offsets + std::distance(input_itr, input_itr + strings_count + 1), total_bytes + thrust::get<2>(elem));
// This function uses the type of the initialization parameter as the accumulator type
// when computing the individual scan output elements.
thrust::exclusive_scan(rmm::exec_policy_nosync(stream), input_itr, input_itr + strings_count + 1, output_itr, 0);

}
);
}


5 changes: 3 additions & 2 deletions cpp/include/cudf/detail/valid_if.cuh
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/default_stream.hpp>
@@ -204,11 +205,11 @@ std::pair<std::vector<bitmask_type*>, rmm::device_uvector<size_type>> valid_if_n
sizes.end(),
null_masks.begin(),
[stream, mr] __device__ (auto & size) {
return static_cast<bitmask_type*>(cudf::create_null_mask(size, mask_state::UNINITIALIZED, stream, mr));
return static_cast<bitmask_type*>(cudf::create_null_mask(size, mask_state::UNINITIALIZED, stream, mr).data());
}
);

auto device_null_masks = make_device_uvector_async(null_masks, stream);
auto device_null_masks = cudf::detail::make_device_uvector_async(null_masks, stream, mr);

auto counting_iter = thrust::make_counting_iterator(0);
constexpr size_type block_size{256};
68 changes: 27 additions & 41 deletions cpp/include/cudf/strings/detail/strings_children.cuh
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
#include <cudf/detail/offsets_iterator_factory.cuh>
#include <cudf/detail/sizes_to_offsets_iterator.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/strings/detail/utilities.hpp>
#include <cudf/strings/utilities.hpp>
#include <cudf/utilities/default_stream.hpp>
@@ -28,6 +29,7 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/resource_ref.hpp>
#include <rmm/device_uvector.hpp>

#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>
@@ -43,74 +45,58 @@ std::pair<std::vector<std::unique_ptr<column>>, std::vector<int64_t>> make_offse
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
std::vector<std::unique_ptr<column>> offsets_columns;
rmm::device_uvector<rmm::device_scalar<int64_t>> total_bytes(offsets_transformer_itr.size(), stream, mr);
std::vector<std::unique_ptr<column>> offsets_columns(strings_batch.size());
rmm::device_uvector<int64_t> total_bytes(strings_batch.size(), stream);

auto constexpr size_type_max = static_cast<int64_t>(std::numeric_limits<size_type>::max());

std::transform (
strings_sizes.begin(),
strings_sizes.end(),
strings_batch.begin(),
strings_batch.end(),
std::back_inserter(offsets_columns),
[stream, mr] (auto &elem) {
auto const lcount = static_cast<int64_t>(thrust::get<1>(elem));
auto const strings_count = static_cast<size_type>(lcount);
return make_numeric_column(data_type{type_id::INT32}, strings_count + 1, mask_state::UNALLOCATED, stream, mr);
return make_numeric_column(data_type{type_id::INT32}, thrust::get<1>(elem) + 1, mask_state::UNALLOCATED, stream, mr);
}
);

std::transform (
thrust::make_zip_iterator(thrust::make_tuple(strings_batch.begin(), strings_sizes.begin(), offsets_columns.begin())),
thrust::make_zip_iterator(thrust::make_tuple(strings_batch.end(), strings_sizes.end(), offsets_columns.end())),
std::back_inserter(total_bytes),
[] (auto &elem) {
auto offsets_transformer =
cuda::proclaim_return_type<size_type>([] __device__(string_index_pair item) -> size_type {
return (item.first != nullptr ? static_cast<size_type>(item.second) : size_type{0});
});
auto offsets_transformer_itr = thrust::make_transform_iterator(thrust::get<0>(elem), offsets_transformer);
auto d_offsets = thrust::get<2>(elem)->mutable_view().template data<int32_t>();
auto map_fn = cuda::proclaim_return_type<size_type>(
[begin = offsets_transformer_itr, strings_count = thrust::get<1>(elem)] __device__(size_type idx) -> size_type {
return idx < strings_count ? static_cast<size_type>(begin[idx]) : size_type{0};
}
);
auto input_itr = cudf::detail::make_counting_transform_iterator(0, map_fn);
return cudf::detail::sizes_to_offsets_batch(input_itr, input_itr + thrust::get<1>(elem) + 1, d_offsets, stream);
}
);
sizes_to_offsets_batch(strings_batch, offsets_columns, total_bytes, stream);

auto host_total_bytes = make_std_vector_async(total_bytes, stream);
auto host_total_bytes = cudf::detail::make_std_vector_async(total_bytes, stream);

auto const threshold = cudf::strings::get_offset64_threshold();
std::for_each (
thrust::make_zip_iterator(thrust::make_tuple(host_total_bytes.begin(), strings_sizes.begin(), offsets_columns.begin(), strings_batch.begin())),
thrust::make_zip_iterator(thrust::make_tuple(host_total_bytes.end(), strings_sizes.end(), offsets_columns.end(), strings_batch.end())),
thrust::make_zip_iterator(thrust::make_tuple(host_total_bytes.begin(), offsets_columns.begin(), strings_batch.begin())),
thrust::make_zip_iterator(thrust::make_tuple(host_total_bytes.end(), offsets_columns.end(), strings_batch.end())),
[threshold, stream, mr] (auto &elem) {
auto offsets_transformer =
cuda::proclaim_return_type<size_type>([] __device__(string_index_pair item) -> size_type {
return (item.first != nullptr ? static_cast<size_type>(item.second) : size_type{0});
cuda::proclaim_return_type<size_type>([] (auto item) -> size_type {
return (item.first != nullptr ? static_cast<size_type>(thrust::get<1>(item)) : size_type{0});
});

auto offsets_transformer_itr = thrust::make_transform_iterator(thrust::get<0>(elem), offsets_transformer);
CUDF_EXPECTS(cudf::strings::is_large_strings_enabled() || (thrust::get<0>(elem) < threshold),
"Size of output exceeds the column size limit",
std::overflow_error);
auto strings_count = thrust::get<1>(thrust::get<0>(elem));

if (thrust::get<0>(elem) >= cudf::strings::get_offset64_threshold()) {
// recompute as int64 offsets when above the threshold
auto map_fn = cuda::proclaim_return_type<size_type>(
[begin = offsets_transformer_itr, strings_count = thrust::get<1>(elem)] __device__(size_type idx) -> size_type {
[begin = offsets_transformer_itr, strings_count = strings_count] (size_type idx) -> size_type {
return idx < strings_count ? static_cast<size_type>(begin[idx]) : size_type{0};
}
);

auto input_itr = cudf::detail::make_counting_transform_iterator(0, map_fn);
*(thrust::get<2>(elem)) = make_numeric_column(
data_type{type_id::INT64}, thrust::get<1>(elem) + 1, mask_state::UNALLOCATED, stream, mr);
auto d_offsets64 = (*(thrust::get<2>(elem)))->mutable_view().template data<int64_t>();
cudf::detail::sizes_to_offsets(input_itr, input_itr + thrust::get<1>(elem) + 1, d_offsets64, stream);

*(thrust::get<1>(elem)) = make_numeric_column(
data_type{type_id::INT64}, strings_count + 1, mask_state::UNALLOCATED, stream, mr);

auto d_offsets64 = (*(thrust::get<1>(elem)))->mutable_view().template data<int64_t>();

cudf::detail::sizes_to_offsets(input_itr, input_itr + strings_count + 1, d_offsets64, stream);
}
}
);

return std::pair(offsets_column, total_bytes);
return std::pair(offsets_columns, total_bytes);
}


10 changes: 7 additions & 3 deletions cpp/src/strings/strings_column_factories.cu
Original file line number Diff line number Diff line change
@@ -48,11 +48,12 @@ std::vector<std::unique_ptr<column>> make_strings_column_batch(

[offset_columns, total_bytes] = cudf::strings::detail::make_offsets_child_column_batch(strings_batch, stream, mr);


// create null mask
rmm::device_uvector<size_type> valid_counts(strings.size(), stream);
std::vector<bitmask_type*> null_masks(strings.size(), stream);
rmm::device_uvector<size_type> valid_counts(strings_batch.size(), stream);
std::vector<bitmask_type*> null_masks(strings_batch.size(), stream);

auto validator = [] __device__(string_index_pair const item) { return item.first != nullptr; };

[null_masks, valid_counts] = cudf::detail::valid_if_n_kernel(strings_batch, sizes, validator, stream, mr);

// build chars column
@@ -68,6 +69,7 @@ std::vector<std::unique_ptr<column>> make_strings_column_batch(
auto d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(thrust::get<0>(elem)->view());
auto chars_data = [d_offsets, bytes = bytes, begin, strings_count, null_count, stream, mr] {
auto const avg_bytes_per_row = bytes / std::max(strings_count - null_count, 1);

// use a character-parallel kernel for long string lengths
if (avg_bytes_per_row > FACTORY_BYTES_PER_ROW_THRESHOLD) {
auto const str_begin = thrust::make_transform_iterator(
@@ -91,10 +93,12 @@ std::vector<std::unique_ptr<column>> make_strings_column_batch(
int64_t const offset = thrust::get<1>(item);
if (str.first != nullptr) memcpy(d_chars + offset, str.first, str.second);
};

thrust::for_each_n(rmm::exec_policy(stream),
thrust::make_zip_iterator(thrust::make_tuple(begin, d_offsets)),
strings_count,
copy_chars);

return chars_data;
}
}();

0 comments on commit d50185c

Please sign in to comment.