diff --git a/Cargo.lock b/Cargo.lock index 15d81a9e..c41070da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4837,6 +4837,7 @@ dependencies = [ "bincode2", "bollard", "clap", + "color-eyre", "dashmap 6.1.0", "ed25519-zebra 4.0.3", "eigensdk", diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index a1504c60..e322d637 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -100,6 +100,7 @@ sysinfo = { workspace = true } dashmap = { workspace = true } lazy_static = "1.5.0" bincode2 = { workspace = true } +color-eyre = { workspace = true } [target.'cfg(not(target_family = "wasm"))'.dependencies.libp2p] diff --git a/sdk/src/config/gadget_config.rs b/sdk/src/config/gadget_config.rs index 5cc4203a..84b597d6 100644 --- a/sdk/src/config/gadget_config.rs +++ b/sdk/src/config/gadget_config.rs @@ -4,6 +4,7 @@ use crate::keystore::backend::GenericKeyStore; use crate::keystore::BackendExt; #[cfg(any(feature = "std", feature = "wasm"))] use crate::keystore::TanglePairSigner; +use crate::network::setup::NetworkConfig; use crate::utils::test_utils::get_client; use alloc::string::{String, ToString}; use core::fmt::Debug; @@ -129,6 +130,32 @@ impl Default for GadgetConfiguration { } impl GadgetConfiguration { + /// Returns a libp2p-friendly identity keypair. + pub fn libp2p_identity(&self) -> Result { + let ed25519 = *self.first_ed25519_signer()?.signer(); + let keypair = libp2p::identity::Keypair::ed25519_from_bytes(ed25519.seed()) + .map_err(|err| Error::ConfigurationError(err.to_string()))?; + Ok(keypair) + } + + /// Returns a new `NetworkConfig` for the current environment. + pub fn libp2p_network_config>( + &self, + network_name: T, + ) -> Result { + let network_identity = self.libp2p_identity()?; + + let my_ecdsa_key = self.first_ecdsa_signer()?; + let network_config = NetworkConfig::new_service_network( + network_identity, + my_ecdsa_key.signer().clone(), + self.bootnodes.clone(), + self.target_port, + network_name, + ); + + Ok(network_config) + } /// Loads the `KeyStore` from the current environment. /// /// # Errors diff --git a/sdk/src/config/mod.rs b/sdk/src/config/mod.rs index 2fd3fd3f..88d40f68 100644 --- a/sdk/src/config/mod.rs +++ b/sdk/src/config/mod.rs @@ -68,6 +68,8 @@ pub enum Error { MissingSymbioticContractAddresses, #[error("Bad RPC Connection: {0}")] BadRpcConnection(String), + #[error("Configuration error: {0}")] + ConfigurationError(String), } /// Loads the [`GadgetConfiguration`] from the current environment. diff --git a/sdk/src/error.rs b/sdk/src/error.rs index d5a58a42..c61c3c9e 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -70,6 +70,9 @@ pub enum Error { #[error("Bad argument decoding for {0}")] BadArgumentDecoding(String), + #[error("Color Eyre error: {0}")] + Generic(#[from] color_eyre::Report), + #[error("Other error: {0}")] Other(String), } diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index cd732a24..7feadd15 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -80,6 +80,7 @@ pub use parking_lot; pub use subxt_core; pub use tangle_subxt; pub use tokio; +pub use tracing; pub use uuid; // External modules usually used in proc-macro codegen. diff --git a/sdk/src/logging.rs b/sdk/src/logging.rs index c8bdc64f..309a0cc8 100644 --- a/sdk/src/logging.rs +++ b/sdk/src/logging.rs @@ -4,7 +4,7 @@ #[macro_export] macro_rules! trace { ($($tt:tt)*) => { - tracing::trace!(target: "gadget", $($tt)*) + $crate::tracing::trace!(target: "gadget", $($tt)*) } } @@ -14,7 +14,7 @@ macro_rules! trace { #[macro_export] macro_rules! debug { ($($tt:tt)*) => { - tracing::debug!(target: "gadget", $($tt)*) + $crate::tracing::debug!(target: "gadget", $($tt)*) } } @@ -24,7 +24,7 @@ macro_rules! debug { #[macro_export] macro_rules! error { ($($tt:tt)*) => { - tracing::error!(target: "gadget", $($tt)*) + $crate::tracing::error!(target: "gadget", $($tt)*) } } @@ -34,7 +34,7 @@ macro_rules! error { #[macro_export] macro_rules! warn { ($($tt:tt)*) => { - tracing::warn!(target: "gadget", $($tt)*) + $crate::tracing::warn!(target: "gadget", $($tt)*) } } @@ -44,7 +44,7 @@ macro_rules! warn { #[macro_export] macro_rules! info { ($($tt:tt)*) => { - tracing::info!(target: "gadget", $($tt)*) + $crate::tracing::info!(target: "gadget", $($tt)*) } } diff --git a/sdk/src/network/mod.rs b/sdk/src/network/mod.rs index b54fed0d..55d9a671 100644 --- a/sdk/src/network/mod.rs +++ b/sdk/src/network/mod.rs @@ -19,6 +19,7 @@ pub mod handlers; #[cfg(target_family = "wasm")] pub mod matchbox; pub mod messaging; +pub mod round_based_compat; pub mod setup; #[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)] diff --git a/sdk/src/network/round_based_compat.rs b/sdk/src/network/round_based_compat.rs new file mode 100644 index 00000000..d2872f19 --- /dev/null +++ b/sdk/src/network/round_based_compat.rs @@ -0,0 +1,221 @@ +use core::pin::Pin; +use core::sync::atomic::AtomicU64; +use core::task::{ready, Context, Poll}; +use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::sync::Arc; + +use crate::futures::prelude::*; +use crate::network::{self, IdentifierInfo, Network, NetworkMultiplexer, StreamKey, SubNetwork}; +use crate::subxt_core::ext::sp_core::ecdsa; +use round_based::{Delivery, Incoming, Outgoing}; +use round_based::{MessageDestination, MessageType, MsgId, PartyIndex}; +use stream::{SplitSink, SplitStream}; + +pub struct NetworkDeliveryWrapper { + /// The wrapped network implementation. + network: NetworkWrapper, +} + +impl NetworkDeliveryWrapper +where + N: Network + Unpin, + M: Clone + Send + Unpin + 'static, + M: serde::Serialize, + M: serde::de::DeserializeOwned, +{ + /// Create a new NetworkDeliveryWrapper over a network implementation with the given party index. + pub fn new( + network: N, + i: PartyIndex, + task_hash: [u8; 32], + parties: BTreeMap, + ) -> Self { + let mux = NetworkMultiplexer::new(network); + // By default, we create 4 substreams for each party. + let sub_streams = (1..5) + .map(|i| { + let key = StreamKey { + // This is a dummy task hash, it should be replaced with the actual task hash + task_hash: [0u8; 32], + round_id: i, + }; + let substream = mux.multiplex(key); + (key, substream) + }) + .collect(); + let network = NetworkWrapper { + me: i, + mux, + incoming_queue: VecDeque::new(), + outgoing_queue: VecDeque::new(), + sub_streams, + participants: parties, + task_hash, + next_msg_id: Arc::new(NextMessageId::default()), + _network: core::marker::PhantomData, + }; + NetworkDeliveryWrapper { network } + } +} + +/// A NetworkWrapper wraps a network implementation and implements [`Stream`] and [`Sink`] for +/// it. +pub struct NetworkWrapper { + /// The current party index. + me: PartyIndex, + /// Our network Multiplexer. + mux: NetworkMultiplexer, + /// A Map of substreams for each round. + sub_streams: HashMap, + /// A queue of incoming messages. + incoming_queue: VecDeque>, + /// A queue of outgoing messages. + outgoing_queue: VecDeque>, + /// Participants in the network with their corresponding ECDSA public keys. + // Note: This is a BTreeMap to ensure that the participants are sorted by their party index. + participants: BTreeMap, + next_msg_id: Arc, + task_hash: [u8; 32], + _network: core::marker::PhantomData, +} + +impl Delivery for NetworkDeliveryWrapper +where + N: Network + Unpin, + M: Clone + Send + Unpin + 'static, + M: serde::Serialize + serde::de::DeserializeOwned, + M: round_based::ProtocolMessage, +{ + type Send = SplitSink, Outgoing>; + type Receive = SplitStream>; + type SendError = crate::Error; + type ReceiveError = crate::Error; + + fn split(self) -> (Self::Receive, Self::Send) { + let (sink, stream) = self.network.split(); + (stream, sink) + } +} + +impl Stream for NetworkWrapper +where + N: Network + Unpin, + M: serde::de::DeserializeOwned + Unpin, + M: round_based::ProtocolMessage, +{ + type Item = Result, crate::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let sub_streams = self.sub_streams.values(); + // pull all substreams + let mut messages = Vec::new(); + for sub_stream in sub_streams { + let p = sub_stream.next_message().poll_unpin(cx); + let m = match p { + Poll::Ready(Some(msg)) => msg, + _ => continue, + }; + let msg = network::deserialize::(&m.payload)?; + messages.push((m.sender.user_id, m.recipient, msg)); + } + + // Sort the incoming messages by round. + messages.sort_by_key(|(_, _, msg)| msg.round()); + + let this = self.get_mut(); + // Push all messages to the incoming queue + messages + .into_iter() + .map(|(sender, recipient, msg)| Incoming { + id: this.next_msg_id.next(), + sender, + msg_type: match recipient { + Some(_) => MessageType::P2P, + None => MessageType::Broadcast, + }, + msg, + }) + .for_each(|m| this.incoming_queue.push_back(m)); + // Reorder the incoming queue by round message. + let maybe_msg = this.incoming_queue.pop_front(); + if let Some(msg) = maybe_msg { + Poll::Ready(Some(Ok(msg))) + } else { + // No message in the queue, and no message in the substreams. + // Tell the network to wake us up when a new message arrives. + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +impl Sink> for NetworkWrapper +where + N: Network + Unpin, + M: Unpin + serde::Serialize, + M: round_based::ProtocolMessage, +{ + type Error = crate::Error; + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, msg: Outgoing) -> Result<(), Self::Error> { + self.get_mut().outgoing_queue.push_back(msg); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // Dequeue all messages and send them one by one to the network + let this = self.get_mut(); + while let Some(out) = this.outgoing_queue.pop_front() { + // Get the substream to send the message to. + let key = StreamKey { + task_hash: this.task_hash, + round_id: i32::from(out.msg.round()), + }; + let substream = this + .sub_streams + .entry(key) + .or_insert_with(|| this.mux.multiplex(key)); + let identifier_info = IdentifierInfo { + block_id: None, + session_id: None, + retry_id: None, + task_id: None, + }; + let (to, to_network_id) = match out.recipient { + MessageDestination::AllParties => (None, None), + MessageDestination::OneParty(p) => (Some(p), this.participants.get(&p).cloned()), + }; + let protocol_message = N::build_protocol_message( + identifier_info, + this.me, + to, + &out.msg, + this.participants.get(&this.me).cloned(), + to_network_id, + ); + let p = substream.send_message(protocol_message).poll_unpin(cx); + match ready!(p) { + Ok(()) => continue, + Err(e) => return Poll::Ready(Err(e)), + } + } + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } +} + +#[derive(Default)] +struct NextMessageId(AtomicU64); + +impl NextMessageId { + pub fn next(&self) -> MsgId { + self.0.fetch_add(1, core::sync::atomic::Ordering::Relaxed) + } +} diff --git a/sdk/src/utils/hashing.rs b/sdk/src/utils/hashing.rs new file mode 100644 index 00000000..6c16ea2d --- /dev/null +++ b/sdk/src/utils/hashing.rs @@ -0,0 +1,14 @@ +#[macro_export] +macro_rules! compute_sha256_hash { + ($($data:expr),*) => { + { + use k256::sha2::{Digest, Sha256}; + let mut hasher = Sha256::default(); + $(hasher.update($data);)* + let result = hasher.finalize(); + let mut hash = [0u8; 32]; + hash.copy_from_slice(result.as_slice()); + hash + } + }; +} diff --git a/sdk/src/utils/mod.rs b/sdk/src/utils/mod.rs index 92ee7bea..d453fc49 100644 --- a/sdk/src/utils/mod.rs +++ b/sdk/src/utils/mod.rs @@ -1,2 +1,3 @@ pub mod evm; +pub mod hashing; pub mod test_utils;