Skip to content

Commit

Permalink
Merge pull request #10 from abhinavmsra/fix/make-processor-async
Browse files Browse the repository at this point in the history
fix: make processor calls async
  • Loading branch information
abhinavmsra authored Dec 12, 2024
2 parents c1a98ef + 8a0aaba commit 39b1256
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 39 deletions.
20 changes: 10 additions & 10 deletions processor/src/contracts/uniswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,27 @@ impl ContractHandler for UniswapV3Factory {
}

// Map event to handlers here
fn handle_event(&self, event: &str, log: &Log) {
async fn handle_event(&self, event: &str, log: &Log) {
match event {
"PoolCreated" => self.pool_created_handler(log),
"OwnerChanged" => self.owner_changed_handler(log),
"FeeAmountEnabled" => self.fee_amount_enabled_handler(log),
"PoolCreated" => self.pool_created_handler(log).await,
"OwnerChanged" => self.owner_changed_handler(log).await,
"FeeAmountEnabled" => self.fee_amount_enabled_handler(log).await,
unsupported => eprintln!("Unsupported event, {}", unsupported),
}
}
}

// Implement Handlers here
impl UniswapV3Factory {
fn pool_created_handler(&self, _log: &Log) {
// println!("pool_created_handler called");
async fn pool_created_handler(&self, _log: &Log) {
println!("pool_created_handler called");
}

fn owner_changed_handler(&self, _log: &Log) {
// println!("owner_changed_handler called");
async fn owner_changed_handler(&self, _log: &Log) {
println!("owner_changed_handler called");
}

fn fee_amount_enabled_handler(&self, _log: &Log) {
// println!("owner_changed_handler called");
async fn fee_amount_enabled_handler(&self, _log: &Log) {
println!("owner_changed_handler called");
}
}
61 changes: 33 additions & 28 deletions processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,46 @@ pub trait ContractHandler {
event_name.to_string()
}

fn process(&self, unprocessed_log: EvmLogs) {
let event_name = self.event_signature_to_name(unprocessed_log.event_signature);
fn process(&self, unprocessed_log: EvmLogs) -> impl std::future::Future<Output = ()> + Send
where
Self: Sync,
{
async move {
let event_name = self.event_signature_to_name(unprocessed_log.event_signature);

let topics: Vec<FixedBytes<32>> = unprocessed_log
.topics
.iter()
.map(FixedBytes::<32>::from)
.collect();
let topics: Vec<FixedBytes<32>> = unprocessed_log
.topics
.iter()
.map(FixedBytes::<32>::from)
.collect();

let data = Bytes::from(unprocessed_log.data);
let contract_address = Address::from(unprocessed_log.address);
let block_number = unprocessed_log
.block_number
.to_string()
.parse::<u64>()
.unwrap();
let transaction_hash = FixedBytes::<32>::from(unprocessed_log.transaction_hash);
let data = Bytes::from(unprocessed_log.data);
let contract_address = Address::from(unprocessed_log.address);
let block_number = unprocessed_log
.block_number
.to_string()
.parse::<u64>()
.unwrap();
let transaction_hash = FixedBytes::<32>::from(unprocessed_log.transaction_hash);

let log_data = alloy::primitives::Log::new(contract_address, topics, data).unwrap();
let log_data = alloy::primitives::Log::new(contract_address, topics, data).unwrap();

let log = Log {
inner: log_data,
block_number: Some(block_number),
block_hash: None,
block_timestamp: None,
transaction_hash: Some(transaction_hash),
transaction_index: None,
log_index: None,
removed: false,
};
let log = Log {
inner: log_data,
block_number: Some(block_number),
block_hash: None,
block_timestamp: None,
transaction_hash: Some(transaction_hash),
transaction_index: None,
log_index: None,
removed: false,
};

self.handle_event(&event_name, &log);
self.handle_event(&event_name, &log).await;
}
}

fn new(address: &str) -> Self;
fn handle_event(&self, event: &str, log: &Log);
fn handle_event(&self, event: &str, log: &Log) -> impl std::future::Future<Output = ()> + Send;
fn abi(&self) -> &JsonAbi;
}
5 changes: 4 additions & 1 deletion processor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
let unprocessed_logs = EvmLogs::find_all(&db_pool).await?;

for log in unprocessed_logs {
contract_registry.get_processor(log.address).process(log);
contract_registry
.get_processor(log.address)
.process(log)
.await;
}

Ok(())
Expand Down

0 comments on commit 39b1256

Please sign in to comment.