From 8bef76d266688a423e98c655356d8d8f09dd3d81 Mon Sep 17 00:00:00 2001 From: David Irvine Date: Thu, 21 Nov 2024 12:19:38 +0000 Subject: [PATCH] feat(networking): add bootstrap cache for peer discovery Add persistent bootstrap cache to maintain a list of previously known peers, improving network bootstrapping efficiency and reducing cold-start times. --- bootstrap_cache/Cargo.toml | 25 + bootstrap_cache/README.md | 193 ++++++++ bootstrap_cache/src/cache.rs | 171 +++++++ bootstrap_cache/src/cache_store.rs | 427 ++++++++++++++++++ bootstrap_cache/src/config.rs | 157 +++++++ bootstrap_cache/src/error.rs | 38 ++ bootstrap_cache/src/initial_peer_discovery.rs | 330 ++++++++++++++ bootstrap_cache/src/lib.rs | 115 +++++ bootstrap_cache/tests/integration_tests.rs | 175 +++++++ docs/bootstrap_cache_implementation.md | 339 ++++++++++++++ docs/bootstrap_cache_prd.md | 162 +++++++ 11 files changed, 2132 insertions(+) create mode 100644 bootstrap_cache/Cargo.toml create mode 100644 bootstrap_cache/README.md create mode 100644 bootstrap_cache/src/cache.rs create mode 100644 bootstrap_cache/src/cache_store.rs create mode 100644 bootstrap_cache/src/config.rs create mode 100644 bootstrap_cache/src/error.rs create mode 100644 bootstrap_cache/src/initial_peer_discovery.rs create mode 100644 bootstrap_cache/src/lib.rs create mode 100644 bootstrap_cache/tests/integration_tests.rs create mode 100644 docs/bootstrap_cache_implementation.md create mode 100644 docs/bootstrap_cache_prd.md diff --git a/bootstrap_cache/Cargo.toml b/bootstrap_cache/Cargo.toml new file mode 100644 index 0000000000..e2e305e51d --- /dev/null +++ b/bootstrap_cache/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "bootstrap_cache" +version = "0.1.0" +edition = "2021" +license = "GPL-3.0" +authors = ["MaidSafe Developers "] +description = "Bootstrap cache functionality for the Safe Network" + +[dependencies] +chrono = { version = "0.4", features = ["serde"] } +dirs = "5.0" +fs2 = "0.4.3" +libp2p = { version = "0.53", 
features = ["serde"] } +reqwest = { version = "0.11", features = ["json"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tempfile = "3.8.1" +thiserror = "1.0" +tokio = { version = "1.0", features = ["full", "sync"] } +tracing = "0.1" + +[dev-dependencies] +wiremock = "0.5" +tokio = { version = "1.0", features = ["full", "test-util"] } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/bootstrap_cache/README.md b/bootstrap_cache/README.md new file mode 100644 index 0000000000..d50dd0a6e6 --- /dev/null +++ b/bootstrap_cache/README.md @@ -0,0 +1,193 @@ +# Bootstrap Cache + +A decentralized peer discovery and caching system for the Safe Network. + +## Features + +- **Decentralized Design**: No dedicated bootstrap nodes required +- **Cross-Platform Support**: Works on Linux, macOS, and Windows +- **Shared Cache**: System-wide cache file accessible by both nodes and clients +- **Concurrent Access**: File locking for safe multi-process access +- **Atomic Operations**: Safe cache updates using atomic file operations +- **Initial Peer Discovery**: Fallback web endpoints for new/stale cache scenarios +- **Comprehensive Error Handling**: Detailed error types and logging + +### Peer Management + +The bootstrap cache implements a robust peer management system: + +- **Peer Status Tracking**: Each peer's connection history is tracked, including: + - Success count: Number of successful connections + - Failure count: Number of failed connection attempts + - Last seen timestamp: When the peer was last successfully contacted + +- **Automatic Cleanup**: The system automatically removes unreliable peers: + - Peers that fail 3 consecutive connection attempts are marked for removal + - Removal only occurs if there are at least 2 working peers available + - This ensures network connectivity is maintained even during temporary connection issues + +- **Duplicate Prevention**: The cache automatically prevents duplicate peer entries: 
+ - Same IP and port combinations are only stored once + - Different ports on the same IP are treated as separate peers + +## Installation + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +bootstrap_cache = { version = "0.1.0" } +``` + +## Usage + +### Basic Example + +```rust +use bootstrap_cache::{BootstrapCache, CacheManager, InitialPeerDiscovery}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize the cache manager + let cache_manager = CacheManager::new()?; + + // Try to read from the cache + let mut cache = match cache_manager.read_cache() { + Ok(cache) if !cache.is_stale() => cache, + _ => { + // Cache is stale or unavailable, fetch initial peers + let discovery = InitialPeerDiscovery::new(); + let peers = discovery.fetch_peers().await?; + let cache = BootstrapCache { + last_updated: chrono::Utc::now(), + peers, + }; + cache_manager.write_cache(&cache)?; + cache + } + }; + + println!("Found {} peers in cache", cache.peers.len()); + Ok(()) +} +``` + +### Custom Endpoints + +```rust +use bootstrap_cache::InitialPeerDiscovery; + +let discovery = InitialPeerDiscovery::with_endpoints(vec![ + "http://custom1.example.com/peers.json".to_string(), + "http://custom2.example.com/peers.json".to_string(), +]); +``` + +### Peer Management Example + +```rust +use bootstrap_cache::BootstrapCache; + +let mut cache = BootstrapCache::new(); + +// Add a new peer +cache.add_peer("192.168.1.1".to_string(), 8080); + +// Update peer status after connection attempts +cache.update_peer_status("192.168.1.1", 8080, true); // successful connection +cache.update_peer_status("192.168.1.1", 8080, false); // failed connection + +// Clean up failed peers (only if we have at least 2 working peers) +cache.cleanup_failed_peers(); +``` + +## Cache File Location + +The cache file is stored in a system-wide location accessible to all processes: + +- **Linux**: `/var/safe/bootstrap_cache.json` +- **macOS**: `/Library/Application Support/Safe/bootstrap_cache.json` +- 
**Windows**: `C:\ProgramData\Safe\bootstrap_cache.json` + +## Cache File Format + +```json +{ + "last_updated": "2024-02-20T15:30:00Z", + "peers": [ + { + "ip": "192.168.1.1", + "port": 8080, + "last_seen": "2024-02-20T15:30:00Z", + "success_count": 10, + "failure_count": 0 + } + ] +} +``` + +## Error Handling + +The crate provides detailed error types through the `Error` enum: + +```rust +use bootstrap_cache::Error; + +match cache_manager.read_cache() { + Ok(cache) => println!("Cache loaded successfully"), + Err(Error::CacheStale) => println!("Cache is stale"), + Err(Error::CacheCorrupted) => println!("Cache file is corrupted"), + Err(Error::Io(e)) => println!("IO error: {}", e), + Err(e) => println!("Other error: {}", e), +} +``` + +## Thread Safety + +The cache system uses file locking to ensure safe concurrent access: + +- Shared locks for reading +- Exclusive locks for writing +- Atomic file updates using temporary files + +## Development + +### Building + +```bash +cargo build +``` + +### Running Tests + +```bash +cargo test +``` + +### Running with Logging + +```rust +use tracing_subscriber::FmtSubscriber; + +// Initialize logging +let subscriber = FmtSubscriber::builder() + .with_max_level(tracing::Level::DEBUG) + .init(); +``` + +## Contributing + +1. Fork the repository +2. Create your feature branch (`git checkout -b feature/amazing-feature`) +3. Commit your changes (`git commit -am 'Add amazing feature'`) +4. Push to the branch (`git push origin feature/amazing-feature`) +5. Open a Pull Request + +## License + +This project is licensed under the GPL-3.0 License - see the LICENSE file for details. 
+ +## Related Documentation + +- [Bootstrap Cache PRD](docs/bootstrap_cache_prd.md) +- [Implementation Guide](docs/bootstrap_cache_implementation.md) diff --git a/bootstrap_cache/src/cache.rs b/bootstrap_cache/src/cache.rs new file mode 100644 index 0000000000..914f150a0b --- /dev/null +++ b/bootstrap_cache/src/cache.rs @@ -0,0 +1,171 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use crate::{BootstrapCache, Error}; +use fs2::FileExt; +use std::{ + fs::{self, File}, + io::{self, Read, Write}, + path::PathBuf, +}; +use tracing::{debug, error, info}; + +/// Manages reading and writing of the bootstrap cache file +pub struct CacheManager { + cache_path: PathBuf, +} + +impl CacheManager { + /// Creates a new CacheManager instance + pub fn new() -> Result { + let cache_path = Self::get_cache_path()?; + Ok(Self { cache_path }) + } + + /// Returns the platform-specific cache file path + fn get_cache_path() -> io::Result { + let path = if cfg!(target_os = "macos") { + PathBuf::from("/Library/Application Support/Safe/bootstrap_cache.json") + } else if cfg!(target_os = "linux") { + PathBuf::from("/var/safe/bootstrap_cache.json") + } else if cfg!(target_os = "windows") { + PathBuf::from(r"C:\ProgramData\Safe\bootstrap_cache.json") + } else { + return Err(io::Error::new( + io::ErrorKind::Other, + "Unsupported operating system", + )); + }; + + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + Ok(path) + } + + /// Reads the cache file with file locking + pub fn 
read_cache(&self) -> Result { + debug!("Reading bootstrap cache from {:?}", self.cache_path); + + let mut file = match File::open(&self.cache_path) { + Ok(file) => file, + Err(e) if e.kind() == io::ErrorKind::NotFound => { + info!("Cache file not found, creating new empty cache"); + return Ok(BootstrapCache::new()); + } + Err(e) => { + error!("Failed to open cache file: {}", e); + return Err(e.into()); + } + }; + + // Acquire shared lock for reading + file.lock_shared().map_err(|e| { + error!("Failed to acquire shared lock: {}", e); + Error::LockError + })?; + + let mut contents = String::new(); + file.read_to_string(&mut contents).map_err(|e| { + error!("Failed to read cache file: {}", e); + Error::Io(e) + })?; + + // Release lock + file.unlock().map_err(|e| { + error!("Failed to release lock: {}", e); + Error::LockError + })?; + + serde_json::from_str(&contents).map_err(|e| { + error!("Failed to parse cache file: {}", e); + Error::Json(e) + }) + } + + /// Writes the cache file with file locking and atomic replacement + pub fn write_cache(&self, cache: &BootstrapCache) -> Result<(), Error> { + debug!("Writing bootstrap cache to {:?}", self.cache_path); + + let temp_path = self.cache_path.with_extension("tmp"); + let mut file = File::create(&temp_path).map_err(|e| { + error!("Failed to create temporary cache file: {}", e); + Error::Io(e) + })?; + + // Acquire exclusive lock for writing + file.lock_exclusive().map_err(|e| { + error!("Failed to acquire exclusive lock: {}", e); + Error::LockError + })?; + + let contents = serde_json::to_string_pretty(cache).map_err(|e| { + error!("Failed to serialize cache: {}", e); + Error::Json(e) + })?; + + file.write_all(contents.as_bytes()).map_err(|e| { + error!("Failed to write cache file: {}", e); + Error::Io(e) + })?; + + file.sync_all().map_err(|e| { + error!("Failed to sync cache file: {}", e); + Error::Io(e) + })?; + + // Release lock + file.unlock().map_err(|e| { + error!("Failed to release lock: {}", e); + 
Error::LockError + })?; + + // Atomic rename + fs::rename(&temp_path, &self.cache_path).map_err(|e| { + error!("Failed to rename temporary cache file: {}", e); + Error::Io(e) + })?; + + info!("Successfully wrote cache file"); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use tempfile::tempdir; + + #[test] + fn test_cache_read_write() { + let dir = tempdir().unwrap(); + let cache_path = dir.path().join("test_cache.json"); + + let cache = BootstrapCache { + last_updated: Utc::now(), + peers: vec![], + }; + + let manager = CacheManager { cache_path }; + manager.write_cache(&cache).unwrap(); + + let read_cache = manager.read_cache().unwrap(); + assert_eq!(cache.peers.len(), read_cache.peers.len()); + } + + #[test] + fn test_missing_cache_file() { + let dir = tempdir().unwrap(); + let cache_path = dir.path().join("nonexistent.json"); + + let manager = CacheManager { cache_path }; + let cache = manager.read_cache().unwrap(); + assert!(cache.peers.is_empty()); + } +} diff --git a/bootstrap_cache/src/cache_store.rs b/bootstrap_cache/src/cache_store.rs new file mode 100644 index 0000000000..b14551e458 --- /dev/null +++ b/bootstrap_cache/src/cache_store.rs @@ -0,0 +1,427 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+ +use crate::{BootstrapPeer, Error, Result, InitialPeerDiscovery}; +use libp2p::Multiaddr; +use serde::{Deserialize, Serialize}; +use std::fs::{self, File, OpenOptions}; +use std::io::{self, Read}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use fs2::FileExt; +use tempfile::NamedTempFile; +use tokio::sync::RwLock; + +const PEER_EXPIRY_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CacheData { + peers: std::collections::HashMap, + #[serde(default = "SystemTime::now")] + last_updated: SystemTime, + #[serde(default = "default_version")] + version: u32, +} + +fn default_version() -> u32 { + 1 +} + +impl Default for CacheData { + fn default() -> Self { + Self { + peers: std::collections::HashMap::new(), + last_updated: SystemTime::now(), + version: default_version(), + } + } +} + +#[derive(Clone)] +pub struct CacheStore { + cache_path: PathBuf, + config: Arc, + data: Arc>, +} + +impl CacheStore { + pub async fn new(config: crate::BootstrapConfig) -> Result { + let cache_path = config.cache_file_path.clone(); + let config = Arc::new(config); + + // Create cache directory if it doesn't exist + if let Some(parent) = cache_path.parent() { + fs::create_dir_all(parent)?; + } + + let data = if cache_path.exists() { + // Load existing cache + let mut file = OpenOptions::new() + .read(true) + .open(&cache_path) + .map_err(|e| Error::IoError(format!("Failed to open cache file: {}", e)))?; + + // Acquire shared lock for reading + Self::acquire_shared_lock(&file).await?; + + let mut contents = String::new(); + file.read_to_string(&mut contents) + .map_err(|e| Error::IoError(format!("Failed to read cache file: {}", e)))?; + + // Lock will be automatically released when file is dropped + serde_json::from_str(&contents) + .map_err(|e| Error::IoError(format!("Failed to parse cache data: {}", e)))? + } else { + Self::fallback_to_default(&config).await? 
+ }; + + let store = Self { + cache_path, + config, + data: Arc::new(RwLock::new(data)), + }; + + // Clean up any stale peers on startup + store.cleanup_stale_peers().await?; + + Ok(store) + } + + async fn fallback_to_default(config: &crate::BootstrapConfig) -> Result { + if config.endpoints.is_empty() { + return Ok(CacheData { + peers: std::collections::HashMap::new(), + last_updated: SystemTime::now(), + version: default_version(), + }); + } + + let discovery = InitialPeerDiscovery::with_endpoints(config.endpoints.clone()); + + let peers = discovery.fetch_peers().await?; + let mut peer_map = std::collections::HashMap::new(); + for peer in peers { + peer_map.insert(peer.addr.to_string(), peer); + } + Ok(CacheData { + peers: peer_map, + last_updated: SystemTime::now(), + version: default_version(), + }) + } + + pub async fn get_peers(&self) -> Vec { + let data = self.data.read().await; + data.peers.values().cloned().collect() + } + + pub async fn get_reliable_peers(&self) -> Vec { + let data = self.data.read().await; + let reliable_peers: Vec<_> = data + .peers + .values() + .filter(|peer| peer.success_count > peer.failure_count) + .cloned() + .collect(); + + // If we have no reliable peers, try to refresh from default endpoints + if reliable_peers.is_empty() { + drop(data); + if let Ok(new_data) = Self::fallback_to_default(&self.config).await { + let mut data = self.data.write().await; + *data = new_data; + return data.peers.values() + .filter(|peer| peer.success_count > peer.failure_count) + .cloned() + .collect(); + } + } + + reliable_peers + } + + pub async fn update_peer_status(&self, addr: &str, success: bool) -> Result<()> { + let mut data = self.data.write().await; + + match addr.parse::() { + Ok(_) => { + let peer = data.peers.entry(addr.to_string()).or_insert_with(|| { + BootstrapPeer::new(addr.parse().expect("Already validated")) + }); + peer.update_status(success); + self.save_to_disk(&data).await?; + Ok(()) + } + Err(e) => 
Err(Error::InvalidMultiaddr(e)), + } + } + + pub async fn add_peer(&self, addr: Multiaddr) -> Result<()> { + let mut data = self.data.write().await; + let addr_str = addr.to_string(); + + if !data.peers.contains_key(&addr_str) { + data.peers.insert(addr_str, BootstrapPeer::new(addr)); + self.save_to_disk(&data).await?; + } + + Ok(()) + } + + pub async fn remove_peer(&self, addr: &str) -> Result<()> { + let mut data = self.data.write().await; + data.peers.remove(addr); + self.save_to_disk(&data).await?; + Ok(()) + } + + pub async fn cleanup_unreliable_peers(&self) -> Result<()> { + let mut data = self.data.write().await; + let unreliable_peers: Vec = data + .peers + .iter() + .filter(|(_, peer)| !peer.is_reliable()) + .map(|(addr, _)| addr.clone()) + .collect(); + + for addr in unreliable_peers { + data.peers.remove(&addr); + } + + self.save_to_disk(&data).await?; + Ok(()) + } + + pub async fn cleanup_stale_peers(&self) -> Result<()> { + let mut data = self.data.write().await; + let stale_peers: Vec = data + .peers + .iter() + .filter(|(_, peer)| { + if let Ok(elapsed) = peer.last_seen.elapsed() { + elapsed > PEER_EXPIRY_DURATION + } else { + true // If we can't get elapsed time, consider it stale + } + }) + .map(|(addr, _)| addr.clone()) + .collect(); + + for addr in stale_peers { + data.peers.remove(&addr); + } + + self.save_to_disk(&data).await?; + Ok(()) + } + + async fn acquire_shared_lock(file: &File) -> Result<()> { + let file = file.try_clone() + .map_err(|e| Error::IoError(format!("Failed to clone file handle: {}", e)))?; + + tokio::task::spawn_blocking(move || { + file.try_lock_shared() + .map_err(|e| Error::IoError(format!("Failed to acquire shared lock: {}", e))) + }) + .await + .map_err(|e| Error::IoError(format!("Failed to spawn blocking task: {}", e)))? 
+ } + + async fn acquire_exclusive_lock(file: &File) -> Result<()> { + let mut backoff = Duration::from_millis(10); + let max_attempts = 5; + let mut attempts = 0; + + while attempts < max_attempts { + match file.try_lock_exclusive() { + Ok(_) => return Ok(()), + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + attempts += 1; + if attempts == max_attempts { + return Err(Error::IoError("Failed to acquire exclusive lock after max attempts".into())); + } + tokio::time::sleep(backoff).await; + backoff *= 2; + } + Err(e) => return Err(Error::IoError(format!("Failed to acquire exclusive lock: {}", e))), + } + } + Ok(()) + } + + async fn atomic_write(&self, data: &CacheData) -> Result<()> { + // Create parent directory if it doesn't exist + if let Some(parent) = self.cache_path.parent() { + fs::create_dir_all(parent) + .map_err(|e| Error::IoError(format!("Failed to create directory: {}", e)))?; + } + + // Create a temporary file in the same directory as the cache file + let temp_file = NamedTempFile::new() + .map_err(|e| Error::IoError(format!("Failed to create temp file: {}", e)))?; + + // Write data to temporary file + serde_json::to_writer_pretty(&temp_file, &data) + .map_err(|e| Error::IoError(format!("Failed to write to temp file: {}", e)))?; + + // Open the target file with proper permissions + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.cache_path) + .map_err(|e| Error::IoError(format!("Failed to open cache file: {}", e)))?; + + // Acquire exclusive lock + Self::acquire_exclusive_lock(&file).await?; + + // Perform atomic rename + temp_file.persist(&self.cache_path) + .map_err(|e| Error::IoError(format!("Failed to persist cache file: {}", e)))?; + + // Lock will be automatically released when file is dropped + Ok(()) + } + + async fn save_to_disk(&self, data: &CacheData) -> Result<()> { + self.atomic_write(data).await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + 
async fn create_test_store() -> (CacheStore, PathBuf) { + let temp_dir = tempdir().unwrap(); + let cache_file = temp_dir.path().join("cache.json"); + + let config = crate::BootstrapConfig::new( + vec![], // Empty endpoints to prevent fallback + 1500, + cache_file.clone(), + Duration::from_secs(60), + Duration::from_secs(10), + 3, + ); + + let store = CacheStore::new(config).await.unwrap(); + (store.clone(), store.cache_path.clone()) + } + + #[tokio::test] + async fn test_peer_update_and_save() { + let (store, _) = create_test_store().await; + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + + // Manually add a peer without using fallback + { + let mut data = store.data.write().await; + data.peers.insert(addr.to_string(), BootstrapPeer::new(addr.clone())); + store.save_to_disk(&data).await.unwrap(); + } + + store.update_peer_status(&addr.to_string(), true).await.unwrap(); + + let peers = store.get_peers().await; + assert_eq!(peers.len(), 1); + assert_eq!(peers[0].addr, addr); + assert_eq!(peers[0].success_count, 1); + assert_eq!(peers[0].failure_count, 0); + } + + #[tokio::test] + async fn test_peer_cleanup() { + let (store, _) = create_test_store().await; + let good_addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + let bad_addr: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap(); + + // Add peers + store.add_peer(good_addr.clone()).await.unwrap(); + store.add_peer(bad_addr.clone()).await.unwrap(); + + // Make one peer reliable and one unreliable + store.update_peer_status(&good_addr.to_string(), true).await.unwrap(); + for _ in 0..5 { + store.update_peer_status(&bad_addr.to_string(), false).await.unwrap(); + } + + // Clean up unreliable peers + store.cleanup_unreliable_peers().await.unwrap(); + + // Get all peers (not just reliable ones) + let peers = store.get_peers().await; + assert_eq!(peers.len(), 1); + assert_eq!(peers[0].addr, good_addr); + } + + #[tokio::test] + async fn test_stale_peer_cleanup() { + let (store, _) = 
create_test_store().await; + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + + // Add a peer with more failures than successes + let mut peer = BootstrapPeer::new(addr.clone()); + peer.success_count = 1; + peer.failure_count = 5; + { + let mut data = store.data.write().await; + data.peers.insert(addr.to_string(), peer); + store.save_to_disk(&data).await.unwrap(); + } + + // Clean up unreliable peers + store.cleanup_unreliable_peers().await.unwrap(); + + // Should have no peers since the only peer was unreliable + let peers = store.get_reliable_peers().await; + assert_eq!(peers.len(), 0); + } + + #[tokio::test] + async fn test_concurrent_access() { + let (store, _) = create_test_store().await; + let store = Arc::new(store); + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + + // Manually add a peer without using fallback + { + let mut data = store.data.write().await; + data.peers.insert(addr.to_string(), BootstrapPeer::new(addr.clone())); + store.save_to_disk(&data).await.unwrap(); + } + + let mut handles = vec![]; + + // Spawn multiple tasks to update peer status concurrently + for i in 0..10 { + let store = Arc::clone(&store); + let addr = addr.clone(); + + handles.push(tokio::spawn(async move { + store.update_peer_status(&addr.to_string(), i % 2 == 0).await.unwrap(); + })); + } + + // Wait for all tasks to complete + for handle in handles { + handle.await.unwrap(); + } + + // Verify the final state - should have one peer + let peers = store.get_peers().await; + assert_eq!(peers.len(), 1); + + // The peer should have a mix of successes and failures + assert!(peers[0].success_count > 0); + assert!(peers[0].failure_count > 0); + } +} diff --git a/bootstrap_cache/src/config.rs b/bootstrap_cache/src/config.rs new file mode 100644 index 0000000000..659104f0a7 --- /dev/null +++ b/bootstrap_cache/src/config.rs @@ -0,0 +1,157 @@ +// Copyright 2024 MaidSafe.net limited. 
+// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use std::time::Duration; +use std::path::{Path, PathBuf}; + +/// Configuration for the bootstrap cache +#[derive(Clone, Debug)] +pub struct BootstrapConfig { + /// List of bootstrap endpoints to fetch peer information from + pub endpoints: Vec, + /// Maximum number of peers to keep in the cache + pub max_peers: usize, + /// Path to the bootstrap cache file + pub cache_file_path: PathBuf, + /// How often to update the cache (in seconds) + pub update_interval: Duration, + /// Request timeout for endpoint queries + pub request_timeout: Duration, + /// Maximum retries per endpoint + pub max_retries: u32, +} + +impl Default for BootstrapConfig { + fn default() -> Self { + Self { + endpoints: vec![ + "https://sn-testnet.s3.eu-west-2.amazonaws.com/bootstrap_cache.json".to_string(), + "https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts".to_string(), + "https://sn-node1.s3.eu-west-2.amazonaws.com/peers".to_string(), + "https://sn-node2.s3.eu-west-2.amazonaws.com/peers".to_string(), + ], + max_peers: 1500, + cache_file_path: default_cache_path(), + update_interval: Duration::from_secs(60), + request_timeout: Duration::from_secs(10), + max_retries: 3, + } + } +} + +impl BootstrapConfig { + /// Creates a new BootstrapConfig with custom endpoints + pub fn with_endpoints(endpoints: Vec) -> Self { + Self { + endpoints, + ..Default::default() + } + } + + /// Creates a new BootstrapConfig with a custom cache file path + pub fn with_cache_path>(path: P) -> Self { + Self { + cache_file_path: 
path.as_ref().to_path_buf(), + ..Default::default() + } + } + + /// Creates a new BootstrapConfig with custom settings + pub fn new( + endpoints: Vec, + max_peers: usize, + cache_file_path: PathBuf, + update_interval: Duration, + request_timeout: Duration, + max_retries: u32, + ) -> Self { + Self { + endpoints, + max_peers, + cache_file_path, + update_interval, + request_timeout, + max_retries, + } + } +} + +/// Returns the default path for the bootstrap cache file +fn default_cache_path() -> PathBuf { + dirs::cache_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join("safe_network") + .join("bootstrap_cache.json") +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn test_default_config() { + let config = BootstrapConfig::default(); + assert_eq!(config.endpoints.len(), 4); + assert_eq!( + config.endpoints[0], + "https://sn-testnet.s3.eu-west-2.amazonaws.com/bootstrap_cache.json" + ); + assert_eq!( + config.endpoints[1], + "https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts" + ); + assert_eq!( + config.endpoints[2], + "https://sn-node1.s3.eu-west-2.amazonaws.com/peers" + ); + assert_eq!( + config.endpoints[3], + "https://sn-node2.s3.eu-west-2.amazonaws.com/peers" + ); + assert_eq!(config.max_peers, 1500); + assert_eq!(config.update_interval, Duration::from_secs(60)); + assert_eq!(config.request_timeout, Duration::from_secs(10)); + assert_eq!(config.max_retries, 3); + } + + #[test] + fn test_custom_endpoints() { + let endpoints = vec!["http://custom.endpoint/cache".to_string()]; + let config = BootstrapConfig::with_endpoints(endpoints.clone()); + assert_eq!(config.endpoints, endpoints); + } + + #[test] + fn test_custom_cache_path() { + let path = PathBuf::from("/custom/path/cache.json"); + let config = BootstrapConfig::with_cache_path(&path); + assert_eq!(config.cache_file_path, path); + } + + #[test] + fn test_new_config() { + let endpoints = vec!["http://custom.endpoint/cache".to_string()]; + let path = 
PathBuf::from("/custom/path/cache.json"); + let config = BootstrapConfig::new( + endpoints.clone(), + 2000, + path.clone(), + Duration::from_secs(120), + Duration::from_secs(5), + 5, + ); + + assert_eq!(config.endpoints, endpoints); + assert_eq!(config.max_peers, 2000); + assert_eq!(config.cache_file_path, path); + assert_eq!(config.update_interval, Duration::from_secs(120)); + assert_eq!(config.request_timeout, Duration::from_secs(5)); + assert_eq!(config.max_retries, 5); + } +} diff --git a/bootstrap_cache/src/error.rs b/bootstrap_cache/src/error.rs new file mode 100644 index 0000000000..ab8c656e77 --- /dev/null +++ b/bootstrap_cache/src/error.rs @@ -0,0 +1,38 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("No peers found: {0}")] + NoPeersFound(String), + + #[error("Invalid multiaddr: {0}")] + InvalidMultiaddr(#[from] libp2p::multiaddr::Error), + + #[error("HTTP request failed: {0}")] + HttpRequest(#[from] reqwest::Error), + + #[error("JSON parsing error: {0}")] + JsonParsing(#[from] serde_json::Error), + + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + + #[error("IO error: {0}")] + IoError(String), + + #[error("Request timed out: {0}")] + Timeout(#[from] tokio::time::error::Elapsed), + + #[error("Failed to acquire file lock")] + LockError, +} + +pub type Result = std::result::Result; diff --git a/bootstrap_cache/src/initial_peer_discovery.rs b/bootstrap_cache/src/initial_peer_discovery.rs new file mode 100644 index 0000000000..2d21ef0fed --- /dev/null +++ b/bootstrap_cache/src/initial_peer_discovery.rs @@ -0,0 +1,330 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+
+use crate::{BootstrapPeer, BootstrapEndpoints, Error, Result};
+use libp2p::Multiaddr;
+use reqwest::Client;
+use serde_json;
+use tracing::{info, warn};
+use tokio::time::timeout;
+
+const DEFAULT_JSON_ENDPOINT: &str = "https://sn-testnet.s3.eu-west-2.amazonaws.com/bootstrap_cache.json";
+
+const DEFAULT_BOOTSTRAP_ENDPOINTS: &[&str] = &[
+    DEFAULT_JSON_ENDPOINT,
+    "https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts",
+    "https://sn-node1.s3.eu-west-2.amazonaws.com/peers",
+    "https://sn-node2.s3.eu-west-2.amazonaws.com/peers",
+];
+
+const FETCH_TIMEOUT_SECS: u64 = 30;
+
+/// Discovers initial peers from a list of endpoints
+pub struct InitialPeerDiscovery {
+    endpoints: Vec<String>,
+    client: Client,
+}
+
+impl InitialPeerDiscovery {
+    pub fn new() -> Self {
+        Self {
+            endpoints: DEFAULT_BOOTSTRAP_ENDPOINTS.iter().map(|s| s.to_string()).collect(),
+            client: Client::new(),
+        }
+    }
+
+    pub fn with_endpoints(endpoints: Vec<String>) -> Self {
+        Self {
+            endpoints,
+            client: Client::new(),
+        }
+    }
+
+    /// Load endpoints from a JSON file
+    pub async fn from_json(json_str: &str) -> Result<Self> {
+        let endpoints: BootstrapEndpoints = serde_json::from_str(json_str)?;
+        Ok(Self {
+            endpoints: endpoints.peers,
+            client: Client::new(),
+        })
+    }
+
+    /// Fetch peers from all configured endpoints
+    pub async fn fetch_peers(&self) -> Result<Vec<BootstrapPeer>> {
+        let mut peers = Vec::new();
+        let mut last_error = None;
+
+        for endpoint in &self.endpoints {
+            match self.fetch_from_endpoint(endpoint).await {
+                Ok(mut endpoint_peers) => {
+                    info!("Found {} peers from {}", endpoint_peers.len(), endpoint);
+                    peers.append(&mut endpoint_peers);
+                }
+                Err(e) => {
+                    warn!("Failed to fetch peers from {}: {}", endpoint, e);
+                    last_error = Some(e);
+                }
+            }
+        }
+
+        if peers.is_empty() {
+            if let Some(e) = last_error {
+                Err(Error::NoPeersFound(format!("No valid peers found from any endpoint: {}", e)))
+            } else {
+                Err(Error::NoPeersFound("No valid peers found from any endpoint".to_string()))
+            }
+        } else {
+            Ok(peers)
+        }
+    }
+
+    async fn fetch_from_endpoint(&self, endpoint: &str) -> Result<Vec<BootstrapPeer>> {
+        let response = timeout(
+            std::time::Duration::from_secs(FETCH_TIMEOUT_SECS),
+            self.client.get(endpoint).send(),
+        )
+        .await??;
+
+        let content = response.text().await?;
+        info!("Received response content: {}", content);
+
+        // If it's an Amazon S3 endpoint, treat it as plain text
+        if endpoint.contains("amazonaws.com") {
+            return Ok(content
+                .lines()
+                .filter(|line| !line.trim().is_empty())
+                .filter_map(|addr| {
+                    match addr.trim().parse::<Multiaddr>() {
+                        Ok(addr) => Some(BootstrapPeer::new(addr)),
+                        Err(e) => {
+                            warn!("Invalid multiaddr in plain text response: {}: {}", addr, e);
+                            None
+                        }
+                    }
+                })
+                .collect());
+        }
+
+        // For other endpoints, try parsing as JSON
+        match serde_json::from_str::<BootstrapEndpoints>(&content) {
+            Ok(json_endpoints) => {
+                info!("Successfully parsed JSON response");
+                let mut peers = Vec::new();
+
+                // Handle direct peer multiaddresses
+                for addr in json_endpoints.peers {
+                    match addr.parse::<Multiaddr>() {
+                        Ok(addr) => peers.push(BootstrapPeer::new(addr)),
+                        Err(e) => {
+                            warn!("Failed to parse multiaddr {}: {}", addr, e);
+                        }
+                    }
+                }
+
+                if peers.is_empty() {
+                    Err(Error::NoPeersFound("No valid peers found in JSON response".to_string()))
+                } else {
+                    Ok(peers)
+                }
+            }
+            Err(e) => {
+                warn!("Failed to parse JSON response: {}", e);
+                // Try parsing as plain text as a last resort
+                Ok(content
+                    .lines()
+                    .filter(|line| !line.trim().is_empty())
+                    .filter_map(|addr| {
+                        match addr.trim().parse::<Multiaddr>() {
+                            Ok(addr) => Some(BootstrapPeer::new(addr)),
+                            Err(e) => {
+                                warn!("Invalid multiaddr in plain text response: {}: {}", addr, e);
+                                None
+                            }
+                        }
+                    })
+                    .collect())
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use wiremock::{
+        matchers::{method, path},
+        Mock, MockServer, ResponseTemplate,
+    };
+
+    #[tokio::test]
+    async fn test_fetch_peers() {
+        let mock_server = MockServer::start().await;
+
+        Mock::given(method("GET"))
+            .and(path("/"))
+            
.respond_with(ResponseTemplate::new(200).set_body_string( + "/ip4/127.0.0.1/tcp/8080\n/ip4/127.0.0.2/tcp/8080", + )) + .mount(&mock_server) + .await; + + let mut discovery = InitialPeerDiscovery::new(); + discovery.endpoints = vec![mock_server.uri()]; + + let peers = discovery.fetch_peers().await.unwrap(); + assert_eq!(peers.len(), 2); + + let addr1: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + let addr2: Multiaddr = "/ip4/127.0.0.2/tcp/8080".parse().unwrap(); + assert!(peers.iter().any(|p| p.addr == addr1)); + assert!(peers.iter().any(|p| p.addr == addr2)); + } + + #[tokio::test] + async fn test_endpoint_failover() { + let mock_server1 = MockServer::start().await; + let mock_server2 = MockServer::start().await; + + // First endpoint fails + Mock::given(method("GET")) + .and(path("/")) + .respond_with(ResponseTemplate::new(500)) + .mount(&mock_server1) + .await; + + // Second endpoint succeeds + Mock::given(method("GET")) + .and(path("/")) + .respond_with(ResponseTemplate::new(200).set_body_string( + "/ip4/127.0.0.1/tcp/8080", + )) + .mount(&mock_server2) + .await; + + let mut discovery = InitialPeerDiscovery::new(); + discovery.endpoints = vec![mock_server1.uri(), mock_server2.uri()]; + + let peers = discovery.fetch_peers().await.unwrap(); + assert_eq!(peers.len(), 1); + + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + assert_eq!(peers[0].addr, addr); + } + + #[tokio::test] + async fn test_invalid_multiaddr() { + let mock_server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/")) + .respond_with(ResponseTemplate::new(200).set_body_string( + "/ip4/127.0.0.1/tcp/8080\ninvalid-addr\n/ip4/127.0.0.2/tcp/8080", + )) + .mount(&mock_server) + .await; + + let mut discovery = InitialPeerDiscovery::new(); + discovery.endpoints = vec![mock_server.uri()]; + + let peers = discovery.fetch_peers().await.unwrap(); + let valid_addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + assert_eq!(peers[0].addr, 
valid_addr); + } + + #[tokio::test] + async fn test_empty_response() { + let mock_server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/")) + .respond_with(ResponseTemplate::new(200).set_body_string("")) + .mount(&mock_server) + .await; + + let mut discovery = InitialPeerDiscovery::new(); + discovery.endpoints = vec![mock_server.uri()]; + + let result = discovery.fetch_peers().await; + assert!(matches!(result, Err(Error::NoPeersFound(_)))); + } + + #[tokio::test] + async fn test_whitespace_and_empty_lines() { + let mock_server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/")) + .respond_with(ResponseTemplate::new(200).set_body_string( + "\n \n/ip4/127.0.0.1/tcp/8080\n \n", + )) + .mount(&mock_server) + .await; + + let mut discovery = InitialPeerDiscovery::new(); + discovery.endpoints = vec![mock_server.uri()]; + + let peers = discovery.fetch_peers().await.unwrap(); + assert_eq!(peers.len(), 1); + + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + assert_eq!(peers[0].addr, addr); + } + + #[tokio::test] + async fn test_default_endpoints() { + let discovery = InitialPeerDiscovery::new(); + assert_eq!(discovery.endpoints.len(), 4); + assert_eq!( + discovery.endpoints[0], + "https://sn-testnet.s3.eu-west-2.amazonaws.com/bootstrap_cache.json" + ); + assert_eq!( + discovery.endpoints[1], + "https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts" + ); + assert_eq!( + discovery.endpoints[2], + "https://sn-node1.s3.eu-west-2.amazonaws.com/peers" + ); + assert_eq!( + discovery.endpoints[3], + "https://sn-node2.s3.eu-west-2.amazonaws.com/peers" + ); + } + + #[tokio::test] + async fn test_custom_endpoints() { + let endpoints = vec!["http://example.com".to_string()]; + let discovery = InitialPeerDiscovery::with_endpoints(endpoints.clone()); + assert_eq!(discovery.endpoints, endpoints); + } + + #[tokio::test] + async fn test_json_endpoints() { + let mock_server = MockServer::start().await; + + 
Mock::given(method("GET")) + .and(path("/")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"{"peers": ["/ip4/127.0.0.1/tcp/8080", "/ip4/127.0.0.2/tcp/8080"]}"#, + )) + .mount(&mock_server) + .await; + + let mut discovery = InitialPeerDiscovery::new(); + discovery.endpoints = vec![mock_server.uri()]; + + let peers = discovery.fetch_peers().await.unwrap(); + assert_eq!(peers.len(), 2); + + let addr1: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + let addr2: Multiaddr = "/ip4/127.0.0.2/tcp/8080".parse().unwrap(); + assert!(peers.iter().any(|p| p.addr == addr1)); + assert!(peers.iter().any(|p| p.addr == addr2)); + } +} diff --git a/bootstrap_cache/src/lib.rs b/bootstrap_cache/src/lib.rs new file mode 100644 index 0000000000..a5cd7f1c24 --- /dev/null +++ b/bootstrap_cache/src/lib.rs @@ -0,0 +1,115 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+
+mod initial_peer_discovery;
+mod cache_store;
+pub mod config;
+mod error;
+
+use libp2p::Multiaddr;
+use serde::{Deserialize, Serialize};
+use std::{fmt, time::SystemTime};
+use thiserror::Error;
+use chrono;
+
+pub use cache_store::CacheStore;
+pub use config::BootstrapConfig;
+pub use error::{Error, Result};
+pub use initial_peer_discovery::InitialPeerDiscovery;
+
+/// Structure representing a list of bootstrap endpoints
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct BootstrapEndpoints {
+    /// List of peer multiaddresses
+    pub peers: Vec<String>,
+    /// Optional metadata about the endpoints
+    #[serde(default)]
+    pub metadata: EndpointMetadata,
+}
+
+/// Metadata about bootstrap endpoints
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct EndpointMetadata {
+    /// When the endpoints were last updated
+    #[serde(default = "default_last_updated")]
+    pub last_updated: String,
+    /// Optional description of the endpoints
+    #[serde(default)]
+    pub description: String,
+}
+
+fn default_last_updated() -> String {
+    chrono::Utc::now().to_rfc3339()
+}
+
+impl Default for EndpointMetadata {
+    fn default() -> Self {
+        Self {
+            last_updated: default_last_updated(),
+            description: String::new(),
+        }
+    }
+}
+
+/// A peer that can be used for bootstrapping into the network
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct BootstrapPeer {
+    /// The multiaddress of the peer
+    pub addr: Multiaddr,
+    /// The number of successful connections to this peer
+    pub success_count: u32,
+    /// The number of failed connection attempts to this peer
+    pub failure_count: u32,
+    /// The last time this peer was successfully contacted
+    pub last_seen: SystemTime,
+}
+
+impl BootstrapPeer {
+    pub fn new(addr: Multiaddr) -> Self {
+        Self {
+            addr,
+            success_count: 0,
+            failure_count: 0,
+            last_seen: SystemTime::now(),
+        }
+    }
+
+    pub fn update_status(&mut self, success: bool) {
+        if success {
+            self.success_count += 1;
+            self.last_seen = SystemTime::now();
+        } else {
+            self.failure_count += 1;
+        }
+    }
+
+    pub fn is_reliable(&self) -> bool {
+        // A peer is considered reliable if it has more successes than failures
+        self.success_count > self.failure_count
+    }
+}
+
+impl fmt::Display for BootstrapPeer {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "BootstrapPeer {{ addr: {}, last_seen: {:?}, success: {}, failure: {} }}",
+            self.addr, self.last_seen, self.success_count, self.failure_count
+        )
+    }
+}
+
+/// Creates a new bootstrap cache with default configuration
+pub async fn new() -> Result<CacheStore> {
+    CacheStore::new(BootstrapConfig::default()).await
+}
+
+/// Creates a new bootstrap cache with custom configuration
+pub async fn with_config(config: BootstrapConfig) -> Result<CacheStore> {
+    CacheStore::new(config).await
+}
diff --git a/bootstrap_cache/tests/integration_tests.rs b/bootstrap_cache/tests/integration_tests.rs
new file mode 100644
index 0000000000..273d9600e9
--- /dev/null
+++ b/bootstrap_cache/tests/integration_tests.rs
@@ -0,0 +1,175 @@
+// Copyright 2024 MaidSafe.net limited.
+//
+// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
+// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
+// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. Please review the Licences for the specific language governing
+// permissions and limitations relating to use of the SAFE Network Software.
+ +use bootstrap_cache::{BootstrapEndpoints, InitialPeerDiscovery}; +use libp2p::Multiaddr; +use tracing_subscriber::{fmt, EnvFilter}; +use wiremock::{ + matchers::{method, path}, + Mock, MockServer, ResponseTemplate, +}; + +// Initialize logging for tests +fn init_logging() { + let _ = fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); +} + +#[tokio::test] +async fn test_fetch_from_amazon_s3() { + init_logging(); + let discovery = InitialPeerDiscovery::new(); + let peers = discovery.fetch_peers().await.unwrap(); + + // We should get some peers + assert!(!peers.is_empty(), "Expected to find some peers from S3"); + + // Verify that all peers have valid multiaddresses + for peer in &peers { + println!("Found peer: {}", peer.addr); + let addr_str = peer.addr.to_string(); + assert!(addr_str.contains("/ip4/"), "Expected IPv4 address"); + assert!(addr_str.contains("/udp/"), "Expected UDP port"); + assert!(addr_str.contains("/quic-v1/"), "Expected QUIC protocol"); + assert!(addr_str.contains("/p2p/"), "Expected peer ID"); + } +} + +#[tokio::test] +async fn test_individual_s3_endpoints() { + init_logging(); + + // Start a mock server + let mock_server = MockServer::start().await; + + // Create mock responses + let mock_response = r#"/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE +/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERF"#; + + // Mount the mock + Mock::given(method("GET")) + .and(path("/peers")) + .respond_with(ResponseTemplate::new(200).set_body_string(mock_response)) + .mount(&mock_server) + .await; + + let endpoint = format!("{}/peers", mock_server.uri()); + let discovery = InitialPeerDiscovery::with_endpoints(vec![endpoint.clone()]); + + match discovery.fetch_peers().await { + Ok(peers) => { + println!("Successfully fetched {} peers from {}", peers.len(), endpoint); + assert!(!peers.is_empty(), "Expected to find peers from {}", endpoint); + + // Verify first 
peer's multiaddr format
+            if let Some(first_peer) = peers.first() {
+                let addr_str = first_peer.addr.to_string();
+                println!("First peer from {}: {}", endpoint, addr_str);
+                assert!(addr_str.contains("/ip4/"), "Expected IPv4 address");
+                assert!(addr_str.contains("/udp/"), "Expected UDP port");
+                assert!(addr_str.contains("/quic-v1/"), "Expected QUIC protocol");
+                assert!(addr_str.contains("/p2p/"), "Expected peer ID");
+
+                // Try to parse it back to ensure it's valid
+                assert!(addr_str.parse::<Multiaddr>().is_ok(), "Should be valid multiaddr");
+            }
+        }
+        Err(e) => {
+            panic!("Failed to fetch peers from {}: {}", endpoint, e);
+        }
+    }
+}
+
+#[tokio::test]
+async fn test_response_format() {
+    init_logging();
+    let discovery = InitialPeerDiscovery::new();
+    let peers = discovery.fetch_peers().await.unwrap();
+
+    // Get the first peer to check format
+    let first_peer = peers.first().expect("Expected at least one peer");
+    let addr_str = first_peer.addr.to_string();
+
+    // Print the address for debugging
+    println!("First peer address: {}", addr_str);
+
+    // Verify address components
+    let components: Vec<&str> = addr_str.split('/').collect();
+    assert!(components.contains(&"ip4"), "Missing IP4 component");
+    assert!(components.contains(&"udp"), "Missing UDP component");
+    assert!(components.contains(&"quic-v1"), "Missing QUIC component");
+    assert!(components.iter().any(|&c| c == "p2p"), "Missing P2P component");
+
+    // Ensure we can parse it back into a multiaddr
+    let parsed: Multiaddr = addr_str.parse().expect("Should be valid multiaddr");
+    assert_eq!(parsed.to_string(), addr_str, "Multiaddr should round-trip");
+}
+
+#[tokio::test]
+async fn test_json_endpoint_format() {
+    init_logging();
+    let mock_server = MockServer::start().await;
+
+    // Create a mock JSON response
+    let json_response = r#"
+    {
+        "peers": [
+            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
+            "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERF"
+        ],
+        "metadata": {
+            "description": "Test endpoints",
+            "last_updated": "2024-01-01T00:00:00Z"
+        }
+    }
+    "#;
+
+    // Mount the mock
+    Mock::given(method("GET"))
+        .and(path("/"))  // Use root path instead of /peers
+        .respond_with(ResponseTemplate::new(200).set_body_string(json_response))
+        .mount(&mock_server)
+        .await;
+
+    let endpoint = format!("{}", mock_server.uri());
+    let discovery = InitialPeerDiscovery::with_endpoints(vec![endpoint.clone()]);
+
+    let peers = discovery.fetch_peers().await.unwrap();
+    assert_eq!(peers.len(), 2);
+
+    // Verify peer addresses
+    let addrs: Vec<String> = peers.iter().map(|p| p.addr.to_string()).collect();
+    assert!(addrs.contains(&"/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE".to_string()));
+    assert!(addrs.contains(&"/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERF".to_string()));
+}
+
+#[tokio::test]
+async fn test_s3_json_format() {
+    init_logging();
+
+    // Fetch and parse the bootstrap cache JSON
+    let response = reqwest::get("https://sn-testnet.s3.eu-west-2.amazonaws.com/bootstrap_cache.json")
+        .await
+        .unwrap();
+    let json_str = response.text().await.unwrap();
+
+    // Parse using our BootstrapEndpoints struct
+    let endpoints: BootstrapEndpoints = serde_json::from_str(&json_str).unwrap();
+
+    // Verify we got all the peers
+    assert_eq!(endpoints.peers.len(), 24);
+
+    // Verify we can parse each peer address
+    for peer in endpoints.peers {
+        peer.parse::<Multiaddr>().unwrap();
+    }
+
+    // Verify metadata
+    assert_eq!(endpoints.metadata.description, "Safe Network testnet bootstrap cache");
+}
diff --git a/docs/bootstrap_cache_implementation.md b/docs/bootstrap_cache_implementation.md
new file mode 100644
index 0000000000..0793465fc2
--- /dev/null
+++ b/docs/bootstrap_cache_implementation.md
@@ -0,0 +1,339 @@
+# Bootstrap Cache Implementation Guide
+
+This guide breaks down 
the implementation of the bootstrap cache system into testable components. Each component can be implemented and tested independently before integration.
+
+## Phase 1: Bootstrap Cache File Management
+
+### 1.1 Cache File Structure
+```rust
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct BootstrapPeer {
+    pub ip: String,
+    pub port: u16,
+    pub last_seen: DateTime<Utc>,
+    pub success_count: u32,
+    pub failure_count: u32,
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct BootstrapCache {
+    pub last_updated: DateTime<Utc>,
+    pub peers: Vec<BootstrapPeer>,
+}
+```
+
+### 1.2 File Operations Implementation
+1. Create `bootstrap_cache.rs`:
+```rust
+use std::path::PathBuf;
+use std::fs::{self, File};
+use std::io::{self, Read, Write};
+use fs2::FileExt; // For file locking
+
+pub struct CacheManager {
+    cache_path: PathBuf,
+}
+
+impl CacheManager {
+    pub fn new() -> io::Result<Self> {
+        let cache_path = Self::get_cache_path()?;
+        Ok(Self { cache_path })
+    }
+
+    fn get_cache_path() -> io::Result<PathBuf> {
+        #[cfg(target_os = "macos")]
+        let path = PathBuf::from("/Library/Application Support/Safe/bootstrap_cache.json");
+
+        #[cfg(target_os = "linux")]
+        let path = PathBuf::from("/var/safe/bootstrap_cache.json");
+
+        #[cfg(target_os = "windows")]
+        let path = PathBuf::from(r"C:\ProgramData\Safe\bootstrap_cache.json");
+
+        if let Some(parent) = path.parent() {
+            fs::create_dir_all(parent)?;
+        }
+        Ok(path)
+    }
+
+    pub fn read_cache(&self) -> io::Result<BootstrapCache> {
+        let file = File::open(&self.cache_path)?;
+        file.lock_shared()?;
+        let mut contents = String::new();
+        file.read_to_string(&mut contents)?;
+        file.unlock()?;
+
+        serde_json::from_str(&contents)
+            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
+    }
+
+    pub fn write_cache(&self, cache: &BootstrapCache) -> io::Result<()> {
+        let temp_path = self.cache_path.with_extension("tmp");
+        let mut file = File::create(&temp_path)?;
+        file.lock_exclusive()?;
+
+        let contents = serde_json::to_string_pretty(cache)
+            .map_err(|e| 
io::Error::new(io::ErrorKind::InvalidData, e))?;
+
+        file.write_all(contents.as_bytes())?;
+        file.sync_all()?;
+        file.unlock()?;
+
+        fs::rename(temp_path, &self.cache_path)?;
+        Ok(())
+    }
+}
+```
+
+### Test Cases for Phase 1
+1. File Operations:
+```rust
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::tempdir;
+
+    #[test]
+    fn test_cache_read_write() {
+        let dir = tempdir().unwrap();
+        let cache_path = dir.path().join("test_cache.json");
+
+        let mut cache = BootstrapCache {
+            last_updated: Utc::now(),
+            peers: vec![
+                BootstrapPeer {
+                    ip: "127.0.0.1".to_string(),
+                    port: 8080,
+                    last_seen: Utc::now(),
+                    success_count: 1,
+                    failure_count: 0,
+                }
+            ],
+        };
+
+        // Test writing
+        let manager = CacheManager::new().unwrap();
+        manager.write_cache(&cache).unwrap();
+
+        // Test reading
+        let read_cache = manager.read_cache().unwrap();
+        assert_eq!(cache.peers.len(), read_cache.peers.len());
+        assert_eq!(cache.peers[0].ip, read_cache.peers[0].ip);
+    }
+}
+```
+
+## Phase 2: Initial Peer Discovery
+
+### 2.1 Web Endpoint Implementation
+1. Create `peer_discovery.rs`:
+```rust
+pub struct PeerDiscovery {
+    endpoints: Vec<String>,
+}
+
+impl PeerDiscovery {
+    pub fn new() -> Self {
+        Self {
+            endpoints: vec![
+                "http://45.3.4.6/safe/peers.json".to_string(),
+                "http://46.4.5.7/safe/peers.json".to_string(),
+            ],
+        }
+    }
+
+    pub async fn fetch_peers(&self) -> Result<Vec<BootstrapPeer>, Error> {
+        for endpoint in &self.endpoints {
+            match self.fetch_from_endpoint(endpoint).await {
+                Ok(peers) => return Ok(peers),
+                Err(_) => continue,
+            }
+        }
+        Err(Error::NoEndpointsAvailable)
+    }
+
+    async fn fetch_from_endpoint(&self, endpoint: &str) -> Result<Vec<BootstrapPeer>, Error> {
+        let client = reqwest::Client::new();
+        let response = client
+            .get(endpoint)
+            .timeout(Duration::from_secs(5))
+            .send()
+            .await?;
+
+        let peers: Vec<BootstrapPeer> = response.json().await?;
+        Ok(peers)
+    }
+}
+```
+
+### Test Cases for Phase 2
+1. 
Endpoint Fetching:
+```rust
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use wiremock::{MockServer, Mock, ResponseTemplate};
+    use wiremock::matchers::method;
+
+    #[tokio::test]
+    async fn test_fetch_peers() {
+        let mock_server = MockServer::start().await;
+
+        Mock::given(method("GET"))
+            .respond_with(ResponseTemplate::new(200).set_body_json(vec![
+                BootstrapPeer {
+                    ip: "192.168.1.1".to_string(),
+                    port: 8080,
+                    last_seen: Utc::now(),
+                    success_count: 1,
+                    failure_count: 0,
+                }
+            ]))
+            .mount(&mock_server)
+            .await;
+
+        let discovery = PeerDiscovery::new();
+        let peers = discovery.fetch_from_endpoint(&mock_server.uri()).await.unwrap();
+
+        assert_eq!(peers.len(), 1);
+        assert_eq!(peers[0].ip, "192.168.1.1");
+    }
+}
+```
+
+## Phase 3: Node Implementation
+
+### 3.1 Node Bootstrap Process
+1. Create `node_bootstrap.rs`:
+```rust
+pub struct NodeBootstrap {
+    cache_manager: CacheManager,
+    peer_discovery: PeerDiscovery,
+}
+
+impl NodeBootstrap {
+    pub fn new() -> io::Result<Self> {
+        Ok(Self {
+            cache_manager: CacheManager::new()?,
+            peer_discovery: PeerDiscovery::new(),
+        })
+    }
+
+    pub async fn get_bootstrap_peers(&self) -> Result<Vec<BootstrapPeer>, Error> {
+        // Try local cache first
+        match self.cache_manager.read_cache() {
+            Ok(cache) if !Self::is_cache_stale(&cache) => {
+                Ok(cache.peers)
+            }
+            _ => {
+                // Fetch from endpoints if cache is stale or unavailable
+                let peers = self.peer_discovery.fetch_peers().await?;
+
+                // Update cache with new peers
+                self.cache_manager.write_cache(&BootstrapCache {
+                    last_updated: Utc::now(),
+                    peers: peers.clone(),
+                })?;
+
+                Ok(peers)
+            }
+        }
+    }
+
+    fn is_cache_stale(cache: &BootstrapCache) -> bool {
+        Utc::now().signed_duration_since(cache.last_updated).num_hours() > 24
+    }
+}
+```
+
+### Test Cases for Phase 3
+1. 
Bootstrap Process: +```rust +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_bootstrap_process() { + let bootstrap = NodeBootstrap::new().unwrap(); + + // Test with empty/stale cache + let peers = bootstrap.get_bootstrap_peers().await.unwrap(); + assert!(!peers.is_empty()); + + // Test with fresh cache + let peers_cached = bootstrap.get_bootstrap_peers().await.unwrap(); + assert_eq!(peers.len(), peers_cached.len()); + } +} +``` + +## Testing Strategy + +### Unit Tests +- Test each component in isolation +- Use mock objects for external dependencies +- Test error conditions and edge cases + +### Integration Tests +1. File System Integration: + - Test file locking with multiple processes + - Test permissions and directory creation + - Test cache file corruption scenarios + +2. Network Integration: + - Test endpoint failover + - Test timeout handling + - Test malformed response handling + +3. Full System Integration: + - Test complete bootstrap process + - Test cache updates during operation + - Test peer statistics updates + +## Development Steps + +1. **Setup (Day 1)** + - [ ] Create project structure + - [ ] Add necessary dependencies + - [ ] Implement basic data structures + +2. **Cache Management (Day 2-3)** + - [ ] Implement `CacheManager` + - [ ] Add file locking + - [ ] Add unit tests + - [ ] Test cross-platform compatibility + +3. **Peer Discovery (Day 4-5)** + - [ ] Implement `PeerDiscovery` + - [ ] Add endpoint failover + - [ ] Add unit tests + - [ ] Test network scenarios + +4. **Node Integration (Day 6-7)** + - [ ] Implement `NodeBootstrap` + - [ ] Add integration tests + - [ ] Test full system flow + +5. 
**Documentation and Cleanup (Day 8)** + - [ ] Add API documentation + - [ ] Create usage examples + - [ ] Performance testing + - [ ] Security review + +## Dependencies +```toml +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +chrono = { version = "0.4", features = ["serde"] } +tokio = { version = "1.0", features = ["full"] } +reqwest = { version = "0.11", features = ["json"] } +fs2 = "0.4" +thiserror = "1.0" + +[dev-dependencies] +tempfile = "3.2" +wiremock = "0.5" +tokio-test = "0.4" +``` diff --git a/docs/bootstrap_cache_prd.md b/docs/bootstrap_cache_prd.md new file mode 100644 index 0000000000..cb1eaf613e --- /dev/null +++ b/docs/bootstrap_cache_prd.md @@ -0,0 +1,162 @@ +# Bootstrap Cache PRD + +## Overview +This document outlines the design and implementation of a decentralized bootstrap cache system for the Safe Network. This system replaces the current centralized "bootstrap node" concept with a fully decentralized approach where all nodes are equal participants. + +## Goals +- Remove the concept of dedicated "bootstrap nodes" +- Implement a shared local cache system for both nodes and clients +- Reduce infrastructure costs +- Improve network stability and decentralization +- Simplify the bootstrapping process + +## Non-Goals +- Creating any form of centralized node discovery +- Implementing DNS-based discovery +- Maintaining long-term connections between nodes +- Running HTTP servers on nodes + +## Technical Design + +### Bootstrap Cache File +- Location: + - Unix/Linux: `/var/safe/bootstrap_cache.json` + - macOS: `/Library/Application Support/Safe/bootstrap_cache.json` + - Windows: `C:\ProgramData\Safe\bootstrap_cache.json` +- Format: JSON file containing: + ```json + { + "last_updated": "ISO-8601-timestamp", + "peers": [ + { + "ip": "string", + "port": "number", + "last_seen": "ISO-8601-timestamp", + "success_count": "number", + "failure_count": "number" + } + ] + } + ``` + +### Cache Management +1. 
**Writing Cache** + - Write to cache every 60 minutes if changes occurred + - Write to cache on clean node/client shutdown + - Keep track of successful/failed connection attempts + - Limit cache size to prevent bloat (e.g., 1000 entries) + - Handle file locking for concurrent access from multiple nodes/clients + +2. **Reading Cache** + - On startup, read shared local cache if available + - If cache is stale (>24 hours) or empty, fetch from known web endpoints + - Sort peers by success rate and last_seen timestamp + +### Concurrent File Access +- File locking mechanism using platform-native file locks (flock on Unix/Linux, LockFileEx on Windows) +- Atomic file writes using a write-rename pattern: + 1. Write to temporary file + 2. Acquire exclusive lock + 3. Rename temporary file to target file + 4. Release lock +- Exponential backoff for lock acquisition to handle contention +- Read operations use shared locks to prevent conflicts with writes +- Lock timeout mechanism to prevent deadlocks + +### Client Updates +- Clients have full read/write access to the cache file +- Client permissions: + - Read: Always allowed + - Write: Only when updating peer reliability metrics or adding new peers +- Update validation: + - Clients can only update metrics for peers they've directly connected to + - New peers must be validated before being added to the cache + - Updates must preserve existing peer data not modified by the client +- Client-specific considerations: + - Temporary file permissions match the main cache file + - Error handling for permission issues on restricted systems + - Fallback to user-specific cache if system-wide cache is inaccessible + +### Initial Peer Discovery +- Hardcoded list of web endpoints (no DNS resolution) +- Example: + ```rust + const BOOTSTRAP_ENDPOINTS: &[&str] = &[ + "http://45.3.4.6/safe/peers.json", + "http://46.4.5.7/safe/peers.json", + // Add more as needed + ]; + ``` +- Simple JSON endpoint returning peer list +- Used only for new nodes or 
when cache is stale +- Endpoints are maintained by the Safe Network team + +### Node Implementation +1. **Cache Management** + - Use routing table as in-memory cache + - Periodically write to shared bootstrap cache file + - Track connection success/failure rates + +2. **Startup Process** + ```rust + fn startup() { + // 1. Try shared local cache + let peers = read_shared_cache(); + + // 2. If cache empty/stale, fetch from web endpoints + if peers.is_empty() || is_stale(peers) { + peers = fetch_from_endpoints(); + } + + // 3. Begin Kademlia lookups with closest peers + start_kad_lookups(peers); + } + ``` + +### Client Implementation +1. **Cache Management** + - Maintain vector of known peers + - Read/write to shared bootstrap cache file + - Track connection success/failure rates + +2. **Connection Process** + ```rust + fn get_chunk(chunk_id: ChunkId) { + // 1. Get closest peers from cache + let peers = get_closest_peers(chunk_id); + + // 2. Perform Kademlia lookup + let result = kad_lookup(chunk_id, peers); + + // 3. Update peer statistics + update_peer_stats(peers, result); + + // 4. Disconnect after operation + cleanup_connections(); + } + ``` + +## Migration Plan +1. Phase 1: Implement shared bootstrap cache file system +2. Phase 2: Set up simple web endpoints for initial peer discovery +3. Phase 3: Remove existing bootstrap node infrastructure +4. 
Phase 4: Update client APIs to remove connect() pattern + +## Success Metrics +- Reduction in infrastructure costs +- More evenly distributed network load +- Improved network resilience +- Faster node/client startup times + +## Security Considerations +- File permissions for shared cache file +- Validation of peer addresses +- Protection against malicious cache entries +- No DNS dependencies to prevent DNS-based attacks +- File locking mechanism to prevent corruption + +## Future Enhancements +- Optional peer sharing between trusted nodes +- Geographic-based peer selection +- Reputation system for peers +- Automated endpoint list updates via releases