Skip to content

Commit

Permalink
Add Foxy support
Browse files Browse the repository at this point in the history
Signed-off-by: Abrar Rahman Protyasha <[email protected]>

Going from `SerializedPublisher` to generic

Signed-off-by: Abrar Rahman Protyasha <[email protected]>

Migrate from rclcpp generic pub/sub

With this patch, the package builds successfully in rolling, but there
are failing tests to address, and all of this is before having built in
foxy at all (and before adding back some of the `create_subscription`
code from PR 24).

Signed-off-by: Abrar Rahman Protyasha <[email protected]>

Add missing compress/decompress logic

Signed-off-by: Abrar Rahman Protyasha <[email protected]>

Update generic pub sub source

Signed-off-by: Abrar Rahman Protyasha <[email protected]>

Publish the message rather than its shared ptr

Signed-off-by: Abrar Rahman Protyasha <[email protected]>

Rename connext foxy package

Signed-off-by: Abrar Rahman Protyasha <[email protected]>

Address uncrustify error

Signed-off-by: Abrar Rahman Protyasha <[email protected]>

Migrate back to rosidl_target_interfaces

Signed-off-by: Abrar Rahman Protyasha <[email protected]>
  • Loading branch information
aprotyas committed Oct 28, 2021
1 parent 3030c02 commit 0c05dab
Show file tree
Hide file tree
Showing 7 changed files with 378 additions and 68 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ add_library(${PROJECT_NAME}_lib SHARED
src/${PROJECT_NAME}/compress_messages.cpp
src/${PROJECT_NAME}/domain_bridge.cpp
src/${PROJECT_NAME}/domain_bridge_options.cpp
src/${PROJECT_NAME}/generic_publisher.cpp
src/${PROJECT_NAME}/generic_subscription.cpp
src/${PROJECT_NAME}/parse_domain_bridge_yaml_config.cpp
src/${PROJECT_NAME}/qos_options.cpp
src/${PROJECT_NAME}/service_bridge_options.cpp
Expand Down
2 changes: 1 addition & 1 deletion package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<test_depend>test_msgs</test_depend>

<!-- Ensure the following Tier 1 RMW implementations are available for testing -->
<test_depend>rmw_connextdds</test_depend>
<test_depend>rmw_connext_cpp</test_depend>
<test_depend>rmw_cyclonedds_cpp</test_depend>
<test_depend>rmw_fastrtps_cpp</test_depend>

Expand Down
125 changes: 58 additions & 67 deletions src/domain_bridge/domain_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@

#include "rclcpp/executor.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/generic_publisher.hpp"
#include "rclcpp/generic_subscription.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/serialization.hpp"
#include "rosbag2_cpp/typesupport_helpers.hpp"
Expand All @@ -44,43 +42,24 @@
#include "domain_bridge/topic_bridge_options.hpp"
#include "domain_bridge/msg/compressed_msg.hpp"

#include "generic_publisher.hpp"
#include "generic_subscription.hpp"
#include "wait_for_graph_events.hpp"

namespace domain_bridge
{

/// \internal
/**
* A hack, because PublisherBase doesn't support publishing serialized messages and because
* GenericPublisher cannot be created with a typesupport handle :/
*/
class SerializedPublisher
[[noreturn]] static void unreachable()
{
public:
template<typename MessageT, typename AllocatorT>
explicit SerializedPublisher(std::shared_ptr<rclcpp::Publisher<MessageT, AllocatorT>> impl)
: impl_(std::move(impl)),
publish_method_pointer_(static_cast<decltype(publish_method_pointer_)>(
&rclcpp::Publisher<MessageT, AllocatorT>::publish))
{}

explicit SerializedPublisher(std::shared_ptr<rclcpp::GenericPublisher> impl)
: impl_(std::move(impl)),
publish_method_pointer_(static_cast<decltype(publish_method_pointer_)>(
&rclcpp::GenericPublisher::publish))
{}

void publish(const rclcpp::SerializedMessage & message)
{
((*impl_).*publish_method_pointer_)(message); // this is a bit horrible, but it works ...
}

private:
std::shared_ptr<rclcpp::PublisherBase> impl_;
using PointerToMemberMethod =
void (rclcpp::PublisherBase::*)(const rclcpp::SerializedMessage & message);
PointerToMemberMethod publish_method_pointer_;
};
#if defined(__has_builtin)
#if __has_builtin(__builtin_unreachable)
__builtin_unreachable();
#endif
#elif (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 5))
__builtin_unreachable();
#endif
throw std::logic_error("This code should be unreachable.");
}

/// Implementation of \ref DomainBridge.
class DomainBridgeImpl
Expand All @@ -90,8 +69,8 @@ class DomainBridgeImpl
using TopicBridgeMap = std::map<
TopicBridge,
std::pair<
std::shared_ptr<SerializedPublisher>,
std::shared_ptr<rclcpp::SubscriptionBase>>>;
std::shared_ptr<GenericPublisher>,
std::shared_ptr<GenericSubscription>>>;
using ServiceBridgeMap = std::map<
detail::ServiceBridge,
std::pair<std::shared_ptr<rclcpp::ServiceBase>, std::shared_ptr<rclcpp::ClientBase>>>;
Expand Down Expand Up @@ -195,32 +174,39 @@ class DomainBridgeImpl
type, "rosidl_typesupport_cpp");
}

