From 5c887fc8c2c038e629683e41ebdafc15aea528f5 Mon Sep 17 00:00:00 2001
From: drewstone
Date: Fri, 25 Oct 2024 04:41:05 -0400
Subject: [PATCH] Leverage blueprint in incredible squaring aggregator (#365)

* feat: add eigenlayer context
* merge: combine context and proc macro tests
* fix: removing unused code and fixing test
* fix: move static vars to the lib
* fix: move static vars to the lib
* chore: debug
* fix: lint and fmt to keep things clean
* chore: add logging for debugging
* fix: identified issue
* fix: start on proc macro for abi grabbing
* fix: compiles and deploys tasks, event listener not firing
* chore: separate event listener files out
* fix: spelling and blocks to mine
* fix: more logging
* chore: remove logs, evm event listener is broken
* fix: remove one loop from event listening flow, polling seems to work, event catching not
* Fix: Got back to original signing bug
* Fix: Fixed address change - back to signature invalid revert
* Fix: Correctly pulls task index from events
* fix: use EL convert to g1/g2 point
* feat: use aggregator and separate out from the blueprint job
* fix: updates using aggregator
* fix: solve registry error in aggregator
* fix: prevent duplicates
* fix: got test working
* fix: fmt and clippy cleanup
* feat: added test in eigen IS blueprint, cleaned up testing code for it
* feat: convert aggregator process_new_jobs to blueprint
* Use incredible squaring aggregator (#364)
* chore: bump rustdoc-types (#348)

  Also pin to the latest nightly

* fix: load substrate signer from env correctly (#349)
* fix: add `data_dir` back to `GadgetConfiguration` (#350)

  It was (mistakenly?) removed in #333. Went ahead and added a default path
  for blueprint manager as well.

* Multi job runner + SDK main macro (#346)
* feat: use aggregator and separate out from the blueprint job
* fix: uncomment generate_json (#358)
* fix: updates using aggregator
* fix: solve registry error in aggregator
* fix: prevent duplicates
* fix: got test working
* fix: fmt and clippy cleanup
* feat(cli): support custom repo and path sources (#360)
* feat: added test in eigen IS blueprint, cleaned up testing code for it
* Event Workflows (phase 1: Custom listeners) (#359)
* Add wrapper types + trait structure to enforce structure for event flows
* Decoupling/refactor and ensure integration test passes
* periodic web poller working and further refactor of macro code
* Everything compiling, integration test passes
* fix(sdk)!: downgrade substrate dependencies for now

  It's currently impossible to use some APIs downstream, since they expose
  `sp_core` (v0.34.0) types instead of the `sdk::keystore::sp_core_subxt`
  (v0.31.0) types. Cargo will refuse to build blueprints using them.

  I just got rid of the `sdk::keystore::sp_core_subxt` hack and downgraded
  all of the dependencies necessary (I think). This won't be an issue once
  #318 is taken care of.
* feat(sdk): re-export `libp2p`

---------

Co-authored-by: Alex <69764315+Serial-ATA@users.noreply.github.com>
Co-authored-by: Thomas Braun <38082993+tbraun96@users.noreply.github.com>
Co-authored-by: Tjemmmic

* fix: merge fix-el branch
* feat: use aggregator blueprint
* fix: remappings out of date
* fix: remove eigenlayer-middleware from tangle blueprint git modules
* chore: close handles once test completes, rpc server still not shutting down
* fix: remove broken invalid test cases
* fix: proc macro doc fixes to avoid more maintenance
* chore: merge main
* fix: move helpers to separate file
* fix: moving helpers along w/ eigenlayer context
* fix: get both blueprints running
* fix: expose addresses better
* feat: test exits successfully and all pieces work seamlessly
* fix: add helper
* fix: clippy and fmt
* fix: context derive test
* fix: context derive test

---------

Co-authored-by: Tjemmmic
Co-authored-by: Alex <69764315+Serial-ATA@users.noreply.github.com>
Co-authored-by: Thomas Braun <38082993+tbraun96@users.noreply.github.com>
---
 Cargo.lock                                         |  56 +--
 Cargo.toml                                         |   2 -
 blueprint-manager/src/executor/mod.rs              |   3 +-
 blueprint-manager/src/sdk/utils.rs                 |  48 --
 blueprint-manager/src/sources/mod.rs               |  12 +-
 blueprint-test-utils/Cargo.toml                    |   4 +-
 blueprint-test-utils/src/helpers.rs                | 468 ++++++++++++++++++
 blueprint-test-utils/src/lib.rs                    | 428 ++++------
 blueprint-test-utils/src/test_ext.rs               |  11 +-
 blueprints/ecdsa-threshold-mpc/src/main.rs         |   2 +-
 .../incredible-squaring-eigenlayer/Cargo.toml      |   4 +-
 .../aggregator/Cargo.toml                          |  61 ---
 .../aggregator/src/aggregator.rs                   | 379 --------
 .../aggregator/src/lib.rs                          |  12 -
 .../contracts/lib/forge-std                        |   2 +-
 .../src/constants.rs                               |   6 +-
 .../src/contexts/aggregator.rs                     | 419 ++++++++++
 .../src/{ => contexts}/client.rs                   |   0
 .../src/contexts/mod.rs                            |   2 +
 .../src/jobs/compute_x_square.rs                   | 109 ++++
 .../src/jobs/initialize_task.rs                    |  72 +++
 .../src/jobs/mod.rs                                |   2 +
 .../incredible-squaring-eigenlayer/src/lib.rs      | 130 +----
 .../src/main.rs                                    |  25 +-
 .../src/runner.rs                                  |  94 +++-
 .../contracts/lib/forge-std                        |   2 +-
 cli/src/deploy.rs                                  |  32 +-
 cli/src/main.rs                                    |  14 +-
 gadget-io/src/imp/standard/shell.rs                |  23 +-
 macros/context-derive/src/eigenlayer.rs            | 132 ++---
 macros/context-derive/src/evm.rs                   |   2 +-
 macros/context-derive/src/subxt.rs                 |   2 +-
 rust-toolchain.toml                                |   2 +-
 sdk/Cargo.toml                                     |   1 +
 sdk/src/config.rs                                  |  75 ++-
 sdk/src/ctx.rs                                     |  42 ++
 sdk/src/events_watcher/evm.rs                      |  24 +-
 37 files changed, 1529 insertions(+), 1173 deletions(-)
 create mode 100644 blueprint-test-utils/src/helpers.rs
 delete mode 100644 blueprints/incredible-squaring-eigenlayer/aggregator/Cargo.toml
 delete mode 100644 blueprints/incredible-squaring-eigenlayer/aggregator/src/aggregator.rs
 delete mode 100644 blueprints/incredible-squaring-eigenlayer/aggregator/src/lib.rs
 create mode 100644 blueprints/incredible-squaring-eigenlayer/src/contexts/aggregator.rs
 rename blueprints/incredible-squaring-eigenlayer/src/{ => contexts}/client.rs (100%)
 create mode 100644 blueprints/incredible-squaring-eigenlayer/src/contexts/mod.rs
 create mode 100644 blueprints/incredible-squaring-eigenlayer/src/jobs/compute_x_square.rs
 create mode 100644 blueprints/incredible-squaring-eigenlayer/src/jobs/initialize_task.rs
 create mode 100644 blueprints/incredible-squaring-eigenlayer/src/jobs/mod.rs

diff --git a/Cargo.lock b/Cargo.lock
index 64e288ce..6f521011 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1992,7 +1992,6 @@ dependencies = [
  "futures",
  "gadget-io",
  "gadget-sdk",
- "incredible-squaring-aggregator",
  "libp2p",
"log", "parking_lot 0.12.3", @@ -2002,6 +2001,7 @@ dependencies = [ "sp-io", "subxt", "testcontainers", + "thiserror", "tokio", "tokio-util 0.7.12", "tracing", @@ -4774,6 +4774,7 @@ dependencies = [ "alloy-signer-local", "alloy-sol-types 0.7.7", "alloy-transport 0.1.4", + "alloy-transport-http 0.1.4", "ark-bn254", "ark-ec", "ark-ff 0.4.2", @@ -5923,55 +5924,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "incredible-squaring-aggregator" -version = "0.1.1" -dependencies = [ - "alloy-consensus", - "alloy-contract", - "alloy-json-abi", - "alloy-network", - "alloy-primitives 0.7.7", - "alloy-provider", - "alloy-pubsub", - "alloy-rpc-types", - "alloy-rpc-types-eth", - "alloy-signer", - "alloy-signer-local", - "alloy-sol-types 0.7.7", - "alloy-transport 0.1.4", - "alloy-transport-http 0.1.4", - "ark-bn254", - "ark-ec", - "ark-ff 0.4.2", - "async-trait", - "bip39", - "blueprint-metadata", - "color-eyre", - "ed25519-zebra 4.0.3", - "eigensdk", - "futures-util", - "gadget-sdk", - "hex", - "jsonrpc-core", - "jsonrpc-http-server", - "k256", - "lazy_static", - "libp2p", - "lock_api", - "parking_lot 0.12.3", - "serde", - "serde_json", - "sp-core", - "structopt", - "subxt-signer", - "thiserror", - "tokio", - "tokio-util 0.7.12", - "tracing", - "uuid 1.11.0", -] - [[package]] name = "incredible-squaring-blueprint" version = "0.1.1" @@ -6030,7 +5982,8 @@ dependencies = [ "gadget-io", "gadget-sdk", "hex", - "incredible-squaring-aggregator", + "jsonrpc-core", + "jsonrpc-http-server", "k256", "lazy_static", "libp2p", @@ -6042,6 +5995,7 @@ dependencies = [ "sp-core", "structopt", "subxt-signer", + "thiserror", "tokio", "tokio-util 0.7.12", "tracing", diff --git a/Cargo.toml b/Cargo.toml index f38f5ce1..bb32ac46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,6 @@ members = [ "blueprint-metadata", "blueprints/incredible-squaring", "blueprints/incredible-squaring-eigenlayer", - "blueprints/incredible-squaring-eigenlayer/aggregator", "blueprints/periodic-web-poller", "cli", "gadget-io", @@ -49,7 +48,6 @@ blueprint-test-utils = { path = "./blueprint-test-utils" } gadget-sdk = { path = "./sdk", default-features = false, version = "0.2.2" } incredible-squaring-blueprint-eigenlayer = { path = "./blueprints/incredible-squaring-eigenlayer", default-features = false, version = "0.1.1" } -incredible-squaring-aggregator = { path = "./blueprints/incredible-squaring-eigenlayer/aggregator", default-features = false, version = "0.1.1" } periodic-web-poller-blueprint = { path = "./blueprints/periodic-web-poller", default-features = false, version = "0.1.1" } gadget-blueprint-proc-macro = { path = "./macros/blueprint-proc-macro", default-features = false, version = "0.2.2" } gadget-blueprint-proc-macro-core = { path = "./macros/blueprint-proc-macro-core", default-features = false, version = "0.1.4" } diff --git a/blueprint-manager/src/executor/mod.rs b/blueprint-manager/src/executor/mod.rs index ee4f0be8..4b1c7914 100644 --- a/blueprint-manager/src/executor/mod.rs +++ b/blueprint-manager/src/executor/mod.rs @@ -168,7 +168,8 @@ pub async fn run_blueprint_manager>( let sub_account_id = tangle_key.account_id().clone(); let tangle_client = - TangleRuntimeClient::from_url(gadget_config.url.as_str(), sub_account_id.clone()).await?; + TangleRuntimeClient::from_url(gadget_config.ws_rpc_url.as_str(), sub_account_id.clone()) + .await?; let services_client = ServicesClient::new(tangle_client.client()); let mut active_gadgets = HashMap::new(); diff --git a/blueprint-manager/src/sdk/utils.rs 
b/blueprint-manager/src/sdk/utils.rs index 5d3e6066..00cd019f 100644 --- a/blueprint-manager/src/sdk/utils.rs +++ b/blueprint-manager/src/sdk/utils.rs @@ -1,7 +1,4 @@ -use crate::config::BlueprintManagerConfig; use crate::protocols::resolver::NativeGithubMetadata; -use gadget_io::GadgetConfig; -use gadget_sdk::config::Protocol; use gadget_sdk::{info, warn}; use sha2::Digest; use std::path::Path; @@ -38,51 +35,6 @@ pub fn bounded_string_to_string(string: BoundedString) -> Result color_eyre::Result> { - let mut arguments = vec![]; - arguments.push("run".to_string()); - - if opt.test_mode { - arguments.push("--test-mode=true".to_string()); - } - - for bootnode in &gadget_config.bootnodes { - arguments.push(format!("--bootnodes={}", bootnode)); - } - - arguments.extend([ - format!("--bind-addr={}", gadget_config.bind_addr), - format!("--bind-port={}", gadget_config.bind_port), - format!("--url={}", gadget_config.url), - format!("--keystore-uri={}", gadget_config.keystore_uri), - format!("--chain={}", gadget_config.chain), - format!("--verbose={}", opt.verbose), - format!("--pretty={}", opt.pretty), - format!("--blueprint-id={}", blueprint_id), - format!("--service-id={}", service_id), - format!("--protocol={}", protocol), - format!( - "--log-id=Blueprint-{blueprint_id}-Service-{service_id}-{}", - opt.instance_id.clone().unwrap_or_else(|| format!( - "{}-{}", - gadget_config.bind_addr, gadget_config.bind_port - )) - ), - ]); - - if let Some(keystore_password) = &gadget_config.keystore_password { - arguments.push(format!("--keystore-password={}", keystore_password)); - } - - Ok(arguments) -} - pub fn hash_bytes_to_hex>(input: T) -> String { let mut hasher = sha2::Sha256::default(); hasher.update(input); diff --git a/blueprint-manager/src/sources/mod.rs b/blueprint-manager/src/sources/mod.rs index e57a6de9..3ff66961 100644 --- a/blueprint-manager/src/sources/mod.rs +++ b/blueprint-manager/src/sources/mod.rs @@ -58,7 +58,14 @@ pub async fn handle<'a>( // Add required env vars for all child processes/gadgets let mut env_vars = vec![ - ("RPC_URL".to_string(), gadget_config.url.to_string()), + ( + "HTTP_RPC_URL".to_string(), + gadget_config.http_rpc_url.to_string(), + ), + ( + "WS_RPC_URL".to_string(), + gadget_config.ws_rpc_url.to_string(), + ), ( "KEYSTORE_URI".to_string(), blueprint_manager_opts.keystore_uri.clone(), @@ -141,7 +148,8 @@ pub fn generate_process_arguments( arguments.extend([ format!("--bind-addr={}", gadget_config.bind_addr), format!("--bind-port={}", gadget_config.bind_port), - format!("--url={}", gadget_config.url), + format!("--http-rpc-url={}", gadget_config.http_rpc_url), + format!("--ws-rpc-url={}", gadget_config.ws_rpc_url), format!("--keystore-uri={}", gadget_config.keystore_uri), format!("--chain={}", gadget_config.chain), format!("--verbose={}", opt.verbose), diff --git a/blueprint-test-utils/Cargo.toml b/blueprint-test-utils/Cargo.toml index 677319dd..b977d92b 100644 --- a/blueprint-test-utils/Cargo.toml +++ b/blueprint-test-utils/Cargo.toml @@ -37,9 +37,7 @@ alloy-sol-types = { workspace = true } alloy-contract = { workspace = true } alloy-rpc-types-eth = { workspace = true } alloy-node-bindings = { workspace = true } - -incredible-squaring-aggregator = { workspace = true } - +thiserror = { workspace = true } eigensdk = { workspace = true } testcontainers = { workspace = true } diff --git a/blueprint-test-utils/src/helpers.rs b/blueprint-test-utils/src/helpers.rs new file mode 100644 index 00000000..91db9b01 --- /dev/null +++ b/blueprint-test-utils/src/helpers.rs @@ -0,0 
+1,468 @@ +use alloy_contract::{CallBuilder, CallDecoder}; +use alloy_provider::{RootProvider, WsConnect}; +use alloy_rpc_types::TransactionReceipt; +use futures::StreamExt; +use gadget_sdk::{error, info}; +use std::net::IpAddr; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; +use std::{collections::HashMap, time::Duration}; +use tokio::process::Child; +use tokio::sync::Mutex; +use url::Url; + +use crate::test_ext::{find_open_tcp_bind_port, NAME_IDS}; +use alloy_primitives::{address, Address, Bytes, U256}; +use alloy_provider::{network::Ethereum, Provider, ProviderBuilder}; +use alloy_transport::{BoxTransport, Transport, TransportError}; +use gadget_io::SupportedChains; +use gadget_sdk::config::Protocol; + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum BlueprintError { + #[error("Transport error occurred: {0}")] + TransportError(#[from] TransportError), + + #[error("Contract error occurred: {0}")] + ContractError(#[from] alloy_contract::Error), +} + +alloy_sol_types::sol!( + #[allow(missing_docs)] + #[sol(rpc)] + #[derive(Debug)] + IncredibleSquaringTaskManager, + "./../blueprints/incredible-squaring-eigenlayer/contracts/out/IncredibleSquaringTaskManager.sol/IncredibleSquaringTaskManager.json" +); + +alloy_sol_types::sol!( + #[allow(missing_docs)] + #[sol(rpc)] + #[derive(Debug)] + PauserRegistry, + "./../blueprints/incredible-squaring-eigenlayer/contracts/out/IPauserRegistry.sol/IPauserRegistry.json" +); + +alloy_sol_types::sol!( + #[allow(missing_docs, clippy::too_many_arguments)] + #[sol(rpc)] + #[derive(Debug)] + RegistryCoordinator, + "./../blueprints/incredible-squaring-eigenlayer/contracts/out/RegistryCoordinator.sol/RegistryCoordinator.json" +); + +pub fn get_provider_http(http_endpoint: &str) -> RootProvider { + let provider = ProviderBuilder::new() + .with_recommended_fillers() + .on_http(http_endpoint.parse().unwrap()) + .root() + .clone() + .boxed(); + + provider +} + +pub async fn get_provider_ws(ws_endpoint: &str) -> RootProvider { + let provider = ProviderBuilder::new() + .with_recommended_fillers() + .on_ws(WsConnect::new(ws_endpoint)) + .await + .unwrap() + .root() + .clone() + .boxed(); + + provider +} + +pub struct EigenlayerTestEnvironment { + pub http_endpoint: String, + pub ws_endpoint: String, + pub accounts: Vec
, + pub registry_coordinator_address: Address, + pub operator_state_retriever_address: Address, + pub delegation_manager_address: Address, + pub strategy_manager_address: Address, + pub pauser_registry_address: Address, +} + +pub async fn setup_eigenlayer_test_environment( + http_endpoint: &str, + ws_endpoint: &str, +) -> EigenlayerTestEnvironment { + let provider = get_provider_http(http_endpoint); + + let accounts = provider.get_accounts().await.unwrap(); + + let registry_coordinator_address = address!("c3e53f4d16ae77db1c982e75a937b9f60fe63690"); + std::env::set_var( + "REGISTRY_COORDINATOR_ADDR", + registry_coordinator_address.to_string(), + ); + let operator_state_retriever_address = address!("1613beb3b2c4f22ee086b2b38c1476a3ce7f78e8"); + std::env::set_var( + "OPERATOR_STATE_RETRIEVER_ADDR", + operator_state_retriever_address.to_string(), + ); + let delegation_manager_address = address!("dc64a140aa3e981100a9beca4e685f962f0cf6c9"); + std::env::set_var( + "DELEGATION_MANAGER_ADDR", + delegation_manager_address.to_string(), + ); + let strategy_manager_address = address!("5fc8d32690cc91d4c39d9d3abcbd16989f875707"); + std::env::set_var( + "STRATEGY_MANAGER_ADDR", + strategy_manager_address.to_string(), + ); + let erc20_mock_address = address!("7969c5ed335650692bc04293b07f5bf2e7a673c0"); + std::env::set_var("ERC20_MOCK_ADDR", erc20_mock_address.to_string()); + + let pauser_registry = PauserRegistry::deploy(provider.clone()).await.unwrap(); + let pauser_registry_address = *pauser_registry.address(); + + let registry_coordinator = + RegistryCoordinator::new(registry_coordinator_address, provider.clone()); + + let operator_set_params = RegistryCoordinator::OperatorSetParam { + maxOperatorCount: 10, + kickBIPsOfOperatorStake: 100, + kickBIPsOfTotalStake: 1000, + }; + let strategy_params = RegistryCoordinator::StrategyParams { + strategy: erc20_mock_address, + multiplier: 1, + }; + + info!("Creating Quorum"); + let _receipt = get_receipt(registry_coordinator.createQuorum( + operator_set_params, + 0, + vec![strategy_params], + )) + .await + .unwrap(); + + info!("Setup Eigenlayer test environment"); + + EigenlayerTestEnvironment { + http_endpoint: http_endpoint.to_string(), + ws_endpoint: ws_endpoint.to_string(), + accounts, + registry_coordinator_address, + operator_state_retriever_address, + delegation_manager_address, + strategy_manager_address, + pauser_registry_address, + } +} + +pub struct BlueprintProcess { + pub handle: Child, + pub arguments: Vec, + pub env_vars: HashMap, +} + +impl BlueprintProcess { + pub async fn new( + program_path: PathBuf, + arguments: Vec, + env_vars: HashMap, + ) -> Result { + let mut command = tokio::process::Command::new(program_path); + command + .args(&arguments) + .envs(&env_vars) + .kill_on_drop(true) + .stdout(std::process::Stdio::inherit()) + .stderr(std::process::Stdio::inherit()) + .stdin(std::process::Stdio::null()); + + let handle = command.spawn()?; + + Ok(BlueprintProcess { + handle, + arguments, + env_vars, + }) + } + + pub async fn kill(&mut self) -> Result<(), std::io::Error> { + self.handle.kill().await + } +} + +pub struct BlueprintProcessManager { + processes: Arc>>, +} + +impl Default for BlueprintProcessManager { + fn default() -> Self { + Self::new() + } +} + +impl BlueprintProcessManager { + pub fn new() -> Self { + BlueprintProcessManager { + processes: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Helper function to start a blueprint process with given parameters. 
+ async fn start_blueprint_process( + program_path: PathBuf, + instance_id: usize, + http_endpoint: &str, + ws_endpoint: &str, + ) -> Result { + let tmp_store = uuid::Uuid::new_v4().to_string(); + let keystore_uri = format!( + "./target/keystores/{}/{tmp_store}/", + NAME_IDS[instance_id].to_lowercase() + ); + assert!( + !std::path::Path::new(&keystore_uri).exists(), + "Keystore URI cannot exist: {}", + keystore_uri + ); + + let keystore_uri_normalized = + std::path::absolute(&keystore_uri).expect("Failed to resolve keystore URI"); + let keystore_uri_str = format!("file:{}", keystore_uri_normalized.display()); + + let arguments = vec![ + "run".to_string(), + format!("--bind-addr={}", IpAddr::from_str("127.0.0.1").unwrap()), + format!("--bind-port={}", find_open_tcp_bind_port()), + format!("--http-rpc-url={}", Url::parse(http_endpoint).unwrap()), + format!("--ws-rpc-url={}", Url::parse(ws_endpoint).unwrap()), + format!("--keystore-uri={}", keystore_uri_str.clone()), + format!("--chain={}", SupportedChains::LocalTestnet), + format!("--verbose={}", 3), + format!("--pretty={}", true), + format!("--blueprint-id={}", instance_id), + format!("--service-id={}", instance_id), + format!("--protocol={}", Protocol::Eigenlayer), + ]; + + let mut env_vars = HashMap::new(); + env_vars.insert("HTTP_RPC_URL".to_string(), http_endpoint.to_string()); + env_vars.insert("WS_RPC_URL".to_string(), ws_endpoint.to_string()); + env_vars.insert("KEYSTORE_URI".to_string(), keystore_uri_str.clone()); + env_vars.insert("DATA_DIR".to_string(), keystore_uri_str); + env_vars.insert("BLUEPRINT_ID".to_string(), instance_id.to_string()); + env_vars.insert("SERVICE_ID".to_string(), instance_id.to_string()); + env_vars.insert("REGISTRATION_MODE_ON".to_string(), "true".to_string()); + + BlueprintProcess::new(program_path, arguments, env_vars).await + } + + /// Starts multiple blueprint processes and adds them to the process manager. 
+ pub async fn start_blueprints( + &self, + blueprint_paths: Vec, + http_endpoint: &str, + ws_endpoint: &str, + ) -> Result<(), std::io::Error> { + for (index, program_path) in blueprint_paths.into_iter().enumerate() { + let process = + Self::start_blueprint_process(program_path, index, http_endpoint, ws_endpoint) + .await?; + self.processes.lock().await.push(process); + } + Ok(()) + } + + pub async fn kill_all(&self) -> Result<(), std::io::Error> { + let mut processes = self.processes.lock().await; + for process in processes.iter_mut() { + process.kill().await?; + } + processes.clear(); + Ok(()) + } +} + +pub async fn deploy_task_manager( + http_endpoint: &str, + registry_coordinator_address: Address, + pauser_registry_address: Address, + owner_address: Address, + aggregator_address: Address, + task_generator_address: Address, +) -> Address { + let provider = get_provider_http(http_endpoint); + let deploy_call = IncredibleSquaringTaskManager::deploy_builder( + provider.clone(), + registry_coordinator_address, + 10u32, + ); + info!("Deploying Incredible Squaring Task Manager"); + let task_manager_address = match get_receipt(deploy_call).await { + Ok(receipt) => match receipt.contract_address { + Some(address) => address, + None => { + error!("Failed to get contract address from receipt"); + panic!("Failed to get contract address from receipt"); + } + }, + Err(e) => { + error!("Failed to get receipt: {:?}", e); + panic!("Failed to get contract address from receipt"); + } + }; + info!( + "Deployed Incredible Squaring Task Manager at {}", + task_manager_address + ); + std::env::set_var("TASK_MANAGER_ADDRESS", task_manager_address.to_string()); + + let task_manager = IncredibleSquaringTaskManager::new(task_manager_address, provider.clone()); + // Initialize the Incredible Squaring Task Manager + info!("Initializing Incredible Squaring Task Manager"); + let init_call = task_manager.initialize( + pauser_registry_address, + owner_address, + aggregator_address, + task_generator_address, + ); + let init_receipt = get_receipt(init_call).await.unwrap(); + assert!(init_receipt.status()); + info!("Initialized Incredible Squaring Task Manager"); + + task_manager_address +} + +pub async fn setup_task_spawner( + task_manager_address: Address, + registry_coordinator_address: Address, + task_generator_address: Address, + accounts: Vec
, + http_endpoint: String, +) -> impl std::future::Future { + let provider = get_provider_http(http_endpoint.as_str()); + let task_manager = IncredibleSquaringTaskManager::new(task_manager_address, provider.clone()); + let registry_coordinator = + RegistryCoordinator::new(registry_coordinator_address, provider.clone()); + + let operators = vec![vec![accounts[0]]]; + let quorums = Bytes::from(vec![0]); + async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis(10000)).await; + + if get_receipt( + task_manager + .createNewTask(U256::from(2), 100u32, Bytes::from(vec![0])) + .from(task_generator_address), + ) + .await + .unwrap() + .status() + { + info!("Created a new task..."); + } + + if get_receipt( + registry_coordinator.updateOperatorsForQuorum(operators.clone(), quorums.clone()), + ) + .await + .unwrap() + .status() + { + info!("Updated operators for quorum..."); + } + + tokio::process::Command::new("sh") + .arg("-c") + .arg(format!( + "cast rpc anvil_mine 1 --rpc-url {} > /dev/null", + http_endpoint + )) + .output() + .await + .unwrap(); + info!("Mined a block..."); + } + } +} + +pub async fn setup_task_response_listener( + task_manager_address: Address, + ws_endpoint: String, + successful_responses: Arc>, +) -> impl std::future::Future { + let task_manager = IncredibleSquaringTaskManager::new( + task_manager_address, + get_provider_ws(ws_endpoint.as_str()).await, + ); + + async move { + let filter = task_manager.TaskResponded_filter().filter; + let mut event_stream = match task_manager.provider().subscribe_logs(&filter).await { + Ok(stream) => stream.into_stream(), + Err(e) => { + error!("Failed to subscribe to logs: {:?}", e); + return; + } + }; + while let Some(event) = event_stream.next().await { + let IncredibleSquaringTaskManager::TaskResponded { + taskResponse: _, .. 
+ } = event + .log_decode::() + .unwrap() + .inner + .data; + let mut counter = successful_responses.lock().await; + *counter += 1; + } + } +} + +pub async fn get_receipt( + call: CallBuilder, +) -> Result +where + T: Transport + Clone, + P: Provider, + D: CallDecoder, +{ + let pending_tx = match call.send().await { + Ok(tx) => tx, + Err(e) => { + error!("Failed to send transaction: {:?}", e); + return Err(e.into()); + } + }; + + let receipt = match pending_tx.get_receipt().await { + Ok(receipt) => receipt, + Err(e) => { + error!("Failed to get transaction receipt: {:?}", e); + return Err(e.into()); + } + }; + + Ok(receipt) +} + +pub async fn wait_for_responses( + successful_responses: Arc>, + task_response_count: usize, + timeout_duration: Duration, +) -> Result, tokio::time::error::Elapsed> { + tokio::time::timeout(timeout_duration, async move { + loop { + let count = *successful_responses.lock().await; + if count >= task_response_count { + return Ok(()); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + }) + .await +} diff --git a/blueprint-test-utils/src/lib.rs b/blueprint-test-utils/src/lib.rs index 8f8e263b..a6dbc17f 100644 --- a/blueprint-test-utils/src/lib.rs +++ b/blueprint-test-utils/src/lib.rs @@ -20,11 +20,6 @@ use std::error::Error; use std::net::IpAddr; use std::path::{Path, PathBuf}; use std::time::Duration; -use alloy_contract::{CallBuilder, CallDecoder}; -use alloy_provider::network::Ethereum; -use alloy_provider::Provider; -use alloy_rpc_types_eth::TransactionReceipt; -use alloy_transport::{Transport, TransportResult}; use subxt::tx::Signer; use subxt::utils::AccountId32; use url::Url; @@ -40,6 +35,7 @@ pub type InputValue = runtime_types::tangle_primitives::services::field::Field; pub mod anvil; +pub mod helpers; pub mod sync; pub mod test_ext; @@ -54,7 +50,8 @@ pub struct PerTestNodeInput { pretty: bool, #[allow(dead_code)] extra_input: T, - local_tangle_node: Url, + http_rpc_url: Url, + ws_rpc_url: Url, } /// Runs a test node using a top-down approach and invoking the blueprint manager to auto manage @@ -102,7 +99,8 @@ pub async fn run_test_blueprint_manager( let gadget_config = GadgetConfig { bind_addr: input.bind_ip, bind_port: input.bind_port, - url: input.local_tangle_node, + http_rpc_url: input.http_rpc_url, + ws_rpc_url: input.ws_rpc_url, bootnodes: input.bootnodes, keystore_uri: keystore_uri_str, keystore_password: None, @@ -360,20 +358,6 @@ pub async fn get_next_call_id(client: &TestClient) -> Result Ok(res) } -pub async fn get_receipt( - call: CallBuilder, -) -> TransportResult -where - T: Transport + Clone, - P: Provider, - D: CallDecoder, -{ - let pending_tx = call.send().await.unwrap(); - let receipt = pending_tx.get_receipt().await?; - - Ok(receipt) -} - #[macro_export] macro_rules! test_blueprint { ( @@ -403,11 +387,13 @@ macro_rules! 
test_blueprint { let manifest_path = base_path.join("Cargo.toml"); + let http_addr = "http://127.0.0.1:9944"; let ws_addr = "ws://127.0.0.1:9944"; let opts = Opts { pkg_name: Some($blueprint_name.to_string()), - rpc_url: ws_addr.to_string(), + http_rpc_url: http_addr.to_string(), + ws_rpc_url: ws_addr.to_string(), manifest_path, signer: None, signer_evm: None, @@ -483,20 +469,17 @@ mod test_macros { mod tests_standard { use super::*; use crate::test_ext::new_test_ext_blueprint_manager; - use alloy_primitives::{address, Bytes, U256}; - use alloy_provider::network::EthereumWallet; - use alloy_provider::{Provider, ProviderBuilder, WsConnect}; - use cargo_tangle::deploy::{Opts, PrivateKeySigner}; - use futures::StreamExt; - use gadget_sdk::config::Protocol; + + use cargo_tangle::deploy::Opts; + use gadget_sdk::logging::setup_log; use gadget_sdk::{error, info}; - use incredible_squaring_aggregator::aggregator::Aggregator; - use std::str::FromStr; + use helpers::{ + deploy_task_manager, setup_eigenlayer_test_environment, setup_task_response_listener, + setup_task_spawner, BlueprintProcessManager, EigenlayerTestEnvironment, + }; use std::sync::Arc; use tokio::sync::Mutex; - use tokio::time::timeout; - use IncredibleSquaringTaskManager::TaskResponded; const ANVIL_STATE_PATH: &str = "./blueprint-test-utils/anvil/deployed_anvil_states/testnet_state.json"; @@ -520,7 +503,8 @@ mod tests_standard { let opts = Opts { pkg_name: Some("incredible-squaring-blueprint".to_string()), - rpc_url: "ws://127.0.0.1:9944".to_string(), + http_rpc_url: "http://127.0.0.1:9944".to_string(), + ws_rpc_url: "ws://127.0.0.1:9944".to_string(), manifest_path, signer: None, signer_evm: None, @@ -585,343 +569,113 @@ mod tests_standard { .await } - alloy_sol_types::sol!( - #[allow(missing_docs)] - #[sol(rpc)] - #[derive(Debug)] - IncredibleSquaringTaskManager, - "./../blueprints/incredible-squaring-eigenlayer/contracts/out/IncredibleSquaringTaskManager.sol/IncredibleSquaringTaskManager.json" - ); - - alloy_sol_types::sol!( - #[allow(missing_docs)] - #[sol(rpc)] - #[derive(Debug)] - PauserRegistry, - "./../blueprints/incredible-squaring-eigenlayer/contracts/out/IPauserRegistry.sol/IPauserRegistry.json" - ); - - alloy_sol_types::sol!( - #[allow(missing_docs, clippy::too_many_arguments)] - #[sol(rpc)] - #[derive(Debug)] - RegistryCoordinator, - "./../blueprints/incredible-squaring-eigenlayer/contracts/out/RegistryCoordinator.sol/RegistryCoordinator.json" - ); - #[tokio::test(flavor = "multi_thread")] #[allow(clippy::needless_return)] async fn test_eigenlayer_incredible_squaring_blueprint() { setup_log(); + let (_container, http_endpoint, ws_endpoint) = anvil::start_anvil_container(ANVIL_STATE_PATH, true).await; std::env::set_var("EIGENLAYER_HTTP_ENDPOINT", http_endpoint.clone()); std::env::set_var("EIGENLAYER_WS_ENDPOINT", ws_endpoint.clone()); - // Sleep to give the testnet time to spin up tokio::time::sleep(Duration::from_secs(1)).await; - // Create a provider using the transport - let provider = alloy_provider::ProviderBuilder::new() - .with_recommended_fillers() - .on_http(http_endpoint.parse().unwrap()) - .root() - .clone() - .boxed(); - let accounts = provider.get_accounts().await.unwrap(); - - // let service_manager_addr = address!("67d269191c92caf3cd7723f116c85e6e9bf55933"); - let registry_coordinator_addr = address!("c3e53f4d16ae77db1c982e75a937b9f60fe63690"); - let operator_state_retriever_addr = address!("1613beb3b2c4f22ee086b2b38c1476a3ce7f78e8"); - // let delegation_manager_addr = 
address!("dc64a140aa3e981100a9beca4e685f962f0cf6c9"); - // let strategy_manager_addr = address!("5fc8d32690cc91d4c39d9d3abcbd16989f875707"); - let erc20_mock_addr = address!("7969c5ed335650692bc04293b07f5bf2e7a673c0"); - - // Deploy the Pauser Registry to the running Testnet - let pauser_registry = PauserRegistry::deploy(provider.clone()).await.unwrap(); - let &pauser_registry_addr = pauser_registry.address(); - - // Create Quorum - let registry_coordinator = - RegistryCoordinator::new(registry_coordinator_addr, provider.clone()); - let operator_set_params = RegistryCoordinator::OperatorSetParam { - maxOperatorCount: 10, - kickBIPsOfOperatorStake: 100, - kickBIPsOfTotalStake: 1000, - }; - let strategy_params = RegistryCoordinator::StrategyParams { - strategy: erc20_mock_addr, - multiplier: 1, - }; - let _receipt = get_receipt(registry_coordinator.createQuorum( - operator_set_params, - 0, - vec![strategy_params], - )) - .await - .unwrap(); - - // Deploy the Incredible Squaring Task Manager to the running Testnet - let task_manager_addr = get_receipt( - super::tests_standard::IncredibleSquaringTaskManager::deploy_builder( - provider.clone(), - registry_coordinator_addr, - 10u32, - ), + let EigenlayerTestEnvironment { + accounts, + http_endpoint, + ws_endpoint, + registry_coordinator_address, + pauser_registry_address, + .. + } = setup_eigenlayer_test_environment(&http_endpoint, &ws_endpoint).await; + let owner_address = &accounts[1]; + let aggregator_address = &accounts[9]; + let task_generator_address = &accounts[4]; + let task_manager_address = deploy_task_manager( + &http_endpoint, + registry_coordinator_address, + pauser_registry_address, + *owner_address, + *aggregator_address, + *task_generator_address, ) - .await - .unwrap() - .contract_address - .unwrap(); - info!("Task Manager: {:?}", task_manager_addr); - std::env::set_var("TASK_MANAGER_ADDRESS", task_manager_addr.to_string()); - - // We create a Task Manager instance for the task spawner - let task_manager = IncredibleSquaringTaskManager::new(task_manager_addr, provider.clone()); - let task_generator_address = accounts[4]; - - // Initialize the Incredible Squaring Task Manager - let init_receipt = get_receipt(task_manager.initialize( - pauser_registry_addr, - accounts[1], - accounts[9], - task_generator_address, - )) - .await - .unwrap(); - assert!(init_receipt.status()); - - let signer: PrivateKeySigner = - "0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6" - .parse() - .unwrap(); - let wallet = EthereumWallet::from(signer); - let (aggregator, _cancellation_token) = Aggregator::new( - task_manager_addr, - registry_coordinator_addr, - operator_state_retriever_addr, - http_endpoint.clone(), - ws_endpoint.clone(), - "127.0.0.1:8081".to_string(), - wallet, - ) - .await - .unwrap(); - - // Run the server in a separate thread - let (handle, aggregator_shutdown_tx) = aggregator.start(ws_endpoint.to_string()); + .await; + let num_successful_responses_required = 3; let successful_responses = Arc::new(Mutex::new(0)); let successful_responses_clone = successful_responses.clone(); - // Create an event listener for TaskResponded events - let task_mgr_clone = task_manager.clone(); - let ws_provider = ProviderBuilder::new() - .on_ws(WsConnect::new(ws_endpoint.clone())) - .await - .unwrap() - .root() - .clone() - .boxed(); - let task_response_listener = async move { - let filter = task_mgr_clone.TaskResponded_filter().filter; - info!("Filter: {:?}", filter); - let mut event_stream = match 
ws_provider.subscribe_logs(&filter).await { - Ok(stream) => stream.into_stream(), - Err(e) => { - error!("Failed to subscribe to logs: {:?}", e); - return; - } - }; - info!("Listening for TaskResponded events..."); - while let Some(event) = event_stream.next().await { - let TaskResponded { - taskResponse: _response, - .. - } = event.log_decode::().unwrap().inner.data; - let mut counter = successful_responses.lock().await; - *counter += 1; - if *counter >= 1 { - break; - } - } - }; - let response_listener_handle = tokio::spawn(task_response_listener); + // Start the Task Response Listener + let response_listener = setup_task_response_listener( + task_manager_address, + ws_endpoint.clone(), + successful_responses, + ) + .await; // Start the Task Spawner - let operators = vec![vec![accounts[0]]]; - let quorums = Bytes::from(vec![0]); - let task_spawner = async move { - let mut task_count = 0; - loop { - tokio::time::sleep(std::time::Duration::from_millis(10000)).await; - - if get_receipt( - task_manager - .createNewTask(U256::from(2), 100u32, Bytes::from(vec![0])) - .from(task_generator_address), - ) - .await - .unwrap() - .status() - { - info!("Deployed a new task"); - task_count += 1; - } - - if get_receipt( - registry_coordinator - .updateOperatorsForQuorum(operators.clone(), quorums.clone()), - ) - .await - .unwrap() - .status() - { - info!("Updated operators for quorum 0"); - } + let task_spawner = setup_task_spawner( + task_manager_address, + registry_coordinator_address, + *task_generator_address, + accounts.to_vec(), + http_endpoint.clone(), + ) + .await; - tokio::process::Command::new("sh") - .arg("-c") - .arg(format!( - "cast rpc anvil_mine 1 --rpc-url {} > /dev/null", - http_endpoint - )) - .output() - .await - .unwrap(); - info!("Mined a block..."); + tokio::spawn(async move { + task_spawner.await; + }); - // Break the loop if we've created enough tasks - if task_count >= 5 { - // Create more tasks than we expect responses for - break; - } - } - }; - let task_spawner_handle = tokio::spawn(task_spawner); + tokio::spawn(async move { + response_listener.await; + }); info!("Starting Blueprint Binary..."); - let tmp_store = Uuid::new_v4().to_string(); - let keystore_uri = PathBuf::from(format!( - "./target/keystores/{}/{tmp_store}/", - NAME_IDS[0].to_lowercase() - )); - assert!( - !keystore_uri.exists(), - "Keystore URI cannot exist: {}", - keystore_uri.display() - ); - let keystore_uri_normalized = - std::path::absolute(keystore_uri).expect("Failed to resolve keystore URI"); - let keystore_uri_str = format!("file:{}", keystore_uri_normalized.display()); - - let mut arguments = vec![]; - arguments.push("run".to_string()); - - arguments.extend([ - format!("--bind-addr={}", IpAddr::from_str("127.0.0.1").unwrap()), - format!("--bind-port={}", 8545u16), - format!("--url={}", Url::parse("ws://127.0.0.1:8545").unwrap()), - format!("--keystore-uri={}", keystore_uri_str.clone()), - format!("--chain={}", SupportedChains::LocalTestnet), - format!("--verbose={}", 3), - format!("--pretty={}", true), - format!("--blueprint-id={}", 0), - format!("--service-id={}", 0), - format!("--protocol={}", Protocol::Eigenlayer), - ]); - + let blueprint_process_manager = BlueprintProcessManager::new(); let current_dir = std::env::current_dir().unwrap(); - let program_path = format!( + let xsquare_task_program_path = PathBuf::from(format!( "{}/../target/release/incredible-squaring-blueprint-eigenlayer", current_dir.display() - ); - let program_path = PathBuf::from(program_path).canonicalize().unwrap(); - - let 
mut env_vars = vec![ - ("RPC_URL".to_string(), "ws://127.0.0.1:8545".to_string()), - ("KEYSTORE_URI".to_string(), keystore_uri_str.clone()), - ("DATA_DIR".to_string(), keystore_uri_str), - ("BLUEPRINT_ID".to_string(), format!("{}", 0)), - ("SERVICE_ID".to_string(), format!("{}", 0)), - ("REGISTRATION_MODE_ON".to_string(), "true".to_string()), - ( - "OPERATOR_BLS_KEY_PASSWORD".to_string(), - "BLS_PASSWORD".to_string(), - ), - ( - "OPERATOR_ECDSA_KEY_PASSWORD".to_string(), - "ECDSA_PASSWORD".to_string(), - ), - ]; - - // Ensure our child process inherits the current processes' environment vars - env_vars.extend(std::env::vars()); - - // Now that the file is loaded, spawn the process - let mut process_handle = tokio::process::Command::new(program_path.as_os_str()) - .kill_on_drop(true) - .stdout(std::process::Stdio::inherit()) // Inherit the stdout of this process - .stderr(std::process::Stdio::inherit()) // Inherit the stderr of this process - .stdin(std::process::Stdio::null()) - .current_dir(std::env::current_dir().unwrap()) - .envs(env_vars) - .args(arguments) - .spawn() + )) + .canonicalize() + .unwrap(); + + blueprint_process_manager + .start_blueprints( + vec![xsquare_task_program_path], + &http_endpoint, + ws_endpoint.as_ref(), + ) + .await .unwrap(); // Wait for the process to complete or timeout - let timeout_duration = Duration::from_secs(300); // 5 minutes timeout - let result = timeout(timeout_duration, async { - loop { - let count = *successful_responses_clone.lock().await; - if count >= 1 { - return Ok::<(), std::io::Error>(()); - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - }) + let timeout_duration = Duration::from_secs(300); + let result = helpers::wait_for_responses( + successful_responses_clone, + num_successful_responses_required, + timeout_duration, + ) .await; - // When you want to shut down the aggregator: - if let Err(e) = aggregator_shutdown_tx.send(()).await { - error!("Failed to send shutdown signal to aggregator: {:?}", e); - } else { - info!("Sent shutdown signal to aggregator."); - } - - // Wait for the aggregator to shut down - if let Err(e) = handle.await { - error!("Error waiting for aggregator to shut down: {:?}", e); - } - - // Cancel the task spawner and response listener - task_spawner_handle.abort(); - response_listener_handle.abort(); - // Check the result - match result { - Ok(Ok(())) => { - info!("Test completed successfully with 3 or more tasks responded to."); - if let Err(e) = process_handle.kill().await { - error!("Failed to kill the process: {:?}", e); - } else { - info!("Process killed successfully."); - } - } - Ok(Err(e)) => { - error!("Test failed with error: {:?}", e); - panic!("Test failed"); - } - Err(_) => { - error!( - "Test timed out after {} seconds", - timeout_duration.as_secs() - ); - panic!("Test timed out"); - } + if let Ok(Ok(())) = result { + info!("Test completed successfully with {num_successful_responses_required} tasks responded to."); + blueprint_process_manager + .kill_all() + .await + .unwrap_or_else(|e| { + error!("Failed to kill all blueprint processes: {:?}", e); + }); + } else { + panic!( + "Test timed out after {} seconds", + timeout_duration.as_secs() + ); } } } diff --git a/blueprint-test-utils/src/test_ext.rs b/blueprint-test-utils/src/test_ext.rs index a329f629..8c40bb88 100644 --- a/blueprint-test-utils/src/test_ext.rs +++ b/blueprint-test-utils/src/test_ext.rs @@ -39,7 +39,7 @@ use tracing::Instrument; use gadget_sdk::{error, info, warn}; const LOCAL_BIND_ADDR: &str = "127.0.0.1"; -const 
LOCAL_TANGLE_NODE: &str = "ws://127.0.0.1:9944"; +const LOCAL_TANGLE_NODE_HTTP: &str = "http://127.0.0.1:9944"; pub const NAME_IDS: [&str; 5] = ["Alice", "Bob", "Charlie", "Dave", "Eve"]; /// - `N`: number of nodes @@ -102,7 +102,8 @@ pub async fn new_test_ext_blueprint_manager< verbose: 4, pretty: false, extra_input: additional_params.clone(), - local_tangle_node: Url::parse(&opts.rpc_url).expect("Should parse URL"), + http_rpc_url: Url::parse(&opts.http_rpc_url).expect("Should parse URL"), + ws_rpc_url: Url::parse(&opts.ws_rpc_url).expect("Should parse URL"), }; let handle = f(test_input).await; @@ -136,7 +137,7 @@ pub async fn new_test_ext_blueprint_manager< } }; - let client = OnlineClient::from_url(LOCAL_TANGLE_NODE) + let client = OnlineClient::from_url(LOCAL_TANGLE_NODE_HTTP) .await .expect("Failed to create an account-based localhost client"); @@ -146,7 +147,7 @@ pub async fn new_test_ext_blueprint_manager< // TODO: allow the function called to specify the registration args for handle in handles { - let client = OnlineClient::from_url(LOCAL_TANGLE_NODE) + let client = OnlineClient::from_url(LOCAL_TANGLE_NODE_HTTP) .await .expect("Failed to create an account-based localhost client"); let registration_args = registration_args.clone(); @@ -241,7 +242,7 @@ pub async fn new_test_ext_blueprint_manager< } } -fn find_open_tcp_bind_port() -> u16 { +pub fn find_open_tcp_bind_port() -> u16 { let listener = std::net::TcpListener::bind(format!("{LOCAL_BIND_ADDR}:0")) .expect("Should bind to localhost"); listener diff --git a/blueprints/ecdsa-threshold-mpc/src/main.rs b/blueprints/ecdsa-threshold-mpc/src/main.rs index 77929fbd..63d6e2db 100644 --- a/blueprints/ecdsa-threshold-mpc/src/main.rs +++ b/blueprints/ecdsa-threshold-mpc/src/main.rs @@ -27,7 +27,7 @@ async fn main() -> Result<()> { let keystore = env.keystore()?; let signer = env.first_signer()?; let client: TangleClient = - subxt::OnlineClient::from_url(&env.rpc_endpoint).await?; + subxt::OnlineClient::from_url(&env.http_rpc_endpoint).await?; // // Create the event handler from the job // let keygen_job = KeygenEventHandler { diff --git a/blueprints/incredible-squaring-eigenlayer/Cargo.toml b/blueprints/incredible-squaring-eigenlayer/Cargo.toml index 08a2fa7e..380142ee 100644 --- a/blueprints/incredible-squaring-eigenlayer/Cargo.toml +++ b/blueprints/incredible-squaring-eigenlayer/Cargo.toml @@ -50,10 +50,12 @@ serde = { workspace = true } serde_json = { workspace = true } uuid = { workspace = true, features = ["v4"] } bip39 = { workspace = true } +thiserror = { workspace = true } +jsonrpc-core = { workspace = true } +jsonrpc-http-server = { workspace = true } [dev-dependencies] blueprint-test-utils = { workspace = true } -incredible-squaring-aggregator = { workspace = true } gadget-io = { workspace = true } [features] diff --git a/blueprints/incredible-squaring-eigenlayer/aggregator/Cargo.toml b/blueprints/incredible-squaring-eigenlayer/aggregator/Cargo.toml deleted file mode 100644 index c46d9b62..00000000 --- a/blueprints/incredible-squaring-eigenlayer/aggregator/Cargo.toml +++ /dev/null @@ -1,61 +0,0 @@ -[package] -name = "incredible-squaring-aggregator" -version = "0.1.1" -description = "An aggregator for processing and aggregating BLS signatures for the incredible squaring blueprint" -authors.workspace = true -edition.workspace = true -license.workspace = true -homepage.workspace = true -repository.workspace = true -publish = false - -[dependencies] -eigensdk = { workspace = true } -tracing = { workspace = true } -futures-util = { 
workspace = true } -async-trait = { workspace = true } -gadget-sdk = { workspace = true, features = ["std"] } -color-eyre = { workspace = true } -lazy_static = { workspace = true } -lock_api = { workspace = true } -tokio = { workspace = true, default-features = false, features = ["full"] } -tokio-util = { workspace = true } -sp-core = { workspace = true } -subxt-signer = { workspace = true, features = ["sr25519", "subxt", "std"] } -alloy-contract = { workspace = true } -alloy-consensus = { workspace = true } -alloy-json-abi = { workspace = true, features = ["serde_json"] } -alloy-network = { workspace = true } -alloy-primitives = { workspace = true } -alloy-provider = { workspace = true } -alloy-pubsub = { workspace = true } -alloy-rpc-types = { workspace = true } -alloy-rpc-types-eth = { workspace = true } -alloy-signer = { workspace = true } -alloy-signer-local = { workspace = true } -alloy-sol-types = { workspace = true, features = ["json"] } -alloy-transport = { workspace = true } -alloy-transport-http = { workspace = true } -ark-bn254 = { workspace = true } -ark-ff = { workspace = true } -ark-ec = { workspace = true } -jsonrpc-core = { workspace = true} -jsonrpc-http-server = { workspace = true} -parking_lot = { workspace = true } -libp2p = { workspace = true } -ed25519-zebra = { workspace = true, features = ["pkcs8", "default", "der", "std", "serde", "pem"] } -structopt = { workspace = true } -hex = { workspace = true } -k256 = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -uuid = { workspace = true, features = ["v4"] } -bip39 = { workspace = true } -thiserror = { workspace = true } - -[build-dependencies] -blueprint-metadata = { workspace = true } - -[features] -default = ["std"] -std = [] diff --git a/blueprints/incredible-squaring-eigenlayer/aggregator/src/aggregator.rs b/blueprints/incredible-squaring-eigenlayer/aggregator/src/aggregator.rs deleted file mode 100644 index bbb22185..00000000 --- a/blueprints/incredible-squaring-eigenlayer/aggregator/src/aggregator.rs +++ /dev/null @@ -1,379 +0,0 @@ -use crate::IncredibleSquaringTaskManager::{ - self, G1Point, G2Point, NewTaskCreated, NonSignerStakesAndSignature, Task, TaskResponse, -}; -use alloy_network::{Ethereum, EthereumWallet, NetworkWallet}; -use alloy_primitives::{keccak256, Address}; -use alloy_provider::{Provider, ProviderBuilder, WsConnect}; -use alloy_rpc_types::Filter; -use alloy_sol_types::{SolEvent, SolType}; -use color_eyre::Result; -use eigensdk::{ - client_avsregistry::reader::AvsRegistryChainReader, - crypto_bls::{ - convert_to_g1_point, convert_to_g2_point, BlsG1Point, BlsG2Point, OperatorId, Signature, - }, - logging::get_test_logger, - services_avsregistry::chaincaller::AvsRegistryServiceChainCaller, - services_blsaggregation::bls_agg::{BlsAggregationServiceResponse, BlsAggregatorService}, - services_operatorsinfo::operatorsinfo_inmemory::OperatorInfoServiceInMemory, - types::avs::{TaskIndex, TaskResponseDigest}, - utils::get_provider, -}; -use futures_util::StreamExt; -use gadget_sdk::{debug, error, info}; -use jsonrpc_core::{IoHandler, Params, Value}; -use jsonrpc_http_server::{AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; -use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; -use tokio::sync::{mpsc, oneshot}; -use tokio::{sync::Mutex, task::JoinHandle}; -use tokio_util::sync::CancellationToken; - -const TASK_CHALLENGE_WINDOW_BLOCK: u32 = 100; -const BLOCK_TIME_SECONDS: u32 = 12; - -#[derive(Debug, Clone, 
Serialize, Deserialize)] -pub struct SignedTaskResponse { - pub task_response: TaskResponse, - pub signature: Signature, - pub operator_id: OperatorId, -} - -pub struct Aggregator { - port_address: String, - task_manager_addr: Address, - bls_aggregation_service: BlsAggregatorService< - AvsRegistryServiceChainCaller, - >, - tasks: HashMap, - tasks_responses: HashMap>, - http_rpc_url: String, - wallet: EthereumWallet, -} - -impl Aggregator { - pub async fn new( - task_manager_addr: Address, - registry_coordinator_addr: Address, - operator_state_retriever_addr: Address, - http_rpc_url: String, - ws_rpc_url: String, - aggregator_ip_addr: String, - wallet: EthereumWallet, - ) -> Result<(Self, CancellationToken)> { - let avs_registry_chain_reader = AvsRegistryChainReader::new( - get_test_logger(), - registry_coordinator_addr, - operator_state_retriever_addr, - http_rpc_url.clone(), - ) - .await?; - - let operators_info_service = OperatorInfoServiceInMemory::new( - get_test_logger(), - avs_registry_chain_reader.clone(), - ws_rpc_url, - ) - .await; - - let cancellation_token = tokio_util::sync::CancellationToken::new(); - let operators_info_clone = operators_info_service.clone(); - let token_clone = cancellation_token.clone(); - - tokio::task::spawn(async move { - operators_info_clone - .start_service(&token_clone, 0, 200) - .await - }); - - let avs_registry_service = AvsRegistryServiceChainCaller::new( - avs_registry_chain_reader, - operators_info_service.clone(), - ); - - let bls_aggregation_service = BlsAggregatorService::new(avs_registry_service); - - Ok(( - Self { - port_address: aggregator_ip_addr, - task_manager_addr, - tasks: HashMap::new(), - tasks_responses: HashMap::new(), - bls_aggregation_service, - http_rpc_url, - wallet, - }, - cancellation_token, - )) - } - - pub fn start(self, ws_rpc_url: String) -> (JoinHandle<()>, mpsc::Sender<()>) { - let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); - let aggregator = Arc::new(Mutex::new(self)); - - let handle = tokio::spawn(async move { - info!("Starting aggregator"); - - let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel(); - let server_handle = tokio::spawn(Self::start_server( - Arc::clone(&aggregator), - server_shutdown_rx, - )); - let tasks_handle = - tokio::spawn(Self::process_tasks(ws_rpc_url, Arc::clone(&aggregator))); - - debug!("Server and task processing tasks have been spawned"); - - tokio::select! 
{ - _ = server_handle => { - info!("Server task has completed"); - } - _ = tasks_handle => { - info!("Task processing has completed"); - } - _ = shutdown_rx.recv() => { - info!("Received shutdown signal"); - let _ = server_shutdown_tx.send(()); - } - } - info!("Aggregator is shutting down"); - }); - - (handle, shutdown_tx) - } - - async fn start_server( - aggregator: Arc>, - shutdown: oneshot::Receiver<()>, - ) -> Result<()> { - let mut io = IoHandler::new(); - io.add_method("process_signed_task_response", { - let aggregator = Arc::clone(&aggregator); - move |params: Params| { - let aggregator = Arc::clone(&aggregator); - async move { - // Parse the outer structure first - let outer_params: Value = params.parse()?; - - // Extract the inner "params" object - let inner_params = outer_params.get("params").ok_or_else(|| { - jsonrpc_core::Error::invalid_params("Missing 'params' field") - })?; - - // Now parse the inner params as SignedTaskResponse - let signed_task_response: SignedTaskResponse = - serde_json::from_value(inner_params.clone()).map_err(|e| { - jsonrpc_core::Error::invalid_params(format!( - "Invalid SignedTaskResponse: {}", - e - )) - })?; - - info!("Parsed signed task response: {:?}", signed_task_response); - - aggregator - .lock() - .await - .process_signed_task_response(signed_task_response) - .await - .map(|_| Value::Bool(true)) - .map_err(|e| jsonrpc_core::Error::invalid_params(e.to_string())) - } - } - }); - - let socket: SocketAddr = aggregator.lock().await.port_address.parse()?; - let server = ServerBuilder::new(io) - .cors(DomainsValidation::AllowOnly(vec![ - AccessControlAllowOrigin::Any, - ])) - .start_http(&socket)?; - - info!("Server running at {}", socket); - - // Create a close handle before we move the server - let close_handle = server.close_handle(); - - // Use tokio::select! to wait for either the server to finish or the shutdown signal - tokio::select! { - _ = async { server.wait() } => { - info!("Server has stopped"); - } - _ = shutdown => { - info!("Initiating server shutdown"); - close_handle.close(); - } - } - Ok(()) - } - - async fn process_tasks(ws_rpc_url: String, aggregator: Arc>) -> Result<()> { - info!("Connecting to WebSocket RPC at: {}", ws_rpc_url); - let provider = ProviderBuilder::new() - .on_ws(WsConnect::new(ws_rpc_url)) - .await?; - let filter = Filter::new().event_signature(NewTaskCreated::SIGNATURE_HASH); - let mut stream = provider.subscribe_logs(&filter).await?.into_stream(); - - while let Some(log) = stream.next().await { - let NewTaskCreated { taskIndex, task } = log.log_decode()?.inner.data; - let mut aggregator = aggregator.lock().await; - aggregator.tasks.insert(taskIndex, task.clone()); - - let time_to_expiry = std::time::Duration::from_secs( - (TASK_CHALLENGE_WINDOW_BLOCK * BLOCK_TIME_SECONDS).into(), - ); - - if let Err(e) = aggregator - .bls_aggregation_service - .initialize_new_task( - taskIndex, - task.taskCreatedBlock, - task.quorumNumbers.to_vec(), - vec![task.quorumThresholdPercentage.try_into()?; task.quorumNumbers.len()], - time_to_expiry, - ) - .await - { - error!( - "Failed to initialize new task: {}. 
Error: {:?}", - taskIndex, e - ); - } else { - debug!("Successfully initialized new task: {}", taskIndex); - } - } - - Ok(()) - } - - async fn process_signed_task_response(&mut self, resp: SignedTaskResponse) -> Result<()> { - let SignedTaskResponse { - task_response, - signature, - operator_id, - } = resp.clone(); - let task_index = task_response.referenceTaskIndex; - let task_response_digest = keccak256(TaskResponse::abi_encode(&task_response)); - - info!( - "Processing signed task response for task index: {}, task response digest: {}", - task_index, task_response_digest - ); - - if self - .tasks_responses - .entry(task_index) - .or_default() - .contains_key(&task_response_digest) - { - info!( - "Task response digest already processed for task index: {}", - task_index - ); - return Ok(()); - } - - self.tasks_responses - .get_mut(&task_index) - .unwrap() - .insert(task_response_digest, task_response.clone()); - - debug!( - "Inserted task response for task index: {}, {:?}", - task_index, resp - ); - - if let Err(e) = self - .bls_aggregation_service - .process_new_signature(task_index, task_response_digest, signature, operator_id) - .await - { - error!( - "Failed to process new signature for task index: {}. Error: {:?}", - task_index, e - ); - } else { - debug!( - "Successfully processed new signature for task index: {}", - task_index - ); - } - - if let Some(aggregated_response) = self - .bls_aggregation_service - .aggregated_response_receiver - .lock() - .await - .recv() - .await - { - self.send_aggregated_response_to_contract(aggregated_response?) - .await?; - } - Ok(()) - } - - async fn send_aggregated_response_to_contract( - &self, - response: BlsAggregationServiceResponse, - ) -> Result<()> { - let non_signer_stakes_and_signature = NonSignerStakesAndSignature { - nonSignerPubkeys: response - .non_signers_pub_keys_g1 - .into_iter() - .map(to_g1_point) - .collect(), - nonSignerQuorumBitmapIndices: response.non_signer_quorum_bitmap_indices, - quorumApks: response - .quorum_apks_g1 - .into_iter() - .map(to_g1_point) - .collect(), - apkG2: to_g2_point(response.signers_apk_g2), - sigma: to_g1_point(response.signers_agg_sig_g1.g1_point()), - quorumApkIndices: response.quorum_apk_indices, - totalStakeIndices: response.total_stake_indices, - nonSignerStakeIndices: response.non_signer_stake_indices, - }; - - fn to_g1_point(pk: BlsG1Point) -> G1Point { - let pt = convert_to_g1_point(pk.g1()).expect("Invalid G1 point"); - G1Point { X: pt.X, Y: pt.Y } - } - - fn to_g2_point(pk: BlsG2Point) -> G2Point { - let pt = convert_to_g2_point(pk.g2()).expect("Invalid G2 point"); - G2Point { X: pt.X, Y: pt.Y } - } - - let task = &self.tasks[&response.task_index]; - let task_response = - &self.tasks_responses[&response.task_index][&response.task_response_digest]; - - let provider = get_provider(&self.http_rpc_url); - let task_manager = - IncredibleSquaringTaskManager::new(self.task_manager_addr, provider.clone()); - - let _ = task_manager - .respondToTask( - task.clone(), - task_response.clone(), - non_signer_stakes_and_signature, - ) - .from(NetworkWallet::::default_signer_address( - &self.wallet, - )) - .send() - .await? 
- .get_receipt() - .await?; - - info!( - "Sent aggregated response to contract for task index: {}", - response.task_index, - ); - - Ok(()) - } -} diff --git a/blueprints/incredible-squaring-eigenlayer/aggregator/src/lib.rs b/blueprints/incredible-squaring-eigenlayer/aggregator/src/lib.rs deleted file mode 100644 index f94cf242..00000000 --- a/blueprints/incredible-squaring-eigenlayer/aggregator/src/lib.rs +++ /dev/null @@ -1,12 +0,0 @@ -use alloy_sol_types::sol; -use serde::{Deserialize, Serialize}; - -pub mod aggregator; - -sol!( - #[allow(missing_docs)] - #[sol(rpc)] - #[derive(Debug, Serialize, Deserialize)] - IncredibleSquaringTaskManager, - "../contracts/out/IncredibleSquaringTaskManager.sol/IncredibleSquaringTaskManager.json" -); diff --git a/blueprints/incredible-squaring-eigenlayer/contracts/lib/forge-std b/blueprints/incredible-squaring-eigenlayer/contracts/lib/forge-std index 1de6eecf..035de35f 160000 --- a/blueprints/incredible-squaring-eigenlayer/contracts/lib/forge-std +++ b/blueprints/incredible-squaring-eigenlayer/contracts/lib/forge-std @@ -1 +1 @@ -Subproject commit 1de6eecf821de7fe2c908cc48d3ab3dced20717f +Subproject commit 035de35f5e366c8d6ed142aec4ccb57fe2dd87d4 diff --git a/blueprints/incredible-squaring-eigenlayer/src/constants.rs b/blueprints/incredible-squaring-eigenlayer/src/constants.rs index 128a2ba4..445c8a73 100644 --- a/blueprints/incredible-squaring-eigenlayer/src/constants.rs +++ b/blueprints/incredible-squaring-eigenlayer/src/constants.rs @@ -12,6 +12,9 @@ lazy_static! { pub static ref PRIVATE_KEY: String = env::var("PRIVATE_KEY").unwrap_or_else(|_| { "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".to_string() }); + pub static ref AGGREGATOR_PRIVATE_KEY: String = env::var("PRIVATE_KEY").unwrap_or_else(|_| { + "2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6".to_string() + }); pub static ref SR_SECRET_BYTES: Vec = env::var("SR_SECRET_BYTES") .map(|v| v.into_bytes()) .unwrap_or_else(|_| vec![0; 32]); @@ -31,7 +34,4 @@ lazy_static! 
{ pub static ref OPERATOR_ADDRESS: Address = address!("f39fd6e51aad88f6f4ce6ab8827279cfffb92266"); pub static ref OPERATOR_METADATA_URL: String = "https://github.com/tangle-network/gadget".to_string(); - pub static ref MNEMONIC_SEED: String = env::var("MNEMONIC_SEED").unwrap_or_else(|_| { - "test test test test test test test test test test test junk".to_string() - }); } diff --git a/blueprints/incredible-squaring-eigenlayer/src/contexts/aggregator.rs b/blueprints/incredible-squaring-eigenlayer/src/contexts/aggregator.rs new file mode 100644 index 00000000..0a21c4cb --- /dev/null +++ b/blueprints/incredible-squaring-eigenlayer/src/contexts/aggregator.rs @@ -0,0 +1,419 @@ +use crate::{ + contexts::client::SignedTaskResponse, + IncredibleSquaringTaskManager::{ + self, G1Point, G2Point, NonSignerStakesAndSignature, TaskResponse, + }, +}; +use alloy_network::{Ethereum, NetworkWallet}; +use alloy_primitives::keccak256; +use alloy_sol_types::SolType; +use color_eyre::Result; +use eigensdk::{ + crypto_bls::{convert_to_g1_point, convert_to_g2_point, BlsG1Point, BlsG2Point}, + services_blsaggregation::bls_agg::BlsAggregationServiceResponse, + utils::get_provider, +}; +use gadget_sdk::{ + config::StdGadgetConfiguration, + ctx::{EigenlayerContext, KeystoreContext}, + debug, error, info, +}; +use jsonrpc_core::{IoHandler, Params, Value}; +use jsonrpc_http_server::{AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; +use std::{collections::VecDeque, net::SocketAddr, sync::Arc, time::Duration}; +use tokio::task::JoinHandle; +use tokio::{ + sync::{oneshot, Mutex}, + time::interval, +}; + +use crate::IncredibleSquaringTaskManager::Task; +use alloy_network::EthereumWallet; +use alloy_primitives::Address; +use eigensdk::client_avsregistry::reader::AvsRegistryChainReader; +use eigensdk::services_avsregistry::chaincaller::AvsRegistryServiceChainCaller; +use eigensdk::services_blsaggregation::bls_agg::BlsAggregatorService; +use eigensdk::services_operatorsinfo::operatorsinfo_inmemory::OperatorInfoServiceInMemory; +use eigensdk::types::avs::{TaskIndex, TaskResponseDigest}; +use std::collections::HashMap; + +pub type BlsAggServiceInMemory = BlsAggregatorService< + AvsRegistryServiceChainCaller, +>; + +#[derive(Clone, EigenlayerContext, KeystoreContext)] +pub struct AggregatorContext { + pub port_address: String, + pub task_manager_address: Address, + pub tasks: Arc>>, + pub tasks_responses: Arc>>>, + pub bls_aggregation_service: Option>>, + pub http_rpc_url: String, + pub wallet: EthereumWallet, + pub response_cache: Arc>>, + #[config] + pub sdk_config: StdGadgetConfiguration, +} + +impl AggregatorContext { + pub async fn new( + port_address: String, + task_manager_address: Address, + http_rpc_url: String, + wallet: EthereumWallet, + sdk_config: StdGadgetConfiguration, + ) -> Result { + let mut aggregator_context = AggregatorContext { + port_address, + task_manager_address, + tasks: Arc::new(Mutex::new(HashMap::new())), + tasks_responses: Arc::new(Mutex::new(HashMap::new())), + bls_aggregation_service: None, + http_rpc_url, + wallet, + response_cache: Arc::new(Mutex::new(VecDeque::new())), + sdk_config, + }; + + // Initialize the bls registry service + let bls_service = aggregator_context + .bls_aggregation_service_in_memory() + .await?; + aggregator_context.bls_aggregation_service = Some(Arc::new(Mutex::new(bls_service))); + + Ok(aggregator_context) + } + + pub fn start(self, _ws_rpc_url: String) -> (JoinHandle<()>, oneshot::Sender<()>) { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let 
aggregator = Arc::new(Mutex::new(self)); + + let handle = tokio::spawn(async move { + info!("Starting aggregator RPC server"); + + let server_handle = + tokio::spawn(Self::start_server(Arc::clone(&aggregator), shutdown_rx)); + let process_handle = + tokio::spawn(Self::process_cached_responses(Arc::clone(&aggregator))); + + tokio::select! { + _ = server_handle => { + error!("Server task unexpectedly finished"); + } + _ = process_handle => { + error!("Process cached responses task unexpectedly finished"); + } + } + }); + + (handle, shutdown_tx) + } + + async fn start_server( + aggregator: Arc>, + shutdown: oneshot::Receiver<()>, + ) -> Result<()> { + let mut io = IoHandler::new(); + io.add_method("process_signed_task_response", { + let aggregator = Arc::clone(&aggregator); + move |params: Params| { + let aggregator = Arc::clone(&aggregator); + async move { + // Parse the outer structure first + let outer_params: Value = params.parse()?; + + // Extract the inner "params" object + let inner_params = outer_params.get("params").ok_or_else(|| { + jsonrpc_core::Error::invalid_params("Missing 'params' field") + })?; + + // Now parse the inner params as SignedTaskResponse + let signed_task_response: SignedTaskResponse = + serde_json::from_value(inner_params.clone()).map_err(|e| { + jsonrpc_core::Error::invalid_params(format!( + "Invalid SignedTaskResponse: {}", + e + )) + })?; + + aggregator + .lock() + .await + .process_signed_task_response(signed_task_response) + .await + .map(|_| Value::Bool(true)) + .map_err(|e| jsonrpc_core::Error::invalid_params(e.to_string())) + } + } + }); + + let socket: SocketAddr = aggregator.lock().await.port_address.parse()?; + let server = ServerBuilder::new(io) + .cors(DomainsValidation::AllowOnly(vec![ + AccessControlAllowOrigin::Any, + ])) + .start_http(&socket)?; + + info!("Server running at {}", socket); + + // Create a close handle before we move the server + let close_handle = server.close_handle(); + + // Use tokio::select! to wait for either the server to finish or the shutdown signal + tokio::select! 
{ + _ = async { server.wait() } => { + info!("Server has stopped"); + } + _ = shutdown => { + info!("Initiating server shutdown"); + close_handle.close(); + } + } + Ok(()) + } + + async fn process_signed_task_response(&mut self, resp: SignedTaskResponse) -> Result<()> { + let task_index = resp.task_response.referenceTaskIndex; + let task_response_digest = keccak256(TaskResponse::abi_encode(&resp.task_response)); + + info!( + "Caching signed task response for task index: {}, task response digest: {}", + task_index, task_response_digest + ); + + self.response_cache.lock().await.push_back(resp); + + Ok(()) + } + + async fn process_cached_responses(aggregator: Arc>) { + let mut interval = interval(Duration::from_secs(6)); + + loop { + interval.tick().await; + + let mut aggregator = aggregator.lock().await; + let responses_to_process = aggregator.response_cache.lock().await.clone(); + + for resp in responses_to_process { + if let Err(e) = aggregator.process_response(resp).await { + error!("Failed to process cached response: {:?}", e); + // Continue processing other responses without failing + } else { + aggregator.response_cache.lock().await.pop_front(); + } + } + } + } + + async fn process_response(&mut self, resp: SignedTaskResponse) -> Result<()> { + let SignedTaskResponse { + task_response, + signature, + operator_id, + } = resp.clone(); + let task_index = task_response.referenceTaskIndex; + let task_response_digest = keccak256(TaskResponse::abi_encode(&task_response)); + + if self + .tasks_responses + .lock() + .await + .entry(task_index) + .or_default() + .contains_key(&task_response_digest) + { + info!( + "Task response digest already processed for task index: {}", + task_index + ); + return Ok(()); + } + + info!( + "Processing signed task response for task index: {}, task response digest: {}", + task_index, task_response_digest + ); + + self.bls_aggregation_service + .as_ref() + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::Other, + "BLS Aggregation Service not initialized", + ) + })? + .lock() + .await + .process_new_signature(task_index, task_response_digest, signature, operator_id) + .await?; + + if let Some(tasks_responses) = self.tasks_responses.lock().await.get_mut(&task_index) { + tasks_responses.insert(task_response_digest, task_response.clone()); + } + + debug!( + "Successfully processed new signature for task index: {}", + task_index + ); + + if let Some(aggregated_response) = self + .bls_aggregation_service + .as_ref() + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::Other, + "BLS Aggregation Service not initialized", + ) + })? + .lock() + .await + .aggregated_response_receiver + .lock() + .await + .recv() + .await + { + self.send_aggregated_response_to_contract(aggregated_response?) 
+ .await?; + } + Ok(()) + } + + // async fn process_signed_task_response(&mut self, resp: SignedTaskResponse) -> Result<()> { + // let SignedTaskResponse { + // task_response, + // signature, + // operator_id, + // } = resp.clone(); + // let task_index = task_response.referenceTaskIndex; + // let task_response_digest = keccak256(TaskResponse::abi_encode(&task_response)); + + // info!( + // "Processing signed task response for task index: {}, task response digest: {}", + // task_index, task_response_digest + // ); + + // if self + // .tasks_responses + // .lock() + // .await + // .entry(task_index) + // .or_default() + // .contains_key(&task_response_digest) + // { + // info!( + // "Task response digest already processed for task index: {}", + // task_index + // ); + // return Ok(()); + // } + + // if let Some(tasks_responses) = self.tasks_responses.lock().await.get_mut(&task_index) { + // tasks_responses.insert(task_response_digest, task_response.clone()); + // } + + // debug!( + // "Inserted task response for task index: {}, {:?}", + // task_index, resp + // ); + + // if let Err(e) = self + // .bls_aggregation_service_in_memory() + // .await? + // .process_new_signature(task_index, task_response_digest, signature, operator_id) + // .await + // { + // error!( + // "Failed to process new signature for task index: {}. Error: {:?}", + // task_index, e + // ); + // } else { + // debug!( + // "Successfully processed new signature for task index: {}", + // task_index + // ); + // } + + // if let Some(aggregated_response) = self + // .bls_aggregation_service_in_memory() + // .await? + // .aggregated_response_receiver + // .lock() + // .await + // .recv() + // .await + // { + // self.send_aggregated_response_to_contract(aggregated_response?) + // .await?; + // } + // Ok(()) + // } + + async fn send_aggregated_response_to_contract( + &self, + response: BlsAggregationServiceResponse, + ) -> Result<()> { + let non_signer_stakes_and_signature = NonSignerStakesAndSignature { + nonSignerPubkeys: response + .non_signers_pub_keys_g1 + .into_iter() + .map(to_g1_point) + .collect(), + nonSignerQuorumBitmapIndices: response.non_signer_quorum_bitmap_indices, + quorumApks: response + .quorum_apks_g1 + .into_iter() + .map(to_g1_point) + .collect(), + apkG2: to_g2_point(response.signers_apk_g2), + sigma: to_g1_point(response.signers_agg_sig_g1.g1_point()), + quorumApkIndices: response.quorum_apk_indices, + totalStakeIndices: response.total_stake_indices, + nonSignerStakeIndices: response.non_signer_stake_indices, + }; + + fn to_g1_point(pk: BlsG1Point) -> G1Point { + let pt = convert_to_g1_point(pk.g1()).expect("Invalid G1 point"); + G1Point { X: pt.X, Y: pt.Y } + } + + fn to_g2_point(pk: BlsG2Point) -> G2Point { + let pt = convert_to_g2_point(pk.g2()).expect("Invalid G2 point"); + G2Point { X: pt.X, Y: pt.Y } + } + + let tasks = self.tasks.lock().await; + let task_responses = self.tasks_responses.lock().await; + let task = tasks.get(&response.task_index).expect("Task not found"); + let task_response = task_responses + .get(&response.task_index) + .and_then(|responses| responses.get(&response.task_response_digest)) + .expect("Task response not found"); + + let provider = get_provider(&self.http_rpc_url); + let task_manager = + IncredibleSquaringTaskManager::new(self.task_manager_address, provider.clone()); + + let _ = task_manager + .respondToTask( + task.clone(), + task_response.clone(), + non_signer_stakes_and_signature, + ) + .from(NetworkWallet::::default_signer_address( + &self.wallet, + )) + .send() + 
.await? + .get_receipt() + .await?; + + info!( + "Sent aggregated response to contract for task index: {}", + response.task_index, + ); + + Ok(()) + } +} diff --git a/blueprints/incredible-squaring-eigenlayer/src/client.rs b/blueprints/incredible-squaring-eigenlayer/src/contexts/client.rs similarity index 100% rename from blueprints/incredible-squaring-eigenlayer/src/client.rs rename to blueprints/incredible-squaring-eigenlayer/src/contexts/client.rs diff --git a/blueprints/incredible-squaring-eigenlayer/src/contexts/mod.rs b/blueprints/incredible-squaring-eigenlayer/src/contexts/mod.rs new file mode 100644 index 00000000..47404ef0 --- /dev/null +++ b/blueprints/incredible-squaring-eigenlayer/src/contexts/mod.rs @@ -0,0 +1,2 @@ +pub mod aggregator; +pub mod client; diff --git a/blueprints/incredible-squaring-eigenlayer/src/jobs/compute_x_square.rs b/blueprints/incredible-squaring-eigenlayer/src/jobs/compute_x_square.rs new file mode 100644 index 00000000..25536748 --- /dev/null +++ b/blueprints/incredible-squaring-eigenlayer/src/jobs/compute_x_square.rs @@ -0,0 +1,109 @@ +#![allow(dead_code)] +use crate::contexts::client::{AggregatorClient, SignedTaskResponse}; +use crate::{noop, IncredibleSquaringTaskManager, INCREDIBLE_SQUARING_TASK_MANAGER_ABI_STRING}; +use alloy_contract::ContractInstance; +use alloy_network::Ethereum; +use alloy_primitives::keccak256; +use alloy_primitives::{hex, Bytes, FixedBytes, U256}; +use alloy_sol_types::private::alloy_json_abi::JsonAbi; +use alloy_sol_types::SolType; +use ark_bn254::Fq; +use ark_ff::{BigInteger, PrimeField}; +use color_eyre::Result; +use eigensdk::crypto_bls::BlsKeyPair; +use eigensdk::crypto_bls::OperatorId; +use gadget_sdk::{error, info, job}; +use std::{convert::Infallible, ops::Deref, sync::OnceLock}; +use IncredibleSquaringTaskManager::TaskResponse; + +/// Returns x^2 saturating to [`u64::MAX`] if overflow occurs. 
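// A small illustrative sketch of the saturating squaring used by `xsquare_eigen` below:
// `U256::saturating_pow` clamps at the maximum of the 256-bit type rather than wrapping,
// so the effective saturation bound is `U256::MAX` (the `u64::MAX` in the doc comment above
// reads like a leftover from an earlier integer type). Only `alloy_primitives::U256` is
// assumed here.
fn square_saturating(x: alloy_primitives::U256) -> alloy_primitives::U256 {
    x.saturating_pow(alloy_primitives::U256::from(2u32))
}

#[test]
fn squaring_saturates_at_u256_max() {
    use alloy_primitives::U256;
    assert_eq!(square_saturating(U256::from(3u32)), U256::from(9u32));
    assert_eq!(square_saturating(U256::MAX), U256::MAX);
}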
+#[job( + id = 0, + params(number_to_be_squared, task_created_block, quorum_numbers, quorum_threshold_percentage, task_index), + result(_), + event_listener( + listener = EvmContractEventListener( + instance = IncredibleSquaringTaskManager, + abi = INCREDIBLE_SQUARING_TASK_MANAGER_ABI_STRING, + ), + event = IncredibleSquaringTaskManager::NewTaskCreated, + pre_processor = convert_event_to_inputs, + post_processor = noop, + ), +)] +pub async fn xsquare_eigen( + ctx: AggregatorClient, + number_to_be_squared: U256, + task_created_block: u32, + quorum_numbers: Bytes, + quorum_threshold_percentage: u8, + task_index: u32, +) -> Result { + // Calculate our response to job + let task_response = TaskResponse { + referenceTaskIndex: task_index, + numberSquared: number_to_be_squared.saturating_pow(U256::from(2u32)), + }; + + let bls_key_pair = BlsKeyPair::new( + "1371012690269088913462269866874713266643928125698382731338806296762673180359922" + .to_string(), + ) + .unwrap(); + + let operator_id = alloy_primitives::FixedBytes( + eigensdk::types::operator::operator_id_from_g1_pub_key(bls_key_pair.public_key()).unwrap(), + ); + let operator_id: OperatorId = + hex!("fd329fe7e54f459b9c104064efe0172db113a50b5f394949b4ef80b3c34ca7f5").into(); + + // Sign the Hashed Message and send it to the BLS Aggregator + let msg_hash = keccak256(::abi_encode(&task_response)); + let signed_response = SignedTaskResponse { + task_response, + signature: bls_key_pair.sign_message(msg_hash.as_ref()), + operator_id, + }; + + info!( + "Sending signed task response to BLS Aggregator: {:#?}", + signed_response + ); + if let Err(e) = ctx.send_signed_task_response(signed_response).await { + error!("Failed to send signed task response: {:?}", e); + return Ok(0); + } + + Ok(1) +} + +/// Converts the event to inputs. +/// +/// Uses a tuple to represent the return type because +/// the macro will index all values in the #[job] function +/// and parse the return type by the index. 
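// The aggregator's `process_signed_task_response` RPC method (see `contexts/aggregator.rs`
// above) reads the request params as a JSON object and then deserializes its inner "params"
// field into a `SignedTaskResponse`. The `AggregatorClient` in `contexts/client.rs` is not
// shown in this patch, so the raw envelope below is only a sketch of that wire format;
// `reqwest`, the port, and `Serialize` on `SignedTaskResponse` are assumptions.
async fn submit_signed_response_raw(resp: &SignedTaskResponse) -> color_eyre::Result<()> {
    let body = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "process_signed_task_response",
        // The server looks up this nested "params" object before deserializing.
        "params": { "params": serde_json::to_value(resp)? },
    });
    reqwest::Client::new()
        .post("http://127.0.0.1:8081")
        .json(&body)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}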
+pub fn convert_event_to_inputs( + event: IncredibleSquaringTaskManager::NewTaskCreated, + _index: u32, +) -> (U256, u32, Bytes, u8, u32) { + let task_index = event.taskIndex; + let number_to_be_squared = event.task.numberToBeSquared; + let task_created_block = event.task.taskCreatedBlock; + let quorum_numbers = event.task.quorumNumbers; + let quorum_threshold_percentage = event.task.quorumThresholdPercentage.try_into().unwrap(); + ( + number_to_be_squared, + task_created_block, + quorum_numbers, + quorum_threshold_percentage, + task_index, + ) +} + +/// Helper for converting a PrimeField to its U256 representation for Ethereum compatibility +/// (U256 reads data as big endian) +pub fn point_to_u256(point: Fq) -> U256 { + let point = point.into_bigint(); + let point_bytes = point.to_bytes_be(); + U256::from_be_slice(&point_bytes[..]) +} diff --git a/blueprints/incredible-squaring-eigenlayer/src/jobs/initialize_task.rs b/blueprints/incredible-squaring-eigenlayer/src/jobs/initialize_task.rs new file mode 100644 index 00000000..4113fd10 --- /dev/null +++ b/blueprints/incredible-squaring-eigenlayer/src/jobs/initialize_task.rs @@ -0,0 +1,72 @@ +use crate::{ + contexts::aggregator::AggregatorContext, noop, IncredibleSquaringTaskManager, + INCREDIBLE_SQUARING_TASK_MANAGER_ABI_STRING, +}; +use alloy_contract::ContractInstance; +use alloy_json_abi::JsonAbi; +use alloy_network::Ethereum; +use alloy_primitives::FixedBytes; +use gadget_sdk::{info, job}; +use std::{convert::Infallible, ops::Deref, sync::OnceLock}; +use IncredibleSquaringTaskManager::Task; + +const TASK_CHALLENGE_WINDOW_BLOCK: u32 = 100; +const BLOCK_TIME_SECONDS: u32 = 12; + +/// Initializes the task for the aggregator server +#[job( + id = 1, + params(task, task_index), + result(_), + event_listener( + listener = EvmContractEventListener( + instance = IncredibleSquaringTaskManager, + abi = INCREDIBLE_SQUARING_TASK_MANAGER_ABI_STRING, + ), + event = IncredibleSquaringTaskManager::NewTaskCreated, + pre_processor = convert_event_to_inputs, + post_processor = noop, + ), +)] +pub async fn initialize_bls_task( + ctx: AggregatorContext, + task: Task, + task_index: u32, +) -> Result { + info!("Initializing task for BLS aggregation"); + + let mut tasks = ctx.tasks.lock().await; + tasks.insert(task_index, task.clone()); + let time_to_expiry = + std::time::Duration::from_secs((TASK_CHALLENGE_WINDOW_BLOCK * BLOCK_TIME_SECONDS).into()); + + if let Some(service) = &ctx.bls_aggregation_service { + service + .lock() + .await + .initialize_new_task( + task_index, + task.taskCreatedBlock, + task.quorumNumbers.to_vec(), + vec![task.quorumThresholdPercentage.try_into().unwrap(); task.quorumNumbers.len()], + time_to_expiry, + ) + .await + .unwrap() + } + + Ok(1) +} + +/// Converts the event to inputs. +/// +/// Uses a tuple to represent the return type because +/// the macro will index all values in the #[job] function +/// and parse the return type by the index. 
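// The expiry handed to `initialize_new_task` in `initialize_bls_task` above is plain block
// arithmetic: TASK_CHALLENGE_WINDOW_BLOCK (100 blocks) * BLOCK_TIME_SECONDS (12 s/block)
// gives a 1200-second window, i.e. the BLS aggregation service accepts signatures for up
// to 20 minutes before the task expires.
#[test]
fn challenge_window_is_twenty_minutes() {
    let time_to_expiry =
        std::time::Duration::from_secs((TASK_CHALLENGE_WINDOW_BLOCK * BLOCK_TIME_SECONDS).into());
    assert_eq!(time_to_expiry, std::time::Duration::from_secs(20 * 60));
}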
+pub fn convert_event_to_inputs( + event: IncredibleSquaringTaskManager::NewTaskCreated, + _index: u32, +) -> (Task, u32) { + let task_index = event.taskIndex; + (event.task, task_index) +} diff --git a/blueprints/incredible-squaring-eigenlayer/src/jobs/mod.rs b/blueprints/incredible-squaring-eigenlayer/src/jobs/mod.rs new file mode 100644 index 00000000..160a7738 --- /dev/null +++ b/blueprints/incredible-squaring-eigenlayer/src/jobs/mod.rs @@ -0,0 +1,2 @@ +pub mod compute_x_square; +pub mod initialize_task; diff --git a/blueprints/incredible-squaring-eigenlayer/src/lib.rs b/blueprints/incredible-squaring-eigenlayer/src/lib.rs index c7acb684..eb9cbfb5 100644 --- a/blueprints/incredible-squaring-eigenlayer/src/lib.rs +++ b/blueprints/incredible-squaring-eigenlayer/src/lib.rs @@ -1,34 +1,13 @@ #![allow(dead_code)] -use alloy_contract::ContractInstance; -use alloy_network::Ethereum; -use alloy_network::EthereumWallet; -use alloy_primitives::keccak256; -use alloy_primitives::{hex, Bytes, FixedBytes, U256}; -use alloy_provider::fillers::WalletFiller; -use alloy_provider::fillers::{ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller}; -use alloy_provider::Identity; -use alloy_provider::RootProvider; -use alloy_sol_types::SolType; -use alloy_sol_types::{private::alloy_json_abi::JsonAbi, sol}; -use alloy_transport_http::{Client, Http}; -use ark_bn254::Fq; -use ark_ff::{BigInteger, PrimeField}; -use client::AggregatorClient; -use client::SignedTaskResponse; -use color_eyre::Result; -use eigensdk::crypto_bls::BlsKeyPair; -use eigensdk::crypto_bls::OperatorId; +use alloy_sol_types::sol; use gadget_sdk::load_abi; -use gadget_sdk::{events_watcher::evm::Config, job}; use serde::{Deserialize, Serialize}; -use std::{convert::Infallible, ops::Deref, sync::OnceLock}; -use IncredibleSquaringTaskManager::TaskResponse; -pub mod client; pub mod constants; +pub mod contexts; +pub mod jobs; pub mod runner; -// Codegen from ABI file to interact with the contract. sol!( #[allow(missing_docs)] #[sol(rpc)] @@ -42,109 +21,6 @@ load_abi!( "contracts/out/IncredibleSquaringTaskManager.sol/IncredibleSquaringTaskManager.json" ); -#[derive(Debug, Clone)] -pub struct NodeConfig {} - -impl Config for NodeConfig { - type TH = Http; - type PH = FillProvider< - JoinFill< - JoinFill, NonceFiller>, ChainIdFiller>, - WalletFiller, - >, - RootProvider>, - Http, - Ethereum, - >; -} - pub fn noop(_: u32) { // This function intentionally does nothing } - -/// Returns x^2 saturating to [`u64::MAX`] if overflow occurs. 
-#[job( - id = 1, - params(number_to_be_squared, task_created_block, quorum_numbers, quorum_threshold_percentage, task_index), - result(_), - event_listener( - listener = EvmContractEventListener( - instance = IncredibleSquaringTaskManager, - abi = INCREDIBLE_SQUARING_TASK_MANAGER_ABI_STRING, - ), - event = IncredibleSquaringTaskManager::NewTaskCreated, - pre_processor = convert_event_to_inputs, - post_processor = noop, - ), -)] -pub async fn xsquare_eigen( - number_to_be_squared: U256, - task_created_block: u32, - quorum_numbers: Bytes, - quorum_threshold_percentage: u8, - task_index: u32, -) -> Result { - // Calculate our response to job - let task_response = TaskResponse { - referenceTaskIndex: task_index, - numberSquared: number_to_be_squared.saturating_pow(U256::from(2u32)), - }; - - let bls_key_pair = BlsKeyPair::new( - "1371012690269088913462269866874713266643928125698382731338806296762673180359922" - .to_string(), - ) - .unwrap(); - - let operator_id = alloy_primitives::FixedBytes( - eigensdk::types::operator::operator_id_from_g1_pub_key(bls_key_pair.public_key()).unwrap(), - ); - let operator_id: OperatorId = - hex!("fd329fe7e54f459b9c104064efe0172db113a50b5f394949b4ef80b3c34ca7f5").into(); - - // Sign the Hashed Message and send it to the BLS Aggregator - let msg_hash = keccak256(::abi_encode(&task_response)); - let signed_response = SignedTaskResponse { - task_response, - signature: bls_key_pair.sign_message(msg_hash.as_ref()), - operator_id, - }; - - let client = AggregatorClient::new("127.0.0.1:8081").unwrap(); - if let Err(e) = client.send_signed_task_response(signed_response).await { - tracing::error!("Failed to send signed task response: {:?}", e); - return Ok(0); - } - - Ok(1) -} - -/// Converts the event to inputs. -/// -/// Uses a tuple to represent the return type because -/// the macro will index all values in the #[job] function -/// and parse the return type by the index. 
-pub fn convert_event_to_inputs( - event: IncredibleSquaringTaskManager::NewTaskCreated, - index: u32, -) -> (U256, u32, Bytes, u8, u32) { - let number_to_be_squared = event.task.numberToBeSquared; - let task_created_block = event.task.taskCreatedBlock; - let quorum_numbers = event.task.quorumNumbers; - let quorum_threshold_percentage = event.task.quorumThresholdPercentage.try_into().unwrap(); - ( - number_to_be_squared, - task_created_block, - quorum_numbers, - quorum_threshold_percentage, - index, - ) -} - -/// Helper for converting a PrimeField to its U256 representation for Ethereum compatibility -/// (U256 reads data as big endian) -pub fn point_to_u256(point: Fq) -> U256 { - let point = point.into_bigint(); - let point_bytes = point.to_bytes_be(); - U256::from_be_slice(&point_bytes[..]) -} diff --git a/blueprints/incredible-squaring-eigenlayer/src/main.rs b/blueprints/incredible-squaring-eigenlayer/src/main.rs index 3cdd447e..845bf653 100644 --- a/blueprints/incredible-squaring-eigenlayer/src/main.rs +++ b/blueprints/incredible-squaring-eigenlayer/src/main.rs @@ -1,29 +1,8 @@ use color_eyre::Result; -use gadget_sdk::config::ContextConfig; -use gadget_sdk::run::GadgetRunner; -use incredible_squaring_blueprint_eigenlayer::{self, *}; -use runner::EigenlayerGadgetRunner; -use structopt::StructOpt; -use tracing::info; +use incredible_squaring_blueprint_eigenlayer::runner::execute_runner; #[tokio::main] #[allow(clippy::needless_return)] async fn main() -> Result<()> { - gadget_sdk::logging::setup_log(); - let config = ContextConfig::from_args(); - let env = gadget_sdk::config::load(config).expect("Failed to load environment"); - let mut runner = Box::new(EigenlayerGadgetRunner::new(env.clone()).await); - - info!("~~~ Executing the incredible squaring blueprint ~~~"); - - info!("Registering..."); - if env.should_run_registration() { - runner.register().await?; - } - - info!("Running..."); - runner.run().await?; - - info!("Exiting..."); - Ok(()) + execute_runner().await } diff --git a/blueprints/incredible-squaring-eigenlayer/src/runner.rs b/blueprints/incredible-squaring-eigenlayer/src/runner.rs index 8c3565cc..718b7da2 100644 --- a/blueprints/incredible-squaring-eigenlayer/src/runner.rs +++ b/blueprints/incredible-squaring-eigenlayer/src/runner.rs @@ -1,11 +1,15 @@ use crate::{ constants::{ - AVS_DIRECTORY_ADDRESS, DELEGATION_MANAGER_ADDRESS, EIGENLAYER_HTTP_ENDPOINT, - OPERATOR_ADDRESS, OPERATOR_METADATA_URL, OPERATOR_STATE_RETRIEVER_ADDRESS, PRIVATE_KEY, - REGISTRY_COORDINATOR_ADDRESS, SIGNATURE_EXPIRY, STRATEGY_MANAGER_ADDRESS, - TASK_MANAGER_ADDRESS, + AGGREGATOR_PRIVATE_KEY, AVS_DIRECTORY_ADDRESS, DELEGATION_MANAGER_ADDRESS, + EIGENLAYER_HTTP_ENDPOINT, OPERATOR_ADDRESS, OPERATOR_METADATA_URL, + OPERATOR_STATE_RETRIEVER_ADDRESS, PRIVATE_KEY, REGISTRY_COORDINATOR_ADDRESS, + SIGNATURE_EXPIRY, STRATEGY_MANAGER_ADDRESS, TASK_MANAGER_ADDRESS, }, - IncredibleSquaringTaskManager, NodeConfig, XsquareEigenEventHandler, + contexts::{aggregator::AggregatorContext, client::AggregatorClient}, + jobs::{ + compute_x_square::XsquareEigenEventHandler, initialize_task::InitializeBlsTaskEventHandler, + }, + IncredibleSquaringTaskManager, }; use alloy_network::EthereumWallet; use alloy_primitives::{Bytes, FixedBytes, U256}; @@ -18,10 +22,14 @@ use eigensdk::client_elcontracts::writer::ELChainWriter; use eigensdk::crypto_bls::BlsKeyPair; use eigensdk::logging::get_test_logger; use eigensdk::types::operator::Operator; -use gadget_sdk::config::GadgetConfiguration; use 
gadget_sdk::events_watcher::InitializableEventHandler; use gadget_sdk::info; use gadget_sdk::run::GadgetRunner; +use gadget_sdk::structopt::StructOpt; +use gadget_sdk::{ + config::{ContextConfig, GadgetConfiguration}, + events_watcher::evm::DefaultNodeConfig, +}; pub struct EigenlayerGadgetRunner { pub env: GadgetConfiguration, } @@ -134,12 +142,14 @@ impl GadgetRunner for EigenlayerGadgetRunner { async fn run(&mut self) -> Result<()> { // Get the ECDSA key from the private key seed using alloy - let signer: PrivateKeySigner = PRIVATE_KEY.parse().expect("failed to generate wallet "); + let signer: PrivateKeySigner = AGGREGATOR_PRIVATE_KEY + .parse() + .expect("failed to generate wallet "); let wallet = EthereumWallet::from(signer); let provider = ProviderBuilder::new() .with_recommended_fillers() - .wallet(wallet) + .wallet(wallet.clone()) .on_http(EIGENLAYER_HTTP_ENDPOINT.parse()?); let contract = IncredibleSquaringTaskManager::IncredibleSquaringTaskManagerInstance::new( @@ -147,20 +157,74 @@ impl GadgetRunner for EigenlayerGadgetRunner { provider, ); - let x_square_eigen = XsquareEigenEventHandler:: { - contract: contract.into(), + let aggregator_client = AggregatorClient::new(&format!("{}:{}", self.env.bind_addr, 8081))?; + let x_square_eigen = XsquareEigenEventHandler:: { + ctx: aggregator_client, + contract: contract.clone().into(), + }; + + let aggregator_context = AggregatorContext::new( + format!("{}:{}", self.env.bind_addr, 8081), + *TASK_MANAGER_ADDRESS, + self.env.http_rpc_endpoint.clone(), + wallet, + self.env.clone(), + ) + .await + .unwrap(); + + let initialize_task = InitializeBlsTaskEventHandler:: { + ctx: aggregator_context.clone(), + contract: contract.clone().into(), }; - info!("Contract address: {:?}", *TASK_MANAGER_ADDRESS); + let (handle, aggregator_shutdown_tx) = + aggregator_context.start(self.env.ws_rpc_endpoint.clone()); + info!("Starting the Incredible Squaring task initializer"); + let init_task_finished = initialize_task + .init_event_handler() + .await + .expect("Event Listener init already called"); info!("Starting the Incredible Squaring event handler"); - let finished = x_square_eigen + let x_square_finished = x_square_eigen .init_event_handler() .await .expect("Event Listener init already called"); - info!("Event handler started..."); - let res = finished.await; - info!("Event handler finished with {res:?}"); + tokio::select! 
{ + _ = init_task_finished => { + info!("Initialize task finished"); + let _ = aggregator_shutdown_tx.send(()).unwrap(); + handle.abort(); + info!("Aggregator shutdown signal sent after init task finished"); + }, + _ = x_square_finished => { + info!("X square task finished"); + let _ = aggregator_shutdown_tx.send(()).unwrap(); + handle.abort(); + info!("Aggregator shutdown signal sent after x square task finished"); + }, + }; Ok(()) } } + +pub async fn execute_runner() -> Result<()> { + gadget_sdk::logging::setup_log(); + let config = ContextConfig::from_args(); + let env = gadget_sdk::config::load(config).expect("Failed to load environment"); + let mut runner = Box::new(EigenlayerGadgetRunner::new(env.clone()).await); + + info!("~~~ Executing the incredible squaring blueprint ~~~"); + + info!("Registering..."); + if env.should_run_registration() { + runner.register().await?; + } + + info!("Running..."); + runner.run().await?; + + info!("Exiting..."); + Ok(()) +} diff --git a/blueprints/incredible-squaring/contracts/lib/forge-std b/blueprints/incredible-squaring/contracts/lib/forge-std index 1de6eecf..035de35f 160000 --- a/blueprints/incredible-squaring/contracts/lib/forge-std +++ b/blueprints/incredible-squaring/contracts/lib/forge-std @@ -1 +1 @@ -Subproject commit 1de6eecf821de7fe2c908cc48d3ab3dced20717f +Subproject commit 035de35f5e366c8d6ed142aec4ccb57fe2dd87d4 diff --git a/cli/src/deploy.rs b/cli/src/deploy.rs index f51d2804..8594376d 100644 --- a/cli/src/deploy.rs +++ b/cli/src/deploy.rs @@ -19,8 +19,10 @@ pub type TanglePairSigner = PairSigner; pub struct Opts { /// The name of the package to deploy (if the workspace has multiple packages) pub pkg_name: Option, - /// The RPC URL of the Tangle Network - pub rpc_url: String, + /// The HTTP RPC URL of the Tangle Network + pub http_rpc_url: String, + /// The WS RPC URL of the Tangle Network + pub ws_rpc_url: String, /// The path to the manifest file pub manifest_path: std::path::PathBuf, /// The signer for deploying the blueprint @@ -33,7 +35,8 @@ impl Debug for Opts { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Opts") .field("pkg_name", &self.pkg_name) - .field("rpc_url", &self.rpc_url) + .field("http_rpc_url", &self.http_rpc_url) + .field("ws_rpc_url", &self.ws_rpc_url) .field("manifest_path", &self.manifest_path) .finish() } @@ -42,7 +45,7 @@ impl Debug for Opts { pub async fn generate_service_blueprint, T: AsRef>( manifest_metadata_path: P, pkg_name: Option<&String>, - rpc_url: T, + http_rpc_url: T, signer_evm: Option, ) -> Result { let manifest_path = manifest_metadata_path.into(); @@ -57,7 +60,13 @@ pub async fn generate_service_blueprint, T: AsRef>( let mut blueprint = tokio::task::spawn_blocking(move || load_blueprint_metadata(&package)).await??; build_contracts_if_needed(package_clone, &blueprint).context("Building contracts")?; - deploy_contracts_to_tangle(rpc_url.as_ref(), package_clone, &mut blueprint, signer_evm).await?; + deploy_contracts_to_tangle( + http_rpc_url.as_ref(), + package_clone, + &mut blueprint, + signer_evm, + ) + .await?; bake_blueprint(blueprint) } @@ -65,7 +74,8 @@ pub async fn generate_service_blueprint, T: AsRef>( pub async fn deploy_to_tangle( Opts { pkg_name, - rpc_url, + http_rpc_url, + ws_rpc_url, manifest_path, signer, signer_evm, @@ -73,7 +83,8 @@ pub async fn deploy_to_tangle( ) -> Result { // Load the manifest file into cargo metadata let blueprint = - generate_service_blueprint(&manifest_path, pkg_name.as_ref(), &rpc_url, signer_evm).await?; + 
generate_service_blueprint(&manifest_path, pkg_name.as_ref(), &http_rpc_url, signer_evm) + .await?; let signer = if let Some(signer) = signer { signer @@ -82,7 +93,7 @@ pub async fn deploy_to_tangle( }; let my_account_id = signer.account_id(); - let client = subxt::OnlineClient::from_url(rpc_url).await?; + let client = subxt::OnlineClient::from_url(ws_rpc_url).await?; let create_blueprint_tx = TangleApi::tx().services().create_blueprint(blueprint); @@ -138,7 +149,7 @@ pub fn load_blueprint_metadata( } async fn deploy_contracts_to_tangle( - rpc_url: &str, + http_rpc_url: &str, package: &cargo_metadata::Package, blueprint: &mut ServiceBlueprint<'_>, signer_evm: Option, @@ -146,7 +157,6 @@ async fn deploy_contracts_to_tangle( enum ContractKind { Manager, } - let rpc_url = rpc_url.replace("ws", "http").replace("wss", "https"); let contract_paths = match blueprint.manager { BlueprintManager::Evm(ref path) => vec![(ContractKind::Manager, path)], _ => { @@ -191,7 +201,7 @@ async fn deploy_contracts_to_tangle( let provider = alloy_provider::ProviderBuilder::new() .with_recommended_fillers() .wallet(wallet) - .on_http(rpc_url.parse()?); + .on_http(http_rpc_url.parse()?); let chain_id = provider.get_chain_id().await?; eprintln!("Chain ID: {chain_id}"); diff --git a/cli/src/main.rs b/cli/src/main.rs index 9b3561ae..d5221415 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -47,9 +47,12 @@ pub enum GadgetCommands { /// Deploy a blueprint to the Tangle Network. #[command(visible_alias = "d")] Deploy { + /// HTTP RPC URL to use + #[arg(long, value_name = "URL", default_value = "https://rpc.tangle.tools")] + http_rpc_url: String, /// Tangle RPC URL to use #[arg(long, value_name = "URL", default_value = "wss://rpc.tangle.tools")] - rpc_url: String, + ws_rpc_url: String, /// The package to deploy (if the workspace has multiple packages). #[arg(short, long, value_name = "PACKAGE")] package: Option, @@ -99,13 +102,18 @@ async fn main() -> color_eyre::Result<()> { GadgetCommands::Create { name, source } => { create::new_blueprint(name, source); } - GadgetCommands::Deploy { rpc_url, package } => { + GadgetCommands::Deploy { + http_rpc_url, + ws_rpc_url, + package, + } => { let manifest_path = cli .manifest .manifest_path .unwrap_or_else(|| PathBuf::from("Cargo.toml")); let _ = deploy::deploy_to_tangle(deploy::Opts { - rpc_url, + http_rpc_url, + ws_rpc_url, manifest_path, pkg_name: package, signer: None, diff --git a/gadget-io/src/imp/standard/shell.rs b/gadget-io/src/imp/standard/shell.rs index 52efec1e..4dc7af8d 100644 --- a/gadget-io/src/imp/standard/shell.rs +++ b/gadget-io/src/imp/standard/shell.rs @@ -36,10 +36,14 @@ pub struct GadgetConfig { #[structopt(long = "port", short = "p", default_value = defaults::BIND_PORT)] #[serde(default = "defaults::bind_port")] pub bind_port: u16, - /// The RPC URL of the Tangle Node. - #[structopt(long = "url", parse(try_from_str = url::Url::parse), default_value = defaults::RPC_URL)] - #[serde(default = "defaults::rpc_url")] - pub url: url::Url, + /// The HTTP RPC URL of the Tangle Node. + #[structopt(long = "url", parse(try_from_str = url::Url::parse), default_value = defaults::HTTP_RPC_URL)] + #[serde(default = "defaults::http_rpc_url")] + pub http_rpc_url: url::Url, + /// The WS RPC URL of the Tangle Node. 
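// The attribute that follows reuses `long = "url"`, which collides with the `--url` flag
// already declared for `http_rpc_url` above. A minimal sketch of giving the WS field its
// own flag; the `"ws-url"` spelling and the standalone struct are assumptions for
// illustration, not taken from the patch:
#[derive(structopt::StructOpt)]
struct WsUrlFlagSketch {
    /// The WS RPC URL of the Tangle Node.
    #[structopt(long = "ws-url", parse(try_from_str = url::Url::parse), default_value = "ws://127.0.0.1:9944")]
    ws_rpc_url: url::Url,
}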
+ #[structopt(long = "url", parse(try_from_str = url::Url::parse), default_value = defaults::WS_RPC_URL)] + #[serde(default = "defaults::ws_rpc_url")] + pub ws_rpc_url: url::Url, /// The List of bootnodes to connect to #[structopt(long = "bootnodes", parse(try_from_str = ::from_str))] #[serde(default)] @@ -74,10 +78,15 @@ pub struct GadgetConfig { pub mod defaults { pub const BIND_PORT: &str = "30555"; pub const BIND_IP: &str = "0.0.0.0"; - pub const RPC_URL: &str = "ws://127.0.0.1:9944"; + pub const HTTP_RPC_URL: &str = "http://127.0.0.1:9944"; + pub const WS_RPC_URL: &str = "ws://127.0.0.1:9944"; - pub fn rpc_url() -> url::Url { - url::Url::parse(RPC_URL).expect("Default RPC URL is valid") + pub fn http_rpc_url() -> url::Url { + url::Url::parse(HTTP_RPC_URL).expect("Default RPC URL is valid") + } + + pub fn ws_rpc_url() -> url::Url { + url::Url::parse(WS_RPC_URL).expect("Default RPC URL is valid") } pub fn bind_ip() -> std::net::IpAddr { diff --git a/macros/context-derive/src/eigenlayer.rs b/macros/context-derive/src/eigenlayer.rs index f031fe1d..ba9a8093 100644 --- a/macros/context-derive/src/eigenlayer.rs +++ b/macros/context-derive/src/eigenlayer.rs @@ -17,84 +17,92 @@ pub fn generate_context_impl( FieldInfo::Unnamed(index) => quote! { self.#index }, }; - let mut generics = generics.clone(); - generics.params.push(syn::parse_quote!(NodeConfig)); let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); quote! { - impl #impl_generics gadget_sdk::ctx::EigenlayerContext for #name #ty_generics #where_clause { - type Config = NodeConfig; + use alloy_provider::Provider; - fn eigenlayer_provider(&self) -> impl core::future::Future, alloy_transport::TransportError>> { - async { - let http_endpoint = #field_access.eigenlayer_http_endpoint.clone(); - let provider = alloy_provider::ProviderBuilder::new() - .with_recommended_fillers() - .on_http(http_endpoint.parse().unwrap()) - .root() - .clone() - .boxed(); - Ok(provider) - } + #[async_trait::async_trait] + impl #impl_generics gadget_sdk::ctx::EigenlayerContext for #name #ty_generics #where_clause { + async fn avs_registry_reader(&self) -> Result { + let http_rpc_endpoint = #field_access.http_rpc_endpoint.clone(); + let contract_addrs = #field_access.eigenlayer_contract_addrs; + let registry_coordinator_addr = contract_addrs.registry_coordinator_addr; + let operator_state_retriever_addr = contract_addrs.operator_state_retriever_addr; + eigensdk::client_avsregistry::reader::AvsRegistryChainReader::new( + eigensdk::logging::get_test_logger(), + registry_coordinator_addr, + operator_state_retriever_addr, + http_rpc_endpoint, + ).await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) } - fn eigenlayer_avs_registry_reader(&self) -> impl core::future::Future> { - async { - let http_endpoint = #field_access.eigenlayer_http_endpoint.clone(); - let registry_coordinator_addr = #field_access.eigenlayer_registry_coordinator_addr; - let operator_state_retriever_addr = #field_access.eigenlayer_operator_state_retriever_addr; + async fn avs_registry_writer(&self, private_key: String) -> Result { + let http_rpc_endpoint = #field_access.http_rpc_endpoint.clone(); + let contract_addrs = #field_access.eigenlayer_contract_addrs; + let registry_coordinator_addr = contract_addrs.registry_coordinator_addr; + let operator_state_retriever_addr = contract_addrs.operator_state_retriever_addr; - Ok(eigensdk::client_avsregistry::reader::AvsRegistryChainReader::new( - eigensdk::logging::get_test_logger(), - registry_coordinator_addr, - 
operator_state_retriever_addr, - http_endpoint, - ).await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?) - } + eigensdk::client_avsregistry::writer::AvsRegistryChainWriter::build_avs_registry_chain_writer( + eigensdk::logging::get_test_logger(), + http_rpc_endpoint, + private_key, + registry_coordinator_addr, + operator_state_retriever_addr, + ).await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) } - fn eigenlayer_avs_registry_writer(&self) -> impl core::future::Future> { - async { - let http_endpoint = #field_access.eigenlayer_http_endpoint.clone(); - let private_key = #field_access.eigenlayer_private_key.clone(); - let registry_coordinator_addr = #field_access.eigenlayer_registry_coordinator_addr; - let operator_state_retriever_addr = #field_access.eigenlayer_operator_state_retriever_addr; + async fn operator_info_service_in_memory(&self) -> Result { + let avs_registry_reader = self.avs_registry_reader().await?; + let ws_endpoint = #field_access.ws_rpc_endpoint.clone(); - Ok(eigensdk::client_avsregistry::writer::AvsRegistryChainWriter::build_avs_registry_chain_writer( - eigensdk::logging::get_test_logger(), - http_endpoint, - private_key, - registry_coordinator_addr, - operator_state_retriever_addr, - ).await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?) - } + Ok(eigensdk::services_operatorsinfo::operatorsinfo_inmemory::OperatorInfoServiceInMemory::new( + eigensdk::logging::get_test_logger(), + avs_registry_reader, + ws_endpoint, + ).await) } - fn eigenlayer_operator_info_service(&self) -> impl core::future::Future> { - async { - let avs_registry_reader = self.eigenlayer_avs_registry_reader().await?; - let ws_endpoint = #field_access.eigenlayer_ws_endpoint.clone(); + async fn avs_registry_service_chain_caller_in_memory(&self) -> Result< + eigensdk::services_avsregistry::chaincaller::AvsRegistryServiceChainCaller< + eigensdk::client_avsregistry::reader::AvsRegistryChainReader, + eigensdk::services_operatorsinfo::operatorsinfo_inmemory::OperatorInfoServiceInMemory + >, std::io::Error> { + let http_rpc_endpoint = #field_access.http_rpc_endpoint.clone(); + let avs_registry_reader = self.avs_registry_reader().await?; + let operator_info_service = self.operator_info_service_in_memory().await?; - Ok(eigensdk::services_operatorsinfo::operatorsinfo_inmemory::OperatorInfoServiceInMemory::new( - eigensdk::logging::get_test_logger(), - avs_registry_reader, - ws_endpoint, - ).await) - } - } + let cancellation_token = tokio_util::sync::CancellationToken::new(); + let token_clone = cancellation_token.clone(); + let provider = alloy_provider::ProviderBuilder::new() + .with_recommended_fillers() + .on_http(http_rpc_endpoint.parse().unwrap()) + .root() + .clone() + .boxed(); + let current_block = provider.get_block_number() + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + let operator_info_clone = operator_info_service.clone(); - fn eigenlayer_bls_aggregation_service(&self) -> impl core::future::Future> { - async { - let avs_registry_reader = self.eigenlayer_avs_registry_reader().await?; - let operator_info_service = self.eigenlayer_operator_info_service().await?; + tokio::task::spawn(async move { + operator_info_clone.start_service(&token_clone, 0, current_block).await + }); - let avs_registry_service = eigensdk::services_avsregistry::chaincaller::AvsRegistryServiceChainCaller::new( - avs_registry_reader, - operator_info_service, - ); + Ok(eigensdk::services_avsregistry::chaincaller::AvsRegistryServiceChainCaller::new( + 
avs_registry_reader, + operator_info_service, + )) + } - Ok(eigensdk::services_blsaggregation::bls_agg::BlsAggregatorService::new(avs_registry_service)) - } + async fn bls_aggregation_service_in_memory(&self) -> Result + >, std::io::Error> { + let avs_registry_service = self.avs_registry_service_chain_caller_in_memory().await?; + Ok(eigensdk::services_blsaggregation::bls_agg::BlsAggregatorService::new(avs_registry_service)) } } } diff --git a/macros/context-derive/src/evm.rs b/macros/context-derive/src/evm.rs index 8384c7e9..59eb8082 100644 --- a/macros/context-derive/src/evm.rs +++ b/macros/context-derive/src/evm.rs @@ -41,7 +41,7 @@ pub fn generate_context_impl( match PROVIDER.get() { Some(provider) => Ok(provider.clone()), None => { - let rpc_url = #field_access.rpc_endpoint.as_str(); + let rpc_url = #field_access.http_rpc_endpoint.as_str(); let provider = alloy_provider::ProviderBuilder::new() .with_recommended_fillers() .on_builtin(rpc_url) diff --git a/macros/context-derive/src/subxt.rs b/macros/context-derive/src/subxt.rs index 637f8a00..1c5a5dd3 100644 --- a/macros/context-derive/src/subxt.rs +++ b/macros/context-derive/src/subxt.rs @@ -31,7 +31,7 @@ pub fn generate_context_impl( match CLIENT.get() { Some(client) => Ok(client.clone()), None => { - let rpc_url = #field_access.rpc_endpoint.as_str(); + let rpc_url = #field_access.http_rpc_endpoint.as_str(); let client = subxt::OnlineClient::from_url(rpc_url).await?; CLIENT.set(client.clone()).map(|_| client).map_err(|_| { subxt::Error::Io(std::io::Error::new( diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 0aeb5111..488461ae 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,6 +1,6 @@ [toolchain] #channel = "stable" -channel = "nightly-2024-10-13" +channel = "nightly" components = ["rustfmt", "clippy", "rust-src"] targets = ["wasm32-unknown-unknown"] profile = "minimal" diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 087183c6..bf08c0f5 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -79,6 +79,7 @@ alloy-provider = { workspace = true, optional = true } alloy-rpc-types = { workspace = true } alloy-sol-types = { workspace = true } alloy-transport = { workspace = true } +alloy-transport-http = { workspace = true } alloy-signer = { workspace = true } alloy-signer-local = { workspace = true } tokio-retry = { workspace = true } diff --git a/sdk/src/config.rs b/sdk/src/config.rs index 80fdd1d1..cb74c108 100644 --- a/sdk/src/config.rs +++ b/sdk/src/config.rs @@ -4,6 +4,7 @@ use crate::keystore::BackendExt; #[cfg(any(feature = "std", feature = "wasm"))] use crate::keystore::TanglePairSigner; use alloc::string::{String, ToString}; +use alloy_primitives::Address; use core::fmt::Debug; use core::net::IpAddr; use eigensdk::crypto_bls; @@ -74,8 +75,10 @@ pub type StdGadgetConfiguration = GadgetConfiguration; /// Gadget environment. #[non_exhaustive] pub struct GadgetConfiguration { - /// Tangle RPC endpoint. - pub rpc_endpoint: String, + /// Tangle HTTP RPC endpoint. + pub http_rpc_endpoint: String, + /// Tangle WS RPC endpoint. + pub ws_rpc_endpoint: String, /// Keystore URI /// /// * In Memory: `file::memory:` or `:memory:` @@ -94,7 +97,6 @@ pub struct GadgetConfiguration { /// This is only set to `None` when the gadget is in the registration mode. /// Always check for is `is_registration` flag before using this. 
pub service_id: Option, - /// The Current Environment is for the `PreRegisteration` of the Gadget /// /// The gadget will now start in the Registration mode and will try to register the current operator on that blueprint @@ -111,13 +113,47 @@ pub struct GadgetConfiguration { pub span: tracing::Span, /// Whether the gadget is in test mode pub test_mode: bool, + /// Basic Eigenlayer contract system + pub eigenlayer_contract_addrs: EigenlayerContractAddresses, _lock: core::marker::PhantomData, } +#[derive(Debug, Copy, Clone, Serialize, Deserialize)] +pub struct EigenlayerContractAddresses { + pub registry_coordinator_addr: Address, + pub operator_state_retriever_addr: Address, + pub delegation_manager_addr: Address, + pub strategy_manager_addr: Address, +} + +impl Default for EigenlayerContractAddresses { + fn default() -> Self { + EigenlayerContractAddresses { + registry_coordinator_addr: std::env::var("REGISTRY_COORDINATOR_ADDR") + .unwrap_or_default() + .parse() + .unwrap(), + operator_state_retriever_addr: std::env::var("OPERATOR_STATE_RETRIEVER_ADDR") + .unwrap_or_default() + .parse() + .unwrap(), + delegation_manager_addr: std::env::var("DELEGATION_MANAGER_ADDR") + .unwrap_or_default() + .parse() + .unwrap(), + strategy_manager_addr: std::env::var("STRATEGY_MANAGER_ADDR") + .unwrap_or_default() + .parse() + .unwrap(), + } + } +} + impl Debug for GadgetConfiguration { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("GadgetConfiguration") - .field("rpc_endpoint", &self.rpc_endpoint) + .field("http_rpc_endpoint", &self.http_rpc_endpoint) + .field("ws_rpc_endpoint", &self.ws_rpc_endpoint) .field("keystore_uri", &self.keystore_uri) .field("data_dir", &self.data_dir) .field("bootnodes", &self.bootnodes) @@ -128,6 +164,7 @@ impl Debug for GadgetConfiguration { .field("bind_port", &self.bind_port) .field("bind_addr", &self.bind_addr) .field("test_mode", &self.test_mode) + .field("eigenlayer_contract_addrs", &self.eigenlayer_contract_addrs) .finish() } } @@ -135,12 +172,14 @@ impl Debug for GadgetConfiguration { impl Clone for GadgetConfiguration { fn clone(&self) -> Self { Self { - rpc_endpoint: self.rpc_endpoint.clone(), + http_rpc_endpoint: self.http_rpc_endpoint.clone(), + ws_rpc_endpoint: self.ws_rpc_endpoint.clone(), keystore_uri: self.keystore_uri.clone(), data_dir: self.data_dir.clone(), bootnodes: self.bootnodes.clone(), blueprint_id: self.blueprint_id, service_id: self.service_id, + eigenlayer_contract_addrs: self.eigenlayer_contract_addrs, is_registration: self.is_registration, protocol: self.protocol, bind_port: self.bind_port, @@ -156,12 +195,14 @@ impl Clone for GadgetConfiguration { impl Default for GadgetConfiguration { fn default() -> Self { Self { - rpc_endpoint: "http://localhost:9944".to_string(), + http_rpc_endpoint: "http://localhost:9944".to_string(), + ws_rpc_endpoint: "ws://localhost:9944".to_string(), keystore_uri: "file::memory:".to_string(), data_dir: None, bootnodes: Vec::new(), blueprint_id: 0, service_id: Some(0), + eigenlayer_contract_addrs: Default::default(), is_registration: false, protocol: Protocol::Tangle, bind_port: 0, @@ -223,6 +264,9 @@ pub enum Error { /// Missing `KEYSTORE_URI` environment #[error("Missing keystore URI")] TestSetup(String), + /// Missing `EigenlayerContractAddresses` + #[error("Missing EigenlayerContractAddresses")] + MissingEigenlayerContractAddresses, } #[derive(Debug, Clone, StructOpt, Serialize, Deserialize)] @@ -248,8 +292,11 @@ pub enum GadgetCLICoreSettings { #[structopt(long, short = "l", 
env)] log_id: Option, #[structopt(long, short = "u", parse(try_from_str = url::Url::parse), env)] - #[serde(default = "gadget_io::defaults::rpc_url")] - url: Url, + #[serde(default = "gadget_io::defaults::http_rpc_url")] + http_rpc_url: Url, + #[structopt(long, short = "ws", parse(try_from_str = url::Url::parse), env)] + #[serde(default = "gadget_io::defaults::ws_rpc_url")] + ws_rpc_url: Url, #[structopt(long, parse(try_from_str = ::from_str), env)] #[serde(default)] bootnodes: Option>, @@ -299,7 +346,8 @@ pub fn load(config: ContextConfig) -> Result( config: ContextConfig, ) -> Result, Error> { @@ -318,7 +366,8 @@ fn load_inner( bind_port, test_mode, log_id, - url, + http_rpc_url, + ws_rpc_url, bootnodes, keystore_uri, blueprint_id, @@ -339,7 +388,8 @@ fn load_inner( bind_port, test_mode, span, - rpc_endpoint: url.to_string(), + http_rpc_endpoint: http_rpc_url.to_string(), + ws_rpc_endpoint: ws_rpc_url.to_string(), keystore_uri, data_dir: std::env::var("DATA_DIR").ok().map(PathBuf::from), bootnodes: bootnodes.unwrap_or_default(), @@ -352,6 +402,7 @@ fn load_inner( }, is_registration, protocol, + eigenlayer_contract_addrs: Default::default(), _lock: core::marker::PhantomData, }) } @@ -443,7 +494,7 @@ impl GadgetConfiguration { pub async fn client(&self) -> Result { let client = subxt::OnlineClient::::from_url( - self.rpc_endpoint.clone(), + self.http_rpc_endpoint.clone(), ) .await?; Ok(client) diff --git a/sdk/src/ctx.rs b/sdk/src/ctx.rs index 0904acdb..cea4985e 100644 --- a/sdk/src/ctx.rs +++ b/sdk/src/ctx.rs @@ -35,6 +35,12 @@ use core::future::Future; use crate::keystore::backend::GenericKeyStore; +use eigensdk::{ + client_avsregistry::{reader::AvsRegistryChainReader, writer::AvsRegistryChainWriter}, + services_avsregistry::chaincaller::AvsRegistryServiceChainCaller, + services_blsaggregation::bls_agg::BlsAggregatorService, + services_operatorsinfo::operatorsinfo_inmemory::OperatorInfoServiceInMemory, +}; // derives pub use gadget_context_derive::*; use tangle_subxt::tangle_testnet_runtime::api::runtime_types::{ @@ -96,3 +102,39 @@ pub trait ServicesContext { client: &subxt::OnlineClient, ) -> impl Future, subxt::Error>>; } + +/// `EigenlayerContext` trait provides access to Eigenlayer utilities +#[async_trait::async_trait] +pub trait EigenlayerContext { + /// Provides a reader for the AVS registry. + async fn avs_registry_reader(&self) -> Result; + + /// Provides a writer for the AVS registry. + async fn avs_registry_writer( + &self, + private_key: String, + ) -> Result; + + /// Provides an operator info service. + async fn operator_info_service_in_memory( + &self, + ) -> Result; + + /// Provides an AVS registry service chain caller. + async fn avs_registry_service_chain_caller_in_memory( + &self, + ) -> Result< + AvsRegistryServiceChainCaller, + std::io::Error, + >; + + /// Provides a BLS aggregation service. + async fn bls_aggregation_service_in_memory( + &self, + ) -> Result< + BlsAggregatorService< + AvsRegistryServiceChainCaller, + >, + std::io::Error, + >; +} diff --git a/sdk/src/events_watcher/evm.rs b/sdk/src/events_watcher/evm.rs index 81b5fef5..c3b3ee16 100644 --- a/sdk/src/events_watcher/evm.rs +++ b/sdk/src/events_watcher/evm.rs @@ -1,11 +1,15 @@ //! 
EVM Event Watcher Module use crate::events_watcher::error::Error; -use alloy_network::Ethereum; +use alloy_network::{Ethereum, EthereumWallet}; use alloy_primitives::FixedBytes; -use alloy_provider::Provider; +use alloy_provider::{ + fillers::{ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller, WalletFiller}, + Identity, Provider, RootProvider, +}; use alloy_sol_types::SolEvent; use alloy_transport::Transport; +use alloy_transport_http::{Client, Http}; use std::ops::Deref; pub trait Config: Send + Sync + Clone + 'static { @@ -13,6 +17,22 @@ pub trait Config: Send + Sync + Clone + 'static { type PH: Provider + Clone + Send + Sync; } +#[derive(Debug, Copy, Clone)] +pub struct DefaultNodeConfig {} + +impl Config for DefaultNodeConfig { + type TH = Http; + type PH = FillProvider< + JoinFill< + JoinFill, NonceFiller>, ChainIdFiller>, + WalletFiller, + >, + RootProvider>, + Http, + Ethereum, + >; +} + pub trait EvmContract: Deref> + Send
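// `EigenlayerContractAddresses::default()` (added to `sdk/src/config.rs` above) parses four
// environment variables and `unwrap()`s the result, and `load_inner` fills the new
// `eigenlayer_contract_addrs` field with `Default::default()`, so those variables must hold
// valid hex addresses before `gadget_sdk::config::load` runs; a missing or empty variable
// makes the parse panic. A sketch of seeding them for a local run (the addresses are
// placeholders, not values from the patch):
fn seed_eigenlayer_contract_addresses() {
    for (var, addr) in [
        ("REGISTRY_COORDINATOR_ADDR", "0x0000000000000000000000000000000000000001"),
        ("OPERATOR_STATE_RETRIEVER_ADDR", "0x0000000000000000000000000000000000000002"),
        ("DELEGATION_MANAGER_ADDR", "0x0000000000000000000000000000000000000003"),
        ("STRATEGY_MANAGER_ADDR", "0x0000000000000000000000000000000000000004"),
    ] {
        std::env::set_var(var, addr);
    }
}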
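// The `EigenlayerContext` trait added in `sdk/src/ctx.rs` above is what
// `#[derive(EigenlayerContext)]` on `AggregatorContext` expands to, so any context struct
// that carries a `GadgetConfiguration` can obtain the same Eigenlayer services. A minimal
// usage sketch, assuming the consuming crate has the dependencies the generated impl
// references (eigensdk, alloy-provider, async-trait, tokio-util); the struct and function
// names are hypothetical:
use gadget_sdk::config::StdGadgetConfiguration;
use gadget_sdk::ctx::{EigenlayerContext, KeystoreContext};

#[derive(Clone, EigenlayerContext, KeystoreContext)]
struct ExampleEigenContext {
    #[config]
    sdk_config: StdGadgetConfiguration,
}

async fn spawn_bls_aggregation(ctx: &ExampleEigenContext) -> Result<(), std::io::Error> {
    // Both the registry reader and the in-memory BLS aggregation service come from the
    // derived `EigenlayerContext` implementation.
    let _registry_reader = ctx.avs_registry_reader().await?;
    let _bls_service = ctx.bls_aggregation_service_in_memory().await?;
    Ok(())
}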