From 9bd8a7ae81c43b2d9f909b9caf27becb8cf1e0d0 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Thu, 14 Nov 2024 12:10:45 +0200 Subject: [PATCH] Fix executor (#312) --- offchain/src/config.rs | 2 +- offchain/src/workers/executor.rs | 141 +++++++++++++++---------------- offchain/tests/executor.rs | 27 +++--- offchain/workers_config.toml | 2 +- 4 files changed, 83 insertions(+), 89 deletions(-) diff --git a/offchain/src/config.rs b/offchain/src/config.rs index ef1a833c..f2c23020 100644 --- a/offchain/src/config.rs +++ b/offchain/src/config.rs @@ -72,4 +72,4 @@ mod tests { assert!(conf.target_http_rpc_url.starts_with("http://") || conf.target_http_rpc_url.starts_with("https://")); assert!(conf.target_network_eid > 0); } -} +} \ No newline at end of file diff --git a/offchain/src/workers/executor.rs b/offchain/src/workers/executor.rs index 2ce35059..98f52479 100644 --- a/offchain/src/workers/executor.rs +++ b/offchain/src/workers/executor.rs @@ -11,11 +11,12 @@ use crate::{ config::WorkerConfig, }; use alloy::{dyn_abi::DynSolValue, primitives::U256}; -use eyre::{eyre, Result}; +use eyre::Result; use futures::StreamExt; use std::{collections::VecDeque, time::Duration}; +use tokio::sync::mpsc; use tokio::time::sleep; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; #[derive(Debug, Clone, Copy, PartialEq)] pub enum ExecutionState { @@ -43,52 +44,50 @@ pub enum ExecutorState { pub struct NFFLExecutor { config: WorkerConfig, - packet_queue: VecDeque, - finish: bool, - status: ExecutorState, + finish: bool } impl NFFLExecutor { - pub(crate) const MAX_EXECUTE_ATTEMPTS: usize = 10; + pub(crate) const MAX_EXECUTE_ATTEMPTS: usize = 3; pub fn new(config: WorkerConfig) -> Self { - NFFLExecutor { - config, - packet_queue: VecDeque::new(), - finish: false, - status: ExecutorState::Created, - } + NFFLExecutor { config, finish: false } } pub fn finish(&mut self) { - // Note: we are in a single-threaded event loop, and we do not care about atomicity (yet). self.finish = true; } pub async fn listen(&mut self) -> Result<()> { - let (_source_source_provider, _target_provider, mut ps_stream, mut ef_stream, mut pv_stream) = + let (_sp, _tp, mut ps_stream, mut ef_stream, mut pv_stream) = build_executor_subscriptions(&self.config).await?; - // FIXME: should this be source or target? let http_provider = get_http_provider(&self.config.target_http_rpc_url)?; let l0_abi = get_abi_from_path("offchain/abi/L0V2Endpoint.json")?; // Create a contract instance. - let contract = create_contract_instance(self.config.target_endpoint, http_provider, l0_abi)?; - self.status = ExecutorState::WaitingPacket; + // Verified packet handler task + let l0_addr = self.config.target_endpoint; + let (tx, mut rx) = mpsc::channel::<(PacketSent, PacketVerified)>(4); + tokio::spawn(async move { + let contract = create_contract_instance(l0_addr, http_provider, l0_abi).unwrap(); + while let Some((packet_sent, packet_verified)) = rx.recv().await { + debug!("Handler received PacketSent and PacketVerified"); + let _ = Self::handle_verified_packet(&contract, packet_sent, packet_verified).await; + } + }); + + let mut packet_queue: VecDeque = VecDeque::new(); + // Network I/O handler loop { + debug!("Iteration started, queue size {:?}", packet_queue.len()); tokio::select! { Some(log) = ps_stream.next() => { match log.log_decode::() { Ok(packet_log) => { - debug!("Packet seemed to arrive"); - - if self.status == ExecutorState::WaitingPacket { - self.packet_queue.push_back(packet_log.data().clone()); - self.status = ExecutorState::WaitingAssignation; - continue; - } + debug!("PacketSent received"); + packet_queue.push_back(packet_log.data().clone()); }, Err(e) => { error!("Failed to decode PacketSent event: {:?}", e);} } @@ -96,24 +95,29 @@ impl NFFLExecutor { Some(log) = ef_stream.next() => { match log.log_decode::() { Ok(executor_fee_log) => { - debug!("Executor fee paid seemed to arrive"); - if executor_fee_log.data().executor.eq(&self.config.target_executor) { - self.status = ExecutorState::WaitingVerification; + debug!("ExecutorFeePaid received"); + if packet_queue.is_empty() { continue; } + debug!("{:?} ~ {:?}", executor_fee_log.data().executor, &self.config.target_executor); + // if !executor_fee_log.data().executor.eq(&self.config.executor) { + // packet_queue.pop_front(); + //// self.packet_queue.clear(); + // continue; + // } }, Err(e) => { error!("Failed to decode ExecutorFeePaid event: {:?}", e);} } }, Some(log) = pv_stream.next() => { - // FIXME: we should not process everything, check for some condition to filter match log.log_decode::() { Ok(inner_log) => { - debug!("->> Packet verified seemd to arrived"); - if self.status != ExecutorState::WaitingVerification { - Self::handle_verified_packet(&contract, &mut self.packet_queue, inner_log.data()).await?; - continue; + if !packet_queue.is_empty() { + debug!("PacketSent and PacketVerified sent to handler"); + tx.send((packet_queue.pop_front().unwrap(), inner_log.data().clone())).await?; + } else { + warn!("PacketVerified event {:?} arrived for non-handled PacketSent", inner_log.log_index); } }, Err(e) => { error!("Failed to decode PacketVerified event: {:?}", e);} @@ -127,26 +131,11 @@ impl NFFLExecutor { Ok(()) } - #[cfg(test)] - pub fn is_queue_empty(&self) -> bool { - self.packet_queue.is_empty() - } - pub async fn handle_verified_packet( contract: &ContractInst, - queue: &mut VecDeque, - packet_verified: &PacketVerified, + packet_sent: PacketSent, + packet_verified: PacketVerified, ) -> Result<()> { - //if queue.is_empty() { - // return Ok(()); - //} - // - //let packet_sent = queue.pop_front().unwrap(); - let Some(packet_sent) = queue.pop_front() else { - return Err(eyre!("Queue was empty, not handling packet verified")); - }; - // We don't expect any more items to be present. If we have any - they are garbage/leftovers. - queue.clear(); // Despite being described with other arguments, the only real implementation of // `executable` function is in the contract located here: https://shorturl.at/4H6Yz // function executable(Origin memory _origin, address _receiver) returns (ExecutionState) @@ -171,29 +160,39 @@ impl NFFLExecutor { error!("Maximum retries reached while waiting for `Executable` state."); break; } - let call_result = call_builder.call().await?; - debug!("Result of `executable` function call: {:?}", call_result); - if call_result.len() != 1 { - error!("`executable` function call returned invalid response."); - break; - } - // Note: why not pattern matching here? fn calls are not allowed in patterns - if call_result[0].eq(¬_executable) || call_result[0].eq(&verified_not_executable) { - debug!("State: NotExecutable or VerifiedNotExecutable, await commits/verifications"); - sleep(Duration::from_secs(1)).await; - retry_count += 1; - continue; - } else if call_result[0].eq(&executable) { - debug!("State: Executable, fire and forget `lzReceive`"); - lz_receive(contract, &packet_sent.encodedPayload[..]).await?; - break; - } else if call_result[0].eq(&executed) { - debug!("State: Executed, free the executor"); - break; - } else { - error!("Unknown state for `executable` call"); - break; + match call_builder.call().await { + Ok(call_result) => { + debug!("> {:?}", call_result); + if call_result.len() != 1 { + error!("`executable` function call returned invalid response."); + break; + } + debug!(">> {:?}", call_result[0]); + // Note: why not pattern matching here? fn calls are not allowed in patterns + if call_result[0].eq(¬_executable) || call_result[0].eq(&verified_not_executable) { + debug!("State: NotExecutable or VerifiedNotExecutable, await commits/verifications"); + sleep(Duration::from_secs(1)).await; + retry_count += 1; + continue; + } else if call_result[0].eq(&executable) { + debug!("State: Executable, fire and forget `lzReceive`"); + lz_receive(contract, &packet_sent.encodedPayload[..]).await?; + break; + } else if call_result[0].eq(&executed) { + debug!("State: Executed, free the executor"); + break; + } else { + error!("Unknown state for `executable` call"); + break; + } + } + Err(e) => { + warn!("Failed to call `executable` function: {:?}", e); + sleep(Duration::from_secs(1)).await; + retry_count += 1; + continue; + } } } Ok(()) diff --git a/offchain/tests/executor.rs b/offchain/tests/executor.rs index ffdb3042..dd5a37b5 100644 --- a/offchain/tests/executor.rs +++ b/offchain/tests/executor.rs @@ -9,7 +9,6 @@ use offchain::{ workers::executor::NFFLExecutor, }; use std::{ - collections::VecDeque, sync::atomic::{AtomicI32, Ordering}, sync::Arc, }; @@ -45,7 +44,13 @@ async fn test_handle_verified_packet_success() -> eyre::Result<()> { .init(); let counter: Arc = Arc::new(AtomicI32::new(0)); - let mut queue: VecDeque = VecDeque::new(); + + let packet_sent = PacketSent { + encodedPayload: Bytes::from(&[1; 256]), + options: Bytes::from(&[1; 32]), + sendLibrary: Address::from_slice(&[2; 20]), + }; + let verified_packet = PacketVerified { origin: Origin { srcEid: 1, @@ -57,9 +62,9 @@ async fn test_handle_verified_packet_success() -> eyre::Result<()> { }; let _join_handle = prepare_server(counter.clone()).await; - let contract = setup_contract(&mut queue).await?; + let contract = setup_contract().await?; - NFFLExecutor::handle_verified_packet(&contract, &mut queue, &verified_packet).await?; + NFFLExecutor::handle_verified_packet(&contract, packet_sent, verified_packet).await?; assert_eq!(counter.load(Ordering::Acquire), 2); Ok(()) @@ -84,28 +89,18 @@ async fn prepare_server(counter: Arc) -> JoinHandle<()> { // Spawn the server on a background task. let listener = tokio::net::TcpListener::bind(SERVER_ADDRESS_SHORT).await.unwrap(); - debug!("Listening on {}", listener.local_addr().unwrap()); - tokio::spawn(async move { axum::serve(listener, app).await.unwrap() }) } -async fn setup_contract(packet_sent_queue: &mut VecDeque) -> eyre::Result { +async fn setup_contract() -> eyre::Result { const SERVER_ADDRESS: &str = "http://127.0.0.1:8081"; let http_provider = ProviderBuilder::new().on_http(SERVER_ADDRESS.parse()?); let l0_abi = get_abi_from_path("offchain/abi/L0V2Endpoint.json")?; - debug!("{:?}", l0_abi.functions.iter().map(|f| f.0).collect::>()); - - packet_sent_queue.push_back(PacketSent { - encodedPayload: Bytes::from(&[1; 256]), - options: Bytes::from(&[1; 32]), - sendLibrary: Address::from_slice(&[2; 20]), - }); - create_contract_instance( address!("d8da6bf26964af9d7eed9e03e53415d37aa96045"), http_provider, l0_abi, ) -} +} \ No newline at end of file diff --git a/offchain/workers_config.toml b/offchain/workers_config.toml index 46c5d33d..b85f8825 100644 --- a/offchain/workers_config.toml +++ b/offchain/workers_config.toml @@ -14,4 +14,4 @@ TARGET_NETWORK_EID = 40267 SOURCE_DVN = "0x25e8edce1bcf8d074f8af8838e9ceecd3e3e0268" TARGET_DVN = "0xd58388997b0ad8f7f4a035f6c159c9c2270170f4" SOURCE_EXECUTOR = "0xBc0C24E6f24eC2F1fd7E859B8322A1277F80aaD5" -TARGET_EXECUTOR = "0x4Cf1B3Fa61465c2c907f82fC488B43223BA0CF93" +TARGET_EXECUTOR = "0x4Cf1B3Fa61465c2c907f82fC488B43223BA0CF93" \ No newline at end of file