Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send channel tlc info directly to counterparty and update network graph with owned channel information #446

Open
wants to merge 41 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
7a7e2b5
Add {local,remote}_tlc_info to ChannelActorState
contrun Jan 6, 2025
afc5307
Add UpdateTlcInfo peer message
contrun Jan 6, 2025
2812f57
Send UpdateTlcInfo on channel ready
contrun Jan 6, 2025
c54fda0
Fix disabled channel flags
contrun Jan 6, 2025
c6116f6
Fix using disabled channnel
contrun Jan 6, 2025
a36adab
Remove unused UpdateTlcInfo
contrun Jan 6, 2025
0e69c37
Add test for tlc info in private channel
contrun Jan 6, 2025
7fadd70
Read direct channel info while building payment route
contrun Jan 6, 2025
402149d
Use liquid capacity while building route
contrun Jan 6, 2025
ce3247d
Set exact balance to graph edge for owned channels
contrun Jan 6, 2025
f2346fb
Prefetch all graph edges for owned channels
contrun Jan 6, 2025
56f1bef
Add unit test for payment over private channel
contrun Jan 7, 2025
a7c6017
Revert "Read direct channel info while building payment route"
contrun Jan 7, 2025
1724fa9
Convert owned channels to graph ChannelInfo
contrun Jan 7, 2025
4686404
Move fields to ChannelTlcInfo
contrun Jan 7, 2025
f50a273
Keep UpdateTlcInfo up with ChannelUpdate
contrun Jan 7, 2025
1e96242
Use bitflags for ChannelUpdate {message,channel}_flags
contrun Jan 7, 2025
5479342
Add balance field to ChannelUpdateInfo in graph
contrun Jan 7, 2025
12a8555
Fix setting incorrect amount for owned channels in graph
contrun Jan 7, 2025
34f175d
chore: remove some noise
contrun Jan 7, 2025
48ac972
Merge remote-tracking branch 'nervosnetwork/develop' into update-chan…
contrun Jan 7, 2025
2facf08
Add test for sending payment over private channel with insufficient b…
contrun Jan 7, 2025
fb4691e
Sort channels by explicit balance in finding path
contrun Jan 8, 2025
221e965
Add dedicated test to check path finding prefers newer channel
contrun Jan 8, 2025
383b341
Test path finding prefers channel with larger balance
contrun Jan 8, 2025
cf3d47c
Update comments while sorting channels in path-finding
contrun Jan 8, 2025
510d491
Add OwnedChannelUpdateEvent for owned channel updates
contrun Jan 10, 2025
305bdee
Update graph on channel ready
contrun Jan 10, 2025
4b155c4
Update networkgraph ChannelUpdateInfo for owned channels
contrun Jan 10, 2025
0d996cd
Update owned channel balance in graph
contrun Jan 10, 2025
b282f9b
Fix balance not set while converting ChannelActorState
contrun Jan 10, 2025
b2a7509
Separate public/private channel network graph balance test
contrun Jan 10, 2025
447cfc2
Ignore our own channel gossip messages
contrun Jan 10, 2025
c604628
Fix test_channel_update_version, test_node1_node2_channel_update
contrun Jan 10, 2025
35361f6
Fix unit tests in graph.rs because gossip message not processed
contrun Jan 10, 2025
dce8858
Remove channel from graph on channel actor exited
contrun Jan 10, 2025
dfb2683
Notify world about channel updates
contrun Jan 10, 2025
75ed475
Merge remote-tracking branch 'nervosnetwork/develop' into update-chan…
contrun Jan 10, 2025
9c9aece
Remove old loading channel from store code
contrun Jan 10, 2025
ab91390
Remove duplicated Up event on channel ready
contrun Jan 10, 2025
fdf4a12
Fix tests
contrun Jan 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
463 changes: 308 additions & 155 deletions src/fiber/channel.rs

Large diffs are not rendered by default.

513 changes: 475 additions & 38 deletions src/fiber/gen/fiber.rs

Large diffs are not rendered by default.

214 changes: 185 additions & 29 deletions src/fiber/graph.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::channel::ChannelActorStateStore;
use super::channel::{ChannelActorState, ChannelActorStateStore, ChannelTlcInfo};
use super::config::AnnouncedNodeName;
use super::gossip::GossipMessageStore;
use super::history::{Direction, InternalResult, PaymentHistory, TimedResult};
Expand Down Expand Up @@ -154,6 +154,59 @@ impl ChannelInfo {
.map(|n| n.timestamp)
.max(self.update_of_node1.as_ref().map(|n| n.timestamp))
}

#[cfg(test)]
pub fn get_channel_update_of(&self, node: Pubkey) -> Option<&ChannelUpdateInfo> {
if self.node1() == node {
self.update_of_node1.as_ref()
} else if self.node2() == node {
self.update_of_node2.as_ref()
} else {
None
}
}
}

