Skip to content

Commit

Permalink
fix concurrent port assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
johncantrell97 committed Jun 4, 2022
1 parent cfd5b4f commit 6e0130f
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 33 deletions.
19 changes: 10 additions & 9 deletions senseicore/src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ impl ChannelOpener {
} = event
{
if *user_channel_id == request.custom_id {
channel_counterparty_node_id = Some(*counterparty_node_id);
return true;
channel_counterparty_node_id = Some(*counterparty_node_id);
return true;
}
}
false
Expand All @@ -149,10 +149,10 @@ impl ChannelOpener {
if event.is_none() {
(request, Err(Error::FundingGenerationNeverHappened), None)
} else {
(request, result, channel_counterparty_node_id)
(request, result, channel_counterparty_node_id)
}
} else {
(request, result, None)
(request, result, None)
}
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -192,7 +192,7 @@ impl ChannelOpener {

let channels_to_open = requests_with_results
.iter()
.filter(|(_request, result, counterparty_node_id)| result.is_ok())
.filter(|(_request, result, _counterparty_node_id)| result.is_ok())
.count();

self.broadcaster
Expand All @@ -203,10 +203,11 @@ impl ChannelOpener {
.map(|(request, result, counterparty_node_id)| {
if let Ok(tcid) = result {
let counterparty_node_id = counterparty_node_id.unwrap();
match self
.channel_manager
.funding_transaction_generated(&tcid, &counterparty_node_id, funding_tx.clone())
{
match self.channel_manager.funding_transaction_generated(
&tcid,
&counterparty_node_id,
funding_tx.clone(),
) {
Ok(()) => (request, result),
Err(e) => (request, Err(Error::LdkApi(e))),
}
Expand Down
9 changes: 9 additions & 0 deletions senseicore/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ impl SenseiDatabase {
.await?)
}

pub async fn list_ports_in_use(&self) -> Result<Vec<u16>, Error> {
Ok(Node::find()
.all(&self.connection)
.await?
.into_iter()
.map(|node| node.listen_port as u16)
.collect())
}

pub async fn list_nodes(
&self,
pagination: PaginationRequest,
Expand Down
2 changes: 1 addition & 1 deletion senseicore/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl EventHandler for LightningNodeEventHandler {
channel_value_satoshis: *channel_value_satoshis,
output_script: output_script.clone(),
user_channel_id: *user_channel_id,
counterparty_node_id: *counterparty_node_id
counterparty_node_id: *counterparty_node_id,
})
.unwrap();
}
Expand Down
4 changes: 2 additions & 2 deletions senseicore/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bitcoin::{Script, Txid, secp256k1::PublicKey};
use bitcoin::{secp256k1::PublicKey, Script, Txid};
use serde::Serialize;

#[derive(Clone, Debug, Serialize)]
Expand All @@ -13,6 +13,6 @@ pub enum SenseiEvent {
channel_value_satoshis: u64,
output_script: Script,
user_channel_id: u64,
counterparty_node_id: PublicKey
counterparty_node_id: PublicKey,
},
}
2 changes: 1 addition & 1 deletion senseicore/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
use lightning::routing::network_graph::{NetGraphMsgHandler, NetworkGraph, NodeId, RoutingFees};
use lightning::routing::router::{RouteHint, RouteHintHop};
use lightning::routing::scoring::ProbabilisticScorer;
use lightning::util::config::{ChannelConfig, ChannelHandshakeLimits, UserConfig};
use lightning::util::config::UserConfig;
use lightning::util::ser::ReadableArgs;
use lightning_background_processor::BackgroundProcessor;
use lightning_invoice::utils::DefaultRouter;
Expand Down
66 changes: 46 additions & 20 deletions senseicore/src/services/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use entity::sea_orm::{ActiveModelTrait, ActiveValue};
use lightning_background_processor::BackgroundProcessor;
use macaroon::Macaroon;
use serde::Serialize;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::Ordering;
use std::{collections::hash_map::Entry, fs, sync::Arc};
use tokio::sync::{broadcast, Mutex};
Expand Down Expand Up @@ -141,6 +141,7 @@ pub struct AdminService {
pub database: Arc<SenseiDatabase>,
pub chain_manager: Arc<SenseiChainManager>,
pub event_sender: broadcast::Sender<SenseiEvent>,
pub available_ports: Arc<Mutex<VecDeque<u16>>>,
}

impl AdminService {
Expand All @@ -151,13 +152,31 @@ impl AdminService {
chain_manager: Arc<SenseiChainManager>,
event_sender: broadcast::Sender<SenseiEvent>,
) -> Self {
let mut used_ports = HashSet::new();
let mut available_ports = VecDeque::new();
database
.list_ports_in_use()
.await
.unwrap()
.into_iter()
.for_each(|port| {
used_ports.insert(port);
});

for port in config.port_range_min..config.port_range_max {
if !used_ports.contains(&port) {
available_ports.push_back(port);
}
}

Self {
data_dir: String::from(data_dir),
config: Arc::new(config),
node_directory: Arc::new(Mutex::new(HashMap::new())),
database: Arc::new(database),
chain_manager,
event_sender,
available_ports: Arc::new(Mutex::new(available_ports)),
}
}
}
Expand Down Expand Up @@ -400,34 +419,31 @@ impl AdminService {
role: node::NodeRole,
) -> Result<(node::Model, Macaroon), crate::error::Error> {
let listen_addr = public_ip::addr().await.unwrap().to_string();

let listen_port: i32 = match role {
node::NodeRole::Root => 9735,
node::NodeRole::Default => {
let mut port = self.config.port_range_min;
let mut port_used_by_system = !portpicker::is_free(port);
let mut port_used_by_sensei =
self.database.port_in_use(&listen_addr, port.into()).await?;

while port <= self.config.port_range_max
&& (port_used_by_system || port_used_by_sensei)
{
port += 1;
port_used_by_system = !portpicker::is_free(port);
port_used_by_sensei =
self.database.port_in_use(&listen_addr, port.into()).await?;
}

port.into()
let mut available_ports = self.available_ports.lock().await;
available_ports.pop_front().unwrap().into()
}
};

let node_id = Uuid::new_v4().to_string();
let (node_pubkey, node_macaroon) = LightningNode::get_node_pubkey_and_macaroon(

let result = LightningNode::get_node_pubkey_and_macaroon(
node_id.clone(),
passphrase,
self.database.clone(),
)
.await?;
.await;

if let Err(e) = result {
let mut available_ports = self.available_ports.lock().await;
available_ports.push_front(listen_port.try_into().unwrap());
return Err(e);
}

let (node_pubkey, macaroon) = result.unwrap();

let node = entity::node::ActiveModel {
id: ActiveValue::Set(node_id),
Expand All @@ -442,12 +458,22 @@ impl AdminService {
..Default::default()
};

let node = node.insert(self.database.get_connection()).await.unwrap();
let result = node.insert(self.database.get_connection()).await;

if let Err(e) = result {
let mut available_ports = self.available_ports.lock().await;
available_ports.push_front(listen_port.try_into().unwrap());
return Err(e.into());
}

let node = result.unwrap();

Ok((node, node_macaroon))
Ok((node, macaroon))
}

// note: please be sure to stop the node first? maybe?
// TODO: this was never updated with the DB rewrite
// need to release the port and actually delete the node
async fn delete_node(&self, node: node::Model) -> Result<(), crate::error::Error> {
let data_dir = format!("{}/{}/{}", self.data_dir, self.config.network, node.id);
Ok(fs::remove_dir_all(&data_dir)?)
Expand Down

0 comments on commit 6e0130f

Please sign in to comment.