Skip to content

Commit

Permalink
Merge branch 'humble-fixes'
Browse files Browse the repository at this point in the history
  • Loading branch information
jonlamb-gh committed Jun 2, 2023
2 parents d9d3044 + f3f576d commit d06b3ad
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: CI

on: [push, pull_request]
on: [push]

jobs:
lint:
Expand Down
2 changes: 1 addition & 1 deletion modality-ros-hook/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "modality-ros-hook"
version = "0.1.1"
version = "0.1.2"
edition = "2021"
authors = [
"Russell Mull <[email protected]>",
Expand Down
29 changes: 16 additions & 13 deletions modality-ros-hook/src/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ struct NodeState {
}

/// The unique identifier (gid) of a publisher.
#[derive(Copy, Clone, Debug)]
pub struct PublisherGraphId([u8; 24]);
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct PublisherGraphId([u8; 16]);

impl std::fmt::Display for PublisherGraphId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("0x")?;
for b in self.0.iter() {
write!(f, "{b:0x}")?;
write!(f, "{b:02x}")?;
}

Ok(())
Expand All @@ -87,8 +87,12 @@ struct PublisherState {

#[derive(Debug)]
pub enum MessageDirection {
Send,
Receive,
Send {
local_publisher_graph_id: Option<PublisherGraphId>,
},
Receive {
remote_publisher_graph_id: Option<PublisherGraphId>,
},
}

type SubscriptionAddress = usize;
Expand All @@ -108,7 +112,6 @@ pub struct CapturedMessage<'p> {
pub node_namespace: &'p str,
pub node_name: &'p str,
pub direction: MessageDirection,
pub publisher_graph_id: Option<PublisherGraphId>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -201,7 +204,7 @@ redhook::hook! {
}
publisher_address
} else {
// If someone tries to createa publisher against an
// If someone tries to create a publisher against an
// uninitialized node, I guess? This is probably an error,
// and really shouldn't be happening.
redhook::real!(rmw_create_publisher)
Expand All @@ -226,9 +229,9 @@ redhook::hook! {
let topic_name: &'static str = &*(pub_state.topic_name.as_str() as *const _);
let node_namespace: &'static str = &*(pub_state.node_namespace.as_str() as *const _);
let node_name: &'static str = &*(pub_state.node_name.as_str() as *const _);
let publisher_graph_id = pub_state.graph_id;
let direction = MessageDirection::Send;
let captured_message = CapturedMessage { kvs, topic_name, node_namespace, node_name, direction, publisher_graph_id };
// TODO ref here
let direction = MessageDirection::Send { local_publisher_graph_id: pub_state.graph_id };
let captured_message = CapturedMessage { kvs, topic_name, node_namespace, node_name, direction };
let _called_after_dest = LAST_CAPTURED_MESSAGE.try_with(|lcm| {
*lcm.borrow_mut() = Some(captured_message);
}).is_err();
Expand Down Expand Up @@ -334,11 +337,11 @@ redhook::hook! {
let node_namespace: &'static str = &*(sub_state.node_namespace.as_str() as *const _);
let node_name: &'static str = &*(sub_state.node_name.as_str() as *const _);

let direction = MessageDirection::Receive;
let publisher_graph_id = Some(PublisherGraphId((*message_info).publisher_gid.data));
let remote_publisher_graph_id = Some(PublisherGraphId((*message_info).publisher_gid.data));
let direction = MessageDirection::Receive { remote_publisher_graph_id };

let msg = CapturedMessageWithTime {
msg: CapturedMessage { kvs, topic_name, node_namespace, node_name, direction, publisher_graph_id },
msg: CapturedMessage { kvs, topic_name, node_namespace, node_name, direction},
publish_time: CapturedTime::SignedEpochNanos((*message_info).source_timestamp),
receive_time: Some(CapturedTime::SignedEpochNanos((*message_info).received_timestamp)),
};
Expand Down
44 changes: 42 additions & 2 deletions modality-ros-hook/src/interop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,46 @@ pub struct RmwMessageInfo {
/// This is ns since the unix epoch
pub received_timestamp: i64,

/// Sequence number of the received message set by the publisher.
///
/// This sequence number is set by the publisher and therefore uniquely identifies
/// a message when combined with the publisher GID.
/// For long running applications, the sequence number might wrap around at some point.
///
/// If the rmw implementation doesn't support sequence numbers, its value will be
/// RMW_MESSAGE_INFO_SEQUENCE_NUMBER_UNSUPPORTED.
pub publication_sequence_number: u64,

/// Sequence number of the received message set by the subscription.
///
/// This sequence number is set by the subscription regardless of which
/// publisher sent the message.
/// For long running applications, the sequence number might wrap around at some point.
///
/// If the rmw implementation doesn't support sequence numbers, its value will be
/// RMW_MESSAGE_INFO_SEQUENCE_NUMBER_UNSUPPORTED.
pub reception_sequence_number: u64,

/// Global unique identifier of the publisher that sent the message.
///
/// The identifier uniquely identifies the publisher for the local context, but
/// it will not necessarily be the same identifier given in other contexts or processes
/// for the same publisher.
/// Therefore the identifier will uniquely identify the publisher within your application
/// but may disagree about the identifier for that publisher when compared to another
/// application.
/// Even with this limitation, when combined with the publisher sequence number it can
/// uniquely identify a message within your local context.
/// Publisher GIDs generated by the rmw implementation could collide at some point, in which
/// case it is not possible to distinguish which publisher sent the message.
/// The details of how GIDs are generated are rmw implementation dependent.
///
/// It is possible the the rmw implementation needs to reuse a publisher GID,
/// due to running out of unique identifiers or some other constraint, in which case
/// the rmw implementation may document what happens in that case, but that
/// behavior is not defined here.
/// However, this should be avoided, if at all possible, by the rmw implementation,
/// and should be unlikely to happen in practice.
pub publisher_gid: RmwGid,

/// Whether this message is from intra_process communication or not
Expand All @@ -375,14 +415,14 @@ pub struct RmwGid {
pub implementation_identifier: *const c_char,

/// Byte data Gid value
pub data: [u8; 24],
pub data: [u8; 16],
}

impl Default for RmwGid {
fn default() -> Self {
Self {
implementation_identifier: std::ptr::null(),
data: [0; 24],
data: [0; 16],
}
}
}
Expand Down
128 changes: 75 additions & 53 deletions modality-ros-hook/src/message_processor.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::time::Duration;

use fxhash::FxHashMap;
use fxhash::{FxHashMap, FxHashSet};
use modality_api::{AttrVal, Nanoseconds, TimelineId, Uuid};
use modality_ingest_client::{
dynamic::DynamicIngestClient, protocol::InternedAttrKey, IngestClient,
};

use crate::hooks::{CapturedMessageWithTime, MessageDirection};
use crate::hooks::{CapturedMessageWithTime, MessageDirection, PublisherGraphId};

pub struct MessageProcessor {
client: modality_ingest_client::dynamic::DynamicIngestClient,
attr_cache: FxHashMap<String, InternedAttrKey>,
attr_key_cache: FxHashMap<String, InternedAttrKey>,
timeline_cache: FxHashMap<TimelineCacheKey, TimelineCacheValue>,
sent_timeline_publisher_metadata: FxHashSet<(TimelineCacheKey, PublisherGraphId)>,
currently_open_timeline: Option<TimelineId>,
ordering: u128,
}
Expand All @@ -32,8 +33,9 @@ impl MessageProcessor {
IngestClient::connect_with_standard_config(Duration::from_secs(20), None, None).await?;
Ok(MessageProcessor {
client: DynamicIngestClient::from(client),
attr_cache: Default::default(),
attr_key_cache: Default::default(),
timeline_cache: Default::default(),
sent_timeline_publisher_metadata: Default::default(),
ordering: 0,
currently_open_timeline: None,
})
Expand All @@ -56,7 +58,6 @@ impl MessageProcessor {
self.open_timeline(
captured_message.msg.node_namespace,
captured_message.msg.node_name,
captured_message.msg.topic_name,
)
.await?;

Expand All @@ -65,27 +66,64 @@ impl MessageProcessor {
event_name.remove(0);
}

// http://wiki.ros.org/Names claims that names can't have '.' in them
let normalized_topic_name = event_name.replace('/', ".");

let mut event_attrs = vec![(
self.interned_attr_key("event.name").await?,
AttrVal::String(event_name),
)];

match captured_message.msg.direction {
MessageDirection::Send => {
MessageDirection::Send {
local_publisher_graph_id,
} => {
if let Some(local_gid) = local_publisher_graph_id {
let key = TimelineCacheKey {
node_namespace: captured_message.msg.node_namespace,
node_name: captured_message.msg.node_name,
};

if self
.sent_timeline_publisher_metadata
.insert((key, local_gid))
{
let attrs = vec![(
self.interned_attr_key(&format!(
"timeline.ros.publisher_gid.{normalized_topic_name}"
))
.await?,
AttrVal::String(local_gid.to_string()),
)];

self.client.timeline_metadata(attrs).await?;
}
}

if let Some(t) = captured_message.publish_time.to_epoch_nanos() {
event_attrs.push((
self.interned_attr_key("event.timestamp").await?,
AttrVal::Timestamp(t),
));
}
if let Some(gid) = captured_message.msg.publisher_graph_id {
event_attrs.push((
self.interned_attr_key("event.ros.publisher_gid").await?,
AttrVal::String(gid.to_string()),
));

match captured_message.publish_time {
crate::hooks::CapturedTime::Compound { sec, nsec } => {
event_attrs.push((
self.interned_attr_key("event.dds_time.sec").await?,
AttrVal::Integer(sec),
));
event_attrs.push((
self.interned_attr_key("event.dds_time.nanosec").await?,
AttrVal::Integer(nsec),
));
}
crate::hooks::CapturedTime::SignedEpochNanos(_) => todo!(),
}
}
MessageDirection::Receive => {
MessageDirection::Receive {
remote_publisher_graph_id,
} => {
if let Some(t) = captured_message.publish_time.to_epoch_nanos() {
event_attrs.push((
self.interned_attr_key("event.interaction.remote_event.timestamp")
Expand All @@ -94,10 +132,12 @@ impl MessageProcessor {
));
}

if let Some(gid) = captured_message.msg.publisher_graph_id {
if let Some(gid) = remote_publisher_graph_id {
event_attrs.push((
self.interned_attr_key("event.interaction.remote_event.ros.publisher_gid")
.await?,
self.interned_attr_key(
&format!("event.interaction.remote_timeline.ros.publisher_gid.{normalized_topic_name}")
)
.await?,
AttrVal::String(gid.to_string()),
));
}
Expand Down Expand Up @@ -143,7 +183,6 @@ impl MessageProcessor {
&mut self,
node_namespace: &'static str,
node_name: &'static str,
topic_name: &'static str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let key = TimelineCacheKey {
node_namespace,
Expand All @@ -156,57 +195,40 @@ impl MessageProcessor {
self.client.open_timeline(tl_cache_val.id).await?;
self.currently_open_timeline = Some(tl_cache_val.id);
}
return Ok(());
}

// We haven't seen this timeline before, so send its metadata
let timeline_id = TimelineId::from(Uuid::new_v4());
self.client.open_timeline(timeline_id).await?;
self.currently_open_timeline = Some(timeline_id);

let tl_attrs = self
.timeline_attrs(key.node_namespace, key.node_name, topic_name)
.await?;
self.client.timeline_metadata(tl_attrs).await?;
} else {
// We haven't seen this timeline before, so allocate an id and send its basic metadata
let timeline_id = TimelineId::from(Uuid::new_v4());
self.client.open_timeline(timeline_id).await?;
self.currently_open_timeline = Some(timeline_id);

let mut timeline_name = format!("{node_namespace}/{node_name}");
while timeline_name.starts_with('/') {
timeline_name.remove(0);
}

self.timeline_cache
.insert(key, TimelineCacheValue { id: timeline_id });
Ok(())
}
let tl_attrs = vec![(
self.interned_attr_key("timeline.name").await?,
AttrVal::String(timeline_name),
)];

async fn timeline_attrs(
&mut self,
node_namespace: &str,
node_name: &str,
topic_name: &str,
) -> Result<Vec<(InternedAttrKey, AttrVal)>, Box<dyn std::error::Error + Send + Sync>> {
let mut timeline_name = format!("{node_namespace}{node_name}");
if timeline_name.starts_with('/') {
timeline_name.remove(0);
self.client.timeline_metadata(tl_attrs).await?;
self.timeline_cache
.insert(key, TimelineCacheValue { id: timeline_id });
}

Ok(vec![
(
self.interned_attr_key("timeline.name").await?,
AttrVal::String(timeline_name),
),
(
self.interned_attr_key("timeline.ros.topic").await?,
AttrVal::String(topic_name.to_string()),
),
])
Ok(())
}

async fn interned_attr_key(
&mut self,
name: &str,
) -> Result<InternedAttrKey, Box<dyn std::error::Error + Send + Sync>> {
if let Some(ik) = self.attr_cache.get(name) {
if let Some(ik) = self.attr_key_cache.get(name) {
return Ok(*ik);
}

let ik = self.client.declare_attr_key(name.to_string()).await?;
self.attr_cache.insert(name.to_string(), ik);
self.attr_key_cache.insert(name.to_string(), ik);
Ok(ik)
}
}
Expand Down

0 comments on commit d06b3ad

Please sign in to comment.