Skip to content

Commit

Permalink
Read direct channel info while building payment route
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Jan 6, 2025
1 parent 0e69c37 commit a338f8f
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 53 deletions.
8 changes: 8 additions & 0 deletions src/fiber/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4043,6 +4043,14 @@ impl ChannelActorState {
self.to_local_amount + self.to_remote_amount
}

pub(crate) fn get_total_capacity(&self) -> u128 {
if self.funding_udt_type_script.is_some() {
self.get_total_udt_amount()
} else {
self.get_total_ckb_amount() as u128
}
}

// Get the total liquid capacity of the channel, which will exclude the reserved ckb amount.
// This is the capacity used for gossiping channel information.
fn get_liquid_capacity(&self) -> u128 {
Expand Down
168 changes: 115 additions & 53 deletions src/fiber/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,24 @@ impl From<(u64, ChannelAnnouncement)> for ChannelInfo {
}
}

pub struct DirectedGraphEdge {
pub channel_outpoint: OutPoint,

pub from: Pubkey,
pub to: Pubkey,

// The total capacity of the channel.
pub capacity: u128,
// UDT script
pub udt_type_script: Option<Script>,

/// The difference in htlc expiry values that you must have when routing through this channel (in milliseconds).
pub tlc_expiry_delta: u64,
/// The minimum value, which must be relayed to the next hop via the channel
pub tlc_minimum_value: u128,
pub fee_rate: u128,
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ChannelUpdateInfo {
// The timestamp is the time when the channel update was received by the node.
Expand Down Expand Up @@ -546,23 +564,75 @@ where
.filter(move |channel| channel.node1() == node_id || channel.node2() == node_id)
}

pub fn get_node_inbounds(
&self,
node_id: Pubkey,
) -> impl Iterator<Item = (Pubkey, Pubkey, &ChannelInfo, &ChannelUpdateInfo)> {
let mut channels: Vec<(Pubkey, Pubkey, &ChannelInfo, &ChannelUpdateInfo)> = self
pub fn get_node_inbounds(&self, node_id: Pubkey) -> impl Iterator<Item = DirectedGraphEdge> {
let mut channels = vec![];
//
let mut direct_channels = HashSet::new();
if node_id == self.source {
for (_peer_id, channel_id, _state) in self.store.get_active_channel_states(None) {
match self.store.get_channel_actor_state(&channel_id) {
Some(channel_actor_state) => {
assert_eq!(channel_actor_state.local_pubkey, node_id);
let channel_outpoint =
channel_actor_state.must_get_funding_transaction_outpoint();
direct_channels.insert(channel_outpoint.clone());

if let Some(ref remote_tlc_info) = channel_actor_state.remote_tlc_info {
channels.push(DirectedGraphEdge {
channel_outpoint,
from: node_id,
to: channel_actor_state.remote_pubkey,
capacity: channel_actor_state.get_total_capacity(),
udt_type_script: channel_actor_state
.funding_udt_type_script
.clone(),
tlc_expiry_delta: remote_tlc_info.tlc_expiry_delta,
tlc_minimum_value: remote_tlc_info.tlc_min_value,
fee_rate: remote_tlc_info.tlc_fee_proportional_millionths,
})
}
}
// It is possible that after we obtained the list of channels, the channel is deleted.
None => {}
}
}
}
let mut channels: Vec<DirectedGraphEdge> = self
.channels
.values()
.filter_map(move |channel| {
// We always assume that the information obtained from store is the latest.
// So we would not bother to check channel info from the graph.
if direct_channels.contains(&channel.channel_outpoint) {
return None;
}
if let Some(info) = channel.update_of_node2.as_ref() {
if info.enabled && channel.node2() == node_id {
return Some((channel.node1(), channel.node2(), channel, info));
return Some(DirectedGraphEdge {
channel_outpoint: channel.channel_outpoint.clone(),
from: channel.node1,
to: channel.node2,
capacity: channel.capacity,
udt_type_script: channel.udt_type_script.clone(),
tlc_expiry_delta: info.tlc_expiry_delta,
tlc_minimum_value: info.tlc_minimum_value,
fee_rate: info.fee_rate as u128,
});
}
}

if let Some(info) = channel.update_of_node1.as_ref() {
if info.enabled && channel.node1() == node_id {
return Some((channel.node2(), channel.node1(), channel, info));
return Some(DirectedGraphEdge {
channel_outpoint: channel.channel_outpoint.clone(),
from: channel.node2,
to: channel.node1,
capacity: channel.capacity,
udt_type_script: channel.udt_type_script.clone(),
tlc_expiry_delta: info.tlc_expiry_delta,
tlc_minimum_value: info.tlc_minimum_value,
fee_rate: info.fee_rate as u128,
});
}
}
None
Expand All @@ -574,11 +644,10 @@ where
// the weight algorithm in find_path does not considering capacity,
// so the channel with larger capacity maybe have the same weight with the channel with smaller capacity
// so we sort by capacity reverse order to make sure we try channel with larger capacity firstly
channels.sort_by(|(_, _, a, _), (_, _, b, _)| {
b.capacity().cmp(&a.capacity()).then(
b.channel_last_update_time()
.cmp(&a.channel_last_update_time()),
)
channels.sort_by(|a, b| {
b.capacity
.cmp(&a.capacity)
.then(b.channel_outpoint.cmp(&a.channel_outpoint))
});
channels.into_iter()
}
Expand Down Expand Up @@ -834,48 +903,51 @@ where
while let Some(cur_hop) = nodes_heap.pop() {
nodes_visited += 1;

for (from, to, channel_info, channel_update) in self.get_node_inbounds(cur_hop.node_id)
{
for edge in self.get_node_inbounds(cur_hop.node_id) {
let DirectedGraphEdge {
channel_outpoint,
from,
to,
capacity,
udt_type_script: edge_udt_type_script,
tlc_expiry_delta,
tlc_minimum_value,
fee_rate,
} = edge;
if from == target && !route_to_self {
continue;
}
if &udt_type_script != channel_info.udt_type_script() {
if &udt_type_script != &edge_udt_type_script {
continue;
}

// if the channel is already visited in the last hop, skip it
if last_hop_channels
.values()
.any(|x| x == &channel_info.out_point())
{
if last_hop_channels.values().any(|x| x == &channel_outpoint) {
continue;
}

edges_expanded += 1;

let next_hop_received_amount = cur_hop.amount_received;
if next_hop_received_amount > channel_info.capacity() {
if next_hop_received_amount > capacity {
debug!(
"next_hop_received_amount: {} > channel_info.capacity {}",
next_hop_received_amount,
channel_info.capacity()
"next_hop_received_amount: {} > channel capacity {}",
next_hop_received_amount, capacity
);
continue;
}

let fee = if from == source {
0
} else {
calculate_tlc_forward_fee(
next_hop_received_amount,
channel_update.fee_rate as u128,
)
.map_err(|err| {
PathFindError::PathFind(format!(
"calculate_tlc_forward_fee error: {:?}",
err
))
})?
calculate_tlc_forward_fee(next_hop_received_amount, fee_rate).map_err(
|err| {
PathFindError::PathFind(format!(
"calculate_tlc_forward_fee error: {:?}",
err
))
},
)?
};
let amount_to_send = next_hop_received_amount + fee;

Expand All @@ -892,30 +964,24 @@ where
}
// check to make sure the current hop can send the amount
// if `tlc_maximum_value` equals 0, it means there is no limit
if amount_to_send > channel_info.capacity() {
if amount_to_send > capacity {
continue;
}
if amount_to_send < channel_update.tlc_minimum_value {
if amount_to_send < tlc_minimum_value {
continue;
}

// if this is a direct channel, try to load the channel actor state for balance
if from == self.source {
if let Some(state) = self
.store
.get_channel_state_by_outpoint(&channel_info.out_point())
if let Some(state) = self.store.get_channel_state_by_outpoint(&channel_outpoint)
{
if amount_to_send > state.to_local_amount {
continue;
}
}
}

let expiry_delta = if from == source {
0
} else {
channel_update.tlc_expiry_delta
};
let expiry_delta = if from == source { 0 } else { tlc_expiry_delta };

let incoming_htlc_expiry = cur_hop.incoming_tlc_expiry + expiry_delta;
if incoming_htlc_expiry > tlc_expiry_limit {
Expand All @@ -926,24 +992,20 @@ where
* self.history.eval_probability(
from,
to,
&channel_info.out_point(),
&channel_outpoint,
amount_to_send,
channel_info.capacity(),
capacity,
);

debug!(
"probability: {} for channel_outpoint: {:?} from: {:?} => to: {:?}",
probability,
channel_info.out_point(),
from,
to
probability, &channel_outpoint, from, to
);
if probability < DEFAULT_MIN_PROBABILITY {
debug!("probability is too low: {:?}", probability);
continue;
}
let agg_weight =
self.edge_weight(amount_to_send, fee, channel_update.tlc_expiry_delta);
let agg_weight = self.edge_weight(amount_to_send, fee, tlc_expiry_delta);
let weight = cur_hop.weight + agg_weight;
let distance = self.calculate_distance_based_probability(probability, weight);

Expand All @@ -960,9 +1022,9 @@ where
incoming_tlc_expiry: incoming_htlc_expiry,
fee_charged: fee,
probability,
next_hop: Some((cur_hop.node_id, channel_info.out_point().clone())),
next_hop: Some((cur_hop.node_id, channel_outpoint.clone())),
};
last_hop_channels.insert(node.node_id, channel_info.out_point());
last_hop_channels.insert(node.node_id, channel_outpoint.clone());
distances.insert(node.node_id, node.clone());
nodes_heap.push_or_fix(node);
}
Expand Down

0 comments on commit a338f8f

Please sign in to comment.