diff --git a/migration/src/m20220421_000001_create_nodes_table.rs b/migration/src/m20220421_000001_create_nodes_table.rs index 83a40ae..89fb4ac 100644 --- a/migration/src/m20220421_000001_create_nodes_table.rs +++ b/migration/src/m20220421_000001_create_nodes_table.rs @@ -27,12 +27,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Node::Network).string().not_null()) .col(ColumnDef::new(Node::ListenAddr).string().not_null()) .col(ColumnDef::new(Node::ListenPort).integer().not_null()) - .col( - ColumnDef::new(Node::Pubkey) - .string() - .unique_key() - .not_null(), - ) + .col(ColumnDef::new(Node::Pubkey).string().not_null()) .col(ColumnDef::new(Node::CreatedAt).big_integer().not_null()) .col(ColumnDef::new(Node::UpdatedAt).big_integer().not_null()) .col(ColumnDef::new(Node::Status).small_integer().not_null()) diff --git a/senseicore/src/channels.rs b/senseicore/src/channels.rs index b032f08..9268ce3 100644 --- a/senseicore/src/channels.rs +++ b/senseicore/src/channels.rs @@ -123,7 +123,7 @@ impl ChannelOpener { .collect::>>(); // TODO: is this appropriate timeout? maybe should accept as param - let events = self.wait_for_events(filters, 15000, 500).await; + let events = self.wait_for_events(filters, 30000, 500).await; // set error state for requests we didn't get an event for let requests_with_results = requests_with_results diff --git a/senseicore/src/config.rs b/senseicore/src/config.rs index 70e6eae..55b7381 100644 --- a/senseicore/src/config.rs +++ b/senseicore/src/config.rs @@ -22,6 +22,7 @@ pub struct SenseiConfig { pub bitcoind_rpc_username: String, pub bitcoind_rpc_password: String, pub network: Network, + pub api_host: String, pub api_port: u16, pub port_range_min: u16, pub port_range_max: u16, @@ -39,6 +40,7 @@ impl Default for SenseiConfig { bitcoind_rpc_username: String::from("bitcoin"), bitcoind_rpc_password: String::from("bitcoin"), network: Network::Bitcoin, + api_host: String::from("127.0.0.1"), api_port: 5401, port_range_min: 10000, port_range_max: 65535, diff --git a/senseicore/src/database.rs b/senseicore/src/database.rs index 13a68ab..4c579cf 100644 --- a/senseicore/src/database.rs +++ b/senseicore/src/database.rs @@ -19,6 +19,7 @@ use entity::payment::Entity as Payment; use entity::sea_orm; use entity::sea_orm::ActiveValue; use entity::sea_orm::QueryOrder; +use entity::seconds_since_epoch; use migration::Condition; use migration::Expr; use rand::thread_rng; @@ -427,6 +428,24 @@ impl SenseiDatabase { }) } + pub async fn create_value( + &self, + node_id: String, + key: String, + value: Vec, + ) -> Result { + let entry = kv_store::ActiveModel { + node_id: ActiveValue::Set(node_id), + k: ActiveValue::Set(key), + v: ActiveValue::Set(value), + ..Default::default() + } + .insert(&self.connection) + .await?; + + Ok(entry) + } + pub async fn set_value( &self, node_id: String, @@ -474,10 +493,37 @@ impl SenseiDatabase { self.set_value(node_id, String::from("seed"), seed).await } + pub async fn create_seed( + &self, + node_id: String, + seed: Vec, + ) -> Result { + self.create_value(node_id, String::from("seed"), seed).await + } + + pub fn get_seed_active_model(&self, node_id: String, seed: Vec) -> kv_store::ActiveModel { + let now = seconds_since_epoch(); + kv_store::ActiveModel { + node_id: ActiveValue::Set(node_id), + k: ActiveValue::Set(String::from("seed")), + v: ActiveValue::Set(seed), + created_at: ActiveValue::Set(now), + updated_at: ActiveValue::Set(now), + ..Default::default() + } + } + + pub async fn insert_kv_store( + &self, + entity: kv_store::ActiveModel, + ) -> Result { + Ok(entity.insert(&self.connection).await?) + } + // Note: today we assume there's only ever one macaroon for a user // once there's some `bakery` functionality exposed we need to define // which macaroon we return when a user unlocks their node - pub async fn get_macaroon(&self, node_id: String) -> Result, Error> { + pub async fn get_macaroon(&self, node_id: &str) -> Result, Error> { Ok(Macaroon::find() .filter(macaroon::Column::NodeId.eq(node_id)) .one(&self.connection) diff --git a/senseicore/src/error.rs b/senseicore/src/error.rs index 5a7e43e..632deaf 100644 --- a/senseicore/src/error.rs +++ b/senseicore/src/error.rs @@ -28,11 +28,14 @@ pub enum Error { LdkInvoiceParse(lightning_invoice::ParseOrSemanticError), InvalidSeedLength, FailedToWriteSeed, + SeedNotFound, + MacaroonNotFound, Unauthenticated, InvalidMacaroon, AdminNodeNotStarted, AdminNodeNotCreated, FundingGenerationNeverHappened, + NodeBeingStartedAlready, } impl Display for Error { @@ -51,11 +54,14 @@ impl Display for Error { Error::LdkInvoiceSign(e) => e.to_string(), Error::LdkInvoiceParse(e) => e.to_string(), Error::InvalidSeedLength => String::from("invalid seed length"), + Error::SeedNotFound => String::from("seed not found for node"), + Error::MacaroonNotFound => String::from("macaroon not found for node"), Error::FailedToWriteSeed => String::from("failed to write seed"), Error::Unauthenticated => String::from("unauthenticated"), Error::InvalidMacaroon => String::from("invalid macaroon"), Error::AdminNodeNotCreated => String::from("admin node not created"), Error::AdminNodeNotStarted => String::from("admin node not started"), + Error::NodeBeingStartedAlready => String::from("node already being started"), Error::FundingGenerationNeverHappened => { String::from("funding generation for request never happened") } diff --git a/senseicore/src/network_graph.rs b/senseicore/src/network_graph.rs index 2a48507..45a1a52 100644 --- a/senseicore/src/network_graph.rs +++ b/senseicore/src/network_graph.rs @@ -10,12 +10,37 @@ use bitcoin::secp256k1::PublicKey; use lightning::{ ln::msgs::{self, Init, LightningError, RoutingMessageHandler}, + routing::network_graph::NetworkGraph, util::events::{MessageSendEvent, MessageSendEventsProvider}, }; use std::{ops::Deref, sync::Arc}; use crate::node::NetworkGraphMessageHandler; +#[derive(Clone)] +pub struct SenseiNetworkGraph { + pub graph: Option>, + pub msg_handler: Option>, +} + +impl SenseiNetworkGraph { + pub fn set_graph(&mut self, graph: Arc) { + self.graph = Some(graph); + } + + pub fn get_graph(&self) -> Option> { + self.graph.clone() + } + + pub fn set_msg_handler(&mut self, msg_handler: Arc) { + self.msg_handler = Some(msg_handler); + } + + pub fn get_msg_handler(&self) -> Option> { + self.msg_handler.clone() + } +} + pub struct OptionalNetworkGraphMsgHandler { pub network_graph_msg_handler: Option>, } diff --git a/senseicore/src/node.rs b/senseicore/src/node.rs index 665055e..55e3f35 100644 --- a/senseicore/src/node.rs +++ b/senseicore/src/node.rs @@ -76,7 +76,7 @@ use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; -use std::{convert::From, fmt, fs}; +use std::{convert::From, fmt}; use tokio::runtime::Handle; use tokio::sync::broadcast; use tokio::task::JoinHandle; @@ -389,11 +389,9 @@ impl MacaroonSession { #[derive(Clone)] pub struct LightningNode { pub config: Arc, - pub macaroon: Macaroon, pub id: String, - pub listen_addresses: Vec, pub listen_port: u16, - pub alias: String, + pub listen_addresses: Vec, pub seed: [u8; 32], pub database: Arc, pub wallet: Arc>>, @@ -414,13 +412,23 @@ pub struct LightningNode { } impl LightningNode { - async fn find_or_create_seed( + pub fn generate_seed() -> [u8; 32] { + let mut seed: [u8; 32] = [0; 32]; + thread_rng().fill_bytes(&mut seed); + seed + } + + pub fn encrypt_seed(seed: &[u8; 32], passphrase: &[u8]) -> Result, Error> { + let cryptor = RingCryptor::new(); + Ok(cryptor.seal_with_passphrase(passphrase, seed)?) + } + + async fn get_seed_for_node( node_id: String, passphrase: String, database: Arc, ) -> Result<[u8; 32], Error> { let cryptor = RingCryptor::new(); - let mut seed: [u8; 32] = [0; 32]; match database.get_seed(node_id.clone()).await? { Some(encrypted_seed) => { @@ -431,27 +439,63 @@ impl LightningNode { return Err(Error::InvalidSeedLength); } seed.copy_from_slice(decrypted_seed.as_slice()); + Ok(seed) } - None => { - thread_rng().fill_bytes(&mut seed); - let encrypted_seed = cryptor.seal_with_passphrase(passphrase.as_bytes(), &seed)?; - database.set_seed(node_id.clone(), encrypted_seed).await?; + None => Err(Error::SeedNotFound), + } + } + + pub fn generate_macaroon(seed: &[u8], pubkey: String) -> Result<(Macaroon, String), Error> { + let id = uuid::Uuid::new_v4().to_string(); + let macaroon_data = MacaroonSession { + id: id.clone(), + pubkey, + }; + let serialized_macaroon_data = serde_json::to_string(&macaroon_data).unwrap(); + let macaroon_key = macaroon::MacaroonKey::from(seed); + let macaroon_identifier = macaroon::ByteString::from(serialized_macaroon_data); + let macaroon = macaroon::Macaroon::create( + Some("senseid".to_string()), + &macaroon_key, + macaroon_identifier, + )?; + + Ok((macaroon, id)) + } + + pub fn write_macaroon_to_file(macaroon_path: String, macaroon: &Macaroon) -> Result<(), Error> { + let macaroon_as_bytes = macaroon.serialize(macaroon::Format::V2)?; + match File::create(macaroon_path.clone()) { + Ok(mut f) => { + f.write_all(macaroon_as_bytes.as_slice())?; + f.sync_all()?; + } + Err(e) => { + println!( + "ERROR: Unable to create admin.macaroon file {}: {}", + macaroon_path, e + ); } } - Ok(seed) + + Ok(()) } - async fn find_or_create_macaroon( - node_id: String, - passphrase: String, - seed: &[u8], - pubkey: String, + pub fn encrypt_macaroon(macaroon: &Macaroon, passphrase: &[u8]) -> Result, Error> { + let cryptor = RingCryptor::new(); + let macaroon_as_bytes = macaroon.serialize(macaroon::Format::V2)?; + let encrypted_macaroon = cryptor.seal_with_passphrase(passphrase, &macaroon_as_bytes)?; + Ok(encrypted_macaroon) + } + + pub async fn get_macaroon_for_node( + node_id: &str, + passphrase: &str, database: Arc, - macaroon_path: Option, ) -> Result { let cryptor = RingCryptor::new(); - match database.get_macaroon(node_id.clone()).await? { + match database.get_macaroon(node_id).await? { Some(macaroon) => { let decrypted_macaroon = cryptor.open( passphrase.as_bytes(), @@ -460,49 +504,7 @@ impl LightningNode { let macaroon = macaroon::Macaroon::deserialize(decrypted_macaroon.as_slice())?; Ok(macaroon) } - None => { - let macaroon_data = MacaroonSession { - id: uuid::Uuid::new_v4().to_string(), - pubkey, - }; - - let serialized_macaroon_data = serde_json::to_string(&macaroon_data).unwrap(); - let macaroon_key = macaroon::MacaroonKey::from(seed); - let macaroon_identifier = macaroon::ByteString::from(serialized_macaroon_data); - let admin_macaroon = macaroon::Macaroon::create( - Some("senseid".to_string()), - &macaroon_key, - macaroon_identifier, - )?; - let serialized_macaroon = admin_macaroon.serialize(macaroon::Format::V2)?; - let encrypted_macaroon = - cryptor.seal_with_passphrase(passphrase.as_bytes(), &serialized_macaroon)?; - - let macaroon = entity::macaroon::ActiveModel { - id: ActiveValue::Set(macaroon_data.id.clone()), - node_id: ActiveValue::Set(node_id.clone()), - encrypted_macaroon: ActiveValue::Set(encrypted_macaroon), - ..Default::default() - }; - macaroon.insert(database.get_connection()).await?; - - if let Some(macaroon_path) = macaroon_path { - match File::create(macaroon_path.clone()) { - Ok(mut f) => { - f.write_all(serialized_macaroon.as_slice())?; - f.sync_all()?; - } - Err(e) => { - println!( - "ERROR: Unable to create admin.macaroon file {}: {}", - macaroon_path, e - ); - } - } - } - - Ok(admin_macaroon) - } + None => Err(Error::MacaroonNotFound), } } @@ -524,40 +526,12 @@ impl LightningNode { .map_err(|_e| Error::InvalidMacaroon) } - pub async fn get_node_pubkey_and_macaroon( - id: String, - passphrase: String, - database: Arc, - ) -> Result<(String, Macaroon), Error> { - let seed = - LightningNode::find_or_create_seed(id.clone(), passphrase.clone(), database.clone()) - .await?; - - let cur = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - - let keys_manager = Arc::new(KeysManager::new(&seed, cur.as_secs(), cur.subsec_nanos())); - - let mut secp_ctx = Secp256k1::new(); - secp_ctx.seeded_randomize(&keys_manager.get_secure_random_bytes()); - - let node_pubkey = PublicKey::from_secret_key( - &secp_ctx, - &keys_manager.get_node_secret(Recipient::Node).unwrap(), - ); - - let macaroon = LightningNode::find_or_create_macaroon( - id.clone(), - passphrase.clone(), - &seed, - node_pubkey.to_string(), - database.clone(), - None, - ) - .await?; - - Ok((node_pubkey.to_string(), macaroon)) + pub fn get_node_pubkey_from_seed(seed: &[u8; 32]) -> String { + let secp_ctx = Secp256k1::new(); + let keys_manager = KeysManager::new(seed, 0, 0); + let node_secret = keys_manager.get_node_secret(Recipient::Node).unwrap(); + let node_pubkey = PublicKey::from_secret_key(&secp_ctx, &node_secret); + node_pubkey.to_string() } #[allow(clippy::too_many_arguments)] @@ -575,14 +549,11 @@ impl LightningNode { chain_manager: Arc, database: Arc, event_sender: broadcast::Sender, - ) -> Result { - fs::create_dir_all(data_dir.clone())?; - + ) -> Result<(Self, Vec>, BackgroundProcessor), Error> { let network = config.network; - let admin_macaroon_path = format!("{}/admin.macaroon", data_dir.clone()); let seed = - LightningNode::find_or_create_seed(id.clone(), passphrase.clone(), database.clone()) + LightningNode::get_seed_for_node(id.clone(), passphrase.clone(), database.clone()) .await?; let cur = SystemTime::now() @@ -590,22 +561,6 @@ impl LightningNode { .unwrap(); let keys_manager = Arc::new(KeysManager::new(&seed, cur.as_secs(), cur.subsec_nanos())); - let mut secp_ctx = Secp256k1::new(); - secp_ctx.seeded_randomize(&keys_manager.get_secure_random_bytes()); - let node_pubkey = PublicKey::from_secret_key( - &secp_ctx, - &keys_manager.get_node_secret(Recipient::Node).unwrap(), - ); - - let macaroon = LightningNode::find_or_create_macaroon( - id.clone(), - passphrase.clone(), - &seed, - node_pubkey.to_string(), - database.clone(), - Some(admin_macaroon_path), - ) - .await?; let xprivkey = ExtendedPrivKey::new_master(network, &seed).unwrap(); let xkey = ExtendedKey::from(xprivkey); @@ -629,6 +584,7 @@ impl LightningNode { bdk_database, )?; + // TODO: probably can do this later, assuming this is REALLY slow bdk_wallet.ensure_addresses_cached(100).unwrap(); let bdk_wallet = Arc::new(Mutex::new(bdk_wallet)); @@ -647,6 +603,7 @@ impl LightningNode { let persistence_store = AnyKVStore::Database(DatabaseStore::new(database.clone(), id.clone())); + let persister = Arc::new(SenseiPersister::new( persistence_store, config.network, @@ -827,6 +784,7 @@ impl LightningNode { Arc::new(IgnoringMessageHandler {}), )); + // need to move this to AdminService or root node only let scorer = Arc::new(Mutex::new( persister.read_scorer(Arc::clone(&network_graph)), )); @@ -861,41 +819,13 @@ impl LightningNode { let stop_listen = Arc::new(AtomicBool::new(false)); - Ok(LightningNode { - config, - id, - listen_addresses, - listen_port, - alias, - database, - seed, - macaroon, - wallet: bdk_wallet, - channel_manager, - chain_monitor, - chain_manager, - peer_manager, - network_graph, - network_graph_msg_handler, - keys_manager, - logger, - scorer, - invoice_payer, - stop_listen, - persister, - event_sender, - broadcaster, - }) - } - - pub async fn start(self) -> (Vec>, BackgroundProcessor) { let mut handles = vec![]; - let peer_manager_connection_handler = self.peer_manager.clone(); + let peer_manager_connection_handler = peer_manager.clone(); - let stop_listen_ref = Arc::clone(&self.stop_listen); + let stop_listen_ref = Arc::clone(&stop_listen); handles.push(tokio::spawn(async move { - let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", self.listen_port)) + let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", listen_port)) .await .expect( "Failed to bind to listen port - is something else already listening on it?", @@ -916,8 +846,8 @@ impl LightningNode { } })); - let scorer_persister = Arc::clone(&self.persister); - let scorer_persist = Arc::clone(&self.scorer); + let scorer_persister = Arc::clone(&persister); + let scorer_persist = Arc::clone(&scorer); handles.push(tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(600)); @@ -934,26 +864,26 @@ impl LightningNode { } })); - let bg_persister = Arc::clone(&self.persister); + let bg_persister = Arc::clone(&persister); // TODO: should we allow 'child' nodes to update NetworkGraph based on payment failures? // feels like probably but depends on exactly what is updated let background_processor = BackgroundProcessor::start( bg_persister, - self.invoice_payer.clone(), - self.chain_monitor.clone(), - self.channel_manager.clone(), - Some(self.network_graph_msg_handler.clone()), - self.peer_manager.clone(), - self.logger.clone(), - Some(self.scorer.clone()), + invoice_payer.clone(), + chain_monitor.clone(), + channel_manager.clone(), + Some(network_graph_msg_handler.clone()), + peer_manager.clone(), + logger.clone(), + Some(scorer.clone()), ); // Reconnect to channel peers if possible. - let channel_manager_reconnect = self.channel_manager.clone(); - let peer_manager_reconnect = self.peer_manager.clone(); - let persister_peer = self.persister.clone(); + let channel_manager_reconnect = channel_manager.clone(); + let peer_manager_reconnect = peer_manager.clone(); + let persister_peer = persister.clone(); handles.push(tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(5)); loop { @@ -985,18 +915,17 @@ impl LightningNode { // some public channels, and is only useful if we have public listen address(es) to announce. // In a production environment, this should occur only after the announcement of new channels // to avoid churn in the global network graph. - let chan_manager = Arc::clone(&self.channel_manager); - let listen_addresses = self - .listen_addresses + let chan_manager = Arc::clone(&channel_manager); + let broadcast_listen_addresses = listen_addresses .iter() .filter_map(|addr| match IpAddr::from_str(addr) { Ok(IpAddr::V4(a)) => Some(NetAddress::IPv4 { addr: a.octets(), - port: self.listen_port, + port: listen_port, }), Ok(IpAddr::V6(a)) => Some(NetAddress::IPv6 { addr: a.octets(), - port: self.listen_port, + port: listen_port, }), Err(_) => { println!("Failed to parse announced-listen-addr into an IP address"); @@ -1006,24 +935,48 @@ impl LightningNode { .collect::>(); let mut alias_bytes = [0; 32]; - alias_bytes[..self.alias.len()].copy_from_slice(self.alias.as_bytes()); + alias_bytes[..alias.len()].copy_from_slice(alias.as_bytes()); handles.push(tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(60)); loop { interval.tick().await; - if !listen_addresses.is_empty() { + if !broadcast_listen_addresses.is_empty() { chan_manager.broadcast_node_announcement( [0; 3], alias_bytes, - listen_addresses.clone(), + broadcast_listen_addresses.clone(), ); } } })); - (handles, background_processor) + let lightning_node = LightningNode { + config, + id, + listen_port, + listen_addresses, + database, + seed, + wallet: bdk_wallet, + channel_manager, + chain_monitor, + chain_manager, + peer_manager, + network_graph, + network_graph_msg_handler, + keys_manager, + logger, + scorer, + invoice_payer, + stop_listen, + persister, + event_sender, + broadcaster, + }; + + Ok((lightning_node, handles, background_processor)) } pub async fn open_channels( diff --git a/senseicore/src/services/admin.rs b/senseicore/src/services/admin.rs index 74ec2c9..4588f10 100644 --- a/senseicore/src/services/admin.rs +++ b/senseicore/src/services/admin.rs @@ -12,14 +12,16 @@ use crate::chain::manager::SenseiChainManager; use crate::database::SenseiDatabase; use crate::error::Error as SenseiError; use crate::events::SenseiEvent; +use crate::network_graph::SenseiNetworkGraph; use crate::{config::SenseiConfig, hex_utils, node::LightningNode, version}; -use entity::access_token; -use entity::node; -use entity::sea_orm::{ActiveModelTrait, ActiveValue}; +use entity::node::{self, NodeRole}; +use entity::sea_orm::{ActiveModelTrait, ActiveValue, EntityTrait}; +use entity::{access_token, seconds_since_epoch}; +use futures::stream::{self, StreamExt}; use lightning_background_processor::BackgroundProcessor; use macaroon::Macaroon; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::atomic::Ordering; use std::{collections::hash_map::Entry, fs, sync::Arc}; @@ -33,6 +35,23 @@ pub struct NodeHandle { pub handles: Vec>, } +#[derive(Clone)] +pub struct NodeCreateInfo { + pub username: String, + pub alias: String, + pub passphrase: String, + pub start: bool, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct NodeCreateResult { + pubkey: String, + macaroon: String, + listen_addr: String, + listen_port: i32, + id: String, +} + pub enum AdminRequest { GetStatus { pubkey: String, @@ -52,6 +71,9 @@ pub enum AdminRequest { passphrase: String, start: bool, }, + BatchCreateNode { + nodes: Vec, + }, ListNodes { pagination: PaginationRequest, }, @@ -111,6 +133,9 @@ pub enum AdminResponse { listen_port: i32, id: String, }, + BatchCreateNode { + nodes: Vec, + }, ListNodes { nodes: Vec, pagination: PaginationResponse, @@ -131,7 +156,7 @@ pub enum AdminResponse { Error(Error), } -pub type NodeDirectory = Arc>>; +pub type NodeDirectory = Arc>>>; #[derive(Clone)] pub struct AdminService { @@ -142,6 +167,7 @@ pub struct AdminService { pub chain_manager: Arc, pub event_sender: broadcast::Sender, pub available_ports: Arc>>, + pub network_graph: Arc>, } impl AdminService { @@ -177,6 +203,10 @@ impl AdminService { chain_manager, event_sender, available_ports: Arc::new(Mutex::new(available_ports)), + network_graph: Arc::new(Mutex::new(SenseiNetworkGraph { + graph: None, + msg_handler: None, + })), } } } @@ -290,7 +320,13 @@ impl AdminService { match root_node { Some(node) => { - let macaroon = self.start_node(node.clone(), passphrase).await?; + let macaroon = LightningNode::get_macaroon_for_node( + &node.id, + &passphrase, + self.database.clone(), + ) + .await?; + self.start_node(node.clone(), passphrase).await?; let macaroon = macaroon.serialize(macaroon::Format::V2)?; Ok(AdminResponse::StartAdmin { pubkey: node.pubkey, @@ -307,8 +343,14 @@ impl AdminService { let node = self.database.get_node_by_pubkey(&pubkey).await?; match node { Some(node) => { - let macaroon = self.start_node(node, passphrase).await?; + let macaroon = LightningNode::get_macaroon_for_node( + &node.id, + &passphrase, + self.database.clone(), + ) + .await?; let macaroon = macaroon.serialize(macaroon::Format::V2)?; + self.start_node(node, passphrase).await?; Ok(AdminResponse::StartNode { macaroon: hex_utils::hex_str(macaroon.as_slice()), }) @@ -357,6 +399,34 @@ impl AdminService { id: node.id, }) } + AdminRequest::BatchCreateNode { nodes } => { + let nodes_and_macaroons = self.batch_create_nodes(nodes.clone()).await?; + + for ((node, _macaroon), node_create_info) in + nodes_and_macaroons.iter().zip(nodes.iter()) + { + if node_create_info.start { + self.start_node(node.clone(), node_create_info.passphrase.clone()) + .await?; + } + } + + Ok(AdminResponse::BatchCreateNode { + nodes: nodes_and_macaroons + .into_iter() + .map(|(node, macaroon)| { + let macaroon = macaroon.serialize(macaroon::Format::V2).unwrap(); + NodeCreateResult { + pubkey: node.pubkey, + macaroon: hex_utils::hex_str(macaroon.as_slice()), + listen_addr: node.listen_addr, + listen_port: node.listen_port, + id: node.id, + } + }) + .collect::>(), + }) + } AdminRequest::ListNodes { pagination } => { let (nodes, pagination) = self.list_nodes(pagination).await?; Ok(AdminResponse::ListNodes { nodes, pagination }) @@ -411,14 +481,74 @@ impl AdminService { self.database.list_nodes(pagination).await } - async fn create_node( + async fn batch_create_nodes( + &self, + nodes: Vec, + ) -> Result, crate::error::Error> { + let built_node_futures = nodes + .into_iter() + .map(|info| { + self.build_node( + info.username, + info.alias, + info.passphrase, + NodeRole::Default, + ) + }) + .collect::>(); + + let stream_of_futures = stream::iter(built_node_futures); + let buffered = stream_of_futures.buffer_unordered(10); + let mut built_nodes = buffered + .collect::>() + .await + .into_iter() + .map(|built_result| built_result.unwrap()) + .collect::>(); + + let mut nodes_with_macaroons = Vec::with_capacity(built_nodes.len()); + let mut db_nodes = Vec::with_capacity(built_nodes.len()); + let mut db_seeds = Vec::with_capacity(built_nodes.len()); + let mut db_macaroons = Vec::with_capacity(built_nodes.len()); + + for (node, macaroon, db_node, db_seed, db_macaroon) in built_nodes.drain(..) { + nodes_with_macaroons.push((node, macaroon)); + db_nodes.push(db_node); + db_seeds.push(db_seed); + db_macaroons.push(db_macaroon); + } + + entity::node::Entity::insert_many(db_nodes) + .exec(self.database.get_connection()) + .await?; + entity::kv_store::Entity::insert_many(db_seeds) + .exec(self.database.get_connection()) + .await?; + entity::macaroon::Entity::insert_many(db_macaroons) + .exec(self.database.get_connection()) + .await?; + + Ok(nodes_with_macaroons) + } + + async fn build_node( &self, username: String, alias: String, passphrase: String, role: node::NodeRole, - ) -> Result<(node::Model, Macaroon), crate::error::Error> { - let listen_addr = public_ip::addr().await.unwrap().to_string(); + ) -> Result< + ( + entity::node::Model, + Macaroon, + entity::node::ActiveModel, + entity::kv_store::ActiveModel, + entity::macaroon::ActiveModel, + ), + crate::error::Error, + > { + // IP/PORT + let listen_addr = self.config.api_host.clone(); let listen_port: i32 = match role { node::NodeRole::Root => 9735, @@ -428,45 +558,84 @@ impl AdminService { } }; + // NODE ID let node_id = Uuid::new_v4().to_string(); - let result = LightningNode::get_node_pubkey_and_macaroon( - node_id.clone(), - passphrase, - self.database.clone(), - ) - .await; - - if let Err(e) = result { - let mut available_ports = self.available_ports.lock().await; - available_ports.push_front(listen_port.try_into().unwrap()); - return Err(e); - } + // NODE DIRECTORY + let node_directory = format!("{}/{}/{}", self.data_dir, self.config.network, node_id); + fs::create_dir_all(node_directory)?; + + // NODE SEED + let seed = LightningNode::generate_seed(); + let encrypted_seed = LightningNode::encrypt_seed(&seed, passphrase.as_bytes())?; - let (node_pubkey, macaroon) = result.unwrap(); + let seed_active_model = self + .database + .get_seed_active_model(node_id.clone(), encrypted_seed); - let node = entity::node::ActiveModel { - id: ActiveValue::Set(node_id), - pubkey: ActiveValue::Set(node_pubkey), - username: ActiveValue::Set(username), - alias: ActiveValue::Set(alias), + // NODE PUBKEY + let node_pubkey = LightningNode::get_node_pubkey_from_seed(&seed); + + // NODE MACAROON + let (macaroon, macaroon_id) = LightningNode::generate_macaroon(&seed, node_pubkey.clone())?; + + let encrypted_macaroon = LightningNode::encrypt_macaroon(&macaroon, passphrase.as_bytes())?; + + let now = seconds_since_epoch(); + + let db_macaroon = entity::macaroon::ActiveModel { + id: ActiveValue::Set(macaroon_id), + node_id: ActiveValue::Set(node_id.clone()), + encrypted_macaroon: ActiveValue::Set(encrypted_macaroon), + created_at: ActiveValue::Set(now), + updated_at: ActiveValue::Set(now), + }; + + // NODE + let active_node = entity::node::ActiveModel { + id: ActiveValue::Set(node_id.clone()), + pubkey: ActiveValue::Set(node_pubkey.clone()), + username: ActiveValue::Set(username.clone()), + alias: ActiveValue::Set(alias.clone()), network: ActiveValue::Set(self.config.network.to_string()), - listen_addr: ActiveValue::Set(listen_addr), + listen_addr: ActiveValue::Set(listen_addr.clone()), listen_port: ActiveValue::Set(listen_port), - role: ActiveValue::Set(role.into()), + role: ActiveValue::Set(role.clone().into()), status: ActiveValue::Set(node::NodeStatus::Stopped.into()), - ..Default::default() + created_at: ActiveValue::Set(now), + updated_at: ActiveValue::Set(now), }; - let result = node.insert(self.database.get_connection()).await; + let node = node::Model { + id: node_id, + role: role.into(), + username, + alias, + network: self.config.network.to_string(), + listen_addr, + listen_port, + pubkey: node_pubkey, + created_at: now, + updated_at: now, + status: node::NodeStatus::Stopped.into(), + }; - if let Err(e) = result { - let mut available_ports = self.available_ports.lock().await; - available_ports.push_front(listen_port.try_into().unwrap()); - return Err(e.into()); - } + Ok((node, macaroon, active_node, seed_active_model, db_macaroon)) + } + + async fn create_node( + &self, + username: String, + alias: String, + passphrase: String, + role: node::NodeRole, + ) -> Result<(node::Model, Macaroon), crate::error::Error> { + let (node, macaroon, db_node, db_seed, db_macaroon) = + self.build_node(username, alias, passphrase, role).await?; - let node = result.unwrap(); + db_seed.insert(self.database.get_connection()).await?; + db_macaroon.insert(self.database.get_connection()).await?; + db_node.insert(self.database.get_connection()).await?; Ok((node, macaroon)) } @@ -483,36 +652,33 @@ impl AdminService { &self, node: node::Model, passphrase: String, - ) -> Result { - let mut node_directory = self.node_directory.lock().await; - - let (network_graph, network_graph_msg_handler, external_router) = match node.get_role() { - node::NodeRole::Root => (None, None, false), - node::NodeRole::Default => { - if let Some(root_node) = self.database.get_root_node().await? { - let root_pubkey = root_node.pubkey; - if let Entry::Occupied(entry) = node_directory.entry(root_pubkey) { - let root_node_handle = entry.get(); - let network_graph = root_node_handle.node.network_graph.clone(); - let network_graph_message_handler = - root_node_handle.node.network_graph_msg_handler.clone(); - ( - Some(network_graph), - Some(network_graph_message_handler), - true, - ) - } else { - return Err(crate::error::Error::AdminNodeNotStarted); + ) -> Result<(), crate::error::Error> { + let status = { + let mut node_directory = self.node_directory.lock().await; + match node_directory.entry(node.pubkey.clone()) { + Entry::Vacant(entry) => { + entry.insert(None); + None + } + Entry::Occupied(entry) => { + // TODO: verify passphrase + match entry.get() { + Some(_handle) => Some(Some(())), + None => Some(None), } - } else { - return Err(crate::error::Error::AdminNodeNotCreated); } } }; - match node_directory.entry(node.pubkey.clone()) { - Entry::Vacant(entry) => { - let lightning_node = LightningNode::new( + let external_router = node.get_role() == node::NodeRole::Default; + let (network_graph, network_graph_msg_handler) = { + let ng = self.network_graph.lock().await; + (ng.get_graph(), ng.get_msg_handler()) + }; + + match status { + None => { + let (lightning_node, handles, background_processor) = LightningNode::new( self.config.clone(), node.id.clone(), vec![node.listen_addr.clone()], @@ -535,29 +701,38 @@ impl AdminService { .await?; println!( - "starting node {} on port {}", + "starting {}@{}:{}", node.pubkey.clone(), + self.config.api_host.clone(), node.listen_port ); - let (handles, background_processor) = lightning_node.clone().start().await; + if !external_router { + let mut ng = self.network_graph.lock().await; + ng.set_graph(lightning_node.network_graph.clone()); + ng.set_msg_handler(lightning_node.network_graph_msg_handler.clone()); + } - entry.insert(NodeHandle { - node: Arc::new(lightning_node.clone()), - background_processor, - handles, - }); + { + let mut node_directory = self.node_directory.lock().await; + if let Entry::Occupied(mut entry) = node_directory.entry(node.pubkey.clone()) { + entry.insert(Some(NodeHandle { + node: Arc::new(lightning_node.clone()), + background_processor, + handles, + })); + } + } let mut node: node::ActiveModel = node.into(); node.status = ActiveValue::Set(node::NodeStatus::Running.into()); - node.listen_addr = ActiveValue::Set(public_ip::addr().await.unwrap().to_string()); + node.listen_addr = ActiveValue::Set(self.config.api_host.clone()); node.save(self.database.get_connection()).await?; - Ok(lightning_node.macaroon) - } - Entry::Occupied(entry) => { - // TODO: verify passphrase - Ok(entry.get().node.macaroon.clone()) + + Ok(()) } + Some(None) => Ok(()), + Some(Some(_)) => Ok(()), } } @@ -566,15 +741,15 @@ impl AdminService { let entry = node_directory.entry(pubkey.clone()); if let Entry::Occupied(entry) = entry { - let node_handle = entry.remove(); - - // Disconnect our peers and stop accepting new connections. This ensures we don't continue - // updating our channel data after we've stopped the background processor. - node_handle.node.peer_manager.disconnect_all_peers(); - node_handle.node.stop_listen.store(true, Ordering::Release); - let _res = node_handle.background_processor.stop(); - for handle in node_handle.handles { - handle.abort(); + if let Some(node_handle) = entry.remove() { + // Disconnect our peers and stop accepting new connections. This ensures we don't continue + // updating our channel data after we've stopped the background processor. + node_handle.node.peer_manager.disconnect_all_peers(); + node_handle.node.stop_listen.store(true, Ordering::Release); + let _res = node_handle.background_processor.stop(); + for handle in node_handle.handles { + handle.abort(); + } } } diff --git a/senseicore/tests/smoke_test.rs b/senseicore/tests/smoke_test.rs index 4de670a..2fa4b87 100644 --- a/senseicore/tests/smoke_test.rs +++ b/senseicore/tests/smoke_test.rs @@ -91,7 +91,7 @@ mod test { let directory = admin_service.node_directory.lock().await; let handle = directory.get(&node_pubkey).unwrap(); - handle.node.clone() + handle.as_ref().unwrap().node.clone() } async fn create_root_node( @@ -118,7 +118,7 @@ mod test { role, } => { let directory = admin_service.node_directory.lock().await; - let handle = directory.get(&pubkey).unwrap(); + let handle = directory.get(&pubkey).unwrap().as_ref().unwrap(); Some(handle.node.clone()) } _ => None, diff --git a/src/grpc/node.rs b/src/grpc/node.rs index f20eeae..29e177a 100644 --- a/src/grpc/node.rs +++ b/src/grpc/node.rs @@ -55,7 +55,7 @@ impl NodeService { let node_directory = self.admin_service.node_directory.lock().await; match node_directory.get(&session.pubkey) { - Some(handle) => { + Some(Some(handle)) => { handle .node .verify_macaroon(macaroon, session) @@ -80,6 +80,7 @@ impl NodeService { .map_err(|_e| Status::unknown("error")), } } + Some(None) => Err(Status::not_found("node is in process of being started")), None => match request { NodeRequest::StartNode { passphrase } => { drop(node_directory); diff --git a/src/http/admin.rs b/src/http/admin.rs index 378c4cd..483f76c 100644 --- a/src/http/admin.rs +++ b/src/http/admin.rs @@ -22,7 +22,7 @@ use serde_json::{json, Value}; use senseicore::{ services::{ - admin::{AdminRequest, AdminResponse, AdminService}, + admin::{AdminRequest, AdminResponse, AdminService, NodeCreateInfo}, PaginationRequest, }, utils, @@ -77,6 +77,28 @@ pub struct LoginNodeParams { pub passphrase: String, } +#[derive(Deserialize)] +pub struct BatchCreateNodeParams { + nodes: Vec, +} + +impl From for AdminRequest { + fn from(params: BatchCreateNodeParams) -> Self { + Self::BatchCreateNode { + nodes: params + .nodes + .into_iter() + .map(|node| NodeCreateInfo { + username: node.username, + alias: node.alias, + passphrase: node.passphrase, + start: node.start, + }) + .collect::>(), + } + } +} + #[derive(Deserialize)] pub struct CreateNodeParams { pub username: String, @@ -218,6 +240,7 @@ pub fn add_routes(router: Router) -> Router { .route("/v1/init", post(init_sensei)) .route("/v1/nodes", get(list_nodes)) .route("/v1/nodes", post(create_node)) + .route("/v1/nodes/batch", post(batch_create_nodes)) .route("/v1/nodes/start", post(start_node)) .route("/v1/nodes/stop", post(stop_node)) .route("/v1/nodes/delete", post(delete_node)) @@ -542,6 +565,32 @@ pub async fn create_node( } } +pub async fn batch_create_nodes( + Extension(admin_service): Extension>, + Json(payload): Json, + cookies: Cookies, + AuthHeader { macaroon: _, token }: AuthHeader, +) -> Result, StatusCode> { + let authenticated = + authenticate_request(&admin_service, "nodes/create/batch", &cookies, token).await?; + let request = { + let params: Result = serde_json::from_value(payload); + match params { + Ok(params) => Ok(params.into()), + Err(_) => Err(StatusCode::UNPROCESSABLE_ENTITY), + } + }?; + + if authenticated { + match admin_service.call(request).await { + Ok(response) => Ok(Json(response)), + Err(_err) => Err(StatusCode::UNAUTHORIZED), + } + } else { + Err(StatusCode::UNAUTHORIZED) + } +} + pub async fn start_node( Extension(admin_service): Extension>, Json(payload): Json, diff --git a/src/http/node.rs b/src/http/node.rs index 8127184..c0c3d95 100644 --- a/src/http/node.rs +++ b/src/http/node.rs @@ -317,7 +317,7 @@ pub async fn handle_authenticated_request( let node = node_directory.get(&session.pubkey); match node { - Some(handle) => { + Some(Some(handle)) => { handle .node .verify_macaroon(macaroon, session) @@ -339,6 +339,12 @@ pub async fn handle_authenticated_request( }, } } + Some(None) => { + // TODO: rethink this Some(None) business + let err = senseicore::error::Error::Unauthenticated; + let node_request_error: NodeRequestError = err.into(); + Ok(Json(NodeResponse::Error(node_request_error))) + } None => match request { NodeRequest::StartNode { passphrase } => { drop(node_directory); diff --git a/src/main.rs b/src/main.rs index 778c0e3..bfec130 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,6 +81,8 @@ struct SenseiArgs { port_range_min: Option, #[clap(long, env = "PORT_RANGE_MAX")] port_range_max: Option, + #[clap(long, env = "API_HOST")] + api_host: Option, #[clap(long, env = "API_PORT")] api_port: Option, #[clap(long, env = "DATABASE_URL")] @@ -138,6 +140,9 @@ fn main() { if let Some(api_port) = args.api_port { config.api_port = api_port; } + if let Some(api_host) = args.api_host { + config.api_host = api_host; + } if let Some(database_url) = args.database_url { config.database_url = database_url; } @@ -148,7 +153,7 @@ fn main() { } let persistence_runtime = Builder::new_multi_thread() - .worker_threads(4) + .worker_threads(20) .thread_name("persistence") .enable_all() .build() @@ -156,7 +161,7 @@ fn main() { let persistence_runtime_handle = persistence_runtime.handle().clone(); let sensei_runtime = Builder::new_multi_thread() - .worker_threads(10) + .worker_threads(20) .thread_name("sensei") .enable_all() .build() @@ -166,12 +171,12 @@ fn main() { let (event_sender, _event_receiver): ( broadcast::Sender, broadcast::Receiver, - ) = broadcast::channel(256); + ) = broadcast::channel(1024); let mut db_connection_options = ConnectOptions::new(config.database_url.clone()); db_connection_options - .max_connections(50) - .min_connections(5) + .max_connections(100) + .min_connections(10) .connect_timeout(Duration::new(30, 0)); let db_connection = Database::connect(db_connection_options).await.unwrap(); Migrator::up(&db_connection, None) @@ -269,7 +274,8 @@ fn main() { let server = hyper::Server::bind(&addr).serve(hybrid_service); println!( - "manage your sensei node at http://localhost:{}/admin/nodes", + "manage your sensei node at http://{}:{}/admin/nodes", + config.api_host.clone(), port );