From 0c05dab08241af7e4c2695a600e323b1eefd1166 Mon Sep 17 00:00:00 2001 From: Abrar Rahman Protyasha Date: Thu, 28 Oct 2021 00:52:57 -0400 Subject: [PATCH] Add Foxy support Signed-off-by: Abrar Rahman Protyasha Going from `SerializedPublisher` to generic Signed-off-by: Abrar Rahman Protyasha Migrate from rclcpp generic pub/sub With this patch, the package builds successfully in rolling, but there are failing tests to address, and all of this is before having built in foxy at all (and before adding back some of the `create_subscription` code from PR 24). Signed-off-by: Abrar Rahman Protyasha Add missing compress/decompress logic Signed-off-by: Abrar Rahman Protyasha Update generic pub sub source Signed-off-by: Abrar Rahman Protyasha Publish the message rather than its shared ptr Signed-off-by: Abrar Rahman Protyasha Rename connext foxy package Signed-off-by: Abrar Rahman Protyasha Address uncrustify error Signed-off-by: Abrar Rahman Protyasha Migrate back to rosidl_target_interfaces Signed-off-by: Abrar Rahman Protyasha --- CMakeLists.txt | 2 + package.xml | 2 +- src/domain_bridge/domain_bridge.cpp | 125 ++++++++++----------- src/domain_bridge/generic_publisher.cpp | 55 +++++++++ src/domain_bridge/generic_publisher.hpp | 46 ++++++++ src/domain_bridge/generic_subscription.cpp | 115 +++++++++++++++++++ src/domain_bridge/generic_subscription.hpp | 101 +++++++++++++++++ 7 files changed, 378 insertions(+), 68 deletions(-) create mode 100644 src/domain_bridge/generic_publisher.cpp create mode 100644 src/domain_bridge/generic_publisher.hpp create mode 100644 src/domain_bridge/generic_subscription.cpp create mode 100644 src/domain_bridge/generic_subscription.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d6f850b..2588b06 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,8 @@ add_library(${PROJECT_NAME}_lib SHARED src/${PROJECT_NAME}/compress_messages.cpp src/${PROJECT_NAME}/domain_bridge.cpp src/${PROJECT_NAME}/domain_bridge_options.cpp + src/${PROJECT_NAME}/generic_publisher.cpp + src/${PROJECT_NAME}/generic_subscription.cpp src/${PROJECT_NAME}/parse_domain_bridge_yaml_config.cpp src/${PROJECT_NAME}/qos_options.cpp src/${PROJECT_NAME}/service_bridge_options.cpp diff --git a/package.xml b/package.xml index d9fd5bd..853fdca 100644 --- a/package.xml +++ b/package.xml @@ -30,7 +30,7 @@ test_msgs - rmw_connextdds + rmw_connext_cpp rmw_cyclonedds_cpp rmw_fastrtps_cpp diff --git a/src/domain_bridge/domain_bridge.cpp b/src/domain_bridge/domain_bridge.cpp index 86886ae..991e2fa 100644 --- a/src/domain_bridge/domain_bridge.cpp +++ b/src/domain_bridge/domain_bridge.cpp @@ -30,8 +30,6 @@ #include "rclcpp/executor.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" -#include "rclcpp/generic_publisher.hpp" -#include "rclcpp/generic_subscription.hpp" #include "rclcpp/rclcpp.hpp" #include "rclcpp/serialization.hpp" #include "rosbag2_cpp/typesupport_helpers.hpp" @@ -44,43 +42,24 @@ #include "domain_bridge/topic_bridge_options.hpp" #include "domain_bridge/msg/compressed_msg.hpp" +#include "generic_publisher.hpp" +#include "generic_subscription.hpp" #include "wait_for_graph_events.hpp" namespace domain_bridge { -/// \internal -/** - * A hack, because PublisherBase doesn't support publishing serialized messages and because - * GenericPublisher cannot be created with a typesupport handle :/ - */ -class SerializedPublisher +[[noreturn]] static void unreachable() { -public: - template - explicit SerializedPublisher(std::shared_ptr> impl) - : impl_(std::move(impl)), - publish_method_pointer_(static_cast( - &rclcpp::Publisher::publish)) - {} - - explicit SerializedPublisher(std::shared_ptr impl) - : impl_(std::move(impl)), - publish_method_pointer_(static_cast( - &rclcpp::GenericPublisher::publish)) - {} - - void publish(const rclcpp::SerializedMessage & message) - { - ((*impl_).*publish_method_pointer_)(message); // this is a bit horrible, but it works ... - } - -private: - std::shared_ptr impl_; - using PointerToMemberMethod = - void (rclcpp::PublisherBase::*)(const rclcpp::SerializedMessage & message); - PointerToMemberMethod publish_method_pointer_; -}; +#if defined(__has_builtin) +#if __has_builtin(__builtin_unreachable) + __builtin_unreachable(); +#endif +#elif (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 5)) + __builtin_unreachable(); +#endif + throw std::logic_error("This code should be unreachable."); +} /// Implementation of \ref DomainBridge. class DomainBridgeImpl @@ -90,8 +69,8 @@ class DomainBridgeImpl using TopicBridgeMap = std::map< TopicBridge, std::pair< - std::shared_ptr, - std::shared_ptr>>; + std::shared_ptr, + std::shared_ptr>>; using ServiceBridgeMap = std::map< detail::ServiceBridge, std::pair, std::shared_ptr>>; @@ -195,32 +174,39 @@ class DomainBridgeImpl type, "rosidl_typesupport_cpp"); } - std::shared_ptr + std::shared_ptr create_publisher( rclcpp::Node::SharedPtr node, const std::string & topic_name, - const std::string & type, const rclcpp::QoS & qos, - rclcpp::PublisherOptionsWithAllocator> & options) + const rosidl_message_type_support_t & typesupport_handle, + rclcpp::CallbackGroup::SharedPtr group) { - std::shared_ptr publisher; if (options_.mode() != DomainBridgeOptions::Mode::Compress) { - publisher = std::make_shared( - node->create_generic_publisher(topic_name, type, qos, options)); - } else { - publisher = std::make_shared( - node->create_publisher(topic_name, qos, options)); + auto publisher = std::make_shared( + node->get_node_base_interface().get(), + typesupport_handle, + topic_name, + qos); + node->get_node_topics_interface()->add_publisher(publisher, std::move(group)); + return publisher; } + auto publisher = std::make_shared( + node->get_node_base_interface().get(), + *rosidl_typesupport_cpp::get_message_type_support_handle(), + topic_name, + qos); + node->get_node_topics_interface()->add_publisher(publisher, std::move(group)); return publisher; } - std::shared_ptr create_subscription( + std::shared_ptr create_subscription( rclcpp::Node::SharedPtr node, - std::shared_ptr publisher, + std::shared_ptr publisher, const std::string & topic_name, - const std::string & type, const rclcpp::QoS & qos, - rclcpp::SubscriptionOptionsWithAllocator> & options) + const rosidl_message_type_support_t & typesupport_handle, + rclcpp::CallbackGroup::SharedPtr group) { std::function)> callback; switch (options_.mode()) { @@ -236,7 +222,7 @@ class DomainBridgeImpl compressed_msg.data = domain_bridge::compress_message(cctx, std::move(*msg)); rclcpp::SerializedMessage serialized_compressed_msg; serializer.serialize_message(&compressed_msg, &serialized_compressed_msg); - publisher->publish(serialized_compressed_msg); + publisher->publish(serialized_compressed_msg.get_rcl_serialized_message()); }; break; case DomainBridgeOptions::Mode::Decompress: @@ -251,31 +237,39 @@ class DomainBridgeImpl serializer.deserialize_message(serialized_compressed_msg.get(), &compressed_msg); rclcpp::SerializedMessage msg = domain_bridge::decompress_message( dctx, std::move(compressed_msg.data)); - publisher->publish(msg); + publisher->publish(msg.get_rcl_serialized_message()); }; break; default: // fallthrough case DomainBridgeOptions::Mode::Normal: callback = [publisher](std::shared_ptr msg) { // Publish message into the other domain - publisher->publish(*msg); + publisher->publish(msg->get_rcl_serialized_message()); }; break; } if (options_.mode() != DomainBridgeOptions::Mode::Decompress) { // Create subscription - return node->create_generic_subscription( + auto subscription = std::make_shared( + node->get_node_base_interface().get(), + typesupport_handle, topic_name, - type, qos, - callback, - options); + callback); + node->get_node_topics_interface()->add_subscription(subscription, std::move(group)); + return subscription; } - return node->create_subscription( + auto subscription = std::make_shared( + node->get_node_base_interface().get(), + *rosidl_typesupport_cpp::get_message_type_support_handle(), topic_name, qos, - callback, - options); + [publisher](std::shared_ptr msg) { + // Publish message into the other domain + publisher->publish(msg->get_rcl_serialized_message()); + }); + node->get_node_topics_interface()->add_subscription(subscription, std::move(group)); + return subscription; } void bridge_topic( @@ -378,29 +372,26 @@ class DomainBridgeImpl std::cerr << warning << std::endl; } - rclcpp::PublisherOptionsWithAllocator> publisher_options; - rclcpp::SubscriptionOptionsWithAllocator> subscription_options; - - publisher_options.callback_group = topic_options.callback_group(); - subscription_options.callback_group = topic_options.callback_group(); + auto typesupport_handle = rosbag2_cpp::get_typesupport_handle( + type, "rosidl_typesupport_cpp", loaded_typesupports_.at(type)); // Create publisher for the 'to_domain' // The publisher should be created first so it is available to the subscription callback auto publisher = this->create_publisher( to_domain_node, topic_remapped, - type, qos, - publisher_options); + *typesupport_handle, + topic_options.callback_group()); // Create subscription for the 'from_domain' auto subscription = this->create_subscription( from_domain_node, publisher, topic, - type, qos, - subscription_options); + *typesupport_handle, + topic_options.callback_group()); this->bridged_topics_[topic_bridge] = {publisher, subscription}; }; diff --git a/src/domain_bridge/generic_publisher.cpp b/src/domain_bridge/generic_publisher.cpp new file mode 100644 index 0000000..ad55a81 --- /dev/null +++ b/src/domain_bridge/generic_publisher.cpp @@ -0,0 +1,55 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// NOTE: This file was directly copied from rosbag2_transport +// TODO(jacobperron): Replace with upstream implementation when available +// https://github.com/ros2/rclcpp/pull/1452 + +#include "generic_publisher.hpp" + +#include +#include + +namespace +{ +rcl_publisher_options_t rosbag2_get_publisher_options(const rclcpp::QoS & qos) +{ + auto options = rcl_publisher_get_default_options(); + options.qos = qos.get_rmw_qos_profile(); + return options; +} +} // unnamed namespace + +namespace domain_bridge +{ + +GenericPublisher::GenericPublisher( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const rosidl_message_type_support_t & type_support, + const std::string & topic_name, + const rclcpp::QoS & qos) +: rclcpp::PublisherBase(node_base, topic_name, type_support, rosbag2_get_publisher_options(qos)) +{} + +void GenericPublisher::publish(const rmw_serialized_message_t & message) +{ + auto return_code = rcl_publish_serialized_message( + get_publisher_handle().get(), &message, nullptr); + + if (return_code != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to publish serialized message"); + } +} + +} // namespace domain_bridge diff --git a/src/domain_bridge/generic_publisher.hpp b/src/domain_bridge/generic_publisher.hpp new file mode 100644 index 0000000..9a5d85f --- /dev/null +++ b/src/domain_bridge/generic_publisher.hpp @@ -0,0 +1,46 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// NOTE: This file was directly copied from rosbag2_transport +// TODO(jacobperron): Replace with upstream implementation when available +// https://github.com/ros2/rclcpp/pull/1452 + +#ifndef DOMAIN_BRIDGE__GENERIC_PUBLISHER_HPP_ +#define DOMAIN_BRIDGE__GENERIC_PUBLISHER_HPP_ + +#include +#include + +#include "rclcpp/rclcpp.hpp" + +namespace domain_bridge +{ + +class GenericPublisher : public rclcpp::PublisherBase +{ +public: + GenericPublisher( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const rosidl_message_type_support_t & type_support, + const std::string & topic_name, + const rclcpp::QoS & qos); + + virtual ~GenericPublisher() = default; + + void publish(const rmw_serialized_message_t & message); +}; + +} // namespace domain_bridge + +#endif // DOMAIN_BRIDGE__GENERIC_PUBLISHER_HPP_ diff --git a/src/domain_bridge/generic_subscription.cpp b/src/domain_bridge/generic_subscription.cpp new file mode 100644 index 0000000..55caaaf --- /dev/null +++ b/src/domain_bridge/generic_subscription.cpp @@ -0,0 +1,115 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// NOTE: This file was directly copied from rosbag2_transport +// TODO(jacobperron): Replace with upstream implementation when available +// https://github.com/ros2/rclcpp/pull/1452 + +#include "generic_subscription.hpp" + +#include +#include + +#include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/subscription.hpp" + +namespace +{ +rcl_subscription_options_t rosbag2_get_subscription_options(const rclcpp::QoS & qos) +{ + auto options = rcl_subscription_get_default_options(); + options.qos = qos.get_rmw_qos_profile(); + return options; +} +} // unnamed namespace + +namespace domain_bridge +{ + +GenericSubscription::GenericSubscription( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const rosidl_message_type_support_t & ts, + const std::string & topic_name, + const rclcpp::QoS & qos, + std::function)> callback) +: SubscriptionBase( + node_base, + ts, + topic_name, + rosbag2_get_subscription_options(qos), + true), + default_allocator_(rcutils_get_default_allocator()), + callback_(callback), + qos_(qos) +{} + +std::shared_ptr GenericSubscription::create_message() +{ + return create_serialized_message(); +} + +std::shared_ptr GenericSubscription::create_serialized_message() +{ + return borrow_serialized_message(0); +} + +void GenericSubscription::handle_message( + std::shared_ptr & message, const rclcpp::MessageInfo & message_info) +{ + (void) message; + (void) message_info; + throw std::runtime_error{"unexpected callback being called"}; +} + +void GenericSubscription::handle_loaned_message( + void * message, const rclcpp::MessageInfo & message_info) +{ + (void) message; + (void) message_info; + throw std::runtime_error{"unexpected callback being called"}; +} + +void GenericSubscription::handle_serialized_message( + const std::shared_ptr & serialized_message, + const rclcpp::MessageInfo & message_info) +{ + (void)message_info; + auto typed_message = std::static_pointer_cast(serialized_message); + callback_(typed_message); +} + +void GenericSubscription::return_message(std::shared_ptr & message) +{ + auto typed_message = std::static_pointer_cast(message); + return_serialized_message(typed_message); +} + +void GenericSubscription::return_serialized_message( + std::shared_ptr & message) +{ + message.reset(); +} + +const rclcpp::QoS & GenericSubscription::qos_profile() const +{ + return qos_; +} + +std::shared_ptr +GenericSubscription::borrow_serialized_message(size_t capacity) +{ + return std::make_shared(capacity); +} + +} // namespace domain_bridge diff --git a/src/domain_bridge/generic_subscription.hpp b/src/domain_bridge/generic_subscription.hpp new file mode 100644 index 0000000..a293777 --- /dev/null +++ b/src/domain_bridge/generic_subscription.hpp @@ -0,0 +1,101 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// NOTE: This file was directly copied from rosbag2_transport +// TODO(jacobperron): Replace with upstream implementation when available +// https://github.com/ros2/rclcpp/pull/1452 + +#ifndef DOMAIN_BRIDGE__GENERIC_SUBSCRIPTION_HPP_ +#define DOMAIN_BRIDGE__GENERIC_SUBSCRIPTION_HPP_ + +#include +#include + +#include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/subscription.hpp" + +namespace domain_bridge +{ + +/** + * This class is an implementation of an rclcpp::Subscription for serialized messages whose topic + * is not known at compile time (hence templating does not work). + * + * It does not support intra-process handling + */ +class GenericSubscription : public rclcpp::SubscriptionBase +{ +public: + // cppcheck-suppress unknownMacro + RCLCPP_SMART_PTR_DEFINITIONS(GenericSubscription) + + /** + * Constructor. In order to properly subscribe to a topic, this subscription needs to be added to + * the node_topic_interface of the node passed into this constructor. + * + * \param node_base NodeBaseInterface pointer used in parts of the setup. + * \param ts Type support handle + * \param topic_name Topic name + * \param callback Callback for new messages of serialized form + */ + GenericSubscription( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const rosidl_message_type_support_t & ts, + const std::string & topic_name, + const rclcpp::QoS & qos, + std::function)> callback); + + // Same as create_serialized_message() as the subscription is to serialized_messages only + std::shared_ptr create_message() override; + + std::shared_ptr create_serialized_message() override; + + void handle_message( + std::shared_ptr & message, const rclcpp::MessageInfo & message_info) override; + + void handle_loaned_message( + void * loaned_message, const rclcpp::MessageInfo & message_info) override; + + // Post-Galactic, handle_serialized_message is a pure virtual function in + // rclcpp::SubscriptionBase, so we must override it. However, in order to + // make this change compatible with both Galactic and later, we leave off + // the 'override' flag. + void + handle_serialized_message( + const std::shared_ptr & serialized_message, + const rclcpp::MessageInfo & message_info); + + // Same as return_serialized_message() as the subscription is to serialized_messages only + void return_message(std::shared_ptr & message) override; + + void return_serialized_message(std::shared_ptr & message) override; + + // Provide a const reference to the QoS Profile used to create this subscription. + const rclcpp::QoS & qos_profile() const; + +private: + // cppcheck-suppress unknownMacro + RCLCPP_DISABLE_COPY(GenericSubscription) + + std::shared_ptr borrow_serialized_message(size_t capacity); + rcutils_allocator_t default_allocator_; + std::function)> callback_; + const rclcpp::QoS qos_; +}; + +} // namespace domain_bridge + +#endif // DOMAIN_BRIDGE__GENERIC_SUBSCRIPTION_HPP_