Skip to content

Commit

Permalink
feat(arbiter-core): stream with meta (#863)
Browse files Browse the repository at this point in the history
* works

* cleanup and test

* chore: fix tests

---------

Co-authored-by: Waylon Jepsen <[email protected]>
  • Loading branch information
Autoparallel and 0xJepsen authored Feb 8, 2024
1 parent e3aa481 commit cdb4659
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 33 deletions.
1 change: 0 additions & 1 deletion arbiter-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ futures.workspace = true
cargo_metadata = "0.18.1"
chrono = "0.4.33"


assert_matches = { version = "=1.5" }

[[bench]]
Expand Down
8 changes: 4 additions & 4 deletions arbiter-core/src/data_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,9 @@ impl EventLogger {
}
break;
}
Broadcast::Event(event) => {
Broadcast::Event(event, receipt_data) => {
trace!("`EventLogger` received an event");
let ethers_logs = revm_logs_to_ethers_logs(event);
let ethers_logs = revm_logs_to_ethers_logs(event, &receipt_data);
for log in ethers_logs {
for (contract_name, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
Expand Down Expand Up @@ -365,9 +365,9 @@ impl EventLogger {
trace!("`EventLogger` has seen a stop signal");
break;
}
Broadcast::Event(event) => {
Broadcast::Event(event, receipt_data) => {
trace!("`EventLogger` received an event");
let ethers_logs = revm_logs_to_ethers_logs(event);
let ethers_logs = revm_logs_to_ethers_logs(event, &receipt_data);
for log in ethers_logs {
for (_id, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
Expand Down
7 changes: 5 additions & 2 deletions arbiter-core/src/environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,10 @@ impl Environment {
transaction_index,
cumulative_gas_per_block,
};
match event_broadcaster.send(Broadcast::Event(execution_result.logs())) {
match event_broadcaster.send(Broadcast::Event(
execution_result.logs(),
receipt_data.clone(),
)) {
Ok(_) => {}
Err(_) => {
warn!(
Expand Down Expand Up @@ -599,7 +602,7 @@ pub enum Broadcast {
/// Represents a signal to stop the event logger process.
StopSignal,
/// Represents a broadcast of a vector of Ethereum logs.
Event(Vec<Log>),
Event(Vec<Log>, ReceiptData),
}

/// Convert a U256 to a U64, discarding the higher bits if the number is larger
Expand Down
32 changes: 17 additions & 15 deletions arbiter-core/src/middleware/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ impl JsonRpcClient for Connection {
if let Some(receiver) = filter_receiver.receiver.as_mut() {
if let Ok(broadcast) = receiver.try_recv() {
match broadcast {
Broadcast::Event(received_logs) => {
let ethers_logs = revm_logs_to_ethers_logs(received_logs);
Broadcast::Event(received_logs, receipt_data) => {
let ethers_logs =
revm_logs_to_ethers_logs(received_logs, &receipt_data);
for log in ethers_logs {
if filtered_params.filter_address(&log)
&& filtered_params.filter_topics(&log)
Expand Down Expand Up @@ -150,10 +151,10 @@ impl PubsubClient for Connection {
Broadcast::StopSignal => {
break;
}
Broadcast::Event(logs) => {
Broadcast::Event(logs, receipt_data) => {
let filtered_params =
FilteredParams::new(Some(filter_receiver.filter.clone()));
let ethers_logs = revm_logs_to_ethers_logs(logs);
let ethers_logs = revm_logs_to_ethers_logs(logs, &receipt_data);
// Return the first log that matches the filter, if any
for log in ethers_logs {
if filtered_params.filter_address(&log)
Expand Down Expand Up @@ -214,28 +215,29 @@ pub(crate) struct FilterReceiver {
}

// TODO: The logs below could have the block number, transaction index, and
// maybe other fields populated.
// maybe other fields populated. Right now, some are defaulted and are not
// correct!

/// Converts logs from the Revm format to the Ethers format.
///
/// This function iterates over a list of logs as they appear in the `revm` and
/// converts each log entry to the corresponding format used by the `ethers-rs`
/// library.
#[inline]
pub fn revm_logs_to_ethers_logs(revm_logs: Vec<Log>) -> Vec<eLog> {
let mut logs: Vec<ethers::core::types::Log> = vec![];
pub fn revm_logs_to_ethers_logs(revm_logs: Vec<Log>, receipt_data: &ReceiptData) -> Vec<eLog> {
let mut logs: Vec<eLog> = vec![];
for revm_log in revm_logs {
let topics = revm_log.topics().iter().map(recast_b256).collect();
let data = ethers::core::types::Bytes::from(revm_log.data.data.0);
let log = ethers::core::types::Log {
address: ethers::core::types::H160::from(revm_log.address.into_array()),
let data = eBytes::from(revm_log.data.data.0);
let log = eLog {
address: eAddress::from(revm_log.address.into_array()),
topics,
data,
block_hash: None,
block_number: None,
transaction_hash: None,
transaction_index: None,
log_index: None,
block_hash: Some(H256::default()),
block_number: Some(receipt_data.block_number),
transaction_hash: Some(H256::default()),
transaction_index: Some(receipt_data.transaction_index),
log_index: Some(eU256::from(0)),
transaction_log_index: None,
log_type: None,
removed: None,
Expand Down
20 changes: 10 additions & 10 deletions arbiter-core/src/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ impl Middleware for ArbiterMiddleware {
let outcome = provider.outcome_receiver.recv()??;

if let Outcome::TransactionCompleted(execution_result, receipt_data) = outcome {
let mut block_hasher = Sha256::new();
block_hasher.update(receipt_data.block_number.to_string().as_bytes());
let block_hash = block_hasher.finalize();
let block_hash = Some(H256::from_slice(&block_hash));
match execution_result {
ExecutionResult::Revert { gas_used, output } => {
return Err(ArbiterCoreError::ExecutionRevert {
Expand All @@ -475,12 +479,7 @@ impl Middleware for ArbiterMiddleware {
logs,
..
} => {
let logs = revm_logs_to_ethers_logs(logs);
let to: Option<eAddress> = match tx_env.transact_to {
TransactTo::Call(address) => Some(address.into_array().into()),
TransactTo::Create(_) => None,
};

// TODO: This is why we need the signer middleware
// Note that this is technically not the correct construction on the tx hash
// but until we increment the nonce correctly this will do
let sender = self.address();
Expand All @@ -490,10 +489,11 @@ impl Middleware for ArbiterMiddleware {
hasher.update(data.as_ref());
let hash = hasher.finalize();

let mut block_hasher = Sha256::new();
block_hasher.update(receipt_data.block_number.to_string().as_bytes());
let block_hash = block_hasher.finalize();
let block_hash = Some(H256::from_slice(&block_hash));
let logs = revm_logs_to_ethers_logs(logs, &receipt_data);
let to: Option<eAddress> = match tx_env.transact_to {
TransactTo::Call(address) => Some(address.into_array().into()),
TransactTo::Create(_) => None,
};

match output {
Output::Create(_, address) => {
Expand Down
2 changes: 1 addition & 1 deletion arbiter-core/tests/environment_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn receipt_data() {
block_hasher.update(receipt.block_number.unwrap().to_string().as_bytes());
let block_hash = block_hasher.finalize();
let block_hash = Some(H256::from_slice(&block_hash));
assert_eq!(receipt.block_hash, block_hash);
assert_eq!(receipt.block_hash, block_hash); // panic here left side is none
assert_eq!(receipt.status, Some(1.into()));

assert!(receipt.contract_address.is_none());
Expand Down
31 changes: 31 additions & 0 deletions arbiter-core/tests/middleware_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,3 +594,34 @@ async fn access() {
}
}
}

#[tokio::test]
async fn stream_with_meta() {
let (_environment, client) = startup();

let arbx = deploy_arbx(client.clone()).await;

let events = arbx.events();
let mut stream = events.stream_with_meta().await.unwrap();

for _ in 0..2 {
arbx.approve(client.address(), eU256::from(1))
.send()
.await
.unwrap()
.await
.unwrap();
}

client.update_block(1, 1).unwrap();

arbx.approve(client.address(), eU256::from(1))
.send()
.await
.unwrap()
.await
.unwrap();
assert_eq!(format!("{:?}", stream.next().await), "Some(Ok((ApprovalFilter(ApprovalFilter { owner: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, spender: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, amount: 1 }), LogMeta { address: 0x067ea9e44c76a2620f10b39a1b51d5124a299192, block_number: 0, block_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_index: 1, log_index: 0 })))");
assert_eq!(format!("{:?}", stream.next().await), "Some(Ok((ApprovalFilter(ApprovalFilter { owner: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, spender: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, amount: 1 }), LogMeta { address: 0x067ea9e44c76a2620f10b39a1b51d5124a299192, block_number: 0, block_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_index: 2, log_index: 0 })))");
assert_eq!(format!("{:?}", stream.next().await), "Some(Ok((ApprovalFilter(ApprovalFilter { owner: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, spender: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, amount: 1 }), LogMeta { address: 0x067ea9e44c76a2620f10b39a1b51d5124a299192, block_number: 1, block_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_index: 0, log_index: 0 })))");
}

0 comments on commit cdb4659

Please sign in to comment.