Skip to content

Commit

Permalink
LayerZero: Executor (#305)
Browse files Browse the repository at this point in the history
This patch adds `Executor` role to `NFFL` as well as an integration test for it
  • Loading branch information
Fly-Style authored Oct 30, 2024
1 parent 2c9fc41 commit 950da38
Show file tree
Hide file tree
Showing 12 changed files with 1,149 additions and 32 deletions.
759 changes: 739 additions & 20 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ edition = "2021"
repository = "https://github.com/NethermindEth/near-sffl"

[workspace.dependencies]
alloy = { version = "0.4.2", features = ["full", "node-bindings", "rpc-types-debug", "rpc-types-trace", "json-rpc", "rpc-client", "serde", "json-abi"] }
alloy = { version = "0.4.2", features = ["full", "node-bindings", "rpc-types-debug", "rpc-types-trace", "json-rpc", "rpc-client", "serde", "json-abi", "eip712"] }
alloy-primitives = "0.8.3"
alloy-rlp = "0.3.8"
alloy-rpc-client = "0.4.2"
Expand Down
8 changes: 8 additions & 0 deletions workers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ edition = "2021"
name = "dvn"
path = "src/bin/dvn.rs"

[[bin]]
name = "executor"
path = "src/bin/executor.rs"

[dependencies]
alloy.workspace = true
blsful = "2.5.7"
Expand All @@ -20,6 +24,10 @@ serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber = {workspace = true, features = ["env-filter"] }
tokio-tungstenite = "0.24.0"
log = "0.4.22"

[dev-dependencies]
axum = "0.7.7"
http-body-util = "0.1.0"
wiremock = "0.6.2"
2 changes: 1 addition & 1 deletion workers/abi/L0V2Endpoint.json

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion workers/abi/ReceiveLibUln302.json

This file was deleted.

25 changes: 25 additions & 0 deletions workers/src/bin/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
use workers::config;
use workers::executor_def::NFFLExecutor;

/// Executor is expected to work with low work rate, and we have a bonus
/// from this observation - we don't need/want to care about concurrency control,
/// so we choose run single-threaded runtime so far.
#[tokio::main(flavor = "current_thread")]
async fn main() -> eyre::Result<()> {
// Initialize tracing
tracing_subscriber::fmt()
.with_target(false)
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::DEBUG.into())
.from_env_lossy(),
)
.init();

let mut executor = NFFLExecutor::new(config::DVNConfig::load_from_env()?);
executor.listen().await?;