std::shared_ptr<SerializedPublisher>
std::shared_ptr<GenericPublisher>
create_publisher(
rclcpp::Node::SharedPtr node,
const std::string & topic_name,
const std::string & type,
const rclcpp::QoS & qos,
rclcpp::PublisherOptionsWithAllocator<std::allocator<void>> & options)
const rosidl_message_type_support_t & typesupport_handle,
rclcpp::CallbackGroup::SharedPtr group)
{
std::shared_ptr<SerializedPublisher> publisher;
if (options_.mode() != DomainBridgeOptions::Mode::Compress) {
publisher = std::make_shared<SerializedPublisher>(
node->create_generic_publisher(topic_name, type, qos, options));
} else {
publisher = std::make_shared<SerializedPublisher>(
node->create_publisher<domain_bridge::msg::CompressedMsg>(topic_name, qos, options));
auto publisher = std::make_shared<GenericPublisher>(
node->get_node_base_interface().get(),
typesupport_handle,
topic_name,
qos);
node->get_node_topics_interface()->add_publisher(publisher, std::move(group));
return publisher;
}
auto publisher = std::make_shared<GenericPublisher>(
node->get_node_base_interface().get(),
*rosidl_typesupport_cpp::get_message_type_support_handle<domain_bridge::msg::CompressedMsg>(),
topic_name,
qos);
node->get_node_topics_interface()->add_publisher(publisher, std::move(group));
return publisher;
}

std::shared_ptr<rclcpp::SubscriptionBase> create_subscription(
std::shared_ptr<GenericSubscription> create_subscription(
rclcpp::Node::SharedPtr node,
std::shared_ptr<SerializedPublisher> publisher,
std::shared_ptr<GenericPublisher> publisher,
const std::string & topic_name,
const std::string & type,
const rclcpp::QoS & qos,
rclcpp::SubscriptionOptionsWithAllocator<std::allocator<void>> & options)
const rosidl_message_type_support_t & typesupport_handle,
rclcpp::CallbackGroup::SharedPtr group)
{
std::function<void(std::shared_ptr<rclcpp::SerializedMessage>)> callback;
switch (options_.mode()) {
Expand All @@ -236,7 +222,7 @@ class DomainBridgeImpl
compressed_msg.data = domain_bridge::compress_message(cctx, std::move(*msg));
rclcpp::SerializedMessage serialized_compressed_msg;
serializer.serialize_message(&compressed_msg, &serialized_compressed_msg);
publisher->publish(serialized_compressed_msg);
publisher->publish(serialized_compressed_msg.get_rcl_serialized_message());
};
break;
case DomainBridgeOptions::Mode::Decompress:
Expand All @@ -251,31 +237,39 @@ class DomainBridgeImpl
serializer.deserialize_message(serialized_compressed_msg.get(), &compressed_msg);
rclcpp::SerializedMessage msg = domain_bridge::decompress_message(
dctx, std::move(compressed_msg.data));
publisher->publish(msg);
publisher->publish(msg.get_rcl_serialized_message());
};
break;
default: // fallthrough
case DomainBridgeOptions::Mode::Normal:
callback = [publisher](std::shared_ptr<rclcpp::SerializedMessage> msg) {
// Publish message into the other domain
publisher->publish(*msg);
publisher->publish(msg->get_rcl_serialized_message());
};
break;
}
if (options_.mode() != DomainBridgeOptions::Mode::Decompress) {
// Create subscription
return node->create_generic_subscription(
auto subscription = std::make_shared<GenericSubscription>(
node->get_node_base_interface().get(),
typesupport_handle,
topic_name,
type,
qos,
callback,
options);
callback);
node->get_node_topics_interface()->add_subscription(subscription, std::move(group));
return subscription;
}
return node->create_subscription<domain_bridge::msg::CompressedMsg>(
auto subscription = std::make_shared<GenericSubscription>(
node->get_node_base_interface().get(),
*rosidl_typesupport_cpp::get_message_type_support_handle<domain_bridge::msg::CompressedMsg>(),
topic_name,
qos,
callback,
options);
[publisher](std::shared_ptr<rclcpp::SerializedMessage> msg) {
// Publish message into the other domain
publisher->publish(msg->get_rcl_serialized_message());
});
node->get_node_topics_interface()->add_subscription(subscription, std::move(group));
return subscription;
}

