From b0786a6865e39602285af313e9c2c563fad92ae1 Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 27 Dec 2024 16:59:00 +0800 Subject: [PATCH 1/9] Make number of active/passive syncers configurable --- src/fiber/config.rs | 27 +++++++++++++++++++++++++++ src/fiber/gossip.rs | 17 +++++++++++++---- src/fiber/network.rs | 2 ++ src/fiber/tests/gossip.rs | 2 ++ 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/fiber/config.rs b/src/fiber/config.rs index 911f60be1..8641e808d 100644 --- a/src/fiber/config.rs +++ b/src/fiber/config.rs @@ -234,6 +234,33 @@ pub struct FiberConfig { )] pub(crate) gossip_store_maintenance_interval_ms: Option, + /// Gossip network num targeted active syncing peers. [default: None] + /// This is the number of peers to target for active syncing. This is the number of peers that we will + /// send GetBroadcastMessages message to obtain the gossip messages that we missed during the time we + /// were offline. A larger number means more peers to receive updates from, but also more bandwidth usage. + /// If None, it will use the default value. + #[arg( + name = "FIBER_GOSSIP_NETWORK_NUM_TARGETED_ACTIVE_SYNCING_PEERS", + long = "fiber-gossip-network-num-targeted-active-syncing-peers", + env, + help = "Gossip network num targeted active syncing peers. [default: None]" + )] + pub(crate) gossip_network_num_targeted_active_syncing_peers: Option, + + /// Gossip network num targeted outbound passive syncing peers. [default: None] + /// This is the number of peers to target for outbound passive syncing. This is the number of outbound peers + /// that we will send BroadcastMessageFilter to receive updates from them. A larger number means more + /// peers to receive updates from, but also more bandwidth usage. We only count the outbound peers here, + /// because outbound peers are less likely to be malicious, and we want to receive updates from them. + /// If None, it will use the default value. 
+ #[arg( + name = "FIBER_GOSSIP_NETWORK_NUM_TARGETED_OUTBOUND_PASSIVE_SYNCING_PEERS", + long = "fiber-gossip-network-num-targeted-outbound-passive-syncing-peers", + env, + help = "Gossip network num targeted outbound passive syncing peers. [default: None]" + )] + pub(crate) gossip_network_num_targeted_outbound_passive_syncing_peers: Option, + /// Whether to sync the network graph from the network. [default: true] #[arg( name = "FIBER_SYNC_NETWORK_GRAPH", diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index c2ba222d2..9b7a56a31 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -1368,11 +1368,11 @@ pub enum ExtendedGossipMessageStoreMessage { pub(crate) struct GossipActorState { store: ExtendedGossipMessageStore, control: ServiceAsyncControl, - num_targeted_active_syncing_peers: usize, + num_finished_active_syncing_peers: usize, // The number of active syncing peers that we have finished syncing with. // Together with the number of currect active syncing peers, this is // used to determine if we should start a new active syncing peer. - num_finished_active_syncing_peers: usize, + num_targeted_active_syncing_peers: usize, // The number of outbound passive syncing peers that we want to have. // We only count outbound peers because the purpose of this number is to avoid eclipse attacks. 
// By maintaining a certain number of outbound passive syncing peers, we can ensure that we are @@ -2089,6 +2089,8 @@ impl GossipProtocolHandle { gossip_network_maintenance_interval: Duration, gossip_store_maintenance_interval: Duration, announce_private_addr: bool, + num_targeted_active_syncing_peers: Option, + num_targeted_outbound_passive_syncing_peers: Option, store: S, chain_actor: ActorRef, supervisor: ActorCell, @@ -2108,6 +2110,9 @@ impl GossipProtocolHandle { gossip_network_maintenance_interval, gossip_store_maintenance_interval, announce_private_addr, + num_targeted_active_syncing_peers.unwrap_or(MAX_NUM_OF_ACTIVE_SYNCING_PEERS), + num_targeted_outbound_passive_syncing_peers + .unwrap_or(MIN_NUM_OF_PASSIVE_SYNCING_PEERS), store, chain_actor, ), @@ -2152,6 +2157,8 @@ where Duration, Duration, bool, + usize, + usize, S, ActorRef, ); @@ -2165,6 +2172,8 @@ where network_maintenance_interval, store_maintenance_interval, announce_private_addr, + num_targeted_active_syncing_peers, + num_targeted_outbound_passive_syncing_peers, store, chain_actor, ): Self::Arguments, @@ -2193,8 +2202,8 @@ where let state = Self::State { store, control, - num_targeted_active_syncing_peers: MAX_NUM_OF_ACTIVE_SYNCING_PEERS, - num_targeted_outbound_passive_syncing_peers: MIN_NUM_OF_PASSIVE_SYNCING_PEERS, + num_targeted_active_syncing_peers, + num_targeted_outbound_passive_syncing_peers, myself, chain_actor, next_request_id: Default::default(), diff --git a/src/fiber/network.rs b/src/fiber/network.rs index d504c05d9..f5ccf567c 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -2893,6 +2893,8 @@ where Duration::from_millis(config.gossip_network_maintenance_interval_ms()).into(), Duration::from_millis(config.gossip_store_maintenance_interval_ms()).into(), config.announce_private_addr(), + config.gossip_network_num_targeted_active_syncing_peers, + config.gossip_network_num_targeted_outbound_passive_syncing_peers, self.store.clone(), self.chain_actor.clone(), 
myself.get_cell(), diff --git a/src/fiber/tests/gossip.rs b/src/fiber/tests/gossip.rs index c3dd3c48f..5c8349b99 100644 --- a/src/fiber/tests/gossip.rs +++ b/src/fiber/tests/gossip.rs @@ -71,6 +71,8 @@ impl GossipTestingContext { Duration::from_millis(50).into(), Duration::from_millis(50).into(), true, + None, + None, store.clone(), chain_actor.clone(), root_actor.get_cell(), From d47ea232cc519c18026e3aea3564f22de63b2ad0 Mon Sep 17 00:00:00 2001 From: YI Date: Tue, 7 Jan 2025 19:38:29 +0800 Subject: [PATCH 2/9] Fix comments --- src/fiber/gossip.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index 9b7a56a31..b94c90665 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -1368,10 +1368,12 @@ pub enum ExtendedGossipMessageStoreMessage { pub(crate) struct GossipActorState { store: ExtendedGossipMessageStore, control: ServiceAsyncControl, - num_finished_active_syncing_peers: usize, // The number of active syncing peers that we have finished syncing with. - // Together with the number of currect active syncing peers, this is + // Together with the number of current active syncing peers, this is // used to determine if we should start a new active syncing peer. + num_finished_active_syncing_peers: usize, + // The number of targeted active syncing peers that we want to have. + // Currently we will only start this many active syncing peers. num_targeted_active_syncing_peers: usize, // The number of outbound passive syncing peers that we want to have. // We only count outbound peers because the purpose of this number is to avoid eclipse attacks. 
From 79f29fb94e51d28501ec8fa390491c98b357d4cd Mon Sep 17 00:00:00 2001 From: yukang Date: Tue, 7 Jan 2025 20:26:48 +0800 Subject: [PATCH 3/9] upgrade ractor to new version --- Cargo.lock | 48 +++++++++++++++++++++++++++++++++++++++----- Cargo.toml | 2 +- src/actors.rs | 2 +- src/fiber/gossip.rs | 2 +- src/fiber/network.rs | 4 ++-- 5 files changed, 48 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fa35198a7..8ffa51e4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -472,6 +472,29 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bon" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97493a391b4b18ee918675fb8663e53646fd09321c58b46afa04e8ce2499c869" +dependencies = [ + "bon-macros", + "rustversion", +] + +[[package]] +name = "bon-macros" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2af3eac944c12cdf4423eab70d310da0a8e5851a18ffb192c0a5e3f7ae1663" +dependencies = [ + "darling", + "ident_case", + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "bs58" version = "0.5.1" @@ -948,7 +971,7 @@ dependencies = [ "ckb-script", "ckb-traits", "ckb-types", - "dashmap", + "dashmap 5.5.3", "derive-getters", "dyn-clone", "enum-repr-derive", @@ -1393,6 +1416,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -3272,15 +3309,16 @@ dependencies = [ [[package]] name = "ractor" -version = "0.9.7" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd876f0d609ba2ddc8a36136e9b81299312bd9fc9b71131381d16c9ce8e495a" +checksum 
= "aeb2472c961aec135028ae83b64491243ef36402c4f93f04b4f29f9f5d8805a8" dependencies = [ "async-trait", - "dashmap", + "bon", + "dashmap 6.1.0", "futures", "once_cell", - "rand 0.8.5", + "strum", "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 983451b07..e95a0c574 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ bitflags = { version = "2.5.0", features = ["serde"] } ckb-hash = "0.115.0" secp256k1 = { version = "0.28.0", features = ["serde", "recovery", "rand-std"] } musig2 = { version = "0.0.11", features = ["secp256k1", "serde"] } -ractor = "=0.9.7" +ractor = "0.14.2" arcode = "0.2.4" nom = "7.1.3" regex = "1.10.5" diff --git a/src/actors.rs b/src/actors.rs index e077e4c0d..8bd38127c 100644 --- a/src/actors.rs +++ b/src/actors.rs @@ -68,7 +68,7 @@ impl Actor for RootActor { debug!("Actor terminated for unknown reason (id: {:?})", who); } }, - SupervisionEvent::ActorPanicked(who, err) => { + SupervisionEvent::ActorFailed(who, err) => { panic!("Actor unexpectedly panicked (id: {:?}): {:?}", who, err); } _ => {} diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index e44ce9015..703f90c70 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -2216,7 +2216,7 @@ where SupervisionEvent::ActorTerminated(who, _, _) => { debug!("{:?} terminated", who); } - SupervisionEvent::ActorPanicked(who, err) => { + SupervisionEvent::ActorFailed(who, err) => { panic!("Actor unexpectedly panicked (id: {:?}): {:?}", who, err); } _ => {} diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 132f696e0..6e7eb4e5e 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -2203,7 +2203,7 @@ where { let chain = self.chain_actor.clone(); // Spawn a new task to avoid blocking current actor message processing. 
- ractor::concurrency::tokio_primatives::spawn(async move { + ractor::concurrency::tokio_primitives::spawn(async move { debug!("Trying to broadcast transaction {:?}", &transaction); let result = match call_t!( &chain, @@ -3268,7 +3268,7 @@ where SupervisionEvent::ActorTerminated(who, _, _) => { debug!("Actor {:?} terminated", who); } - SupervisionEvent::ActorPanicked(who, err) => { + SupervisionEvent::ActorFailed(who, err) => { panic!("Actor unexpectedly panicked (id: {:?}): {:?}", who, err); } _ => {} From 5b1cc49215bebdc68b52481f3c2eca9011c64076 Mon Sep 17 00:00:00 2001 From: YI Date: Tue, 7 Jan 2025 22:57:22 +0800 Subject: [PATCH 4/9] Stop all sub actor on exiting --- src/actors.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/actors.rs b/src/actors.rs index 8bd38127c..8c27e4147 100644 --- a/src/actors.rs +++ b/src/actors.rs @@ -46,10 +46,14 @@ impl Actor for RootActor { async fn post_stop( &self, - _myself: ActorRef, + myself: ActorRef, _state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { debug!("Root actor stopped"); + myself + .get_cell() + .stop_children_and_wait(Some("Root actor stopped".to_string()), None) + .await; Ok(()) } From b8e3056bc097681aa34871869996adb2774af9ed Mon Sep 17 00:00:00 2001 From: Yukang Date: Wed, 8 Jan 2025 11:57:26 +0800 Subject: [PATCH 5/9] feat: Retry all payment session with actor message (#430) --- src/fiber/network.rs | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 766fdf019..b84a4b740 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -1498,12 +1498,10 @@ where .await .record_payment_fail(&payment_session, error_detail.clone()); if need_to_retry { - let res = self - .try_payment_session(myself, state, payment_session.payment_hash()) - .await; - if res.is_err() { - debug!("Failed to retry payment session: {:?}", res); - } + // If this is the first hop error, like the 
WaitingTlcAck error, + // we will just retry later, return Ok here for letting endpoint user + // know payment session is created successfully + self.register_payment_retry(myself, payment_hash); } else { self.set_payment_fail_with_error( &mut payment_session, @@ -1530,11 +1528,13 @@ where .. }) = &tcl_error_detail.extra_data { - let _ = network.send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ProcessBroadcastMessage(BroadcastMessage::ChannelUpdate( - channel_update.clone(), - )), - )); + network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::BroadcastMessages(vec![ + BroadcastMessageWithTimestamp::ChannelUpdate(channel_update.clone()), + ]), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); } } match tcl_error_detail.error_code() { @@ -1756,14 +1756,10 @@ where Err(err) => { let need_retry = matches!(err, Error::SendPaymentFirstHopError(_, true)); if need_retry { - // If this is the first hop error, like the WaitingTlcAck error, + // If this is the first hop error, such as the WaitingTlcAck error, // we will just retry later, return Ok here for letting endpoint user // know payment session is created successfully - myself.send_after(Duration::from_millis(500), move || { - NetworkActorMessage::new_event(NetworkActorEvent::RetrySendPayment( - payment_hash, - )) - }); + self.register_payment_retry(myself, payment_hash); return Ok(payment_session); } else { return Err(err); @@ -1781,6 +1777,12 @@ where } } + fn register_payment_retry(&self, myself: ActorRef, payment_hash: Hash256) { + myself.send_after(Duration::from_millis(500), move || { + NetworkActorMessage::new_event(NetworkActorEvent::RetrySendPayment(payment_hash)) + }); + } + async fn on_send_payment( &self, myself: ActorRef, @@ -2811,7 +2813,7 @@ where ); // Notify outside observers. 
network - .send_message(NetworkActorMessage::new_event( NetworkActorEvent::FundingTransactionFailed(outpoint) + .send_message(NetworkActorMessage::new_event(NetworkActorEvent::FundingTransactionFailed(outpoint) )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } @@ -2819,7 +2821,7 @@ where error!("Failed to trace transaction {:?}: {:?}", &tx_hash, &err); // Notify outside observers. network - .send_message(NetworkActorMessage::new_event( NetworkActorEvent::FundingTransactionFailed(outpoint) + .send_message(NetworkActorMessage::new_event(NetworkActorEvent::FundingTransactionFailed(outpoint) )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } From 4a089acb39fa58f677b3ab3af636147d07a40383 Mon Sep 17 00:00:00 2001 From: contrun Date: Wed, 8 Jan 2025 20:17:49 +0800 Subject: [PATCH 6/9] chore: Unify various cryptography functions (#420) --- src/fiber/channel.rs | 666 ++++++++++++++++++------------------- src/fiber/tests/channel.rs | 3 +- src/store/tests/store.rs | 5 +- 3 files changed, 337 insertions(+), 337 deletions(-) diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index d542d6c55..1a41cb1e9 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -1,9 +1,7 @@ +use crate::debug_event; #[cfg(debug_assertions)] use crate::fiber::network::DebugEvent; -use crate::{ - debug_event, - fiber::{serde_utils::U64Hex, types::BroadcastMessageWithTimestamp}, -}; +use crate::fiber::types::BroadcastMessageWithTimestamp; use bitflags::bitflags; use futures::future::OptionFuture; use secp256k1::XOnlyPublicKey; @@ -54,7 +52,7 @@ use ckb_types::{ use molecule::prelude::{Builder, Entity}; use musig2::{ aggregate_partial_signatures, - errors::{SigningError, VerifyError}, + errors::{RoundFinalizeError, SigningError, VerifyError}, secp::Point, sign_partial, verify_partial, AggNonce, CompactSignature, KeyAggContext, PartialSignature, PubNonce, SecNonce, @@ -645,6 +643,7 @@ where }, ProcessingChannelError::RepeatedProcessing(_) => TlcErrorCode::TemporaryChannelFailure, 
ProcessingChannelError::SpawnErr(_) + | ProcessingChannelError::Musig2RoundFinalizeError(_) | ProcessingChannelError::Musig2SigningError(_) | ProcessingChannelError::Musig2VerifyError(_) | ProcessingChannelError::CapacityError(_) => TlcErrorCode::TemporaryNodeFailure, @@ -1198,7 +1197,6 @@ where )), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); - state.save_remote_nonce_for_raa(); match flags { CommitmentSignedFlags::SigningCommitment(flags) => { @@ -1213,6 +1211,7 @@ where state.maybe_transition_to_shutdown(&self.network)?; } } + state.update_last_commitment_signed_remote_nonce(); Ok(()) } @@ -3010,16 +3009,33 @@ pub struct ChannelActorState { #[serde_as(as = "EntityHex")] pub local_shutdown_script: Script, - // While building a CommitmentSigned message, we use a nonce sent by the counterparty - // to partially sign the commitment transaction. This nonce is also used while handling the revoke_and_ack - // message from the peer. We need to save this nonce because the counterparty may send other nonces during - // the period when our CommitmentSigned is sent and the counterparty's RevokeAndAck is received. + // Basically the latest remote nonce sent by the peer with the CommitmentSigned message, + // but we will only update this field after we have sent a RevokeAndAck to the peer. + // With above guarantee, we can be sure the results of the sender obtaining its latest local nonce + // and the receiver obtaining its latest remote nonce are the same. + #[serde_as(as = "Option")] + pub last_committed_remote_nonce: Option, + + // While handling peer's CommitmentSigned message, we will build a RevokeAndAck message, + // and reply this message to the peer. The nonce used to build the RevokeAndAck message is + // an older one sent by the peer. We will read this nonce from the field `last_committed_remote_nonce` + // The new nonce contained in the CommitmentSigned message + // will be saved to `last_committed_remote_nonce` field when this process finishes successfully. 
+ // The problem is in some abnormal cases, the may not be able to successfully send the RevokeAndAck. + // But we have overwritten the `last_committed_remote_nonce` field with the new nonce. + // While reestablishing the channel, we need to use the old nonce to build the RevokeAndAck message. + // This is why we need to save the old nonce in this field. #[serde_as(as = "Option")] - pub last_used_nonce_in_commitment_signed: Option, + pub last_commitment_signed_remote_nonce: Option, - // The nonces that are sent by the counterparty, the length is at most 2 - #[serde_as(as = "Vec<(U64Hex, PubNonceAsBytes)>")] - pub remote_nonces: Vec<(u64, PubNonce)>, + // While building a CommitmentSigned message, we use the latest remote nonce (the `last_committed_remote_nonce` above) + // to partially sign the commitment transaction. This nonce is also needed for the RevokeAndAck message + // returned from the peer. We need to save this nonce because the counterparty may send other nonces during + // the period when our CommitmentSigned is sent and the counterparty's RevokeAndAck is received. + // This field is used to keep the nonce used by the unconfirmed CommitmentSigned. When we receive a + // RevokeAndAck from the peer, we will use this nonce to validate the RevokeAndAck message. + #[serde_as(as = "Option")] + pub last_revoke_and_ack_remote_nonce: Option, // The latest commitment transaction we're holding, // it can be broadcasted to blockchain by us to force close the channel. 
@@ -3129,6 +3145,8 @@ pub enum ProcessingChannelError { CapacityError(#[from] CapacityError), #[error("Failed to spawn actor: {0}")] SpawnErr(String), + #[error("Musig2 RoundFinalizeError: {0}")] + Musig2RoundFinalizeError(#[from] RoundFinalizeError), #[error("Musig2 VerifyError: {0}")] Musig2VerifyError(#[from] VerifyError), #[error("Musig2 SigningError: {0}")] @@ -3394,88 +3412,6 @@ pub(crate) fn occupied_capacity( } } -impl From<&ChannelActorState> for Musig2SignContext { - fn from(value: &ChannelActorState) -> Self { - Musig2SignContext { - key_agg_ctx: value.get_musig2_agg_context(), - agg_nonce: value.get_musig2_agg_pubnonce(), - seckey: value.signer.funding_key.clone(), - secnonce: value.get_local_musig2_secnonce(), - } - } -} - -impl From<&ChannelActorState> for Musig2VerifyContext { - fn from(value: &ChannelActorState) -> Self { - Musig2VerifyContext { - key_agg_ctx: value.get_musig2_agg_context(), - agg_nonce: value.get_musig2_agg_pubnonce(), - pubkey: *value.get_remote_funding_pubkey(), - pubnonce: value.get_remote_nonce().clone(), - } - } -} - -impl From<(&ChannelActorState, bool)> for Musig2SignContext { - fn from(value: (&ChannelActorState, bool)) -> Self { - let (channel, local) = value; - let local_pubkey = channel.get_local_channel_public_keys().funding_pubkey; - let remote_pubkey = channel.get_remote_channel_public_keys().funding_pubkey; - let pubkeys = if local { - [local_pubkey, remote_pubkey] - } else { - [remote_pubkey, local_pubkey] - }; - let key_agg_ctx = KeyAggContext::new(pubkeys).expect("Valid pubkeys"); - - let local_nonce = channel.get_local_nonce(); - let remote_nonce = channel.get_remote_nonce(); - let nonces = if local { - [local_nonce, remote_nonce] - } else { - [remote_nonce, local_nonce] - }; - let agg_nonce = AggNonce::sum(nonces); - - Musig2SignContext { - key_agg_ctx, - agg_nonce, - seckey: channel.signer.funding_key.clone(), - secnonce: channel.get_local_musig2_secnonce(), - } - } -} - -impl From<(&ChannelActorState, bool)> 
for Musig2VerifyContext { - fn from(value: (&ChannelActorState, bool)) -> Self { - let (channel, local) = value; - let local_pubkey = channel.get_local_channel_public_keys().funding_pubkey; - let remote_pubkey = channel.get_remote_channel_public_keys().funding_pubkey; - let pubkeys = if local { - [local_pubkey, remote_pubkey] - } else { - [remote_pubkey, local_pubkey] - }; - let key_agg_ctx = KeyAggContext::new(pubkeys).expect("Valid pubkeys"); - - let local_nonce = channel.get_local_nonce(); - let remote_nonce = channel.get_remote_nonce(); - let nonces = if local { - [local_nonce, remote_nonce] - } else { - [remote_nonce, local_nonce] - }; - let agg_nonce = AggNonce::sum(nonces); - - Musig2VerifyContext { - key_agg_ctx, - agg_nonce, - pubkey: *channel.get_remote_funding_pubkey(), - pubnonce: channel.get_remote_nonce(), - } - } -} - // Constructors for the channel actor state. #[allow(clippy::too_many_arguments)] impl ChannelActorState { @@ -3541,7 +3477,7 @@ impl ChannelActorState { let agg_nonce = AggNonce::sum(self.order_things_for_musig2(local_nonce, remote_nonce.clone())); - let key_agg_ctx = self.get_musig2_agg_context(); + let key_agg_ctx = self.get_deterministic_musig2_agg_context(); let message = channel_announcement.message_to_sign(); @@ -3734,8 +3670,9 @@ impl ChannelActorState { remote_channel_public_keys: Some(remote_pubkeys), commitment_numbers: Default::default(), remote_shutdown_script: Some(remote_shutdown_script), - last_used_nonce_in_commitment_signed: None, - remote_nonces: vec![(0, remote_nonce)], + last_commitment_signed_remote_nonce: None, + last_revoke_and_ack_remote_nonce: None, + last_committed_remote_nonce: Some(remote_nonce), remote_commitment_points: vec![ (0, first_commitment_point), (1, second_commitment_point), @@ -3806,8 +3743,9 @@ impl ChannelActorState { // these values will update after accept channel peer message handled remote_constraints: ChannelConstraints::default(), remote_channel_public_keys: None, - 
last_used_nonce_in_commitment_signed: None, - remote_nonces: vec![], + last_commitment_signed_remote_nonce: None, + last_revoke_and_ack_remote_nonce: None, + last_committed_remote_nonce: None, commitment_numbers: Default::default(), remote_commitment_points: vec![], local_shutdown_script: shutdown_script, @@ -4026,7 +3964,7 @@ impl ChannelActorState { let local_secnonce = self.get_channel_announcement_musig2_secnonce(); let local_nonce = local_secnonce.public_nonce(); let agg_nonce = AggNonce::sum(self.order_things_for_musig2(local_nonce, remote_nonce)); - let key_agg_ctx = self.get_musig2_agg_context(); + let key_agg_ctx = self.get_deterministic_musig2_agg_context(); let channel_id = self.get_id(); let peer_id = self.get_remote_peer_id(); let channel_outpoint = self.must_get_funding_transaction_outpoint(); @@ -4198,24 +4136,8 @@ impl ChannelActorState { &mut self, network: &ActorRef, ) -> ProcessingChannelResult { - let key_agg_ctx = { - let local_pubkey = self.get_local_channel_public_keys().funding_pubkey; - let remote_pubkey = self.get_remote_channel_public_keys().funding_pubkey; - KeyAggContext::new([remote_pubkey, local_pubkey]).expect("Valid pubkeys") - }; - let x_only_aggregated_pubkey = key_agg_ctx.aggregated_pubkey::().serialize_xonly(); - let sign_ctx = { - let local_nonce = self.get_local_nonce(); - let remote_nonce = self.get_remote_nonce(); - let nonces = [local_nonce, remote_nonce]; - let agg_nonce = AggNonce::sum(nonces); - Musig2SignContext { - key_agg_ctx, - agg_nonce, - seckey: self.signer.funding_key.clone(), - secnonce: self.get_local_musig2_secnonce(), - } - }; + let sign_ctx = self.get_sign_context(false); + let x_only_aggregated_pubkey = sign_ctx.common_ctx.x_only_aggregated_pubkey(); let revocation_partial_signature = { let commitment_tx_fee = calculate_commitment_tx_fee( @@ -4260,11 +4182,8 @@ impl ChannelActorState { ] .concat(), ); - - sign_ctx - .clone() - .sign(message.as_slice()) - .expect("valid signature") + let our_signature = 
sign_ctx.sign(message.as_slice()).expect("valid signature"); + our_signature }; let commitment_tx_partial_signature = { @@ -4310,6 +4229,7 @@ impl ChannelActorState { )), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); + self.update_last_revoke_and_ack_remote_nonce(); Ok(()) } @@ -4325,15 +4245,6 @@ impl ChannelActorState { self.remote_pubkey.tentacle_peer_id() } - pub fn get_local_secnonce(&self) -> SecNonce { - self.signer - .derive_musig2_nonce(self.get_local_commitment_number()) - } - - pub fn get_local_nonce(&self) -> PubNonce { - self.get_local_secnonce().public_nonce() - } - pub fn get_next_local_secnonce(&self) -> SecNonce { self.signer .derive_musig2_nonce(self.get_next_commitment_number(true)) @@ -4343,43 +4254,29 @@ impl ChannelActorState { self.get_next_local_secnonce().public_nonce() } - pub fn get_remote_nonce(&self) -> PubNonce { - let comitment_number = self.get_remote_commitment_number(); - assert!(self.remote_nonces.len() <= 2); - self.remote_nonces - .iter() - .rev() - .find_map(|(number, nonce)| { - if *number == comitment_number { - Some(nonce.clone()) - } else { - None - } - }) - .expect("get_remote_nonce") + fn get_last_committed_remote_nonce(&self) -> PubNonce { + self.last_committed_remote_nonce + .as_ref() + .expect("always have peer's last committed nonce in normal channel operations") + .clone() } - fn save_remote_nonce(&mut self, nonce: PubNonce) { - let next_remote_number = if self.remote_nonces.is_empty() { - 0 - } else { - self.get_remote_commitment_number() + 1 - }; - self.remote_nonces.push((next_remote_number, nonce)); - if self.remote_nonces.len() > 2 { - self.remote_nonces.remove(0); - } + fn get_last_commitment_signed_remote_nonce(&self) -> Option { + self.last_commitment_signed_remote_nonce.clone() } - fn save_remote_nonce_for_raa(&mut self) { - let nonce = self.get_remote_nonce(); - self.last_used_nonce_in_commitment_signed = Some(nonce); + fn commit_remote_nonce(&mut self, nonce: PubNonce) { + self.last_committed_remote_nonce = 
Some(nonce); } - fn take_remote_nonce_for_raa(&mut self) -> PubNonce { - self.last_used_nonce_in_commitment_signed - .take() - .expect("set last_used_nonce_in_commitment_signed in commitment signed") + fn update_last_commitment_signed_remote_nonce(&mut self) { + let nonce = self.get_last_committed_remote_nonce(); + self.last_commitment_signed_remote_nonce = Some(nonce); + } + + fn update_last_revoke_and_ack_remote_nonce(&mut self) { + let nonce = self.get_last_committed_remote_nonce(); + self.last_revoke_and_ack_remote_nonce = Some(nonce); } pub fn get_current_commitment_numbers(&self) -> CommitmentNumbers { @@ -4607,12 +4504,14 @@ impl ChannelActorState { } pub fn get_funding_lock_script_xonly_key(&self) -> XOnlyPublicKey { - let pubkey: secp256k1::PublicKey = self.get_musig2_agg_context().aggregated_pubkey(); + let pubkey: secp256k1::PublicKey = self + .get_deterministic_musig2_agg_context() + .aggregated_pubkey(); pubkey.into() } pub fn get_funding_lock_script_xonly(&self) -> [u8; 32] { - self.get_musig2_agg_context() + self.get_deterministic_musig2_agg_context() .aggregated_pubkey::() .serialize_xonly() } @@ -4635,11 +4534,7 @@ impl ChannelActorState { } } - pub fn get_musig2_agg_pubkey(&self) -> Pubkey { - self.get_musig2_agg_context().aggregated_pubkey() - } - - pub fn get_musig2_agg_context(&self) -> KeyAggContext { + pub fn get_deterministic_musig2_agg_context(&self) -> KeyAggContext { let local_pubkey = self.get_local_channel_public_keys().funding_pubkey; let remote_pubkey = self.get_remote_channel_public_keys().funding_pubkey; let keys = self.order_things_for_musig2(local_pubkey, remote_pubkey); @@ -4668,9 +4563,11 @@ impl ChannelActorState { self.get_local_musig2_secnonce().public_nonce() } - pub fn get_musig2_agg_pubnonce(&self) -> AggNonce { - let local_nonce = self.get_local_nonce(); - let remote_nonce = self.get_remote_nonce(); + pub fn get_deterministic_musig2_agg_pubnonce( + &self, + local_nonce: PubNonce, + remote_nonce: PubNonce, + ) -> AggNonce 
{ let nonces = self.order_things_for_musig2(local_nonce, remote_nonce); AggNonce::sum(nonces) } @@ -5018,12 +4915,16 @@ impl ChannelActorState { fn aggregate_partial_signatures_to_consume_funding_cell( &self, - partial_signatures: [PartialSignature; 2], + common_ctx: &Musig2CommonContext, + our_partial_signature: PartialSignature, + their_partial_signature: PartialSignature, tx: &TransactionView, ) -> Result { - let verify_ctx = Musig2VerifyContext::from(self); - let signature = verify_ctx - .aggregate_partial_signatures_for_msg(partial_signatures, tx.hash().as_slice())?; + let signature = common_ctx.aggregate_partial_signatures_for_msg( + our_partial_signature, + their_partial_signature, + tx.hash().as_slice(), + )?; let witness = create_witness_for_funding_cell(self.get_funding_lock_script_xonly(), signature); @@ -5038,19 +4939,23 @@ impl ChannelActorState { psct: &PartiallySignedCommitmentTransaction, ) -> Result<(TransactionView, SettlementData), ProcessingChannelError> { let completed_commitment_tx = { - let sign_ctx = Musig2SignContext::from(self); + let deterministic_sign_ctx = self.get_deterministic_sign_context(); + let our_funding_tx_partial_signature = - sign_ctx.sign(psct.commitment_tx.hash().as_slice())?; + deterministic_sign_ctx.sign(psct.commitment_tx.hash().as_slice())?; + self.aggregate_partial_signatures_to_consume_funding_cell( - [ - psct.funding_tx_partial_signature, - our_funding_tx_partial_signature, - ], + &deterministic_sign_ctx.common_ctx, + our_funding_tx_partial_signature, + psct.funding_tx_partial_signature, &psct.commitment_tx, )? 
}; let settlement_data = { + let sign_ctx = self.get_sign_context(false); + let x_only_aggregated_pubkey = sign_ctx.common_ctx.x_only_aggregated_pubkey(); + let settlement_tx = &psct.settlement_tx; let commitment_tx = &psct.commitment_tx; let to_local_output = settlement_tx @@ -5086,18 +4991,8 @@ impl ChannelActorState { ] .concat(), ); - let sign_ctx = Musig2SignContext::from((self, false)); - let our_commitment_tx_partial_signature = sign_ctx.sign(message.as_slice())?; - - let verify_ctx = Musig2VerifyContext::from((self, false)); - let aggregated_signature = verify_ctx.aggregate_partial_signatures_for_msg( - [ - our_commitment_tx_partial_signature, - psct.commitment_tx_partial_signature, - ], - message.as_slice(), - )?; - let x_only_aggregated_pubkey = self.get_commitment_lock_script_xonly(false); + let aggregated_signature = sign_ctx + .sign_and_aggregate(message.as_slice(), psct.commitment_tx_partial_signature)?; SettlementData { x_only_aggregated_pubkey, @@ -5141,7 +5036,7 @@ impl ChannelActorState { if self.local_shutdown_info.is_some() && self.remote_shutdown_info.is_some() { let shutdown_tx = self.build_shutdown_tx()?; - let sign_ctx = Musig2SignContext::from(&*self); + let deterministic_sign_ctx = self.get_deterministic_sign_context(); let local_shutdown_info = self .local_shutdown_info @@ -5158,7 +5053,7 @@ impl ChannelActorState { let local_shutdown_signature = match local_shutdown_info.signature { Some(signature) => signature, None => { - let signature = sign_ctx.sign(shutdown_tx.hash().as_slice())?; + let signature = deterministic_sign_ctx.sign(shutdown_tx.hash().as_slice())?; local_shutdown_info.signature = Some(signature); network @@ -5179,7 +5074,9 @@ impl ChannelActorState { if let Some(remote_shutdown_signature) = remote_shutdown_info.signature { let tx: TransactionView = self .aggregate_partial_signatures_to_consume_funding_cell( - [local_shutdown_signature, remote_shutdown_signature], + &deterministic_sign_ctx.common_ctx, + 
local_shutdown_signature, + remote_shutdown_signature, &shutdown_tx, )?; assert_eq!( @@ -5226,7 +5123,7 @@ impl ChannelActorState { self.to_remote_amount = accept_channel.funding_amount; self.remote_reserved_ckb_amount = accept_channel.reserved_ckb_amount; - self.save_remote_nonce(accept_channel.next_local_nonce.clone()); + self.commit_remote_nonce(accept_channel.next_local_nonce.clone()); let remote_pubkeys = (&accept_channel).into(); self.remote_channel_public_keys = Some(remote_pubkeys); self.remote_commitment_points = vec![ @@ -5431,8 +5328,6 @@ impl ChannelActorState { )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); - self.save_remote_nonce(commitment_signed.next_local_nonce); - self.latest_commitment_transaction = Some(commitment_tx.data()); match flags { CommitmentSignedFlags::SigningCommitment(flags) => { let flags = flags | SigningCommitmentFlags::THEIR_COMMITMENT_SIGNED_SENT; @@ -5457,6 +5352,8 @@ impl ChannelActorState { } } } + self.commit_remote_nonce(commitment_signed.next_local_nonce); + self.latest_commitment_transaction = Some(commitment_tx.data()); Ok(()) } @@ -5690,33 +5587,8 @@ impl ChannelActorState { next_per_commitment_point, } = revoke_and_ack; - let key_agg_ctx = { - let local_pubkey = self.get_local_channel_public_keys().funding_pubkey; - let remote_pubkey = self.get_remote_channel_public_keys().funding_pubkey; - KeyAggContext::new([local_pubkey, remote_pubkey]).expect("Valid pubkeys") - }; - let x_only_aggregated_pubkey = key_agg_ctx.aggregated_pubkey::().serialize_xonly(); - let (verify_ctx, sign_ctx) = { - let local_nonce = self.get_local_nonce(); - let remote_nonce = self.take_remote_nonce_for_raa(); - let nonces = [remote_nonce.clone(), local_nonce]; - let agg_nonce = AggNonce::sum(nonces); - - ( - Musig2VerifyContext { - key_agg_ctx: key_agg_ctx.clone(), - agg_nonce: agg_nonce.clone(), - pubkey: *self.get_remote_funding_pubkey(), - pubnonce: remote_nonce, - }, - Musig2SignContext { - key_agg_ctx, - agg_nonce, - seckey: 
self.signer.funding_key.clone(), - secnonce: self.get_local_musig2_secnonce(), - }, - ) - }; + let sign_ctx = self.get_sign_context_for_revoke_and_ack_message()?; + let x_only_aggregated_pubkey = sign_ctx.common_ctx.x_only_aggregated_pubkey(); let revocation_data = { let commitment_tx_fee = calculate_commitment_tx_fee( @@ -5761,14 +5633,8 @@ impl ChannelActorState { ] .concat(), ); - - verify_ctx.verify(revocation_partial_signature, message.as_slice())?; - - let our_signature = sign_ctx.clone().sign(message.as_slice())?; - let aggregated_signature = verify_ctx.aggregate_partial_signatures_for_msg( - [revocation_partial_signature, our_signature], - message.as_slice(), - )?; + let aggregated_signature = + sign_ctx.sign_and_aggregate(message.as_slice(), revocation_partial_signature)?; RevocationData { commitment_number, x_only_aggregated_pubkey, @@ -5799,12 +5665,8 @@ impl ChannelActorState { ] .concat(), ); - verify_ctx.verify(commitment_tx_partial_signature, message.as_slice())?; - let our_signature = sign_ctx.sign(message.as_slice())?; - let aggregated_signature = verify_ctx.aggregate_partial_signatures_for_msg( - [commitment_tx_partial_signature, our_signature], - message.as_slice(), - )?; + let aggregated_signature = + sign_ctx.sign_and_aggregate(message.as_slice(), commitment_tx_partial_signature)?; SettlementData { x_only_aggregated_pubkey, @@ -5943,7 +5805,17 @@ impl ChannelActorState { // Resetting our remote commitment number to the actual remote commitment number // and resend the RevokeAndAck message. 
self.set_remote_commitment_number(acutal_remote_commitment_number); + // Resetting the remote nonce to build the RevokeAndAck message + let last_commited_nonce = self.get_last_committed_remote_nonce(); + let used_nonce = self + .last_revoke_and_ack_remote_nonce + .as_ref() + .expect("must have set last_revoke_and_ack_remote_nonce") + .clone(); + self.commit_remote_nonce(used_nonce); self.send_revoke_and_ack_message(network)?; + // Now we can reset the remote nonce to the "real" last committed nonce + self.commit_remote_nonce(last_commited_nonce); let need_commitment_signed = self.tlc_state.update_for_commitment_signed(); if need_commitment_signed { network @@ -6072,24 +5944,8 @@ impl ChannelActorState { } fn build_init_commitment_tx_signature(&self) -> Result { - let key_agg_ctx = { - let local_pubkey = self.get_local_channel_public_keys().funding_pubkey; - let remote_pubkey = self.get_remote_channel_public_keys().funding_pubkey; - KeyAggContext::new([remote_pubkey, local_pubkey]).expect("Valid pubkeys") - }; - let x_only_aggregated_pubkey = key_agg_ctx.aggregated_pubkey::().serialize_xonly(); - let sign_ctx = { - let local_nonce = self.get_local_nonce(); - let remote_nonce = self.get_remote_nonce(); - let nonces = [local_nonce, remote_nonce]; - let agg_nonce = AggNonce::sum(nonces); - Musig2SignContext { - key_agg_ctx, - agg_nonce, - seckey: self.signer.funding_key.clone(), - secnonce: self.get_local_musig2_secnonce(), - } - }; + let sign_ctx = self.get_sign_context(true); + let x_only_aggregated_pubkey = sign_ctx.common_ctx.x_only_aggregated_pubkey(); let ([to_local_output, to_remote_output], [to_local_output_data, to_remote_output_data]) = self.build_settlement_transaction_outputs(false); let version = 0u64; @@ -6110,29 +5966,17 @@ impl ChannelActorState { .concat(), ); - sign_ctx.sign(message.as_slice()) + let signature = sign_ctx.sign(message.as_slice()); + signature } fn check_init_commitment_tx_signature( &self, signature: PartialSignature, ) -> Result { - 
let key_agg_ctx = { - let local_pubkey = self.get_local_channel_public_keys().funding_pubkey; - let remote_pubkey = self.get_remote_channel_public_keys().funding_pubkey; - KeyAggContext::new([local_pubkey, remote_pubkey]).expect("Valid pubkeys") - }; - let x_only_aggregated_pubkey = key_agg_ctx.aggregated_pubkey::().serialize_xonly(); - let local_nonce = self.get_local_nonce(); - let remote_nonce = self.get_remote_nonce(); - let nonces = [remote_nonce.clone(), local_nonce]; - let agg_nonce = AggNonce::sum(nonces); - let verify_ctx = Musig2VerifyContext { - key_agg_ctx: key_agg_ctx.clone(), - agg_nonce: agg_nonce.clone(), - pubkey: *self.get_remote_funding_pubkey(), - pubnonce: remote_nonce, - }; + let sign_ctx = self.get_sign_context(false); + let x_only_aggregated_pubkey = sign_ctx.common_ctx.x_only_aggregated_pubkey(); + let ([to_local_output, to_remote_output], [to_local_output_data, to_remote_output_data]) = self.build_settlement_transaction_outputs(true); let version = 0u64; @@ -6153,21 +5997,9 @@ impl ChannelActorState { .concat(), ); - verify_ctx.verify(signature, message.as_slice())?; - let settlement_data = { - let sign_ctx = Musig2SignContext { - key_agg_ctx, - agg_nonce, - seckey: self.signer.funding_key.clone(), - secnonce: self.get_local_musig2_secnonce(), - }; - - let our_signature = sign_ctx.sign(message.as_slice())?; - let aggregated_signature = verify_ctx.aggregate_partial_signatures_for_msg( - [signature, our_signature], - message.as_slice(), - )?; + let aggregated_signature = + sign_ctx.sign_and_aggregate(message.as_slice(), signature)?; SettlementData { x_only_aggregated_pubkey, @@ -6222,15 +6054,123 @@ impl ChannelActorState { local_pubkey <= remote_pubkey } - // Order some items (like pubkey and nonce) from holders and counterparty in musig2. - fn order_things_for_musig2(&self, holder: T, counterparty: T) -> [T; 2] { + // Order some items (like pubkey and nonce) from local and remote in musig2. 
+ fn order_things_for_musig2(&self, local: T, remote: T) -> [T; 2] { if self.should_local_go_first_in_musig2() { - [holder, counterparty] + [local, remote] } else { - [counterparty, holder] + [remote, local] } } + fn get_deterministic_common_context(&self) -> Musig2CommonContext { + let local_first = self.should_local_go_first_in_musig2(); + let key_agg_ctx = self.get_deterministic_musig2_agg_context(); + let remote_nonce = self.get_last_committed_remote_nonce(); + let local_nonce = self.get_local_musig2_pubnonce(); + let agg_nonce = AggNonce::sum(if local_first { + [local_nonce, remote_nonce] + } else { + [remote_nonce, local_nonce] + }); + Musig2CommonContext { + local_first, + key_agg_ctx, + agg_nonce, + } + } + + // A deterministic `Musig2VerifyContext` is a verifying context that has the same basic configuration + // for both parties. This is mostly used by us to verify transactions to consume the funding cell, + // which uses a deterministic aggregated pubkey for both parties. + fn get_deterministic_verify_context(&self) -> Musig2VerifyContext { + let common_ctx = self.get_deterministic_common_context(); + Musig2VerifyContext { + common_ctx, + pubkey: self.get_remote_funding_pubkey().clone(), + pubnonce: self.get_last_committed_remote_nonce(), + } + } + + fn get_verify_context(&self) -> Musig2VerifyContext { + // We are always verifying a commitment transaction that is broadcast by us, + // so we can always pass false to get_musig2_common_ctx. + let common_ctx = self.get_musig2_common_ctx(false); + + Musig2VerifyContext { + common_ctx, + pubkey: self.get_remote_funding_pubkey().clone(), + pubnonce: self.get_last_committed_remote_nonce(), + } + } + + // A deterministic `Musig2SignContext` is a signing context that has the same basic configuration + // for both parties. This is mostly used by us to sign transactions to consume the funding cell, + // which uses a deterministic aggregated pubkey for both parties. 
+    fn get_deterministic_sign_context(&self) -> Musig2SignContext {
+        let common_ctx = self.get_deterministic_common_context();
+        Musig2SignContext {
+            common_ctx,
+            seckey: self.signer.funding_key.clone(),
+            secnonce: self.get_local_musig2_secnonce(),
+        }
+    }
+
+    // This function is used to construct a `Musig2SignContext` with which we can easily sign
+    // and aggregate partial signatures. The parameter for_remote is used to indicate the direction
+    // of commitment transaction (just like the same parameter used in building commitment transactions).
+    // This is also due to the fact that commitment transactions are asymmetrical (A's broadcastable commitment
+    // transactions are different from B's broadcastable commitment transactions), so sometimes we need to
+    // construct different `Musig2SignContext` depending on the direction of commitment transaction.
+    // For example, the `Musig2SignContext`s used by A to construct `CommitmentSigned` and `RevokeAndAck`
+    // messages to B are different. A needs to build a commitment transaction that is broadcast by B
+    // to construct a `CommitmentSigned` message, but when constructing `RevokeAndAck` A needs to
+    // build an old commitment transaction that is broadcast by itself. This is the reason why
+    // we need a `for_remote` parameter. It serves the same function as `for_remote` in functions
+    // like `build_commitment_and_settlement_tx`.
+    fn get_sign_context(&self, for_remote: bool) -> Musig2SignContext {
+        let common_ctx = self.get_musig2_common_ctx(for_remote);
+
+        Musig2SignContext {
+            common_ctx,
+            seckey: self.signer.funding_key.clone(),
+            secnonce: self.get_local_musig2_secnonce(),
+        }
+    }
+
+    // As explained in the documentation of the `last_commitment_signed_remote_nonce` field, we need to
+    // use a saved remote nonce because the latest remote nonce may be different from the
+    // one we used while sending CommitmentSigned message.
+    fn get_sign_context_for_revoke_and_ack_message(
+        &self,
+    ) -> Result<Musig2SignContext, ProcessingChannelError> {
+        let common_ctx = {
+            let local_pubkey = self.get_local_channel_public_keys().funding_pubkey;
+            let remote_pubkey = self.get_remote_channel_public_keys().funding_pubkey;
+            let pubkeys = [local_pubkey, remote_pubkey];
+            let key_agg_ctx = KeyAggContext::new(pubkeys).expect("Valid pubkeys");
+            let remote_nonce =
+                self.get_last_commitment_signed_remote_nonce()
+                    .ok_or(ProcessingChannelError::InvalidState(
+                        "No last used remote nonce found, has the peer sent a RevokeAndAck without us sending CommitmentSigned"
+                            .to_string(),
+                    ))?;
+            let local_nonce = self.get_local_musig2_pubnonce();
+            let agg_nonce = AggNonce::sum([local_nonce, remote_nonce]);
+            Musig2CommonContext {
+                local_first: true,
+                key_agg_ctx,
+                agg_nonce,
+            }
+        };
+
+        Ok(Musig2SignContext {
+            common_ctx,
+            seckey: self.signer.funding_key.clone(),
+            secnonce: self.get_local_musig2_secnonce(),
+        })
+    }
+
     // Should the local send tx_signatures first?
     // In order to avoid deadlock, we need to define an order for sending tx_signatures.
     // Currently the order of sending tx_signatures is defined as follows:
@@ -6452,16 +6392,39 @@
         }
     }
 
-    fn get_commitment_lock_script_xonly(&self, for_remote: bool) -> [u8; 32] {
+    // For different directions of commitment transactions, we put pubkeys and nonces
+    // in different order. It is a coincidence that in the current code when we are building
+    // a commitment transaction for the remote, we will put our pubkey/nonce first.
+    // That is to say, `for_remote` is equivalent to this function's parameter `local_first`.
+    // But the name local_first is more descriptive in the context of ordering musig2-related
+    // stuff.
+ fn get_musig2_common_ctx(&self, local_first: bool) -> Musig2CommonContext { let local_pubkey = self.get_local_channel_public_keys().funding_pubkey; let remote_pubkey = self.get_remote_channel_public_keys().funding_pubkey; - let pubkeys = if for_remote { + let pubkeys = if local_first { [local_pubkey, remote_pubkey] } else { [remote_pubkey, local_pubkey] }; - KeyAggContext::new(pubkeys) - .expect("Valid pubkeys") + let key_agg_ctx = KeyAggContext::new(pubkeys).expect("Valid pubkeys"); + let remote_nonce = self.get_last_committed_remote_nonce(); + let local_nonce = self.get_local_musig2_pubnonce(); + + let agg_nonce = AggNonce::sum(if local_first { + [local_nonce, remote_nonce] + } else { + [remote_nonce, local_nonce] + }); + Musig2CommonContext { + local_first, + key_agg_ctx, + agg_nonce, + } + } + + fn get_commitment_lock_script_xonly(&self, for_remote: bool) -> [u8; 32] { + self.get_musig2_common_ctx(for_remote) + .key_agg_ctx .aggregated_pubkey::() .serialize_xonly() } @@ -6598,13 +6561,12 @@ impl ChannelActorState { ) -> Result { let (commitment_tx, settlement_tx) = self.build_commitment_and_settlement_tx(false); - let verify_ctx = Musig2VerifyContext::from(self); - verify_ctx.verify( + let deterministic_verify_ctx = self.get_deterministic_verify_context(); + deterministic_verify_ctx.verify( funding_tx_partial_signature, commitment_tx.hash().as_slice(), )?; - let verify_ctx = Musig2VerifyContext::from((self, false)); let to_local_output = settlement_tx .outputs() .get(0) @@ -6638,6 +6600,7 @@ impl ChannelActorState { ] .concat(), ); + let verify_ctx = self.get_verify_context(); verify_ctx.verify(commitment_tx_partial_signature, message.as_slice())?; Ok(PartiallySignedCommitmentTransaction { @@ -6654,10 +6617,10 @@ impl ChannelActorState { ) -> Result<(PartialSignature, PartialSignature), ProcessingChannelError> { let (commitment_tx, settlement_tx) = self.build_commitment_and_settlement_tx(true); - let sign_ctx = Musig2SignContext::from(self); - let 
funding_tx_partial_signature = sign_ctx.sign(commitment_tx.hash().as_slice())?; + let deterministic_sign_ctx = self.get_deterministic_sign_context(); + let funding_tx_partial_signature = + deterministic_sign_ctx.sign(commitment_tx.hash().as_slice())?; - let sign_ctx = Musig2SignContext::from((self, true)); let to_local_output = settlement_tx .outputs() .get(0) @@ -6692,6 +6655,7 @@ impl ChannelActorState { .concat(), ); + let sign_ctx = self.get_sign_context(true); let commitment_tx_partial_signature = sign_ctx.sign(message.as_slice())?; Ok(( @@ -6800,30 +6764,28 @@ pub fn create_witness_for_commitment_cell( .expect("Witness length should be correct") } -pub struct Musig2VerifyContext { - pub key_agg_ctx: KeyAggContext, - pub agg_nonce: AggNonce, - pub pubkey: Pubkey, - pub pubnonce: PubNonce, +// The common musig2 configuration that is used both by signing and verifying. +#[derive(Debug)] +struct Musig2CommonContext { + // This parameter is also saved to the context because it is useful for + // aggregating partial signatures. 
+ local_first: bool, + key_agg_ctx: KeyAggContext, + agg_nonce: AggNonce, } -impl Musig2VerifyContext { - pub fn verify(&self, signature: PartialSignature, message: &[u8]) -> Result<(), VerifyError> { - verify_partial( - &self.key_agg_ctx, - signature, - &self.agg_nonce, - self.pubkey, - &self.pubnonce, - message, - ) - } - - pub fn aggregate_partial_signatures_for_msg( +impl Musig2CommonContext { + fn aggregate_partial_signatures_for_msg( &self, - partial_signatures: [PartialSignature; 2], + local_signature: PartialSignature, + remote_signature: PartialSignature, message: &[u8], ) -> Result { + let partial_signatures = if self.local_first { + [local_signature, remote_signature] + } else { + [remote_signature, local_signature] + }; aggregate_partial_signatures( &self.key_agg_ctx, &self.agg_nonce, @@ -6831,26 +6793,62 @@ impl Musig2VerifyContext { message, ) } + + pub fn x_only_aggregated_pubkey(&self) -> [u8; 32] { + self.key_agg_ctx + .aggregated_pubkey::() + .serialize_xonly() + } } -#[derive(Clone)] -pub struct Musig2SignContext { - key_agg_ctx: KeyAggContext, - agg_nonce: AggNonce, +struct Musig2VerifyContext { + common_ctx: Musig2CommonContext, + pubkey: Pubkey, + pubnonce: PubNonce, +} + +impl Musig2VerifyContext { + fn verify(&self, signature: PartialSignature, message: &[u8]) -> Result<(), VerifyError> { + verify_partial( + &self.common_ctx.key_agg_ctx, + signature, + &self.common_ctx.agg_nonce, + self.pubkey, + &self.pubnonce, + message, + ) + } +} + +struct Musig2SignContext { + common_ctx: Musig2CommonContext, seckey: Privkey, secnonce: SecNonce, } impl Musig2SignContext { - pub fn sign(self, message: &[u8]) -> Result { + fn sign(&self, message: &[u8]) -> Result { sign_partial( - &self.key_agg_ctx, - self.seckey, + &self.common_ctx.key_agg_ctx, + self.seckey.clone(), self.secnonce.clone(), - &self.agg_nonce, + &self.common_ctx.agg_nonce, message, ) } + + fn sign_and_aggregate( + &self, + message: &[u8], + remote_signature: PartialSignature, + ) -> Result 
{ + let local_signature = self.sign(message)?; + Ok(self.common_ctx.aggregate_partial_signatures_for_msg( + local_signature, + remote_signature, + message, + )?) + } } /// One counterparty's public keys which do not change over the life of a channel. diff --git a/src/fiber/tests/channel.rs b/src/fiber/tests/channel.rs index c5bd63090..391430239 100644 --- a/src/fiber/tests/channel.rs +++ b/src/fiber/tests/channel.rs @@ -3633,7 +3633,7 @@ async fn test_revoke_old_commitment_transaction() { let witness = [ empty_witness_args.to_vec(), vec![0xFF], - 0u64.to_be_bytes().to_vec(), + revocation_data.commitment_number.to_be_bytes().to_vec(), revocation_data.x_only_aggregated_pubkey.to_vec(), revocation_data.aggregated_signature.serialize().to_vec(), ] @@ -3656,6 +3656,7 @@ async fn test_channel_with_simple_update_operation() { #[tokio::test] async fn test_create_channel() { + init_tracing(); let [mut node_a, mut node_b] = NetworkNode::new_n_interconnected_nodes().await; let message = |rpc_reply| { diff --git a/src/store/tests/store.rs b/src/store/tests/store.rs index f7d392797..b6c6d7926 100644 --- a/src/store/tests/store.rs +++ b/src/store/tests/store.rs @@ -354,8 +354,9 @@ fn test_channel_actor_state_store() { }), commitment_numbers: Default::default(), remote_shutdown_script: Some(Script::default()), - last_used_nonce_in_commitment_signed: None, - remote_nonces: vec![(0, pub_nonce.clone())], + last_committed_remote_nonce: None, + last_revoke_and_ack_remote_nonce: None, + last_commitment_signed_remote_nonce: None, remote_commitment_points: vec![ (0, gen_rand_fiber_public_key()), (1, gen_rand_fiber_public_key()), From 181ea20d150b27277d468338e191b122dc67fb69 Mon Sep 17 00:00:00 2001 From: Yukang Date: Thu, 9 Jan 2025 13:29:20 +0800 Subject: [PATCH 7/9] fix: Fix issue that the failure of middle hop tlc forwarding does not returned (#457) --- src/fiber/network.rs | 16 +- src/fiber/tests/payment.rs | 617 +++++++++++++++------------------- src/fiber/tests/test_utils.rs | 29 
+- 3 files changed, 313 insertions(+), 349 deletions(-) diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 069427f65..28a00fbcd 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -501,7 +501,7 @@ pub struct AcceptChannelCommand { pub tlc_expiry_delta: Option, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SendOnionPacketCommand { pub peeled_onion_packet: PeeledPaymentOnionPacket, pub previous_tlc: Option<(Hash256, u64)>, @@ -1137,7 +1137,19 @@ where } NetworkActorCommand::SendPaymentOnionPacket(command) => { - let _ = self.handle_send_onion_packet_command(state, command).await; + let res = self + .handle_send_onion_packet_command(state, command.clone()) + .await; + if let Err(err) = res { + self.on_add_tlc_result_event( + myself, + state, + command.payment_hash, + Some((ProcessingChannelError::TlcForwardingError(err.clone()), err)), + command.previous_tlc, + ) + .await; + } } NetworkActorCommand::PeelPaymentOnionPacket(onion_packet, payment_hash, reply) => { let response = onion_packet diff --git a/src/fiber/tests/payment.rs b/src/fiber/tests/payment.rs index 2812f21a1..db9fe7d47 100644 --- a/src/fiber/tests/payment.rs +++ b/src/fiber/tests/payment.rs @@ -27,47 +27,16 @@ async fn test_send_payment_for_direct_channel_and_dry_run() { .await; let [mut node_0, mut node_1] = nodes.try_into().expect("2 nodes"); let source_node = &mut node_0; - let target_pubkey = node_1.pubkey.clone(); let res = source_node - .send_payment(SendPaymentCommand { - target_pubkey: Some(target_pubkey.clone()), - amount: Some(10000000000), - payment_hash: None, - final_tlc_expiry_delta: None, - tlc_expiry_limit: None, - invoice: None, - timeout: None, - max_fee_amount: None, - max_parts: None, - keysend: Some(true), - udt_type_script: None, - allow_self_payment: false, - hop_hints: None, - dry_run: true, - }) + .send_payment_keysend(&node_1, 10000000000, true) .await; eprintln!("res: {:?}", res); assert!(res.is_ok()); let res = source_node - 
.send_payment(SendPaymentCommand { - target_pubkey: Some(target_pubkey.clone()), - amount: Some(10000000000), - payment_hash: None, - final_tlc_expiry_delta: None, - tlc_expiry_limit: None, - invoice: None, - timeout: None, - max_fee_amount: None, - max_parts: None, - keysend: Some(true), - udt_type_script: None, - allow_self_payment: false, - hop_hints: None, - dry_run: false, - }) + .send_payment_keysend(&node_1, 10000000000, false) .await; eprintln!("res: {:?}", res); @@ -77,22 +46,7 @@ async fn test_send_payment_for_direct_channel_and_dry_run() { source_node.wait_until_success(payment_hash).await; let res = node_1 - .send_payment(SendPaymentCommand { - target_pubkey: Some(source_node.pubkey.clone()), - amount: Some(10000000000), - payment_hash: None, - final_tlc_expiry_delta: None, - tlc_expiry_limit: None, - invoice: None, - timeout: None, - max_fee_amount: None, - max_parts: None, - keysend: Some(true), - udt_type_script: None, - allow_self_payment: false, - hop_hints: None, - dry_run: false, - }) + .send_payment_keysend(&source_node, 10000000000, false) .await; eprintln!("res: {:?}", res); @@ -144,46 +98,12 @@ async fn test_send_payment_for_pay_self() { let node_2_channel2_balance = node_2.get_local_balance_from_channel(channels[2]); // now node_0 -> node_2 will be ok only with node_1, so the fee is larger than 0 - let res = node_0 - .send_payment(SendPaymentCommand { - target_pubkey: Some(node_2.pubkey.clone()), - amount: Some(60000000), - payment_hash: None, - final_tlc_expiry_delta: None, - tlc_expiry_limit: None, - invoice: None, - timeout: None, - max_fee_amount: None, - max_parts: None, - keysend: Some(true), - udt_type_script: None, - allow_self_payment: false, - hop_hints: None, - dry_run: true, - }) - .await; + let res = node_0.send_payment_keysend(&node_2, 60000000, true).await; assert!(res.unwrap().fee > 0); // node_0 -> node_0 will be ok for dry_run if `allow_self_payment` is true - let res = node_0 - .send_payment(SendPaymentCommand { - 
target_pubkey: Some(node_0.pubkey.clone()), - amount: Some(60000000), - payment_hash: None, - final_tlc_expiry_delta: None, - tlc_expiry_limit: None, - invoice: None, - timeout: None, - max_fee_amount: None, - max_parts: None, - keysend: Some(true), - udt_type_script: None, - allow_self_payment: true, - hop_hints: None, - dry_run: false, - }) - .await; + let res = node_0.send_payment_keysend_to_self(60000000, false).await; eprintln!("res: {:?}", res); assert!(res.is_ok()); @@ -223,24 +143,7 @@ async fn test_send_payment_for_pay_self() { // node_0 -> node_2 will be ok with direct channel2, // since after payself this channel now have enough balance, so the fee is 0 - let res = node_0 - .send_payment(SendPaymentCommand { - target_pubkey: Some(node_2.pubkey.clone()), - amount: Some(60000000), - payment_hash: None, - final_tlc_expiry_delta: None, - tlc_expiry_limit: None, - invoice: None, - timeout: None, - max_fee_amount: None, - max_parts: None, - keysend: Some(true), - udt_type_script: None, - allow_self_payment: false, - hop_hints: None, - dry_run: true, - }) - .await; + let res = node_0.send_payment_keysend(&node_2, 60000000, true).await; eprintln!("res: {:?}", res); assert_eq!(res.unwrap().fee, 0); @@ -267,24 +170,7 @@ async fn test_send_payment_for_pay_self_with_two_nodes() { let node_1_channel1_balance = node_1.get_local_balance_from_channel(channels[1]); // node_0 -> node_0 will be ok for dry_run if `allow_self_payment` is true - let res = node_0 - .send_payment(SendPaymentCommand { - target_pubkey: Some(node_0.pubkey.clone()), - amount: Some(60000000), - payment_hash: None, - final_tlc_expiry_delta: None, - tlc_expiry_limit: None, - invoice: None, - timeout: None, - max_fee_amount: None, - max_parts: None, - keysend: Some(true), - udt_type_script: None, - allow_self_payment: true, - hop_hints: None, - dry_run: false, - }) - .await; + let res = node_0.send_payment_keysend_to_self(60000000, false).await; eprintln!("res: {:?}", res); assert!(res.is_ok()); @@ 
-353,24 +239,7 @@ async fn test_send_payment_with_more_capacity_for_payself() { let node_2_channel2_balance = node_2.get_local_balance_from_channel(channels[2]); // node_0 -> node_0 will be ok if `allow_self_payment` is true - let res = node_0 - .send_payment(SendPaymentCommand { - target_pubkey: Some(node_0.pubkey.clone()), - amount: Some(60000000), - payment_hash: None, - final_tlc_expiry_delta: None, - tlc_expiry_limit: None, - invoice: None, - timeout: None, - max_fee_amount: None, - max_parts: None, - keysend: Some(true), - udt_type_script: None, - allow_self_payment: true, - hop_hints: None, - dry_run: false, - }) - .await; + let res = node_0.send_payment_keysend_to_self(60000000, false).await; eprintln!("res: {:?}", res); assert!(res.is_ok()); @@ -655,42 +524,12 @@ async fn test_send_payment_select_channel_with_hop_hints() { let (nodes, channels) = create_n_nodes_with_index_and_amounts_with_established_channel( &[ - ( - (0, 1), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), // there are 3 channels from node1 -> node2 - ( - (1, 2), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (1, 2), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (1, 2), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (2, 3), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((2, 3), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), ], 4, true, @@ -832,10 +671,10 @@ async fn test_send_payment_two_nodes_with_hop_hints_and_multiple_channels() { let (nodes, channels) = create_n_nodes_with_index_and_amounts_with_established_channel( &[ - ((0, 1), (MIN_RESERVED_CKB + 10000000000, MIN_RESERVED_CKB)), - ((0, 1), (MIN_RESERVED_CKB + 
10000000000, MIN_RESERVED_CKB)), - ((1, 0), (MIN_RESERVED_CKB + 10000000000, MIN_RESERVED_CKB)), - ((1, 0), (MIN_RESERVED_CKB + 10000000000, MIN_RESERVED_CKB)), + ((0, 1), (HUGE_CKB_AMOUNT, MIN_RESERVED_CKB)), + ((0, 1), (HUGE_CKB_AMOUNT, MIN_RESERVED_CKB)), + ((1, 0), (HUGE_CKB_AMOUNT, MIN_RESERVED_CKB)), + ((1, 0), (HUGE_CKB_AMOUNT, MIN_RESERVED_CKB)), ], 2, true, @@ -1045,90 +884,33 @@ async fn test_network_three_nodes_two_channels_send_each_other() { let _span = tracing::info_span!("node", node = "test").entered(); let (nodes, channels) = create_n_nodes_with_index_and_amounts_with_established_channel( &[ - ( - (0, 1), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (1, 2), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), ], 3, true, ) .await; - let [node_a, node_b, node_c] = nodes.try_into().expect("3 nodes"); + let [mut node_a, node_b, mut node_c] = nodes.try_into().expect("3 nodes"); // Wait for the channel announcement to be broadcasted tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; let node_b_old_balance_channel_0 = node_b.get_local_balance_from_channel(channels[0]); let node_b_old_balance_channel_1 = node_b.get_local_balance_from_channel(channels[1]); - let node_a_pubkey = node_a.pubkey.clone(); - let node_c_pubkey = node_c.pubkey.clone(); - let amount_a_to_c = 60000; - let message = |rpc_reply| -> NetworkActorMessage { - NetworkActorMessage::Command(NetworkActorCommand::SendPayment( - SendPaymentCommand { - target_pubkey: Some(node_c_pubkey.clone()), - amount: Some(amount_a_to_c), - payment_hash: None, - final_tlc_expiry_delta: None, - tlc_expiry_limit: None, - invoice: None, - timeout: None, - max_fee_amount: None, - max_parts: None, - keysend: Some(true), - udt_type_script: None, - allow_self_payment: false, - hop_hints: None, - dry_run: false, - }, - rpc_reply, - )) 
- }; - - let res = call!(node_a.network_actor, message) - .expect("node_a alive") + let res = node_a + .send_payment_keysend(&node_c, amount_a_to_c, false) + .await .unwrap(); let payment_hash1 = res.payment_hash; let fee1 = res.fee; eprintln!("payment_hash1: {:?}", payment_hash1); let amount_c_to_a = 50000; - let message = |rpc_reply| -> NetworkActorMessage { - NetworkActorMessage::Command(NetworkActorCommand::SendPayment( - SendPaymentCommand { - target_pubkey: Some(node_a_pubkey.clone()), - amount: Some(amount_c_to_a), - payment_hash: None, - final_tlc_expiry_delta: None, - tlc_expiry_limit: None, - invoice: None, - timeout: None, - max_fee_amount: None, - max_parts: None, - keysend: Some(true), - udt_type_script: None, - allow_self_payment: false, - hop_hints: None, - dry_run: false, - }, - rpc_reply, - )) - }; - - let res = call!(node_c.network_actor, message) - .expect("node_a alive") + let res = node_c + .send_payment_keysend(&node_a, amount_c_to_a, false) + .await .unwrap(); let payment_hash2 = res.payment_hash; @@ -1157,34 +939,10 @@ async fn test_network_three_nodes_send_each_other() { let _span = tracing::info_span!("node", node = "test").entered(); let (nodes, channels) = create_n_nodes_with_index_and_amounts_with_established_channel( &[ - ( - (0, 1), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (1, 2), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (2, 1), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (1, 0), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((2, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 0), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), ], 3, true, @@ -1298,20 +1056,8 @@ async fn test_send_payment_bench_test() { let _span = tracing::info_span!("node", node = "test").entered(); let 
(nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel( &[ - ( - (0, 1), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (1, 2), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), ], 3, true, @@ -1324,7 +1070,10 @@ async fn test_send_payment_bench_test() { let mut all_sent = HashSet::new(); for i in 1..=10 { - let payment = node_0.send_payment_keysend(&node_2, 1000).await.unwrap(); + let payment = node_0 + .send_payment_keysend(&node_2, 1000, false) + .await + .unwrap(); eprintln!("payment: {:?}", payment); all_sent.insert(payment.payment_hash); eprintln!("send: {} payment_hash: {:?} sent", i, payment.payment_hash); @@ -1361,20 +1110,8 @@ async fn test_send_payment_three_nodes_wait_succ_bench_test() { let _span = tracing::info_span!("node", node = "test").entered(); let (nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel( &[ - ( - (0, 1), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (1, 2), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), ], 3, true, @@ -1387,7 +1124,10 @@ async fn test_send_payment_three_nodes_wait_succ_bench_test() { let mut all_sent = vec![]; for i in 1..=10 { - let payment = node_0.send_payment_keysend(&node_2, 1000).await.unwrap(); + let payment = node_0 + .send_payment_keysend(&node_2, 1000, false) + .await + .unwrap(); all_sent.push(payment.payment_hash); eprintln!( "send: {} payment_hash: {:?} sentxx", @@ -1406,20 +1146,8 @@ async fn test_send_payment_three_nodes_send_each_other_bench_test() { let _span = tracing::info_span!("node", node = "test").entered(); let (nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel( &[ 
- ( - (0, 1), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (1, 2), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), ], 3, true, @@ -1432,11 +1160,17 @@ async fn test_send_payment_three_nodes_send_each_other_bench_test() { let mut all_sent = vec![]; for i in 1..=5 { - let payment1 = node_0.send_payment_keysend(&node_2, 1000).await.unwrap(); + let payment1 = node_0 + .send_payment_keysend(&node_2, 1000, false) + .await + .unwrap(); all_sent.push(payment1.payment_hash); eprintln!("send: {} payment_hash: {:?} sent", i, payment1.payment_hash); - let payment2 = node_2.send_payment_keysend(&node_0, 1000).await.unwrap(); + let payment2 = node_2 + .send_payment_keysend(&node_0, 1000, false) + .await + .unwrap(); all_sent.push(payment2.payment_hash); eprintln!("send: {} payment_hash: {:?} sent", i, payment2.payment_hash); tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; @@ -1450,22 +1184,10 @@ async fn test_send_payment_three_nodes_send_each_other_bench_test() { async fn test_send_payment_three_nodes_bench_test() { init_tracing(); let _span = tracing::info_span!("node", node = "test").entered(); - let (nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel( + let (nodes, channels) = create_n_nodes_with_index_and_amounts_with_established_channel( &[ - ( - (0, 1), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), - ( - (1, 2), - ( - MIN_RESERVED_CKB + 10000000000, - MIN_RESERVED_CKB + 10000000000, - ), - ), + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), ], 3, true, @@ -1476,29 +1198,61 @@ async fn test_send_payment_three_nodes_bench_test() { tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; let mut all_sent = HashSet::new(); + let mut node_2_got_fee = 0; + let mut 
node1_got_amount = 0; + let mut node_1_sent_fee = 0; + let mut node3_got_amount = 0; + let mut node_3_sent_fee = 0; + let mut node_2_ch1_sent_amount = 0; + let mut node_2_ch2_sent_amount = 0; + + let old_node_1_amount = node_1.get_local_balance_from_channel(channels[0]); + let old_node_2_chnnale1_amount = node_2.get_local_balance_from_channel(channels[0]); + let old_node_2_chnnale2_amount = node_2.get_local_balance_from_channel(channels[1]); + let old_node_3_amount = node_3.get_local_balance_from_channel(channels[1]); for i in 1..=4 { - let payment1 = node_1.send_payment_keysend(&node_3, 1000).await.unwrap(); - all_sent.insert((1, payment1.payment_hash)); + let payment1 = node_1 + .send_payment_keysend(&node_3, 1000, false) + .await + .unwrap(); + all_sent.insert((1, payment1.payment_hash, payment1.fee)); eprintln!("send: {} payment_hash: {:?} sent", i, payment1.payment_hash); - - let payment2 = node_2.send_payment_keysend(&node_3, 1000).await.unwrap(); - all_sent.insert((2, payment2.payment_hash)); + node_1_sent_fee += payment1.fee; + node_2_got_fee += payment1.fee; + + let payment2 = node_2 + .send_payment_keysend(&node_3, 1000, false) + .await + .unwrap(); + all_sent.insert((2, payment2.payment_hash, payment2.fee)); eprintln!("send: {} payment_hash: {:?} sent", i, payment2.payment_hash); - - let payment3 = node_2.send_payment_keysend(&node_1, 1000).await.unwrap(); - all_sent.insert((2, payment3.payment_hash)); + node_2_ch1_sent_amount += 1000; + node1_got_amount += 1000; + + let payment3 = node_2 + .send_payment_keysend(&node_1, 1000, false) + .await + .unwrap(); + all_sent.insert((2, payment3.payment_hash, payment3.fee)); eprintln!("send: {} payment_hash: {:?} sent", i, payment3.payment_hash); - - let payment4 = node_3.send_payment_keysend(&node_1, 1000).await.unwrap(); - all_sent.insert((3, payment4.payment_hash)); + node_2_ch2_sent_amount += 1000; + node3_got_amount += 1000; + + let payment4 = node_3 + .send_payment_keysend(&node_1, 1000, false) + .await + 
.unwrap(); + all_sent.insert((3, payment4.payment_hash, payment4.fee)); eprintln!("send: {} payment_hash: {:?} sent", i, payment4.payment_hash); + node_3_sent_fee += payment4.fee; + node_2_got_fee += payment4.fee; } loop { tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; - for (node_index, payment_hash) in all_sent.clone().iter() { + for (node_index, payment_hash, fee) in all_sent.clone().iter() { let node = match node_index { 1 => &mut node_1, 2 => &mut node_2, @@ -1510,7 +1264,7 @@ async fn test_send_payment_three_nodes_bench_test() { eprintln!("got payment: {:?} status: {:?}", payment_hash, status); if status == PaymentSessionStatus::Success { eprintln!("payment_hash: {:?} success", payment_hash); - all_sent.remove(&(*node_index, *payment_hash)); + all_sent.remove(&(*node_index, *payment_hash, *fee)); } } let res = node_1.node_info().await; @@ -1523,4 +1277,175 @@ async fn test_send_payment_three_nodes_bench_test() { break; } } + + eprintln!("node_2_got_fee: {}", node_2_got_fee); + eprintln!("node1_got_amount: {}", node1_got_amount); + eprintln!("node3_got_amount: {}", node3_got_amount); + + // node1: sent 4 fee to node2, got 4000 from node2 + // node3: sent 4 fee to node2, got 4000 from node2 + // node2: got 8 from node1 and node3, sent 8000 to node1 and node3 + + let node_1_amount = node_1.get_local_balance_from_channel(channels[0]); + let node_2_chnnale1_amount = node_2.get_local_balance_from_channel(channels[0]); + let node_2_chnnale2_amount = node_2.get_local_balance_from_channel(channels[1]); + let node_3_amount = node_3.get_local_balance_from_channel(channels[1]); + + let node_1_amount_diff = node_1_amount - old_node_1_amount; + let node_2_chnnale1_amount_diff = old_node_2_chnnale1_amount - node_2_chnnale1_amount; + let node_2_chnnale2_amount_diff = old_node_2_chnnale2_amount - node_2_chnnale2_amount; + let node_3_amount_diff = node_3_amount - old_node_3_amount; + + assert_eq!(node_1_amount_diff, node1_got_amount - node_1_sent_fee); + 
// got 3996 + + assert_eq!( + node_2_chnnale1_amount_diff, + node_2_ch1_sent_amount - node_1_sent_fee + ); + // sent 3996 + + assert_eq!( + node_2_chnnale2_amount_diff, + node_2_ch2_sent_amount - node_3_sent_fee + ); + // sent 3996 + + assert_eq!(node_3_amount_diff, node3_got_amount - node_3_sent_fee); + // got 3996 +} + +#[tokio::test] +async fn test_send_payment_middle_hop_stopped() { + init_tracing(); + let _span = tracing::info_span!("node", node = "test").entered(); + let (nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel( + &[ + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((2, 3), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((0, 4), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((4, 3), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ], + 5, + true, + ) + .await; + let [mut node_0, _node_1, mut node_2, node_3, mut node_4] = nodes.try_into().expect("5 nodes"); + + // dry run node_0 -> node_3 will select 0 -> 4 -> 3 + let res = node_0 + .send_payment_keysend(&node_3, 1000, true) + .await + .unwrap(); + eprintln!("res: {:?}", res); + assert_eq!(res.fee, 1); + + // node_4 stopped + node_4.stop().await; + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + let res = node_0 + .send_payment_keysend(&node_3, 1000, true) + .await + .unwrap(); + eprintln!("res: {:?}", res); + // when node_4 stopped, the first try path is still 0 -> 4 -> 3 + // so the fee is 1 + assert_eq!(res.fee, 1); + + let res = node_0 + .send_payment_keysend(&node_3, 1000, false) + .await + .unwrap(); + eprintln!("res: {:?}", res); + assert_eq!(res.fee, 1); + + node_0.wait_until_success(res.payment_hash).await; + + // after the first payment try failed, the payment session will find another path + // 0 -> 1 -> 2 -> 3, so it will succeed, but the fee change from 1 to 3 + let payment = node_0.get_payment_result(res.payment_hash).await; + assert_eq!(payment.fee, 3); + eprintln!("payment: {:?}", payment); + + // node_2 
stopped, payment will fail + node_2.stop().await; + let res = node_0 + .send_payment_keysend(&node_3, 1000, false) + .await + .unwrap(); + eprintln!("res: {:?}", res); + assert_eq!(res.fee, 3); + + node_0.wait_until_failed(res.payment_hash).await; +} + +#[tokio::test] +async fn test_send_payment_middle_hop_stopped_retry_longer_path() { + init_tracing(); + let _span = tracing::info_span!("node", node = "test").entered(); + let (nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel( + &[ + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((2, 3), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((0, 4), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((4, 5), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((5, 6), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((6, 3), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ], + 7, + true, + ) + .await; + let [mut node_0, _node_1, mut node_2, mut node_3, _node_4, _node_5, _node_6] = + nodes.try_into().expect("7 nodes"); + + // dry run node_0 -> node_3 will select 0 -> 1 -> 2 -> 3 + let res = node_0 + .send_payment_keysend(&node_3, 1000, true) + .await + .unwrap(); + eprintln!("res: {:?}", res); + assert_eq!(res.fee, 3); + + // node_2 stopped + node_2.stop().await; + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + let res = node_0 + .send_payment_keysend(&node_3, 1000, true) + .await + .unwrap(); + eprintln!("res: {:?}", res); + // when node_2 stopped, the first try path is still 0 -> 1 -> 2 -> 3 + // so the fee is 3 + assert_eq!(res.fee, 3); + + let res = node_0 + .send_payment_keysend(&node_3, 1000, false) + .await + .unwrap(); + eprintln!("res: {:?}", res); + assert_eq!(res.fee, 3); + + node_0.wait_until_success(res.payment_hash).await; + let payment = node_0.get_payment_result(res.payment_hash).await; + eprintln!("payment: {:?}", payment); + + // payment success with a longer path 0 -> 4 -> 5 -> 6 -> 3 + assert_eq!(payment.fee, 5); + + // node_3 stopped, payment will 
fail + node_3.stop().await; + let res = node_0 + .send_payment_keysend(&node_3, 1000, false) + .await + .unwrap(); + + eprintln!("res: {:?}", res); + assert_eq!(res.fee, 5); + + node_0.wait_until_failed(res.payment_hash).await; } diff --git a/src/fiber/tests/test_utils.rs b/src/fiber/tests/test_utils.rs index 1a841ab3a..dbd299881 100644 --- a/src/fiber/tests/test_utils.rs +++ b/src/fiber/tests/test_utils.rs @@ -59,6 +59,7 @@ use crate::{ static RETAIN_VAR: &str = "TEST_TEMP_RETAIN"; pub(crate) const MIN_RESERVED_CKB: u128 = 4200000000; +pub(crate) const HUGE_CKB_AMOUNT: u128 = MIN_RESERVED_CKB + 1000000000000 as u128; #[derive(Debug)] pub struct TempDir(ManuallyDrop); @@ -560,6 +561,7 @@ impl NetworkNode { &mut self, recipient: &NetworkNode, amount: u128, + dry_run: bool, ) -> std::result::Result { self.send_payment(SendPaymentCommand { target_pubkey: Some(recipient.pubkey.clone()), @@ -574,7 +576,32 @@ impl NetworkNode { keysend: Some(true), udt_type_script: None, allow_self_payment: false, - dry_run: false, + dry_run, + hop_hints: None, + }) + .await + } + + pub async fn send_payment_keysend_to_self( + &mut self, + amount: u128, + dry_run: bool, + ) -> std::result::Result { + let pubkey = self.pubkey.clone(); + self.send_payment(SendPaymentCommand { + target_pubkey: Some(pubkey), + amount: Some(amount), + payment_hash: None, + final_tlc_expiry_delta: None, + tlc_expiry_limit: None, + invoice: None, + timeout: None, + max_fee_amount: None, + max_parts: None, + keysend: Some(true), + udt_type_script: None, + allow_self_payment: true, + dry_run, hop_hints: None, }) .await From 3b7eb5480a0c9701d9cc96d8b708fcbb28144693 Mon Sep 17 00:00:00 2001 From: YI Date: Thu, 9 Jan 2025 21:28:54 +0800 Subject: [PATCH 8/9] Don't return error while handling actor messages --- src/fiber/gossip.rs | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index b70cefa05..7f72786e8 100644 --- 
a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -1224,10 +1224,12 @@ impl Actor for ExtendedGossipMess // Tick and later we will send the corresponding ChannelAnnouncement. // So the downstream consumer need to either cache some of the messages and wait for the // dependent messages to arrive or read the messages from the store directly. - myself.send_message(ExtendedGossipMessageStoreMessage::LoadMessagesFromStore( - id, - cursor.clone(), - ))?; + myself + .send_message(ExtendedGossipMessageStoreMessage::LoadMessagesFromStore( + id, + cursor.clone(), + )) + .expect("myself alive"); state.output_ports.insert( id, BroadcastMessageOutput::new(cursor, Arc::clone(&output_port)), @@ -2260,7 +2262,7 @@ where } GossipActorMessage::QueryBroadcastMessages(peer, queries) => { let id = state.get_and_increment_request_id(); - state + if let Err(e) = state .send_message_to_peer( &peer, GossipMessage::QueryBroadcastMessages(QueryBroadcastMessages { @@ -2269,7 +2271,13 @@ where queries, }), ) - .await?; + .await + { + error!( + "Failed to send query broadcast messages to peer {:?}: {:?}", + &peer, e + ); + } } GossipActorMessage::TryBroadcastMessages(messages) => { state @@ -2345,8 +2353,15 @@ where chain_hash: get_chain_hash(), queries: queries.clone(), }); - send_message_to_session(&state.control, peer_state.session_id, message) - .await?; + if let Err(e) = + send_message_to_session(&state.control, peer_state.session_id, message) + .await + { + error!( + "Failed to send query broadcast messages to peer {:?}: {:?}", + &peer_state.session_id, e + ); + } } } } @@ -2383,7 +2398,10 @@ where chain_hash, after_cursor, }) => { - check_chain_hash(&chain_hash)?; + if let Err(e) = check_chain_hash(&chain_hash) { + error!("Failed to check chain hash: {:?}", e); + return Ok(()); + } if after_cursor.is_max() { info!( "Received BroadcastMessagesFilter with max cursor from peer, stopping filter processor to {:?}", @@ -2481,7 +2499,10 @@ where chain_hash, queries, }) => { - 
check_chain_hash(&chain_hash)?; + if let Err(e) = check_chain_hash(&chain_hash) { + error!("Failed to check chain hash: {:?}", e); + return Ok(()); + } if queries.len() > MAX_NUM_OF_BROADCAST_MESSAGES as usize { warn!( "Received QueryBroadcastMessages with too many queries: {:?}", From 2c715fc77065aee6d65b699a8fc934c56d5d8fdd Mon Sep 17 00:00:00 2001 From: quake Date: Fri, 10 Jan 2025 15:13:02 +0900 Subject: [PATCH 9/9] chore: upgrade to v0.2.1 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8ffa51e4f..9556d0f87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1626,7 +1626,7 @@ dependencies = [ [[package]] name = "fnn" -version = "0.2.0" +version = "0.2.1" dependencies = [ "anyhow", "arcode", diff --git a/Cargo.toml b/Cargo.toml index e95a0c574..5e1cd6cfd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fnn" -version = "0.2.0" +version = "0.2.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html