From 1e90391ef96186ed5be8f494bbec15dc82c4b07b Mon Sep 17 00:00:00 2001 From: Russell Mull Date: Thu, 1 Jun 2023 11:00:15 -0700 Subject: [PATCH 1/5] Fixes for Ros2 Humble --- modality-ros-hook/src/hooks.rs | 25 +++++---- modality-ros-hook/src/interop.rs | 44 +++++++++++++++- modality-ros-hook/src/message_processor.rs | 59 ++++++++++++++++------ 3 files changed, 100 insertions(+), 28 deletions(-) diff --git a/modality-ros-hook/src/hooks.rs b/modality-ros-hook/src/hooks.rs index d0fa0f8..3ce8f60 100644 --- a/modality-ros-hook/src/hooks.rs +++ b/modality-ros-hook/src/hooks.rs @@ -62,13 +62,13 @@ struct NodeState { /// The unique identifier (gid) of a publisher. #[derive(Copy, Clone, Debug)] -pub struct PublisherGraphId([u8; 24]); +pub struct PublisherGraphId([u8; 16]); impl std::fmt::Display for PublisherGraphId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str("0x")?; for b in self.0.iter() { - write!(f, "{b:0x}")?; + write!(f, "{b:02x}")?; } Ok(()) @@ -87,8 +87,12 @@ struct PublisherState { #[derive(Debug)] pub enum MessageDirection { - Send, - Receive, + Send { + local_publisher_graph_id: Option, + }, + Receive { + remote_publisher_graph_id: Option, + }, } type SubscriptionAddress = usize; @@ -108,7 +112,6 @@ pub struct CapturedMessage<'p> { pub node_namespace: &'p str, pub node_name: &'p str, pub direction: MessageDirection, - pub publisher_graph_id: Option, } #[derive(Debug)] @@ -226,9 +229,9 @@ redhook::hook! { let topic_name: &'static str = &*(pub_state.topic_name.as_str() as *const _); let node_namespace: &'static str = &*(pub_state.node_namespace.as_str() as *const _); let node_name: &'static str = &*(pub_state.node_name.as_str() as *const _); - let publisher_graph_id = pub_state.graph_id; - let direction = MessageDirection::Send; - let captured_message = CapturedMessage { kvs, topic_name, node_namespace, node_name, direction, publisher_graph_id }; + // TODO ref here + let direction = MessageDirection::Send { local_publisher_graph_id: pub_state.graph_id.clone() }; + let captured_message = CapturedMessage { kvs, topic_name, node_namespace, node_name, direction }; let _called_after_dest = LAST_CAPTURED_MESSAGE.try_with(|lcm| { *lcm.borrow_mut() = Some(captured_message); }).is_err(); @@ -334,11 +337,11 @@ redhook::hook! { let node_namespace: &'static str = &*(sub_state.node_namespace.as_str() as *const _); let node_name: &'static str = &*(sub_state.node_name.as_str() as *const _); - let direction = MessageDirection::Receive; - let publisher_graph_id = Some(PublisherGraphId((*message_info).publisher_gid.data)); + let remote_publisher_graph_id = Some(PublisherGraphId((*message_info).publisher_gid.data)); + let direction = MessageDirection::Receive { remote_publisher_graph_id }; let msg = CapturedMessageWithTime { - msg: CapturedMessage { kvs, topic_name, node_namespace, node_name, direction, publisher_graph_id }, + msg: CapturedMessage { kvs, topic_name, node_namespace, node_name, direction}, publish_time: CapturedTime::SignedEpochNanos((*message_info).source_timestamp), receive_time: Some(CapturedTime::SignedEpochNanos((*message_info).received_timestamp)), }; diff --git a/modality-ros-hook/src/interop.rs b/modality-ros-hook/src/interop.rs index 72b2d03..c74ee25 100644 --- a/modality-ros-hook/src/interop.rs +++ b/modality-ros-hook/src/interop.rs @@ -361,6 +361,46 @@ pub struct RmwMessageInfo { /// This is ns since the unix epoch pub received_timestamp: i64, + /// Sequence number of the received message set by the publisher. + /// + /// This sequence number is set by the publisher and therefore uniquely identifies + /// a message when combined with the publisher GID. + /// For long running applications, the sequence number might wrap around at some point. + /// + /// If the rmw implementation doesn't support sequence numbers, its value will be + /// RMW_MESSAGE_INFO_SEQUENCE_NUMBER_UNSUPPORTED. + pub publication_sequence_number: u64, + + /// Sequence number of the received message set by the subscription. + /// + /// This sequence number is set by the subscription regardless of which + /// publisher sent the message. + /// For long running applications, the sequence number might wrap around at some point. + /// + /// If the rmw implementation doesn't support sequence numbers, its value will be + /// RMW_MESSAGE_INFO_SEQUENCE_NUMBER_UNSUPPORTED. + pub reception_sequence_number: u64, + + /// Global unique identifier of the publisher that sent the message. + /// + /// The identifier uniquely identifies the publisher for the local context, but + /// it will not necessarily be the same identifier given in other contexts or processes + /// for the same publisher. + /// Therefore the identifier will uniquely identify the publisher within your application + /// but may disagree about the identifier for that publisher when compared to another + /// application. + /// Even with this limitation, when combined with the publisher sequence number it can + /// uniquely identify a message within your local context. + /// Publisher GIDs generated by the rmw implementation could collide at some point, in which + /// case it is not possible to distinguish which publisher sent the message. + /// The details of how GIDs are generated are rmw implementation dependent. + /// + /// It is possible the the rmw implementation needs to reuse a publisher GID, + /// due to running out of unique identifiers or some other constraint, in which case + /// the rmw implementation may document what happens in that case, but that + /// behavior is not defined here. + /// However, this should be avoided, if at all possible, by the rmw implementation, + /// and should be unlikely to happen in practice. pub publisher_gid: RmwGid, /// Whether this message is from intra_process communication or not @@ -375,14 +415,14 @@ pub struct RmwGid { pub implementation_identifier: *const c_char, /// Byte data Gid value - pub data: [u8; 24], + pub data: [u8; 16], } impl Default for RmwGid { fn default() -> Self { Self { implementation_identifier: std::ptr::null(), - data: [0; 24], + data: [0; 16], } } } diff --git a/modality-ros-hook/src/message_processor.rs b/modality-ros-hook/src/message_processor.rs index dfe2a51..0c2d39b 100644 --- a/modality-ros-hook/src/message_processor.rs +++ b/modality-ros-hook/src/message_processor.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use fxhash::FxHashMap; +use fxhash::{FxHashMap, FxHashSet}; use modality_api::{AttrVal, Nanoseconds, TimelineId, Uuid}; use modality_ingest_client::{ dynamic::DynamicIngestClient, protocol::InternedAttrKey, IngestClient, @@ -10,8 +10,9 @@ use crate::hooks::{CapturedMessageWithTime, MessageDirection}; pub struct MessageProcessor { client: modality_ingest_client::dynamic::DynamicIngestClient, - attr_cache: FxHashMap, + attr_key_cache: FxHashMap, timeline_cache: FxHashMap, + sent_timeline_gid: FxHashSet, currently_open_timeline: Option, ordering: u128, } @@ -32,8 +33,9 @@ impl MessageProcessor { IngestClient::connect_with_standard_config(Duration::from_secs(20), None, None).await?; Ok(MessageProcessor { client: DynamicIngestClient::from(client), - attr_cache: Default::default(), + attr_key_cache: Default::default(), timeline_cache: Default::default(), + sent_timeline_gid: Default::default(), ordering: 0, currently_open_timeline: None, }) @@ -71,21 +73,46 @@ impl MessageProcessor { )]; match captured_message.msg.direction { - MessageDirection::Send => { + MessageDirection::Send { + local_publisher_graph_id, + } => { + if let Some(local_gid) = local_publisher_graph_id { + let key = TimelineCacheKey { + node_namespace: captured_message.msg.node_namespace, + node_name: captured_message.msg.node_name, + }; + + if self.sent_timeline_gid.insert(key) { + let k = self.interned_attr_key("timeline.ros.publisher_gid").await?; + let v = AttrVal::String(local_gid.to_string()); + self.client.timeline_metadata([(k, v)]).await?; + } + } + if let Some(t) = captured_message.publish_time.to_epoch_nanos() { event_attrs.push(( self.interned_attr_key("event.timestamp").await?, AttrVal::Timestamp(t), )); } - if let Some(gid) = captured_message.msg.publisher_graph_id { - event_attrs.push(( - self.interned_attr_key("event.ros.publisher_gid").await?, - AttrVal::String(gid.to_string()), - )); + + match captured_message.publish_time { + crate::hooks::CapturedTime::Compound { sec, nsec } => { + event_attrs.push(( + self.interned_attr_key("event.dds_time.sec").await?, + AttrVal::Integer(sec), + )); + event_attrs.push(( + self.interned_attr_key("event.dds_time.nanosec").await?, + AttrVal::Integer(nsec), + )); + } + crate::hooks::CapturedTime::SignedEpochNanos(_) => todo!(), } } - MessageDirection::Receive => { + MessageDirection::Receive { + remote_publisher_graph_id, + } => { if let Some(t) = captured_message.publish_time.to_epoch_nanos() { event_attrs.push(( self.interned_attr_key("event.interaction.remote_event.timestamp") @@ -94,10 +121,12 @@ impl MessageProcessor { )); } - if let Some(gid) = captured_message.msg.publisher_graph_id { + if let Some(gid) = remote_publisher_graph_id { event_attrs.push(( - self.interned_attr_key("event.interaction.remote_event.ros.publisher_gid") - .await?, + self.interned_attr_key( + "event.interaction.remote_timeline.ros.publisher_gid", + ) + .await?, AttrVal::String(gid.to_string()), )); } @@ -201,12 +230,12 @@ impl MessageProcessor { &mut self, name: &str, ) -> Result> { - if let Some(ik) = self.attr_cache.get(name) { + if let Some(ik) = self.attr_key_cache.get(name) { return Ok(*ik); } let ik = self.client.declare_attr_key(name.to_string()).await?; - self.attr_cache.insert(name.to_string(), ik); + self.attr_key_cache.insert(name.to_string(), ik); Ok(ik) } } From 2d941fe951ccfe8f15349cf6529947b495a30dd7 Mon Sep 17 00:00:00 2001 From: Russell Mull Date: Thu, 1 Jun 2023 14:52:50 -0700 Subject: [PATCH 2/5] Work with many publishers per node --- modality-ros-hook/src/hooks.rs | 4 +- modality-ros-hook/src/message_processor.rs | 83 ++++++++++------------ 2 files changed, 40 insertions(+), 47 deletions(-) diff --git a/modality-ros-hook/src/hooks.rs b/modality-ros-hook/src/hooks.rs index 3ce8f60..af2e063 100644 --- a/modality-ros-hook/src/hooks.rs +++ b/modality-ros-hook/src/hooks.rs @@ -61,7 +61,7 @@ struct NodeState { } /// The unique identifier (gid) of a publisher. -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub struct PublisherGraphId([u8; 16]); impl std::fmt::Display for PublisherGraphId { @@ -204,7 +204,7 @@ redhook::hook! { } publisher_address } else { - // If someone tries to createa publisher against an + // If someone tries to create a publisher against an // uninitialized node, I guess? This is probably an error, // and really shouldn't be happening. redhook::real!(rmw_create_publisher) diff --git a/modality-ros-hook/src/message_processor.rs b/modality-ros-hook/src/message_processor.rs index 0c2d39b..da610a4 100644 --- a/modality-ros-hook/src/message_processor.rs +++ b/modality-ros-hook/src/message_processor.rs @@ -6,13 +6,13 @@ use modality_ingest_client::{ dynamic::DynamicIngestClient, protocol::InternedAttrKey, IngestClient, }; -use crate::hooks::{CapturedMessageWithTime, MessageDirection}; +use crate::hooks::{CapturedMessageWithTime, MessageDirection, PublisherGraphId}; pub struct MessageProcessor { client: modality_ingest_client::dynamic::DynamicIngestClient, attr_key_cache: FxHashMap, timeline_cache: FxHashMap, - sent_timeline_gid: FxHashSet, + sent_timeline_publisher_metadata: FxHashSet<(TimelineCacheKey, PublisherGraphId)>, currently_open_timeline: Option, ordering: u128, } @@ -35,7 +35,7 @@ impl MessageProcessor { client: DynamicIngestClient::from(client), attr_key_cache: Default::default(), timeline_cache: Default::default(), - sent_timeline_gid: Default::default(), + sent_timeline_publisher_metadata: Default::default(), ordering: 0, currently_open_timeline: None, }) @@ -58,7 +58,6 @@ impl MessageProcessor { self.open_timeline( captured_message.msg.node_namespace, captured_message.msg.node_name, - captured_message.msg.topic_name, ) .await?; @@ -67,6 +66,9 @@ impl MessageProcessor { event_name.remove(0); } + // http://wiki.ros.org/Names claims that names can't have '.' in them + let normalized_topic_name = event_name.replace("/", "."); + let mut event_attrs = vec![( self.interned_attr_key("event.name").await?, AttrVal::String(event_name), @@ -82,10 +84,19 @@ impl MessageProcessor { node_name: captured_message.msg.node_name, }; - if self.sent_timeline_gid.insert(key) { - let k = self.interned_attr_key("timeline.ros.publisher_gid").await?; - let v = AttrVal::String(local_gid.to_string()); - self.client.timeline_metadata([(k, v)]).await?; + if self + .sent_timeline_publisher_metadata + .insert((key, local_gid.clone())) + { + let attrs = vec![( + self.interned_attr_key(&format!( + "timeline.ros.publisher_gid.{normalized_topic_name}" + )) + .await?, + AttrVal::String(local_gid.to_string()), + )]; + + self.client.timeline_metadata(attrs).await?; } } @@ -124,7 +135,7 @@ impl MessageProcessor { if let Some(gid) = remote_publisher_graph_id { event_attrs.push(( self.interned_attr_key( - "event.interaction.remote_timeline.ros.publisher_gid", + &format!("event.interaction.remote_timeline.ros.publisher_gid.{normalized_topic_name}") ) .await?, AttrVal::String(gid.to_string()), @@ -172,7 +183,6 @@ impl MessageProcessor { &mut self, node_namespace: &'static str, node_name: &'static str, - topic_name: &'static str, ) -> Result<(), Box> { let key = TimelineCacheKey { node_namespace, @@ -185,45 +195,28 @@ impl MessageProcessor { self.client.open_timeline(tl_cache_val.id).await?; self.currently_open_timeline = Some(tl_cache_val.id); } - return Ok(()); - } - - // We haven't seen this timeline before, so send its metadata - let timeline_id = TimelineId::from(Uuid::new_v4()); - self.client.open_timeline(timeline_id).await?; - self.currently_open_timeline = Some(timeline_id); - - let tl_attrs = self - .timeline_attrs(key.node_namespace, key.node_name, topic_name) - .await?; - self.client.timeline_metadata(tl_attrs).await?; + } else { + // We haven't seen this timeline before, so allocate an id and send its basic metadata + let timeline_id = TimelineId::from(Uuid::new_v4()); + self.client.open_timeline(timeline_id).await?; + self.currently_open_timeline = Some(timeline_id); + + let mut timeline_name = format!("{node_namespace}/{node_name}"); + while timeline_name.starts_with('/') { + timeline_name.remove(0); + } - self.timeline_cache - .insert(key, TimelineCacheValue { id: timeline_id }); - Ok(()) - } + let tl_attrs = vec![( + self.interned_attr_key("timeline.name").await?, + AttrVal::String(timeline_name), + )]; - async fn timeline_attrs( - &mut self, - node_namespace: &str, - node_name: &str, - topic_name: &str, - ) -> Result, Box> { - let mut timeline_name = format!("{node_namespace}{node_name}"); - if timeline_name.starts_with('/') { - timeline_name.remove(0); + self.client.timeline_metadata(tl_attrs).await?; + self.timeline_cache + .insert(key, TimelineCacheValue { id: timeline_id }); } - Ok(vec![ - ( - self.interned_attr_key("timeline.name").await?, - AttrVal::String(timeline_name), - ), - ( - self.interned_attr_key("timeline.ros.topic").await?, - AttrVal::String(topic_name.to_string()), - ), - ]) + Ok(()) } async fn interned_attr_key( From d6d5384c0ee3d7b4504c9395ce229022c552be95 Mon Sep 17 00:00:00 2001 From: Jon Lamb Date: Fri, 2 Jun 2023 03:12:10 -0700 Subject: [PATCH 3/5] It's too early, clippy --- modality-ros-hook/src/hooks.rs | 2 +- modality-ros-hook/src/message_processor.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modality-ros-hook/src/hooks.rs b/modality-ros-hook/src/hooks.rs index af2e063..166f16b 100644 --- a/modality-ros-hook/src/hooks.rs +++ b/modality-ros-hook/src/hooks.rs @@ -230,7 +230,7 @@ redhook::hook! { let node_namespace: &'static str = &*(pub_state.node_namespace.as_str() as *const _); let node_name: &'static str = &*(pub_state.node_name.as_str() as *const _); // TODO ref here - let direction = MessageDirection::Send { local_publisher_graph_id: pub_state.graph_id.clone() }; + let direction = MessageDirection::Send { local_publisher_graph_id: pub_state.graph_id }; let captured_message = CapturedMessage { kvs, topic_name, node_namespace, node_name, direction }; let _called_after_dest = LAST_CAPTURED_MESSAGE.try_with(|lcm| { *lcm.borrow_mut() = Some(captured_message); diff --git a/modality-ros-hook/src/message_processor.rs b/modality-ros-hook/src/message_processor.rs index da610a4..1e4c7fc 100644 --- a/modality-ros-hook/src/message_processor.rs +++ b/modality-ros-hook/src/message_processor.rs @@ -67,7 +67,7 @@ impl MessageProcessor { } // http://wiki.ros.org/Names claims that names can't have '.' in them - let normalized_topic_name = event_name.replace("/", "."); + let normalized_topic_name = event_name.replace('/', "."); let mut event_attrs = vec![( self.interned_attr_key("event.name").await?, @@ -86,7 +86,7 @@ impl MessageProcessor { if self .sent_timeline_publisher_metadata - .insert((key, local_gid.clone())) + .insert((key, local_gid)) { let attrs = vec![( self.interned_attr_key(&format!( From 2437ff2007e28e5fd708995fe02f22bd13f1cff1 Mon Sep 17 00:00:00 2001 From: Jon Lamb Date: Fri, 2 Jun 2023 03:12:42 -0700 Subject: [PATCH 4/5] Trigger CI on push --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 275ee57..8da685c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,6 @@ name: CI -on: [push, pull_request] +on: [push] jobs: lint: From f3f576d66561c98c56381641678f7ac5ddcdadc3 Mon Sep 17 00:00:00 2001 From: Jon Lamb Date: Fri, 2 Jun 2023 03:13:06 -0700 Subject: [PATCH 5/5] Bump version --- modality-ros-hook/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modality-ros-hook/Cargo.toml b/modality-ros-hook/Cargo.toml index bba9b1f..589eda9 100644 --- a/modality-ros-hook/Cargo.toml +++ b/modality-ros-hook/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "modality-ros-hook" -version = "0.1.1" +version = "0.1.2" edition = "2021" authors = [ "Russell Mull ",