diff --git a/third_party/repositories.bzl b/third_party/repositories.bzl index e3713d1..ec77282 100644 --- a/third_party/repositories.bzl +++ b/third_party/repositories.bzl @@ -6,10 +6,10 @@ def trellis_deps(): http_archive, name = "ecal", build_file = Label("//third_party:ecal.BUILD"), - sha256 = "5a0e575a73ab54b4ec2e48815af13860ce9fda43c9fa46341e244a543ccb1dd5", - strip_prefix = "ecal-5.12.0", + sha256 = "356e5cf65cc76b8c8a1aeb710d402fe9f5780746e679d74a0f70dfe8fd71ec9a", + strip_prefix = "ecal-5.12.4", urls = [ - "https://github.com/eclipse-ecal/ecal/archive/refs/tags/v5.12.0.tar.gz", + "https://github.com/eclipse-ecal/ecal/archive/refs/tags/v5.12.4.tar.gz", ], ) @@ -197,9 +197,9 @@ def trellis_deps(): http_archive, name = "mcap", build_file = Label("//third_party:mcap.BUILD"), - sha256 = "2833f72344308ea58639f3b363a0cf17669580ae7ab435f43f3b104cff6ef548", - strip_prefix = "mcap-releases-cpp-v0.8.0", - urls = ["https://github.com/foxglove/mcap/archive/refs/tags/releases/cpp/v0.8.0.tar.gz"], + sha256 = "64ff3e51119f37ffcfaf9deecbd987a7cb4d4d9035d74a3fd3773395a470fda1", + strip_prefix = "mcap-releases-cpp-v1.4.0", + urls = ["https://github.com/foxglove/mcap/archive/refs/tags/releases/cpp/v1.4.0.tar.gz"], ) maybe( http_archive, diff --git a/trellis/core/node.cpp b/trellis/core/node.cpp index a72782a..35b2356 100644 --- a/trellis/core/node.cpp +++ b/trellis/core/node.cpp @@ -95,7 +95,8 @@ Timer Node::CreateTimer(unsigned interval_ms, TimerImpl::Callback callback, unsi } Timer Node::CreateOneShotTimer(unsigned initial_delay_ms, TimerImpl::Callback callback) { - auto timer = std::make_shared(GetEventLoop(), TimerImpl::Type::kOneShot, callback, 0, initial_delay_ms); + auto timer = + std::make_shared(GetEventLoop(), TimerImpl::Type::kOneShot, std::move(callback), 0, initial_delay_ms); timers_.push_back(timer); return timer; diff --git a/trellis/core/node.hpp b/trellis/core/node.hpp index 16c9678..2bba8fd 100644 --- a/trellis/core/node.hpp +++ b/trellis/core/node.hpp @@ -129,9 +129,9 @@ class Node { const auto impl = do_watchdog ? std::make_shared>( - GetEventLoop(), std::string{topic}, callback, update_sim_fn, watchdog_callback, + GetEventLoop(), std::string{topic}, std::move(callback), update_sim_fn, std::move(watchdog_callback), [this, initial_delay_ms = watchdog_timeout_ms.value()](TimerImpl::Callback watchdog_callback) { - return CreateOneShotTimer(initial_delay_ms, watchdog_callback); + return CreateOneShotTimer(initial_delay_ms, std::move(watchdog_callback)); }) : std::make_shared>(GetEventLoop(), std::string{topic}, callback, update_sim_fn); @@ -173,8 +173,8 @@ class Node { const std::string& topic, typename trellis::core::SubscriberImpl::Callback callback, std::optional watchdog_timeout_ms = {}, TimerImpl::Callback watchdog_callback = {}, std::optional max_frequency = {}) { - return CreateSubscriber(topic, callback, watchdog_timeout_ms, watchdog_callback, - max_frequency); + return CreateSubscriber(topic, std::move(callback), watchdog_timeout_ms, + std::move(watchdog_callback), max_frequency); } /** diff --git a/trellis/core/service_client.hpp b/trellis/core/service_client.hpp index b244eb7..243b9c5 100644 --- a/trellis/core/service_client.hpp +++ b/trellis/core/service_client.hpp @@ -52,9 +52,9 @@ class ServiceClientImpl { template void CallAsync(const std::string& method_name, const REQ_T& req, Callback cb, unsigned timeout_ms = 0) { // XXX(bsirang): look into eliminating the copy of `req` here - asio::post(*priv_ev_loop_, [this, cb, method_name, req, timeout_ms]() { + asio::post(*priv_ev_loop_, [this, cb = std::move(cb), method_name, req, timeout_ms]() { if (!client_->IsConnected()) { - asio::post(*priv_ev_loop_, [cb]() { + asio::post(*priv_ev_loop_, [cb = std::move(cb)]() { if (cb) cb(kFailure, nullptr); }); return; @@ -70,12 +70,12 @@ class ServiceClientImpl { resp.ParseFromString(service_response.response); } // Invoke callback from event loop thread... - asio::post(*priv_ev_loop_, [status, cb, resp = std::move(resp)]() { + asio::post(*priv_ev_loop_, [status, cb = std::move(cb), resp = std::move(resp)]() { if (cb) cb(status, &resp); }); } } else { - asio::post(*priv_ev_loop_, [cb]() { + asio::post(*priv_ev_loop_, [cb = std::move(cb)]() { if (cb) cb(kTimedOut, nullptr); }); } diff --git a/trellis/core/subscriber.hpp b/trellis/core/subscriber.hpp index d4cfc20..bbc1a73 100644 --- a/trellis/core/subscriber.hpp +++ b/trellis/core/subscriber.hpp @@ -69,6 +69,7 @@ class SubscriberImpl { callback_wrapper_ = [this, callback = std::move(callback)]( const char* topic_name_, const trellis::core::TimestampedMessage& msg_, long long time_, long long clock_, long long id_) { CallbackWrapperLogic(msg_, callback); }; + // TODO: This forces a copy of the callback. ecal_sub_.AddReceiveCallback(callback_wrapper_); } @@ -110,6 +111,7 @@ class SubscriberImpl { CallbackWrapperLogic(msg_, callback); }; + // TODO: This forces a copy of the callback. ecal_sub_.AddReceiveCallback(callback_wrapper_); } @@ -137,6 +139,7 @@ class SubscriberImpl { */ void Enable() { if (!callback_enabled_) { + // TODO: This forces a copy of the callback. ecal_sub_.AddReceiveCallback(callback_wrapper_); callback_enabled_ = true; } @@ -276,6 +279,8 @@ class SubscriberImpl { eCAL::protobuf::CSubscriber ecal_sub_; // The callback is stored as a class member so that it can be enabled and disabled. + // TODO: This forces all callbacks to be copiable, which is not ideal for users who want to create callbacks without + // resorting to shared_ptrs. eCAL::protobuf::CSubscriber::MsgReceiveCallbackT callback_wrapper_; std::shared_ptr> diff --git a/trellis/core/timer.cpp b/trellis/core/timer.cpp index 79349bd..f063147 100644 --- a/trellis/core/timer.cpp +++ b/trellis/core/timer.cpp @@ -23,7 +23,7 @@ namespace core { TimerImpl::TimerImpl(EventLoop loop, Type type, Callback callback, unsigned interval_ms, unsigned delay_ms) : loop_{loop}, type_{type}, - callback_{callback}, + callback_{std::move(callback)}, interval_ms_{interval_ms}, delay_ms_(delay_ms), timer_{CreateSteadyTimer(loop, delay_ms)} { diff --git a/trellis/utils/mcap/mcap_impl.cpp b/trellis/utils/mcap/mcap_impl.cpp index 93ce2ee..746800c 100644 --- a/trellis/utils/mcap/mcap_impl.cpp +++ b/trellis/utils/mcap/mcap_impl.cpp @@ -18,7 +18,7 @@ // For whatever reason, the MCAP library was designed to be "pseudo" header only, meaning // that in exactly one translation unit in your project, the preprocessor macro `MCAP_IMPLEMENTATION` must be defined // before including the library headers so that the implementation gets compiled in. This source file serves that -// purose. +// purpose. #define MCAP_IMPLEMENTATION #include #include diff --git a/trellis/utils/mcap/writer.cpp b/trellis/utils/mcap/writer.cpp index 6b022dc..fdec97e 100644 --- a/trellis/utils/mcap/writer.cpp +++ b/trellis/utils/mcap/writer.cpp @@ -17,77 +17,105 @@ #include "writer.hpp" -namespace trellis { -namespace utils { -namespace mcap { -Writer::Writer(core::Node& node, const std::vector& topics, const std::string& outfile, - ::mcap::McapWriterOptions options) - : subscribers_{} { - CreateSubscriberList(node, topics); - const auto res = writer_.open(outfile, options); - if (!res.ok()) { - throw std::runtime_error("Failed to open " + outfile + " for writing: " + res.message); - } +namespace trellis::utils::mcap { + +namespace { + +struct FileWriter { + ::mcap::McapWriter writer = {}; + std::mutex mutex = {}; +}; + +std::shared_ptr MakeFileWriter(const std::string_view outfile, ::mcap::McapWriterOptions options) { + const auto ret = std::make_shared(); + const auto res = ret->writer.open(outfile, options); + if (!res.ok()) throw(std::runtime_error{fmt::format("Failed to open {} for writing: {}", outfile, res.message)}); + return ret; } -Writer::~Writer() { writer_.close(); } - -void Writer::CreateSubscriberList(core::Node& node, const std::vector& topics) { - unsigned subscriber_index{0}; - for (const auto& topic : topics) { - subscribers_.emplace_back(SubscriberInfo{ - false, topic, - node.CreateRawSubscriber( - topic, [this, subscriber_index](const core::time::TimePoint& now, const core::TimestampedMessage& msg) { - std::lock_guard guard(mutex_); - if (subscribers_.at(subscriber_index).initialized == false) { - InitializeMcapChannel(subscriber_index); - } - WriteMessage(subscriber_index, now, msg); - })}); - ++subscriber_index; +// Data for each subscriber +struct SubscriberData { + bool initialized = {}; /// track initialization to know mcap schema and channel exists for this subscriber + std::string topic{}; /// topic name + std::shared_ptr file_writer = {}; /// the mutex protected file writer + std::weak_ptr subscriber = {}; /// the subscriber object, weak ptr to avoid circular ref + ::mcap::ChannelId channel_id = {}; /// Identifier to reference the channel for this subscriber + unsigned sequence = {}; /// sequence number for each message +}; + +// mutex should be locked before calling this function +void TryInitializeMcapChannel(SubscriberData& data) { + // The subscriber may still be being constructed (unlikely), so we guard against it being nullptr. + const auto subscriber = data.subscriber.lock(); + if (subscriber == nullptr) { + core::Log::Error("Subscriber is nullptr, cannot initialize MCAP channel"); + return; } -} -void Writer::InitializeMcapChannel(unsigned subscriber_index) { - const auto& subscriber = subscribers_.at(subscriber_index).subscriber; - const auto& topic = subscribers_.at(subscriber_index).topic; - const auto& message_name = subscriber->GetDescriptor()->full_name(); - auto& channel_id = subscribers_.at(subscriber_index).channel_id; - auto& initialized = subscribers_.at(subscriber_index).initialized; + const auto descriptor = subscriber->GetDescriptor(); + if (descriptor == nullptr) { + core::Log::Error("Descriptor is nullptr, cannot initialize MCAP channel"); + return; + } + + const auto& message_name = descriptor->full_name(); // Add both the schema and channel to the writer, and then record the channel ID for the future - ::mcap::Schema schema(message_name, "protobuf", subscriber->GenerateFileDescriptorSet().SerializeAsString()); - writer_.addSchema(schema); - ::mcap::Channel channel(topic, "protobuf", schema.id); - writer_.addChannel(channel); - channel_id = channel.id; - initialized = true; - - core::Log::Info("Initialized MCAP recorder channel for {} on {} with id {}", message_name, topic, channel_id); + // Not const to receive the schema id + auto schema = ::mcap::Schema{ + message_name, "protobuf", + trellis::utils::protobuf::GenerateFileDescriptorSetFromTopLevelDescriptor(descriptor).SerializeAsString()}; + data.file_writer->writer.addSchema(schema); + // Not const to receive the channel id + auto channel = ::mcap::Channel{data.topic, "protobuf", schema.id}; + data.file_writer->writer.addChannel(channel); + data.channel_id = channel.id; + data.initialized = true; + core::Log::Info("Initialized MCAP recorder channel for {} on {} with id {}", message_name, data.topic, + data.channel_id); } -void Writer::WriteMessage(unsigned subscriber_index, const core::time::TimePoint& now, - const core::TimestampedMessage& msg) { - auto& subscriber = subscribers_.at(subscriber_index); - const auto& channel_id = subscriber.channel_id; - auto& sequence = subscriber.sequence; - ::mcap::Message mcap_msg; - mcap_msg.channelId = channel_id; - mcap_msg.sequence = sequence; - mcap_msg.publishTime = core::time::TimePointToNanoseconds(core::time::TimePointFromTimestamp(msg.timestamp())); - mcap_msg.logTime = core::time::TimePointToNanoseconds(now); - mcap_msg.data = reinterpret_cast(msg.payload().data()); - mcap_msg.dataSize = msg.payload().size(); - - const auto res = writer_.write(mcap_msg); +void WriteMessage(const core::time::TimePoint& now, const core::TimestampedMessage& msg, SubscriberData& data) { + const auto mcap_msg = ::mcap::Message{ + .channelId = data.channel_id, + .sequence = data.sequence, + .logTime = core::time::TimePointToNanoseconds(now), + .publishTime = core::time::TimePointToNanoseconds(core::time::TimePointFromTimestamp(msg.timestamp())), + .dataSize = msg.payload().size(), + .data = reinterpret_cast(msg.payload().data())}; + + const auto res = data.file_writer->writer.write(mcap_msg); if (!res.ok()) { - writer_.close(); - throw std::runtime_error("MCAP write failed: " + res.message); + data.file_writer->writer.close(); + throw(std::runtime_error{fmt::format("MCAP write failed: {}", res.message)}); } - ++sequence; + ++data.sequence; +} + +core::SubscriberRaw CreateSubscriber(core::Node& node, const std::string_view topic, + std::shared_ptr file_writer) { + // A bit of a chicken and egg problem, we need the callback to be able to access the subscriber to fill in the schema. + // This introduces a small race condition that the subscriber may be nullptr when the first message arrives. + // Hence we use a shared ptr to update the data after creating the subscriber, and we guard in the + // InitalizeMcapChannel function against data with nullptr subscriber. + const auto data = std::make_shared( + SubscriberData{.topic = std::string{topic}, .file_writer = std::move(file_writer)}); + const auto ret = node.CreateRawSubscriber( + std::string{topic}, [data](const core::time::TimePoint& now, const core::TimestampedMessage& msg) { + const auto lock = std::scoped_lock{data->file_writer->mutex}; + if (!data->initialized) TryInitializeMcapChannel(*data); + if (data->initialized) WriteMessage(now, msg, *data); + }); + data->subscriber = ret; + return ret; +} + +} // namespace + +Writer::Writer(core::Node& node, const std::vector& topics, const std::string_view outfile, + const ::mcap::McapWriterOptions& options) { + const auto file_writer = MakeFileWriter(outfile, options); + for (const auto& topic : topics) subscribers_.push_back(CreateSubscriber(node, topic, file_writer)); } -} // namespace mcap -} // namespace utils -} // namespace trellis +} // namespace trellis::utils::mcap diff --git a/trellis/utils/mcap/writer.hpp b/trellis/utils/mcap/writer.hpp index d38b782..a54c02b 100644 --- a/trellis/utils/mcap/writer.hpp +++ b/trellis/utils/mcap/writer.hpp @@ -26,15 +26,17 @@ #include "trellis/core/node.hpp" #include "trellis/core/subscriber.hpp" -namespace trellis { -namespace utils { -namespace mcap { +namespace trellis::utils::mcap { /** * @brief Log writer utility for subscribing to trellis topics and writing messages to an MCAP log file * * This class does not need to be aware of the message types at compile time. Instead, it will forward the message * payload directly to disk and inform MCAP of the message schema at runtime. + * + * Note that since there is an issue with ecal where recreating the same subscribers in the same process results in no + * messages being received, this class should be used with caution. It is recommended to only create one instance of + * this class per process. */ class Writer { public: @@ -44,60 +46,15 @@ class Writer { * @param node trellis node by which to create subscribers from * @param topics list of topics to subscribe to * @param outfile the path of the output mcap file - * @param options mcap writer options (optional) - */ - Writer(core::Node& node, const std::vector& topics, const std::string& outfile, - ::mcap::McapWriterOptions options = ::mcap::McapWriterOptions("")); - - /** - * @brief Finalizes the log writer + * @param options mcap writer options (optional) the default has some compression */ - ~Writer(); - - // Move/copy not allowed - Writer(const Writer&) = delete; - Writer(Writer&&) = delete; - Writer& operator=(const Writer&) = delete; - Writer& operator=(Writer&&) = delete; + Writer(core::Node& node, const std::vector& topics, std::string_view outfile, + const ::mcap::McapWriterOptions& options = ::mcap::McapWriterOptions("protobuf")); private: - /** - * @brief Create subscribers based on our topic list - */ - void CreateSubscriberList(core::Node& node, const std::vector& topics); - /** - * @brief Initialize an MCAP channel by specifying metadata such as the message schema and topic - * - * @param subscriber_index Relevant index into the subscriber list - */ - void InitializeMcapChannel(unsigned subscriber_index); - /** - * @brief Write a message to the log - * - * @param subscriber_index Relevant index into the subscriber list - * @param now Subscriber receive time (e.g. now) - * @param msg The message to write to the log - */ - void WriteMessage(unsigned subscriber_index, const core::time::TimePoint& now, const core::TimestampedMessage& msg); - - /** - * @brief Per-subscriber metadata - */ - struct SubscriberInfo { - bool initialized{false}; /// track initialization to know mcap schema and channel exists for this subscriber - std::string topic{}; /// topic name - core::SubscriberRaw subscriber{nullptr}; /// trellis subscriber handle - ::mcap::ChannelId channel_id{}; /// Identifier to reference the channel for this subscriber - unsigned sequence{0}; /// sequence number for each message - }; - using SubscriberList = std::vector; - SubscriberList subscribers_{}; - std::mutex mutex_; - ::mcap::McapWriter writer_{}; + std::vector subscribers_; }; -} // namespace mcap -} // namespace utils -} // namespace trellis +} // namespace trellis::utils::mcap -#endif // TRELLIS_UTILS_MCAP_WRITER_HPP_ +#endif // TRELLIS_UTILS_MCAP_WRITER_HPP_