Ok(())
}
40 changes: 37 additions & 3 deletions workers/src/chain/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::{
chain::HttpProvider,
config::{DVNConfig, DVNEvent},
config::{DVNConfig, LayerZeroEvent},
};
use alloy::{
eips::BlockNumberOrTag,
Expand All @@ -28,13 +28,13 @@ pub async fn build_subscriptions(
// layerzero endpoint filter
let packet_filter = Filter::new()
.address(config.l0_endpoint_addr)
.event(DVNEvent::PacketSent.as_ref())
.event(LayerZeroEvent::PacketSent.as_ref())
.from_block(BlockNumberOrTag::Latest);

// messagelib endpoint filter
let fee_paid_filter = Filter::new()
.address(config.sendlib_uln302_addr)
.event(DVNEvent::FeePaid.as_ref())
.event(LayerZeroEvent::DVNFeePaid.as_ref())
.from_block(BlockNumberOrTag::Latest);

// Subscribe to logs
Expand All @@ -48,6 +48,40 @@ pub async fn build_subscriptions(
Ok((provider, endpoint_stream, sendlib_stream))
}

pub async fn build_executor_subscriptions(
config: &DVNConfig,
) -> Result<(
SubscriptionStream<Log>,
SubscriptionStream<Log>,
SubscriptionStream<Log>,
)> {
// Create the provider
let ws = WsConnect::new(&config.ws_rpc_url);
let provider = ProviderBuilder::new().on_ws(ws).await?;

// PacketSent
let packet_sent_filter = Filter::new()
.address(config.l0_endpoint_addr)
.event(LayerZeroEvent::PacketSent.as_ref())
.from_block(BlockNumberOrTag::Latest);

let executor_fee_paid = Filter::new()
.address(config.sendlib_uln302_addr)
.event(LayerZeroEvent::ExecutorFeePaid.as_ref())
.from_block(BlockNumberOrTag::Latest);

let packet_verified_filter = Filter::new()
.address(config.l0_endpoint_addr)
.event(LayerZeroEvent::PacketVerified.as_ref())
.from_block(BlockNumberOrTag::Latest);

Ok((
provider.subscribe_logs(&packet_sent_filter).await?.into_stream(),
provider.subscribe_logs(&executor_fee_paid).await?.into_stream(),
provider.subscribe_logs(&packet_verified_filter).await?.into_stream(),
))
}

/// Load the MessageLib ABI.
pub fn get_abi_from_path(path: &str) -> Result<JsonAbi> {
// Get the SendLib ABI
Expand Down
49 changes: 49 additions & 0 deletions workers/src/chain/contracts.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Utilities for interacting with onchain contracts.
use crate::chain::{ContractInst, HttpProvider};
use crate::data::packet_v1_codec::{guid, header, message, nonce, receiver, sender, src_eid};
use alloy::primitives::B256;
use alloy::{
contract::{ContractInstance, Interface},
dyn_abi::DynSolValue,
Expand Down Expand Up @@ -143,3 +145,50 @@ pub async fn verify(contract: &ContractInst, packet_header: &[u8], payload: &[u8

Ok(())
}

/// If the state is `Executable`, your `Executor` should decode the packet's options
/// using the options.ts package and call the Endpoint's `lzReceive` function with
/// the packet information:
/// `endpoint.lzReceive(_origin, _receiver, _guid, _message, _extraData)`
pub async fn lz_receive(contract: &ContractInst, packet: &[u8]) -> Result<()> {
let guid = guid(packet);
let call_builder_result = contract.function(
"lzReceive",
&[
prepare_header(header(packet)),
DynSolValue::Address(Address::from_slice(&receiver(packet)[0..20])),
DynSolValue::FixedBytes(B256::from_slice(guid.as_slice()), 32),
DynSolValue::Bytes(message(packet).to_vec()),
DynSolValue::Bytes(vec![]),
],
);

if call_builder_result.is_err() {
error!("Failed to call lzReceive, because it doesn't exist in the contract/ABI.");
return Ok(());
}

call_builder_result.unwrap().call().await.map_err(|e| {
error!("Failed to call lzReceive for packet {:?}: {:?}", guid, e);
eyre!("lzReceive call failed: {}", e)
})?;
debug!("Successfully called lzReceive for packet {:?}", guid);
Ok(())
}

/// Converts `Origin` data structure from the received `PacketVerified`
/// to the `DynSolValue`, understandable by `alloy-rs`.
pub(crate) fn prepare_header(packet: &[u8]) -> DynSolValue {
const ORIGIN_STRUCT_NAME: &str = "Origin";
const ORIGIN_PROPS: [&str; 3] = ["srcEid", "sender", "nonce"];

DynSolValue::CustomStruct {
name: String::from(ORIGIN_STRUCT_NAME),
prop_names: ORIGIN_PROPS.iter().map(|&s| String::from(s)).collect(),
tuple: vec![
DynSolValue::Uint(U256::from(src_eid(packet)), 32),
DynSolValue::FixedBytes(B256::from_slice(sender(packet).as_ref()), 32),
DynSolValue::Uint(U256::from(nonce(packet)), 64),
],
}
}
16 changes: 10 additions & 6 deletions workers/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Configuration for the DVN offchain workflow.
//! Configuration for the DVN off-chain workflow.
use alloy::primitives::Address;
use config::Config;
Expand Down Expand Up @@ -40,16 +40,20 @@ impl DVNConfig {
}

/// Useful events for the DVN workflow.
pub enum DVNEvent {
pub enum LayerZeroEvent {
PacketSent,
FeePaid,
DVNFeePaid,
ExecutorFeePaid,
PacketVerified,
}

impl AsRef<str> for DVNEvent {
impl AsRef<str> for LayerZeroEvent {
fn as_ref(&self) -> &str {
match self {
DVNEvent::PacketSent => "PacketSent(bytes,bytes,address)",
DVNEvent::FeePaid => "DVNFeePaid(address[],address[],uint256[])",
LayerZeroEvent::PacketSent => "PacketSent(bytes,bytes,address)",
LayerZeroEvent::DVNFeePaid => "DVNFeePaid(address[],address[],uint256[])",
LayerZeroEvent::ExecutorFeePaid => "ExecutorFeePaid(address,uint256)",
LayerZeroEvent::PacketVerified => "PacketVerified(address,bytes,uint256,bytes32)",
}
}
}
Expand Down
Loading

0 comments on commit 950da38

Please sign in to comment.