chore: simplify fn prune_messages_to_be_saved #414

Merged
92 changes: 20 additions & 72 deletions src/fiber/gossip.rs
@@ -1,6 +1,7 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashMap, HashSet},
marker::PhantomData,
mem::take,
sync::Arc,
time::Duration,
};
@@ -1017,83 +1018,30 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
// We will also change the relevant state (e.g. update the latest cursor).
// The returned list may be sent to the subscribers.
async fn prune_messages_to_be_saved(&mut self) -> Vec<BroadcastMessageWithTimestamp> {
let complete_messages = self
.messages_to_be_saved
.iter()
.filter(|m| self.has_dependencies_available(m))
.cloned()
.collect::<HashSet<_>>();
self.messages_to_be_saved
.retain(|v| !complete_messages.contains(v));

let mut sorted_messages = Vec::with_capacity(complete_messages.len());

// Save all the messages to a map so that we can easily order messages by their dependencies.
let mut messages_map: HashMap<
(BroadcastMessageID, bool),
VecDeque<BroadcastMessageWithTimestamp>,
> = HashMap::new();

for new_message in complete_messages {
let key = (
new_message.message_id(),
match &new_message {
// Message id alone is not enough to differentiate channel updates.
// We need a flag to indicate if the message is an update of node 1.
BroadcastMessageWithTimestamp::ChannelUpdate(channel_update) => {
channel_update.is_update_of_node_1()
}
_ => true,
},
);
let messages = messages_map.entry(key).or_default();
let index = messages.partition_point(|m| m.cursor() < new_message.cursor());
match messages.get(index + 1) {
Some(message) if message == &new_message => {
// The same message is already saved.
continue;
}
_ => {
messages.insert(index, new_message);
}
}
}
let messages_to_be_saved = take(&mut self.messages_to_be_saved);
let (complete_messages, uncomplete_messages) = messages_to_be_saved
.into_iter()
.partition(|m| self.has_dependencies_available(m));
Collaborator

There is a subtle bug here. Because we replace messages_to_be_saved on line 1021, line 1024 will always return false for ChannelUpdate messages that do not have a corresponding ChannelAnnouncement in the store. Previously, the behavior was to also check for the corresponding ChannelAnnouncement in messages_to_be_saved. This bug is fixed in contrun@8ca34e0. I will create a PR later.
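
To make the effect of the take concrete, here is a minimal sketch. It is not the project's code: `Msg`, `State`, and both `complete_*` methods are hypothetical stand-ins, and the dependency check is assumed (per the comment above) to consult both the store and the pending list.

```rust
use std::collections::HashSet;

// Hypothetical, simplified stand-in for BroadcastMessageWithTimestamp.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Msg {
    ChannelAnnouncement(u32),
    ChannelUpdate(u32),
}

// Hypothetical stand-in for the store state.
struct State {
    store: HashSet<u32>, // channels whose announcement is already persisted
    pending: Vec<Msg>,   // plays the role of messages_to_be_saved
}

impl State {
    // Assumed shape of the check: an update is ready when its announcement
    // is in the store OR still waiting in the pending list.
    fn has_dependencies_available(&self, m: &Msg) -> bool {
        match m {
            Msg::ChannelAnnouncement(_) => true,
            Msg::ChannelUpdate(id) => {
                self.store.contains(id)
                    || self.pending.contains(&Msg::ChannelAnnouncement(*id))
            }
        }
    }

    // New shape: take() empties `pending` before the check runs, so the
    // second half of the check can never succeed.
    fn complete_after_take(&mut self) -> Vec<Msg> {
        let pending = std::mem::take(&mut self.pending);
        let (complete, incomplete): (Vec<Msg>, Vec<Msg>) = pending
            .into_iter()
            .partition(|m| self.has_dependencies_available(m));
        self.pending = incomplete;
        complete
    }

    // Old shape: the check runs while `pending` is still populated.
    fn complete_without_take(&mut self) -> Vec<Msg> {
        let complete: Vec<Msg> = self
            .pending
            .iter()
            .filter(|m| self.has_dependencies_available(m))
            .cloned()
            .collect();
        self.pending.retain(|m| !complete.contains(m));
        complete
    }
}
```

With a pending list holding an announcement and an update for the same channel and an empty store, `complete_after_take` returns only the announcement and leaves the update pending, while `complete_without_take` returns both.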

self.messages_to_be_saved = uncomplete_messages;

loop {
let key = match messages_map.keys().next() {
None => break,
Some(key) => key.clone(),
};
let messages = messages_map.remove(&key).expect("key exists");
if let BroadcastMessageWithTimestamp::ChannelUpdate(channel_update) = &messages[0] {
let outpoint = channel_update.channel_outpoint.clone();
if let Some(message) =
messages_map.remove(&(BroadcastMessageID::ChannelAnnouncement(outpoint), true))
{
for message in message {
sorted_messages.push(message);
}
}
}
for message in messages {
sorted_messages.push(message);
}
}
let mut sorted_messages = complete_messages.into_iter().collect::<Vec<_>>();
sorted_messages.sort_unstable();
Collaborator

This seems to use the Ord implementation of BroadcastMessageWithTimestamp, which will sort messages according to their timestamp. It is possible that the timestamp of a ChannelUpdate is smaller than that of the ChannelAnnouncement (we have no code that specifically makes the timestamp of a ChannelUpdate later than the timestamp of the funding transaction block; we just use the system time).

Member Author

It will order by message_id first:

fiber/src/fiber/types.rs

Lines 2435 to 2441 in 1a2ff8f

impl Ord for BroadcastMessageWithTimestamp {
fn cmp(&self, other: &Self) -> Ordering {
self.message_id()
.cmp(&other.message_id())
.then(self.timestamp().cmp(&other.timestamp()))
}
}
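
As an illustration of the `then` chain quoted above, here is a small sketch showing that the timestamp only breaks ties between equal ids. `Id` and `Stamped` are hypothetical types, and the id ordering is a derived one chosen for the example, not the hand-written `Ord for BroadcastMessageID` from this PR.

```rust
use std::cmp::Ordering;

// Hypothetical id type. The derived Ord compares by variant order first
// (Announcement before Update), then by payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Id {
    Announcement(u32),
    Update(u32),
}

// Hypothetical stand-in for a message carrying an id and a timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Stamped {
    id: Id,
    timestamp: u64,
}

impl Ord for Stamped {
    fn cmp(&self, other: &Self) -> Ordering {
        // The id decides first; the timestamp is consulted only when the
        // ids compare equal.
        self.id
            .cmp(&other.id)
            .then(self.timestamp.cmp(&other.timestamp))
    }
}

impl PartialOrd for Stamped {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Sort a batch using the Ord implementation above.
fn sorted(mut messages: Vec<Stamped>) -> Vec<Stamped> {
    messages.sort_unstable();
    messages
}
```

Under this ordering, an update with an earlier timestamp than its announcement still sorts after the announcement, because the id comparison never reaches the timestamp.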

Collaborator

Sorry, I didn't look carefully. In that case, this PR LGTM. I created a few tests for the dependency of channel gossip messages in #418. Can you take a look and check that the code change here does not break the tests? (I believe it doesn't. Still, it is better to have some tests.)

Member Author

Rebased and pushed.

let mut verified_sorted_messages = Vec::with_capacity(sorted_messages.len());

for message in sorted_messages {
if let Err(error) =
verify_and_save_broadcast_message(&message, &self.store, &self.chain_actor).await
match verify_and_save_broadcast_message(&message, &self.store, &self.chain_actor).await
{
warn!(
"Failed to verify and save message {:?}: {:?}",
message, error
);
continue;
Ok(_) => {
self.update_last_cursor(message.cursor());
verified_sorted_messages.push(message);
}
Err(error) => {
warn!(
"Failed to verify and save message {:?}: {:?}",
message, error
);
}
}
self.update_last_cursor(message.cursor());
verified_sorted_messages.push(message);
}

verified_sorted_messages
55 changes: 53 additions & 2 deletions src/fiber/types.rs
@@ -30,6 +30,7 @@ use secp256k1::{
use secp256k1::{Verification, XOnlyPublicKey};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::cmp::Ordering;
use std::fmt::Display;
use std::marker::PhantomData;
use std::str::FromStr;
@@ -2407,6 +2408,20 @@ impl BroadcastMessageWithTimestamp {
}
}

impl Ord for BroadcastMessageWithTimestamp {
fn cmp(&self, other: &Self) -> Ordering {
self.message_id()
.cmp(&other.message_id())
.then(self.timestamp().cmp(&other.timestamp()))
}
}

impl PartialOrd for BroadcastMessageWithTimestamp {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl From<BroadcastMessageWithTimestamp> for BroadcastMessage {
fn from(broadcast_message_with_timestamp: BroadcastMessageWithTimestamp) -> Self {
match broadcast_message_with_timestamp {
@@ -2592,6 +2607,42 @@ pub enum BroadcastMessageID {
NodeAnnouncement(Pubkey),
}

// We need to implement Ord for BroadcastMessageID to make sure that a ChannelUpdate message is always ordered after ChannelAnnouncement,
// so that we can use it as the sorting key in fn prune_messages_to_be_saved to simplify the logic.
impl Ord for BroadcastMessageID {
fn cmp(&self, other: &Self) -> Ordering {
match (self, other) {
(
BroadcastMessageID::ChannelAnnouncement(outpoint1),
BroadcastMessageID::ChannelAnnouncement(outpoint2),
) => outpoint1.cmp(outpoint2),
(
BroadcastMessageID::ChannelUpdate(outpoint1),
BroadcastMessageID::ChannelUpdate(outpoint2),
) => outpoint1.cmp(outpoint2),
(
BroadcastMessageID::NodeAnnouncement(pubkey1),
BroadcastMessageID::NodeAnnouncement(pubkey2),
) => pubkey1.cmp(pubkey2),
(BroadcastMessageID::ChannelUpdate(_), _) => Ordering::Less,
(BroadcastMessageID::NodeAnnouncement(_), _) => Ordering::Greater,
Collaborator

Did you intend to order messages with NodeAnnouncement first, ChannelAnnouncement second, and ChannelUpdate last (so that the sorted messages in the list messages_to_be_saved can be processed sequentially)? If that is the case, I think the correct Ordering should be contrun@ea5c1fe. This problem was not discovered in the tests because another bug above always excludes ChannelUpdate from the complete messages when there is a corresponding ChannelAnnouncement in messages_to_be_saved.
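
For reference, one way to express the ordering described above (NodeAnnouncement, then ChannelAnnouncement, then ChannelUpdate) is to compare an explicit rank before the payload. This is only a sketch under assumptions: it is not the contents of the linked commit, `MessageId` is a hypothetical type, and `u32` payloads stand in for the real outpoint and pubkey types.

```rust
use std::cmp::Ordering;

// Hypothetical stand-in for BroadcastMessageID with simplified payloads.
#[derive(Debug, Clone, PartialEq, Eq)]
enum MessageId {
    ChannelAnnouncement(u32),
    ChannelUpdate(u32),
    NodeAnnouncement(u32),
}

impl MessageId {
    // Lower rank sorts first, so dependencies precede their dependents.
    fn rank(&self) -> u8 {
        match self {
            MessageId::NodeAnnouncement(_) => 0,
            MessageId::ChannelAnnouncement(_) => 1,
            MessageId::ChannelUpdate(_) => 2,
        }
    }

    fn payload(&self) -> u32 {
        match self {
            MessageId::ChannelAnnouncement(p)
            | MessageId::ChannelUpdate(p)
            | MessageId::NodeAnnouncement(p) => *p,
        }
    }
}

impl Ord for MessageId {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare the message kind first, then the payload within a kind.
        self.rank()
            .cmp(&other.rank())
            .then_with(|| self.payload().cmp(&other.payload()))
    }
}

impl PartialOrd for MessageId {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Sort a batch of ids using the Ord implementation above.
fn sort_ids(mut ids: Vec<MessageId>) -> Vec<MessageId> {
    ids.sort_unstable();
    ids
}
```

Keeping the rank in a single function avoids the pairwise match arms, where it is easy to flip one `Ordering::Less` or `Ordering::Greater` and silently reverse the intended order.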

(
BroadcastMessageID::ChannelAnnouncement(_),
BroadcastMessageID::NodeAnnouncement(_),
) => Ordering::Less,
(BroadcastMessageID::ChannelAnnouncement(_), BroadcastMessageID::ChannelUpdate(_)) => {
Ordering::Greater
}
}
}
}

impl PartialOrd for BroadcastMessageID {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

// 1 byte for message type, 36 bytes for message id
const MESSAGE_ID_SIZE: usize = 1 + 36;
// 8 bytes for timestamp, MESSAGE_ID_SIZE bytes for message id
@@ -2717,13 +2768,13 @@ impl Cursor {
}

impl Ord for Cursor {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
fn cmp(&self, other: &Self) -> Ordering {
self.to_bytes().cmp(&other.to_bytes())
}
}

impl PartialOrd for Cursor {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}