Skip to content

Commit

Permalink
Merge pull request #2510 from b-zee/feat-autonomi-client-init
Browse files Browse the repository at this point in the history
feat: Client::init routine that adds bootstrap peers to routing table
  • Loading branch information
jacderida authored Dec 16, 2024
2 parents 3345299 + 8769f8a commit 7309291
Show file tree
Hide file tree
Showing 19 changed files with 179 additions and 52 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ jobs:

- name: Run autonomi tests
timeout-minutes: 25
run: cargo test --release --package autonomi --lib --features="full,fs"
run: cargo test --release --package autonomi --features full --lib

- name: Run autonomi doc tests
timeout-minutes: 25
run: cargo test --release --package autonomi --features full --doc

- name: Run bootstrap tests
timeout-minutes: 25
Expand Down
13 changes: 9 additions & 4 deletions ant-bootstrap/src/initial_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,21 @@ impl PeersArgs {
return Ok(vec![]);
}

let mut bootstrap_addresses = vec![];

// Read from ANT_PEERS environment variable if present
bootstrap_addresses.extend(Self::read_bootstrap_addr_from_env());

if !bootstrap_addresses.is_empty() {
return Ok(bootstrap_addresses);
}

// If local mode is enabled, return empty store (will use mDNS)
if self.local || cfg!(feature = "local") {
info!("Local mode enabled, using only local discovery.");
return Ok(vec![]);
}

let mut bootstrap_addresses = vec![];

