diff --git a/Cargo.lock b/Cargo.lock index 55ab3bf4..bbb3aa76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5004,6 +5004,7 @@ dependencies = [ "http-body-util", "hyper 1.5.1", "hyper-util", + "indexmap 2.7.0", "itertools 0.13.0", "k256", "lazy_static", diff --git a/blueprint-manager/src/executor/event_handler.rs b/blueprint-manager/src/executor/event_handler.rs index 77d9c163..82ea8fd6 100644 --- a/blueprint-manager/src/executor/event_handler.rs +++ b/blueprint-manager/src/executor/event_handler.rs @@ -172,9 +172,9 @@ pub(crate) async fn handle_tangle_event( poll_result: EventPollResult, client: &ServicesClient, ) -> color_eyre::Result<()> { - info!("Received notification {}", event.number); + trace!("Received notification {}", event.number); const DEFAULT_PROTOCOL: Protocol = Protocol::Tangle; - warn!("Using Tangle protocol as default over Eigen. This is a temporary development workaround. You can alter this behavior here"); + trace!("[WARNING] Using Tangle protocol as default over Eigen. This is a temporary development workaround. You can alter this behavior here"); // const DEFAULT_PROTOCOL: Protocol = Protocol::Eigenlayer; // warn!("Using Eigen protocol as default over Tangle. This is a temporary development workaround. You can alter this behavior here"); @@ -321,7 +321,7 @@ pub(crate) async fn handle_tangle_event( // Loop through every (blueprint_id, service_id) running. See if the service is still on-chain. If not, kill it and add it to to_remove for (blueprint_id, process_handles) in &mut *active_gadgets { for service_id in process_handles.keys() { - info!( + trace!( "Checking service for on-chain termination: bid={blueprint_id}//sid={service_id}" ); diff --git a/blueprint-test-utils/src/tangle/transactions.rs b/blueprint-test-utils/src/tangle/transactions.rs index 76a710c1..17e5a572 100644 --- a/blueprint-test-utils/src/tangle/transactions.rs +++ b/blueprint-test-utils/src/tangle/transactions.rs @@ -217,7 +217,7 @@ pub async fn wait_for_completion_of_tangle_job( while let Some(Ok(block)) = blocks.next().await { let events = block.events().await?; let results = events.find::().collect::>(); - info!( + gadget_sdk::debug!( %service_id, %call_id, %required_count, diff --git a/macros/context-derive/src/lib.rs b/macros/context-derive/src/lib.rs index 5a2f0e0b..c2019fb0 100644 --- a/macros/context-derive/src/lib.rs +++ b/macros/context-derive/src/lib.rs @@ -19,12 +19,10 @@ mod eigenlayer; mod evm; /// Keystore context extension implementation. mod keystore; -/// MPC context extension implementation. -mod mpc; /// Services context extension implementation. mod services; /// Tangle Subxt Client context extension implementation. -mod subxt; +mod tangle; const CONFIG_TAG_NAME: &str = "config"; const CONFIG_TAG_TYPE: &str = "gadget_sdk::config::GadgetConfiguration"; @@ -75,7 +73,7 @@ pub fn derive_tangle_client_context(input: TokenStream) -> TokenStream { Ok((res, call_id_field)) }) .map(|(config_field, call_id_field)| { - subxt::generate_context_impl(input, config_field, call_id_field) + tangle::generate_context_impl(input, config_field, call_id_field) }); match result { @@ -111,17 +109,3 @@ pub fn derive_eigenlayer_context(input: TokenStream) -> TokenStream { Err(err) => TokenStream::from(err.to_compile_error()), } } - -/// Derive macro for generating Context Extensions trait implementation for `MPCContext`. -#[proc_macro_derive(MPCContext, attributes(config))] -pub fn derive_mpc_context(input: TokenStream) -> TokenStream { - let input = syn::parse_macro_input!(input as syn::DeriveInput); - let result = - cfg::find_config_field(&input.ident, &input.data, CONFIG_TAG_NAME, CONFIG_TAG_TYPE) - .map(|config_field| mpc::generate_context_impl(input, config_field)); - - match result { - Ok(expanded) => TokenStream::from(expanded), - Err(err) => TokenStream::from(err.to_compile_error()), - } -} diff --git a/macros/context-derive/src/mpc.rs b/macros/context-derive/src/mpc.rs deleted file mode 100644 index f0636193..00000000 --- a/macros/context-derive/src/mpc.rs +++ /dev/null @@ -1,158 +0,0 @@ -use quote::quote; -use syn::DeriveInput; - -use crate::cfg::FieldInfo; - -/// Generate the `MPCContext` implementation for the given struct. -#[allow(clippy::too_many_lines)] -pub fn generate_context_impl( - DeriveInput { - ident: name, - generics, - .. - }: DeriveInput, - config_field: FieldInfo, -) -> proc_macro2::TokenStream { - let _field_access = match config_field { - FieldInfo::Named(ident) => quote! { self.#ident }, - FieldInfo::Unnamed(index) => quote! { self.#index }, - }; - - let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); - - quote! { - #[gadget_sdk::async_trait::async_trait] - impl #impl_generics gadget_sdk::contexts::MPCContext for #name #ty_generics #where_clause { - /// Returns a reference to the configuration - #[inline] - fn config(&self) -> &gadget_sdk::config::StdGadgetConfiguration { - &self.config - } - - /// Returns the network protocol identifier for this context - #[inline] - fn network_protocol(&self) -> String { - let name = stringify!(#name).to_string(); - format!("/{}/1.0.0", name.to_lowercase()) - } - - fn create_network_delivery_wrapper( - &self, - mux: std::sync::Arc, - party_index: gadget_sdk::round_based::PartyIndex, - task_hash: [u8; 32], - parties: std::collections::BTreeMap, - ) -> Result, gadget_sdk::Error> - where - M: Clone + Send + Unpin + 'static + gadget_sdk::serde::Serialize + gadget_sdk::serde::de::DeserializeOwned + gadget_sdk::round_based::ProtocolMessage, - { - Ok(gadget_sdk::network::round_based_compat::NetworkDeliveryWrapper::new(mux, party_index, task_hash, parties)) - } - - async fn get_party_index( - &self, - ) -> Result { - Ok(self.get_party_index_and_operators().await?.0 as _) - } - - async fn get_participants( - &self, - client: &gadget_sdk::ext::subxt::OnlineClient, - ) -> Result< - std::collections::BTreeMap, - gadget_sdk::Error, - > { - Ok(self.get_party_index_and_operators().await?.1.into_iter().enumerate().map(|(i, (id, _))| (i as _, id)).collect()) - } - - /// Retrieves the current blueprint ID from the configuration - /// - /// # Errors - /// Returns an error if the blueprint ID is not found in the configuration - fn blueprint_id(&self) -> gadget_sdk::color_eyre::Result { - self.config() - .protocol_specific - .tangle() - .map(|c| c.blueprint_id) - .map_err(|err| gadget_sdk::color_eyre::Report::msg("Blueprint ID not found in configuration: {err}")) - } - - /// Retrieves the current party index and operator mapping - /// - /// # Errors - /// Returns an error if: - /// - Failed to retrieve operator keys - /// - Current party is not found in the operator list - async fn get_party_index_and_operators( - &self, - ) -> gadget_sdk::color_eyre::Result<(usize, std::collections::BTreeMap)> { - let parties = self.current_service_operators_ecdsa_keys().await?; - let my_id = self.config.first_sr25519_signer()?.account_id(); - - gadget_sdk::trace!( - "Looking for {my_id:?} in parties: {:?}", - parties.keys().collect::>() - ); - - let index_of_my_id = parties - .iter() - .position(|(id, _)| id == &my_id) - .ok_or_else(|| gadget_sdk::color_eyre::Report::msg("Party not found in operator list"))?; - - Ok((index_of_my_id, parties)) - } - - /// Retrieves the ECDSA keys for all current service operators - /// - /// # Errors - /// Returns an error if: - /// - Failed to connect to the Tangle client - /// - Failed to retrieve operator information - /// - Missing ECDSA key for any operator - async fn current_service_operators_ecdsa_keys( - &self, - ) -> gadget_sdk::color_eyre::Result> { - let client = self.tangle_client().await?; - let current_blueprint = self.blueprint_id()?; - let current_service_op = self.current_service_operators(&client).await?; - let storage = client.storage().at_latest().await?; - - let mut map = std::collections::BTreeMap::new(); - for (operator, _) in current_service_op { - let addr = gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::storage() - .services() - .operators(current_blueprint, &operator); - - let maybe_pref = storage.fetch(&addr).await.map_err(|err| { - gadget_sdk::color_eyre::Report::msg("Failed to fetch operator storage for {operator}: {err}") - })?; - - if let Some(pref) = maybe_pref { - map.insert(operator, gadget_sdk::subxt_core::ext::sp_core::ecdsa::Public(pref.key)); - } else { - return Err(gadget_sdk::color_eyre::Report::msg("Missing ECDSA key for operator {operator}")); - } - } - - Ok(map) - } - - /// Retrieves the current call ID for this job - /// - /// # Errors - /// Returns an error if failed to retrieve the call ID from storage - async fn current_call_id(&self) -> gadget_sdk::color_eyre::Result { - let client = self.tangle_client().await?; - let addr = gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::storage().services().next_job_call_id(); - let storage = client.storage().at_latest().await?; - - let maybe_call_id = storage - .fetch_or_default(&addr) - .await - .map_err(|err| gadget_sdk::color_eyre::Report::msg("Failed to fetch current call ID: {err}"))?; - - Ok(maybe_call_id.saturating_sub(1)) - } - } - } -} diff --git a/macros/context-derive/src/services.rs b/macros/context-derive/src/services.rs index 18022087..5e338abc 100644 --- a/macros/context-derive/src/services.rs +++ b/macros/context-derive/src/services.rs @@ -21,289 +21,14 @@ pub fn generate_context_impl( let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); quote! { + #[gadget_sdk::async_trait::async_trait] impl #impl_generics gadget_sdk::contexts::ServicesContext for #name #ty_generics #where_clause { type Config = gadget_sdk::ext::subxt::PolkadotConfig; - fn current_blueprint( - &self, - client: &gadget_sdk::ext::subxt::OnlineClient, - ) -> impl core::future::Future< - Output = Result< - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::ServiceBlueprint, - gadget_sdk::ext::subxt::Error - >> { - use gadget_sdk::ext::subxt; - use gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api; - async move { - let blueprint_id = match #field_access.protocol_specific { - gadget_sdk::config::ProtocolSpecificSettings::Tangle(settings) => { - settings.blueprint_id - } - _ => { - return Err(subxt::Error::Other( - "Blueprint id is only available for Tangle protocol".to_string(), - )) - } - }; - let blueprint = api::storage().services().blueprints(blueprint_id); - let storage = client.storage().at_latest().await?; - let result = storage.fetch(&blueprint).await?; - match result { - Some((_, blueprint)) => Ok(blueprint), - None => Err(subxt::Error::Other(format!( - "Blueprint with id {blueprint_id} not found" - ))), - } - } - } - - fn current_blueprint_owner( - &self, - client: &gadget_sdk::ext::subxt::OnlineClient, - ) -> impl core::future::Future> { - use gadget_sdk::ext::subxt; - use gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api; - async move { - let blueprint_id = match #field_access.protocol_specific { - gadget_sdk::config::ProtocolSpecificSettings::Tangle(settings) => { - settings.blueprint_id - } - _ => { - return Err(subxt::Error::Other( - "Blueprint id is only available for Tangle protocol".to_string(), - )) - } - }; - let blueprint = api::storage().services().blueprints(blueprint_id); - let storage = client.storage().at_latest().await?; - let result = storage.fetch(&blueprint).await?; - match result { - Some((account_id, _)) => Ok(account_id), - None => Err(subxt::Error::Other(format!( - "Blueprint with id {blueprint_id} not found" - ))), - } - } - } - - fn current_service_operators( - &self, - client: &gadget_sdk::ext::subxt::OnlineClient, - ) -> impl core::future::Future< - Output = Result< - Vec<( - gadget_sdk::ext::subxt::utils::AccountId32, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::sp_arithmetic::per_things::Percent, - )>, - gadget_sdk::ext::subxt::Error - > - > { - use gadget_sdk::ext::subxt; - use gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api; - - async move { - let service_instance_id = match #field_access.protocol_specific { - gadget_sdk::config::ProtocolSpecificSettings::Tangle(settings) => { - settings.service_id - } - _ => { - return Err(subxt::Error::Other( - "Service instance id is only available for Tangle protocol".to_string(), - )) - } - }; - let service_id = match service_instance_id { - Some(service_instance_id) => service_instance_id, - None => { - return Err(subxt::Error::Other( - "Service instance id is not set. Running in Registration mode?".to_string(), - )) - } - }; - let service_instance = api::storage().services().instances(service_id); - let storage = client.storage().at_latest().await?; - let result = storage.fetch(&service_instance).await?; - match result { - Some(instance) => Ok(instance.operators.0), - None => Err(subxt::Error::Other(format!( - "Service instance {service_id} is not created, yet" - ))), - } - } - } - - fn operators_metadata( - &self, - client: &gadget_sdk::ext::subxt::OnlineClient, - operators: Vec, - ) -> impl core::future::Future< - Output = Result< - Vec<( - gadget_sdk::ext::subxt::utils::AccountId32, - gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::operator::OperatorMetadata< - gadget_sdk::ext::subxt::utils::AccountId32, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::burned::Balance, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegations, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxOperatorBlueprints, - > - )>, - gadget_sdk::ext::subxt::Error - > - > { - use gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api; - - async move { - let storage = client.storage().at_latest().await?; - let mut operator_metadata = Vec::new(); - - for operator in operators { - let metadata_storage_key = api::storage() - .multi_asset_delegation() - .operators(operator.clone()); - let operator_metadata_result = storage.fetch(&metadata_storage_key).await?; - if let Some(metadata) = operator_metadata_result { - operator_metadata.push((operator, metadata)); - } - } - - Ok(operator_metadata) - } - } - - async fn operator_metadata( - &self, - client: &gadget_sdk::ext::subxt::OnlineClient, - operator: gadget_sdk::ext::subxt::utils::AccountId32, - ) -> Result< - Option< - gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::operator::OperatorMetadata< - gadget_sdk::ext::subxt::utils::AccountId32, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::burned::Balance, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegations, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxOperatorBlueprints, - > - >, - gadget_sdk::ext::subxt::Error, - > { - use gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api; - - let storage = client.storage().at_latest().await?; - let metadata_storage_key = api::storage().multi_asset_delegation().operators(operator); - storage.fetch(&metadata_storage_key).await - } - - async fn operator_delegations( - &self, - client: &gadget_sdk::ext::subxt::OnlineClient, - operators: Vec, - ) -> Result< - Vec<( - gadget_sdk::ext::subxt::utils::AccountId32, - Option< - gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::delegator::DelegatorMetadata< - gadget_sdk::ext::subxt::utils::AccountId32, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::burned::Balance, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxWithdrawRequests, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegations, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxUnstakeRequests, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegatorBlueprints, - > - > - )>, - gadget_sdk::ext::subxt::Error, - > { - use gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api; - use gadget_sdk::ext::subxt::utils::AccountId32; - use gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::force_created::AssetId; - use gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::burned::Balance; - use gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::delegator::DelegatorMetadata; - - let storage = client.storage().at_latest().await?; - let mut operator_delegations = Vec::new(); - - for operator in operators { - let delegations_storage_key = api::storage() - .multi_asset_delegation() - .delegators(operator.clone()); - let delegations_result = storage.fetch(&delegations_storage_key).await?; - - operator_delegations.push((operator, delegations_result)) - } - - Ok(operator_delegations) - } - - async fn operator_delegation( - &self, - client: &gadget_sdk::ext::subxt::OnlineClient, - operator: gadget_sdk::ext::subxt::utils::AccountId32, - ) -> Result< - Option< - gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::delegator::DelegatorMetadata< - gadget_sdk::ext::subxt::utils::AccountId32, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::burned::Balance, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxWithdrawRequests, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegations, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxUnstakeRequests, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegatorBlueprints, - > - >, - gadget_sdk::ext::subxt::Error, - > { - use gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api; - - let storage = client.storage().at_latest().await?; - let delegations_storage_key = api::storage().multi_asset_delegation().delegators(operator); - let delegations_result = storage.fetch(&delegations_storage_key).await?; - - Ok(delegations_result) - } - - async fn service_instance( - &self, - client: &gadget_sdk::ext::subxt::OnlineClient, - ) -> Result< - gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::Service< - gadget_sdk::ext::subxt::utils::AccountId32, - gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::system::storage::types::number::Number, - gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId, - >, - gadget_sdk::ext::subxt::Error, - >{ - use gadget_sdk::ext::subxt; - use gadget_sdk::ext::tangle_subxt::tangle_testnet_runtime::api; - - let service_instance_id = match &#field_access.protocol_specific { - gadget_sdk::config::ProtocolSpecificSettings::Tangle(settings) => { - settings.service_id - } - _ => { - return Err(subxt::Error::Other( - "Service instance id is only available for Tangle protocol".to_string(), - )) - } - }; - let service_id = match service_instance_id { - Some(service_instance_id) => service_instance_id, - None => { - return Err(subxt::Error::Other( - "Service instance id is not set. Running in Registration mode?".to_string(), - )) - } - }; - let service_instance = api::storage().services().instances(service_id); - let storage = client.storage().at_latest().await?; - let result = storage.fetch(&service_instance).await?; - match result { - Some(instance) => Ok(instance), - None => Err(subxt::Error::Other(format!( - "Service instance {service_id} is not created, yet" - ))), - } + /// Returns a reference to the configuration + #[inline] + fn config(&self) -> &gadget_sdk::config::StdGadgetConfiguration { + &#field_access } } } diff --git a/macros/context-derive/src/subxt.rs b/macros/context-derive/src/tangle.rs similarity index 90% rename from macros/context-derive/src/subxt.rs rename to macros/context-derive/src/tangle.rs index ffbc1e2f..7f139c5b 100644 --- a/macros/context-derive/src/subxt.rs +++ b/macros/context-derive/src/tangle.rs @@ -26,6 +26,7 @@ pub fn generate_context_impl( let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); quote! { + #[gadget_sdk::async_trait::async_trait] impl #impl_generics gadget_sdk::contexts::TangleClientContext for #name #ty_generics #where_clause { type Config = gadget_sdk::ext::subxt::PolkadotConfig; @@ -33,12 +34,11 @@ pub fn generate_context_impl( &mut #field_access_call_id } - fn tangle_client(&self) -> impl core::future::Future, gadget_sdk::ext::subxt::Error>> { + async fn tangle_client(&self) -> Result, gadget_sdk::ext::subxt::Error> { use gadget_sdk::ext::subxt; type Config = subxt::PolkadotConfig; static CLIENT: std::sync::OnceLock> = std::sync::OnceLock::new(); - async { match CLIENT.get() { Some(client) => Ok(client.clone()), None => { @@ -52,7 +52,6 @@ pub fn generate_context_impl( }) } } - } } } } diff --git a/macros/context-derive/tests/ui/basic.rs b/macros/context-derive/tests/ui/basic.rs index 89d1dda3..226aa7f0 100644 --- a/macros/context-derive/tests/ui/basic.rs +++ b/macros/context-derive/tests/ui/basic.rs @@ -1,19 +1,20 @@ use gadget_sdk::async_trait::async_trait; use gadget_sdk::config::{GadgetConfiguration, StdGadgetConfiguration}; +use gadget_sdk::contexts::GossipNetworkContext; use gadget_sdk::contexts::{ - EVMProviderContext, KeystoreContext, MPCContext, ServicesContext, TangleClientContext, + EVMProviderContext, KeystoreContext, ServicesContext, ServicesWithClientExt, + TangleClientContext, }; use gadget_sdk::network::{Network, NetworkMultiplexer, ProtocolMessage}; use gadget_sdk::store::LocalDatabase; use gadget_sdk::subxt_core::ext::sp_core::ecdsa::Public; -use gadget_sdk::subxt_core::tx::signer::Signer; use gadget_sdk::Error; use round_based::ProtocolMessage as RoundBasedProtocolMessage; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; -#[derive(KeystoreContext, EVMProviderContext, TangleClientContext, ServicesContext, MPCContext)] +#[derive(KeystoreContext, EVMProviderContext, TangleClientContext, ServicesContext)] #[allow(dead_code)] struct MyContext { foo: String, @@ -42,7 +43,6 @@ fn main() { // Test MPC context utility functions let _config = ctx.config(); - let _protocol = ctx.network_protocol(); // Test MPC context functions @@ -52,6 +52,8 @@ fn main() { let mut parties = BTreeMap::::new(); parties.insert(0, Public([0u8; 33])); + let mpc = ctx.participants(); + // Test network delivery wrapper creation let _network_wrapper = ctx.create_network_delivery_wrapper::( mux.clone(), @@ -61,22 +63,22 @@ fn main() { ); // Test party index retrieval - let _party_idx = ctx.get_party_index().await; + let _party_idx = mpc.my_index().await; // Test participants retrieval - let _participants = ctx.get_participants(&tangle_client).await; + let _participants = mpc.get_participants().await; // Test blueprint ID retrieval let _blueprint_id = ctx.blueprint_id(); // Test party index and operators retrieval - let _party_idx_ops = ctx.get_party_index_and_operators().await; + let _party_idx_ops = mpc.get_party_index_and_operators().await; // Test service operators ECDSA keys retrieval - let _operator_keys = ctx.current_service_operators_ecdsa_keys().await; + let _operator_keys = mpc.current_service_operators_ecdsa_keys().await; - // Test current call ID retrieval - let _call_id = ctx.current_call_id().await; + // Test current call ID retrieval (will only have a value for a live chain) + let _call_id = ctx.call_id.unwrap_or_default(); }; drop(body); diff --git a/macros/context-derive/tests/ui/generic_struct.rs b/macros/context-derive/tests/ui/generic_struct.rs index 6ee6f46b..dadb54d3 100644 --- a/macros/context-derive/tests/ui/generic_struct.rs +++ b/macros/context-derive/tests/ui/generic_struct.rs @@ -5,7 +5,7 @@ use gadget_sdk::contexts::{ #[derive(KeystoreContext, EVMProviderContext, TangleClientContext, ServicesContext)] #[allow(dead_code)] -struct MyContext { +struct MyContext { foo: T, bar: U, #[config] diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 72eaa8b5..e836748e 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -30,6 +30,8 @@ url = { workspace = true, features = ["serde"] } uuid = { workspace = true } failure = { workspace = true } num-bigint = { workspace = true } +indexmap = { workspace = true } + # Keystore deps ed25519-zebra = { workspace = true } k256 = { workspace = true, features = ["ecdsa", "ecdsa-core", "arithmetic"] } diff --git a/sdk/src/contexts/gossip_network.rs b/sdk/src/contexts/gossip_network.rs index e048a3a3..c1601c2a 100644 --- a/sdk/src/contexts/gossip_network.rs +++ b/sdk/src/contexts/gossip_network.rs @@ -1,5 +1,20 @@ +use crate::contexts::ServicesContext; + /// `GossipNetworkContext` trait provides access to the network client from the context. pub trait GossipNetworkContext { - /// Get the Goossip client from the context. - fn gossip_network(&self) -> &crate::network::gossip::GossipHandle; + /// Creates a network delivery wrapper for MPC communication + fn create_network_delivery_wrapper( + &self, + mux: std::sync::Arc, + party_index: crate::round_based::PartyIndex, + task_hash: [u8; 32], + parties: std::collections::BTreeMap, + ) -> Result, crate::Error> + where + M: Clone + Send + Unpin + 'static + crate::serde::Serialize + crate::serde::de::DeserializeOwned + crate::round_based::ProtocolMessage, + { + Ok(crate::network::round_based_compat::NetworkDeliveryWrapper::new(mux, party_index, task_hash, parties)) + } } + +impl GossipNetworkContext for T {} \ No newline at end of file diff --git a/sdk/src/contexts/mod.rs b/sdk/src/contexts/mod.rs index 73f01c5f..3fe5a5fa 100644 --- a/sdk/src/contexts/mod.rs +++ b/sdk/src/contexts/mod.rs @@ -66,6 +66,5 @@ create_module_derive!(eigenlayer); create_module_derive!(evm_provider); create_module_derive!(gossip_network); create_module_derive!(keystore); -create_module_derive!(mpc); create_module_derive!(services); create_module_derive!(tangle_client); diff --git a/sdk/src/contexts/mpc.rs b/sdk/src/contexts/mpc.rs deleted file mode 100644 index 73eb0dd2..00000000 --- a/sdk/src/contexts/mpc.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::network::NetworkMultiplexer; -use round_based::PartyIndex; -use std::collections::BTreeMap; -use std::sync::Arc; -use subxt_core::utils::AccountId32; - -/// `MPCContext` trait provides access to MPC (Multi-Party Computation) functionality from the context. -#[async_trait::async_trait] -pub trait MPCContext { - /// Returns a reference to the configuration - fn config(&self) -> &crate::config::StdGadgetConfiguration; - - /// Returns the network protocol identifier - fn network_protocol(&self) -> String; - - /// Creates a network delivery wrapper for MPC communication - fn create_network_delivery_wrapper( - &self, - mux: Arc, - party_index: PartyIndex, - task_hash: [u8; 32], - parties: BTreeMap, - ) -> color_eyre::Result< - crate::network::round_based_compat::NetworkDeliveryWrapper, - crate::Error, - > - where - M: Clone - + Send - + Unpin - + 'static - + serde::Serialize - + serde::de::DeserializeOwned - + round_based::ProtocolMessage; - - /// Gets the party index from the participants map - async fn get_party_index(&self) -> color_eyre::Result; - - /// Gets the participants in the MPC protocol - async fn get_participants( - &self, - client: &subxt::OnlineClient, - ) -> color_eyre::Result, crate::Error>; - - /// Gets the current blueprint ID - fn blueprint_id(&self) -> color_eyre::Result; - - /// Gets the party index and operator mapping - async fn get_party_index_and_operators( - &self, - ) -> color_eyre::Result<( - usize, - BTreeMap, - )>; - - /// Gets the ECDSA keys for all current service operators - async fn current_service_operators_ecdsa_keys( - &self, - ) -> color_eyre::Result>; - - /// Gets the current call ID for this job - async fn current_call_id(&self) -> color_eyre::Result; -} diff --git a/sdk/src/contexts/services.rs b/sdk/src/contexts/services.rs index 6b06d4f5..06acd0da 100644 --- a/sdk/src/contexts/services.rs +++ b/sdk/src/contexts/services.rs @@ -1,136 +1,438 @@ -use std::future::Future; -use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::{Service, ServiceBlueprint}; -use tangle_subxt::tangle_testnet_runtime::api::runtime_types::sp_arithmetic::per_things::Percent; -use tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::operator::OperatorMetadata; -use tangle_subxt::tangle_testnet_runtime::api::assets::events::burned::Balance; -use tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId; -use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::{MaxDelegations, MaxDelegatorBlueprints, MaxOperatorBlueprints, MaxUnstakeRequests, MaxWithdrawRequests}; -use tangle_subxt::tangle_testnet_runtime::api::system::storage::types::number::Number; -use tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::delegator::DelegatorMetadata; +use core::fmt::{Debug, Formatter}; +use async_trait::async_trait; +use std::collections::BTreeMap; +use subxt_core::utils::AccountId32; +use std::ops::Deref; +use subxt_core::tx::signer::Signer; +use crate::clients::tangle::runtime::TangleConfig; +use crate::contexts::TangleClientContext; /// `ServicesContext` trait provides access to the current service and current blueprint from the context. #[allow(clippy::type_complexity)] +#[async_trait] pub trait ServicesContext { type Config: subxt::Config; - /// Get the current blueprint information from the context. - fn current_blueprint( + + /// Returns a reference to the configuration + fn config(&self) -> &crate::config::StdGadgetConfiguration; + + async fn current_blueprint( &self, - client: &subxt::OnlineClient, - ) -> impl Future>; + client: &crate::ext::subxt::OnlineClient, + ) -> Result< + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::ServiceBlueprint, + crate::ext::subxt::Error + > { + use crate::ext::subxt; + use crate::ext::tangle_subxt::tangle_testnet_runtime::api; - /// Query the current blueprint owner from the context. - fn current_blueprint_owner( + let blueprint_id = self.blueprint_id()?; + let blueprint = api::storage().services().blueprints(blueprint_id); + let storage = client.storage().at_latest().await?; + let result = storage.fetch(&blueprint).await?; + match result { + Some((_, blueprint)) => Ok(blueprint), + None => Err(subxt::Error::Other(format!( + "Blueprint with id {blueprint_id} not found" + ))), + } + } + + async fn current_blueprint_owner( &self, - client: &subxt::OnlineClient, - ) -> impl Future>; + client: &crate::ext::subxt::OnlineClient, + ) -> Result { + use crate::ext::subxt; + use crate::ext::tangle_subxt::tangle_testnet_runtime::api; + let blueprint_id = self.blueprint_id()?; + let blueprint = api::storage().services().blueprints(blueprint_id); + let storage = client.storage().at_latest().await?; + let result = storage.fetch(&blueprint).await?; + match result { + Some((account_id, _)) => Ok(account_id), + None => Err(subxt::Error::Other(format!( + "Blueprint with id {blueprint_id} not found" + ))), + } + } - /// Get the current service operators with their restake exposure from the context. - /// This function will return a list of service operators that are selected to run this service - /// instance. - fn current_service_operators( + async fn current_service_operators( &self, - client: &subxt::OnlineClient, - ) -> impl Future, subxt::Error>>; + client: &crate::ext::subxt::OnlineClient, + ) -> Result< + Vec<( + crate::ext::subxt::utils::AccountId32, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::sp_arithmetic::per_things::Percent, + )>, + crate::ext::subxt::Error + > { + use crate::ext::subxt; + use crate::ext::tangle_subxt::tangle_testnet_runtime::api; + + let service_id = self.service_id()?; + let service_instance = api::storage().services().instances(service_id); + let storage = client.storage().at_latest().await?; + let result = storage.fetch(&service_instance).await?; + match result { + Some(instance) => { + let mut ret = instance.operators.0; + ret.sort_by(|a, b| a.0.cmp(&b.0)); + Ok(ret) + }, + None => Err(subxt::Error::Other(format!( + "Service instance {service_id} is not created, yet" + ))), + } + } - #[allow(clippy::type_complexity)] - /// Get metadata for a list of operators from the context. - fn operators_metadata( + async fn operators_metadata( &self, - client: &subxt::OnlineClient, - operators: Vec, - ) -> impl Future< - Output = color_eyre::Result< - Vec<( - subxt::utils::AccountId32, - OperatorMetadata< - subxt::utils::AccountId32, - Balance, - AssetId, - MaxDelegations, - MaxOperatorBlueprints, - >, - )>, - subxt::Error, - >, - >; + client: &crate::ext::subxt::OnlineClient, + operators: Vec, + ) -> Result< + Vec<( + crate::ext::subxt::utils::AccountId32, + crate::tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::operator::OperatorMetadata< + crate::ext::subxt::utils::AccountId32, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::burned::Balance, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegations, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxOperatorBlueprints, + > + )>, + crate::ext::subxt::Error + > { + use crate::ext::tangle_subxt::tangle_testnet_runtime::api; + + let storage = client.storage().at_latest().await?; + let mut operator_metadata = Vec::new(); + + for operator in operators { + let metadata_storage_key = api::storage() + .multi_asset_delegation() + .operators(operator.clone()); + let operator_metadata_result = storage.fetch(&metadata_storage_key).await?; + if let Some(metadata) = operator_metadata_result { + operator_metadata.push((operator, metadata)); + } + } - /// Get metadata for a single operator from the context. - /// This function will return the metadata for a single operator. - fn operator_metadata( + Ok(operator_metadata) + } + + async fn operator_metadata( &self, - client: &subxt::OnlineClient, - operator: subxt::utils::AccountId32, - ) -> impl Future< - Output = color_eyre::Result< - Option< - OperatorMetadata< - subxt::utils::AccountId32, - Balance, - AssetId, - MaxDelegations, - MaxOperatorBlueprints, - >, - >, - subxt::Error, + client: &crate::ext::subxt::OnlineClient, + operator: crate::ext::subxt::utils::AccountId32, + ) -> Result< + Option< + crate::tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::operator::OperatorMetadata< + crate::ext::subxt::utils::AccountId32, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::burned::Balance, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegations, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxOperatorBlueprints, + > >, - >; + crate::ext::subxt::Error, + > { + use crate::ext::tangle_subxt::tangle_testnet_runtime::api; - /// Get the current service instance from the context. - fn service_instance( + let storage = client.storage().at_latest().await?; + let metadata_storage_key = api::storage().multi_asset_delegation().operators(operator); + storage.fetch(&metadata_storage_key).await + } + + async fn operator_delegations( &self, - client: &subxt::OnlineClient, - ) -> impl Future< - Output = color_eyre::Result< - Service, - subxt::Error, - >, - >; + client: &crate::ext::subxt::OnlineClient, + operators: Vec, + ) -> Result< + Vec<( + crate::ext::subxt::utils::AccountId32, + Option< + crate::tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::delegator::DelegatorMetadata< + crate::ext::subxt::utils::AccountId32, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::burned::Balance, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxWithdrawRequests, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegations, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxUnstakeRequests, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegatorBlueprints, + > + > + )>, + crate::ext::subxt::Error, + > { + use crate::ext::tangle_subxt::tangle_testnet_runtime::api; + let storage = client.storage().at_latest().await?; + let mut operator_delegations = Vec::new(); + + for operator in operators { + let delegations_storage_key = api::storage() + .multi_asset_delegation() + .delegators(operator.clone()); + let delegations_result = storage.fetch(&delegations_storage_key).await?; + + operator_delegations.push((operator, delegations_result)) + } - #[allow(clippy::type_complexity)] - /// Get delegations for a list of operators from the context. - fn operator_delegations( + Ok(operator_delegations) + } + + async fn operator_delegation( &self, - client: &subxt::OnlineClient, - operators: Vec, - ) -> impl Future< - Output = color_eyre::Result< - Vec<( - subxt::utils::AccountId32, // operator - Option< - DelegatorMetadata< - subxt::utils::AccountId32, - AssetId, - Balance, - MaxWithdrawRequests, - MaxDelegations, - MaxUnstakeRequests, - MaxDelegatorBlueprints, - >, - >, - )>, - subxt::Error, + client: &crate::ext::subxt::OnlineClient, + operator: crate::ext::subxt::utils::AccountId32, + ) -> Result< + Option< + crate::tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::delegator::DelegatorMetadata< + crate::ext::subxt::utils::AccountId32, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::burned::Balance, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxWithdrawRequests, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegations, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxUnstakeRequests, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_testnet_runtime::MaxDelegatorBlueprints, + > >, - >; + crate::ext::subxt::Error, + > { + use crate::ext::tangle_subxt::tangle_testnet_runtime::api; + + let storage = client.storage().at_latest().await?; + let delegations_storage_key = api::storage().multi_asset_delegation().delegators(operator); + let delegations_result = storage.fetch(&delegations_storage_key).await?; - /// Get delegations for a single operator from the context. - fn operator_delegation( + Ok(delegations_result) + } + + async fn service_instance( &self, - client: &subxt::OnlineClient, - operator: subxt::utils::AccountId32, - ) -> impl Future< - Output = color_eyre::Result< - Option< - DelegatorMetadata< - subxt::utils::AccountId32, - AssetId, - Balance, - MaxWithdrawRequests, - MaxDelegations, - MaxUnstakeRequests, - MaxDelegatorBlueprints, - >, - >, - subxt::Error, + client: &crate::ext::subxt::OnlineClient, + ) -> Result< + crate::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::Service< + crate::ext::subxt::utils::AccountId32, + crate::tangle_subxt::tangle_testnet_runtime::api::system::storage::types::number::Number, + crate::ext::tangle_subxt::tangle_testnet_runtime::api::assets::events::accounts_destroyed::AssetId, >, - >; + crate::ext::subxt::Error, + >{ + use crate::ext::subxt; + use crate::ext::tangle_subxt::tangle_testnet_runtime::api; + + let service_id = self.service_id()?; + let service_instance = api::storage().services().instances(service_id); + let storage = client.storage().at_latest().await?; + let result = storage.fetch(&service_instance).await?; + match result { + Some(instance) => Ok(instance), + None => Err(subxt::Error::Other(format!( + "Service instance {service_id} is not created, yet" + ))), + } + } + + /// Retrieves the current blueprint ID from the configuration + /// + /// # Errors + /// Returns an error if the blueprint ID is not found in the configuration + fn blueprint_id(&self) -> Result { + self.config() + .protocol_specific + .tangle() + .map(|c| c.blueprint_id) + .map_err(|err| crate::subxt::Error::from(format!("Blueprint ID not found in configuration: {err}"))) + } + + fn service_id(&self) -> Result { + let service_instance_id = match &self.config().protocol_specific { + crate::config::ProtocolSpecificSettings::Tangle(settings) => { + settings.service_id + } + _ => { + return Err(subxt::Error::Other( + "Service instance id is only available for Tangle protocol".to_string(), + )) + } + }; + + match service_instance_id { + Some(service_instance_id) => Ok(service_instance_id), + None => { + Err(subxt::Error::Other( + "Service instance id is not set. Running in Registration mode?".to_string(), + )) + } + } + } +} + +#[derive(Copy, Clone)] +/// A client that contains functionality useful protocols that require party information. Separated from the ServicesContext to organize and have a struct. +/// +/// ```ignore, no_run +/// #[derive(ServicesContext, TangleClientContext)] +/// struct MyContext { +/// ... +/// } +/// +/// let ctx = MyContext { ... }; +/// let my_party_index = ctx.participants().get_party_index().await?; +/// ``` +pub struct ParticipantsClient<'a, ClientConfig: subxt::Config = TangleConfig> { + client: &'a dyn ServicesWithClientExt +} + +impl Debug for ParticipantsClient<'_, ClientConfig> { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + let blueprint_id = self.blueprint_id().unwrap_or_default(); + let service_id = self.service_id().unwrap_or_default(); + f.debug_struct("ParticipantsClient") + .field("blueprint_id", &blueprint_id) + .field("service_id", &service_id) + .finish() + } +} + +impl Deref for ParticipantsClient<'_, ClientConfig> { + type Target = dyn ServicesWithClientExt; + + fn deref(&self) -> &Self::Target { + self.client + } +} + +/// Extension trait for any context that implements both ServicesContext and TangleClientContext +pub trait ServicesWithClientExt: Send + Sync + 'static + ServicesContext + TangleClientContext { + fn participants(&self) -> ParticipantsClient where Self: Sized { + ParticipantsClient { client: self } + } } + +// Automatically implement the extension trait for any context that implements both ServicesContext and TangleClientContext +impl + TangleClientContext> ServicesWithClientExt for T {} + +impl ParticipantsClient<'_, ClientConfig> { + /// Retrieves the current party index + pub async fn my_index( + &self, + ) -> Result { + Ok(self.get_party_index_and_operators().await?.0 as _) + } + + /// Retrieves the current participants in the MPC protocol + pub async fn get_participants( + &self, + ) -> Result< + std::collections::BTreeMap, + crate::Error, + > { + Ok(self.get_party_index_and_operators().await?.1.into_iter().enumerate().map(|(i, (id, _))| (i as _, id)).collect()) + } + + /// Retrieves the ECDSA keys for all current service operators + /// + /// # Errors + /// Returns an error if: + /// - Failed to connect to the Tangle client + /// - Failed to retrieve operator information + /// - Missing ECDSA key for any operator + pub async fn current_service_operators_ecdsa_keys( + &self, + ) -> color_eyre::Result> { + let client = self.tangle_client().await?; + let current_blueprint = self.blueprint_id()?; + let current_service_op = self.current_service_operators(&client).await?; + let storage = client.storage().at_latest().await?; + + let mut map = std::collections::BTreeMap::new(); + for (operator, _) in current_service_op { + let addr = crate::ext::tangle_subxt::tangle_testnet_runtime::api::storage() + .services() + .operators(current_blueprint, &operator); + + let maybe_pref = storage.fetch(&addr).await.map_err(|err| { + crate::color_eyre::Report::msg(format!("Failed to fetch operator storage for {operator}: {err}")) + })?; + + if let Some(pref) = maybe_pref { + let _ = map.insert(operator, crate::subxt_core::ext::sp_core::ecdsa::Public(pref.key)); + } else { + return Err(crate::color_eyre::Report::msg(format!("Missing ECDSA key for operator {operator}"))); + } + } + + Ok(map) + } + + /// Retrieves the current party index and operator mapping (ECDSA) + /// + /// # Errors + /// Returns an error if: + /// - Failed to retrieve operator keys + /// - Current party is not found in the operator list + pub async fn get_party_index_and_operators( + &self, + ) -> crate::color_eyre::Result<(usize, std::collections::BTreeMap)> { + let parties = self.current_service_operators_ecdsa_keys().await?; + let my_id = self.config().first_sr25519_signer()?.account_id(); + + crate::trace!( + "Looking for {my_id:?} in parties: {:?}", + parties.keys().collect::>() + ); + + let index_of_my_id = parties + .iter() + .position(|(id, _)| id == &my_id) + .ok_or_else(|| crate::color_eyre::Report::msg("Party not found in operator list"))?; + + Ok((index_of_my_id, parties)) + } + + /// Retrieves the ECDSA keys for all current service operators (SR25519) + /// + /// # Errors + /// Returns an error if: + /// - Failed to connect to the Tangle client + /// - Failed to retrieve operator information + /// - Missing ECDSA key for any operator + pub async fn current_service_operators_sr25519( + &self, + ) -> color_eyre::Result> { + let client = self.tangle_client().await?; + let current_service_op = self.current_service_operators(&client).await?; + + let mut map = std::collections::BTreeMap::new(); + for (operator, _) in current_service_op { + let _ = map.insert(operator.clone(), crate::subxt_core::ext::sp_core::sr25519::Public(operator.0)); + } + + Ok(map) + } + + /// + /// # Errors + /// Returns an error if: + /// - Failed to retrieve operator keys + /// - Current party is not found in the operator list + pub async fn get_party_index_and_operators_sr25519( + &self, + ) -> crate::color_eyre::Result<(usize, std::collections::BTreeMap)> { + let parties = self.current_service_operators_sr25519().await?; + let my_id = self.config().first_sr25519_signer()?.account_id(); + + crate::trace!( + "Looking for {my_id:?} in parties: {:?}", + parties.keys().collect::>() + ); + + let index_of_my_id = parties + .iter() + .position(|(id, _)| id == &my_id) + .ok_or_else(|| crate::color_eyre::Report::msg("Party not found in operator list"))?; + + Ok((index_of_my_id, parties)) + } +} \ No newline at end of file diff --git a/sdk/src/contexts/tangle_client.rs b/sdk/src/contexts/tangle_client.rs index 9f1f888b..10aaa18c 100644 --- a/sdk/src/contexts/tangle_client.rs +++ b/sdk/src/contexts/tangle_client.rs @@ -1,12 +1,13 @@ -use std::future::Future; +use async_trait::async_trait; /// `TangleClientContext` trait provides access to the Tangle client from the context. +#[async_trait] pub trait TangleClientContext { type Config: subxt::Config; /// Get the Tangle client from the context. - fn tangle_client( + async fn tangle_client( &self, - ) -> impl Future, subxt::Error>>; + ) -> color_eyre::Result, subxt::Error>; fn get_call_id(&mut self) -> &mut Option; diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 2f78b2e4..4fc576d8 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -8,6 +8,7 @@ #![cfg_attr(all(not(feature = "std"), not(feature = "wasm")), no_std)] extern crate alloc; +extern crate core; /// Benchmark Module #[cfg(any(feature = "std", feature = "wasm"))] diff --git a/sdk/src/network/mod.rs b/sdk/src/network/mod.rs index 7066af2e..dcf18377 100644 --- a/sdk/src/network/mod.rs +++ b/sdk/src/network/mod.rs @@ -526,11 +526,13 @@ pub fn serialize(object: &impl Serialize) -> Result, serde_json::Error> #[cfg(test)] mod tests { use super::*; + use crate::logging::setup_log; use futures::{stream, StreamExt}; use gossip::GossipHandle; use serde::{Deserialize, Serialize}; use sp_core::Pair; use std::collections::BTreeMap; + use std::future::Future; const TOPIC: &str = "/gadget/test/1.0.0"; @@ -564,24 +566,6 @@ mod tests { const NODE_COUNT: u16 = 10; - pub fn setup_log() { - use tracing_subscriber::util::SubscriberInitExt; - let env_filter = tracing_subscriber::EnvFilter::from_default_env() - .add_directive("tokio=off".parse().unwrap()) - .add_directive("hyper=off".parse().unwrap()) - .add_directive("gadget=debug".parse().unwrap()); - - let _ = tracing_subscriber::fmt::SubscriberBuilder::default() - .compact() - .without_time() - .with_span_events(tracing_subscriber::fmt::format::FmtSpan::NONE) - .with_target(false) - .with_env_filter(env_filter) - .with_test_writer() - .finish() - .try_init(); - } - async fn wait_for_nodes_connected(nodes: &[GossipHandle]) { let node_count = nodes.len(); @@ -949,82 +933,108 @@ mod tests { tokio::try_join!(handle0, handle1).unwrap(); } - #[tokio::test(flavor = "multi_thread")] - async fn test_nested_multiplexer() { - setup_log(); - crate::info!("Starting test_nested_multiplexer"); + async fn get_networks() -> (GossipHandle, GossipHandle) { let network0 = node(); let network1 = node(); let mut networks = vec![network0, network1]; wait_for_nodes_connected(&networks).await; + (networks.remove(0), networks.remove(0)) + } - let (network0, network1) = (networks.remove(0), networks.remove(0)); + #[tokio::test(flavor = "multi_thread")] + async fn test_nested_multiplexer_no_delay() { + setup_log(); + crate::info!("Starting test_nested_multiplexer"); + let (network0, network1) = get_networks().await; + nested_multiplex(0, 10, network0, network1, false).await; + } - async fn nested_multiplex( - cur_depth: usize, - max_depth: usize, - network0: N, - network1: N, - ) { - crate::info!("At nested depth = {cur_depth}/{max_depth}"); + #[tokio::test(flavor = "multi_thread")] + /// This test more accurately emulates real use cases since + /// nodes will join at different and nondeterministic times. + /// This test should thus cover the enqueuing capacity of the multiplexer. + async fn test_nested_multiplexer_staggered_start() { + setup_log(); + crate::info!("Starting test_nested_multiplexer"); + let (network0, network1) = get_networks().await; + nested_multiplex(0, 10, network0, network1, true).await; + } - if cur_depth == max_depth { - return; - } + async fn nested_multiplex( + cur_depth: usize, + max_depth: usize, + network0: N, + network1: N, + rand_init_delay: bool, + ) { + crate::info!("At nested depth = {cur_depth}/{max_depth}"); + + if cur_depth == max_depth { + return; + } - let multiplexer0 = NetworkMultiplexer::new(network0); - let multiplexer1 = NetworkMultiplexer::new(network1); + let stream_key = StreamKey { + task_hash: sha2_256(&[(cur_depth % 255) as u8]), + round_id: 0, + }; - let stream_key = StreamKey { - task_hash: sha2_256(&[(cur_depth % 255) as u8]), - round_id: 0, - }; + let initial_delay_0 = if cur_depth % 2 == 0 && rand_init_delay { + rand::random::() % 100 + } else { + 0 + }; - let subnetwork0 = multiplexer0.multiplex(stream_key); - let subnetwork1 = multiplexer1.multiplex(stream_key); + let initial_delay_1 = if cur_depth % 2 != 0 && rand_init_delay { + rand::random::() + } else { + 0 + }; - // Send a message in the subnetwork0 to subnetwork1 and vice versa, assert values of message - let payload = vec![1, 2, 3]; - let msg = GossipHandle::build_protocol_message( - IdentifierInfo::default(), - 0, - Some(1), - &payload, - None, - None, - ); + let task_gen = + |network, i, i_peer, rand_delay| -> Pin>> { + Box::pin(async move { + crate::info!("Peer {i} at layer {cur_depth}/{max_depth}"); + if rand_delay != 0 { + tokio::time::sleep(tokio::time::Duration::from_millis(rand_delay)).await; + } - subnetwork0.send(msg.clone()).unwrap(); + let multiplexer = NetworkMultiplexer::new(network); + let subnetwork = multiplexer.multiplex(stream_key); - let received_msg = subnetwork1.recv().await.unwrap(); - assert_eq!(received_msg.payload, msg.payload); + // Send a message in the subnetwork0 to subnetwork1 and vice versa, assert values of message + let payload = vec![1, 2, 3]; + let msg = GossipHandle::build_protocol_message( + IdentifierInfo::default(), + i, + Some(i_peer), + &payload, + None, + None, + ); - let msg = GossipHandle::build_protocol_message( - IdentifierInfo::default(), - 1, - Some(0), - &payload, - None, - None, - ); + subnetwork.send(msg.clone()).unwrap(); + let received_msg = subnetwork.recv().await.unwrap(); + assert_eq!(received_msg.payload, msg.payload); + subnetwork + }) + }; - subnetwork1.send(msg.clone()).unwrap(); + let task_0 = task_gen(network0, 0, 1, initial_delay_0); + let task_1 = task_gen(network1, 1, 0, initial_delay_1); - let received_msg = subnetwork0.recv().await.unwrap(); - assert_eq!(received_msg.payload, msg.payload); - tracing::info!("Done nested depth = {cur_depth}/{max_depth}"); + let (subnetwork0, subnetwork1) = tokio::join!(task_0, task_1); - Box::pin(nested_multiplex( - cur_depth + 1, - max_depth, - subnetwork0, - subnetwork1, - )) - .await - } + crate::info!("Done nested depth = {cur_depth}/{max_depth}"); - nested_multiplex(0, 10, network0, network1).await; + Box::pin(nested_multiplex( + cur_depth + 1, + max_depth, + subnetwork0, + subnetwork1, + rand_init_delay, + )) + .await } }