void bridge_topic(
Expand Down Expand Up @@ -378,29 +372,26 @@ class DomainBridgeImpl
std::cerr << warning << std::endl;
}

rclcpp::PublisherOptionsWithAllocator<std::allocator<void>> publisher_options;
rclcpp::SubscriptionOptionsWithAllocator<std::allocator<void>> subscription_options;

publisher_options.callback_group = topic_options.callback_group();
subscription_options.callback_group = topic_options.callback_group();
auto typesupport_handle = rosbag2_cpp::get_typesupport_handle(
type, "rosidl_typesupport_cpp", loaded_typesupports_.at(type));

// Create publisher for the 'to_domain'
// The publisher should be created first so it is available to the subscription callback
auto publisher = this->create_publisher(
to_domain_node,
topic_remapped,
type,
qos,
publisher_options);
*typesupport_handle,
topic_options.callback_group());

// Create subscription for the 'from_domain'
auto subscription = this->create_subscription(
from_domain_node,
publisher,
topic,
type,
qos,
subscription_options);
*typesupport_handle,
topic_options.callback_group());

this->bridged_topics_[topic_bridge] = {publisher, subscription};
};
Expand Down
55 changes: 55 additions & 0 deletions src/domain_bridge/generic_publisher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2018, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: This file was directly copied from rosbag2_transport
// TODO(jacobperron): Replace with upstream implementation when available
// https://github.com/ros2/rclcpp/pull/1452

#include "generic_publisher.hpp"

#include <memory>
#include <string>

namespace
{
rcl_publisher_options_t rosbag2_get_publisher_options(const rclcpp::QoS & qos)
{
auto options = rcl_publisher_get_default_options();
options.qos = qos.get_rmw_qos_profile();
return options;
}
} // unnamed namespace

namespace domain_bridge
{

GenericPublisher::GenericPublisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rosidl_message_type_support_t & type_support,
const std::string & topic_name,
const rclcpp::QoS & qos)
: rclcpp::PublisherBase(node_base, topic_name, type_support, rosbag2_get_publisher_options(qos))
{}

void GenericPublisher::publish(const rmw_serialized_message_t & message)
{
auto return_code = rcl_publish_serialized_message(
get_publisher_handle().get(), &message, nullptr);

if (return_code != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to publish serialized message");
}
}

} // namespace domain_bridge
46 changes: 46 additions & 0 deletions src/domain_bridge/generic_publisher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2018, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: This file was directly copied from rosbag2_transport
// TODO(jacobperron): Replace with upstream implementation when available
// https://github.com/ros2/rclcpp/pull/1452

#ifndef DOMAIN_BRIDGE__GENERIC_PUBLISHER_HPP_
#define DOMAIN_BRIDGE__GENERIC_PUBLISHER_HPP_

#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"

namespace domain_bridge
{

class GenericPublisher : public rclcpp::PublisherBase
{
public:
GenericPublisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rosidl_message_type_support_t & type_support,
const std::string & topic_name,
const rclcpp::QoS & qos);

virtual ~GenericPublisher() = default;

void publish(const rmw_serialized_message_t & message);
};

} // namespace domain_bridge

#endif // DOMAIN_BRIDGE__GENERIC_PUBLISHER_HPP_
Loading

0 comments on commit 0c05dab

Please sign in to comment.