// Add addrs from arguments if present
for addr in &self.addrs {
if let Some(addr) = craft_valid_multiaddr(addr, false) {
Expand All @@ -124,8 +131,6 @@ impl PeersArgs {
warn!("Invalid multiaddress format from arguments: {addr}");
}
}
// Read from ANT_PEERS environment variable if present
bootstrap_addresses.extend(Self::read_bootstrap_addr_from_env());

if let Some(count) = count {
if bootstrap_addresses.len() >= count {
Expand Down
2 changes: 1 addition & 1 deletion ant-cli/src/actions/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub async fn connect_to_network(peers: Vec<Multiaddr>) -> Result<Client> {

progress_bar.set_message("Connecting to The Autonomi Network...");

match Client::connect(&peers).await {
match Client::init_with_peers(peers).await {
Ok(client) => {
info!("Connected to the Network");
progress_bar.finish_with_message("Connected to the Network");
Expand Down
2 changes: 1 addition & 1 deletion ant-node/tests/common/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl LocalNetwork {

println!("Client bootstrap with peer {bootstrap_peers:?}");
info!("Client bootstrap with peer {bootstrap_peers:?}");
Client::connect(&bootstrap_peers)
Client::init_with_peers(bootstrap_peers)
.await
.expect("Client shall be successfully created.")
}
Expand Down
4 changes: 4 additions & 0 deletions autonomi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ repository = "https://github.com/maidsafe/autonomi"
name = "autonomi"
crate-type = ["cdylib", "rlib"]

[[example]]
name = "put_and_dir_upload"
features = ["full"]

[features]
default = ["vault"]
external-signer = ["ant-evm/external-signer"]
Expand Down
4 changes: 2 additions & 2 deletions autonomi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use autonomi::{Bytes, Client, Wallet};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::init().await?;

// Default wallet of testnet.
let key = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";

let client = Client::connect(&["/ip4/127.0.0.1/udp/1234/quic-v1".parse()?]).await?;
let wallet = Wallet::new_from_private_key(Default::default(), key)?;

// Put and fetch data.
Expand Down
2 changes: 1 addition & 1 deletion autonomi/examples/put_and_dir_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Default wallet of testnet.
let key = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";

let client = Client::connect(&["/ip4/127.0.0.1/udp/1234/quic-v1".parse()?]).await?;
let client = Client::init_local().await?;
let wallet = Wallet::new_from_private_key(Default::default(), key)?;

// Put and fetch data.
Expand Down
10 changes: 4 additions & 6 deletions autonomi/src/client/files/archive_public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,10 @@ impl Client {
/// # Example
///
/// ```no_run
/// # use autonomi::client::{Client, archive::ArchiveAddr};
/// # use autonomi::{Client, client::files::archive_public::ArchiveAddr};
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let peers = ["/ip4/127.0.0.1/udp/1234/quic-v1".parse()?];
/// let client = Client::connect(&peers).await?;
/// let client = Client::init().await?;
/// let archive = client.archive_get_public(ArchiveAddr::random(&mut rand::thread_rng())).await?;
/// # Ok(())
/// # }
Expand All @@ -139,12 +138,11 @@ impl Client {
/// Create simple archive containing `file.txt` pointing to random XOR name.
///
/// ```no_run
/// # use autonomi::client::{Client, data::DataAddr, archive::{PublicArchive, ArchiveAddr, Metadata}};
/// # use autonomi::{Client, client::{data::DataAddr, files::{archive::Metadata, archive_public::{PublicArchive, ArchiveAddr}}}};
/// # use std::path::PathBuf;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let peers = ["/ip4/127.0.0.1/udp/1234/quic-v1".parse()?];
/// # let client = Client::connect(&peers).await?;
/// # let client = Client::init().await?;
/// # let wallet = todo!();
/// let mut archive = PublicArchive::new();
/// archive.add_file(PathBuf::from("file.txt"), DataAddr::random(&mut rand::thread_rng()), Metadata::new_with_size(0));
Expand Down
136 changes: 127 additions & 9 deletions autonomi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ pub mod wasm;
mod rate_limiter;
mod utils;

use ant_bootstrap::{BootstrapCacheConfig, BootstrapCacheStore};
use ant_bootstrap::{BootstrapCacheConfig, BootstrapCacheStore, PeersArgs};
pub use ant_evm::Amount;

use ant_evm::EvmNetwork;
use ant_networking::{interval, multiaddr_is_global, Network, NetworkBuilder, NetworkEvent};
use ant_protocol::{version::IDENTIFY_PROTOCOL_STR, CLOSE_GROUP_SIZE};
use ant_protocol::version::IDENTIFY_PROTOCOL_STR;
use libp2p::{identity::Keypair, Multiaddr};
use std::{collections::HashSet, sync::Arc, time::Duration};
use tokio::sync::mpsc;
Expand All @@ -49,18 +49,20 @@ pub const CONNECT_TIMEOUT_SECS: u64 = 10;

const CLIENT_EVENT_CHANNEL_SIZE: usize = 100;

/// Represents a connection to the Autonomi network.
// Amount of peers to confirm into our routing table before we consider the client ready.
pub use ant_protocol::CLOSE_GROUP_SIZE;

/// Represents a client for the Autonomi network.
///
/// # Example
///
/// To connect to the network, use [`Client::connect`].
/// To start interacting with the network, use [`Client::init`].
///
/// ```no_run
/// # use autonomi::client::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let peers = ["/ip4/127.0.0.1/udp/1234/quic-v1".parse()?];
/// let client = Client::connect(&peers).await?;
/// let client = Client::init().await?;
/// # Ok(())
/// # }
/// ```
Expand All @@ -71,18 +73,129 @@ pub struct Client {
pub(crate) evm_network: EvmNetwork,
}

/// Configuration for [`Client::init_with_config`].
#[derive(Debug, Clone, Default)]
pub struct ClientConfig {
/// Whether we're expected to connect to a local network.
pub local: bool,

/// List of peers to connect to.
///
/// If not provided, the client will use the default bootstrap peers.
pub peers: Option<Vec<Multiaddr>>,
}

/// Error returned by [`Client::connect`].
#[derive(Debug, thiserror::Error)]
pub enum ConnectError {
/// Did not manage to connect to enough peers in time.
#[error("Could not connect to enough peers in time.")]
/// Did not manage to populate the routing table with enough peers.
#[error("Failed to populate our routing table with enough peers in time")]
TimedOut,

/// Same as [`ConnectError::TimedOut`] but with a list of incompatible protocols.
#[error("Could not connect to peers due to incompatible protocol: {0:?}")]
#[error("Failed to populate our routing table due to incompatible protocol: {0:?}")]
TimedOutWithIncompatibleProtocol(HashSet<String>, String),

/// An error occurred while bootstrapping the client.
#[error("Failed to bootstrap the client")]
Bootstrap(#[from] ant_bootstrap::Error),
}

impl Client {
/// Initialize the client with default configuration.
///
/// See [`Client::init_with_config`].
pub async fn init() -> Result<Self, ConnectError> {
Self::init_with_config(Default::default()).await
}

/// Initialize a client that is configured to be local.
///
/// See [`Client::init_with_config`].
pub async fn init_local() -> Result<Self, ConnectError> {
Self::init_with_config(ClientConfig {
local: true,
..Default::default()
})
.await
}

/// Initialize a client that bootstraps from a list of peers.
///
/// If any of the provided peers is a global address, the client will not be local.
///
/// ```no_run
/// # use autonomi::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Will set `local` to true.
/// let client = Client::init_with_peers(vec!["/ip4/127.0.0.1/udp/1234/quic-v1".parse()?]).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_with_peers(peers: Vec<Multiaddr>) -> Result<Self, ConnectError> {
// Any global address makes the client non-local
let local = !peers.iter().any(multiaddr_is_global);

Self::init_with_config(ClientConfig {
local,
peers: Some(peers),
})
.await
}

/// Initialize the client with the given configuration.
///
/// This will block until [`CLOSE_GROUP_SIZE`] have been added to the routing table.
///
/// See [`ClientConfig`].
///
/// ```no_run
/// use autonomi::client::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let client = Client::init_with_config(Default::default()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn init_with_config(config: ClientConfig) -> Result<Self, ConnectError> {
let (network, event_receiver) = build_client_and_run_swarm(config.local);

let peers_args = PeersArgs {
disable_mainnet_contacts: config.local,
addrs: config.peers.unwrap_or_default(),
..Default::default()
};

let peers = match peers_args.get_addrs(None, None).await {
Ok(peers) => peers,
Err(e) => return Err(e.into()),
};

let network_clone = network.clone();
let peers = peers.to_vec();
let _handle = ant_networking::target_arch::spawn(async move {
for addr in peers {
if let Err(err) = network_clone.dial(addr.clone()).await {
error!("Failed to dial addr={addr} with err: {err:?}");
eprintln!("addr={addr} Failed to dial: {err:?}");
};
}
});

// Wait until we have added a few peers to our routing table.
let (sender, receiver) = futures::channel::oneshot::channel();
ant_networking::target_arch::spawn(handle_event_receiver(event_receiver, sender));
receiver.await.expect("sender should not close")?;
debug!("Enough peers were added to our routing table, initialization complete");

Ok(Self {
network,
client_event_sender: Arc::new(None),
evm_network: Default::default(),
})
}

/// Connect to the network.
///
/// This will timeout after [`CONNECT_TIMEOUT_SECS`] secs.
Expand All @@ -92,10 +205,15 @@ impl Client {
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let peers = ["/ip4/127.0.0.1/udp/1234/quic-v1".parse()?];
/// #[allow(deprecated)]
/// let client = Client::connect(&peers).await?;
/// # Ok(())
/// # }
/// ```
#[deprecated(
since = "0.2.4",
note = "Use [`Client::init`] or [`Client::init_with_config`] instead"
)]
pub async fn connect(peers: &[Multiaddr]) -> Result<Self, ConnectError> {
// Any global address makes the client non-local
let local = !peers.iter().any(multiaddr_is_global);
Expand Down
2 changes: 1 addition & 1 deletion autonomi/src/client/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl JsClient {
.map(|peer| peer.parse())
.collect::<Result<Vec<Multiaddr>, _>>()?;

let client = super::Client::connect(&peers).await?;
let client = super::Client::init_with_peers(peers).await?;

Ok(JsClient(client))
}
Expand Down
8 changes: 4 additions & 4 deletions autonomi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
//!
//! # Example
//!
//! ```rust
//! ```no_run
//! use autonomi::{Bytes, Client, Wallet};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let client = Client::connect(&["/ip4/127.0.0.1/udp/1234/quic-v1".parse()?]).await?;
//! let client = Client::init().await?;
//!
//! // Default wallet of testnet.
//! let key = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";
Expand All @@ -26,7 +26,7 @@
//! let _data_fetched = client.data_get_public(data_addr).await?;
//!
//! // Put and fetch directory from local file system.
//! let dir_addr = client.dir_upload_public("files/to/upload".into(), &wallet).await?;
//! let dir_addr = client.dir_and_archive_upload_public("files/to/upload".into(), &wallet).await?;
//! client.dir_download_public(dir_addr, "files/downloaded".into()).await?;
//!
//! Ok(())
Expand Down Expand Up @@ -76,7 +76,7 @@ pub use bytes::Bytes;
pub use libp2p::Multiaddr;

#[doc(inline)]
pub use client::{files::archive::PrivateArchive, Client};
pub use client::{files::archive::PrivateArchive, Client, ClientConfig};

#[cfg(feature = "extension-module")]
mod python;
8 changes: 5 additions & 3 deletions autonomi/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ impl PyClient {
pyo3::exceptions::PyValueError::new_err(format!("Invalid multiaddr: {e}"))
})?;

let client = rt.block_on(RustClient::connect(&peers)).map_err(|e| {
pyo3::exceptions::PyValueError::new_err(format!("Failed to connect: {e}"))
})?;
let client = rt
.block_on(RustClient::init_with_peers(peers))
.map_err(|e| {
pyo3::exceptions::PyValueError::new_err(format!("Failed to connect: {e}"))
})?;

Ok(Self { inner: client })
}
Expand Down
2 changes: 1 addition & 1 deletion autonomi/tests/evm/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod test {
let _log_appender_guard =
ant_logging::LogBuilder::init_single_threaded_tokio_test("file", false);

let mut client = Client::connect(&[]).await.unwrap();
let mut client = Client::init_local().await?;
let mut wallet = get_funded_wallet();

// let data = common::gen_random_data(1024 * 1024 * 1000);
Expand Down
4 changes: 2 additions & 2 deletions autonomi/tests/external_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use bytes::Bytes;
use std::collections::BTreeMap;
use std::time::Duration;
use test_utils::evm::get_funded_wallet;
use test_utils::{gen_random_data, peers_from_env};
use test_utils::gen_random_data;
use tokio::time::sleep;
use xor_name::XorName;

Expand Down Expand Up @@ -103,7 +103,7 @@ async fn external_signer_put() -> eyre::Result<()> {
let _log_appender_guard =
LogBuilder::init_single_threaded_tokio_test("external_signer_put", false);

let client = Client::connect(&peers_from_env()?).await?;
let client = Client::init_local().await?;
let wallet = get_funded_wallet();
let data = gen_random_data(1024 * 1024 * 10);

Expand Down
Loading

0 comments on commit 7309291

Please sign in to comment.