Skip to content

Commit

Permalink
Making sure serialize_buffer properly destroys buffer, if needed.
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Oct 17, 2023
1 parent 0e09575 commit 6fe8cd4
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 70 deletions.
4 changes: 2 additions & 2 deletions examples/1d_stencil/1d_stencil_6.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ struct partition_data

// Create a new (uninitialized) partition of the given size.
explicit partition_data(std::size_t size)
: data_(std::allocator<double>().allocate(size), size, buffer_type::take)
: data_(new double[size], size, buffer_type::take)
, size_(size)
, min_index_(0)
{
}

// Create a new (initialized) partition of the given size.
partition_data(std::size_t size, double initial_value)
: data_(std::allocator<double>().allocate(size), size, buffer_type::take)
: data_(new double[size], size, buffer_type::take)
, size_(size)
, min_index_(0)
{
Expand Down
4 changes: 2 additions & 2 deletions examples/1d_stencil/1d_stencil_7.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ struct partition_data

// Create a new (uninitialized) partition of the given size.
explicit partition_data(std::size_t size)
: data_(std::allocator<double>().allocate(size), size, buffer_type::take)
: data_(new double[size], size, buffer_type::take)
, size_(size)
, min_index_(0)
{
}

// Create a new (initialized) partition of the given size.
partition_data(std::size_t size, double initial_value)
: data_(std::allocator<double>().allocate(size), size, buffer_type::take)
: data_(new double[size], size, buffer_type::take)
, size_(size)
, min_index_(0)
{
Expand Down
4 changes: 2 additions & 2 deletions examples/1d_stencil/1d_stencil_8.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ struct partition_data

struct hold_reference
{
hold_reference(buffer_type const& data)
explicit hold_reference(buffer_type const& data)
: data_(data)
{
}

void operator()(double*) {} // no deletion necessary
void operator()(double const*) const {} // no deletion necessary

buffer_type data_;
};
Expand Down
19 changes: 10 additions & 9 deletions examples/quickstart/zerocopy_rdma.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ class pointer_allocator
{
}

pointer address(reference value) const
static pointer address(reference value)
{
return &value;
}
const_pointer address(const_reference value) const
static const_pointer address(const_reference value)
{
return &value;
}
Expand Down Expand Up @@ -118,7 +118,7 @@ struct zerocopy_server : hpx::components::component_base<zerocopy_server>
}

public:
zerocopy_server(std::size_t size = 0)
explicit zerocopy_server(std::size_t size = 0)
: data_(size, 3.1415)
{
}
Expand All @@ -127,7 +127,7 @@ struct zerocopy_server : hpx::components::component_base<zerocopy_server>
// Retrieve an array of doubles to the given address
transfer_buffer_type get_here(std::size_t size, std::size_t remote_buffer)
{
pointer_allocator<double> allocator(
pointer_allocator<double> const allocator(
reinterpret_cast<double*>(remote_buffer), size);

// lock the mutex, will be unlocked by the transfer buffer's deleter
Expand Down Expand Up @@ -225,9 +225,7 @@ struct zerocopy : hpx::components::client_base<zerocopy, zerocopy_server>
///////////////////////////////////////////////////////////////////////////////
int main()
{
std::vector<hpx::id_type> localities = hpx::find_all_localities();

for (hpx::id_type const& id : localities)
for (hpx::id_type const& id : hpx::find_all_localities())
{
zerocopy zc = hpx::new_<zerocopy_server>(id, ZEROCOPY_DATASIZE);

Expand All @@ -238,7 +236,10 @@ int main()
hpx::chrono::high_resolution_timer t;

for (int i = 0; i != 100; ++i)
zc.get(hpx::launch::sync, ZEROCOPY_DATASIZE);
{
[[maybe_unused]] auto r =
zc.get(hpx::launch::sync, ZEROCOPY_DATASIZE);
}

double d = t.elapsed();
std::cout << "Elapsed time 'get' (locality "
Expand All @@ -252,7 +253,7 @@ int main()
for (int i = 0; i != 100; ++i)
zc.get_here(hpx::launch::sync, buffer);

double d = t.elapsed();
double const d = t.elapsed();
std::cout << "Elapsed time 'get_here' (locality "
<< hpx::naming::get_locality_id_from_id(id) << "): " << d
<< "[s]\n";
Expand Down
152 changes: 101 additions & 51 deletions libs/core/serialization/include/hpx/serialization/serialize_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,36 @@
#include <boost/shared_array.hpp>
#endif

#include <algorithm>
#include <cstddef>
#include <memory>

namespace hpx::serialization {

namespace detail {

template <typename Deallocator>
struct array_deleter
{
template <typename T>
void operator()(
T* p, Deallocator dealloc, std::size_t size) const noexcept
{
std::destroy(p, p + size);
dealloc.deallocate(p, size);
}
};

template <typename T>
struct array_deleter<std::allocator<T>>
{
void operator()(
T const* p, std::allocator<T>, std::size_t) const noexcept
{
delete[] p;
}
};
} // namespace detail

///////////////////////////////////////////////////////////////////////////
template <typename T, typename Allocator>
class serialize_buffer
Expand All @@ -41,20 +65,28 @@ namespace hpx::serialization {
static constexpr void no_deleter(T*) noexcept {}

template <typename Deallocator>
static void deleter(
T* p, Deallocator dealloc, std::size_t size) noexcept
struct deleter
{
dealloc.deallocate(p, size);
}
void operator()(
T* p, Deallocator dealloc, std::size_t size) const noexcept
{
std::destroy_at(p);
dealloc.deallocate(p, size);
}
};

public:
enum init_mode
{
copy = 0, // constructor copies data
reference = 1, // constructor does not copy data and does not
// manage the lifetime of it
take = 2 // constructor does not copy data but does take
// ownership and manages the lifetime of it
copy = 0, // constructor copies data
reference = 1, // constructor does not copy data and does not
// manage the lifetime of it
take = 2, // constructor does not copy data but takes
// ownership of array pointer and manages the
// lifetime of it
take_single = 3 // constructor does not copy data but takes
// ownership of pointer to non-array and manages
// the lifetime of it
};

using value_type = T;
Expand All @@ -72,9 +104,10 @@ namespace hpx::serialization {
, size_(size)
, alloc_(alloc)
{
data_.reset(alloc_.allocate(size),
[alloc = this->alloc_, size = this->size_](T* p) noexcept {
serialize_buffer::deleter<allocator_type>(p, alloc, size);
auto* p = alloc_.allocate(size);
data_.reset(
p, [alloc = this->alloc_, size = this->size_](T* p) noexcept {
serialize_buffer::deleter<allocator_type>()(p, alloc, size);
});
}

Expand All @@ -88,27 +121,34 @@ namespace hpx::serialization {
{
if (mode == copy)
{
data_.reset(alloc_.allocate(size),
auto* p = alloc_.allocate(size);
data_.reset(p,
[alloc = this->alloc_, size = this->size_](T* p) noexcept {
serialize_buffer::deleter<allocator_type>(
p, alloc, size);
detail::array_deleter<allocator_type>()(p, alloc, size);
});
if (size != 0)
std::copy(data, data + size, data_.get());
std::uninitialized_copy(data, data + size, p);
}
else if (mode == reference)
{
data_ = buffer_type(data, &serialize_buffer::no_deleter);
}
else
else if (mode == take_single)
{
// take ownership
data_ = buffer_type(data,
[alloc = this->alloc_, size = this->size_](T* p) noexcept {
serialize_buffer::deleter<allocator_type>(
serialize_buffer::deleter<allocator_type>()(
p, alloc, size);
});
}
else if (mode == take)
{
// take ownership
data_ = buffer_type(data,
[alloc = this->alloc_, size = this->size_](T* p) noexcept {
detail::array_deleter<allocator_type>()(p, alloc, size);
});
}
}

template <typename Deallocator>
Expand All @@ -120,49 +160,58 @@ namespace hpx::serialization {
{
// if 2 allocators are specified we assume mode 'take'
data_ = buffer_type(data, [this, dealloc](T* p) noexcept {
serialize_buffer::deleter<Deallocator>(p, dealloc, size_);
detail::array_deleter<Deallocator>()(p, dealloc, size_);
});
}

template <typename Deleter>
serialize_buffer(T* data, std::size_t size, init_mode mode,
Deleter const& deleter,
allocator_type const& alloc = allocator_type())
Deleter&& deleter, allocator_type const& alloc = allocator_type())
: data_()
, size_(size)
, alloc_(alloc)
{
if (mode == copy)
{
data_.reset(alloc_.allocate(size), deleter);
if (size != 0)
std::copy(data, data + size, data_.get());
auto* p = alloc_.allocate(size);
data_ = buffer_type(p,
[alloc = this->alloc_, size = this->size_](T* p) noexcept {
detail::array_deleter<allocator_type>()(p, alloc, size);
});
std::uninitialized_copy(data, data + size, p);
}
else
{
// reference or take ownership, behavior is defined by deleter
data_ = buffer_type(data, deleter);
data_ = buffer_type(data,
[deleter = HPX_FORWARD(Deleter, deleter)](
T* p) noexcept { deleter(p); });
}
}

template <typename Deleter>
serialize_buffer(T const* data, std::size_t size,
init_mode mode, //-V659
Deleter const& deleter,
allocator_type const& alloc = allocator_type())
Deleter&& deleter, allocator_type const& alloc = allocator_type())
: data_()
, size_(size)
, alloc_(alloc)
{
if (mode == copy)
{
data_.reset(alloc_.allocate(size), deleter);
if (size != 0)
std::copy(data, data + size, data_.get());
auto* p = alloc_.allocate(size);
data_ = buffer_type(p,
[alloc = this->alloc_, size = this->size_](T* p) noexcept {
detail::array_deleter<allocator_type>()(p, alloc, size);
});
std::uninitialized_copy(data, data + size, p);
}
else if (mode == reference)
{
data_ = buffer_type(const_cast<T*>(data), deleter);
// reference behavior is defined by deleter
data_ = buffer_type(const_cast<T*>(data),
[deleter = HPX_FORWARD(Deleter, deleter)](
T* p) noexcept { deleter(p); });
}
else
{
Expand Down Expand Up @@ -193,26 +242,28 @@ namespace hpx::serialization {
, alloc_(alloc)
{
// create from const data implies 'copy' mode
data_.reset(alloc_.allocate(size),
[alloc = this->alloc_, size = this->size_](T* p) noexcept {
serialize_buffer::deleter<allocator_type>(p, alloc, size);
auto* p = alloc_.allocate(size);
data_ = buffer_type(
p, [alloc = this->alloc_, size = this->size_](T* p) noexcept {
detail::array_deleter<allocator_type>()(p, alloc, size);
});
if (size != 0)
std::copy(data, data + size, data_.get());
std::uninitialized_copy(data, data + size, p);
}

template <typename Deleter>
serialize_buffer(T const* data, std::size_t size,
Deleter const& deleter,
serialize_buffer(T const* data, std::size_t size, Deleter&& deleter,
allocator_type const& alloc = allocator_type())
: data_()
, size_(size)
, alloc_(alloc)
{
// create from const data implies 'copy' mode
data_.reset(alloc_.allocate(size), deleter);
if (size != 0)
std::copy(data, data + size, data_.get());
auto* p = alloc_.allocate(size);
data_ = buffer_type(
p, [alloc = this->alloc_, size = this->size_](T* p) noexcept {
detail::array_deleter<allocator_type>()(p, alloc, size);
});
std::uninitialized_copy(data, data + size, p);
}

serialize_buffer(T const* data, std::size_t size, init_mode mode,
Expand All @@ -223,13 +274,12 @@ namespace hpx::serialization {
{
if (mode == copy)
{
data_.reset(alloc_.allocate(size),
auto* p = alloc_.allocate(size);
data_ = buffer_type(p,
[alloc = this->alloc_, size = this->size_](T* p) noexcept {
serialize_buffer::deleter<allocator_type>(
p, alloc, size);
detail::array_deleter<allocator_type>()(p, alloc, size);
});
if (size != 0)
std::copy(data, data + size, data_.get());
std::uninitialized_copy(data, data + size, p);
}
else if (mode == reference)
{
Expand Down Expand Up @@ -316,9 +366,9 @@ namespace hpx::serialization {
{
ar >> size_ >> alloc_; // -V128

data_.reset(alloc_.allocate(size_),
[alloc = this->alloc_, size = this->size_](T* p) {
serialize_buffer::deleter<allocator_type>(p, alloc, size);
data_ = buffer_type(alloc_.allocate(size_),
[alloc = this->alloc_, size = this->size_](T* p) noexcept {
detail::array_deleter<allocator_type>()(p, alloc, size);
});

if (size_ != 0)
Expand Down
Loading

0 comments on commit 6fe8cd4

Please sign in to comment.