From 9eb13db941947aac5c7519876210f82ac9e4e81d Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Wed, 11 Sep 2024 17:35:53 +0200 Subject: [PATCH] test(ethexe): multiple validators test (#4211) --- Cargo.lock | 1 + core/src/message/mod.rs | 2 +- ethexe/cli/Cargo.toml | 4 +- ethexe/cli/src/tests.rs | 1776 +++++++++++++++++------------ ethexe/ethereum/src/router/mod.rs | 1 + ethexe/network/src/lib.rs | 1 - 6 files changed, 1043 insertions(+), 742 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b362ebb0dcb..ac18442b9a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5099,6 +5099,7 @@ dependencies = [ "alloy", "anyhow", "clap 4.5.9", + "demo-async", "demo-ping", "derive_more 0.99.18", "directories", diff --git a/core/src/message/mod.rs b/core/src/message/mod.rs index 6883abd79aa..dd1285f8760 100644 --- a/core/src/message/mod.rs +++ b/core/src/message/mod.rs @@ -32,7 +32,7 @@ pub use common::{Dispatch, Message, MessageDetails, ReplyDetails, SignalDetails} pub use context::{ ContextOutcome, ContextOutcomeDrain, ContextSettings, ContextStore, MessageContext, }; -pub use gear_core_errors::ReplyCode; +pub use gear_core_errors::{ErrorReplyReason, ReplyCode, SuccessReplyReason}; pub use handle::{HandleMessage, HandlePacket}; pub use incoming::{IncomingDispatch, IncomingMessage}; pub use init::{InitMessage, InitPacket}; diff --git a/ethexe/cli/Cargo.toml b/ethexe/cli/Cargo.toml index 1f562d6da44..a6894ac1bc9 100644 --- a/ethexe/cli/Cargo.toml +++ b/ethexe/cli/Cargo.toml @@ -66,7 +66,9 @@ alloy = { workspace = true, features = [ "rpc-types-beacon", "signer-local", ] } -demo-ping = { workspace = true, features = ["debug"] } ntest = "0.9.3" gear-core.workspace = true gear-utils.workspace = true + +demo-ping = { workspace = true, features = ["debug", "ethexe"] } +demo-async = { workspace = true, features = ["debug", "ethexe"] } diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 06b792a8b40..18a5b3e22a8 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -24,7 +24,7 @@ use alloy::{ providers::{ext::AnvilApi, Provider}, rpc::types::anvil::MineOptions, }; -use anyhow::{anyhow, Result}; +use anyhow::Result; use ethexe_common::{ db::CodesStorage, mirror::Event as MirrorEvent, router::Event as RouterEvent, BlockEvent, }; @@ -35,460 +35,105 @@ use ethexe_processor::Processor; use ethexe_sequencer::Sequencer; use ethexe_signer::Signer; use ethexe_validator::Validator; -use futures::StreamExt; -use gear_core::ids::prelude::*; +use gear_core::{ + ids::prelude::*, + message::{ReplyCode, SuccessReplyReason}, +}; use gprimitives::{ActorId, CodeId, MessageId, H160, H256}; +use parity_scale_codec::Encode; use std::{sync::Arc, time::Duration}; use tokio::{ - sync::{ - mpsc::{self, Receiver}, - oneshot, - }, + sync::oneshot, task::{self, JoinHandle}, }; - -struct Listener { - receiver: Receiver, - _handle: JoinHandle<()>, -} - -impl Listener { - pub async fn new(mut observer: Observer) -> Self { - let (sender, receiver) = mpsc::channel::(1024); - - let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>(); - let _handle = task::spawn(async move { - let observer_events = observer.events_all(); - futures::pin_mut!(observer_events); - - send_subscription_created.send(()).unwrap(); - - while let Some(event) = observer_events.next().await { - sender.send(event).await.unwrap(); - } - }); - receive_subscription_created.await.unwrap(); - - Self { receiver, _handle } - } - - pub async fn next_event(&mut self) -> Result { - self.receiver - .recv() - .await - .ok_or_else(|| anyhow!("No more events")) - } - - pub async fn apply_until( - &mut self, - mut f: impl FnMut(Event) -> Result>, - ) -> Result { - loop { - let event = self.next_event().await?; - if let Some(res) = f(event)? { - return Ok(res); - } - } - } - - pub async fn apply_until_block_event( - &mut self, - mut f: impl FnMut(BlockEvent) -> Result>, - ) -> Result { - loop { - let event = self.next_event().await?; - - let Event::Block(block) = event else { - continue; - }; - - for event in block.events { - if let Some(res) = f(event)? { - return Ok(res); - } - } - } - } -} - -struct TestEnvConfig { - rpc_url: String, - router_address: Option, - blob_reader: Option>, - validator_private_key: Option, - block_time: Duration, -} - -impl Default for TestEnvConfig { - fn default() -> Self { - Self { - rpc_url: "ws://localhost:8545".to_string(), - router_address: None, - blob_reader: None, - validator_private_key: None, - block_time: Duration::from_secs(1), - } - } -} - -impl TestEnvConfig { - pub fn rpc_url(mut self, rpc_url: String) -> Self { - self.rpc_url = rpc_url.to_string(); - self - } -} - -struct TestEnv { - db: Database, - blob_reader: Arc, - observer: Observer, - ethereum: Ethereum, - query: Query, - router_query: RouterQuery, - signer: Signer, - rpc_url: String, - sequencer_public_key: ethexe_signer::PublicKey, - validator_private_key: ethexe_signer::PrivateKey, - validator_public_key: ethexe_signer::PublicKey, - router_address: ethexe_signer::Address, - sender_address: ActorId, - block_time: Duration, - running_service_handle: Option>>, -} - -impl TestEnv { - async fn new(config: TestEnvConfig) -> Result { - let TestEnvConfig { - rpc_url, - router_address, - blob_reader, - validator_private_key, - block_time, - } = config; - - let db = Database::from_one(&MemDb::default(), router_address.unwrap_or_default().0); - - let tempdir = tempfile::tempdir()?.into_path(); - let signer = Signer::new(tempdir.join("key"))?; - let sender_public_key = signer.add_key( - // Anvil account (0) with balance - "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse()?, - )?; - let (validator_private_key, validator_public_key) = match validator_private_key { - Some(key) => (key, signer.add_key(key).unwrap()), - None => { - let pub_key = signer.generate_key()?; - (signer.get_private_key(pub_key).unwrap(), pub_key) - } - }; - let sequencer_public_key = signer.add_key( - // Anvil account (1) with balance - "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d".parse()?, - )?; - - let sender_address = sender_public_key.to_address(); - let ethereum = if let Some(router_address) = router_address { - Ethereum::new(&rpc_url, router_address, signer.clone(), sender_address).await? - } else { - let validators = vec![validator_public_key.to_address()]; - Ethereum::deploy(&rpc_url, validators, signer.clone(), sender_address).await? - }; - - let router = ethereum.router(); - let router_query = router.query(); - - let genesis_block_hash = router_query.genesis_block_hash().await?; - - let blob_reader = blob_reader.unwrap_or_else(|| Arc::new(MockBlobReader::new(block_time))); - - let router_address = router.address(); - - let query = Query::new( - Arc::new(db.clone()), - &rpc_url, - router_address, - genesis_block_hash, - blob_reader.clone(), - 10000, - ) - .await?; - - let observer = Observer::new(&rpc_url, router_address, blob_reader.clone()) - .await - .expect("failed to create observer"); - - let env = TestEnv { - db, - query, - blob_reader, - observer, - ethereum, - router_query, - signer, - rpc_url, - sequencer_public_key, - validator_private_key, - validator_public_key, - router_address, - sender_address: ActorId::from(H160::from(sender_address.0)), - block_time, - running_service_handle: None, - }; - - Ok(env) - } - - pub fn start_anvil() -> AnvilInstance { - let anvil = Anvil::new().try_spawn().unwrap(); - log::info!("📍 Anvil started at {}", anvil.ws_endpoint()); - anvil - } - - pub async fn new_listener(&self) -> Listener { - Listener::new(self.observer.clone()).await - } - - pub async fn start_service(&mut self) -> Result<()> { - if self.running_service_handle.is_some() { - return Err(anyhow!("Service is already running")); - } - - let processor = Processor::new(self.db.clone())?; - - let sequencer = Sequencer::new( - ðexe_sequencer::Config { - ethereum_rpc: self.rpc_url.clone(), - sign_tx_public: self.sequencer_public_key, - router_address: self.router_address, - validators: vec![self.validator_public_key.to_address()], - threshold: 1, - }, - self.signer.clone(), - ) - .await?; - - let validator = Validator::new( - ðexe_validator::Config { - pub_key: self.validator_public_key, - router_address: self.router_address, - }, - self.signer.clone(), - ); - - let service = Service::new_from_parts( - self.db.clone(), - self.observer.clone(), - self.query.clone(), - processor, - self.signer.clone(), - self.block_time, - None, - Some(sequencer), - Some(validator), - None, - None, - ); - - let handle = task::spawn(service.run()); - self.running_service_handle = Some(handle); - - // Sleep to wait for the new service to start - // TODO: find a better way to wait for the service to start #4099 - tokio::time::sleep(Duration::from_secs(1)).await; - - Ok(()) - } - - pub async fn stop_service(&mut self) -> Result<()> { - if let Some(handle) = self.running_service_handle.take() { - handle.abort(); - let _ = handle.await; - Ok(()) - } else { - Err(anyhow!("Service is not running")) - } - } - - pub async fn upload_code(&self, code: &[u8]) -> Result<(H256, CodeId)> { - let code_id = CodeId::generate(code); - let blob_tx = H256::random(); - - self.blob_reader - .add_blob_transaction(blob_tx, code.to_vec()) - .await; - let tx_hash = self - .ethereum - .router() - .request_code_validation(code_id, blob_tx) - .await?; - - Ok((tx_hash, code_id)) - } -} - -impl Drop for TestEnv { - fn drop(&mut self) { - if let Some(handle) = self.running_service_handle.take() { - handle.abort(); - } - } -} +use utils::{NodeConfig, TestEnv, TestEnvConfig}; #[tokio::test(flavor = "multi_thread")] #[ntest::timeout(60_000)] async fn ping() { gear_utils::init_default_logger(); - let anvil = TestEnv::start_anvil(); + let mut env = TestEnv::new(Default::default()).await.unwrap(); + + let sequencer_public_key = env.wallets.next(); + let mut node = env.new_node( + NodeConfig::default() + .sequencer(sequencer_public_key) + .validator(env.validators[0]), + ); + node.start_service().await; - let mut env = TestEnv::new(TestEnvConfig::default().rpc_url(anvil.ws_endpoint())) + let res = env + .upload_code(demo_ping::WASM_BINARY) .await - .unwrap(); - let mut listener = env.new_listener().await; - - env.start_service().await.unwrap(); - - let (_, code_id) = env.upload_code(demo_ping::WASM_BINARY).await.unwrap(); - - log::info!("📗 Waiting for code loaded"); - listener - .apply_until(|event| match event { - Event::CodeLoaded { - code_id: loaded_id, - code, - } => { - assert_eq!(code_id, loaded_id); - assert_eq!(&code, demo_ping::WASM_BINARY); - Ok(Some(())) - } - _ => Ok(None), - }) + .unwrap() + .wait_for() .await .unwrap(); + assert_eq!(res.code, demo_ping::WASM_BINARY); + assert!(res.valid); - log::info!("📗 Waiting for code to get validated"); - listener - .apply_until_block_event(|event| { - if let BlockEvent::Router(RouterEvent::CodeGotValidated { - id: loaded_id, - valid, - }) = event - { - assert_eq!(code_id, loaded_id); - assert!(valid); - Ok(Some(())) - } else { - Ok(None) - } - }) - .await - .unwrap(); + let code_id = res.code_id; - let code = env + let code = node .db .original_code(code_id) .expect("After approval, the code is guaranteed to be in the database"); - let _ = env + assert_eq!(code, demo_ping::WASM_BINARY); + + let _ = node .db .instrumented_code(1, code_id) .expect("After approval, instrumented code is guaranteed to be in the database"); - assert_eq!(code, demo_ping::WASM_BINARY); - let _ = env - .ethereum - .router() - .create_program(code_id, H256::random(), b"PING", 0) + let res = env + .create_program(code_id, b"PING", 0) .await - .unwrap(); - - log::info!("📗 Waiting for program create, PONG reply and program update"); - - let mut program_id = ActorId::default(); - let mut init_message_id = MessageId::default(); - let mut reply_sent = false; - let mut block_committed = None; - listener - .apply_until_block_event(|event| { - match event { - BlockEvent::Router(RouterEvent::ProgramCreated { - actor_id, - code_id: loaded_id, - }) => { - assert_eq!(code_id, loaded_id); - program_id = actor_id; - } - BlockEvent::Mirror { address, event } => { - if address == program_id { - match event { - MirrorEvent::MessageQueueingRequested { - id, - source, - payload, - value, - } => { - assert_eq!(source, env.sender_address); - assert_eq!(payload, b"PING"); - assert_eq!(value, 0); - init_message_id = id; - } - MirrorEvent::Reply { - payload, reply_to, .. - } => { - assert_eq!(payload, b"PONG"); - assert_eq!(reply_to, init_message_id); - reply_sent = true; - } - MirrorEvent::StateChanged { .. } => { - assert!(reply_sent); - } - _ => {} - } - } - } - BlockEvent::Router(RouterEvent::BlockCommitted { block_hash }) => { - block_committed = Some(block_hash); - return Ok(Some(())); - } - _ => {} - } - Ok(None) - }) + .unwrap() + .wait_for() .await .unwrap(); + assert_eq!(res.code_id, code_id); + assert_eq!(res.init_message_source, env.sender_id); + assert_eq!(res.init_message_payload, b"PING"); + assert_eq!(res.init_message_value, 0); + assert_eq!(res.reply_payload, b"PONG"); + assert_eq!(res.reply_value, 0); + assert_eq!( + res.reply_code, + ReplyCode::Success(SuccessReplyReason::Manual) + ); - let block_committed_on_router = env.router_query.last_commitment_block_hash().await.unwrap(); - assert_eq!(block_committed, Some(block_committed_on_router)); - - let program_address = ethexe_signer::Address::try_from(program_id).unwrap(); - - let wvara = env.ethereum.router().wvara(); - - log::info!("📗 Approving WVara to mirror"); - wvara.approve_all(program_address.0.into()).await.unwrap(); - - let ping_program = env.ethereum.mirror(program_address); + let ping_id = res.program_id; - log::info!("📗 Sending PING message"); - let _tx = ping_program.send_message(b"PING", 0).await.unwrap(); + env.approve_wvara(ping_id).await; - log::info!("📗 Waiting for PONG reply"); - listener - .apply_until_block_event(|event| match event { - BlockEvent::Mirror { address, event } => { - if address == program_id { - if let MirrorEvent::Reply { payload, value, .. } = event { - assert_eq!(payload, b"PONG"); - assert_eq!(value, 0); - return Ok(Some(())); - } - } + let res = env + .send_message(ping_id, b"PING", 0) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + assert_eq!(res.program_id, ping_id); + assert_eq!( + res.reply_code, + ReplyCode::Success(SuccessReplyReason::Manual) + ); + assert_eq!(res.reply_payload, b"PONG"); + assert_eq!(res.reply_value, 0); - Ok(None) - } - _ => Ok(None), - }) + let res = env + .send_message(ping_id, b"PUNK", 0) + .await + .unwrap() + .wait_for() .await .unwrap(); + assert_eq!(res.program_id, ping_id); + assert_eq!(res.reply_code, ReplyCode::Success(SuccessReplyReason::Auto)); + assert_eq!(res.reply_payload, b""); + assert_eq!(res.reply_value, 0); } #[tokio::test(flavor = "multi_thread")] @@ -496,69 +141,33 @@ async fn ping() { async fn ping_reorg() { gear_utils::init_default_logger(); - let mut _anvil = None; - let rpc_url = if let Ok(lol) = std::env::var("LOL") { - lol - } else { - let a = TestEnv::start_anvil(); - let url = a.ws_endpoint(); - _anvil = Some(a); - url - }; - - let mut env = TestEnv::new(TestEnvConfig::default().rpc_url(rpc_url.clone())) - .await - .unwrap(); - let mut listener = env.new_listener().await; + let mut env = TestEnv::new(Default::default()).await.unwrap(); - env.start_service().await.unwrap(); + let sequencer_pub_key = env.wallets.next(); + let mut node = env.new_node( + NodeConfig::default() + .sequencer(sequencer_pub_key) + .validator(env.validators[0]), + ); + node.start_service().await; let provider = env.observer.provider().clone(); - log::info!("📗 upload code"); - let (_, code_id) = env.upload_code(demo_ping::WASM_BINARY).await.unwrap(); - - log::info!("📗 Waiting for code loaded"); - listener - .apply_until(|event| match event { - Event::CodeLoaded { - code_id: loaded_id, - code, - } => { - assert_eq!(code_id, loaded_id); - assert_eq!(&code, demo_ping::WASM_BINARY); - Ok(Some(())) - } - _ => Ok(None), - }) + let res = env + .upload_code(demo_ping::WASM_BINARY) .await - .unwrap(); - - log::info!("📗 Waiting for code approval"); - listener - .apply_until_block_event(|event| match event { - BlockEvent::Router(RouterEvent::CodeGotValidated { - id: validated_code_id, - valid, - }) => { - assert_eq!(code_id, validated_code_id); - assert!(valid); - Ok(Some(())) - } - _ => Ok(None), - }) + .unwrap() + .wait_for() .await .unwrap(); + assert!(res.valid); + + let code_id = res.code_id; log::info!("📗 Abort service to simulate node blocks skipping"); - env.stop_service().await.unwrap(); + node.stop_service().await; - let _ = env - .ethereum - .router() - .create_program(code_id, H256::random(), b"PING", 0) - .await - .unwrap(); + let create_program = env.create_program(code_id, b"PING", 0).await.unwrap(); // Mine some blocks to check missed blocks support provider @@ -570,68 +179,18 @@ async fn ping_reorg() { .unwrap(); // Start new service - env.start_service().await.unwrap(); + node.start_service().await; // IMPORTANT: Mine one block to sent block event to the new service. provider.evm_mine(None).await.unwrap(); - log::info!("📗 Waiting for program creation"); - let mut program_id = ActorId::default(); - let mut init_message_id = MessageId::default(); - let mut reply_sent = false; - listener - .apply_until_block_event(|event| { - match event { - BlockEvent::Router(RouterEvent::ProgramCreated { - actor_id, - code_id: loaded_id, - }) => { - assert_eq!(code_id, loaded_id); - program_id = actor_id; - } - BlockEvent::Mirror { address, event } => { - if address == program_id { - match event { - MirrorEvent::MessageQueueingRequested { - id, - source, - payload, - value, - } => { - assert_eq!(source, env.sender_address); - assert_eq!(payload, b"PING"); - assert_eq!(value, 0); - init_message_id = id; - } - MirrorEvent::Reply { - payload, reply_to, .. - } => { - assert_eq!(payload, b"PONG"); - assert_eq!(reply_to, init_message_id); - reply_sent = true; - } - MirrorEvent::StateChanged { .. } => { - assert!(reply_sent); - } - _ => {} - } - } - } - BlockEvent::Router(RouterEvent::BlockCommitted { .. }) => return Ok(Some(())), - _ => {} - }; - - Ok(None) - }) - .await - .unwrap(); - - let program_address = ethexe_signer::Address::try_from(program_id).unwrap(); + let res = create_program.wait_for().await.unwrap(); + assert_eq!(res.code_id, code_id); + assert_eq!(res.reply_payload, b"PONG"); - let wvara = env.ethereum.router().wvara(); + let ping_id = res.program_id; - log::info!("📗 Approving WVara to mirror"); - wvara.approve_all(program_address.0.into()).await.unwrap(); + env.approve_wvara(ping_id).await; log::info!( "📗 Create snapshot for block: {}, where ping program is already created", @@ -639,71 +198,43 @@ async fn ping_reorg() { ); let program_created_snapshot_id = provider.anvil_snapshot().await.unwrap(); - let ping_program = env.ethereum.mirror(program_address); - - log::info!("📗 Sending PING message"); - let _tx = ping_program.send_message(b"PING", 0).await.unwrap(); - - log::info!("📗 Waiting for PONG reply"); - listener - .apply_until_block_event(|event| match event { - BlockEvent::Mirror { address, event } => { - if address == program_id { - if let MirrorEvent::Reply { payload, value, .. } = event { - assert_eq!(payload, b"PONG"); - assert_eq!(value, 0); - return Ok(Some(())); - } - } - - Ok(None) - } - _ => Ok(None), - }) + let res = env + .send_message(ping_id, b"PING", 0) + .await + .unwrap() + .wait_for() .await .unwrap(); + assert_eq!(res.program_id, ping_id); + assert_eq!(res.reply_payload, b"PONG"); // Await for service block with user reply handling // TODO: this is for better logs reading only, should find a better solution #4099 tokio::time::sleep(env.block_time).await; - log::info!("📗 Reverting to the program creation snapshot"); + log::info!("📗 Test after reverting to the program creation snapshot"); provider .anvil_revert(program_created_snapshot_id) .await .map(|res| assert!(res)) .unwrap(); - log::info!("📗 Sending PING message after reorg"); - let _tx = ping_program.send_message(b"PING", 0).await.unwrap(); - - log::info!("📗 Waiting for PONG reply after reorg"); - listener - .apply_until_block_event(|event| match event { - BlockEvent::Mirror { address, event } => { - if address == program_id { - if let MirrorEvent::Reply { payload, value, .. } = event { - assert_eq!(payload, b"PONG"); - assert_eq!(value, 0); - return Ok(Some(())); - } - } - - Ok(None) - } - _ => Ok(None), - }) + let res = env + .send_message(ping_id, b"PING", 0) + .await + .unwrap() + .wait_for() .await .unwrap(); + assert_eq!(res.program_id, ping_id); + assert_eq!(res.reply_payload, b"PONG"); // The last step is to test correctness after db cleanup - let router_address = env.router_address; - let blob_reader = env.blob_reader.clone(); - let validator_private_key = env.validator_private_key; - drop(env); + node.stop_service().await; + node.db = Database::from_one(&MemDb::default(), env.router_address.0); - log::info!("📗 Sending PING message, db cleanup and service shutting down"); - let _tx = ping_program.send_message(b"PING", 0).await.unwrap(); + log::info!("📗 Test after db cleanup and service shutting down"); + let send_message = env.send_message(ping_id, b"PING", 0).await.unwrap(); // Skip some blocks to simulate long time without service provider @@ -714,44 +245,18 @@ async fn ping_reorg() { .await .unwrap(); - let mut env = TestEnv::new(TestEnvConfig { - rpc_url, - router_address: Some(router_address), - blob_reader: Some(blob_reader), - validator_private_key: Some(validator_private_key), - ..Default::default() - }) - .await - .unwrap(); - env.start_service().await.unwrap(); + node.start_service().await; // Important: mine one block to sent block event to the new service. provider.evm_mine(None).await.unwrap(); - log::info!("📗 Waiting for PONG reply service restart on empty db"); - listener - .apply_until_block_event(|event| match event { - BlockEvent::Mirror { address, event } => { - if address == program_id { - if let MirrorEvent::Reply { payload, value, .. } = event { - assert_eq!(payload, b"PONG"); - assert_eq!(value, 0); - return Ok(Some(())); - } - } - - Ok(None) - } - _ => Ok(None), - }) - .await - .unwrap(); + let res = send_message.wait_for().await.unwrap(); + assert_eq!(res.program_id, ping_id); + assert_eq!(res.reply_payload, b"PONG"); // Await for service block with user reply handling // TODO: this is for better logs reading only, should find a better solution #4099 tokio::time::sleep(Duration::from_secs(1)).await; - - log::info!("📗 Done"); } // Mine 150 blocks - send message - mine 150 blocks. @@ -761,125 +266,48 @@ async fn ping_reorg() { async fn ping_deep_sync() { gear_utils::init_default_logger(); - let anvil = TestEnv::start_anvil(); - - let mut env = TestEnv::new(TestEnvConfig::default().rpc_url(anvil.ws_endpoint())) - .await - .unwrap(); - let mut listener = env.new_listener().await; + let mut env = TestEnv::new(Default::default()).await.unwrap(); - env.start_service().await.unwrap(); + let sequencer_pub_key = env.wallets.next(); + let mut node = env.new_node( + NodeConfig::default() + .sequencer(sequencer_pub_key) + .validator(env.validators[0]), + ); + node.start_service().await; let provider = env.observer.provider().clone(); - let (_, code_id) = env.upload_code(demo_ping::WASM_BINARY).await.unwrap(); - - log::info!("📗 Waiting for code loaded"); - listener - .apply_until(|event| match event { - Event::CodeLoaded { - code_id: loaded_id, - code, - } => { - assert_eq!(code_id, loaded_id); - assert_eq!(&code, demo_ping::WASM_BINARY); - Ok(Some(())) - } - _ => Ok(None), - }) + let res = env + .upload_code(demo_ping::WASM_BINARY) .await - .unwrap(); - - log::info!("📗 Waiting for code approval"); - listener - .apply_until_block_event(|event| match event { - BlockEvent::Router(RouterEvent::CodeGotValidated { - id: validated_code_id, - valid, - }) => { - assert_eq!(code_id, validated_code_id); - assert!(valid); - Ok(Some(())) - } - _ => Ok(None), - }) + .unwrap() + .wait_for() .await .unwrap(); + assert_eq!(res.code.as_slice(), demo_ping::WASM_BINARY); + assert!(res.valid); - let code = env - .db - .original_code(code_id) - .expect("After approval, the code is guaranteed to be in the database"); - let _ = env - .db - .instrumented_code(1, code_id) - .expect("After approval, instrumented code is guaranteed to be in the database"); - assert_eq!(code, demo_ping::WASM_BINARY); + let code_id = res.code_id; - let _ = env - .ethereum - .router() - .create_program(code_id, H256::random(), b"PING", 0) + let res = env + .create_program(code_id, b"PING", 0) .await - .unwrap(); - - log::info!("📗 Waiting for program create, PONG reply and program update"); - let mut program_id = ActorId::default(); - let mut init_message_id = MessageId::default(); - let mut reply_sent = false; - let mut block_committed = None; - listener - .apply_until_block_event(|event| { - match event { - BlockEvent::Router(RouterEvent::ProgramCreated { - actor_id, - code_id: loaded_id, - }) => { - assert_eq!(code_id, loaded_id); - program_id = actor_id; - } - BlockEvent::Mirror { address, event } => { - if address == program_id { - match event { - MirrorEvent::MessageQueueingRequested { - id, - source, - payload, - value, - .. - } => { - assert_eq!(source, env.sender_address); - assert_eq!(payload, b"PING"); - assert_eq!(value, 0); - init_message_id = id; - } - MirrorEvent::Reply { - payload, reply_to, .. - } => { - assert_eq!(payload, b"PONG"); - assert_eq!(reply_to, init_message_id); - reply_sent = true; - } - MirrorEvent::StateChanged { .. } => { - assert!(reply_sent); - } - _ => {} - } - } - } - BlockEvent::Router(RouterEvent::BlockCommitted { block_hash }) => { - block_committed = Some(block_hash); - return Ok(Some(())); - } - _ => {} - } - Ok(None) - }) + .unwrap() + .wait_for() .await .unwrap(); + assert_eq!(res.code_id, code_id); + assert_eq!(res.init_message_payload, b"PING"); + assert_eq!(res.init_message_value, 0); + assert_eq!(res.reply_payload, b"PONG"); + assert_eq!(res.reply_value, 0); + assert_eq!( + res.reply_code, + ReplyCode::Success(SuccessReplyReason::Manual) + ); - let block_committed_on_router = env.router_query.last_commitment_block_hash().await.unwrap(); - assert_eq!(block_committed, Some(block_committed_on_router)); + let ping_id = res.program_id; // Mine some blocks to check deep sync. provider @@ -890,18 +318,9 @@ async fn ping_deep_sync() { .await .unwrap(); - // Send message in between. - let program_address = ethexe_signer::Address::try_from(program_id).unwrap(); - - let wvara = env.ethereum.router().wvara(); - - log::info!("📗 Approving WVara to mirror"); - wvara.approve_all(program_address.0.into()).await.unwrap(); + env.approve_wvara(ping_id).await; - let ping_program = env.ethereum.mirror(program_address); - - log::info!("📗 Sending PING message"); - let _tx = ping_program.send_message(b"PING", 0).await.unwrap(); + let send_message = env.send_message(ping_id, b"PING", 0).await.unwrap(); // Mine some blocks to check deep sync. provider @@ -912,22 +331,901 @@ async fn ping_deep_sync() { .await .unwrap(); - log::info!("📗 Waiting for PONG reply"); - listener - .apply_until_block_event(|event| match event { - BlockEvent::Mirror { address, event } => { - if address == program_id { - if let MirrorEvent::Reply { payload, value, .. } = event { - assert_eq!(payload, b"PONG"); - assert_eq!(value, 0); - return Ok(Some(())); + let res = send_message.wait_for().await.unwrap(); + assert_eq!(res.program_id, ping_id); + assert_eq!(res.reply_payload, b"PONG"); + assert_eq!(res.reply_value, 0); + assert_eq!( + res.reply_code, + ReplyCode::Success(SuccessReplyReason::Manual) + ); +} + +#[tokio::test(flavor = "multi_thread")] +#[ntest::timeout(60_000)] +async fn multiple_validators() { + gear_utils::init_default_logger(); + + let mut env = TestEnv::new(TestEnvConfig::default().validators_amount(3)) + .await + .unwrap(); + + log::info!("📗 Starting sequencer"); + let sequencer_pub_key = env.wallets.next(); + let mut sequencer = env.new_node( + NodeConfig::default() + .sequencer(sequencer_pub_key) + .network(None, None), + ); + sequencer.start_service().await; + + log::info!("📗 Starting validator 0"); + let mut validator0 = env.new_node( + NodeConfig::default() + .validator(env.validators[0]) + .network(None, sequencer.multiaddr.clone()), + ); + validator0.start_service().await; + + log::info!("📗 Starting validator 1"); + let mut validator1 = env.new_node( + NodeConfig::default() + .validator(env.validators[1]) + .network(None, sequencer.multiaddr.clone()), + ); + validator1.start_service().await; + + log::info!("📗 Starting validator 2"); + let mut validator2 = env.new_node( + NodeConfig::default() + .validator(env.validators[2]) + .network(None, sequencer.multiaddr.clone()), + ); + validator2.start_service().await; + + let res = env + .upload_code(demo_ping::WASM_BINARY) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + assert_eq!(res.code, demo_ping::WASM_BINARY); + assert!(res.valid); + + let ping_code_id = res.code_id; + + let res = env + .create_program(ping_code_id, b"", 0) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + assert_eq!(res.code_id, ping_code_id); + assert_eq!(res.init_message_payload, b""); + assert_eq!(res.init_message_value, 0); + assert_eq!(res.reply_payload, b""); + assert_eq!(res.reply_value, 0); + assert_eq!(res.reply_code, ReplyCode::Success(SuccessReplyReason::Auto)); + + let ping_id = res.program_id; + + let res = env + .upload_code(demo_async::WASM_BINARY) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + assert_eq!(res.code, demo_async::WASM_BINARY); + assert!(res.valid); + + let async_code_id = res.code_id; + + let res = env + .create_program(async_code_id, ping_id.encode().as_slice(), 0) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + assert_eq!(res.code_id, async_code_id); + assert_eq!(res.init_message_payload, ping_id.encode().as_slice()); + assert_eq!(res.init_message_value, 0); + assert_eq!(res.reply_payload, b""); + assert_eq!(res.reply_value, 0); + assert_eq!(res.reply_code, ReplyCode::Success(SuccessReplyReason::Auto)); + + let async_id = res.program_id; + + env.approve_wvara(ping_id).await; + env.approve_wvara(async_id).await; + + let res = env + .send_message(async_id, demo_async::Command::Common.encode().as_slice(), 0) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + assert_eq!(res.program_id, async_id); + assert_eq!(res.reply_payload, res.message_id.encode().as_slice()); + assert_eq!(res.reply_value, 0); + assert_eq!( + res.reply_code, + ReplyCode::Success(SuccessReplyReason::Manual) + ); + + log::info!("📗 Stop validator 2 and check that all is still working"); + validator2.stop_service().await; + let res = env + .send_message(async_id, demo_async::Command::Common.encode().as_slice(), 0) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + assert_eq!(res.reply_payload, res.message_id.encode().as_slice()); + + log::info!("📗 Stop validator 1 and check that it's not working"); + validator1.stop_service().await; + + let wait_for_reply_to = env + .send_message(async_id, demo_async::Command::Common.encode().as_slice(), 0) + .await + .unwrap(); + + let _ = tokio::time::timeout(env.block_time * 3, wait_for_reply_to.clone().wait_for()) + .await + .expect_err("Timeout expected"); + + log::info!("📗 Start validator 2 and check that now is working, validator 1 is still stopped."); + // TODO: impossible to restart validator 2 with the same network address, need to fix it #4210 + let mut validator2 = env.new_node( + NodeConfig::default() + .validator(env.validators[2]) + .network(None, sequencer.multiaddr.clone()), + ); + validator2.start_service().await; + + // IMPORTANT: mine one block to sent a new block event. + env.observer.provider().evm_mine(None).await.unwrap(); + + let res = wait_for_reply_to.wait_for().await.unwrap(); + assert_eq!(res.reply_payload, res.message_id.encode().as_slice()); +} + +mod utils { + use super::*; + use futures::StreamExt; + use gear_core::message::ReplyCode; + use tokio::sync::{broadcast::Sender, Mutex}; + + pub struct TestEnv { + pub rpc_url: String, + pub wallets: AnvilWallets, + pub observer: Observer, + pub blob_reader: Arc, + pub ethereum: Ethereum, + #[allow(unused)] + pub router_query: RouterQuery, + pub signer: Signer, + pub validators: Vec, + pub router_address: ethexe_signer::Address, + pub sender_id: ActorId, + pub genesis_block_hash: H256, + pub threshold: u64, + pub block_time: Duration, + + network_addresses_nonce: u64, + /// In order to reduce amount of observers, we create only one observer and broadcast events to all subscribers. + broadcaster: Arc>>, + _anvil: Option, + _events_stream: JoinHandle<()>, + } + + impl TestEnv { + pub async fn new(config: TestEnvConfig) -> Result { + let TestEnvConfig { + validators_amount, + block_time, + } = config; + + let (rpc_url, anvil) = match std::env::var("__ETHEXE_CLI_TESTS_RPC_URL") { + Ok(rpc_url) => { + log::info!("📍 Using provided RPC URL: {}", rpc_url); + (rpc_url, None) + } + Err(_) => { + let mut anvil = Anvil::new().try_spawn().unwrap(); + drop(anvil.child_mut().stdout.take()); //temp fix for alloy#1078 + log::info!("📍 Anvil started at {}", anvil.ws_endpoint()); + (anvil.ws_endpoint(), Some(anvil)) + } + }; + + let signer = Signer::new(tempfile::tempdir()?.into_path())?; + let mut wallets = AnvilWallets::new(&signer); + + let sender_address = wallets.next().to_address(); + let validators: Vec<_> = (0..validators_amount) + .map(|_| signer.generate_key().unwrap()) + .collect(); + + let ethereum = Ethereum::deploy( + &rpc_url, + validators.iter().map(|k| k.to_address()).collect(), + signer.clone(), + sender_address, + ) + .await?; + + let router = ethereum.router(); + let router_query = router.query(); + let router_address = router.address(); + + let blob_reader = Arc::new(MockBlobReader::new(block_time)); + + let observer = Observer::new(&rpc_url, router_address, blob_reader.clone()) + .await + .expect("failed to create observer"); + + let (broadcaster, _events_stream) = { + let mut observer = observer.clone(); + let (sender, mut receiver) = tokio::sync::broadcast::channel::(2048); + let sender = Arc::new(Mutex::new(sender)); + let cloned_sender = sender.clone(); + + let (send_subscription_created, receive_subscription_created) = + oneshot::channel::<()>(); + let handle = task::spawn(async move { + let observer_events = observer.events_all(); + futures::pin_mut!(observer_events); + + send_subscription_created.send(()).unwrap(); + + while let Some(event) = observer_events.next().await { + log::trace!(target: "test-event", "📗 Event: {:?}", event); + + cloned_sender + .lock() + .await + .send(event) + .inspect_err(|err| log::error!("Failed to broadcast event: {err}")) + .unwrap(); + + // At least one receiver is presented always, in order to avoid the channel dropping. + receiver + .recv() + .await + .inspect_err(|err| log::error!("Failed to receive event: {err}")) + .unwrap(); + } + }); + receive_subscription_created.await.unwrap(); + + (sender, handle) + }; + + let genesis_block_hash = router_query.genesis_block_hash().await?; + let threshold = router_query.threshold().await?; + + Ok(TestEnv { + rpc_url, + wallets, + observer, + blob_reader, + ethereum, + router_query, + signer, + validators, + router_address, + sender_id: ActorId::from(H160::from(sender_address.0)), + genesis_block_hash, + threshold, + block_time, + broadcaster, + network_addresses_nonce: 0, + _anvil: anvil, + _events_stream, + }) + } + + pub fn new_node(&mut self, config: NodeConfig) -> Node { + let NodeConfig { + db, + sequencer_public_key, + validator_public_key, + network, + } = config; + + let db = + db.unwrap_or_else(|| Database::from_one(&MemDb::default(), self.router_address.0)); + + let network_address = network.as_ref().map(|network| { + network.address.clone().unwrap_or_else(|| { + self.network_addresses_nonce += 1; + format!("/memory/{}", self.network_addresses_nonce) + }) + }); + + let network_bootstrap_address = network.and_then(|network| network.bootstrap_address); + + Node { + db, + multiaddr: None, + rpc_url: self.rpc_url.clone(), + genesis_block_hash: self.genesis_block_hash, + blob_reader: self.blob_reader.clone(), + observer: self.observer.clone(), + signer: self.signer.clone(), + block_time: self.block_time, + validators: self.validators.iter().map(|k| k.to_address()).collect(), + threshold: self.threshold, + router_address: self.router_address, + running_service_handle: None, + sequencer_public_key, + validator_public_key, + network_address, + network_bootstrap_address, + } + } + + pub async fn upload_code(&self, code: &[u8]) -> Result { + log::info!("📗 Upload code, len {}", code.len()); + + let code_id = CodeId::generate(code); + let blob_tx = H256::random(); + + let listener = self.events_publisher().subscribe().await; + + self.blob_reader + .add_blob_transaction(blob_tx, code.to_vec()) + .await; + let _tx_hash = self + .ethereum + .router() + .request_code_validation(code_id, blob_tx) + .await?; + + Ok(WaitForUploadCode { listener, code_id }) + } + + pub async fn create_program( + &self, + code_id: CodeId, + payload: &[u8], + value: u128, + ) -> Result { + log::info!( + "📗 Create program, code_id {code_id}, payload len {}", + payload.len() + ); + + let listener = self.events_publisher().subscribe().await; + + let (_, program_id) = self + .ethereum + .router() + .create_program(code_id, H256::random(), payload, value) + .await?; + + Ok(WaitForProgramCreation { + listener, + program_id, + }) + } + + pub async fn send_message( + &self, + target: ActorId, + payload: &[u8], + value: u128, + ) -> Result { + log::info!("📗 Send message to {target}, payload len {}", payload.len()); + + let listener = self.events_publisher().subscribe().await; + + let program_address = ethexe_signer::Address::try_from(target)?; + let program = self.ethereum.mirror(program_address); + + let (_, message_id) = program.send_message(payload, value).await?; + + Ok(WaitForReplyTo { + listener, + message_id, + }) + } + + pub async fn approve_wvara(&self, program_id: ActorId) { + log::info!("📗 Approving WVara for {program_id}"); + + let program_address = ethexe_signer::Address::try_from(program_id).unwrap(); + let wvara = self.ethereum.router().wvara(); + wvara.approve_all(program_address.0.into()).await.unwrap(); + } + + pub fn events_publisher(&self) -> EventsPublisher { + EventsPublisher { + broadcaster: self.broadcaster.clone(), + } + } + } + + pub struct TestEnvConfig { + pub validators_amount: usize, + pub block_time: Duration, + } + + impl Default for TestEnvConfig { + fn default() -> Self { + Self { + validators_amount: 1, + block_time: Duration::from_secs(1), + } + } + } + + impl TestEnvConfig { + pub fn validators_amount(mut self, validators_amount: usize) -> Self { + self.validators_amount = validators_amount; + self + } + + pub fn block_time(mut self, block_time: Duration) -> Self { + self.block_time = block_time; + self + } + } + + #[derive(Default)] + pub struct NodeConfig { + /// Database, if not provided, will be created with MemDb. + pub db: Option, + /// Sequencer public key, if provided then new node starts as sequencer. + pub sequencer_public_key: Option, + /// Validator public key, if provided then new node starts as validator. + pub validator_public_key: Option, + /// Network configuration, if provided then new node starts with network. + pub network: Option, + } + + impl NodeConfig { + pub fn db(mut self, db: Database) -> Self { + self.db = Some(db); + self + } + + pub fn sequencer(mut self, sequencer_public_key: ethexe_signer::PublicKey) -> Self { + self.sequencer_public_key = Some(sequencer_public_key); + self + } + + pub fn validator(mut self, validator_public_key: ethexe_signer::PublicKey) -> Self { + self.validator_public_key = Some(validator_public_key); + self + } + + pub fn network( + mut self, + address: Option, + bootstrap_address: Option, + ) -> Self { + self.network = Some(NodeNetworkConfig { + address, + bootstrap_address, + }); + self + } + } + + #[derive(Default)] + pub struct NodeNetworkConfig { + /// Network address, if not provided, will be generated by test env. + pub address: Option, + /// Network bootstrap address, if not provided, then no bootstrap address will be used. + pub bootstrap_address: Option, + } + + pub struct EventsPublisher { + broadcaster: Arc>>, + } + + impl EventsPublisher { + pub async fn subscribe(&self) -> EventsListener { + EventsListener { + receiver: self.broadcaster.lock().await.subscribe(), + } + } + } + + pub struct EventsListener { + receiver: tokio::sync::broadcast::Receiver, + } + + impl Clone for EventsListener { + fn clone(&self) -> Self { + Self { + receiver: self.receiver.resubscribe(), + } + } + } + + impl EventsListener { + pub async fn next_event(&mut self) -> Result { + self.receiver.recv().await.map_err(Into::into) + } + + pub async fn apply_until( + &mut self, + mut f: impl FnMut(Event) -> Result>, + ) -> Result { + loop { + let event = self.next_event().await?; + if let Some(res) = f(event)? { + return Ok(res); + } + } + } + + pub async fn apply_until_block_event( + &mut self, + mut f: impl FnMut(BlockEvent) -> Result>, + ) -> Result { + loop { + let event = self.next_event().await?; + + let Event::Block(block) = event else { + continue; + }; + + for event in block.events { + if let Some(res) = f(event)? { + return Ok(res); } } + } + } + } - Ok(None) + /// Provides access to hardcoded anvil wallets. + pub struct AnvilWallets { + wallets: Vec, + next_wallet: usize, + } + + impl AnvilWallets { + pub fn new(signer: &Signer) -> Self { + let accounts = [ + "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80", + "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d", + "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a", + "0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6", + "0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a", + "0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba", + "0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e", + "0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356", + "0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97", + "0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6", + ] + .map(|s| signer.add_key(s.parse().unwrap()).unwrap()); + + Self { + wallets: accounts.to_vec(), + next_wallet: 0, } - _ => Ok(None), - }) - .await - .unwrap(); + } + + pub fn next(&mut self) -> ethexe_signer::PublicKey { + let pub_key = self.wallets.get(self.next_wallet).expect("No more wallets"); + self.next_wallet += 1; + *pub_key + } + } + + pub struct Node { + pub db: Database, + pub multiaddr: Option, + + rpc_url: String, + genesis_block_hash: H256, + blob_reader: Arc, + observer: Observer, + signer: Signer, + validators: Vec, + threshold: u64, + router_address: ethexe_signer::Address, + block_time: Duration, + running_service_handle: Option>>, + sequencer_public_key: Option, + validator_public_key: Option, + network_address: Option, + network_bootstrap_address: Option, + } + + impl Node { + pub async fn start_service(&mut self) { + assert!( + self.running_service_handle.is_none(), + "Service is already running" + ); + + let processor = Processor::new(self.db.clone()).unwrap(); + + let query = Query::new( + Arc::new(self.db.clone()), + &self.rpc_url, + self.router_address, + self.genesis_block_hash, + self.blob_reader.clone(), + 10000, + ) + .await + .unwrap(); + + let network = self.network_address.as_ref().map(|addr| { + let config_path = tempfile::tempdir().unwrap().into_path(); + let mut config = + ethexe_network::NetworkEventLoopConfig::new_memory(config_path, addr.as_str()); + if let Some(bootstrap_addr) = self.network_bootstrap_address.as_ref() { + let multiaddr = bootstrap_addr.parse().unwrap(); + config.bootstrap_addresses = [multiaddr].into(); + } + let network = + ethexe_network::NetworkService::new(config, &self.signer, self.db.clone()) + .unwrap(); + self.multiaddr = Some(format!("{addr}/p2p/{}", network.event_loop.local_peer_id())); + network + }); + + let sequencer = match self.sequencer_public_key.as_ref() { + Some(key) => Some( + Sequencer::new( + ðexe_sequencer::Config { + ethereum_rpc: self.rpc_url.clone(), + sign_tx_public: *key, + router_address: self.router_address, + validators: self.validators.clone(), + threshold: self.threshold, + }, + self.signer.clone(), + ) + .await + .unwrap(), + ), + None => None, + }; + + let validator = match self.validator_public_key.as_ref() { + Some(key) => Some(Validator::new( + ðexe_validator::Config { + pub_key: *key, + router_address: self.router_address, + }, + self.signer.clone(), + )), + None => None, + }; + + let service = Service::new_from_parts( + self.db.clone(), + self.observer.clone(), + query, + processor, + self.signer.clone(), + self.block_time, + network, + sequencer, + validator, + None, + None, + ); + + let handle = task::spawn(service.run()); + self.running_service_handle = Some(handle); + + // Sleep to wait for the new service to start + tokio::time::sleep(Duration::from_secs(1)).await; + } + + pub async fn stop_service(&mut self) { + let handle = self + .running_service_handle + .take() + .expect("Service is not running"); + handle.abort(); + let _ = handle.await; + self.multiaddr = None; + } + } + + #[derive(Clone)] + pub struct WaitForUploadCode { + listener: EventsListener, + pub code_id: CodeId, + } + + #[derive(Debug)] + pub struct UploadCodeInfo { + pub code_id: CodeId, + pub code: Vec, + pub valid: bool, + } + + impl WaitForUploadCode { + pub async fn wait_for(mut self) -> Result { + log::info!("📗 Waiting for code upload, code_id {}", self.code_id); + + let mut code_info = None; + let mut valid_info = None; + + self.listener + .apply_until(|event| match event { + Event::CodeLoaded { + code_id: loaded_id, + code, + } if loaded_id == self.code_id => { + code_info = Some(code); + Ok(Some(())) + } + _ => Ok(None), + }) + .await?; + + self.listener + .apply_until_block_event(|event| match event { + BlockEvent::Router(RouterEvent::CodeGotValidated { id, valid }) + if id == self.code_id => + { + valid_info = Some(valid); + Ok(Some(())) + } + _ => Ok(None), + }) + .await?; + + Ok(UploadCodeInfo { + code_id: self.code_id, + code: code_info.expect("Code must be set"), + valid: valid_info.expect("Valid must be set"), + }) + } + } + + #[derive(Clone)] + pub struct WaitForProgramCreation { + listener: EventsListener, + pub program_id: ActorId, + } + + #[derive(Debug)] + pub struct ProgramCreationInfo { + pub program_id: ActorId, + pub code_id: CodeId, + #[allow(unused)] + pub init_message_id: MessageId, + pub init_message_source: ActorId, + pub init_message_payload: Vec, + pub init_message_value: u128, + pub reply_payload: Vec, + pub reply_code: ReplyCode, + pub reply_value: u128, + } + + impl WaitForProgramCreation { + pub async fn wait_for(mut self) -> Result { + log::info!("📗 Waiting for program {} creation", self.program_id); + + let mut code_id_info = None; + let mut init_message_info = None; + let mut reply_info = None; + + self.listener + .apply_until_block_event(|event| { + match event { + BlockEvent::Router(RouterEvent::ProgramCreated { actor_id, code_id }) + if actor_id == self.program_id => + { + code_id_info = Some(code_id); + } + BlockEvent::Mirror { address, event } if address == self.program_id => { + match event { + MirrorEvent::MessageQueueingRequested { + id, + source, + payload, + value, + } => { + init_message_info = Some((id, source, payload, value)); + } + MirrorEvent::Reply { + payload, + reply_to, + reply_code, + value, + } if init_message_info.as_ref().map(|(id, ..)| *id) + == Some(reply_to) => + { + reply_info = Some((payload, reply_code, value)); + return Ok(Some(())); + } + _ => {} + } + } + _ => {} + } + Ok(None) + }) + .await?; + + let code_id = code_id_info.expect("Code ID must be set"); + let (init_message_id, init_message_source, init_message_payload, init_message_value) = + init_message_info.expect("Init message info must be set"); + let (reply_payload, reply_code, reply_value) = + reply_info.expect("Reply info must be set"); + + Ok(ProgramCreationInfo { + program_id: self.program_id, + code_id, + init_message_id, + init_message_source, + init_message_payload, + init_message_value, + reply_payload, + reply_code, + reply_value, + }) + } + } + + #[derive(Clone)] + pub struct WaitForReplyTo { + listener: EventsListener, + pub message_id: MessageId, + } + + #[derive(Debug)] + pub struct ReplyInfo { + pub message_id: MessageId, + pub program_id: ActorId, + pub reply_payload: Vec, + pub reply_code: ReplyCode, + pub reply_value: u128, + } + + impl WaitForReplyTo { + pub async fn wait_for(mut self) -> Result { + log::info!("📗 Waiting for reply to message {}", self.message_id); + + let mut info = None; + + self.listener + .apply_until_block_event(|event| match event { + BlockEvent::Mirror { + address, + event: + MirrorEvent::Reply { + reply_to, + payload, + reply_code, + value, + }, + } if reply_to == self.message_id => { + info = Some(ReplyInfo { + message_id: reply_to, + program_id: address, + reply_payload: payload, + reply_code, + reply_value: value, + }); + Ok(Some(())) + } + _ => Ok(None), + }) + .await?; + + Ok(info.expect("Reply info must be set")) + } + } } diff --git a/ethexe/ethereum/src/router/mod.rs b/ethexe/ethereum/src/router/mod.rs index 7cc5339f59d..36247d1fb4c 100644 --- a/ethexe/ethereum/src/router/mod.rs +++ b/ethexe/ethereum/src/router/mod.rs @@ -208,6 +208,7 @@ impl Router { } } +#[derive(Clone)] pub struct RouterQuery { instance: QueryInstance, } diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index 184685cc145..893c5d0eacc 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -200,7 +200,6 @@ impl NetworkEventLoopConfig { } } - #[cfg(test)] pub fn new_memory(config_path: PathBuf, addr: &str) -> Self { Self { config_dir: config_path,