Skip to content

Commit

Permalink
[mcap] Fix destructor race condition (#94)
Browse files Browse the repository at this point in the history
Since the writer was help outside of the subscribers (which are in
different threads), there is a race on delete to delete the subscribers
before they attempt to write new messages via the closing writer. To
solve this, we allow the subscribers to hold the writer together in a
shared_ptr. We also hold the data for the subscriber in a shared_ptr,
although after construction the subscriber holds the only ref. We also
solve the issue of the subscriber needing a callback which is self
referential by filling in a weak_ptr after construction. The previous
approach relied on construction completing before the first message is
received. While this race condition probably wouldn't have been realized
(subscriber construction is rather fast), we guard against it anyway,
especially as the user can try to create any number of subscribers, so
the first subscriber may have received a message before the overall
writer had completed construction. Each subscriber is now
self-contained, and does not rely on the overall class being
constructed, which is now just a wrapper around a vector of subscribers.

While we're here add a number of missing moves.

We also have to increment the ecal version in this PR, as ecal was
missing proper subscriber destructor handling, which led to race
conditions and sometimes getting `pure virtual method called`.
  • Loading branch information
MattOslin authored Apr 25, 2024
1 parent 403a91b commit f5fa954
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 133 deletions.
12 changes: 6 additions & 6 deletions third_party/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ def trellis_deps():
http_archive,
name = "ecal",
build_file = Label("//third_party:ecal.BUILD"),
sha256 = "5a0e575a73ab54b4ec2e48815af13860ce9fda43c9fa46341e244a543ccb1dd5",
strip_prefix = "ecal-5.12.0",
sha256 = "356e5cf65cc76b8c8a1aeb710d402fe9f5780746e679d74a0f70dfe8fd71ec9a",
strip_prefix = "ecal-5.12.4",
urls = [
"https://github.com/eclipse-ecal/ecal/archive/refs/tags/v5.12.0.tar.gz",
"https://github.com/eclipse-ecal/ecal/archive/refs/tags/v5.12.4.tar.gz",
],
)

