From 403a91b2f57c5f6fe5d64bcd4ae2971033716bb8 Mon Sep 17 00:00:00 2001 From: MattOslin Date: Wed, 17 Apr 2024 17:27:40 -0700 Subject: [PATCH] [inbox] Add AllLatest receive type, a generalization of NLatest (#93) Add an AllLatest receive type for the inbox, which is similar to NLatest except that it is unbounded in how many elements it can return. This makes it slightly less efficient, but is necessary for handling "the last N seconds" of messages on a topic that publishes at an inconsistent frequency. The main efficiency losses are that we have to allocate memory dynamically (not a big deal though, as we will grow to some max size and likely stay there) and that messages are copied out of the subscriber's memory pool and into the dynamic ring buffer (again not too big of an issue, as large messages are mostly on the heap anyway). --- trellis/containers/BUILD.bazel | 6 + trellis/containers/dynamic_ring_buffer.hpp | 188 ++++++++++++++++++ trellis/containers/ring_buffer.hpp | 1 - trellis/containers/test/BUILD.bazel | 9 + .../test/dynamic_ring_buffer_test.cpp | 76 +++++++ trellis/containers/test/ring_buffer_test.cpp | 2 - trellis/core/BUILD.bazel | 1 + trellis/core/inbox.hpp | 99 +++++++-- trellis/core/test/inbox_test.cpp | 103 +++++++++- 9 files changed, 467 insertions(+), 18 deletions(-) create mode 100644 trellis/containers/dynamic_ring_buffer.hpp create mode 100644 trellis/containers/test/dynamic_ring_buffer_test.cpp diff --git a/trellis/containers/BUILD.bazel b/trellis/containers/BUILD.bazel index f0dc3bd..0594232 100644 --- a/trellis/containers/BUILD.bazel +++ b/trellis/containers/BUILD.bazel @@ -21,3 +21,9 @@ cc_library( hdrs = ["ring_buffer.hpp"], visibility = ["//visibility:public"], ) + +cc_library( + name = "dynamic_ring_buffer", + hdrs = ["dynamic_ring_buffer.hpp"], + visibility = ["//visibility:public"], +) diff --git a/trellis/containers/dynamic_ring_buffer.hpp b/trellis/containers/dynamic_ring_buffer.hpp new file mode 100644 index 0000000..d4ff03d --- /dev/null +++ b/trellis/containers/dynamic_ring_buffer.hpp @@ -0,0 +1,188 @@ +/* + * Copyright (C) 2023 Agtonomy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef TRELLIS_CONTAINERS_DYNAMIC_RING_BUFFER_HPP_ +#define TRELLIS_CONTAINERS_DYNAMIC_RING_BUFFER_HPP_ + +#include +#include +namespace trellis::containers { + +/** + * @brief A dynamically sized ring buffer. + * + * Dynamically allocated but never deallocates. Only ever allocates to the maximum concurrently stored values. When the + * buffer is full, the capacity is doubled. + * + * Uses unmasked indices for simplicity, meaning we never have to bound our indices, they naturally underflow and + * overflow appropriately. This works since capacity is always a power of 2, so the overlow value is always divisible by + * capacity, so wrapping around and then masking with capacity always gives the next/previous element. + * + * @tparam T the type to store + */ +template +class DynamicRingBuffer { + public: + using value_type = T; + + DynamicRingBuffer() = default; + + ~DynamicRingBuffer() { + for (auto i = begin_; i != end_; ++i) std::destroy_at(&data_[Mask(i)]); + std::allocator{}.deallocate(data_, capacity_); + } + + DynamicRingBuffer(const DynamicRingBuffer& other) = delete; + DynamicRingBuffer& operator=(const DynamicRingBuffer&) = delete; + + DynamicRingBuffer(DynamicRingBuffer&& other) noexcept { + data_ = other.data_; + capacity_ = other.capacity_; + begin_ = other.begin_; + end_ = other.end_; + other.data_ = nullptr; + other.capacity_ = 0; + other.begin_ = 0; + other.end_ = 0; + } + + DynamicRingBuffer& operator=(DynamicRingBuffer&& other) noexcept { + for (auto i = begin_; i != end_; ++i) std::destroy_at(&data_[Mask(i)]); + std::allocator{}.deallocate(data_, capacity_); + data_ = other.data_; + capacity_ = other.capacity_; + begin_ = other.begin_; + end_ = other.end_; + other.data_ = nullptr; + other.capacity_ = 0; + other.begin_ = 0; + other.end_ = 0; + return *this; + } + + size_t size() const { return end_ - begin_; } // Overflow handles correctly. + bool empty() const { return end_ == begin_; } + + void push_back(T t) { + if (size() == capacity_) IncreaseCapacity(); + std::construct_at(&data_[Mask(end_)], std::move(t)); + ++end_; + } + + void pop_front() { + std::destroy_at(&data_[Mask(begin_)]); + ++begin_; + } + + class ConstIterator { + public: + using iterator_category = std::random_access_iterator_tag; + using value_type = T; + using pointer = const T*; + using reference = const T&; + using difference_type = std::ptrdiff_t; + + // Constructors. + ConstIterator() = default; + ConstIterator(const T* data, size_t capacity, size_t index) : data_{data}, capacity_{capacity}, index_{index} {} + + // Pointer like operators. + reference operator*() const { return data_[Mask(index_)]; } + pointer operator->() const { return &data_[Mask(index_)]; } + reference operator[](const difference_type offset) const { return data_[Mask(index_ + offset)]; } + + // Increment / Decrement + ConstIterator& operator++() { + ++index_; + return *this; + } + + ConstIterator& operator--() { + --index_; + return *this; + } + + // Arithmetic + ConstIterator& operator+=(const difference_type offset) { + index_ += offset; + return *this; + } + + ConstIterator operator+(const difference_type offset) const { return {data_, capacity_, index_ + offset}; } + + friend ConstIterator operator+(const difference_type offset, const ConstIterator& right) { + return {right.data_, right.capacity_, right.index_ + offset}; + } + + ConstIterator& operator-=(const difference_type offset) { + index_ -= offset; + return *this; + } + + ConstIterator operator-(const difference_type offset) const { return {data_, capacity_, index_ - offset}; } + + difference_type operator-(const ConstIterator& right) const { return index_ - right.index_; } + + private: + // Comparison operators + friend bool operator==(const ConstIterator&, const ConstIterator&) = default; + friend bool operator!=(const ConstIterator&, const ConstIterator&) = default; + + // Go from unmasked index to masked index (can be used to access data). + size_t Mask(const size_t index) const { return index & (capacity_ - 1); } + + const T* data_ = nullptr; + size_t capacity_ = 0; + size_t index_ = 0; // Unmasked, integer overflow works appropriately since capacity is a power of 2. + }; + + ConstIterator begin() const { return {data_, capacity_, begin_}; } + ConstIterator end() const { return {data_, capacity_, end_}; } + + private: + // Go from unmasked index to masked index (can be used to access data). + size_t Mask(const size_t index) const { return index & (capacity_ - 1); } + + void IncreaseCapacity() { + const auto new_capacity = capacity_ == 0 ? 1 : capacity_ * 2; + IncreaseCapacity(new_capacity); + } + + void IncreaseCapacity(const size_t new_capacity) { + const auto new_data = allocator_.allocate(new_capacity); + const auto size = this->size(); + for (auto new_i = size_t{}, old_i = begin_; old_i != end_; ++new_i, ++old_i) { + std::construct_at(&new_data[new_i], std::move(data_[Mask(old_i)])); + std::destroy_at(&data_[Mask(old_i)]); + } + allocator_.deallocate(data_, capacity_); + data_ = new_data; + capacity_ = new_capacity; + begin_ = 0; + end_ = size; + } + + T* data_ = nullptr; + size_t capacity_ = 0; // Always a power of 2 (or 0). + size_t begin_ = 0; // Unmasked, integer overflow works appropriately since capacity is a power of 2. + size_t end_ = 0; // Unmasked, integer overflow works appropriately since capacity is a power of 2. + std::allocator allocator_ = {}; +}; + +} // namespace trellis::containers + +#endif // TRELLIS_CONTAINERS_DYNAMIC_RING_BUFFER_HPP_ diff --git a/trellis/containers/ring_buffer.hpp b/trellis/containers/ring_buffer.hpp index c82b35c..8c87aea 100644 --- a/trellis/containers/ring_buffer.hpp +++ b/trellis/containers/ring_buffer.hpp @@ -20,7 +20,6 @@ #include #include - namespace trellis::containers { /** diff --git a/trellis/containers/test/BUILD.bazel b/trellis/containers/test/BUILD.bazel index 10caea2..144aac4 100644 --- a/trellis/containers/test/BUILD.bazel +++ b/trellis/containers/test/BUILD.bazel @@ -45,3 +45,12 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) + +cc_test( + name = "dynamic_ring_buffer_test", + srcs = ["dynamic_ring_buffer_test.cpp"], + deps = [ + "//trellis/containers:dynamic_ring_buffer", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/trellis/containers/test/dynamic_ring_buffer_test.cpp b/trellis/containers/test/dynamic_ring_buffer_test.cpp new file mode 100644 index 0000000..2764e05 --- /dev/null +++ b/trellis/containers/test/dynamic_ring_buffer_test.cpp @@ -0,0 +1,76 @@ +#include "trellis/containers/dynamic_ring_buffer.hpp" + +#include + +namespace trellis::containers { + +namespace { + +using testing::ElementsAre; +using testing::Eq; +using testing::IsEmpty; +using testing::Pointee; +using testing::SizeIs; + +// unique_ptr is a good test element because it has move but not copy semantics. +using DynamicTestBuffer = DynamicRingBuffer>; + +} // namespace + +TEST(DynamicRingBuffer, Empty) { + const auto ring = DynamicTestBuffer{}; + ASSERT_THAT(ring, SizeIs(0)); + ASSERT_THAT(ring, IsEmpty()); + ASSERT_THAT(ring.begin(), Eq(ring.end())); +} + +TEST(DynamicRingBuffer, PushBack) { + auto ring = DynamicTestBuffer{}; + ring.push_back(std::make_unique(0)); + ring.push_back(std::make_unique(1)); + ring.push_back(std::make_unique(2)); + ASSERT_THAT(ring, SizeIs(3)); + ASSERT_THAT(ring, ElementsAre(Pointee(0), Pointee(1), Pointee(2))); +} + +TEST(DynamicRingBuffer, PopFront) { + auto ring = DynamicTestBuffer{}; + ring.push_back(std::make_unique(0)); + ring.push_back(std::make_unique(1)); + ring.push_back(std::make_unique(2)); + ring.pop_front(); + ASSERT_THAT(ring, SizeIs(2)); + ASSERT_THAT(ring, ElementsAre(Pointee(1), Pointee(2))); +} + +TEST(DynamicRingBuffer, PushPopFront) { + auto ring = DynamicTestBuffer{}; + ring.push_back(std::make_unique(0)); + ring.push_back(std::make_unique(1)); + ring.push_back(std::make_unique(2)); + ring.pop_front(); + ring.pop_front(); + ring.push_back(std::make_unique(3)); + ASSERT_THAT(ring, SizeIs(2)); + ASSERT_THAT(ring, ElementsAre(Pointee(2), Pointee(3))); +} + +TEST(DynamicRingBuffer, MoveConstructor) { + auto ring1 = DynamicTestBuffer{}; + ring1.push_back(std::make_unique(0)); + const auto ring2 = std::move(ring1); + ASSERT_THAT(ring1, SizeIs(0)); + ASSERT_THAT(ring2, ElementsAre(Pointee(0))); +} + +TEST(DynamicRingBuffer, MoveAssignment) { + auto ring1 = DynamicTestBuffer{}; + ring1.push_back(std::make_unique(0)); + auto ring2 = DynamicTestBuffer{}; + ring2.push_back(std::make_unique(1)); + ring2 = std::move(ring1); + ASSERT_THAT(ring1, SizeIs(0)); + ASSERT_THAT(ring2, ElementsAre(Pointee(0))); +} + +} // namespace trellis::containers diff --git a/trellis/containers/test/ring_buffer_test.cpp b/trellis/containers/test/ring_buffer_test.cpp index f5aba8c..5fc5e77 100644 --- a/trellis/containers/test/ring_buffer_test.cpp +++ b/trellis/containers/test/ring_buffer_test.cpp @@ -12,8 +12,6 @@ using testing::IsEmpty; using testing::Pointee; using testing::SizeIs; -constexpr size_t kTestCapacity = 10; - // unique_ptr is a good test element because it has move but not copy semantics. using TestBuffer = RingBuffer, 3>; diff --git a/trellis/core/BUILD.bazel b/trellis/core/BUILD.bazel index 18e7af5..d2b1470 100644 --- a/trellis/core/BUILD.bazel +++ b/trellis/core/BUILD.bazel @@ -222,6 +222,7 @@ cc_library( ":core_node", ":core_stamped_message", ":core_subscriber", + "//trellis/containers:dynamic_ring_buffer", "//trellis/containers:ring_buffer", ], ) diff --git a/trellis/core/inbox.hpp b/trellis/core/inbox.hpp index e8bded8..6feb5dc 100644 --- a/trellis/core/inbox.hpp +++ b/trellis/core/inbox.hpp @@ -21,6 +21,7 @@ #include "node.hpp" #include "stamped_message.hpp" #include "subscriber.hpp" +#include "trellis/containers/dynamic_ring_buffer.hpp" #include "trellis/containers/ring_buffer.hpp" namespace trellis::core { @@ -50,9 +51,27 @@ struct NLatest { static constexpr size_t kNLatest = N; }; +/** + * @brief Type for representing how to receive a single topic where we get the latest messages on the topic that are not + * timed out. + * + * Slightly more convenient than NLatest for the common case of getting all the latest messages, with the drawback that + * there is no cap on how many messages are stored and returned. + * + * Also less efficient than NLatest as we have to copy out of the receiver's memory pool since we don't know the amount + * of messages we may need to store. + * + * @tparam MSG_T The message type to receive. + */ +template +struct AllLatest { + using MessageType = MSG_T; + using AllLatestTag = int; // Add an arbitrary type tag so we know we are using this template. +}; + /** * @brief Type for representing how to receive a single topic which is coming from the owner of the inbox so it does not - * need to be received, only set. + * need to be received, only sent. * * @tparam MSG_T The message type. * @tparam SERIALIZED_T The type to serialize to (default is the same as MSG_T). @@ -76,6 +95,10 @@ concept IsNLatestReceiveType = requires { { R::kNLatest } -> std::convertible_to; }; +/// @brief Concept for a type that derives from AllLatest. +template +concept IsAllLatestReceiveType = requires { typename R::AllLatestTag; }; + /// @brief Concept for a type that derives from Loopback. template concept IsLoopbackReceiveType = requires { @@ -83,16 +106,17 @@ concept IsLoopbackReceiveType = requires { typename R::SerializationFn; }; -/// @brief Concept for a type that derives from either Latest or NLatest and is valid for use in the inbox. +/// @brief Concept for a type that derives from one of our receive types and is valid for use in the inbox. template concept IsReceiveType = requires { typename R::MessageType; } && - (IsLatestReceiveType || IsNLatestReceiveType || IsLoopbackReceiveType); + (IsLatestReceiveType || IsNLatestReceiveType || IsAllLatestReceiveType || + IsLoopbackReceiveType); /** * @brief An inbox for getting the latest messages on various channels. * * @tparam ReceiveTypes the message receive types which should be IsReceiveTypes, which are specializations of the - * templates Latest or NLatest. + * templates Latest, NLatest, AllLatest, or Loopback. */ template class Inbox { @@ -113,20 +137,16 @@ class Inbox { template struct InboxReturnType; - /// @brief Defines what a NLatest receive type will return in GetMessages. - template + /// @brief Defines what a NLatest or AllLatest receive type will return in GetMessages. + template + requires IsNLatestReceiveType || IsAllLatestReceiveType struct InboxReturnType { using type = std::vector>; }; - /// @brief Defines what a Latest receive type will return in GetMessages. - template - struct InboxReturnType { - using type = std::optional>; - }; - - /// @brief Defines what a Loopback receive type will return in GetMessages. - template + /// @brief Defines what a Latest or Loopback receive type will return in GetMessages. + template + requires IsLatestReceiveType || IsLoopbackReceiveType struct InboxReturnType { using type = std::optional>; }; @@ -222,6 +242,22 @@ class Inbox { time::TimePoint::duration timeout; }; + /// @brief Struct to hold the state required for receiving the all-latest messages. + /// @tparam R the ReceiveType to follow. + template + struct Receiver { + using ReceiveType = R; + using MessageType = ReceiveType::MessageType; + + // We use the default subscriber memory pool size since we will copy messages out of the subscriber since we don't + // know how large to size it. + Subscriber subscriber; + // We use a unique_ptr so we can pass capture in the sub callback safely, even if this receiver moves around. This + // ptr should always point to the same value. + std::unique_ptr>> buffer; + time::TimePoint::duration timeout; + }; + /// @brief Struct to hold the state required for receiving the loopback message. /// @tparam R the ReceiveType to follow. template @@ -280,6 +316,25 @@ class Inbox { return Receiver{std::move(subscriber), std::move(buffer), timeouts[Index]}; } + /// @brief Make an all-latest receiver for topic at position Index in the ReceiveTypes, topics, and timeouts. + template + static auto MakeAllLatestReceiver(Node& node, const TopicArray& topics, const MessageTimeouts& timeouts) { + using ReceiveType = std::tuple_element_t>; + using MessageType = ReceiveType::MessageType; + + // Not const to allow move. + auto buffer = std::make_unique>>(); + + // Not const to allow move. + auto subscriber = node.CreateSubscriber( + topics[Index], + [&buffer = *buffer](const time::TimePoint&, const time::TimePoint& msgtime, MessagePointer msg) { + buffer.push_back({msgtime, *msg}); // Copies the message out of the subscriber memory pool. + }); + + return Receiver{std::move(subscriber), std::move(buffer), timeouts[Index]}; + } + /// @brief Make a loopback receiver for topic at position Index in the ReceiveTypes, topics, and timeouts. template static auto MakeLoopbackReceiver(Node& node, const TopicArray& topics, const MessageTimeouts& timeouts) { @@ -301,6 +356,8 @@ class Inbox { return MakeNLatestReceiver(node, topics, timeouts); } else if constexpr (IsLoopbackReceiveType) { return MakeLoopbackReceiver(node, topics, timeouts); + } else if constexpr (IsAllLatestReceiveType) { + return MakeAllLatestReceiver(node, topics, timeouts); } } @@ -335,6 +392,20 @@ class Inbox { return ret; } + /// @brief Messages return generation for receiving the all-latest messages. + template + static auto Receive(const time::TimePoint& time, const Receiver& receiver) { + // Clear out stale messages from the buffer. Single pops are very efficient in the ring buffer. + while (!receiver.buffer->empty() && receiver.buffer->begin()->first < time - receiver.timeout) { + receiver.buffer->pop_front(); + } + + auto ret = InboxReturnType_t{}; + ret.reserve(receiver.buffer->size()); + for (const auto& [time, message] : *receiver.buffer) ret.emplace_back(time, message); + return ret; + } + /// @brief Messages return generation for receiving the loopback messages. template static InboxReturnType_t Receive(const time::TimePoint& time, const Receiver& receiver) { diff --git a/trellis/core/test/inbox_test.cpp b/trellis/core/test/inbox_test.cpp index bfe1dbd..a80a0c0 100644 --- a/trellis/core/test/inbox_test.cpp +++ b/trellis/core/test/inbox_test.cpp @@ -72,14 +72,41 @@ Matcher TestTwoIs(const std::string bar) { return Property("bar", &Test static_assert(IsLatestReceiveType>, "Test IsLatestReceiveType concept."); static_assert(!IsLatestReceiveType, "Test IsLatestReceiveType concept, not satisfied by arbitrary type."); -static_assert(!IsNLatestReceiveType>, "Test IsLatestReceiveType concept, not satisfied by NLatest type."); +static_assert(!IsLatestReceiveType>, + "Test IsLatestReceiveType concept, not satisfied by NLatest type."); +static_assert(!IsLatestReceiveType>, + "Test IsLatestReceiveType concept, not satisfied by AllLatest type."); +static_assert(!IsLatestReceiveType>, "Test IsLatestReceiveType concept, not satisfied by Loopback type."); static_assert(IsNLatestReceiveType>, "Test IsNLatestReceiveType concept."); static_assert(!IsNLatestReceiveType, "Test IsNLatestReceiveType concept, not satisfied by arbitrary type."); static_assert(!IsNLatestReceiveType>, "Test IsNLatestReceiveType concept, not satisfied by Latest type."); +static_assert(!IsNLatestReceiveType>, + "Test IsNLatestReceiveType concept, not satisfied by AllLatest type."); +static_assert(!IsNLatestReceiveType>, + "Test IsNLatestReceiveType concept, not satisfied by Loopback type."); + +static_assert(IsAllLatestReceiveType>, "Test IsAllLatestReceiveType concept."); +static_assert(!IsAllLatestReceiveType, "Test IsAllLatestReceiveType concept, not satisfied by arbitrary type."); +static_assert(!IsAllLatestReceiveType>, + "Test IsAllLatestReceiveType concept, not satisfied by Latest type."); +static_assert(!IsAllLatestReceiveType>, + "Test IsAllLatestReceiveType concept, not satisfied by NLatest type."); +static_assert(!IsAllLatestReceiveType>, + "Test IsAllLatestReceiveType concept, not satisfied by Loopback type."); + +static_assert(IsLoopbackReceiveType>, "Test IsLoopbackReceiveType concept."); +static_assert(!IsLoopbackReceiveType, "Test IsLoopbackReceiveType concept, not satisfied by arbitrary type."); +static_assert(!IsLoopbackReceiveType>, "Test IsLoopbackReceiveType concept, not satisfied by Latest type."); +static_assert(!IsLoopbackReceiveType>, + "Test IsLoopbackReceiveType concept, not satisfied by NLatest type."); +static_assert(!IsLoopbackReceiveType>, + "Test IsLoopbackReceiveType concept, not satisfied by AllLatest type."); static_assert(IsReceiveType>, "Test IsReceiveType concept, satisfied by Latest."); static_assert(IsReceiveType>, "Test IsReceiveType concept, satisfied by NLatest."); +static_assert(IsReceiveType>, "Test IsReceiveType concept, satisfied by AllLatest."); +static_assert(IsReceiveType>, "Test IsReceiveType concept, satisfied by Loopback."); TEST_F(TrellisFixture, InboxNoMessages) { StartRunnerThread(); @@ -297,6 +324,80 @@ TEST_F(TrellisFixture, InboxNLatestTimeout) { << "The oldest message has timed out."; } +TEST_F(TrellisFixture, InboxAllLatestEmpty) { + StartRunnerThread(); + + auto pub = node_.CreatePublisher("topic"); + + const auto inbox = Inbox>{node_, {"topic"}, {100ms}}; + + WaitForDiscovery(); + + WaitForSendReceive(); + + ASSERT_THAT(inbox.GetMessages(kT0), FieldsAre(IsEmpty())) << "No messages in all latest."; +} + +TEST_F(TrellisFixture, InboxAllLatestOne) { + StartRunnerThread(); + + auto pub = node_.CreatePublisher("topic"); + + const auto inbox = Inbox>{node_, {"topic"}, {100ms}}; + + WaitForDiscovery(); + + pub->Send(MakeTest("hello"), kT0); + + WaitForSendReceive(); + + ASSERT_THAT(inbox.GetMessages(kT0), FieldsAre(ElementsAre(StampedMessageIs(kT0, TestIs("hello"))))) + << "Only 1 message has been sent, so we only get one back."; +} + +TEST_F(TrellisFixture, InboxAllLatestMany) { + StartRunnerThread(); + + auto pub = node_.CreatePublisher("topic"); + + const auto inbox = Inbox>{node_, {"topic"}, {100ms}}; + + WaitForDiscovery(); + + pub->Send(MakeTest("hello"), kT0); + WaitForSendReceive(); + pub->Send(MakeTest("hello1"), kT0 + 1ms); + WaitForSendReceive(); + pub->Send(MakeTest("hello2"), kT0 + 2ms); + WaitForSendReceive(); + + ASSERT_THAT(inbox.GetMessages(kT0 + 3ms), FieldsAre(ElementsAre(StampedMessageIs(kT0, TestIs("hello")), + StampedMessageIs(kT0 + 1ms, TestIs("hello1")), + StampedMessageIs(kT0 + 2ms, TestIs("hello2"))))) + << "Multiple messages received, multiple messages returned."; +} + +TEST_F(TrellisFixture, InboxAllLatestTimeout) { + StartRunnerThread(); + + auto pub = node_.CreatePublisher("topic"); + + const auto inbox = Inbox>{node_, {"topic"}, {100ms}}; + + WaitForDiscovery(); + + pub->Send(MakeTest("hello"), kT0); + WaitForSendReceive(); + pub->Send(MakeTest("hello1"), kT0 + 1ms); + WaitForSendReceive(); + pub->Send(MakeTest("hello2"), kT0 + 2ms); + WaitForSendReceive(); + + ASSERT_THAT(inbox.GetMessages(kT0 + 101ms), FieldsAre(ElementsAre(StampedMessageIs(kT0 + 1ms, TestIs("hello1")), + StampedMessageIs(kT0 + 2ms, TestIs("hello2"))))) + << "The oldest message has timed out."; +} + TEST_F(TrellisFixture, InboxSimpleLoopback) { StartRunnerThread();