random sampling of dataset rows with improved memory utilization #2155

Merged · 19 commits · Mar 19, 2024
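At a high level, this PR adds a `subsample` benchmark, an `excess_subsample` random primitive, a batched host-to-device `gather`, and a `sample_rows` helper that dispatches between the host and device gather paths. A minimal sketch of how the new helper might be invoked — the function name and signature are taken from the diff below, while the wrapper function, sizes, and seed are illustrative:

```cpp
#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_resources.hpp>
#include <raft/matrix/detail/sample_rows.cuh>
#include <raft/random/rng_state.hpp>

void subsample_example(const float* dataset, int64_t n_rows, int64_t n_dim)
{
  raft::device_resources res;
  raft::random::RngState rng(137ULL);

  // The output holds the randomly selected rows; here we keep 10% of the dataset.
  int64_t n_train = n_rows / 10;
  auto output = raft::make_device_matrix<float, int64_t>(res, n_train, n_dim);

  // sample_rows detects whether `dataset` is a host or a device pointer and
  // picks the matching gather path internally.
  raft::matrix::detail::sample_rows<float, int64_t>(res, rng, dataset, n_rows, output.view());
}
```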
2 changes: 1 addition & 1 deletion cpp/bench/prims/CMakeLists.txt
@@ -136,7 +136,7 @@ if(BUILD_PRIMS_BENCH)

ConfigureBench(
NAME RANDOM_BENCH PATH bench/prims/random/make_blobs.cu bench/prims/random/permute.cu
-  bench/prims/random/rng.cu bench/prims/main.cpp
+  bench/prims/random/rng.cu bench/prims/random/subsample.cu bench/prims/main.cpp
)

ConfigureBench(NAME SPARSE_BENCH PATH bench/prims/sparse/convert_csr.cu bench/prims/main.cpp)
38 changes: 35 additions & 3 deletions cpp/bench/prims/matrix/gather.cu
@@ -16,34 +16,48 @@

#include <common/benchmark.hpp>

#include <raft/core/device_mdarray.hpp>
#include <raft/core/host_mdarray.hpp>
#include <raft/core/resource/cuda_stream.hpp>
#include <raft/matrix/gather.cuh>
#include <raft/random/rng.cuh>
#include <raft/util/itertools.hpp>

#include <rmm/device_uvector.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

namespace raft::bench::matrix {

template <typename IdxT>
struct GatherParams {
IdxT rows, cols, map_length;
bool host;
};

template <typename IdxT>
inline auto operator<<(std::ostream& os, const GatherParams<IdxT>& p) -> std::ostream&
{
-  os << p.rows << "#" << p.cols << "#" << p.map_length;
+  os << p.rows << "#" << p.cols << "#" << p.map_length << (p.host ? "#host" : "#device");
return os;
}

template <typename T, typename MapT, typename IdxT, bool Conditional = false>
struct Gather : public fixture {
Gather(const GatherParams<IdxT>& p)
-    : params(p), matrix(this->handle), map(this->handle), out(this->handle), stencil(this->handle)
+    : params(p),
+      old_mr(rmm::mr::get_current_device_resource()),
+      pool_mr(rmm::mr::get_current_device_resource(), 2 * (1ULL << 30)),
+      matrix(this->handle),
+      map(this->handle),
+      out(this->handle),
+      stencil(this->handle),
+      matrix_h(this->handle)
{
rmm::mr::set_current_device_resource(&pool_mr);
}

~Gather() { rmm::mr::set_current_device_resource(old_mr); }

void allocate_data(const ::benchmark::State& state) override
{
matrix = raft::make_device_matrix<T, IdxT>(handle, params.rows, params.cols);
@@ -59,6 +73,11 @@ struct Gather : public fixture {
if constexpr (Conditional) {
raft::random::uniform(handle, rng, stencil.data_handle(), params.map_length, T(-1), T(1));
}

if (params.host) {
matrix_h = raft::make_host_matrix<T, IdxT>(handle, params.rows, params.cols);
raft::copy(matrix_h.data_handle(), matrix.data_handle(), matrix.size(), stream);
}
resource::sync_stream(handle, stream);
}

@@ -77,14 +96,22 @@
raft::matrix::gather_if(
handle, matrix_const_view, out.view(), map_const_view, stencil_const_view, pred_op);
} else {
-      raft::matrix::gather(handle, matrix_const_view, map_const_view, out.view());
+      if (params.host) {
+        raft::matrix::detail::gather(
+          handle, make_const_mdspan(matrix_h.view()), map_const_view, out.view());
+      } else {
+        raft::matrix::gather(handle, matrix_const_view, map_const_view, out.view());
+      }
}
});
}

private:
GatherParams<IdxT> params;
rmm::mr::device_memory_resource* old_mr;
rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource> pool_mr;
raft::device_matrix<T, IdxT> matrix, out;
raft::host_matrix<T, IdxT> matrix_h;
raft::device_vector<T, IdxT> stencil;
raft::device_vector<MapT, IdxT> map;
}; // struct Gather
@@ -100,4 +127,9 @@ RAFT_BENCH_REGISTER((Gather<float, uint32_t, int64_t>), "", gather_inputs_i64);
RAFT_BENCH_REGISTER((Gather<double, uint32_t, int64_t>), "", gather_inputs_i64);
RAFT_BENCH_REGISTER((GatherIf<float, uint32_t, int64_t>), "", gather_inputs_i64);
RAFT_BENCH_REGISTER((GatherIf<double, uint32_t, int64_t>), "", gather_inputs_i64);

auto inputs_host = raft::util::itertools::product<GatherParams<int64_t>>(
{10000000}, {100}, {1000, 1000000, 10000000}, {true});
RAFT_BENCH_REGISTER((Gather<float, uint32_t, int64_t>), "Host", inputs_host);

} // namespace raft::bench::matrix
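Note the fixture's constructor/destructor pair around the RMM memory resource: it installs a pool resource so per-iteration allocations avoid the slow `cudaMalloc` path, then restores the previous resource on teardown. A minimal standalone sketch of that pattern — the `scoped_pool` name is illustrative; the 2 GiB initial size mirrors the benchmark:

```cpp
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

// RAII helper: install a pool resource on construction, restore the previous
// resource on destruction.
struct scoped_pool {
  scoped_pool()
    : old_mr(rmm::mr::get_current_device_resource()),
      pool_mr(old_mr, 2 * (1ULL << 30))  // 2 GiB initial pool size
  {
    rmm::mr::set_current_device_resource(&pool_mr);
  }
  ~scoped_pool() { rmm::mr::set_current_device_resource(old_mr); }

  rmm::mr::device_memory_resource* old_mr;
  rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource> pool_mr;
};
```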
112 changes: 112 additions & 0 deletions cpp/bench/prims/random/subsample.cu
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <common/benchmark.hpp>

#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_resources.hpp>
#include <raft/core/host_mdarray.hpp>
#include <raft/core/operators.hpp>
#include <raft/random/permute.cuh>
#include <raft/random/rng.cuh>
#include <raft/random/sample_without_replacement.cuh>
#include <raft/spatial/knn/detail/ann_utils.cuh>
#include <raft/util/cudart_utils.hpp>

#include <rmm/device_scalar.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

#include <cub/cub.cuh>

namespace raft::bench::random {

struct sample_inputs {
int n_samples;
int n_train;
int method;
}; // struct sample_inputs

inline auto operator<<(std::ostream& os, const sample_inputs& p) -> std::ostream&
{
os << p.n_samples << "#" << p.n_train << "#" << p.method;
return os;
}

// Sample with replacement. We use this as a baseline.
template <typename IdxT>
auto bernoulli_subsample(raft::resources const& res, IdxT n_samples, IdxT n_subsamples, int seed)
-> raft::device_vector<IdxT, IdxT>
{
RAFT_EXPECTS(n_subsamples <= n_samples, "Cannot have more training samples than dataset vectors");

auto indices = raft::make_device_vector<IdxT, IdxT>(res, n_subsamples);
raft::random::RngState state(seed);
raft::random::uniformInt(
res, state, indices.data_handle(), n_subsamples, IdxT(0), IdxT(n_samples));
return indices;
}

template <typename T>
struct sample : public fixture {
sample(const sample_inputs& p)
: params(p),
old_mr(rmm::mr::get_current_device_resource()),
pool_mr(rmm::mr::get_current_device_resource(), 2 * GiB),
in(make_device_vector<T, int64_t>(res, p.n_samples)),
out(make_device_vector<T, int64_t>(res, p.n_train))
{
rmm::mr::set_current_device_resource(&pool_mr);
}

~sample() { rmm::mr::set_current_device_resource(old_mr); }
void run_benchmark(::benchmark::State& state) override
{
std::ostringstream label_stream;
label_stream << params;
state.SetLabel(label_stream.str());

raft::random::RngState r(123456ULL);
loop_on_state(state, [this, &r]() {
if (params.method == 1) {
this->out =
bernoulli_subsample<T>(this->res, this->params.n_samples, this->params.n_train, 137);
} else if (params.method == 2) {
this->out = raft::random::excess_subsample<T, int64_t>(
this->res, r, this->params.n_samples, this->params.n_train);
}
});
}

private:
size_t GiB = 1024 * 1024 * 1024;
raft::device_resources res;
rmm::mr::device_memory_resource* old_mr;
rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource> pool_mr;
sample_inputs params;
raft::device_vector<T, int64_t> out, in;
}; // struct sample

const std::vector<sample_inputs> input_vecs = {{100000000, 10000000, 1},
{100000000, 50000000, 1},
{100000000, 100000000, 1},
{100000000, 10000000, 2},
{100000000, 50000000, 2},
{100000000, 100000000, 2}};

RAFT_BENCH_REGISTER(sample<int64_t>, "", input_vecs);

} // namespace raft::bench::random
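Method 1 draws indices uniformly with replacement as a baseline; method 2 exercises `raft::random::excess_subsample`, which, going by the name and its use here as a without-replacement sampler, draws more candidates than requested and discards duplicates. A hedged host-side analogue of that idea — this sketches the strategy only, not the actual CUDA implementation:

```cpp
#include <cstdint>
#include <random>
#include <unordered_set>
#include <vector>

// Sketch: pick n_train distinct indices from [0, n_samples) by repeatedly
// drawing a slightly oversized batch and keeping only unseen values.
// Assumes n_train <= n_samples, otherwise the loop cannot terminate.
std::vector<int64_t> excess_subsample_sketch(int64_t n_samples, int64_t n_train, uint64_t seed)
{
  std::mt19937_64 rng(seed);
  std::uniform_int_distribution<int64_t> dist(0, n_samples - 1);

  std::unordered_set<int64_t> seen;
  std::vector<int64_t> out;
  out.reserve(n_train);
  while (static_cast<int64_t>(out.size()) < n_train) {
    int64_t missing = n_train - static_cast<int64_t>(out.size());
    int64_t batch   = missing + missing / 10 + 1;  // ~10% excess
    for (int64_t i = 0; i < batch && static_cast<int64_t>(out.size()) < n_train; ++i) {
      int64_t idx = dist(rng);
      if (seen.insert(idx).second) { out.push_back(idx); }
    }
  }
  return out;
}
```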
87 changes: 87 additions & 0 deletions cpp/include/raft/matrix/detail/gather.cuh
@@ -16,9 +16,19 @@

#pragma once

#include <raft/common/nvtx.hpp>
#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_mdspan.hpp>
#include <raft/core/host_mdarray.hpp>
#include <raft/core/host_mdspan.hpp>
#include <raft/core/operators.hpp>
#include <raft/core/pinned_mdarray.hpp>
#include <raft/core/pinned_mdspan.hpp>
#include <raft/util/cuda_dev_essentials.cuh>
#include <raft/util/cudart_utils.hpp>

#include <omp.h>

#include <functional>

namespace raft {
@@ -336,6 +346,83 @@ void gather_if(const InputIteratorT in,
gatherImpl(in, D, N, map, stencil, map_length, out, pred_op, transform_op, stream);
}

/**
* Helper function to gather a set of vectors from a (host) dataset.
*/
template <typename T, typename IdxT, typename MatIdxT = int64_t>
void gather_buff(host_matrix_view<const T, MatIdxT> dataset,
host_vector_view<const IdxT, MatIdxT> indices,
MatIdxT offset,
pinned_matrix_view<T, MatIdxT> buff)
{
raft::common::nvtx::range<common::nvtx::domain::raft> fun_scope("gather_host_buff");
IdxT batch_size = std::min<IdxT>(buff.extent(0), indices.extent(0) - offset);

#pragma omp for
for (IdxT i = 0; i < batch_size; i++) {
IdxT in_idx = indices(offset + i);
for (IdxT k = 0; k < buff.extent(1); k++) {
buff(i, k) = dataset(in_idx, k);
}
}
}

template <typename T, typename IdxT, typename MatIdxT = int64_t>
void gather(raft::resources const& res,
host_matrix_view<const T, MatIdxT> dataset,
device_vector_view<const IdxT, MatIdxT> indices,
raft::device_matrix_view<T, MatIdxT> output)
{
raft::common::nvtx::range<common::nvtx::domain::raft> fun_scope("gather");
IdxT n_dim = output.extent(1);
IdxT n_train = output.extent(0);
auto indices_host = raft::make_host_vector<IdxT, MatIdxT>(n_train);
raft::copy(
indices_host.data_handle(), indices.data_handle(), n_train, resource::get_cuda_stream(res));
resource::sync_stream(res);

const size_t buffer_size = 32768 * 1024;  // elements of T per temporary buffer
const size_t max_batch_size =
std::min<size_t>(round_up_safe<size_t>(buffer_size / n_dim, 32), n_train);
RAFT_LOG_DEBUG("Gathering data with batch size %zu", max_batch_size);

// Gather the vector on the host in tmp buffers. We use two buffers to overlap H2D sync
// and gathering the data.
auto out_tmp1 = raft::make_pinned_matrix<T, MatIdxT>(res, max_batch_size, n_dim);
auto out_tmp2 = raft::make_pinned_matrix<T, MatIdxT>(res, max_batch_size, n_dim);

// Usually a limited number of threads provide sufficient bandwidth for gathering data.
int n_threads = std::min(omp_get_max_threads(), 32);

// The gather_buff function has a parallel for loop. We start the omp parallel
// region here to avoid repeated overhead within the device_offset loop.
#pragma omp parallel num_threads(n_threads)
{
auto view1 = out_tmp1.view();
auto view2 = out_tmp2.view();
gather_buff(dataset, make_const_mdspan(indices_host.view()), (MatIdxT)0, view1);
for (MatIdxT device_offset = 0; device_offset < n_train; device_offset += max_batch_size) {
MatIdxT batch_size = std::min<IdxT>(max_batch_size, n_train - device_offset);

#pragma omp master
raft::copy(output.data_handle() + device_offset * n_dim,
view1.data_handle(),
batch_size * n_dim,
resource::get_cuda_stream(res));
// Start gathering the next batch on the host.
MatIdxT host_offset = device_offset + batch_size;
batch_size = std::min<IdxT>(max_batch_size, n_train - host_offset);
if (batch_size > 0) {
gather_buff(dataset, make_const_mdspan(indices_host.view()), host_offset, view2);
}
#pragma omp master
resource::sync_stream(res);
#pragma omp barrier
std::swap(view1, view2);
}
}
}

} // namespace detail
} // namespace matrix
} // namespace raft
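A minimal usage sketch for the host-path `detail::gather` overload added above — the wrapper function is illustrative; the dataset view lives in host memory, the indices in device memory, and the gathered rows land in a device matrix:

```cpp
#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_resources.hpp>
#include <raft/core/host_mdarray.hpp>
#include <raft/matrix/detail/gather.cuh>

void host_gather_example(raft::device_resources const& res,
                         raft::host_matrix_view<const float, int64_t> dataset,
                         raft::device_vector_view<const int64_t, int64_t> indices)
{
  // One output row per requested index; column count matches the dataset.
  auto out =
    raft::make_device_matrix<float, int64_t>(res, indices.extent(0), dataset.extent(1));
  raft::matrix::detail::gather(res, dataset, indices, out.view());
}
```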
57 changes: 57 additions & 0 deletions cpp/include/raft/matrix/detail/sample_rows.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_mdspan.hpp>
#include <raft/core/logger.hpp>
#include <raft/core/resources.hpp>
#include <raft/matrix/gather.cuh>
#include <raft/random/rng.cuh>
#include <raft/util/cuda_utils.cuh>
#include <raft/util/cudart_utils.hpp>

namespace raft::matrix::detail {

/** Select rows randomly from input and copy to output. */
template <typename T, typename IdxT = int64_t>
void sample_rows(raft::resources const& res,
random::RngState random_state,
const T* input,
IdxT n_rows_input,
raft::device_matrix_view<T, IdxT> output)
{
IdxT n_dim = output.extent(1);
IdxT n_samples = output.extent(0);

raft::device_vector<IdxT, IdxT> train_indices =
raft::random::excess_subsample<IdxT, int64_t>(res, random_state, n_rows_input, n_samples);

cudaPointerAttributes attr;
RAFT_CUDA_TRY(cudaPointerGetAttributes(&attr, input));
T* ptr = reinterpret_cast<T*>(attr.devicePointer);
if (ptr != nullptr) {
raft::matrix::gather(res,
raft::make_device_matrix_view<const T, IdxT>(ptr, n_rows_input, n_dim),
raft::make_const_mdspan(train_indices.view()),
output);
} else {
auto dataset = raft::make_host_matrix_view<const T, IdxT>(input, n_rows_input, n_dim);
raft::matrix::detail::gather(res, dataset, make_const_mdspan(train_indices.view()), output);
}
}
} // namespace raft::matrix::detail
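The host/device dispatch above hinges on `cudaPointerGetAttributes`: for device or managed allocations, `attr.devicePointer` is a valid device address, while for plain pageable host memory it is null. A standalone sketch of the same check (the helper name is illustrative):

```cpp
#include <cuda_runtime_api.h>

// Returns true when `ptr` is accessible from the device: device or managed
// memory, or host memory mapped into the device address space.
inline bool is_device_accessible(const void* ptr)
{
  cudaPointerAttributes attr{};
  if (cudaPointerGetAttributes(&attr, ptr) != cudaSuccess) { return false; }
  return attr.devicePointer != nullptr;
}
```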