diff --git a/CMakeLists.txt b/CMakeLists.txt
index d6f850b..2588b06 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -34,6 +34,8 @@ add_library(${PROJECT_NAME}_lib SHARED
src/${PROJECT_NAME}/compress_messages.cpp
src/${PROJECT_NAME}/domain_bridge.cpp
src/${PROJECT_NAME}/domain_bridge_options.cpp
+ src/${PROJECT_NAME}/generic_publisher.cpp
+ src/${PROJECT_NAME}/generic_subscription.cpp
src/${PROJECT_NAME}/parse_domain_bridge_yaml_config.cpp
src/${PROJECT_NAME}/qos_options.cpp
src/${PROJECT_NAME}/service_bridge_options.cpp
diff --git a/package.xml b/package.xml
index d9fd5bd..853fdca 100644
--- a/package.xml
+++ b/package.xml
@@ -30,7 +30,7 @@
test_msgs
- rmw_connextdds
+ rmw_connext_cpp
rmw_cyclonedds_cpp
rmw_fastrtps_cpp
diff --git a/src/domain_bridge/domain_bridge.cpp b/src/domain_bridge/domain_bridge.cpp
index 86886ae..991e2fa 100644
--- a/src/domain_bridge/domain_bridge.cpp
+++ b/src/domain_bridge/domain_bridge.cpp
@@ -30,8 +30,6 @@
#include "rclcpp/executor.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
-#include "rclcpp/generic_publisher.hpp"
-#include "rclcpp/generic_subscription.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/serialization.hpp"
#include "rosbag2_cpp/typesupport_helpers.hpp"
@@ -44,43 +42,24 @@
#include "domain_bridge/topic_bridge_options.hpp"
#include "domain_bridge/msg/compressed_msg.hpp"
+#include "generic_publisher.hpp"
+#include "generic_subscription.hpp"
#include "wait_for_graph_events.hpp"
namespace domain_bridge
{
-/// \internal
-/**
- * A hack, because PublisherBase doesn't support publishing serialized messages and because
- * GenericPublisher cannot be created with a typesupport handle :/
- */
-class SerializedPublisher
+[[noreturn]] static void unreachable()
{
-public:
- template
- explicit SerializedPublisher(std::shared_ptr> impl)
- : impl_(std::move(impl)),
- publish_method_pointer_(static_cast(
- &rclcpp::Publisher::publish))
- {}
-
- explicit SerializedPublisher(std::shared_ptr impl)
- : impl_(std::move(impl)),
- publish_method_pointer_(static_cast(
- &rclcpp::GenericPublisher::publish))
- {}
-
- void publish(const rclcpp::SerializedMessage & message)
- {
- ((*impl_).*publish_method_pointer_)(message); // this is a bit horrible, but it works ...
- }
-
-private:
- std::shared_ptr impl_;
- using PointerToMemberMethod =
- void (rclcpp::PublisherBase::*)(const rclcpp::SerializedMessage & message);
- PointerToMemberMethod publish_method_pointer_;
-};
+#if defined(__has_builtin)
+#if __has_builtin(__builtin_unreachable)
+ __builtin_unreachable();
+#endif
+#elif (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 5))
+ __builtin_unreachable();
+#endif
+ throw std::logic_error("This code should be unreachable.");
+}
/// Implementation of \ref DomainBridge.
class DomainBridgeImpl
@@ -90,8 +69,8 @@ class DomainBridgeImpl
using TopicBridgeMap = std::map<
TopicBridge,
std::pair<
- std::shared_ptr,
- std::shared_ptr>>;
+ std::shared_ptr,
+ std::shared_ptr>>;
using ServiceBridgeMap = std::map<
detail::ServiceBridge,
std::pair, std::shared_ptr>>;
@@ -195,32 +174,39 @@ class DomainBridgeImpl
type, "rosidl_typesupport_cpp");
}
- std::shared_ptr
+ std::shared_ptr
create_publisher(
rclcpp::Node::SharedPtr node,
const std::string & topic_name,
- const std::string & type,
const rclcpp::QoS & qos,
- rclcpp::PublisherOptionsWithAllocator> & options)
+ const rosidl_message_type_support_t & typesupport_handle,
+ rclcpp::CallbackGroup::SharedPtr group)
{
- std::shared_ptr publisher;
if (options_.mode() != DomainBridgeOptions::Mode::Compress) {
- publisher = std::make_shared(
- node->create_generic_publisher(topic_name, type, qos, options));
- } else {
- publisher = std::make_shared(
- node->create_publisher(topic_name, qos, options));
+ auto publisher = std::make_shared(
+ node->get_node_base_interface().get(),
+ typesupport_handle,
+ topic_name,
+ qos);
+ node->get_node_topics_interface()->add_publisher(publisher, std::move(group));
+ return publisher;
}
+ auto publisher = std::make_shared(
+ node->get_node_base_interface().get(),
+ *rosidl_typesupport_cpp::get_message_type_support_handle(),
+ topic_name,
+ qos);
+ node->get_node_topics_interface()->add_publisher(publisher, std::move(group));
return publisher;
}
- std::shared_ptr create_subscription(
+ std::shared_ptr create_subscription(
rclcpp::Node::SharedPtr node,
- std::shared_ptr publisher,
+ std::shared_ptr publisher,
const std::string & topic_name,
- const std::string & type,
const rclcpp::QoS & qos,
- rclcpp::SubscriptionOptionsWithAllocator> & options)
+ const rosidl_message_type_support_t & typesupport_handle,
+ rclcpp::CallbackGroup::SharedPtr group)
{
std::function)> callback;
switch (options_.mode()) {
@@ -236,7 +222,7 @@ class DomainBridgeImpl
compressed_msg.data = domain_bridge::compress_message(cctx, std::move(*msg));
rclcpp::SerializedMessage serialized_compressed_msg;
serializer.serialize_message(&compressed_msg, &serialized_compressed_msg);
- publisher->publish(serialized_compressed_msg);
+ publisher->publish(serialized_compressed_msg.get_rcl_serialized_message());
};
break;
case DomainBridgeOptions::Mode::Decompress:
@@ -251,31 +237,39 @@ class DomainBridgeImpl
serializer.deserialize_message(serialized_compressed_msg.get(), &compressed_msg);
rclcpp::SerializedMessage msg = domain_bridge::decompress_message(
dctx, std::move(compressed_msg.data));
- publisher->publish(msg);
+ publisher->publish(msg.get_rcl_serialized_message());
};
break;
default: // fallthrough
case DomainBridgeOptions::Mode::Normal:
callback = [publisher](std::shared_ptr msg) {
// Publish message into the other domain
- publisher->publish(*msg);
+ publisher->publish(msg->get_rcl_serialized_message());
};
break;
}
if (options_.mode() != DomainBridgeOptions::Mode::Decompress) {
// Create subscription
- return node->create_generic_subscription(
+ auto subscription = std::make_shared(
+ node->get_node_base_interface().get(),
+ typesupport_handle,
topic_name,
- type,
qos,
- callback,
- options);
+ callback);
+ node->get_node_topics_interface()->add_subscription(subscription, std::move(group));
+ return subscription;
}
- return node->create_subscription(
+ auto subscription = std::make_shared(
+ node->get_node_base_interface().get(),
+ *rosidl_typesupport_cpp::get_message_type_support_handle(),
topic_name,
qos,
- callback,
- options);
+ [publisher](std::shared_ptr msg) {
+ // Publish message into the other domain
+ publisher->publish(msg->get_rcl_serialized_message());
+ });
+ node->get_node_topics_interface()->add_subscription(subscription, std::move(group));
+ return subscription;
}
void bridge_topic(
@@ -378,29 +372,26 @@ class DomainBridgeImpl
std::cerr << warning << std::endl;
}
- rclcpp::PublisherOptionsWithAllocator> publisher_options;
- rclcpp::SubscriptionOptionsWithAllocator> subscription_options;
-
- publisher_options.callback_group = topic_options.callback_group();
- subscription_options.callback_group = topic_options.callback_group();
+ auto typesupport_handle = rosbag2_cpp::get_typesupport_handle(
+ type, "rosidl_typesupport_cpp", loaded_typesupports_.at(type));
// Create publisher for the 'to_domain'
// The publisher should be created first so it is available to the subscription callback
auto publisher = this->create_publisher(
to_domain_node,
topic_remapped,
- type,
qos,
- publisher_options);
+ *typesupport_handle,
+ topic_options.callback_group());
// Create subscription for the 'from_domain'
auto subscription = this->create_subscription(
from_domain_node,
publisher,
topic,
- type,
qos,
- subscription_options);
+ *typesupport_handle,
+ topic_options.callback_group());
this->bridged_topics_[topic_bridge] = {publisher, subscription};
};
diff --git a/src/domain_bridge/generic_publisher.cpp b/src/domain_bridge/generic_publisher.cpp
new file mode 100644
index 0000000..ad55a81
--- /dev/null
+++ b/src/domain_bridge/generic_publisher.cpp
@@ -0,0 +1,55 @@
+// Copyright 2018, Bosch Software Innovations GmbH.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// NOTE: This file was directly copied from rosbag2_transport
+// TODO(jacobperron): Replace with upstream implementation when available
+// https://github.com/ros2/rclcpp/pull/1452
+
+#include "generic_publisher.hpp"
+
+#include
+#include
+
+namespace
+{
+rcl_publisher_options_t rosbag2_get_publisher_options(const rclcpp::QoS & qos)
+{
+ auto options = rcl_publisher_get_default_options();
+ options.qos = qos.get_rmw_qos_profile();
+ return options;
+}
+} // unnamed namespace
+
+namespace domain_bridge
+{
+
+GenericPublisher::GenericPublisher(
+ rclcpp::node_interfaces::NodeBaseInterface * node_base,
+ const rosidl_message_type_support_t & type_support,
+ const std::string & topic_name,
+ const rclcpp::QoS & qos)
+: rclcpp::PublisherBase(node_base, topic_name, type_support, rosbag2_get_publisher_options(qos))
+{}
+
+void GenericPublisher::publish(const rmw_serialized_message_t & message)
+{
+ auto return_code = rcl_publish_serialized_message(
+ get_publisher_handle().get(), &message, nullptr);
+
+ if (return_code != RCL_RET_OK) {
+ rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to publish serialized message");
+ }
+}
+
+} // namespace domain_bridge
diff --git a/src/domain_bridge/generic_publisher.hpp b/src/domain_bridge/generic_publisher.hpp
new file mode 100644
index 0000000..9a5d85f
--- /dev/null
+++ b/src/domain_bridge/generic_publisher.hpp
@@ -0,0 +1,46 @@
+// Copyright 2018, Bosch Software Innovations GmbH.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// NOTE: This file was directly copied from rosbag2_transport
+// TODO(jacobperron): Replace with upstream implementation when available
+// https://github.com/ros2/rclcpp/pull/1452
+
+#ifndef DOMAIN_BRIDGE__GENERIC_PUBLISHER_HPP_
+#define DOMAIN_BRIDGE__GENERIC_PUBLISHER_HPP_
+
+#include
+#include
+
+#include "rclcpp/rclcpp.hpp"
+
+namespace domain_bridge
+{
+
+class GenericPublisher : public rclcpp::PublisherBase
+{
+public:
+ GenericPublisher(
+ rclcpp::node_interfaces::NodeBaseInterface * node_base,
+ const rosidl_message_type_support_t & type_support,
+ const std::string & topic_name,
+ const rclcpp::QoS & qos);
+
+ virtual ~GenericPublisher() = default;
+
+ void publish(const rmw_serialized_message_t & message);
+};
+
+} // namespace domain_bridge
+
+#endif // DOMAIN_BRIDGE__GENERIC_PUBLISHER_HPP_
diff --git a/src/domain_bridge/generic_subscription.cpp b/src/domain_bridge/generic_subscription.cpp
new file mode 100644
index 0000000..55caaaf
--- /dev/null
+++ b/src/domain_bridge/generic_subscription.cpp
@@ -0,0 +1,115 @@
+// Copyright 2018, Bosch Software Innovations GmbH.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// NOTE: This file was directly copied from rosbag2_transport
+// TODO(jacobperron): Replace with upstream implementation when available
+// https://github.com/ros2/rclcpp/pull/1452
+
+#include "generic_subscription.hpp"
+
+#include
+#include
+
+#include "rclcpp/any_subscription_callback.hpp"
+#include "rclcpp/subscription.hpp"
+
+namespace
+{
+rcl_subscription_options_t rosbag2_get_subscription_options(const rclcpp::QoS & qos)
+{
+ auto options = rcl_subscription_get_default_options();
+ options.qos = qos.get_rmw_qos_profile();
+ return options;
+}
+} // unnamed namespace
+
+namespace domain_bridge
+{
+
+GenericSubscription::GenericSubscription(
+ rclcpp::node_interfaces::NodeBaseInterface * node_base,
+ const rosidl_message_type_support_t & ts,
+ const std::string & topic_name,
+ const rclcpp::QoS & qos,
+ std::function)> callback)
+: SubscriptionBase(
+ node_base,
+ ts,
+ topic_name,
+ rosbag2_get_subscription_options(qos),
+ true),
+ default_allocator_(rcutils_get_default_allocator()),
+ callback_(callback),
+ qos_(qos)
+{}
+
+std::shared_ptr GenericSubscription::create_message()
+{
+ return create_serialized_message();
+}
+
+std::shared_ptr GenericSubscription::create_serialized_message()
+{
+ return borrow_serialized_message(0);
+}
+
+void GenericSubscription::handle_message(
+ std::shared_ptr & message, const rclcpp::MessageInfo & message_info)
+{
+ (void) message;
+ (void) message_info;
+ throw std::runtime_error{"unexpected callback being called"};
+}
+
+void GenericSubscription::handle_loaned_message(
+ void * message, const rclcpp::MessageInfo & message_info)
+{
+ (void) message;
+ (void) message_info;
+ throw std::runtime_error{"unexpected callback being called"};
+}
+
+void GenericSubscription::handle_serialized_message(
+ const std::shared_ptr & serialized_message,
+ const rclcpp::MessageInfo & message_info)
+{
+ (void)message_info;
+ auto typed_message = std::static_pointer_cast(serialized_message);
+ callback_(typed_message);
+}
+
+void GenericSubscription::return_message(std::shared_ptr & message)
+{
+ auto typed_message = std::static_pointer_cast(message);
+ return_serialized_message(typed_message);
+}
+
+void GenericSubscription::return_serialized_message(
+ std::shared_ptr & message)
+{
+ message.reset();
+}
+
+const rclcpp::QoS & GenericSubscription::qos_profile() const
+{
+ return qos_;
+}
+
+std::shared_ptr
+GenericSubscription::borrow_serialized_message(size_t capacity)
+{
+ return std::make_shared(capacity);
+}
+
+} // namespace domain_bridge
diff --git a/src/domain_bridge/generic_subscription.hpp b/src/domain_bridge/generic_subscription.hpp
new file mode 100644
index 0000000..a293777
--- /dev/null
+++ b/src/domain_bridge/generic_subscription.hpp
@@ -0,0 +1,101 @@
+// Copyright 2018, Bosch Software Innovations GmbH.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// NOTE: This file was directly copied from rosbag2_transport
+// TODO(jacobperron): Replace with upstream implementation when available
+// https://github.com/ros2/rclcpp/pull/1452
+
+#ifndef DOMAIN_BRIDGE__GENERIC_SUBSCRIPTION_HPP_
+#define DOMAIN_BRIDGE__GENERIC_SUBSCRIPTION_HPP_
+
+#include
+#include
+
+#include "rclcpp/any_subscription_callback.hpp"
+#include "rclcpp/macros.hpp"
+#include "rclcpp/serialization.hpp"
+#include "rclcpp/subscription.hpp"
+
+namespace domain_bridge
+{
+
+/**
+ * This class is an implementation of an rclcpp::Subscription for serialized messages whose topic
+ * is not known at compile time (hence templating does not work).
+ *
+ * It does not support intra-process handling
+ */
+class GenericSubscription : public rclcpp::SubscriptionBase
+{
+public:
+ // cppcheck-suppress unknownMacro
+ RCLCPP_SMART_PTR_DEFINITIONS(GenericSubscription)
+
+ /**
+ * Constructor. In order to properly subscribe to a topic, this subscription needs to be added to
+ * the node_topic_interface of the node passed into this constructor.
+ *
+ * \param node_base NodeBaseInterface pointer used in parts of the setup.
+ * \param ts Type support handle
+ * \param topic_name Topic name
+ * \param callback Callback for new messages of serialized form
+ */
+ GenericSubscription(
+ rclcpp::node_interfaces::NodeBaseInterface * node_base,
+ const rosidl_message_type_support_t & ts,
+ const std::string & topic_name,
+ const rclcpp::QoS & qos,
+ std::function)> callback);
+
+ // Same as create_serialized_message() as the subscription is to serialized_messages only
+ std::shared_ptr create_message() override;
+
+ std::shared_ptr create_serialized_message() override;
+
+ void handle_message(
+ std::shared_ptr & message, const rclcpp::MessageInfo & message_info) override;
+
+ void handle_loaned_message(
+ void * loaned_message, const rclcpp::MessageInfo & message_info) override;
+
+ // Post-Galactic, handle_serialized_message is a pure virtual function in
+ // rclcpp::SubscriptionBase, so we must override it. However, in order to
+ // make this change compatible with both Galactic and later, we leave off
+ // the 'override' flag.
+ void
+ handle_serialized_message(
+ const std::shared_ptr & serialized_message,
+ const rclcpp::MessageInfo & message_info);
+
+ // Same as return_serialized_message() as the subscription is to serialized_messages only
+ void return_message(std::shared_ptr & message) override;
+
+ void return_serialized_message(std::shared_ptr & message) override;
+
+ // Provide a const reference to the QoS Profile used to create this subscription.
+ const rclcpp::QoS & qos_profile() const;
+
+private:
+ // cppcheck-suppress unknownMacro
+ RCLCPP_DISABLE_COPY(GenericSubscription)
+
+ std::shared_ptr borrow_serialized_message(size_t capacity);
+ rcutils_allocator_t default_allocator_;
+ std::function)> callback_;
+ const rclcpp::QoS qos_;
+};
+
+} // namespace domain_bridge
+
+#endif // DOMAIN_BRIDGE__GENERIC_SUBSCRIPTION_HPP_