Expand Down Expand Up @@ -197,9 +197,9 @@ def trellis_deps():
http_archive,
name = "mcap",
build_file = Label("//third_party:mcap.BUILD"),
sha256 = "2833f72344308ea58639f3b363a0cf17669580ae7ab435f43f3b104cff6ef548",
strip_prefix = "mcap-releases-cpp-v0.8.0",
urls = ["https://github.com/foxglove/mcap/archive/refs/tags/releases/cpp/v0.8.0.tar.gz"],
sha256 = "64ff3e51119f37ffcfaf9deecbd987a7cb4d4d9035d74a3fd3773395a470fda1",
strip_prefix = "mcap-releases-cpp-v1.4.0",
urls = ["https://github.com/foxglove/mcap/archive/refs/tags/releases/cpp/v1.4.0.tar.gz"],
)
maybe(
http_archive,
Expand Down
3 changes: 2 additions & 1 deletion trellis/core/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ Timer Node::CreateTimer(unsigned interval_ms, TimerImpl::Callback callback, unsi
}

Timer Node::CreateOneShotTimer(unsigned initial_delay_ms, TimerImpl::Callback callback) {
auto timer = std::make_shared<TimerImpl>(GetEventLoop(), TimerImpl::Type::kOneShot, callback, 0, initial_delay_ms);
auto timer =
std::make_shared<TimerImpl>(GetEventLoop(), TimerImpl::Type::kOneShot, std::move(callback), 0, initial_delay_ms);

timers_.push_back(timer);
return timer;
Expand Down
8 changes: 4 additions & 4 deletions trellis/core/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ class Node {
const auto impl =
do_watchdog
? std::make_shared<SubscriberImpl<MSG_T, MAX_MSGS>>(
GetEventLoop(), std::string{topic}, callback, update_sim_fn, watchdog_callback,
GetEventLoop(), std::string{topic}, std::move(callback), update_sim_fn, std::move(watchdog_callback),
[this, initial_delay_ms = watchdog_timeout_ms.value()](TimerImpl::Callback watchdog_callback) {
return CreateOneShotTimer(initial_delay_ms, watchdog_callback);
return CreateOneShotTimer(initial_delay_ms, std::move(watchdog_callback));
})
: std::make_shared<SubscriberImpl<MSG_T, MAX_MSGS>>(GetEventLoop(), std::string{topic}, callback,
update_sim_fn);
Expand Down Expand Up @@ -173,8 +173,8 @@ class Node {
const std::string& topic, typename trellis::core::SubscriberImpl<google::protobuf::Message>::Callback callback,
std::optional<unsigned> watchdog_timeout_ms = {}, TimerImpl::Callback watchdog_callback = {},
std::optional<double> max_frequency = {}) {
return CreateSubscriber<google::protobuf::Message>(topic, callback, watchdog_timeout_ms, watchdog_callback,
max_frequency);
return CreateSubscriber<google::protobuf::Message>(topic, std::move(callback), watchdog_timeout_ms,
std::move(watchdog_callback), max_frequency);
}

/**
Expand Down
8 changes: 4 additions & 4 deletions trellis/core/service_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class ServiceClientImpl {
template <typename REQ_T, typename RESP_T>
void CallAsync(const std::string& method_name, const REQ_T& req, Callback<RESP_T> cb, unsigned timeout_ms = 0) {
// XXX(bsirang): look into eliminating the copy of `req` here
asio::post(*priv_ev_loop_, [this, cb, method_name, req, timeout_ms]() {
asio::post(*priv_ev_loop_, [this, cb = std::move(cb), method_name, req, timeout_ms]() {
if (!client_->IsConnected()) {
asio::post(*priv_ev_loop_, [cb]() {
asio::post(*priv_ev_loop_, [cb = std::move(cb)]() {
if (cb) cb(kFailure, nullptr);
});
return;
Expand All @@ -70,12 +70,12 @@ class ServiceClientImpl {
resp.ParseFromString(service_response.response);
}
// Invoke callback from event loop thread...
asio::post(*priv_ev_loop_, [status, cb, resp = std::move(resp)]() {
asio::post(*priv_ev_loop_, [status, cb = std::move(cb), resp = std::move(resp)]() {
if (cb) cb(status, &resp);
});
}
} else {
asio::post(*priv_ev_loop_, [cb]() {
asio::post(*priv_ev_loop_, [cb = std::move(cb)]() {
if (cb) cb(kTimedOut, nullptr);
});
}
Expand Down
5 changes: 5 additions & 0 deletions trellis/core/subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class SubscriberImpl {
callback_wrapper_ = [this, callback = std::move(callback)](
const char* topic_name_, const trellis::core::TimestampedMessage& msg_, long long time_,
long long clock_, long long id_) { CallbackWrapperLogic(msg_, callback); };
// TODO: This forces a copy of the callback.
ecal_sub_.AddReceiveCallback(callback_wrapper_);
}

Expand Down Expand Up @@ -110,6 +111,7 @@ class SubscriberImpl {

CallbackWrapperLogic(msg_, callback);
};
// TODO: This forces a copy of the callback.
ecal_sub_.AddReceiveCallback(callback_wrapper_);
}

Expand Down Expand Up @@ -137,6 +139,7 @@ class SubscriberImpl {
*/
void Enable() {
if (!callback_enabled_) {
// TODO: This forces a copy of the callback.
ecal_sub_.AddReceiveCallback(callback_wrapper_);
callback_enabled_ = true;
}
Expand Down Expand Up @@ -276,6 +279,8 @@ class SubscriberImpl {
eCAL::protobuf::CSubscriber<trellis::core::TimestampedMessage> ecal_sub_;

// The callback is stored as a class member so that it can be enabled and disabled.
// TODO: This forces all callbacks to be copiable, which is not ideal for users who want to create callbacks without
// resorting to shared_ptrs.
eCAL::protobuf::CSubscriber<trellis::core::TimestampedMessage>::MsgReceiveCallbackT callback_wrapper_;

std::shared_ptr<eCAL::protobuf::CSubscriber<MSG_T>>
Expand Down
2 changes: 1 addition & 1 deletion trellis/core/timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace core {
TimerImpl::TimerImpl(EventLoop loop, Type type, Callback callback, unsigned interval_ms, unsigned delay_ms)
: loop_{loop},
type_{type},
callback_{callback},
callback_{std::move(callback)},
interval_ms_{interval_ms},
delay_ms_(delay_ms),
timer_{CreateSteadyTimer(loop, delay_ms)} {
Expand Down
2 changes: 1 addition & 1 deletion trellis/utils/mcap/mcap_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// For whatever reason, the MCAP library was designed to be "pseudo" header only, meaning
// that in exactly one translation unit in your project, the preprocessor macro `MCAP_IMPLEMENTATION` must be defined
// before including the library headers so that the implementation gets compiled in. This source file serves that
// purose.
// purpose.
#define MCAP_IMPLEMENTATION
#include <mcap/reader.hpp>
#include <mcap/writer.hpp>
152 changes: 90 additions & 62 deletions trellis/utils/mcap/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,77 +17,105 @@

#include "writer.hpp"

namespace trellis {
namespace utils {
namespace mcap {
Writer::Writer(core::Node& node, const std::vector<std::string>& topics, const std::string& outfile,
::mcap::McapWriterOptions options)
: subscribers_{} {
CreateSubscriberList(node, topics);
const auto res = writer_.open(outfile, options);
if (!res.ok()) {
throw std::runtime_error("Failed to open " + outfile + " for writing: " + res.message);
}
namespace trellis::utils::mcap {

namespace {

struct FileWriter {
::mcap::McapWriter writer = {};
std::mutex mutex = {};
};

std::shared_ptr<FileWriter> MakeFileWriter(const std::string_view outfile, ::mcap::McapWriterOptions options) {
const auto ret = std::make_shared<FileWriter>();
const auto res = ret->writer.open(outfile, options);
if (!res.ok()) throw(std::runtime_error{fmt::format("Failed to open {} for writing: {}", outfile, res.message)});
return ret;
}

Writer::~Writer() { writer_.close(); }

void Writer::CreateSubscriberList(core::Node& node, const std::vector<std::string>& topics) {
unsigned subscriber_index{0};
for (const auto& topic : topics) {
subscribers_.emplace_back(SubscriberInfo{
false, topic,
node.CreateRawSubscriber(
topic, [this, subscriber_index](const core::time::TimePoint& now, const core::TimestampedMessage& msg) {
std::lock_guard<std::mutex> guard(mutex_);
if (subscribers_.at(subscriber_index).initialized == false) {
InitializeMcapChannel(subscriber_index);
}
WriteMessage(subscriber_index, now, msg);
})});
++subscriber_index;
// Data for each subscriber
struct SubscriberData {
bool initialized = {}; /// track initialization to know mcap schema and channel exists for this subscriber
std::string topic{}; /// topic name
std::shared_ptr<FileWriter> file_writer = {}; /// the mutex protected file writer
std::weak_ptr<core::SubscriberRawImpl> subscriber = {}; /// the subscriber object, weak ptr to avoid circular ref
::mcap::ChannelId channel_id = {}; /// Identifier to reference the channel for this subscriber
unsigned sequence = {}; /// sequence number for each message
};

// mutex should be locked before calling this function
void TryInitializeMcapChannel(SubscriberData& data) {
// The subscriber may still be being constructed (unlikely), so we guard against it being nullptr.
const auto subscriber = data.subscriber.lock();
if (subscriber == nullptr) {
core::Log::Error("Subscriber is nullptr, cannot initialize MCAP channel");
return;
}
}

void Writer::InitializeMcapChannel(unsigned subscriber_index) {
const auto& subscriber = subscribers_.at(subscriber_index).subscriber;
const auto& topic = subscribers_.at(subscriber_index).topic;
const auto& message_name = subscriber->GetDescriptor()->full_name();
auto& channel_id = subscribers_.at(subscriber_index).channel_id;
auto& initialized = subscribers_.at(subscriber_index).initialized;
const auto descriptor = subscriber->GetDescriptor();
if (descriptor == nullptr) {
core::Log::Error("Descriptor is nullptr, cannot initialize MCAP channel");
return;
}

const auto& message_name = descriptor->full_name();

// Add both the schema and channel to the writer, and then record the channel ID for the future
::mcap::Schema schema(message_name, "protobuf", subscriber->GenerateFileDescriptorSet().SerializeAsString());
writer_.addSchema(schema);
::mcap::Channel channel(topic, "protobuf", schema.id);
writer_.addChannel(channel);
channel_id = channel.id;
initialized = true;

core::Log::Info("Initialized MCAP recorder channel for {} on {} with id {}", message_name, topic, channel_id);
// Not const to receive the schema id
auto schema = ::mcap::Schema{
message_name, "protobuf",
trellis::utils::protobuf::GenerateFileDescriptorSetFromTopLevelDescriptor(descriptor).SerializeAsString()};
data.file_writer->writer.addSchema(schema);
// Not const to receive the channel id
auto channel = ::mcap::Channel{data.topic, "protobuf", schema.id};
data.file_writer->writer.addChannel(channel);
data.channel_id = channel.id;
data.initialized = true;
core::Log::Info("Initialized MCAP recorder channel for {} on {} with id {}", message_name, data.topic,
data.channel_id);
}

void Writer::WriteMessage(unsigned subscriber_index, const core::time::TimePoint& now,
const core::TimestampedMessage& msg) {
auto& subscriber = subscribers_.at(subscriber_index);
const auto& channel_id = subscriber.channel_id;
auto& sequence = subscriber.sequence;
::mcap::Message mcap_msg;
mcap_msg.channelId = channel_id;
mcap_msg.sequence = sequence;
mcap_msg.publishTime = core::time::TimePointToNanoseconds(core::time::TimePointFromTimestamp(msg.timestamp()));
mcap_msg.logTime = core::time::TimePointToNanoseconds(now);
mcap_msg.data = reinterpret_cast<const std::byte*>(msg.payload().data());
mcap_msg.dataSize = msg.payload().size();

const auto res = writer_.write(mcap_msg);
void WriteMessage(const core::time::TimePoint& now, const core::TimestampedMessage& msg, SubscriberData& data) {
const auto mcap_msg = ::mcap::Message{
.channelId = data.channel_id,
.sequence = data.sequence,
.logTime = core::time::TimePointToNanoseconds(now),
.publishTime = core::time::TimePointToNanoseconds(core::time::TimePointFromTimestamp(msg.timestamp())),
.dataSize = msg.payload().size(),
.data = reinterpret_cast<const std::byte*>(msg.payload().data())};

const auto res = data.file_writer->writer.write(mcap_msg);
if (!res.ok()) {
writer_.close();
throw std::runtime_error("MCAP write failed: " + res.message);
data.file_writer->writer.close();
throw(std::runtime_error{fmt::format("MCAP write failed: {}", res.message)});
}
++sequence;
++data.sequence;
}

core::SubscriberRaw CreateSubscriber(core::Node& node, const std::string_view topic,
std::shared_ptr<FileWriter> file_writer) {
// A bit of a chicken and egg problem, we need the callback to be able to access the subscriber to fill in the schema.
// This introduces a small race condition that the subscriber may be nullptr when the first message arrives.
// Hence we use a shared ptr to update the data after creating the subscriber, and we guard in the
// InitalizeMcapChannel function against data with nullptr subscriber.
const auto data = std::make_shared<SubscriberData>(
SubscriberData{.topic = std::string{topic}, .file_writer = std::move(file_writer)});
const auto ret = node.CreateRawSubscriber(
std::string{topic}, [data](const core::time::TimePoint& now, const core::TimestampedMessage& msg) {
const auto lock = std::scoped_lock{data->file_writer->mutex};
if (!data->initialized) TryInitializeMcapChannel(*data);
if (data->initialized) WriteMessage(now, msg, *data);
});
data->subscriber = ret;
return ret;
}

} // namespace

Writer::Writer(core::Node& node, const std::vector<std::string>& topics, const std::string_view outfile,
const ::mcap::McapWriterOptions& options) {
const auto file_writer = MakeFileWriter(outfile, options);
for (const auto& topic : topics) subscribers_.push_back(CreateSubscriber(node, topic, file_writer));
}

} // namespace mcap
} // namespace utils
} // namespace trellis
} // namespace trellis::utils::mcap
65 changes: 11 additions & 54 deletions trellis/utils/mcap/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@
#include "trellis/core/node.hpp"
#include "trellis/core/subscriber.hpp"

namespace trellis {
namespace utils {
namespace mcap {
namespace trellis::utils::mcap {

/**
* @brief Log writer utility for subscribing to trellis topics and writing messages to an MCAP log file
*
* This class does not need to be aware of the message types at compile time. Instead, it will forward the message
* payload directly to disk and inform MCAP of the message schema at runtime.
*
* Note that since there is an issue with ecal where recreating the same subscribers in the same process results in no
* messages being received, this class should be used with caution. It is recommended to only create one instance of
* this class per process.
*/
class Writer {
public:
Expand All @@ -44,60 +46,15 @@ class Writer {
* @param node trellis node by which to create subscribers from
* @param topics list of topics to subscribe to
* @param outfile the path of the output mcap file
* @param options mcap writer options (optional)
*/
Writer(core::Node& node, const std::vector<std::string>& topics, const std::string& outfile,
::mcap::McapWriterOptions options = ::mcap::McapWriterOptions(""));

/**
* @brief Finalizes the log writer
* @param options mcap writer options (optional) the default has some compression
*/
~Writer();

// Move/copy not allowed
Writer(const Writer&) = delete;
Writer(Writer&&) = delete;
Writer& operator=(const Writer&) = delete;
Writer& operator=(Writer&&) = delete;
Writer(core::Node& node, const std::vector<std::string>& topics, std::string_view outfile,
const ::mcap::McapWriterOptions& options = ::mcap::McapWriterOptions("protobuf"));

private:
/**
* @brief Create subscribers based on our topic list
*/
void CreateSubscriberList(core::Node& node, const std::vector<std::string>& topics);
/**
* @brief Initialize an MCAP channel by specifying metadata such as the message schema and topic
*
* @param subscriber_index Relevant index into the subscriber list
*/
void InitializeMcapChannel(unsigned subscriber_index);
/**
* @brief Write a message to the log
*
* @param subscriber_index Relevant index into the subscriber list
* @param now Subscriber receive time (e.g. now)
* @param msg The message to write to the log
*/
void WriteMessage(unsigned subscriber_index, const core::time::TimePoint& now, const core::TimestampedMessage& msg);

/**
* @brief Per-subscriber metadata
*/
struct SubscriberInfo {
bool initialized{false}; /// track initialization to know mcap schema and channel exists for this subscriber
std::string topic{}; /// topic name
core::SubscriberRaw subscriber{nullptr}; /// trellis subscriber handle
::mcap::ChannelId channel_id{}; /// Identifier to reference the channel for this subscriber
unsigned sequence{0}; /// sequence number for each message
};
using SubscriberList = std::vector<SubscriberInfo>;
SubscriberList subscribers_{};
std::mutex mutex_;
::mcap::McapWriter writer_{};
std::vector<core::SubscriberRaw> subscribers_;
};

} // namespace mcap
} // namespace utils
} // namespace trellis
} // namespace trellis::utils::mcap

#endif // TRELLIS_UTILS_MCAP_WRITER_HPP_
#endif // TRELLIS_UTILS_MCAP_WRITER_HPP_

0 comments on commit f5fa954

Please sign in to comment.