diff --git a/trellis/containers/BUILD.bazel b/trellis/containers/BUILD.bazel index f0dc3bd..0594232 100644 --- a/trellis/containers/BUILD.bazel +++ b/trellis/containers/BUILD.bazel @@ -21,3 +21,9 @@ cc_library( hdrs = ["ring_buffer.hpp"], visibility = ["//visibility:public"], ) + +cc_library( + name = "dynamic_ring_buffer", + hdrs = ["dynamic_ring_buffer.hpp"], + visibility = ["//visibility:public"], +) diff --git a/trellis/containers/dynamic_ring_buffer.hpp b/trellis/containers/dynamic_ring_buffer.hpp new file mode 100644 index 0000000..d4ff03d --- /dev/null +++ b/trellis/containers/dynamic_ring_buffer.hpp @@ -0,0 +1,188 @@ +/* + * Copyright (C) 2023 Agtonomy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef TRELLIS_CONTAINERS_DYNAMIC_RING_BUFFER_HPP_ +#define TRELLIS_CONTAINERS_DYNAMIC_RING_BUFFER_HPP_ + +#include +#include +namespace trellis::containers { + +/** + * @brief A dynamically sized ring buffer. + * + * Dynamically allocated but never deallocates. Only ever allocates to the maximum concurrently stored values. When the + * buffer is full, the capacity is doubled. + * + * Uses unmasked indices for simplicity, meaning we never have to bound our indices, they naturally underflow and + * overflow appropriately. This works since capacity is always a power of 2, so the overlow value is always divisible by + * capacity, so wrapping around and then masking with capacity always gives the next/previous element. + * + * @tparam T the type to store + */ +template +class DynamicRingBuffer { + public: + using value_type = T; + + DynamicRingBuffer() = default; + + ~DynamicRingBuffer() { + for (auto i = begin_; i != end_; ++i) std::destroy_at(&data_[Mask(i)]); + std::allocator{}.deallocate(data_, capacity_); + } + + DynamicRingBuffer(const DynamicRingBuffer& other) = delete; + DynamicRingBuffer& operator=(const DynamicRingBuffer&) = delete; + + DynamicRingBuffer(DynamicRingBuffer&& other) noexcept { + data_ = other.data_; + capacity_ = other.capacity_; + begin_ = other.begin_; + end_ = other.end_; + other.data_ = nullptr; + other.capacity_ = 0; + other.begin_ = 0; + other.end_ = 0; + } + + DynamicRingBuffer& operator=(DynamicRingBuffer&& other) noexcept { + for (auto i = begin_; i != end_; ++i) std::destroy_at(&data_[Mask(i)]); + std::allocator{}.deallocate(data_, capacity_); + data_ = other.data_; + capacity_ = other.capacity_; + begin_ = other.begin_; + end_ = other.end_; + other.data_ = nullptr; + other.capacity_ = 0; + other.begin_ = 0; + other.end_ = 0; + return *this; + } + + size_t size() const { return end_ - begin_; } // Overflow handles correctly. + bool empty() const { return end_ == begin_; } + + void push_back(T t) { + if (size() == capacity_) IncreaseCapacity(); + std::construct_at(&data_[Mask(end_)], std::move(t)); + ++end_; + } + + void pop_front() { + std::destroy_at(&data_[Mask(begin_)]); + ++begin_; + } + + class ConstIterator { + public: + using iterator_category = std::random_access_iterator_tag; + using value_type = T; + using pointer = const T*; + using reference = const T&; + using difference_type = std::ptrdiff_t; + + // Constructors. + ConstIterator() = default; + ConstIterator(const T* data, size_t capacity, size_t index) : data_{data}, capacity_{capacity}, index_{index} {} + + // Pointer like operators. + reference operator*() const { return data_[Mask(index_)]; } + pointer operator->() const { return &data_[Mask(index_)]; } + reference operator[](const difference_type offset) const { return data_[Mask(index_ + offset)]; } + + // Increment / Decrement + ConstIterator& operator++() { + ++index_; + return *this; + } + + ConstIterator& operator--() { + --index_; + return *this; + } + + // Arithmetic + ConstIterator& operator+=(const difference_type offset) { + index_ += offset; + return *this; + } + + ConstIterator operator+(const difference_type offset) const { return {data_, capacity_, index_ + offset}; } + + friend ConstIterator operator+(const difference_type offset, const ConstIterator& right) { + return {right.data_, right.capacity_, right.index_ + offset}; + } + + ConstIterator& operator-=(const difference_type offset) { + index_ -= offset; + return *this; + } + + ConstIterator operator-(const difference_type offset) const { return {data_, capacity_, index_ - offset}; } + + difference_type operator-(const ConstIterator& right) const { return index_ - right.index_; } + + private: + // Comparison operators + friend bool operator==(const ConstIterator&, const ConstIterator&) = default; + friend bool operator!=(const ConstIterator&, const ConstIterator&) = default; + + // Go from unmasked index to masked index (can be used to access data). + size_t Mask(const size_t index) const { return index & (capacity_ - 1); } + + const T* data_ = nullptr; + size_t capacity_ = 0; + size_t index_ = 0; // Unmasked, integer overflow works appropriately since capacity is a power of 2. + }; + + ConstIterator begin() const { return {data_, capacity_, begin_}; } + ConstIterator end() const { return {data_, capacity_, end_}; } + + private: + // Go from unmasked index to masked index (can be used to access data). + size_t Mask(const size_t index) const { return index & (capacity_ - 1); } + + void IncreaseCapacity() { + const auto new_capacity = capacity_ == 0 ? 1 : capacity_ * 2; + IncreaseCapacity(new_capacity); + } + + void IncreaseCapacity(const size_t new_capacity) { + const auto new_data = allocator_.allocate(new_capacity); + const auto size = this->size(); + for (auto new_i = size_t{}, old_i = begin_; old_i != end_; ++new_i, ++old_i) { + std::construct_at(&new_data[new_i], std::move(data_[Mask(old_i)])); + std::destroy_at(&data_[Mask(old_i)]); + } + allocator_.deallocate(data_, capacity_); + data_ = new_data; + capacity_ = new_capacity; + begin_ = 0; + end_ = size; + } + + T* data_ = nullptr; + size_t capacity_ = 0; // Always a power of 2 (or 0). + size_t begin_ = 0; // Unmasked, integer overflow works appropriately since capacity is a power of 2. + size_t end_ = 0; // Unmasked, integer overflow works appropriately since capacity is a power of 2. + std::allocator allocator_ = {}; +}; + +} // namespace trellis::containers + +#endif // TRELLIS_CONTAINERS_DYNAMIC_RING_BUFFER_HPP_ diff --git a/trellis/containers/ring_buffer.hpp b/trellis/containers/ring_buffer.hpp index c82b35c..8c87aea 100644 --- a/trellis/containers/ring_buffer.hpp +++ b/trellis/containers/ring_buffer.hpp @@ -20,7 +20,6 @@ #include #include - namespace trellis::containers { /** diff --git a/trellis/containers/test/BUILD.bazel b/trellis/containers/test/BUILD.bazel index 10caea2..144aac4 100644 --- a/trellis/containers/test/BUILD.bazel +++ b/trellis/containers/test/BUILD.bazel @@ -45,3 +45,12 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) + +cc_test( + name = "dynamic_ring_buffer_test", + srcs = ["dynamic_ring_buffer_test.cpp"], + deps = [ + "//trellis/containers:dynamic_ring_buffer", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/trellis/containers/test/dynamic_ring_buffer_test.cpp b/trellis/containers/test/dynamic_ring_buffer_test.cpp new file mode 100644 index 0000000..2764e05 --- /dev/null +++ b/trellis/containers/test/dynamic_ring_buffer_test.cpp @@ -0,0 +1,76 @@ +#include "trellis/containers/dynamic_ring_buffer.hpp" + +#include + +namespace trellis::containers { + +namespace { + +using testing::ElementsAre; +using testing::Eq; +using testing::IsEmpty; +using testing::Pointee; +using testing::SizeIs; + +// unique_ptr is a good test element because it has move but not copy semantics. +using DynamicTestBuffer = DynamicRingBuffer>; + +} // namespace + +TEST(DynamicRingBuffer, Empty) { + const auto ring = DynamicTestBuffer{}; + ASSERT_THAT(ring, SizeIs(0)); + ASSERT_THAT(ring, IsEmpty()); + ASSERT_THAT(ring.begin(), Eq(ring.end())); +} + +TEST(DynamicRingBuffer, PushBack) { + auto ring = DynamicTestBuffer{}; + ring.push_back(std::make_unique(0)); + ring.push_back(std::make_unique(1)); + ring.push_back(std::make_unique(2)); + ASSERT_THAT(ring, SizeIs(3)); + ASSERT_THAT(ring, ElementsAre(Pointee(0), Pointee(1), Pointee(2))); +} + +TEST(DynamicRingBuffer, PopFront) { + auto ring = DynamicTestBuffer{}; + ring.push_back(std::make_unique(0)); + ring.push_back(std::make_unique(1)); + ring.push_back(std::make_unique(2)); + ring.pop_front(); + ASSERT_THAT(ring, SizeIs(2)); + ASSERT_THAT(ring, ElementsAre(Pointee(1), Pointee(2))); +} + +TEST(DynamicRingBuffer, PushPopFront) { + auto ring = DynamicTestBuffer{}; + ring.push_back(std::make_unique(0)); + ring.push_back(std::make_unique(1)); + ring.push_back(std::make_unique(2)); + ring.pop_front(); + ring.pop_front(); + ring.push_back(std::make_unique(3)); + ASSERT_THAT(ring, SizeIs(2)); + ASSERT_THAT(ring, ElementsAre(Pointee(2), Pointee(3))); +} + +TEST(DynamicRingBuffer, MoveConstructor) { + auto ring1 = DynamicTestBuffer{}; + ring1.push_back(std::make_unique(0)); + const auto ring2 = std::move(ring1); + ASSERT_THAT(ring1, SizeIs(0)); + ASSERT_THAT(ring2, ElementsAre(Pointee(0))); +} + +TEST(DynamicRingBuffer, MoveAssignment) { + auto ring1 = DynamicTestBuffer{}; + ring1.push_back(std::make_unique(0)); + auto ring2 = DynamicTestBuffer{}; + ring2.push_back(std::make_unique(1)); + ring2 = std::move(ring1); + ASSERT_THAT(ring1, SizeIs(0)); + ASSERT_THAT(ring2, ElementsAre(Pointee(0))); +} + +} // namespace trellis::containers diff --git a/trellis/containers/test/ring_buffer_test.cpp b/trellis/containers/test/ring_buffer_test.cpp index f5aba8c..5fc5e77 100644 --- a/trellis/containers/test/ring_buffer_test.cpp +++ b/trellis/containers/test/ring_buffer_test.cpp @@ -12,8 +12,6 @@ using testing::IsEmpty; using testing::Pointee; using testing::SizeIs; -constexpr size_t kTestCapacity = 10; - // unique_ptr is a good test element because it has move but not copy semantics. using TestBuffer = RingBuffer, 3>; diff --git a/trellis/core/BUILD.bazel b/trellis/core/BUILD.bazel index 18e7af5..d2b1470 100644 --- a/trellis/core/BUILD.bazel +++ b/trellis/core/BUILD.bazel @@ -222,6 +222,7 @@ cc_library( ":core_node", ":core_stamped_message", ":core_subscriber", + "//trellis/containers:dynamic_ring_buffer", "//trellis/containers:ring_buffer", ], ) diff --git a/trellis/core/inbox.hpp b/trellis/core/inbox.hpp index e8bded8..6feb5dc 100644 --- a/trellis/core/inbox.hpp +++ b/trellis/core/inbox.hpp @@ -21,6 +21,7 @@ #include "node.hpp" #include "stamped_message.hpp" #include "subscriber.hpp" +#include "trellis/containers/dynamic_ring_buffer.hpp" #include "trellis/containers/ring_buffer.hpp" namespace trellis::core { @@ -50,9 +51,27 @@ struct NLatest { static constexpr size_t kNLatest = N; }; +/** + * @brief Type for representing how to receive a single topic where we get the latest messages on the topic that are not + * timed out. + * + * Slightly more convenient than NLatest for the common case of getting all the latest messages, with the drawback that + * there is no cap on how many messages are stored and returned. + * + * Also less efficient than NLatest as we have to copy out of the receiver's memory pool since we don't know the amount + * of messages we may need to store. + * + * @tparam MSG_T The message type to receive. + */ +template +struct AllLatest { + using MessageType = MSG_T; + using AllLatestTag = int; // Add an arbitrary type tag so we know we are using this template. +}; + /** * @brief Type for representing how to receive a single topic which is coming from the owner of the inbox so it does not - * need to be received, only set. + * need to be received, only sent. * * @tparam MSG_T The message type. * @tparam SERIALIZED_T The type to serialize to (default is the same as MSG_T). @@ -76,6 +95,10 @@ concept IsNLatestReceiveType = requires { { R::kNLatest } -> std::convertible_to; }; +/// @brief Concept for a type that derives from AllLatest. +template +concept IsAllLatestReceiveType = requires { typename R::AllLatestTag; }; + /// @brief Concept for a type that derives from Loopback. template concept IsLoopbackReceiveType = requires { @@ -83,16 +106,17 @@ concept IsLoopbackReceiveType = requires { typename R::SerializationFn; }; -/// @brief Concept for a type that derives from either Latest or NLatest and is valid for use in the inbox. +/// @brief Concept for a type that derives from one of our receive types and is valid for use in the inbox. template concept IsReceiveType = requires { typename R::MessageType; } && - (IsLatestReceiveType || IsNLatestReceiveType || IsLoopbackReceiveType); + (IsLatestReceiveType || IsNLatestReceiveType || IsAllLatestReceiveType || + IsLoopbackReceiveType); /** * @brief An inbox for getting the latest messages on various channels. * * @tparam ReceiveTypes the message receive types which should be IsReceiveTypes, which are specializations of the - * templates Latest or NLatest. + * templates Latest, NLatest, AllLatest, or Loopback. */ template class Inbox { @@ -113,20 +137,16 @@ class Inbox { template struct InboxReturnType; - /// @brief Defines what a NLatest receive type will return in GetMessages. - template + /// @brief Defines what a NLatest or AllLatest receive type will return in GetMessages. + template + requires IsNLatestReceiveType || IsAllLatestReceiveType struct InboxReturnType { using type = std::vector>; }; - /// @brief Defines what a Latest receive type will return in GetMessages. - template - struct InboxReturnType { - using type = std::optional>; - }; - - /// @brief Defines what a Loopback receive type will return in GetMessages. - template + /// @brief Defines what a Latest or Loopback receive type will return in GetMessages. + template + requires IsLatestReceiveType || IsLoopbackReceiveType struct InboxReturnType { using type = std::optional>; }; @@ -222,6 +242,22 @@ class Inbox { time::TimePoint::duration timeout; }; + /// @brief Struct to hold the state required for receiving the all-latest messages. + /// @tparam R the ReceiveType to follow. + template + struct Receiver { + using ReceiveType = R; + using MessageType = ReceiveType::MessageType; + + // We use the default subscriber memory pool size since we will copy messages out of the subscriber since we don't + // know how large to size it. + Subscriber subscriber; + // We use a unique_ptr so we can pass capture in the sub callback safely, even if this receiver moves around. This + // ptr should always point to the same value. + std::unique_ptr>> buffer; + time::TimePoint::duration timeout; + }; + /// @brief Struct to hold the state required for receiving the loopback message. /// @tparam R the ReceiveType to follow. template @@ -280,6 +316,25 @@ class Inbox { return Receiver{std::move(subscriber), std::move(buffer), timeouts[Index]}; } + /// @brief Make an all-latest receiver for topic at position Index in the ReceiveTypes, topics, and timeouts. + template + static auto MakeAllLatestReceiver(Node& node, const TopicArray& topics, const MessageTimeouts& timeouts) { + using ReceiveType = std::tuple_element_t>; + using MessageType = ReceiveType::MessageType; + + // Not const to allow move. + auto buffer = std::make_unique>>(); + + // Not const to allow move. + auto subscriber = node.CreateSubscriber( + topics[Index], + [&buffer = *buffer](const time::TimePoint&, const time::TimePoint& msgtime, MessagePointer msg) { + buffer.push_back({msgtime, *msg}); // Copies the message out of the subscriber memory pool. + }); + + return Receiver{std::move(subscriber), std::move(buffer), timeouts[Index]}; + } + /// @brief Make a loopback receiver for topic at position Index in the ReceiveTypes, topics, and timeouts. template static auto MakeLoopbackReceiver(Node& node, const TopicArray& topics, const MessageTimeouts& timeouts) { @@ -301,6 +356,8 @@ class Inbox { return MakeNLatestReceiver(node, topics, timeouts); } else if constexpr (IsLoopbackReceiveType) { return MakeLoopbackReceiver(node, topics, timeouts); + } else if constexpr (IsAllLatestReceiveType) { + return MakeAllLatestReceiver(node, topics, timeouts); } } @@ -335,6 +392,20 @@ class Inbox { return ret; } + /// @brief Messages return generation for receiving the all-latest messages. + template + static auto Receive(const time::TimePoint& time, const Receiver& receiver) { + // Clear out stale messages from the buffer. Single pops are very efficient in the ring buffer. + while (!receiver.buffer->empty() && receiver.buffer->begin()->first < time - receiver.timeout) { + receiver.buffer->pop_front(); + } + + auto ret = InboxReturnType_t{}; + ret.reserve(receiver.buffer->size()); + for (const auto& [time, message] : *receiver.buffer) ret.emplace_back(time, message); + return ret; + } + /// @brief Messages return generation for receiving the loopback messages. template static InboxReturnType_t Receive(const time::TimePoint& time, const Receiver& receiver) { diff --git a/trellis/core/test/inbox_test.cpp b/trellis/core/test/inbox_test.cpp index bfe1dbd..a80a0c0 100644 --- a/trellis/core/test/inbox_test.cpp +++ b/trellis/core/test/inbox_test.cpp @@ -72,14 +72,41 @@ Matcher TestTwoIs(const std::string bar) { return Property("bar", &Test static_assert(IsLatestReceiveType>, "Test IsLatestReceiveType concept."); static_assert(!IsLatestReceiveType, "Test IsLatestReceiveType concept, not satisfied by arbitrary type."); -static_assert(!IsNLatestReceiveType>, "Test IsLatestReceiveType concept, not satisfied by NLatest type."); +static_assert(!IsLatestReceiveType>, + "Test IsLatestReceiveType concept, not satisfied by NLatest type."); +static_assert(!IsLatestReceiveType>, + "Test IsLatestReceiveType concept, not satisfied by AllLatest type."); +static_assert(!IsLatestReceiveType>, "Test IsLatestReceiveType concept, not satisfied by Loopback type."); static_assert(IsNLatestReceiveType>, "Test IsNLatestReceiveType concept."); static_assert(!IsNLatestReceiveType, "Test IsNLatestReceiveType concept, not satisfied by arbitrary type."); static_assert(!IsNLatestReceiveType>, "Test IsNLatestReceiveType concept, not satisfied by Latest type."); +static_assert(!IsNLatestReceiveType>, + "Test IsNLatestReceiveType concept, not satisfied by AllLatest type."); +static_assert(!IsNLatestReceiveType>, + "Test IsNLatestReceiveType concept, not satisfied by Loopback type."); + +static_assert(IsAllLatestReceiveType>, "Test IsAllLatestReceiveType concept."); +static_assert(!IsAllLatestReceiveType, "Test IsAllLatestReceiveType concept, not satisfied by arbitrary type."); +static_assert(!IsAllLatestReceiveType>, + "Test IsAllLatestReceiveType concept, not satisfied by Latest type."); +static_assert(!IsAllLatestReceiveType>, + "Test IsAllLatestReceiveType concept, not satisfied by NLatest type."); +static_assert(!IsAllLatestReceiveType>, + "Test IsAllLatestReceiveType concept, not satisfied by Loopback type."); + +static_assert(IsLoopbackReceiveType>, "Test IsLoopbackReceiveType concept."); +static_assert(!IsLoopbackReceiveType, "Test IsLoopbackReceiveType concept, not satisfied by arbitrary type."); +static_assert(!IsLoopbackReceiveType>, "Test IsLoopbackReceiveType concept, not satisfied by Latest type."); +static_assert(!IsLoopbackReceiveType>, + "Test IsLoopbackReceiveType concept, not satisfied by NLatest type."); +static_assert(!IsLoopbackReceiveType>, + "Test IsLoopbackReceiveType concept, not satisfied by AllLatest type."); static_assert(IsReceiveType>, "Test IsReceiveType concept, satisfied by Latest."); static_assert(IsReceiveType>, "Test IsReceiveType concept, satisfied by NLatest."); +static_assert(IsReceiveType>, "Test IsReceiveType concept, satisfied by AllLatest."); +static_assert(IsReceiveType>, "Test IsReceiveType concept, satisfied by Loopback."); TEST_F(TrellisFixture, InboxNoMessages) { StartRunnerThread(); @@ -297,6 +324,80 @@ TEST_F(TrellisFixture, InboxNLatestTimeout) { << "The oldest message has timed out."; } +TEST_F(TrellisFixture, InboxAllLatestEmpty) { + StartRunnerThread(); + + auto pub = node_.CreatePublisher("topic"); + + const auto inbox = Inbox>{node_, {"topic"}, {100ms}}; + + WaitForDiscovery(); + + WaitForSendReceive(); + + ASSERT_THAT(inbox.GetMessages(kT0), FieldsAre(IsEmpty())) << "No messages in all latest."; +} + +TEST_F(TrellisFixture, InboxAllLatestOne) { + StartRunnerThread(); + + auto pub = node_.CreatePublisher("topic"); + + const auto inbox = Inbox>{node_, {"topic"}, {100ms}}; + + WaitForDiscovery(); + + pub->Send(MakeTest("hello"), kT0); + + WaitForSendReceive(); + + ASSERT_THAT(inbox.GetMessages(kT0), FieldsAre(ElementsAre(StampedMessageIs(kT0, TestIs("hello"))))) + << "Only 1 message has been sent, so we only get one back."; +} + +TEST_F(TrellisFixture, InboxAllLatestMany) { + StartRunnerThread(); + + auto pub = node_.CreatePublisher("topic"); + + const auto inbox = Inbox>{node_, {"topic"}, {100ms}}; + + WaitForDiscovery(); + + pub->Send(MakeTest("hello"), kT0); + WaitForSendReceive(); + pub->Send(MakeTest("hello1"), kT0 + 1ms); + WaitForSendReceive(); + pub->Send(MakeTest("hello2"), kT0 + 2ms); + WaitForSendReceive(); + + ASSERT_THAT(inbox.GetMessages(kT0 + 3ms), FieldsAre(ElementsAre(StampedMessageIs(kT0, TestIs("hello")), + StampedMessageIs(kT0 + 1ms, TestIs("hello1")), + StampedMessageIs(kT0 + 2ms, TestIs("hello2"))))) + << "Multiple messages received, multiple messages returned."; +} + +TEST_F(TrellisFixture, InboxAllLatestTimeout) { + StartRunnerThread(); + + auto pub = node_.CreatePublisher("topic"); + + const auto inbox = Inbox>{node_, {"topic"}, {100ms}}; + + WaitForDiscovery(); + + pub->Send(MakeTest("hello"), kT0); + WaitForSendReceive(); + pub->Send(MakeTest("hello1"), kT0 + 1ms); + WaitForSendReceive(); + pub->Send(MakeTest("hello2"), kT0 + 2ms); + WaitForSendReceive(); + + ASSERT_THAT(inbox.GetMessages(kT0 + 101ms), FieldsAre(ElementsAre(StampedMessageIs(kT0 + 1ms, TestIs("hello1")), + StampedMessageIs(kT0 + 2ms, TestIs("hello2"))))) + << "The oldest message has timed out."; +} + TEST_F(TrellisFixture, InboxSimpleLoopback) { StartRunnerThread();