diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 385f5b5353c..e1ea256a661 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -935,7 +935,7 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) { // Note that the ordering of the events for different nodes is non-prescriptive, though the // ordering of the two events that both go to nodes[2] have to stay in the same order. - let (nodes_0_event, events_3) = remove_first_msg_event_to_node(&nodes[0].node.get_our_node_id(), &events_3); + let nodes_0_event = remove_first_msg_event_to_node(&nodes[0].node.get_our_node_id(), &mut events_3); let messages_a = match nodes_0_event { MessageSendEvent::UpdateHTLCs { node_id, mut updates } => { assert_eq!(node_id, nodes[0].node.get_our_node_id()); @@ -949,12 +949,12 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) { _ => panic!("Unexpected event type!"), }; - let (nodes_2_event, events_3) = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &events_3); + let nodes_2_event = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &mut events_3); let send_event_b = SendEvent::from_event(nodes_2_event); assert_eq!(send_event_b.node_id, nodes[2].node.get_our_node_id()); let raa = if test_ignore_second_cs { - let (nodes_2_event, _events_3) = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &events_3); + let nodes_2_event = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &mut events_3); match nodes_2_event { MessageSendEvent::SendRevokeAndACK { node_id, msg } => { assert_eq!(node_id, nodes[2].node.get_our_node_id()); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6ab5149f4c5..a042d2f95ee 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -487,6 +487,22 @@ pub(super) struct PeerState { /// Messages to send to the peer - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, + /// The peer is currently connected (i.e. we've seen a + /// [`ChannelMessageHandler::peer_connected`] and no corresponding + /// [`ChannelMessageHandler::peer_disconnected`]. + is_connected: bool, +} + +impl PeerState { + /// Indicates that a peer meets the criteria where we're ok to remove it from our storage. + /// If true is passed for `require_disconnected`, the function will return false if we haven't + /// disconnected from the node already, ie. `PeerState::is_connected` is set to `true`. + fn ok_to_remove(&self, require_disconnected: bool) -> bool { + if require_disconnected && self.is_connected { + return false + } + self.channel_by_id.len() == 0 + } } /// Stores a PaymentSecret and any other data we may need to validate an inbound payment is @@ -763,9 +779,8 @@ where /// very far in the past, and can only ever be up to two hours in the future. highest_seen_timestamp: AtomicUsize, - /// The bulk of our storage will eventually be here (message queues and the like). Currently - /// the `per_peer_state` stores our channels on a per-peer basis, as well as the peer's latest - /// features. + /// The bulk of our storage. Currently the `per_peer_state` stores our channels on a per-peer + /// basis, as well as the peer's latest features. /// /// If we are connected to a peer we always at least have an entry here, even if no channels /// are currently open with that peer. @@ -1243,26 +1258,6 @@ macro_rules! handle_error { let mut peer_state = peer_state_mutex.lock().unwrap(); peer_state.pending_msg_events.append(&mut msg_events); } - #[cfg(any(feature = "_test_utils", test))] - { - if let None = per_peer_state.get(&$counterparty_node_id) { - // This shouldn't occour in tests unless an unkown counterparty_node_id - // has been passed to our message handling functions. - let expected_error_str = format!("Can't find a peer matching the passed counterparty node_id {}", $counterparty_node_id); - match err.action { - msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { ref channel_id, ref data } - } - => { - assert_eq!(*data, expected_error_str); - if let Some((err_channel_id, _user_channel_id)) = chan_id { - debug_assert_eq!(*channel_id, err_channel_id); - } - } - _ => debug_assert!(false, "Unexpected event"), - } - } - } } // Return error in case higher-API need one @@ -1592,12 +1587,10 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(&their_network_key); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError { err: format!("Not connected to node: {}", their_network_key) }); - } + let peer_state_mutex = per_peer_state.get(&their_network_key) + .ok_or_else(|| APIError::APIMisuseError{ err: format!("Not connected to node: {}", their_network_key) })?; - let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state = peer_state_mutex.lock().unwrap(); let channel = { let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); let their_features = &peer_state.latest_features; @@ -1635,14 +1628,13 @@ where } fn list_channels_with_filter::Signer>)) -> bool + Copy>(&self, f: Fn) -> Vec { - let mut res = Vec::new(); // Allocate our best estimate of the number of channels we have in the `res` // Vec. Sadly the `short_to_chan_info` map doesn't cover channels without // a scid or a scid alias, and the `id_to_peer` shouldn't be used outside // of the ChannelMonitor handling. Therefore reallocations may still occur, but is // unlikely as the `short_to_chan_info` map often contains 2 entries for // the same channel. - res.reserve(self.short_to_chan_info.read().unwrap().len()); + let mut res = Vec::with_capacity(self.short_to_chan_info.read().unwrap().len()); { let best_block_height = self.best_block.read().unwrap().height(); let per_peer_state = self.per_peer_state.read().unwrap(); @@ -1772,12 +1764,10 @@ where let result: Result<(), _> = loop { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }); - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { @@ -1893,12 +1883,10 @@ where fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: &PublicKey, peer_msg: Option<&String>, broadcast: bool) -> Result { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(peer_node_id); + let peer_state_mutex = per_peer_state.get(peer_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?; let mut chan = { - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if let hash_map::Entry::Occupied(chan) = peer_state.channel_by_id.entry(channel_id.clone()) { if let Some(peer_msg) = peer_msg { @@ -1914,7 +1902,7 @@ where log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); self.finish_force_close_channel(chan.force_shutdown(broadcast)); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state = peer_state_mutex.lock().unwrap(); peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); @@ -2201,7 +2189,7 @@ where let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { + if peer_state_mutex_opt.is_none() { break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); @@ -2315,7 +2303,9 @@ where /// public, and thus should be called whenever the result is going to be passed out in a /// [`MessageSendEvent::BroadcastChannelUpdate`] event. /// - /// May be called with peer_state already locked! + /// Note that in `internal_closing_signed`, this function is called without the `peer_state` + /// corresponding to the channel's counterparty locked, as the channel been removed from the + /// storage and the `peer_state` lock has been dropped. fn get_channel_update_for_broadcast(&self, chan: &Channel<::Signer>) -> Result { if !chan.should_announce() { return Err(LightningError { @@ -2334,7 +2324,10 @@ where /// is public (only returning an Err if the channel does not yet have an assigned short_id), /// and thus MUST NOT be called unless the recipient of the resulting message has already /// provided evidence that they know about the existence of the channel. - /// May be called with peer_state already locked! + /// + /// Note that through `internal_closing_signed`, this function is called without the + /// `peer_state` corresponding to the channel's counterparty locked, as the channel been + /// removed from the storage and the `peer_state` lock has been dropped. fn get_channel_update_for_unicast(&self, chan: &Channel<::Signer>) -> Result { log_trace!(self.logger, "Attempting to generate channel update for channel {}", log_bytes!(chan.channel_id())); let short_channel_id = match chan.get_short_channel_id().or(chan.latest_inbound_scid_alias()) { @@ -2395,11 +2388,9 @@ where }; let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::InvalidRoute{err: "No peer matching the path's first hop found!" }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(&counterparty_node_id) + .ok_or_else(|| APIError::InvalidRoute{err: "No peer matching the path's first hop found!" })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) { match { @@ -2675,12 +2666,10 @@ where &self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput ) -> Result<(), APIError> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let (chan, msg) = { let (res, chan) = { @@ -2846,11 +2835,9 @@ where &self.total_consistency_lock, &self.persistence_notifier, ); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; for channel_id in channel_ids { if !peer_state.channel_by_id.contains_key(channel_id) { @@ -2903,24 +2890,22 @@ where let next_hop_scid = { let peer_state_lock = self.per_peer_state.read().unwrap(); - if let Some(peer_state_mutex) = peer_state_lock.get(&next_node_id) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.get(next_hop_channel_id) { - Some(chan) => { - if !chan.is_usable() { - return Err(APIError::ChannelUnavailable { - err: format!("Channel with id {} not fully established", log_bytes!(*next_hop_channel_id)) - }) - } - chan.get_short_channel_id().unwrap_or(chan.outbound_scid_alias()) - }, - None => return Err(APIError::ChannelUnavailable { - err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*next_hop_channel_id), next_node_id) - }) - } - } else { - return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", next_node_id) }); + let peer_state_mutex = peer_state_lock.get(&next_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", next_node_id) })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.get(next_hop_channel_id) { + Some(chan) => { + if !chan.is_usable() { + return Err(APIError::ChannelUnavailable { + err: format!("Channel with id {} not fully established", log_bytes!(*next_hop_channel_id)) + }) + } + chan.get_short_channel_id().unwrap_or(chan.outbound_scid_alias()) + }, + None => return Err(APIError::ChannelUnavailable { + err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*next_hop_channel_id), next_node_id) + }) } }; @@ -3100,7 +3085,7 @@ where }; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { + if peer_state_mutex_opt.is_none() { forwarding_channel_not_found!(); continue; } @@ -3492,6 +3477,7 @@ where /// the channel. /// * Expiring a channel's previous `ChannelConfig` if necessary to only allow forwarding HTLCs /// with the current `ChannelConfig`. + /// * Removing peers which have disconnected but and no longer have any channels. /// /// Note that this may cause reentrancy through `chain::Watch::update_channel` calls or feerate /// estimate fetches. @@ -3504,19 +3490,21 @@ where let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new(); let mut timed_out_mpp_htlcs = Vec::new(); + let mut pending_peers_awaiting_removal = Vec::new(); { let per_peer_state = self.per_peer_state.read().unwrap(); for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; + let counterparty_node_id = *counterparty_node_id; peer_state.channel_by_id.retain(|chan_id, chan| { let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate); if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } if let Err(e) = chan.timer_check_closing_negotiation_progress() { let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id); - handle_errors.push((Err(err), *counterparty_node_id)); + handle_errors.push((Err(err), counterparty_node_id)); if needs_close { return false; } } @@ -3550,6 +3538,36 @@ where true }); + if peer_state.ok_to_remove(true) { + pending_peers_awaiting_removal.push(counterparty_node_id); + } + } + } + + // When a peer disconnects but still has channels, the peer's `peer_state` entry in the + // `per_peer_state` is not removed by the `peer_disconnected` function. If the channels + // of to that peer is later closed while still being disconnected (i.e. force closed), + // we therefore need to remove the peer from `peer_state` separately. + // To avoid having to take the `per_peer_state` `write` lock once the channels are + // closed, we instead remove such peers awaiting removal here on a timer, to limit the + // negative effects on parallelism as much as possible. + if pending_peers_awaiting_removal.len() > 0 { + let mut per_peer_state = self.per_peer_state.write().unwrap(); + for counterparty_node_id in pending_peers_awaiting_removal { + match per_peer_state.entry(counterparty_node_id) { + hash_map::Entry::Occupied(entry) => { + // Remove the entry if the peer is still disconnected and we still + // have no channels to the peer. + let remove_entry = { + let peer_state = entry.get().lock().unwrap(); + peer_state.ok_to_remove(true) + }; + if remove_entry { + entry.remove_entry(); + } + }, + hash_map::Entry::Vacant(_) => { /* The PeerState has already been removed */ } + } } } @@ -3727,7 +3745,7 @@ where fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) { #[cfg(any(feature = "_test_utils", test))] { - // Ensure that no peer state channel storage lock is not held when calling this + // Ensure that the peer state channel storage lock is not held when calling this // function. // This ensures that future code doesn't introduce a lock_order requirement for // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling @@ -3850,7 +3868,7 @@ where let mut expected_amt_msat = None; let mut valid_mpp = true; let mut errs = Vec::new(); - let mut per_peer_state = Some(self.per_peer_state.read().unwrap()); + let per_peer_state = self.per_peer_state.read().unwrap(); for htlc in sources.iter() { let (counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&htlc.prev_hop.short_channel_id) { Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), @@ -3860,16 +3878,16 @@ where } }; - if let None = per_peer_state.as_ref().unwrap().get(&counterparty_node_id) { + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if peer_state_mutex_opt.is_none() { valid_mpp = false; break; } - let peer_state_mutex = per_peer_state.as_ref().unwrap().get(&counterparty_node_id).unwrap(); - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - if let None = peer_state.channel_by_id.get(&chan_id) { + if peer_state.channel_by_id.get(&chan_id).is_none() { valid_mpp = false; break; } @@ -3895,14 +3913,13 @@ where claimable_amt_msat += htlc.value; } + mem::drop(per_peer_state); if sources.is_empty() || expected_amt_msat.is_none() { - mem::drop(per_peer_state); self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash); log_info!(self.logger, "Attempted to claim an incomplete payment which no longer had any available HTLCs!"); return; } if claimable_amt_msat != expected_amt_msat.unwrap() { - mem::drop(per_peer_state); self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash); log_info!(self.logger, "Attempted to claim an incomplete payment, expected {} msat, had {} available to claim.", expected_amt_msat.unwrap(), claimable_amt_msat); @@ -3910,8 +3927,7 @@ where } if valid_mpp { for htlc in sources.drain(..) { - if per_peer_state.is_none() { per_peer_state = Some(self.per_peer_state.read().unwrap()); } - if let Err((pk, err)) = self.claim_funds_from_hop(per_peer_state.take().unwrap(), + if let Err((pk, err)) = self.claim_funds_from_hop( htlc.prev_hop, payment_preimage, |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })) { @@ -3923,7 +3939,6 @@ where } } } - mem::drop(per_peer_state); if !valid_mpp { for htlc in sources.drain(..) { let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec(); @@ -3944,11 +3959,11 @@ where } fn claim_funds_from_hop) -> Option>(&self, - per_peer_state_lock: RwLockReadGuard::Signer>>>>, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc) -> Result<(), (PublicKey, MsgHandleErrInternal)> { //TODO: Delay the claimed_funds relaying just like we do outbound relay! + let per_peer_state = self.per_peer_state.read().unwrap(); let chan_id = prev_hop.outpoint.to_channel_id(); let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) { @@ -3956,83 +3971,76 @@ where None => None }; - let (found_channel, mut peer_state_opt) = if counterparty_node_id_opt.is_some() && per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).is_some() { - let peer_mutex = per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).unwrap(); - let peer_state = peer_mutex.lock().unwrap(); - let found_channel = peer_state.channel_by_id.contains_key(&chan_id); - (found_channel, Some(peer_state)) - } else { (false, None) }; - - if found_channel { - let peer_state = &mut *peer_state_opt.as_mut().unwrap(); - if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) { - let counterparty_node_id = chan.get().get_counterparty_node_id(); - match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { - Ok(msgs_monitor_option) => { - if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { - match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { - ChannelMonitorUpdateStatus::Completed => {}, - e => { - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug }, - "Failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, e); - let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(); - mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - return Err((counterparty_node_id, err)); - } - } - if let Some((msg, commitment_signed)) = msgs { - log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", - log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); - peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: counterparty_node_id, - updates: msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: vec![msg], - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, - } - }); - } - mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); - self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); - Ok(()) - } else { - Ok(()) - } - }, - Err((e, monitor_update)) => { + let mut peer_state_opt = counterparty_node_id_opt.as_ref().map( + |counterparty_node_id| per_peer_state.get(counterparty_node_id).map( + |peer_mutex| peer_mutex.lock().unwrap() + ) + ).unwrap_or(None); + + if let Some(hash_map::Entry::Occupied(mut chan)) = peer_state_opt.as_mut().map(|peer_state| peer_state.channel_by_id.entry(chan_id)) + { + let counterparty_node_id = chan.get().get_counterparty_node_id(); + match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { + Ok(msgs_monitor_option) => { + if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option { match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { ChannelMonitorUpdateStatus::Completed => {}, e => { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same update and try - // again on restart. - log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info }, - "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}", + log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug }, + "Failed to update channel monitor with preimage {:?}: {:?}", payment_preimage, e); - }, + let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(); + mem::drop(peer_state_opt); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); + return Err((counterparty_node_id, err)); + } } - let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); - if drop { - chan.remove_entry(); + if let Some((msg, commitment_signed)) = msgs { + log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", + log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); + peer_state_opt.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: counterparty_node_id, + updates: msgs::CommitmentUpdate { + update_add_htlcs: Vec::new(), + update_fulfill_htlcs: vec![msg], + update_fail_htlcs: Vec::new(), + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed, + } + }); } mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); - self.handle_monitor_update_completion_actions(completion_action(None)); - Err((counterparty_node_id, res)) - }, - } - } else { - // We've held the peer_state mutex since finding the channel and setting - // found_channel to true, so the channel can't have been dropped. - unreachable!() + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat))); + Ok(()) + } else { + Ok(()) + } + }, + Err((e, monitor_update)) => { + match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) { + ChannelMonitorUpdateStatus::Completed => {}, + e => { + // TODO: This needs to be handled somehow - if we receive a monitor update + // with a preimage we *must* somehow manage to propagate it to the upstream + // channel, or we must have an ability to receive the same update and try + // again on restart. + log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info }, + "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}", + payment_preimage, e); + }, + } + let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id); + if drop { + chan.remove_entry(); + } + mem::drop(peer_state_opt); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions(completion_action(None)); + Err((counterparty_node_id, res)) + }, } } else { let preimage_update = ChannelMonitorUpdate { @@ -4053,7 +4061,7 @@ where payment_preimage, update_res); } mem::drop(peer_state_opt); - mem::drop(per_peer_state_lock); + mem::drop(per_peer_state); // Note that we do process the completion action here. This totally could be a // duplicate claim, but we have no way of knowing without interrogating the // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are @@ -4075,7 +4083,7 @@ where }, HTLCSource::PreviousHopData(hop_data) => { let prev_outpoint = hop_data.outpoint; - let res = self.claim_funds_from_hop(self.per_peer_state.read().unwrap(), hop_data, payment_preimage, + let res = self.claim_funds_from_hop(hop_data, payment_preimage, |htlc_claim_value_msat| { if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat { let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat { @@ -4207,7 +4215,7 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); let mut peer_state_lock; let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if let None = peer_state_mutex_opt { return } + if peer_state_mutex_opt.is_none() { return } peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; let mut channel = { @@ -4297,11 +4305,9 @@ where let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(APIError::APIMisuseError { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) }); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(temporary_channel_id.clone()) { hash_map::Entry::Occupied(mut channel) => { @@ -4349,11 +4355,12 @@ where let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id.clone())) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id.clone()) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let mut channel = match Channel::new_from_req(&self.fee_estimator, &self.entropy_source, &self.signer_provider, counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration, @@ -4401,11 +4408,12 @@ where fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { let (value, output_script, user_id) = { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4428,13 +4436,14 @@ where fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id) + })?; let ((funding_msg, monitor, mut channel_ready), mut chan) = { let best_block = *self.best_block.read().unwrap(); - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4471,7 +4480,7 @@ where // It's safe to unwrap as we've held the `per_peer_state` read lock since checking that the // peer exists, despite the inner PeerState potentially having no channels after removing // the channel above. - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_msg.channel_id) { hash_map::Entry::Occupied(_) => { @@ -4506,12 +4515,13 @@ where let funding_tx = { let best_block = *self.best_block.read().unwrap(); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4549,11 +4559,12 @@ where fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4592,11 +4603,12 @@ where let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>; let result: Result<(), _> = loop { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { @@ -4645,12 +4657,13 @@ where fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; let (tx, chan_option) = { - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { @@ -4679,7 +4692,7 @@ where } if let Some(chan) = chan_option { if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update @@ -4702,11 +4715,12 @@ where let pending_forward_info = self.decode_update_add_htlc_onion(msg); let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4743,11 +4757,12 @@ where fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> { let (htlc_source, forwarded_htlc_value) = { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4762,11 +4777,12 @@ where fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4779,11 +4795,12 @@ where fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4800,11 +4817,12 @@ where fn internal_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -4943,11 +4961,12 @@ where let mut htlcs_to_fail = Vec::new(); let res = loop { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5008,11 +5027,12 @@ where fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5025,11 +5045,12 @@ where fn internal_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5063,7 +5084,7 @@ where }; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id); - if let None = peer_state_mutex_opt { + if peer_state_mutex_opt.is_none() { return Ok(NotifyOption::SkipPersist) } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); @@ -5098,11 +5119,12 @@ where let need_lnd_workaround = { let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -5720,9 +5742,7 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if peer_state.pending_msg_events.len() > 0 { - let mut peer_pending_events = Vec::new(); - mem::swap(&mut peer_pending_events, &mut peer_state.pending_msg_events); - pending_events.append(&mut peer_pending_events); + pending_events.append(&mut peer_state.pending_msg_events); } } @@ -6253,9 +6273,8 @@ where fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut failed_channels = Vec::new(); - let mut no_channels_remain = true; let mut per_peer_state = self.per_peer_state.write().unwrap(); - { + let remove_peer = { log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.", log_pubkey!(counterparty_node_id), if no_connection_possible { "cannot" } else { "can" }); if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { @@ -6268,8 +6287,6 @@ where update_maps_on_chan_removal!(self, chan); self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer); return false; - } else { - no_channels_remain = false; } true }); @@ -6298,9 +6315,12 @@ where &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false, } }); - } - } - if no_channels_remain { + debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect"); + peer_state.is_connected = false; + peer_state.ok_to_remove(true) + } else { true } + }; + if remove_peer { per_peer_state.remove(counterparty_node_id); } mem::drop(per_peer_state); @@ -6328,10 +6348,14 @@ where channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), + is_connected: true, })); }, hash_map::Entry::Occupied(e) => { - e.get().lock().unwrap().latest_features = init_msg.features.clone(); + let mut peer_state = e.get().lock().unwrap(); + peer_state.latest_features = init_msg.features.clone(); + debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); + peer_state.is_connected = true; }, } } @@ -6382,7 +6406,7 @@ where let channel_ids: Vec<[u8; 32]> = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { return; } + if peer_state_mutex_opt.is_none() { return; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; peer_state.channel_by_id.keys().cloned().collect() @@ -6396,7 +6420,7 @@ where // First check if we can advance the channel type and try again. let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if let None = peer_state_mutex_opt { return; } + if peer_state_mutex_opt.is_none() { return; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; if let Some(chan) = peer_state.channel_by_id.get_mut(&msg.channel_id) { @@ -6890,6 +6914,7 @@ where best_block.block_hash().write(writer)?; } + let mut serializable_peer_count: u64 = 0; { let per_peer_state = self.per_peer_state.read().unwrap(); let mut unfunded_channels = 0; @@ -6897,6 +6922,9 @@ where for (_, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + if !peer_state.ok_to_remove(false) { + serializable_peer_count += 1; + } number_of_channels += peer_state.channel_by_id.len(); for (_, channel) in peer_state.channel_by_id.iter() { if !channel.is_funding_initiated() { @@ -6947,11 +6975,18 @@ where htlc_purposes.push(purpose); } - (per_peer_state.len() as u64).write(writer)?; + (serializable_peer_count).write(writer)?; for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() { - peer_pubkey.write(writer)?; - let peer_state = peer_state_mutex.lock().unwrap(); - peer_state.latest_features.write(writer)?; + let peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &*peer_state_lock; + // Peers which we have no channels to should be dropped once disconnected. As we + // disconnect all peers when shutting down and serializing the ChannelManager, we + // consider all peers as disconnected here. There's therefore no need write peers with + // no channels. + if !peer_state.ok_to_remove(false) { + peer_pubkey.write(writer)?; + peer_state.latest_features.write(writer)?; + } } let events = self.pending_events.lock().unwrap(); @@ -7350,6 +7385,7 @@ where channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), latest_features: Readable::read(reader)?, pending_msg_events: Vec::new(), + is_connected: false, }; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } @@ -7737,19 +7773,14 @@ where mod tests { use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; - use bitcoin::hashes::hex::FromHex; use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; - use bitcoin::secp256k1::ecdsa::Signature; - use bitcoin::secp256k1::ffi::Signature as FFISignature; - use bitcoin::blockdata::script::Script; - use bitcoin::Txid; use core::time::Duration; use core::sync::atomic::Ordering; use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, InterceptId}; use crate::ln::functional_test_utils::*; use crate::ln::msgs; - use crate::ln::msgs::{ChannelMessageHandler, OptionalField}; + use crate::ln::msgs::ChannelMessageHandler; use crate::routing::router::{PaymentParameters, RouteParameters, find_route}; use crate::util::errors::APIError; use crate::util::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason}; @@ -8063,8 +8094,6 @@ mod tests { let payer_pubkey = nodes[0].node.get_our_node_id(); let payee_pubkey = nodes[1].node.get_our_node_id(); - nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap(); - nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap(); let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]); let route_params = RouteParameters { @@ -8108,8 +8137,6 @@ mod tests { let payer_pubkey = nodes[0].node.get_our_node_id(); let payee_pubkey = nodes[1].node.get_our_node_id(); - nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap(); - nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap(); let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]); let route_params = RouteParameters { @@ -8174,6 +8201,40 @@ mod tests { } } + #[test] + fn test_drop_disconnected_peers_when_removing_channels() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let chan = create_announced_chan_between_nodes(&nodes, 0, 1); + + nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + + nodes[0].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[1].node.get_our_node_id()).unwrap(); + check_closed_broadcast!(nodes[0], true); + check_added_monitors!(nodes[0], 1); + check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed); + + { + // Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been + // disconnected and the channel between has been force closed. + let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap(); + // Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed. + assert_eq!(nodes_0_per_peer_state.len(), 1); + assert!(nodes_0_per_peer_state.get(&nodes[1].node.get_our_node_id()).is_some()); + } + + nodes[0].node.timer_tick_occurred(); + + { + // Assert that nodes[1] has now been removed. + assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 0); + } + } + #[test] fn bad_inbound_payment_hash() { // Add coverage for checking that a user-provided payment hash matches the payment secret. @@ -8318,19 +8379,22 @@ mod tests { fn check_not_connected_to_peer_error(res_err: Result, expected_public_key: PublicKey) { let expected_message = format!("Not connected to node: {}", expected_public_key); - check_api_misuse_error_message(expected_message, res_err) + check_api_error_message(expected_message, res_err) } fn check_unkown_peer_error(res_err: Result, expected_public_key: PublicKey) { let expected_message = format!("Can't find a peer matching the passed counterparty node_id {}", expected_public_key); - check_api_misuse_error_message(expected_message, res_err) + check_api_error_message(expected_message, res_err) } - fn check_api_misuse_error_message(expected_err_message: String, res_err: Result) { + fn check_api_error_message(expected_err_message: String, res_err: Result) { match res_err { Err(APIError::APIMisuseError { err }) => { assert_eq!(err, expected_err_message); }, + Err(APIError::ChannelUnavailable { err }) => { + assert_eq!(err, expected_err_message); + }, Ok(_) => panic!("Unexpected Ok"), Err(_) => panic!("Unexpected Error"), } @@ -8338,140 +8402,23 @@ mod tests { #[test] fn test_api_calls_with_unkown_counterparty_node() { - // Tests that our API functions and message handlers that expects a `counterparty_node_id` - // as input, behaves as expected if the `counterparty_node_id` is an unkown peer in the + // Tests that our API functions that expects a `counterparty_node_id` as input, behaves as + // expected if the `counterparty_node_id` is an unkown peer in the // `ChannelManager::per_peer_state` map. let chanmon_cfg = create_chanmon_cfgs(2); let node_cfg = create_node_cfgs(2, &chanmon_cfg); let node_chanmgr = create_node_chanmgrs(2, &node_cfg, &[None, None]); let nodes = create_network(2, &node_cfg, &node_chanmgr); - // Boilerplate code to produce `open_channel` and `accept_channel` msgs more densly than - // creating dummy ones. - nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 1_000_000, 500_000_000, 42, None).unwrap(); - let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id()); - nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg); - let accept_channel_msg = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id()); - // Dummy values let channel_id = [4; 32]; - let signature = Signature::from(unsafe { FFISignature::new() }); let unkown_public_key = PublicKey::from_secret_key(&Secp256k1::signing_only(), &SecretKey::from_slice(&[42; 32]).unwrap()); let intercept_id = InterceptId([0; 32]); - // Dummy msgs - let funding_created_msg = msgs::FundingCreated { - temporary_channel_id: open_channel_msg.temporary_channel_id, - funding_txid: Txid::from_hex("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap(), - funding_output_index: 0, - signature: signature, - }; - - let funding_signed_msg = msgs::FundingSigned { - channel_id: channel_id, - signature: signature, - }; - - let channel_ready_msg = msgs::ChannelReady { - channel_id: channel_id, - next_per_commitment_point: unkown_public_key, - short_channel_id_alias: None, - }; - - let announcement_signatures_msg = msgs::AnnouncementSignatures { - channel_id: channel_id, - short_channel_id: 0, - node_signature: signature, - bitcoin_signature: signature, - }; - - let channel_reestablish_msg = msgs::ChannelReestablish { - channel_id: channel_id, - next_local_commitment_number: 0, - next_remote_commitment_number: 0, - data_loss_protect: OptionalField::Absent, - }; - - let closing_signed_msg = msgs::ClosingSigned { - channel_id: channel_id, - fee_satoshis: 1000, - signature: signature, - fee_range: None, - }; - - let shutdown_msg = msgs::Shutdown { - channel_id: channel_id, - scriptpubkey: Script::new(), - }; - - let onion_routing_packet = msgs::OnionPacket { - version: 255, - public_key: Ok(unkown_public_key), - hop_data: [1; 20*65], - hmac: [2; 32] - }; - - let update_add_htlc_msg = msgs::UpdateAddHTLC { - channel_id: channel_id, - htlc_id: 0, - amount_msat: 1000000, - payment_hash: PaymentHash([1; 32]), - cltv_expiry: 821716, - onion_routing_packet - }; - - let commitment_signed_msg = msgs::CommitmentSigned { - channel_id: channel_id, - signature: signature, - htlc_signatures: Vec::new(), - }; - - let update_fee_msg = msgs::UpdateFee { - channel_id: channel_id, - feerate_per_kw: 1000, - }; - - let malformed_update_msg = msgs::UpdateFailMalformedHTLC{ - channel_id: channel_id, - htlc_id: 0, - sha256_of_onion: [1; 32], - failure_code: 0x8000, - }; - - let fulfill_update_msg = msgs::UpdateFulfillHTLC{ - channel_id: channel_id, - htlc_id: 0, - payment_preimage: PaymentPreimage([1; 32]), - }; - - let fail_update_msg = msgs::UpdateFailHTLC{ - channel_id: channel_id, - htlc_id: 0, - reason: msgs::OnionErrorPacket { data: Vec::new()}, - }; - - let revoke_and_ack_msg = msgs::RevokeAndACK { - channel_id: channel_id, - per_commitment_secret: [1; 32], - next_per_commitment_point: unkown_public_key, - }; - - // Test the API functions and message handlers. + // Test the API functions. check_not_connected_to_peer_error(nodes[0].node.create_channel(unkown_public_key, 1_000_000, 500_000_000, 42, None), unkown_public_key); - nodes[1].node.handle_open_channel(&unkown_public_key, &open_channel_msg); - - nodes[0].node.handle_accept_channel(&unkown_public_key, &accept_channel_msg); - - check_unkown_peer_error(nodes[0].node.accept_inbound_channel(&open_channel_msg.temporary_channel_id, &unkown_public_key, 42), unkown_public_key); - - nodes[1].node.handle_funding_created(&unkown_public_key, &funding_created_msg); - - nodes[0].node.handle_funding_signed(&unkown_public_key, &funding_signed_msg); - - nodes[0].node.handle_channel_ready(&unkown_public_key, &channel_ready_msg); - - nodes[1].node.handle_announcement_signatures(&unkown_public_key, &announcement_signatures_msg); + check_unkown_peer_error(nodes[0].node.accept_inbound_channel(&channel_id, &unkown_public_key, 42), unkown_public_key); check_unkown_peer_error(nodes[0].node.close_channel(&channel_id, &unkown_public_key), unkown_public_key); @@ -8482,26 +8429,6 @@ mod tests { check_unkown_peer_error(nodes[0].node.forward_intercepted_htlc(intercept_id, &channel_id, unkown_public_key, 1_000_000), unkown_public_key); check_unkown_peer_error(nodes[0].node.update_channel_config(&unkown_public_key, &[channel_id], &ChannelConfig::default()), unkown_public_key); - - nodes[0].node.handle_shutdown(&unkown_public_key, &shutdown_msg); - - nodes[1].node.handle_closing_signed(&unkown_public_key, &closing_signed_msg); - - nodes[0].node.handle_channel_reestablish(&unkown_public_key, &channel_reestablish_msg); - - nodes[1].node.handle_update_add_htlc(&unkown_public_key, &update_add_htlc_msg); - - nodes[1].node.handle_commitment_signed(&unkown_public_key, &commitment_signed_msg); - - nodes[1].node.handle_update_fail_malformed_htlc(&unkown_public_key, &malformed_update_msg); - - nodes[1].node.handle_update_fail_htlc(&unkown_public_key, &fail_update_msg); - - nodes[1].node.handle_update_fulfill_htlc(&unkown_public_key, &fulfill_update_msg); - - nodes[1].node.handle_revoke_and_ack(&unkown_public_key, &revoke_and_ack_msg); - - nodes[1].node.handle_update_fee(&unkown_public_key, &update_fee_msg); } #[cfg(anchors)] diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 7bf51df5c7a..da8abcc108d 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -572,12 +572,12 @@ macro_rules! get_htlc_update_msgs { } /// Fetches the first `msg_event` to the passed `node_id` in the passed `msg_events` vec. -/// Returns the `msg_event`, along with an updated `msg_events` vec with the message removed. +/// Returns the `msg_event`. /// /// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate` /// `msg_events` are stored under specific peers, this function does not fetch such `msg_events` as /// such messages are intended to all peers. -pub fn remove_first_msg_event_to_node(msg_node_id: &PublicKey, msg_events: &Vec) -> (MessageSendEvent, Vec) { +pub fn remove_first_msg_event_to_node(msg_node_id: &PublicKey, msg_events: &mut Vec) -> MessageSendEvent { let ev_index = msg_events.iter().position(|e| { match e { MessageSendEvent::SendAcceptChannel { node_id, .. } => { node_id == msg_node_id @@ -644,9 +644,7 @@ pub fn remove_first_msg_event_to_node(msg_node_id: &PublicKey, msg_events: &Vec< }, }}); if ev_index.is_some() { - let mut updated_msg_events = msg_events.to_vec(); - let ev = updated_msg_events.remove(ev_index.unwrap()); - (ev, updated_msg_events) + msg_events.remove(ev_index.unwrap()) } else { panic!("Couldn't find any MessageSendEvent to the node!") } @@ -1486,9 +1484,9 @@ pub fn do_main_commitment_signed_dance(node_a: &Node<'_, '_, '_>, node_b: &Node< check_added_monitors!(node_b, 1); node_b.node.handle_commitment_signed(&node_a.node.get_our_node_id(), &as_commitment_signed); let (bs_revoke_and_ack, extra_msg_option) = { - let events = node_b.node.get_and_clear_pending_msg_events(); + let mut events = node_b.node.get_and_clear_pending_msg_events(); assert!(events.len() <= 2); - let (node_a_event, events) = remove_first_msg_event_to_node(&node_a.node.get_our_node_id(), &events); + let node_a_event = remove_first_msg_event_to_node(&node_a.node.get_our_node_id(), &mut events); (match node_a_event { MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { assert_eq!(*node_id, node_a.node.get_our_node_id()); @@ -1943,8 +1941,7 @@ pub fn pass_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, expected_rou let mut events = origin_node.node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), expected_route.len()); for (path_idx, expected_path) in expected_route.iter().enumerate() { - let (ev, updated_events) = remove_first_msg_event_to_node(&expected_path[0].node.get_our_node_id(), &events); - events = updated_events; + let ev = remove_first_msg_event_to_node(&expected_path[0].node.get_our_node_id(), &mut events); // Once we've gotten through all the HTLCs, the last one should result in a // PaymentClaimable (but each previous one should not!), . let expect_payment = path_idx == expected_route.len() - 1; @@ -2003,9 +2000,8 @@ pub fn do_claim_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, } else { for expected_path in expected_paths.iter() { // For MPP payments, we always want the message to the first node in the path. - let (ev, updated_events) = remove_first_msg_event_to_node(&expected_path[0].node.get_our_node_id(), &events); + let ev = remove_first_msg_event_to_node(&expected_path[0].node.get_our_node_id(), &mut events); per_path_msgs.push(msgs_from_ev!(&ev)); - events = updated_events; } } diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index bf0c3396ced..eedde1e93f3 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -2747,7 +2747,7 @@ fn test_htlc_on_chain_success() { }, _ => panic!() } - let events = nodes[1].node.get_and_clear_pending_msg_events(); + let mut events = nodes[1].node.get_and_clear_pending_msg_events(); { let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap(); assert_eq!(added_monitors.len(), 2); @@ -2757,8 +2757,8 @@ fn test_htlc_on_chain_success() { } assert_eq!(events.len(), 3); - let (nodes_2_event, events) = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &events); - let (nodes_0_event, events) = remove_first_msg_event_to_node(&nodes[0].node.get_our_node_id(), &events); + let nodes_2_event = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &mut events); + let nodes_0_event = remove_first_msg_event_to_node(&nodes[0].node.get_our_node_id(), &mut events); match nodes_2_event { MessageSendEvent::HandleError { action: ErrorAction::SendErrorMessage { .. }, node_id: _ } => {}, @@ -3196,14 +3196,15 @@ fn do_test_commitment_revoked_fail_backward_exhaustive(deliver_bs_raa: bool, use _ => panic!("Unexpected event"), } } + nodes[1].node.process_pending_htlc_forwards(); check_added_monitors!(nodes[1], 1); let mut events = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), if deliver_bs_raa { 4 } else { 3 }); - let events = if deliver_bs_raa { - let (nodes_2_event, events) = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &events); + if deliver_bs_raa { + let nodes_2_event = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &mut events); match nodes_2_event { MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, ref update_fulfill_htlcs, ref update_fail_malformed_htlcs, .. } } => { assert_eq!(nodes[2].node.get_our_node_id(), *node_id); @@ -3214,10 +3215,9 @@ fn do_test_commitment_revoked_fail_backward_exhaustive(deliver_bs_raa: bool, use }, _ => panic!("Unexpected event"), } - events - } else { events }; + } - let (nodes_2_event, events) = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &events); + let nodes_2_event = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &mut events); match nodes_2_event { MessageSendEvent::HandleError { action: ErrorAction::SendErrorMessage { msg: msgs::ErrorMessage { channel_id, ref data } }, node_id: _ } => { assert_eq!(channel_id, chan_2.2); @@ -3226,7 +3226,7 @@ fn do_test_commitment_revoked_fail_backward_exhaustive(deliver_bs_raa: bool, use _ => panic!("Unexpected event"), } - let (nodes_0_event, events) = remove_first_msg_event_to_node(&nodes[0].node.get_our_node_id(), &events); + let nodes_0_event = remove_first_msg_event_to_node(&nodes[0].node.get_our_node_id(), &mut events); match nodes_0_event { MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, ref update_fulfill_htlcs, ref update_fail_malformed_htlcs, ref commitment_signed, .. } } => { assert!(update_add_htlcs.is_empty()); @@ -3839,9 +3839,10 @@ fn do_test_drop_messages_peer_disconnect(messages_delivered: u8, simulate_broken if messages_delivered == 1 || messages_delivered == 2 { expect_payment_path_successful!(nodes[0]); } - - nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); - nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + if messages_delivered <= 5 { + nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); + } reconnect_nodes(&nodes[0], &nodes[1], (false, false), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); if messages_delivered > 2 { @@ -4641,10 +4642,10 @@ fn test_onchain_to_onchain_claim() { _ => panic!("Unexpected event"), } check_added_monitors!(nodes[1], 1); - let msg_events = nodes[1].node.get_and_clear_pending_msg_events(); + let mut msg_events = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(msg_events.len(), 3); - let (nodes_2_event, msg_events) = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &msg_events); - let (nodes_0_event, msg_events) = remove_first_msg_event_to_node(&nodes[0].node.get_our_node_id(), &msg_events); + let nodes_2_event = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &mut msg_events); + let nodes_0_event = remove_first_msg_event_to_node(&nodes[0].node.get_our_node_id(), &mut msg_events); match nodes_2_event { MessageSendEvent::HandleError { action: ErrorAction::SendErrorMessage { .. }, node_id: _ } => {}, @@ -9210,8 +9211,6 @@ fn test_keysend_payments_to_private_node() { let payer_pubkey = nodes[0].node.get_our_node_id(); let payee_pubkey = nodes[1].node.get_our_node_id(); - nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap(); - nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap(); let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]); let route_params = RouteParameters { @@ -9285,7 +9284,7 @@ fn test_double_partial_claim() { let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 2); - let (node_1_msgs, _events) = remove_first_msg_event_to_node(&nodes[1].node.get_our_node_id(), &events); + let node_1_msgs = remove_first_msg_event_to_node(&nodes[1].node.get_our_node_id(), &mut events); pass_along_path(&nodes[0], &[&nodes[1], &nodes[3]], 15_000_000, payment_hash, Some(payment_secret), node_1_msgs, false, None); // At this point nodes[3] has received one half of the payment, and the user goes to handle diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index c6b170364d6..1c06c0b32cf 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -153,11 +153,11 @@ fn mpp_retry() { assert_eq!(events.len(), 2); // Pass half of the payment along the success path. - let (success_path_msgs, mut events) = remove_first_msg_event_to_node(&nodes[1].node.get_our_node_id(), &events); + let success_path_msgs = remove_first_msg_event_to_node(&nodes[1].node.get_our_node_id(), &mut events); pass_along_path(&nodes[0], &[&nodes[1], &nodes[3]], 2_000_000, payment_hash, Some(payment_secret), success_path_msgs, false, None); // Add the HTLC along the first hop. - let (fail_path_msgs_1, _events) = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &events); + let fail_path_msgs_1 = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &mut events); let (update_add, commitment_signed) = match fail_path_msgs_1 { MessageSendEvent::UpdateHTLCs { node_id: _, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert_eq!(update_add_htlcs.len(), 1); @@ -237,7 +237,7 @@ fn do_mpp_receive_timeout(send_partial_mpp: bool) { assert_eq!(events.len(), 2); // Pass half of the payment along the first path. - let (node_1_msgs, mut events) = remove_first_msg_event_to_node(&nodes[1].node.get_our_node_id(), &events); + let node_1_msgs = remove_first_msg_event_to_node(&nodes[1].node.get_our_node_id(), &mut events); pass_along_path(&nodes[0], &[&nodes[1], &nodes[3]], 200_000, payment_hash, Some(payment_secret), node_1_msgs, false, None); if send_partial_mpp { @@ -265,7 +265,7 @@ fn do_mpp_receive_timeout(send_partial_mpp: bool) { expect_payment_failed_conditions(&nodes[0], payment_hash, false, PaymentFailedConditions::new().mpp_parts_remain().expected_htlc_error_data(23, &[][..])); } else { // Pass half of the payment along the second path. - let (node_2_msgs, _events) = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &events); + let node_2_msgs = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &mut events); pass_along_path(&nodes[0], &[&nodes[2], &nodes[3]], 200_000, payment_hash, Some(payment_secret), node_2_msgs, true, None); // Even after MPP_TIMEOUT_TICKS we should not timeout the MPP if we have all the parts diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index 427c4001b6d..8e5056dc68e 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -701,8 +701,8 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) { // Send the payment through to nodes[3] *without* clearing the PaymentClaimable event let mut send_events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(send_events.len(), 2); - let (node_1_msgs, mut send_events) = remove_first_msg_event_to_node(&nodes[1].node.get_our_node_id(), &send_events); - let (node_2_msgs, _send_events) = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &send_events); + let node_1_msgs = remove_first_msg_event_to_node(&nodes[1].node.get_our_node_id(), &mut send_events); + let node_2_msgs = remove_first_msg_event_to_node(&nodes[2].node.get_our_node_id(), &mut send_events); do_pass_along_path(&nodes[0], &[&nodes[1], &nodes[3]], 15_000_000, payment_hash, Some(payment_secret), node_1_msgs, true, false, None); do_pass_along_path(&nodes[0], &[&nodes[2], &nodes[3]], 15_000_000, payment_hash, Some(payment_secret), node_2_msgs, true, false, None); diff --git a/lightning/src/ln/reorg_tests.rs b/lightning/src/ln/reorg_tests.rs index 55c1341c3d0..ce1f3b64750 100644 --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@ -13,8 +13,8 @@ use crate::chain::channelmonitor::ANTI_REORG_DELAY; use crate::chain::transaction::OutPoint; use crate::chain::Confirm; use crate::ln::channelmanager::ChannelManager; -use crate::ln::msgs::ChannelMessageHandler; -use crate::util::events::{Event, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; +use crate::ln::msgs::{ChannelMessageHandler, Init}; +use crate::util::events::{Event, MessageSendEventsProvider, ClosureReason, HTLCDestination}; use crate::util::test_utils; use crate::util::ser::Writeable; @@ -374,6 +374,12 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_ nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clear(); // Now check that we can create a new channel + if reload_node && nodes[0].node.per_peer_state.read().unwrap().len() == 0 { + // If we dropped the channel before reloading the node, nodes[1] was also dropped from + // nodes[0] storage, and hence not connected again on startup. We therefore need to + // reconnect to the node before attempting to create a new channel. + nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap(); + } create_announced_chan_between_nodes(&nodes, 0, 1); send_payment(&nodes[0], &[&nodes[1]], 8000000); }