From c4f3a9c6e20a8f171f5d1602f4185600bbba3d40 Mon Sep 17 00:00:00 2001 From: Pablo Lozano Date: Thu, 7 Nov 2024 16:40:18 +0100 Subject: [PATCH] refactor: unify dvn and executor workers (#309) Changes addressed previous FIXMEs, including: - Move DVN components around so it's more modular - Fix a minor bug on the executor, where the `provider` went out of scope and stopped listening. - Add some tests for the DVN. --- .gitignore | 3 +- Cargo.lock | 54 +++-- Cargo.toml | 2 +- {workers => offchain}/Cargo.toml | 8 +- {workers => offchain}/README.md | 0 {workers => offchain}/abi/L0V2Endpoint.json | 0 offchain/abi/ReceiveLibUln302.json | 1 + {workers => offchain}/abi/SendLibUln302.json | 0 {workers => offchain}/src/abi.rs | 20 +- offchain/src/bin/dvn.rs | 22 ++ {workers => offchain}/src/bin/executor.rs | 5 +- .../src/chain/connections.rs | 78 +++++- {workers => offchain}/src/chain/contracts.rs | 89 +++---- {workers => offchain}/src/chain/mod.rs | 1 + {workers => offchain}/src/config.rs | 26 +- {workers => offchain}/src/data/bytes_utils.rs | 0 {workers => offchain}/src/data/mod.rs | 1 - .../src/data/packet_v1_codec.rs | 0 offchain/src/lib.rs | 8 + {workers => offchain}/src/verifier.rs | 6 +- offchain/src/workers/dvn.rs | 222 ++++++++++++++++++ .../src/workers/executor.rs | 37 +-- offchain/src/workers/mod.rs | 2 + offchain/tests/executor.rs | 111 +++++++++ offchain/workers_config.toml | 10 + workers/src/bin/dvn.rs | 135 ----------- workers/src/data/dvn.rs | 88 ------- workers/src/lib.rs | 8 - workers/tests/executor_it.rs | 112 --------- 29 files changed, 574 insertions(+), 475 deletions(-) rename {workers => offchain}/Cargo.toml (81%) rename {workers => offchain}/README.md (100%) rename {workers => offchain}/abi/L0V2Endpoint.json (100%) create mode 100644 offchain/abi/ReceiveLibUln302.json rename {workers => offchain}/abi/SendLibUln302.json (100%) rename {workers => offchain}/src/abi.rs (57%) create mode 100644 offchain/src/bin/dvn.rs rename {workers => offchain}/src/bin/executor.rs (82%) rename {workers => offchain}/src/chain/connections.rs (55%) rename {workers => offchain}/src/chain/contracts.rs (76%) rename {workers => offchain}/src/chain/mod.rs (99%) rename {workers => offchain}/src/config.rs (71%) rename {workers => offchain}/src/data/bytes_utils.rs (100%) rename {workers => offchain}/src/data/mod.rs (77%) rename {workers => offchain}/src/data/packet_v1_codec.rs (100%) create mode 100644 offchain/src/lib.rs rename {workers => offchain}/src/verifier.rs (98%) create mode 100644 offchain/src/workers/dvn.rs rename workers/src/executor_def.rs => offchain/src/workers/executor.rs (87%) create mode 100644 offchain/src/workers/mod.rs create mode 100644 offchain/tests/executor.rs create mode 100644 offchain/workers_config.toml delete mode 100644 workers/src/bin/dvn.rs delete mode 100644 workers/src/data/dvn.rs delete mode 100644 workers/src/lib.rs delete mode 100644 workers/tests/executor_it.rs diff --git a/.gitignore b/.gitignore index fbd735af..d0689a51 100644 --- a/.gitignore +++ b/.gitignore @@ -50,5 +50,4 @@ setup/plugin/config/keys/ecdsa.json libnear_da_rpc_sys.* # Ignore configuration for DVN or Executor -**/config_dvn.toml -**/config_executor.toml +#offchain/workers_config.toml diff --git a/Cargo.lock b/Cargo.lock index 3c5f370e..d4e162bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6387,6 +6387,31 @@ dependencies = [ "memchr", ] +[[package]] +name = "offchain" +version = "0.1.0" +dependencies = [ + "alloy", + "axum 0.7.7", + "blsful", + "bytes", + "config", + "eyre", + "futures", + "http-body-util", + "log", + "project-root", + "reqwest 0.12.8", + "serde", + "serde_json", + "tempfile", + "tokio", + "tokio-tungstenite", + "tracing", + "tracing-subscriber", + "wiremock", +] + [[package]] name = "oid-registry" version = "0.7.1" @@ -7200,6 +7225,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "project-root" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bccbff07d5ed689c4087d20d7307a52ab6141edeedf487c3876a55b86cf63df" + [[package]] name = "prometheus" version = "0.13.4" @@ -10611,29 +10642,6 @@ dependencies = [ "url", ] -[[package]] -name = "workers" -version = "0.1.0" -dependencies = [ - "alloy", - "axum 0.7.7", - "blsful", - "bytes", - "config", - "eyre", - "futures", - "http-body-util", - "log", - "reqwest 0.12.8", - "serde", - "serde_json", - "tokio", - "tokio-tungstenite", - "tracing", - "tracing-subscriber", - "wiremock", -] - [[package]] name = "ws_stream_wasm" version = "0.7.4" diff --git a/Cargo.toml b/Cargo.toml index 37c03770..0c8fbf85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ resolver = "2" members = [ "indexer", "contracts/evm/test/ffi/bls-utils", - "workers", + "offchain", ] [workspace.package] diff --git a/workers/Cargo.toml b/offchain/Cargo.toml similarity index 81% rename from workers/Cargo.toml rename to offchain/Cargo.toml index 68476256..67208b10 100644 --- a/workers/Cargo.toml +++ b/offchain/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "workers" +name = "offchain" version = "0.1.0" edition = "2021" @@ -18,16 +18,18 @@ bytes = "1.7.2" config = { version = "0.14.0", features = ["toml"] } eyre.workspace = true futures = "0.3.31" +log = "0.4.22" +project-root = "0.2.2" reqwest.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true tracing.workspace = true -tracing-subscriber = {workspace = true, features = ["env-filter"] } +tracing-subscriber = { workspace = true, features = ["env-filter"] } tokio-tungstenite = "0.24.0" -log = "0.4.22" [dev-dependencies] axum = "0.7.7" http-body-util = "0.1.0" wiremock = "0.6.2" +tempfile = "3.13.0" diff --git a/workers/README.md b/offchain/README.md similarity index 100% rename from workers/README.md rename to offchain/README.md diff --git a/workers/abi/L0V2Endpoint.json b/offchain/abi/L0V2Endpoint.json similarity index 100% rename from workers/abi/L0V2Endpoint.json rename to offchain/abi/L0V2Endpoint.json diff --git a/offchain/abi/ReceiveLibUln302.json b/offchain/abi/ReceiveLibUln302.json new file mode 100644 index 00000000..9b9ad405 --- /dev/null +++ b/offchain/abi/ReceiveLibUln302.json @@ -0,0 +1 @@ +{"abi":[{"inputs":[{"internalType":"address","name":"_endpoint","type":"address"}],"stateMutability":"nonpayable","type":"constructor"},{"inputs":[],"name":"LZ_MessageLib_OnlyEndpoint","type":"error"},{"inputs":[],"name":"LZ_ULN_AtLeastOneDVN","type":"error"},{"inputs":[{"internalType":"uint32","name":"configType","type":"uint32"}],"name":"LZ_ULN_InvalidConfigType","type":"error"},{"inputs":[],"name":"LZ_ULN_InvalidConfirmations","type":"error"},{"inputs":[],"name":"LZ_ULN_InvalidEid","type":"error"},{"inputs":[],"name":"LZ_ULN_InvalidOptionalDVNCount","type":"error"},{"inputs":[],"name":"LZ_ULN_InvalidOptionalDVNThreshold","type":"error"},{"inputs":[],"name":"LZ_ULN_InvalidPacketHeader","type":"error"},{"inputs":[],"name":"LZ_ULN_InvalidPacketVersion","type":"error"},{"inputs":[],"name":"LZ_ULN_InvalidRequiredDVNCount","type":"error"},{"inputs":[],"name":"LZ_ULN_Unsorted","type":"error"},{"inputs":[{"internalType":"uint32","name":"eid","type":"uint32"}],"name":"LZ_ULN_UnsupportedEid","type":"error"},{"inputs":[],"name":"LZ_ULN_Verifying","type":"error"},{"anonymous":false,"inputs":[{"components":[{"internalType":"uint32","name":"eid","type":"uint32"},{"components":[{"internalType":"uint64","name":"confirmations","type":"uint64"},{"internalType":"uint8","name":"requiredDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNThreshold","type":"uint8"},{"internalType":"address[]","name":"requiredDVNs","type":"address[]"},{"internalType":"address[]","name":"optionalDVNs","type":"address[]"}],"internalType":"struct UlnConfig","name":"config","type":"tuple"}],"indexed":false,"internalType":"struct SetDefaultUlnConfigParam[]","name":"params","type":"tuple[]"}],"name":"DefaultUlnConfigsSet","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"previousOwner","type":"address"},{"indexed":true,"internalType":"address","name":"newOwner","type":"address"}],"name":"OwnershipTransferred","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"dvn","type":"address"},{"indexed":false,"internalType":"bytes","name":"header","type":"bytes"},{"indexed":false,"internalType":"uint256","name":"confirmations","type":"uint256"},{"indexed":false,"internalType":"bytes32","name":"proofHash","type":"bytes32"}],"name":"PayloadVerified","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"oapp","type":"address"},{"indexed":false,"internalType":"uint32","name":"eid","type":"uint32"},{"components":[{"internalType":"uint64","name":"confirmations","type":"uint64"},{"internalType":"uint8","name":"requiredDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNThreshold","type":"uint8"},{"internalType":"address[]","name":"requiredDVNs","type":"address[]"},{"internalType":"address[]","name":"optionalDVNs","type":"address[]"}],"indexed":false,"internalType":"struct UlnConfig","name":"config","type":"tuple"}],"name":"UlnConfigSet","type":"event"},{"inputs":[{"internalType":"bytes","name":"_packetHeader","type":"bytes"},{"internalType":"uint32","name":"_localEid","type":"uint32"}],"name":"assertHeader","outputs":[],"stateMutability":"pure","type":"function"},{"inputs":[{"internalType":"bytes","name":"_packetHeader","type":"bytes"},{"internalType":"bytes32","name":"_payloadHash","type":"bytes32"}],"name":"commitVerification","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"_oapp","type":"address"},{"internalType":"uint32","name":"_remoteEid","type":"uint32"}],"name":"getAppUlnConfig","outputs":[{"components":[{"internalType":"uint64","name":"confirmations","type":"uint64"},{"internalType":"uint8","name":"requiredDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNThreshold","type":"uint8"},{"internalType":"address[]","name":"requiredDVNs","type":"address[]"},{"internalType":"address[]","name":"optionalDVNs","type":"address[]"}],"internalType":"struct UlnConfig","name":"","type":"tuple"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint32","name":"_eid","type":"uint32"},{"internalType":"address","name":"_oapp","type":"address"},{"internalType":"uint32","name":"_configType","type":"uint32"}],"name":"getConfig","outputs":[{"internalType":"bytes","name":"","type":"bytes"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"_oapp","type":"address"},{"internalType":"uint32","name":"_remoteEid","type":"uint32"}],"name":"getUlnConfig","outputs":[{"components":[{"internalType":"uint64","name":"confirmations","type":"uint64"},{"internalType":"uint8","name":"requiredDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNThreshold","type":"uint8"},{"internalType":"address[]","name":"requiredDVNs","type":"address[]"},{"internalType":"address[]","name":"optionalDVNs","type":"address[]"}],"internalType":"struct UlnConfig","name":"rtnConfig","type":"tuple"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"bytes32","name":"headerHash","type":"bytes32"},{"internalType":"bytes32","name":"payloadHash","type":"bytes32"},{"internalType":"address","name":"dvn","type":"address"}],"name":"hashLookup","outputs":[{"internalType":"bool","name":"submitted","type":"bool"},{"internalType":"uint64","name":"confirmations","type":"uint64"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint32","name":"_eid","type":"uint32"}],"name":"isSupportedEid","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"messageLibType","outputs":[{"internalType":"enum MessageLibType","name":"","type":"uint8"}],"stateMutability":"pure","type":"function"},{"inputs":[],"name":"owner","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"renounceOwnership","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"_oapp","type":"address"},{"components":[{"internalType":"uint32","name":"eid","type":"uint32"},{"internalType":"uint32","name":"configType","type":"uint32"},{"internalType":"bytes","name":"config","type":"bytes"}],"internalType":"struct SetConfigParam[]","name":"_params","type":"tuple[]"}],"name":"setConfig","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"components":[{"internalType":"uint32","name":"eid","type":"uint32"},{"components":[{"internalType":"uint64","name":"confirmations","type":"uint64"},{"internalType":"uint8","name":"requiredDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNThreshold","type":"uint8"},{"internalType":"address[]","name":"requiredDVNs","type":"address[]"},{"internalType":"address[]","name":"optionalDVNs","type":"address[]"}],"internalType":"struct UlnConfig","name":"config","type":"tuple"}],"internalType":"struct SetDefaultUlnConfigParam[]","name":"_params","type":"tuple[]"}],"name":"setDefaultUlnConfigs","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"bytes4","name":"_interfaceId","type":"bytes4"}],"name":"supportsInterface","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"newOwner","type":"address"}],"name":"transferOwnership","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"components":[{"internalType":"uint64","name":"confirmations","type":"uint64"},{"internalType":"uint8","name":"requiredDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNCount","type":"uint8"},{"internalType":"uint8","name":"optionalDVNThreshold","type":"uint8"},{"internalType":"address[]","name":"requiredDVNs","type":"address[]"},{"internalType":"address[]","name":"optionalDVNs","type":"address[]"}],"internalType":"struct UlnConfig","name":"_config","type":"tuple"},{"internalType":"bytes32","name":"_headerHash","type":"bytes32"},{"internalType":"bytes32","name":"_payloadHash","type":"bytes32"}],"name":"verifiable","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"bytes","name":"_packetHeader","type":"bytes"},{"internalType":"bytes32","name":"_payloadHash","type":"bytes32"},{"internalType":"uint64","name":"_confirmations","type":"uint64"}],"name":"verify","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"version","outputs":[{"internalType":"uint64","name":"major","type":"uint64"},{"internalType":"uint8","name":"minor","type":"uint8"},{"internalType":"uint8","name":"endpointVersion","type":"uint8"}],"stateMutability":"pure","type":"function"}]} diff --git a/workers/abi/SendLibUln302.json b/offchain/abi/SendLibUln302.json similarity index 100% rename from workers/abi/SendLibUln302.json rename to offchain/abi/SendLibUln302.json diff --git a/workers/src/abi.rs b/offchain/src/abi.rs similarity index 57% rename from workers/src/abi.rs rename to offchain/src/abi.rs index b0170fa4..12735d49 100644 --- a/workers/src/abi.rs +++ b/offchain/src/abi.rs @@ -1,6 +1,5 @@ -//! Types create from the JSON ABI files. -//! -//! For example, to be able to decode the logs' data, or call contracts' methods. +//! Types from the JSON ABI files. For example, to be able to decode the logs' data, or call +//! contracts' methods. //! //! To obtain the corresponding ABI, there are two ways: //! - Manually downloading the ABI from the contract's source code (we use this one for now); @@ -24,18 +23,3 @@ sol!( L0V2EndpointAbi, "abi/L0V2Endpoint.json" ); - -//sol!( -// #[allow(missing_docs)] -// #[sol(abi)] -// #[derive(Debug, PartialEq, Eq)] -// struct Packet { -// uint64 nonce; -// uint32 src_eid; -// bytes32 sender; -// uint32 dst_eid; -// bytes32 receiver; -// bytes32 guid; -// bytes message; -// } -//); diff --git a/offchain/src/bin/dvn.rs b/offchain/src/bin/dvn.rs new file mode 100644 index 00000000..b6097a68 --- /dev/null +++ b/offchain/src/bin/dvn.rs @@ -0,0 +1,22 @@ +//! Main off-chain workflow for Nuff DVN. + +use eyre::Result; +use offchain::workers::dvn::Dvn; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize tracing + tracing_subscriber::fmt() + .with_target(false) + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::DEBUG.into()) + .from_env_lossy(), + ) + .init(); + + let mut dvn = Dvn::new_from_env()?; + dvn.listen().await +} diff --git a/workers/src/bin/executor.rs b/offchain/src/bin/executor.rs similarity index 82% rename from workers/src/bin/executor.rs rename to offchain/src/bin/executor.rs index 6b304fd1..9508c1ff 100644 --- a/workers/src/bin/executor.rs +++ b/offchain/src/bin/executor.rs @@ -1,7 +1,6 @@ +use offchain::{config, workers::executor::NFFLExecutor}; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; -use workers::config; -use workers::executor_def::NFFLExecutor; /// Executor is expected to work with low work rate, and we have a bonus /// from this observation - we don't need/want to care about concurrency control, @@ -18,7 +17,7 @@ async fn main() -> eyre::Result<()> { ) .init(); - let mut executor = NFFLExecutor::new(config::DVNConfig::load_from_env()?); + let mut executor = NFFLExecutor::new(config::WorkerConfig::load_from_env()?); executor.listen().await?; Ok(()) diff --git a/workers/src/chain/connections.rs b/offchain/src/chain/connections.rs similarity index 55% rename from workers/src/chain/connections.rs rename to offchain/src/chain/connections.rs index 03efd71d..b40dc59f 100644 --- a/workers/src/chain/connections.rs +++ b/offchain/src/chain/connections.rs @@ -2,7 +2,7 @@ use crate::{ chain::HttpProvider, - config::{DVNConfig, LayerZeroEvent}, + config::{LayerZeroEvent, WorkerConfig}, }; use alloy::{ eips::BlockNumberOrTag, @@ -11,11 +11,12 @@ use alloy::{ pubsub::{PubSubFrontend, SubscriptionStream}, rpc::types::{Filter, Log}, }; -use eyre::{OptionExt, Result}; +use eyre::{eyre, OptionExt, Result}; +use std::path::PathBuf; /// Create the subscriptions for the DVN workflow. -pub async fn build_subscriptions( - config: &DVNConfig, +pub async fn build_dvn_subscriptions( + config: &WorkerConfig, ) -> Result<( RootProvider, SubscriptionStream, @@ -49,8 +50,9 @@ pub async fn build_subscriptions( } pub async fn build_executor_subscriptions( - config: &DVNConfig, + config: &WorkerConfig, ) -> Result<( + RootProvider, SubscriptionStream, SubscriptionStream, SubscriptionStream, @@ -75,17 +77,20 @@ pub async fn build_executor_subscriptions( .event(LayerZeroEvent::PacketVerified.as_ref()) .from_block(BlockNumberOrTag::Latest); - Ok(( - provider.subscribe_logs(&packet_sent_filter).await?.into_stream(), - provider.subscribe_logs(&executor_fee_paid).await?.into_stream(), - provider.subscribe_logs(&packet_verified_filter).await?.into_stream(), - )) + let ps_stream = provider.subscribe_logs(&packet_sent_filter).await?.into_stream(); + let ef_stream = provider.subscribe_logs(&executor_fee_paid).await?.into_stream(); + let pv_stream = provider.subscribe_logs(&packet_verified_filter).await?.into_stream(); + + Ok((provider, ps_stream, ef_stream, pv_stream)) } -/// Load the MessageLib ABI. +/// Load the MessageLib ABI. The path must be relative to the project root. pub fn get_abi_from_path(path: &str) -> Result { + let path_buf = PathBuf::from(path); + let artifact_path = project_root::get_project_root()?.join(path_buf); // Get the SendLib ABI - let artifact = std::fs::read(path)?; + let artifact = + std::fs::read(artifact_path).map_err(|e| eyre!("Cannot load config for offchain worker. Error: {:?}", e))?; let json: serde_json::Value = serde_json::from_slice(&artifact)?; // SAFETY: Assume `unwrap` is safe since the key has been harcoded let abi_value = json.get("abi").ok_or_eyre("ABI not found in artifact")?; @@ -94,7 +99,54 @@ pub fn get_abi_from_path(path: &str) -> Result { } /// Construct an HTTP provider given the config. -pub fn get_http_provider(config: &DVNConfig) -> Result { +pub fn get_http_provider(config: &WorkerConfig) -> Result { let http_provider = ProviderBuilder::new().on_http(config.http_rpc_url.to_string().parse()?); Ok(http_provider) } + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + use tempfile::NamedTempFile; + + #[test] + fn test_expect_to_find_all_abis() { + get_abi_from_path("offchain/abi/ReceiveLibUln302.json").unwrap(); + get_abi_from_path("offchain/abi/SendLibUln302.json").unwrap(); + get_abi_from_path("offchain/abi/L0V2Endpoint.json").unwrap(); + } + + #[test] + fn test_get_abi_from_path() { + // Create a file inside of `env::temp_dir()`. + let mut temp_file = NamedTempFile::new_in(".").unwrap(); + + // Some mocked ABI info + let data = r#"{ + "abi": [ + { + "type": "function", + "name": "transfer", + "inputs": [ + { + "type": "address", + "name": "_to", + "internalType": "address" + }, + { + "type": "uint256", + "name": "_amount", + "internalType": "uint256" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + } + ] + }"#; + writeln!(temp_file, "{}", data).unwrap(); + + get_abi_from_path(temp_file.path().to_str().unwrap()).unwrap(); + } +} diff --git a/workers/src/chain/contracts.rs b/offchain/src/chain/contracts.rs similarity index 76% rename from workers/src/chain/contracts.rs rename to offchain/src/chain/contracts.rs index 764f53d7..35a47e17 100644 --- a/workers/src/chain/contracts.rs +++ b/offchain/src/chain/contracts.rs @@ -46,11 +46,15 @@ pub async fn get_messagelib_addr(contract: &ContractInst, eid: U256) -> Result Result Result { // Call the `getUlnConfig` function on the contract @@ -77,7 +80,7 @@ pub async fn query_confirmations(contract: &ContractInst, eid: U256) -> Result { let value = tupled_int[0] .as_uint() - .ok_or_eyre("Cannot parse response from MessageLib")?; + .ok_or_eyre("Cannot parse response as `uint` from MessageLib")?; Ok(value.0) } _ => { @@ -94,56 +97,62 @@ pub async fn query_already_verified( header_hash: &[u8], payload_hash: &[u8], required_confirmations: U256, -) -> Result { +) -> bool { // Call the `_verified` function on the 302 contract, to check if the DVN has already verified // the packet. debug!("Calling _verified on contract's ReceiveLib"); - let contract_state = contract - .function( - "_verified", - &[ - DynSolValue::Address(dvn_address), // DVN address - DynSolValue::Bytes(header_hash.to_vec()), // HeaderHash - DynSolValue::Bytes(payload_hash.to_vec()), // PayloadHash - DynSolValue::Uint(required_confirmations, 32), // confirmations - ], - )? - .call() - .await?; + let call_builder = contract.function( + "_verified", + &[ + DynSolValue::Address(dvn_address), // DVN address + DynSolValue::Bytes(header_hash.to_vec()), // HeaderHash + DynSolValue::Bytes(payload_hash.to_vec()), // PayloadHash + DynSolValue::Uint(required_confirmations, 32), // confirmations + ], + ); - let packet_state = match contract_state - .first() - .ok_or(eyre!("Empty response when querying `_verified`"))? - { - DynSolValue::Bool(b) => Ok(b), + let Ok(call_builder) = call_builder else { + error!("Failed to construct `_verified` caller"); + return false; + }; + + let Ok(state) = call_builder.call().await else { + error!("Failed to call `_verified` on contract"); + return false; + }; + + match state.first() { + Some(DynSolValue::Bool(b)) => *b, _ => { - error!("Failed to parse response from ReceiveLib for `_verified`"); - Err(eyre!("Failed to parse response from ReceiveLib for `_verified`")) + error!("Failed to parse as bool the `_verified` response from ReceiveLib"); + false } - }?; - - Ok(*packet_state) + } } -pub async fn verify(contract: &ContractInst, packet_header: &[u8], payload: &[u8], confirmations: U256) -> Result<()> { +pub async fn verify(contract: &ContractInst, packet_header: &[u8], payload: &[u8], confirmations: U256) { //// Create the hash of the payload let payload_hash = keccak256(payload); // Call the `verified` function on the contract - let _ = contract - .function( - "verify", - &[ - DynSolValue::Bytes(packet_header.to_vec()), // PacketHeader - DynSolValue::FixedBytes(payload_hash, 32), // PayloadHash - DynSolValue::Uint(confirmations, 64), // Confirmations - ], - )? - .call() - .await?; + let call_builder = contract.function( + "verify", + &[ + DynSolValue::Bytes(packet_header.to_vec()), // PacketHeader + DynSolValue::FixedBytes(payload_hash, 32), // PayloadHash + DynSolValue::Uint(confirmations, 64), // Confirmations + ], + ); - Ok(()) + if let Ok(call_builder) = call_builder { + match call_builder.call().await { + Err(e) => error!("Failed to call `verify`. Error: {:?}", e), + _ => {} + } + } else { + error!("Failed to construct `verify` caller"); + }; } /// If the state is `Executable`, your `Executor` should decode the packet's options diff --git a/workers/src/chain/mod.rs b/offchain/src/chain/mod.rs similarity index 99% rename from workers/src/chain/mod.rs rename to offchain/src/chain/mod.rs index b6f18ad3..cf61eac7 100644 --- a/workers/src/chain/mod.rs +++ b/offchain/src/chain/mod.rs @@ -12,5 +12,6 @@ pub mod contracts; /// Alias for a contract instance in the Ethereum network. pub type ContractInst = ContractInstance, RootProvider>, Ethereum>; + /// Alias for an HTTP provider. pub type HttpProvider = RootProvider>; diff --git a/workers/src/config.rs b/offchain/src/config.rs similarity index 71% rename from workers/src/config.rs rename to offchain/src/config.rs index 8d7b5b53..58541e44 100644 --- a/workers/src/config.rs +++ b/offchain/src/config.rs @@ -4,9 +4,12 @@ use alloy::primitives::Address; use config::Config; use eyre::Result; use serde::Deserialize; +use std::path::PathBuf; + +const CONFIG_PATH: &str = "offchain/workers_config"; #[derive(Debug, Deserialize)] -pub struct DVNConfig { +pub struct WorkerConfig { /// The Websocket RPC URL to connect to the Ethereum network. pub ws_rpc_url: String, /// The HTTP RPC URL to connect to the Ethereum network. @@ -22,19 +25,18 @@ pub struct DVNConfig { /// The ReceiveLib Ultra Light Node 301 address. pub receivelib_uln301_addr: Address, /// The Ethereum network ID. - pub network_eid: u64, + pub target_network_eid: u64, /// Own DVN address. Used to check when the DVN is assigned to a task. pub dvn_addr: Address, /// NFFL Aggregator URL pub aggregator_url: String, } -impl DVNConfig { +impl WorkerConfig { /// Load environment variables. pub fn load_from_env() -> Result { - let settings = Config::builder() - .add_source(config::File::with_name("./config_dvn")) - .build()?; + let path = project_root::get_project_root()?.join(PathBuf::from(CONFIG_PATH)); + let settings = Config::builder().add_source(config::File::from(path)).build()?; Ok(settings.try_deserialize::()?) } } @@ -65,6 +67,16 @@ mod tests { // #[test] #[allow(dead_code)] fn load_config_from_env() { - let _conf = DVNConfig::load_from_env().unwrap(); + let _conf = WorkerConfig::load_from_env().unwrap(); + } + + #[test] + fn test_valid_config() { + let conf = WorkerConfig::load_from_env().unwrap(); + assert!(conf.ws_rpc_url.starts_with("ws://") || conf.ws_rpc_url.starts_with("wss://")); + + assert!(conf.http_rpc_url.starts_with("http://") || conf.http_rpc_url.starts_with("https://")); + + assert!(conf.target_network_eid > 0); } } diff --git a/workers/src/data/bytes_utils.rs b/offchain/src/data/bytes_utils.rs similarity index 100% rename from workers/src/data/bytes_utils.rs rename to offchain/src/data/bytes_utils.rs diff --git a/workers/src/data/mod.rs b/offchain/src/data/mod.rs similarity index 77% rename from workers/src/data/mod.rs rename to offchain/src/data/mod.rs index fd9b7559..e5654817 100644 --- a/workers/src/data/mod.rs +++ b/offchain/src/data/mod.rs @@ -1,3 +1,2 @@ pub mod bytes_utils; -pub mod dvn; pub mod packet_v1_codec; diff --git a/workers/src/data/packet_v1_codec.rs b/offchain/src/data/packet_v1_codec.rs similarity index 100% rename from workers/src/data/packet_v1_codec.rs rename to offchain/src/data/packet_v1_codec.rs diff --git a/offchain/src/lib.rs b/offchain/src/lib.rs new file mode 100644 index 00000000..954a0d45 --- /dev/null +++ b/offchain/src/lib.rs @@ -0,0 +1,8 @@ +//! Tools to build offchain offchain for the LayerZero protocol. + +pub mod abi; +pub mod chain; +pub mod config; +pub mod data; +pub mod verifier; +pub mod workers; diff --git a/workers/src/verifier.rs b/offchain/src/verifier.rs similarity index 98% rename from workers/src/verifier.rs rename to offchain/src/verifier.rs index 79982f24..530d0c1e 100644 --- a/workers/src/verifier.rs +++ b/offchain/src/verifier.rs @@ -1,4 +1,4 @@ -use crate::config::DVNConfig; +use crate::config::WorkerConfig; use alloy::eips::BlockNumberOrTag; use alloy::network::Ethereum; use alloy::primitives::B256; @@ -47,8 +47,8 @@ impl NFFLVerifier { }) } - pub async fn new_from_config(cfg: &DVNConfig) -> eyre::Result { - Self::new(&cfg.aggregator_url, &cfg.http_rpc_url, cfg.network_eid).await + pub async fn new_from_config(cfg: &WorkerConfig) -> eyre::Result { + Self::new(&cfg.aggregator_url, &cfg.http_rpc_url, cfg.target_network_eid).await } /// Verifies the state root of a block. In case any request future diff --git a/offchain/src/workers/dvn.rs b/offchain/src/workers/dvn.rs new file mode 100644 index 00000000..4343d17f --- /dev/null +++ b/offchain/src/workers/dvn.rs @@ -0,0 +1,222 @@ +use crate::chain::ContractInst; +use crate::{ + abi::{L0V2EndpointAbi::PacketSent, SendLibraryAbi::DVNFeePaid}, + chain::{ + connections::{build_dvn_subscriptions, get_abi_from_path, get_http_provider}, + contracts::{create_contract_instance, query_already_verified, query_confirmations, verify}, + }, + config::WorkerConfig, + data::packet_v1_codec::{header, message}, + verifier::NFFLVerifier, +}; +use alloy::primitives::{keccak256, B256, U256}; +use alloy::rpc::types::Log; +use eyre::Result; +use futures::stream::StreamExt; +use tracing::{debug, error, info, warn}; + +pub enum DvnStatus { + Stopped, + Listening, + PacketReceived, + Verifying, +} + +pub struct Dvn { + pub config: WorkerConfig, + pub status: DvnStatus, + pub packet: Option, + pub receive_lib: Option, + pub verifier: Option, +} + +impl Dvn { + pub fn new(config: WorkerConfig) -> Self { + Self { + config, + status: DvnStatus::Stopped, + packet: None, + receive_lib: None, + verifier: None, + } + } + + pub fn new_from_env() -> Result { + Ok(Dvn::new(crate::config::WorkerConfig::load_from_env()?)) + } + + pub fn listening(&mut self) { + self.status = DvnStatus::Listening; + } + + pub fn packet_received(&mut self, packet: PacketSent) { + self.packet = Some(packet); + self.status = DvnStatus::PacketReceived; + } + + pub fn reset_packet(&mut self) { + self.packet = None; + self.status = DvnStatus::Listening; + debug!("DVN not required, stored packet dropped") + } + + pub fn verifying(&mut self) { + self.status = DvnStatus::Verifying; + } + + pub fn get_header(&self) -> Option<&[u8]> { + if let Some(packet) = self.packet.as_ref() { + Some(header(packet.encodedPayload.as_ref())) + } else { + None + } + } + + pub fn get_header_hash(&self) -> Option { + self.packet + .as_ref() + .map(|packet| keccak256(header(packet.encodedPayload.as_ref()))) + } + + pub fn get_message_hash(&self) -> Option { + self.packet + .as_ref() + .map(|packet| keccak256(message(packet.encodedPayload.as_ref()))) + } + + pub(crate) async fn verify_message(&mut self, log: &Log, message_hash: B256, required_confirmations: U256) { + debug!("Packet NOT verified. Calling verification."); + self.verifying(); + + if log.block_number.is_none() { + error!("Block number is None, can't verify Packet."); + return; + } + + if let Some(verifier) = self.verifier.as_ref() { + if let Err(report) = verifier.verify(log.block_number.unwrap()).await { + error!("Failed to verify the state root. Error: {:?}", report); + return; + } + } else { + error!("Verifier not present") + } + + if let (Some(receive_lib), Some(header)) = (self.receive_lib.as_ref(), self.get_header()) { + verify(receive_lib, header, message_hash.as_ref(), required_confirmations).await; + } + } + + pub async fn listen(&mut self) -> Result<()> { + // Create the WS subscriptions for listening to the events. + let (_provider, mut endpoint_stream, mut sendlib_stream) = build_dvn_subscriptions(&self.config).await?; + + // Create an HTTP provider to call contract functions. + let http_provider = get_http_provider(&self.config)?; + + // Get the relevant contract ABI, and create contract. + let abi = get_abi_from_path("offchain/abi/ReceiveLibUln302.json")?; + self.receive_lib = Some(create_contract_instance( + self.config.receivelib_uln302_addr, + http_provider, + abi, + )?); + + // Start listening for events + info!("Listening to chain events..."); + self.listening(); + + loop { + tokio::select! { + // From the LayerZeroV2 Endpoint, we need the event `PacketSent`, which contains information about the message to be sent. + Some(log) = endpoint_stream.next() => { + self.endpoint_log_logic(&log); + }, + // From the SendLib, we need the event which triggers the verification: `DVNFeePaid`. + Some(log) = sendlib_stream.next() => { + self.sendlib_log_logic(&log).await; + }, + } + } + } + + /// Run the corresponding logic when receiving a [`Log`] from the LayerZero endpoint. + fn endpoint_log_logic(&mut self, log: &Log) { + match log.log_decode::() { + Err(e) => { + error!("Received a `PacketSent` event but failed to decode it: {:?}", e); + } + Ok(inner_log) => { + debug!("PacketSent event found and decoded."); + self.packet_received(inner_log.data().clone()); + } + } + } + + /// Run the corresponding logic when receiving a [`Log`] from the SendLib. + async fn sendlib_log_logic(&mut self, log: &Log) { + match log.log_decode::() { + Err(e) => { + error!("Received a `DVNFeePaid` event but failed to decode it: {:?}", e); + } + Ok(inner_log) if self.packet.is_some() => { + info!("`DVNFeePaid` event decoded, `Packet` present."); + + let required_dvns = &inner_log.inner.requiredDVNs; + let own_dvn_addr = self.config.dvn_addr; + + if required_dvns.contains(&own_dvn_addr) { + debug!("Found DVN in required DVNs."); + + // Query how many confirmations are required. + let remote_eid = U256::from(self.config.target_network_eid); + + let Some(receive_lib) = &self.receive_lib else { + error!("No `ReceiveLib` contract present in worker to query confirmations"); + return; + }; + + let Ok(required_confirmations) = query_confirmations(&receive_lib, remote_eid).await else { + error!("Cannot query `requiredConfirmations` from `ReceiveLib` contract"); + return; + }; + + // Prepare the header hash. + let header_hash = self.get_header_hash(); + // Prepare the payload hash. + let message_hash = self.get_message_hash(); + + // Check if the info from the payload could have been extracted. + match (header_hash, message_hash) { + (_, None) => { + error!("Cannot hash payload"); + } + (None, _) => { + error!("Cannot hash message"); + } + (Some(header_hash), Some(message_hash)) => { + let already_verified = query_already_verified( + receive_lib, + own_dvn_addr, + header_hash.as_ref(), + message_hash.as_ref(), + required_confirmations, + ) + .await; + + if !already_verified { + let _ = self.verify_message(&log, message_hash, required_confirmations).await; + } + } + } + } else { + debug!("DVN not required"); + self.reset_packet(); + } + } + Ok(_) => { + warn!("Received a `DVNFeePaid` event but don't have information about the `Packet` to be verified"); + } + } + } +} diff --git a/workers/src/executor_def.rs b/offchain/src/workers/executor.rs similarity index 87% rename from workers/src/executor_def.rs rename to offchain/src/workers/executor.rs index aad5923c..100aead3 100644 --- a/workers/src/executor_def.rs +++ b/offchain/src/workers/executor.rs @@ -1,19 +1,19 @@ -use crate::abi::L0V2EndpointAbi::PacketSent; -use crate::abi::L0V2EndpointAbi::PacketVerified; -use crate::abi::SendLibraryAbi::ExecutorFeePaid; -use crate::chain::connections::build_executor_subscriptions; -use crate::chain::connections::get_abi_from_path; -use crate::chain::connections::get_http_provider; -use crate::chain::contracts::create_contract_instance; -use crate::chain::contracts::{lz_receive, prepare_header}; -use crate::chain::ContractInst; -use crate::config::DVNConfig; -use alloy::dyn_abi::DynSolValue; -use alloy::primitives::U256; +use crate::{ + abi::{ + L0V2EndpointAbi::{PacketSent, PacketVerified}, + SendLibraryAbi::ExecutorFeePaid, + }, + chain::{ + connections::{build_executor_subscriptions, get_abi_from_path, get_http_provider}, + contracts::{create_contract_instance, lz_receive, prepare_header}, + ContractInst, + }, + config::WorkerConfig, +}; +use alloy::{dyn_abi::DynSolValue, primitives::U256}; use eyre::Result; use futures::StreamExt; -use std::collections::VecDeque; -use std::time::Duration; +use std::{collections::VecDeque, time::Duration}; use tokio::time::sleep; use tracing::{debug, error}; @@ -26,7 +26,7 @@ pub enum ExecutionState { } pub struct NFFLExecutor { - config: DVNConfig, + config: WorkerConfig, packet_queue: VecDeque, finish: bool, } @@ -34,7 +34,7 @@ pub struct NFFLExecutor { impl NFFLExecutor { pub(crate) const MAX_EXECUTE_ATTEMPTS: usize = 10; - pub fn new(config: DVNConfig) -> Self { + pub fn new(config: WorkerConfig) -> Self { NFFLExecutor { config, packet_queue: VecDeque::new(), @@ -48,10 +48,11 @@ impl NFFLExecutor { } pub async fn listen(&mut self) -> Result<()> { - let (mut ps_stream, mut ef_stream, mut pv_stream) = build_executor_subscriptions(&self.config).await?; + let (_provider, mut ps_stream, mut ef_stream, mut pv_stream) = + build_executor_subscriptions(&self.config).await?; let http_provider = get_http_provider(&self.config)?; - let l0_abi = get_abi_from_path("./abi/L0V2Endpoint.json")?; + let l0_abi = get_abi_from_path("offchain/abi/L0V2Endpoint.json")?; // Create a contract instance. let contract = create_contract_instance(self.config.l0_endpoint_addr, http_provider, l0_abi)?; diff --git a/offchain/src/workers/mod.rs b/offchain/src/workers/mod.rs new file mode 100644 index 00000000..f76b8db5 --- /dev/null +++ b/offchain/src/workers/mod.rs @@ -0,0 +1,2 @@ +pub mod dvn; +pub mod executor; diff --git a/offchain/tests/executor.rs b/offchain/tests/executor.rs new file mode 100644 index 00000000..ffdb3042 --- /dev/null +++ b/offchain/tests/executor.rs @@ -0,0 +1,111 @@ +use alloy::{ + primitives::{address, Address, Bytes, FixedBytes}, + providers::ProviderBuilder, +}; +use axum::{routing::post, Json, Router}; +use offchain::{ + abi::L0V2EndpointAbi::{Origin, PacketSent, PacketVerified}, + chain::{connections::get_abi_from_path, contracts::create_contract_instance, ContractInst}, + workers::executor::NFFLExecutor, +}; +use std::{ + collections::VecDeque, + sync::atomic::{AtomicI32, Ordering}, + sync::Arc, +}; +use tokio::task::JoinHandle; +use tracing::{debug, level_filters::LevelFilter}; +use tracing_subscriber::EnvFilter; + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct EthCallRequest { + method: String, + params: Vec, + id: u32, + jsonrpc: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct EthCallResponse { + result: String, + id: u32, + jsonrpc: String, +} + +#[tokio::test] +async fn test_handle_verified_packet_success() -> eyre::Result<()> { + // Initialize tracing + tracing_subscriber::fmt() + .with_target(true) + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::DEBUG.into()) + .from_env_lossy(), + ) + .init(); + + let counter: Arc = Arc::new(AtomicI32::new(0)); + let mut queue: VecDeque = VecDeque::new(); + let verified_packet = PacketVerified { + origin: Origin { + srcEid: 1, + sender: FixedBytes::from(&[1; 32]), + nonce: 101010, + }, + receiver: Address::from_slice(&[1; 20]), + payloadHash: FixedBytes::from(&[2; 32]), + }; + + let _join_handle = prepare_server(counter.clone()).await; + let contract = setup_contract(&mut queue).await?; + + NFFLExecutor::handle_verified_packet(&contract, &mut queue, &verified_packet).await?; + + assert_eq!(counter.load(Ordering::Acquire), 2); + Ok(()) +} + +async fn prepare_server(counter: Arc) -> JoinHandle<()> { + const SERVER_ADDRESS_SHORT: &str = "127.0.0.1:8081"; + + // Define the handler for the POST request. + let app = Router::new().route( + "/", + post(|| async move { + debug!("Server : POST request accepted"); + counter.fetch_add(1, Ordering::Release); + Json(EthCallResponse { + result: "0x0000000000000000000000000000000000000000000000000000000000000002".to_string(), + id: 1, + jsonrpc: "2.0".to_string(), + }) + }), + ); + // Spawn the server on a background task. + let listener = tokio::net::TcpListener::bind(SERVER_ADDRESS_SHORT).await.unwrap(); + + debug!("Listening on {}", listener.local_addr().unwrap()); + + tokio::spawn(async move { axum::serve(listener, app).await.unwrap() }) +} + +async fn setup_contract(packet_sent_queue: &mut VecDeque) -> eyre::Result { + const SERVER_ADDRESS: &str = "http://127.0.0.1:8081"; + + let http_provider = ProviderBuilder::new().on_http(SERVER_ADDRESS.parse()?); + let l0_abi = get_abi_from_path("offchain/abi/L0V2Endpoint.json")?; + + debug!("{:?}", l0_abi.functions.iter().map(|f| f.0).collect::>()); + + packet_sent_queue.push_back(PacketSent { + encodedPayload: Bytes::from(&[1; 256]), + options: Bytes::from(&[1; 32]), + sendLibrary: Address::from_slice(&[2; 20]), + }); + + create_contract_instance( + address!("d8da6bf26964af9d7eed9e03e53415d37aa96045"), + http_provider, + l0_abi, + ) +} diff --git a/offchain/workers_config.toml b/offchain/workers_config.toml new file mode 100644 index 00000000..d00cdac4 --- /dev/null +++ b/offchain/workers_config.toml @@ -0,0 +1,10 @@ +WS_RPC_URL = "wss://arbitrum-mainnet.infura.io/ws/v3/a7d4a3dd6f774049bce5d61651549421" +HTTP_RPC_URL = "https://arbitrum-mainnet.infura.io/v3/a7d4a3dd6f774049bce5d61651549421" +L0_ENDPOINT_ADDR = "0x1a44076050125825900e736c501f859c50fE728c" +SENDLIB_ULN302_ADDR = "0x975bcd720be66659e3eb3c0e4f1866a3020e493a" +RECEIVELIB_ULN302_ADDR = "0x7B9E184e07a6EE1aC23eAe0fe8D6Be2f663f05e6" +SENDLIB_ULN301_ADDR = "0x5cdc927876031b4ef910735225c425a7fc8efed9" +RECEIVELIB_ULN301_ADDR = "0xe4DD168822767C4342e54e6241f0b91DE0d3c241" +TARGET_NETWORK_EID = 40231 +AGGREGATOR_URL = "127.0.0.1:8081" +DVN_ADDR = "0xfaa40ad1c3c248b1bdd551b3f7785c85619ec5d9" diff --git a/workers/src/bin/dvn.rs b/workers/src/bin/dvn.rs deleted file mode 100644 index 753c6d9d..00000000 --- a/workers/src/bin/dvn.rs +++ /dev/null @@ -1,135 +0,0 @@ -//! Main off-chain workflow for Nuff DVN. - -use alloy::primitives::U256; -use eyre::{OptionExt, Result}; -use futures::stream::StreamExt; -use tracing::{debug, error, info, warn}; -use tracing_subscriber::EnvFilter; -use workers::data::dvn::Dvn; -use workers::verifier::NFFLVerifier; -use workers::{ - abi::{L0V2EndpointAbi::PacketSent, SendLibraryAbi::DVNFeePaid}, - chain::{ - connections::{build_subscriptions, get_abi_from_path, get_http_provider}, - contracts::{create_contract_instance, query_already_verified, query_confirmations, verify}, - }, -}; - -#[tokio::main] -async fn main() -> Result<()> { - // Initialize tracing - tracing_subscriber::fmt() - .with_target(false) - .with_env_filter(EnvFilter::from_default_env()) - .init(); - - let mut dvn_data = Dvn::new_from_env()?; - let verifier = NFFLVerifier::new_from_config(&dvn_data.config).await?; - - // Create the WS subscriptions for listening to the events. - let (_provider, mut endpoint_stream, mut sendlib_stream) = build_subscriptions(&dvn_data.config).await?; - - // Create an HTTP provider to call contract functions. - let http_provider = get_http_provider(&dvn_data.config)?; - - // Get the relevant contract ABI, and create contract. - let receivelib_abi = get_abi_from_path("./abi/ReceiveLibUln302.json")?; - let receivelib_contract = - create_contract_instance(dvn_data.config.receivelib_uln302_addr, http_provider, receivelib_abi)?; - - info!("Listening to chain events..."); - - // FIXME: refactor the operations from this loop into smaller, testable containers. - loop { - dvn_data.listening(); - tokio::select! { - Some(log) = endpoint_stream.next() => { - match log.log_decode::() { - Err(e) => { - error!("Received a `PacketSent` event but failed to decode it: {:?}", e); - } - Ok(inner_log) => { - debug!("PacketSent event found and decoded."); - dvn_data.packet_received(inner_log.data().clone()); - }, - } - } - Some(log) = sendlib_stream.next() => { - match log.log_decode::() { - Err(e) => { - error!("Received a `DVNFeePaid` event but failed to decode it: {:?}", e); - } - Ok(inner_log) if dvn_data.packet.is_some() => { - info!("DVNFeePaid event found and decoded."); - let required_dvns = &inner_log.inner.requiredDVNs; - let own_dvn_addr = dvn_data.config.dvn_addr; - - if required_dvns.contains(&own_dvn_addr) { - debug!("Found DVN in required DVNs."); - - // Query how many confirmations are required. - let eid = U256::from(dvn_data.config.network_eid); - let required_confirmations = query_confirmations(&receivelib_contract, eid).await?; - - // Prepare the header hash. - let header_hash = dvn_data.get_header_hash(); - // Prepate the payload hash. - let message_hash = dvn_data.get_message_hash(); - - // Check if the info from the payload could have been extracted. - match (header_hash, message_hash) { - (Some(header_hash), Some(message_hash)) => { - let already_verified = query_already_verified( - &receivelib_contract, - own_dvn_addr, - header_hash.as_ref(), - message_hash.as_ref(), - required_confirmations, - ) - .await?; - - if already_verified { - debug!("Packet already verified."); - } else { - dvn_data.verifying(); - debug!("Packet NOT verified. Calling verification."); - - if log.block_number.is_none() { - error!("Block number is None, can't verify Packet."); - continue; - } - - if !verifier.verify(log.block_number.unwrap()).await? { - error!("Failed to verify the state root."); - continue; - } - - verify( - &receivelib_contract, - dvn_data.get_header().ok_or_eyre("Cannot extract header from payload")?, - message_hash.as_ref(), - required_confirmations, - ).await?; - } - } - (_, None) => { - error!("Cannot hash payload"); - } - (None, _) => { - error!("Cannot hash message"); - } - } - } else { - dvn_data.reset_packet(); - } - - } - Ok(_)=> { - warn!("Received a `DVNFeePaid` event but don't have information about the `Packet` to be verified"); - } - } - }, - } - dvn_data.reset_packet(); - } -} diff --git a/workers/src/data/dvn.rs b/workers/src/data/dvn.rs deleted file mode 100644 index 8bb28991..00000000 --- a/workers/src/data/dvn.rs +++ /dev/null @@ -1,88 +0,0 @@ -use crate::{ - abi::L0V2EndpointAbi::PacketSent, - config::{self, DVNConfig}, - data::packet_v1_codec::{header, message}, -}; -use alloy::primitives::{keccak256, B256}; -use eyre::{eyre, Result}; -use tracing::debug; - -pub struct Dvn { - pub config: DVNConfig, - pub status: DvnStatus, - pub packet: Option, -} - -pub enum DvnStatus { - Stopped, - Listening, - PacketReceived, - Verifying, -} - -impl Dvn { - pub fn new(config: DVNConfig) -> Self { - Self { - config, - status: DvnStatus::Stopped, - packet: None, - } - } - - pub fn new_from_env() -> Result { - Ok(Dvn::new(config::DVNConfig::load_from_env()?)) - } - - pub fn listening(&mut self) { - self.status = DvnStatus::Listening; - } - - pub fn packet_received(&mut self, packet: PacketSent) { - self.packet = Some(packet); - self.status = DvnStatus::PacketReceived; - } - - pub fn reset_packet(&mut self) { - self.packet = None; - self.status = DvnStatus::Listening; - debug!("DVN not required, stored packet dropped") - } - - pub fn verifying(&mut self) { - self.status = DvnStatus::Verifying; - } - - pub fn get_header(&self) -> Option<&[u8]> { - if let Some(packet) = self.packet.as_ref() { - Some(header(packet.encodedPayload.as_ref())) - } else { - None - } - } - - pub fn get_header_hash(&self) -> Option { - self.packet - .as_ref() - .map(|packet| keccak256(header(packet.encodedPayload.as_ref()))) - } - pub fn get_header_hash_result(&self) -> Result { - if let Some(packet) = self.packet.as_ref() { - Ok(keccak256(header(packet.encodedPayload.as_ref()))) - } else { - Err(eyre!("There's no header to hash")) - } - } - - pub fn get_message_hash(&self) -> Option { - self.packet - .as_ref() - .map(|packet| keccak256(message(packet.encodedPayload.as_ref()))) - } - pub fn get_message_hash_result(&self) -> Result { - if let Some(packet) = self.packet.as_ref() { - Ok(keccak256(message(packet.encodedPayload.as_ref()))) - } else { - Err(eyre!("There's no message to hash")) - } - } -} diff --git a/workers/src/lib.rs b/workers/src/lib.rs deleted file mode 100644 index 664ab4ea..00000000 --- a/workers/src/lib.rs +++ /dev/null @@ -1,8 +0,0 @@ -//! Tools to build offchain workers for the LayerZero protocol. - -pub mod abi; -pub mod chain; -pub mod config; -pub mod data; -pub mod executor_def; -pub mod verifier; diff --git a/workers/tests/executor_it.rs b/workers/tests/executor_it.rs deleted file mode 100644 index 93a35d44..00000000 --- a/workers/tests/executor_it.rs +++ /dev/null @@ -1,112 +0,0 @@ -#[cfg(test)] -mod tests { - use alloy::primitives::{address, Address, Bytes, FixedBytes}; - use alloy::providers::ProviderBuilder; - - use axum::{routing::post, Json, Router}; - use std::collections::VecDeque; - use std::sync::atomic::{AtomicI32, Ordering}; - use std::sync::Arc; - use tokio::task::JoinHandle; - use tracing::debug; - use tracing::level_filters::LevelFilter; - use tracing_subscriber::EnvFilter; - use workers::abi::L0V2EndpointAbi::{Origin, PacketSent, PacketVerified}; - use workers::chain::connections::get_abi_from_path; - use workers::chain::contracts::create_contract_instance; - use workers::chain::ContractInst; - use workers::executor_def::NFFLExecutor; - - #[derive(serde::Serialize, serde::Deserialize, Debug)] - struct EthCallRequest { - method: String, - params: Vec, - id: u32, - jsonrpc: String, - } - - #[derive(serde::Serialize, serde::Deserialize, Debug)] - struct EthCallResponse { - result: String, - id: u32, - jsonrpc: String, - } - - #[tokio::test] - async fn test_handle_verified_packet_success() -> eyre::Result<()> { - // Initialize tracing - tracing_subscriber::fmt() - .with_target(true) - .with_env_filter( - EnvFilter::builder() - .with_default_directive(LevelFilter::DEBUG.into()) - .from_env_lossy(), - ) - .init(); - - let counter: Arc = Arc::new(AtomicI32::new(0)); - let mut queue: VecDeque = VecDeque::new(); - let verified_packet = PacketVerified { - origin: Origin { - srcEid: 1, - sender: FixedBytes::from(&[1; 32]), - nonce: 101010, - }, - receiver: Address::from_slice(&[1; 20]), - payloadHash: FixedBytes::from(&[2; 32]), - }; - - let _join_handle = prepare_server(counter.clone()).await; - let contract = setup_contract(&mut queue).await?; - - NFFLExecutor::handle_verified_packet(&contract, &mut queue, &verified_packet).await?; - - assert_eq!(counter.load(Ordering::Acquire), 2); - Ok(()) - } - - async fn prepare_server(counter: Arc) -> JoinHandle<()> { - const SERVER_ADDRESS_SHORT: &str = "127.0.0.1:8081"; - - // Define the handler for the POST request. - let app = Router::new().route( - "/", - post(|| async move { - debug!("Server : POST request accepted"); - counter.fetch_add(1, Ordering::Release); - Json(EthCallResponse { - result: "0x0000000000000000000000000000000000000000000000000000000000000002".to_string(), - id: 1, - jsonrpc: "2.0".to_string(), - }) - }), - ); - // Spawn the server on a background task. - let listener = tokio::net::TcpListener::bind(SERVER_ADDRESS_SHORT).await.unwrap(); - - debug!("Listening on {}", listener.local_addr().unwrap()); - - tokio::spawn(async move { axum::serve(listener, app).await.unwrap() }) - } - - async fn setup_contract(packet_sent_queue: &mut VecDeque) -> eyre::Result { - const SERVER_ADDRESS: &str = "http://127.0.0.1:8081"; - - let http_provider = ProviderBuilder::new().on_http(SERVER_ADDRESS.parse()?); - let l0_abi = get_abi_from_path("./abi/L0V2Endpoint.json")?; - - debug!("{:?}", l0_abi.functions.iter().map(|f| f.0).collect::>()); - - packet_sent_queue.push_back(PacketSent { - encodedPayload: Bytes::from(&[1; 256]), - options: Bytes::from(&[1; 32]), - sendLibrary: Address::from_slice(&[2; 20]), - }); - - create_contract_instance( - address!("d8da6bf26964af9d7eed9e03e53415d37aa96045"), - http_provider, - l0_abi, - ) - } -}