Skip to content

Commit

Permalink
Fix executor (#312)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fly-Style authored Nov 14, 2024
1 parent 1edc562 commit 9bd8a7a
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 89 deletions.
2 changes: 1 addition & 1 deletion offchain/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,4 @@ mod tests {
assert!(conf.target_http_rpc_url.starts_with("http://") || conf.target_http_rpc_url.starts_with("https://"));
assert!(conf.target_network_eid > 0);
}
}
}
141 changes: 70 additions & 71 deletions offchain/src/workers/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ use crate::{
config::WorkerConfig,
};
use alloy::{dyn_abi::DynSolValue, primitives::U256};
use eyre::{eyre, Result};
use eyre::Result;
use futures::StreamExt;
use std::{collections::VecDeque, time::Duration};
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::{debug, error};
use tracing::{debug, error, warn};

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ExecutionState {
Expand Down Expand Up @@ -43,77 +44,80 @@ pub enum ExecutorState {

pub struct NFFLExecutor {
config: WorkerConfig,
packet_queue: VecDeque<PacketSent>,
finish: bool,
status: ExecutorState,
finish: bool
}

impl NFFLExecutor {
pub(crate) const MAX_EXECUTE_ATTEMPTS: usize = 10;
pub(crate) const MAX_EXECUTE_ATTEMPTS: usize = 3;

pub fn new(config: WorkerConfig) -> Self {
NFFLExecutor {
config,
packet_queue: VecDeque::new(),
finish: false,
status: ExecutorState::Created,
}
NFFLExecutor { config, finish: false }
}

pub fn finish(&mut self) {
// Note: we are in a single-threaded event loop, and we do not care about atomicity (yet).
self.finish = true;
}

pub async fn listen(&mut self) -> Result<()> {
let (_source_source_provider, _target_provider, mut ps_stream, mut ef_stream, mut pv_stream) =
let (_sp, _tp, mut ps_stream, mut ef_stream, mut pv_stream) =
build_executor_subscriptions(&self.config).await?;

// FIXME: should this be source or target?
let http_provider = get_http_provider(&self.config.target_http_rpc_url)?;
let l0_abi = get_abi_from_path("offchain/abi/L0V2Endpoint.json")?;
// Create a contract instance.
let contract = create_contract_instance(self.config.target_endpoint, http_provider, l0_abi)?;

self.status = ExecutorState::WaitingPacket;
// Verified packet handler task
let l0_addr = self.config.target_endpoint;
let (tx, mut rx) = mpsc::channel::<(PacketSent, PacketVerified)>(4);
tokio::spawn(async move {
let contract = create_contract_instance(l0_addr, http_provider, l0_abi).unwrap();
while let Some((packet_sent, packet_verified)) = rx.recv().await {
debug!("Handler received PacketSent and PacketVerified");
let _ = Self::handle_verified_packet(&contract, packet_sent, packet_verified).await;
}
});

let mut packet_queue: VecDeque<PacketSent> = VecDeque::new();

// Network I/O handler
loop {
debug!("Iteration started, queue size {:?}", packet_queue.len());
tokio::select! {
Some(log) = ps_stream.next() => {
match log.log_decode::<PacketSent>() {
Ok(packet_log) => {
debug!("Packet seemed to arrive");

if self.status == ExecutorState::WaitingPacket {
self.packet_queue.push_back(packet_log.data().clone());
self.status = ExecutorState::WaitingAssignation;
continue;
}
debug!("PacketSent received");
packet_queue.push_back(packet_log.data().clone());
},
Err(e) => { error!("Failed to decode PacketSent event: {:?}", e);}
}
}
Some(log) = ef_stream.next() => {
match log.log_decode::<ExecutorFeePaid>() {
Ok(executor_fee_log) => {
debug!("Executor fee paid seemed to arrive");
if executor_fee_log.data().executor.eq(&self.config.target_executor) {
self.status = ExecutorState::WaitingVerification;
debug!("ExecutorFeePaid received");
if packet_queue.is_empty() {
continue;
}

debug!("{:?} ~ {:?}", executor_fee_log.data().executor, &self.config.target_executor);
// if !executor_fee_log.data().executor.eq(&self.config.executor) {
// packet_queue.pop_front();
//// self.packet_queue.clear();
// continue;
// }
},
Err(e) => { error!("Failed to decode ExecutorFeePaid event: {:?}", e);}
}
},
Some(log) = pv_stream.next() => {
// FIXME: we should not process everything, check for some condition to filter
match log.log_decode::<PacketVerified>() {
Ok(inner_log) => {
debug!("->> Packet verified seemd to arrived");
if self.status != ExecutorState::WaitingVerification {
Self::handle_verified_packet(&contract, &mut self.packet_queue, inner_log.data()).await?;
continue;
if !packet_queue.is_empty() {
debug!("PacketSent and PacketVerified sent to handler");
tx.send((packet_queue.pop_front().unwrap(), inner_log.data().clone())).await?;
} else {
warn!("PacketVerified event {:?} arrived for non-handled PacketSent", inner_log.log_index);
}
},
Err(e) => { error!("Failed to decode PacketVerified event: {:?}", e);}
Expand All @@ -127,26 +131,11 @@ impl NFFLExecutor {
Ok(())
}

#[cfg(test)]
pub fn is_queue_empty(&self) -> bool {
self.packet_queue.is_empty()
}

pub async fn handle_verified_packet(
contract: &ContractInst,
queue: &mut VecDeque<PacketSent>,
packet_verified: &PacketVerified,
packet_sent: PacketSent,
packet_verified: PacketVerified,
) -> Result<()> {
//if queue.is_empty() {
// return Ok(());
//}
//
//let packet_sent = queue.pop_front().unwrap();
let Some(packet_sent) = queue.pop_front() else {
return Err(eyre!("Queue was empty, not handling packet verified"));
};
// We don't expect any more items to be present. If we have any - they are garbage/leftovers.
queue.clear();
// Despite being described with other arguments, the only real implementation of
// `executable` function is in the contract located here: https://shorturl.at/4H6Yz
// function executable(Origin memory _origin, address _receiver) returns (ExecutionState)
Expand All @@ -171,29 +160,39 @@ impl NFFLExecutor {
error!("Maximum retries reached while waiting for `Executable` state.");
break;
}
let call_result = call_builder.call().await?;
debug!("Result of `executable` function call: {:?}", call_result);
if call_result.len() != 1 {
error!("`executable` function call returned invalid response.");
break;
}

// Note: why not pattern matching here? fn calls are not allowed in patterns
if call_result[0].eq(&not_executable) || call_result[0].eq(&verified_not_executable) {
debug!("State: NotExecutable or VerifiedNotExecutable, await commits/verifications");
sleep(Duration::from_secs(1)).await;
retry_count += 1;
continue;
} else if call_result[0].eq(&executable) {
debug!("State: Executable, fire and forget `lzReceive`");
lz_receive(contract, &packet_sent.encodedPayload[..]).await?;
break;
} else if call_result[0].eq(&executed) {
debug!("State: Executed, free the executor");
break;
} else {
error!("Unknown state for `executable` call");
break;
match call_builder.call().await {
Ok(call_result) => {
debug!("> {:?}", call_result);
if call_result.len() != 1 {
error!("`executable` function call returned invalid response.");
break;
}
debug!(">> {:?}", call_result[0]);
// Note: why not pattern matching here? fn calls are not allowed in patterns
if call_result[0].eq(&not_executable) || call_result[0].eq(&verified_not_executable) {
debug!("State: NotExecutable or VerifiedNotExecutable, await commits/verifications");
sleep(Duration::from_secs(1)).await;
retry_count += 1;
continue;
} else if call_result[0].eq(&executable) {
debug!("State: Executable, fire and forget `lzReceive`");
lz_receive(contract, &packet_sent.encodedPayload[..]).await?;
break;
} else if call_result[0].eq(&executed) {
debug!("State: Executed, free the executor");
break;
} else {
error!("Unknown state for `executable` call");
break;
}
}
Err(e) => {
warn!("Failed to call `executable` function: {:?}", e);
sleep(Duration::from_secs(1)).await;
retry_count += 1;
continue;
}
}
}
Ok(())
Expand Down
27 changes: 11 additions & 16 deletions offchain/tests/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use offchain::{
workers::executor::NFFLExecutor,
};
use std::{
collections::VecDeque,
sync::atomic::{AtomicI32, Ordering},
sync::Arc,
};
Expand Down Expand Up @@ -45,7 +44,13 @@ async fn test_handle_verified_packet_success() -> eyre::Result<()> {
.init();

let counter: Arc<AtomicI32> = Arc::new(AtomicI32::new(0));
let mut queue: VecDeque<PacketSent> = VecDeque::new();

let packet_sent = PacketSent {
encodedPayload: Bytes::from(&[1; 256]),
options: Bytes::from(&[1; 32]),
sendLibrary: Address::from_slice(&[2; 20]),
};

let verified_packet = PacketVerified {
origin: Origin {
srcEid: 1,
Expand All @@ -57,9 +62,9 @@ async fn test_handle_verified_packet_success() -> eyre::Result<()> {
};

let _join_handle = prepare_server(counter.clone()).await;
let contract = setup_contract(&mut queue).await?;
let contract = setup_contract().await?;

NFFLExecutor::handle_verified_packet(&contract, &mut queue, &verified_packet).await?;
NFFLExecutor::handle_verified_packet(&contract, packet_sent, verified_packet).await?;

assert_eq!(counter.load(Ordering::Acquire), 2);
Ok(())
Expand All @@ -84,28 +89,18 @@ async fn prepare_server(counter: Arc<AtomicI32>) -> JoinHandle<()> {
// Spawn the server on a background task.
let listener = tokio::net::TcpListener::bind(SERVER_ADDRESS_SHORT).await.unwrap();

debug!("Listening on {}", listener.local_addr().unwrap());

tokio::spawn(async move { axum::serve(listener, app).await.unwrap() })
}

async fn setup_contract(packet_sent_queue: &mut VecDeque<PacketSent>) -> eyre::Result<ContractInst> {
async fn setup_contract() -> eyre::Result<ContractInst> {
const SERVER_ADDRESS: &str = "http://127.0.0.1:8081";

let http_provider = ProviderBuilder::new().on_http(SERVER_ADDRESS.parse()?);
let l0_abi = get_abi_from_path("offchain/abi/L0V2Endpoint.json")?;

debug!("{:?}", l0_abi.functions.iter().map(|f| f.0).collect::<Vec<_>>());

packet_sent_queue.push_back(PacketSent {
encodedPayload: Bytes::from(&[1; 256]),
options: Bytes::from(&[1; 32]),
sendLibrary: Address::from_slice(&[2; 20]),
});

create_contract_instance(
address!("d8da6bf26964af9d7eed9e03e53415d37aa96045"),
http_provider,
l0_abi,
)
}
}
2 changes: 1 addition & 1 deletion offchain/workers_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ TARGET_NETWORK_EID = 40267
SOURCE_DVN = "0x25e8edce1bcf8d074f8af8838e9ceecd3e3e0268"
TARGET_DVN = "0xd58388997b0ad8f7f4a035f6c159c9c2270170f4"
SOURCE_EXECUTOR = "0xBc0C24E6f24eC2F1fd7E859B8322A1277F80aaD5"
TARGET_EXECUTOR = "0x4Cf1B3Fa61465c2c907f82fC488B43223BA0CF93"
TARGET_EXECUTOR = "0x4Cf1B3Fa61465c2c907f82fC488B43223BA0CF93"

0 comments on commit 9bd8a7a

Please sign in to comment.