Skip to content

Commit

Permalink
[inbox] Add AllLatest receive type, a generalization of NLatest (#93)
Browse files Browse the repository at this point in the history
Add an AllLatest receive type for the inbox, which is similar to NLatest
except that it is unbounded in how many elements it can return. This
makes it slightly less efficient, but is necessary for handling "the
last N seconds" of messages on a topic that publishes at an inconsistent
frequency.

The main efficiency losses are that we have to allocate memory
dynamically (not a big deal though, as we will grow to some max size and
likely stay there) and that messages are copied out of the subscriber's
memory pool and into the dynamic ring buffer (again not too big of an
issue, as large messages are mostly on the heap anyway).
  • Loading branch information
MattOslin authored Apr 18, 2024
1 parent 075b700 commit 403a91b
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 18 deletions.
6 changes: 6 additions & 0 deletions trellis/containers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,9 @@ cc_library(
hdrs = ["ring_buffer.hpp"],
visibility = ["//visibility:public"],
)

cc_library(
name = "dynamic_ring_buffer",
hdrs = ["dynamic_ring_buffer.hpp"],
visibility = ["//visibility:public"],
)
188 changes: 188 additions & 0 deletions trellis/containers/dynamic_ring_buffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright (C) 2023 Agtonomy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

#ifndef TRELLIS_CONTAINERS_DYNAMIC_RING_BUFFER_HPP_
#define TRELLIS_CONTAINERS_DYNAMIC_RING_BUFFER_HPP_

#include <cstddef>
#include <memory>
namespace trellis::containers {

/**
* @brief A dynamically sized ring buffer.
*
* Dynamically allocated but never deallocates. Only ever allocates to the maximum concurrently stored values. When the
* buffer is full, the capacity is doubled.
*
* Uses unmasked indices for simplicity, meaning we never have to bound our indices, they naturally underflow and
* overflow appropriately. This works since capacity is always a power of 2, so the overlow value is always divisible by
* capacity, so wrapping around and then masking with capacity always gives the next/previous element.
*
* @tparam T the type to store
*/
template <typename T>
class DynamicRingBuffer {
public:
using value_type = T;

DynamicRingBuffer() = default;

~DynamicRingBuffer() {
for (auto i = begin_; i != end_; ++i) std::destroy_at(&data_[Mask(i)]);
std::allocator<T>{}.deallocate(data_, capacity_);
}

DynamicRingBuffer(const DynamicRingBuffer& other) = delete;
DynamicRingBuffer& operator=(const DynamicRingBuffer&) = delete;

DynamicRingBuffer(DynamicRingBuffer&& other) noexcept {
data_ = other.data_;
capacity_ = other.capacity_;
begin_ = other.begin_;
end_ = other.end_;
other.data_ = nullptr;
other.capacity_ = 0;
other.begin_ = 0;
other.end_ = 0;
}

DynamicRingBuffer& operator=(DynamicRingBuffer&& other) noexcept {
for (auto i = begin_; i != end_; ++i) std::destroy_at(&data_[Mask(i)]);
std::allocator<T>{}.deallocate(data_, capacity_);
data_ = other.data_;
capacity_ = other.capacity_;
begin_ = other.begin_;
end_ = other.end_;
other.data_ = nullptr;
other.capacity_ = 0;
other.begin_ = 0;
other.end_ = 0;
return *this;
}

size_t size() const { return end_ - begin_; } // Overflow handles correctly.
bool empty() const { return end_ == begin_; }

void push_back(T t) {
if (size() == capacity_) IncreaseCapacity();
std::construct_at(&data_[Mask(end_)], std::move(t));
++end_;
}

void pop_front() {
std::destroy_at(&data_[Mask(begin_)]);
++begin_;
}

class ConstIterator {
public:
using iterator_category = std::random_access_iterator_tag;
using value_type = T;
using pointer = const T*;
using reference = const T&;
using difference_type = std::ptrdiff_t;

// Constructors.
ConstIterator() = default;
ConstIterator(const T* data, size_t capacity, size_t index) : data_{data}, capacity_{capacity}, index_{index} {}

// Pointer like operators.
reference operator*() const { return data_[Mask(index_)]; }
pointer operator->() const { return &data_[Mask(index_)]; }
reference operator[](const difference_type offset) const { return data_[Mask(index_ + offset)]; }

// Increment / Decrement
ConstIterator& operator++() {
++index_;
return *this;
}

ConstIterator& operator--() {
--index_;
return *this;
}

// Arithmetic
ConstIterator& operator+=(const difference_type offset) {
index_ += offset;
return *this;
}

ConstIterator operator+(const difference_type offset) const { return {data_, capacity_, index_ + offset}; }

friend ConstIterator operator+(const difference_type offset, const ConstIterator& right) {
return {right.data_, right.capacity_, right.index_ + offset};
}

ConstIterator& operator-=(const difference_type offset) {
index_ -= offset;
return *this;
}

ConstIterator operator-(const difference_type offset) const { return {data_, capacity_, index_ - offset}; }

difference_type operator-(const ConstIterator& right) const { return index_ - right.index_; }

private:
// Comparison operators
friend bool operator==(const ConstIterator&, const ConstIterator&) = default;
friend bool operator!=(const ConstIterator&, const ConstIterator&) = default;

// Go from unmasked index to masked index (can be used to access data).
size_t Mask(const size_t index) const { return index & (capacity_ - 1); }

const T* data_ = nullptr;
size_t capacity_ = 0;
size_t index_ = 0; // Unmasked, integer overflow works appropriately since capacity is a power of 2.
};

ConstIterator begin() const { return {data_, capacity_, begin_}; }
ConstIterator end() const { return {data_, capacity_, end_}; }

private:
// Go from unmasked index to masked index (can be used to access data).
size_t Mask(const size_t index) const { return index & (capacity_ - 1); }

void IncreaseCapacity() {
const auto new_capacity = capacity_ == 0 ? 1 : capacity_ * 2;
IncreaseCapacity(new_capacity);
}

void IncreaseCapacity(const size_t new_capacity) {
const auto new_data = allocator_.allocate(new_capacity);
const auto size = this->size();
for (auto new_i = size_t{}, old_i = begin_; old_i != end_; ++new_i, ++old_i) {
std::construct_at(&new_data[new_i], std::move(data_[Mask(old_i)]));
std::destroy_at(&data_[Mask(old_i)]);
}
allocator_.deallocate(data_, capacity_);
data_ = new_data;
capacity_ = new_capacity;
begin_ = 0;
end_ = size;
}

T* data_ = nullptr;
size_t capacity_ = 0; // Always a power of 2 (or 0).
size_t begin_ = 0; // Unmasked, integer overflow works appropriately since capacity is a power of 2.
size_t end_ = 0; // Unmasked, integer overflow works appropriately since capacity is a power of 2.
std::allocator<T> allocator_ = {};
};

} // namespace trellis::containers

#endif // TRELLIS_CONTAINERS_DYNAMIC_RING_BUFFER_HPP_
1 change: 0 additions & 1 deletion trellis/containers/ring_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

#include <array>
#include <cstddef>

namespace trellis::containers {

/**
Expand Down
9 changes: 9 additions & 0 deletions trellis/containers/test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,12 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "dynamic_ring_buffer_test",
srcs = ["dynamic_ring_buffer_test.cpp"],
deps = [
"//trellis/containers:dynamic_ring_buffer",
"@com_google_googletest//:gtest_main",
],
)
76 changes: 76 additions & 0 deletions trellis/containers/test/dynamic_ring_buffer_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include "trellis/containers/dynamic_ring_buffer.hpp"

#include <gmock/gmock.h>

namespace trellis::containers {

namespace {

using testing::ElementsAre;
using testing::Eq;
using testing::IsEmpty;
using testing::Pointee;
using testing::SizeIs;

// unique_ptr is a good test element because it has move but not copy semantics.
using DynamicTestBuffer = DynamicRingBuffer<std::unique_ptr<int>>;

} // namespace

TEST(DynamicRingBuffer, Empty) {
const auto ring = DynamicTestBuffer{};
ASSERT_THAT(ring, SizeIs(0));
ASSERT_THAT(ring, IsEmpty());
ASSERT_THAT(ring.begin(), Eq(ring.end()));
}

TEST(DynamicRingBuffer, PushBack) {
auto ring = DynamicTestBuffer{};
ring.push_back(std::make_unique<int>(0));
ring.push_back(std::make_unique<int>(1));
ring.push_back(std::make_unique<int>(2));
ASSERT_THAT(ring, SizeIs(3));
ASSERT_THAT(ring, ElementsAre(Pointee(0), Pointee(1), Pointee(2)));
}

TEST(DynamicRingBuffer, PopFront) {
auto ring = DynamicTestBuffer{};
ring.push_back(std::make_unique<int>(0));
ring.push_back(std::make_unique<int>(1));
ring.push_back(std::make_unique<int>(2));
ring.pop_front();
ASSERT_THAT(ring, SizeIs(2));
ASSERT_THAT(ring, ElementsAre(Pointee(1), Pointee(2)));
}

TEST(DynamicRingBuffer, PushPopFront) {
auto ring = DynamicTestBuffer{};
ring.push_back(std::make_unique<int>(0));
ring.push_back(std::make_unique<int>(1));
ring.push_back(std::make_unique<int>(2));
ring.pop_front();
ring.pop_front();
ring.push_back(std::make_unique<int>(3));
ASSERT_THAT(ring, SizeIs(2));
ASSERT_THAT(ring, ElementsAre(Pointee(2), Pointee(3)));
}

TEST(DynamicRingBuffer, MoveConstructor) {
auto ring1 = DynamicTestBuffer{};
ring1.push_back(std::make_unique<int>(0));
const auto ring2 = std::move(ring1);
ASSERT_THAT(ring1, SizeIs(0));
ASSERT_THAT(ring2, ElementsAre(Pointee(0)));
}

TEST(DynamicRingBuffer, MoveAssignment) {
auto ring1 = DynamicTestBuffer{};
ring1.push_back(std::make_unique<int>(0));
auto ring2 = DynamicTestBuffer{};
ring2.push_back(std::make_unique<int>(1));
ring2 = std::move(ring1);
ASSERT_THAT(ring1, SizeIs(0));
ASSERT_THAT(ring2, ElementsAre(Pointee(0)));
}

} // namespace trellis::containers
2 changes: 0 additions & 2 deletions trellis/containers/test/ring_buffer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ using testing::IsEmpty;
using testing::Pointee;
using testing::SizeIs;

constexpr size_t kTestCapacity = 10;

// unique_ptr is a good test element because it has move but not copy semantics.
using TestBuffer = RingBuffer<std::unique_ptr<int>, 3>;

Expand Down
1 change: 1 addition & 0 deletions trellis/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ cc_library(
":core_node",
":core_stamped_message",
":core_subscriber",
"//trellis/containers:dynamic_ring_buffer",
"//trellis/containers:ring_buffer",
],
)
Expand Down
Loading

0 comments on commit 403a91b

Please sign in to comment.