Skip to content

Commit

Permalink
[subscriber] Allow subscriber to be deleted properly (#97)
Browse files Browse the repository at this point in the history
Subscribers were handing out self-references without lifetimes attached
to other threads. This causes a race condition when deleting the
subscriber.
  • Loading branch information
MattOslin authored May 15, 2024
1 parent 1f1be99 commit 1031ef0
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 30 deletions.
11 changes: 6 additions & 5 deletions trellis/core/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,18 @@ class Node {
std::string_view topic, typename trellis::core::SubscriberImpl<MSG_T, MAX_MSGS>::Callback callback,
std::optional<unsigned> watchdog_timeout_ms = {}, TimerImpl::Callback watchdog_callback = {},
std::optional<double> max_frequency = {}) {
const auto update_sim_fn = [this](const time::TimePoint& time) { UpdateSimulatedClock(time); };
auto update_sim_fn = [this](const time::TimePoint& time) { UpdateSimulatedClock(time); };
const bool do_watchdog = watchdog_timeout_ms.has_value() && watchdog_callback != nullptr;
const auto impl =
do_watchdog
? std::make_shared<SubscriberImpl<MSG_T, MAX_MSGS>>(
GetEventLoop(), std::string{topic}, std::move(callback), update_sim_fn, std::move(watchdog_callback),
? SubscriberImpl<MSG_T, MAX_MSGS>::Create(
GetEventLoop(), std::string{topic}, std::move(callback), std::move(update_sim_fn),
std::move(watchdog_callback),
[this, initial_delay_ms = watchdog_timeout_ms.value()](TimerImpl::Callback watchdog_callback) {
return CreateOneShotTimer(initial_delay_ms, std::move(watchdog_callback));
})
: std::make_shared<SubscriberImpl<MSG_T, MAX_MSGS>>(GetEventLoop(), std::string{topic}, callback,
update_sim_fn);
: SubscriberImpl<MSG_T, MAX_MSGS>::Create(GetEventLoop(), std::string{topic}, callback,
std::move(update_sim_fn));
if (max_frequency.has_value()) {
impl->SetMaxFrequencyThrottle(*max_frequency);
}
Expand Down
79 changes: 54 additions & 25 deletions trellis/core/subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace core {
* @tparam MAX_MSGS the max number of messages that can be allocated and passed out in the callback.
*/
template <typename MSG_T, size_t MAX_MSGS = containers::kDefaultSlotSize>
class SubscriberImpl {
class SubscriberImpl : public std::enable_shared_from_this<SubscriberImpl<MSG_T, MAX_MSGS>> {
public:
using MessagePool = containers::MemoryPool<MSG_T, MAX_MSGS>;
using PointerType = MessagePool::UniquePtr;
Expand All @@ -52,24 +52,44 @@ class SubscriberImpl {
*/
using Callback = std::function<void(const time::TimePoint& now, const time::TimePoint& msgtime, PointerType msg)>;
using UpdateSimulatedClockFunction = std::function<void(const time::TimePoint&)>;
using WatchdogCreateFunction = std::function<Timer(TimerImpl::Callback)>;

SubscriberImpl(const SubscriberImpl&) = delete;
SubscriberImpl& operator=(const SubscriberImpl&) = delete;
SubscriberImpl(SubscriberImpl&&) = delete;
SubscriberImpl& operator=(SubscriberImpl&&) = delete;

private:
SubscriberImpl(EventLoop ev, std::string topic, UpdateSimulatedClockFunction update_sim_fn)
: ev_{std::move(ev)},
topic_{std::move(topic)},
ecal_sub_{topic_},
ecal_sub_raw_{CreateRawTopicSubscriber(topic_)},
update_sim_fn_{std::move(update_sim_fn)} {}

public:
/**
* @brief Construct a subscriber for a given topic
*
* @param topic the topic string to subscribe to
* @param callback the callback function to receive messages on
* @param update_sim_fn the function to update sim time on receive
*/
SubscriberImpl(EventLoop ev, std::string topic, Callback callback, UpdateSimulatedClockFunction update_sim_fn)
: ev_{ev},
topic_{std::move(topic)},
ecal_sub_{topic_},
ecal_sub_raw_{CreateRawTopicSubscriber(topic_)},
update_sim_fn_{std::move(update_sim_fn)} {
auto callback_wrapper = [this, callback = std::move(callback)](
static std::shared_ptr<SubscriberImpl<MSG_T, MAX_MSGS>> Create(EventLoop ev, std::string topic, Callback callback,
UpdateSimulatedClockFunction update_sim_fn) {
auto ret = std::shared_ptr<SubscriberImpl<MSG_T, MAX_MSGS>>{
new SubscriberImpl<MSG_T, MAX_MSGS>{std::move(ev), std::move(topic), std::move(update_sim_fn)}};

auto callback_wrapper = [self = ret->weak_from_this(), callback = std::move(callback)](
const char* topic_name_, const trellis::core::TimestampedMessage& msg_, long long time_,
long long clock_, long long id_) { CallbackWrapperLogic(msg_, callback); };
ecal_sub_.AddReceiveCallback(std::move(callback_wrapper));
long long clock_, long long id_) {
const auto shared = self.lock();
if (shared == nullptr) return;
shared->CallbackWrapperLogic(msg_, callback);
};
ret->ecal_sub_.AddReceiveCallback(std::move(callback_wrapper));

return ret;
}

/**
Expand All @@ -78,39 +98,48 @@ class SubscriberImpl {
* @param topic the topic string to subscribe to
* @param callback the callback function to receive messages on
* @param update_sim_fn the function to update sim time on receive
* @param watchdog_callback the callback function to call when the watchdog timer fires
* @param watchdog_create_fn the function to create a watchdog timer
*/
SubscriberImpl(EventLoop ev, std::string topic, Callback callback, UpdateSimulatedClockFunction update_sim_fn,
TimerImpl::Callback watchdog_callback, auto watchdog_create_fn)
: ev_{ev},
topic_{std::move(topic)},
ecal_sub_{topic_},
ecal_sub_raw_{CreateRawTopicSubscriber(topic_)},
update_sim_fn_{std::move(update_sim_fn)} {
static std::shared_ptr<SubscriberImpl<MSG_T, MAX_MSGS>> Create(EventLoop ev, std::string topic, Callback callback,
UpdateSimulatedClockFunction update_sim_fn,
TimerImpl::Callback watchdog_callback,
WatchdogCreateFunction watchdog_create_fn) {
auto ret = std::shared_ptr<SubscriberImpl<MSG_T, MAX_MSGS>>{
new SubscriberImpl<MSG_T, MAX_MSGS>{std::move(ev), std::move(topic), std::move(update_sim_fn)}};

auto watchdog_wrapper =
(watchdog_callback == nullptr)
? watchdog_callback
: [this, watchdog_callback = std::move(watchdog_callback)](const time::TimePoint& tp) mutable {
if (messages_pending_count_.load() == 0) {
: [self = ret->weak_from_this(),
watchdog_callback = std::move(watchdog_callback)](const time::TimePoint& tp) mutable {
const auto shared = self.lock();
if (shared == nullptr) return;
if (shared->messages_pending_count_.load() == 0) {
watchdog_callback(tp);
} else {
trellis::core::Log::Warn("Watchdog timer fired while messages are pending. Ignoring!");
}
};

auto callback_wrapper = [this, callback = std::move(callback), watchdog_create_fn = std::move(watchdog_create_fn),
auto callback_wrapper = [self = ret->weak_from_this(), callback = std::move(callback),
watchdog_create_fn = std::move(watchdog_create_fn),
watchdog_wrapper = std::move(watchdog_wrapper)](
const char* topic_name_, const trellis::core::TimestampedMessage& msg_, long long time_,
long long clock_, long long id_) mutable {
if (watchdog_timer_ == nullptr) {
watchdog_timer_ = watchdog_create_fn(std::move(watchdog_wrapper));
const auto shared = self.lock();
if (shared == nullptr) return;
if (shared->watchdog_timer_ == nullptr) {
shared->watchdog_timer_ = watchdog_create_fn(std::move(watchdog_wrapper));
} else {
watchdog_timer_->Reset();
shared->watchdog_timer_->Reset();
}

CallbackWrapperLogic(msg_, callback);
shared->CallbackWrapperLogic(msg_, callback);
};
ecal_sub_.AddReceiveCallback(std::move(callback_wrapper));
ret->ecal_sub_.AddReceiveCallback(std::move(callback_wrapper));

return ret;
}

/**
Expand Down

0 comments on commit 1031ef0

Please sign in to comment.