impl TryFrom<&ChannelActorState> for ChannelInfo {
type Error = String;

fn try_from(state: &ChannelActorState) -> Result<Self, Self::Error> {
if !state.is_ready() {
return Err("Channel is not ready".to_string());
}

let timestamp = state.must_get_funding_transaction_timestamp();
let channel_outpoint = state.must_get_funding_transaction_outpoint();
let capacity = state.get_liquid_capacity();
let udt_type_script = state.funding_udt_type_script.clone();

let (node1, node2, update_of_node1, update_of_node2) = if state.local_is_node1() {
(
state.local_pubkey,
state.remote_pubkey,
Some(state.get_local_channel_update_info()),
state.get_remote_channel_update_info(),
)
} else {
(
state.remote_pubkey,
state.local_pubkey,
state.get_remote_channel_update_info(),
Some(state.get_local_channel_update_info()),
)
};
Ok(Self {
channel_outpoint,
timestamp,
features: 0,
node1,
node2,
capacity,
udt_type_script,
update_of_node1,
update_of_node2,
})
}
}

impl From<(u64, ChannelAnnouncement)> for ChannelInfo {
Expand All @@ -178,13 +231,37 @@ pub struct ChannelUpdateInfo {
pub timestamp: u64,
/// Whether the channel can be currently used for payments (in this one direction).
pub enabled: bool,
/// The exact amount of balance that we can receive from the other party via the channel.
/// Note that this is not our balance, but the balance of the other party.
/// This node is forwarding the balance for the other party, so we need to use the receivable balance
/// instead of our balance.
pub receivable_balance: Option<u128>,
/// The difference in htlc expiry values that you must have when routing through this channel (in milliseconds).
pub tlc_expiry_delta: u64,
/// The minimum value, which must be relayed to the next hop via the channel
pub tlc_minimum_value: u128,
pub fee_rate: u64,
}

impl From<&ChannelTlcInfo> for ChannelUpdateInfo {
fn from(info: &ChannelTlcInfo) -> Self {
Self {
timestamp: info.timestamp,
enabled: info.enabled,
receivable_balance: None,
tlc_expiry_delta: info.tlc_expiry_delta,
tlc_minimum_value: info.tlc_minimum_value,
fee_rate: info.tlc_fee_proportional_millionths as u64,
}
}
}

impl From<ChannelTlcInfo> for ChannelUpdateInfo {
fn from(info: ChannelTlcInfo) -> Self {
Self::from(&info)
}
}

impl From<ChannelUpdate> for ChannelUpdateInfo {
fn from(update: ChannelUpdate) -> Self {
Self::from(&update)
Expand All @@ -196,15 +273,37 @@ impl From<&ChannelUpdate> for ChannelUpdateInfo {
Self {
timestamp: update.timestamp,
enabled: !update.is_disabled(),
receivable_balance: None,
tlc_expiry_delta: update.tlc_expiry_delta,
tlc_minimum_value: update.tlc_minimum_value,
fee_rate: update.tlc_fee_proportional_millionths as u64,
}
}
}

/// Update for our own channel has been made. We can use those events to update our graph.
/// The events only contain the information that is relevant for our own channels.
/// Other channel update events should be processed by gossip messages.
#[derive(Debug)]
pub enum OwnedChannelUpdateEvent {
/// The channel is back online and can be used for routing payments.
/// This normally means the peer is now reachable.
Up(ChannelInfo),
/// The channel is down and should not be used for routing payments.
/// This normally means the peer is not reachable.
Down(OutPoint),
/// One direction of the channel is updated (e.g. new balance, new fee rate).
Updated(OutPoint, Pubkey, ChannelUpdateInfo),
}

#[derive(Clone, Debug)]
pub struct NetworkGraph<S> {
// Whether to always process gossip messages for our own channels.
// See comments in should_process_gossip_message_for_channel for why we need this.
// TLDR: Most of the tests do not need this. Only tests in src/fiber/tests/graph.rs need this.
// We will only set this to true for tests in src/fiber/tests/graph.rs.
#[cfg(test)]
pub always_process_gossip_message: bool,
// The pubkey of the node that is running this instance of the network graph.
source: Pubkey,
// All the channels in the network.
Expand Down Expand Up @@ -253,6 +352,8 @@ where
{
pub fn new(store: S, source: Pubkey, announce_private_addr: bool) -> Self {
let mut network_graph = Self {
#[cfg(test)]
always_process_gossip_message: false,
source,
channels: HashMap::new(),
nodes: HashMap::new(),
Expand Down Expand Up @@ -314,6 +415,31 @@ where
return true;
}

// Process the events that are relevant for our own channels, and update the graph accordingly.
pub(crate) fn process_owned_channel_update_event(&mut self, event: OwnedChannelUpdateEvent) {
match event {
OwnedChannelUpdateEvent::Up(channel_info) => {
// Normally the channel_info passed here is the latest channel info,
// so we can just overwrite the old channel info.
self.channels
.insert(channel_info.channel_outpoint.clone(), channel_info);
}
OwnedChannelUpdateEvent::Down(channel_outpoint) => {
self.channels.remove(&channel_outpoint);
}
OwnedChannelUpdateEvent::Updated(channel_outpoint, node, channel_update) => {
if let Some(channel) = self.channels.get_mut(&channel_outpoint) {
if node == channel.node2() {
channel.update_of_node2 = Some(channel_update);
}
if node == channel.node1() {
channel.update_of_node1 = Some(channel_update);
}
}
}
}
}

// Load all the broadcast messages starting from latest_cursor from the store.
// Process them and set nodes and channels accordingly.
pub(crate) fn load_from_store(&mut self) {
Expand Down Expand Up @@ -364,11 +490,33 @@ where
self.channels.get_mut(channel_outpoint)
}

// We don't need to process our own channel announcement with gossip messages.
// They are processed by passing OwnedChannelUpdateEvents to the graph.
// These are real-time events with more detailed information (e.g. balance).
// We don't want to overwrite their detailed information here.
// But tests in src/fiber/tests/graph.rs need to process gossip messages
// to update the network graph. Many of the tests are messages from the graph.source.
// If we ignore these messages, the graph won't be updated. And many tests will fail.
fn should_process_gossip_message_for_nodes(&self, node1: &Pubkey, node2: &Pubkey) -> bool {
#[cfg(test)]
if self.always_process_gossip_message {
return true;
}
!(&self.source == node1 || &self.source == node2)
}

fn process_channel_announcement(
&mut self,
timestamp: u64,
channel_announcement: ChannelAnnouncement,
) -> Option<Cursor> {
if !self.should_process_gossip_message_for_nodes(
&channel_announcement.node1_id,
&channel_announcement.node2_id,
) {
return None;
}

match self.channels.get(&channel_announcement.channel_outpoint) {
Some(_channel) => {
trace!(
Expand Down Expand Up @@ -409,11 +557,16 @@ where
}

fn process_channel_update(&mut self, channel_update: ChannelUpdate) -> Option<Cursor> {
let channel_outpoint = &channel_update.channel_outpoint;
// The channel update message may have smaller timestamp than channel announcement.
// So it is possible that the channel announcement is not loaded into the graph yet,
// when we receive the channel update message.
let channel = self.load_channel_info_mut(channel_outpoint)?;
match self.get_channel(&channel_update.channel_outpoint) {
Some(channel)
if !self
.should_process_gossip_message_for_nodes(&channel.node1, &channel.node2) =>
{
return None;
}
_ => {}
}
let channel = self.load_channel_info_mut(&channel_update.channel_outpoint)?;
let update_info = if channel_update.is_update_of_node_1() {
&mut channel.update_of_node1
} else {
Expand Down Expand Up @@ -595,15 +748,28 @@ where

// Iterating over HashMap's values is not guaranteed to be in order,
// which may introduce randomness in the path finding.
// the weight algorithm in find_path does not considering capacity,
// so the channel with larger capacity maybe have the same weight with the channel with smaller capacity
// so we sort by capacity reverse order to make sure we try channel with larger capacity firstly
channels.sort_by(|(_, _, a, _), (_, _, b, _)| {
b.capacity().cmp(&a.capacity()).then(
b.channel_last_update_time()
.cmp(&a.channel_last_update_time()),
)
});
// We will first sort the channels by receivable_balance, then capacity, and at last update time.
// This is because the weight algorithm in find_path does not considering receivable_balance and capacity,
// so the channel with larger receivable_balance/capacity maybe have the same weight with the channel
// with smaller receivable_balance/capacity, even though the former have better chance to success.
channels.sort_by(
|(_, _, a_channel_info, a_channel_update_info),
(_, _, b_channel_info, b_channel_update_info)| {
b_channel_update_info
.receivable_balance
.cmp(&a_channel_update_info.receivable_balance)
.then(
b_channel_info
.capacity()
.cmp(&a_channel_info.capacity())
.then(
b_channel_info
.channel_last_update_time()
.cmp(&a_channel_info.channel_last_update_time()),
),
)
},
);
channels.into_iter()
}

Expand Down Expand Up @@ -935,20 +1101,10 @@ where
continue;
}

// if this is a direct channel, try to load the channel actor state for balance
if from == self.source || to == self.source {
if let Some(state) = self
.store
.get_channel_state_by_outpoint(&channel_info.out_point())
{
let balance = if from == self.source {
state.to_local_amount
} else {
state.to_remote_amount
};
if amount_to_send > balance {
continue;
}
// If we already know the balance of the channel, check if we can send the amount.
if let Some(balance) = channel_update.receivable_balance {
if amount_to_send > balance {
continue;
}
}

Expand Down
38 changes: 26 additions & 12 deletions src/fiber/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use tracing::{debug, error, info, trace, warn};
use super::channel::{
get_funding_and_reserved_amount, occupied_capacity, AcceptChannelParameter, ChannelActor,
ChannelActorMessage, ChannelActorStateStore, ChannelCommand, ChannelCommandWithId,
ChannelEvent, ChannelInitializationParameter, ChannelState, ChannelSubscribers,
ChannelEvent, ChannelInitializationParameter, ChannelState, ChannelSubscribers, ChannelTlcInfo,
OpenChannelParameter, ProcessingChannelError, ProcessingChannelResult, PublicChannelInfo,
RevocationData, SettlementData, ShuttingDownFlags, DEFAULT_COMMITMENT_FEE_RATE,
DEFAULT_FEE_RATE, DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, MAX_COMMITMENT_DELAY_EPOCHS,
Expand All @@ -57,7 +57,7 @@ use super::channel::{
use super::config::{AnnouncedNodeName, MIN_TLC_EXPIRY_DELTA};
use super::fee::calculate_commitment_tx_fee;
use super::gossip::{GossipActorMessage, GossipMessageStore, GossipMessageUpdates};
use super::graph::{NetworkGraph, NetworkGraphStateStore, SessionRoute};
use super::graph::{NetworkGraph, NetworkGraphStateStore, OwnedChannelUpdateEvent, SessionRoute};
use super::key::blake2b_hash_with_salt;
use super::types::{
BroadcastMessage, BroadcastMessageQuery, BroadcastMessageWithTimestamp, EcdsaSignature,
Expand Down Expand Up @@ -652,6 +652,9 @@ pub enum NetworkActorEvent {
Option<(ProcessingChannelError, TlcErr)>,
Option<(Hash256, u64)>,
),

// An owned channel is updated.
OwnedChannelUpdateEvent(OwnedChannelUpdateEvent),
}

#[derive(Debug)]
Expand Down Expand Up @@ -954,6 +957,19 @@ where
let mut graph = self.network_graph.write().await;
graph.update_for_messages(gossip_message_updates.messages);
}
NetworkActorEvent::OwnedChannelUpdateEvent(owned_channel_update_event) => {
let mut graph = self.network_graph.write().await;
debug!(
"Received owned channel update event: {:?}",
owned_channel_update_event
);
let is_down =
matches!(owned_channel_update_event, OwnedChannelUpdateEvent::Down(_));
graph.process_owned_channel_update_event(owned_channel_update_event);
if is_down {
debug!("Owned channel is down");
}
}
}
Ok(())
}
Expand Down Expand Up @@ -1584,12 +1600,8 @@ where
payment_session: &mut PaymentSession,
payment_data: &SendPaymentData,
) -> Result<Vec<PaymentHopData>, Error> {
match self
.network_graph
.read()
.await
.build_route(payment_data.clone())
{
let graph = self.network_graph.read().await;
match graph.build_route(payment_data.clone()) {
Err(e) => {
let error = format!("Failed to build route, {}", e);
self.set_payment_fail_with_error(payment_session, &error);
Expand Down Expand Up @@ -2099,11 +2111,12 @@ where
ChannelInitializationParameter::OpenChannel(OpenChannelParameter {
funding_amount,
seed,
public_channel_info: public.then_some(PublicChannelInfo::new(
tlc_info: ChannelTlcInfo::new(
tlc_min_value.unwrap_or(self.tlc_min_value),
tlc_expiry_delta.unwrap_or(self.tlc_expiry_delta),
tlc_fee_proportional_millionths.unwrap_or(self.tlc_fee_proportional_millionths),
)),
),
public_channel_info: public.then_some(PublicChannelInfo::new()),
funding_udt_type_script,
shutdown_script,
channel_id_sender: tx,
Expand Down Expand Up @@ -2185,11 +2198,12 @@ where
ChannelInitializationParameter::AcceptChannel(AcceptChannelParameter {
funding_amount,
reserved_ckb_amount,
public_channel_info: open_channel.is_public().then_some(PublicChannelInfo::new(
tlc_info: ChannelTlcInfo::new(
min_tlc_value.unwrap_or(self.tlc_min_value),
tlc_expiry_delta.unwrap_or(self.tlc_expiry_delta),
tlc_fee_proportional_millionths.unwrap_or(self.tlc_fee_proportional_millionths),
)),
),
public_channel_info: open_channel.is_public().then_some(PublicChannelInfo::new()),
seed,
open_channel,
shutdown_script,
Expand Down
Loading
Loading