Skip to content

Commit

Permalink
fix: katana getTxns long poll
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Mar 27, 2024
1 parent 0d49f1b commit 8d14df7
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 28 deletions.
5 changes: 3 additions & 2 deletions crates/katana/rpc/rpc/src/torii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ impl<EF: ExecutorFactory> ToriiApiServer for ToriiApi<EF> {
})
.collect::<Vec<_>>();

// Update our next cursor to be at this block
next_cursor.block_number = block_number;
// Add transactions to the total and break if MAX_PAGE_SIZE is reached
for transaction in block_transactions {
transactions.push((transaction.0.into(), transaction.1.clone()));
next_cursor.transaction_index = transactions.len() as u64;
if transactions.len() >= MAX_PAGE_SIZE {
next_cursor.block_number = block_number;
next_cursor.transaction_index = MAX_PAGE_SIZE as u64;
return Ok(TransactionsPage { transactions, cursor: next_cursor });
}
}
Expand Down
120 changes: 94 additions & 26 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::collections::HashMap;
use std::time::Duration;

use anyhow::Result;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{
BlockId, EmittedEvent, Event, EventFilter, MaybePendingBlockWithTxHashes,
MaybePendingTransactionReceipt, Transaction, TransactionReceipt,
BlockId, BlockStatus, BlockWithTxHashes, EmittedEvent, Event, EventFilter,
MaybePendingBlockWithTxHashes, MaybePendingTransactionReceipt, ResourcePrice, Transaction,
TransactionReceipt,
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
Expand Down Expand Up @@ -51,13 +53,24 @@ pub struct Engine<P: KatanaProvider + Sync, R: Provider + Sync> {
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
processed_blocks: HashMap<u64, Block>,
}

struct UnprocessedEvent {
keys: Vec<String>,
data: Vec<String>,
}

#[derive(Debug, Clone)]
struct Block {
/// The hash of this block's parent
pub parent_hash: FieldElement,
/// The block number (its height)
pub block_number: u64,
/// The time in which the block was created, encoded in Unix time
pub timestamp: u64,
}

impl<P: KatanaProvider + Sync, R: Provider + Sync> Engine<P, R> {
pub fn new(
world: WorldContractReader<R>,
Expand All @@ -68,7 +81,16 @@ impl<P: KatanaProvider + Sync, R: Provider + Sync> Engine<P, R> {
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
) -> Self {
Self { world, db, provider: Box::new(provider), processors, config, shutdown_tx, block_tx }
Self {
world,
db,
provider: Box::new(provider),
processors,
config,
shutdown_tx,
block_tx,
processed_blocks: HashMap::new(),
}
}

pub async fn start(&mut self) -> Result<()> {
Expand All @@ -84,7 +106,7 @@ impl<P: KatanaProvider + Sync, R: Provider + Sync> Engine<P, R> {
// And use the returned cursor for next pages.
let transactions_page = match self
.provider
.get_transactions(TransactionsPageCursor { block_number: head, transaction_index: 0 })
.get_transactions(TransactionsPageCursor { block_number: head, transaction_index: 0, chunk_size: 100 })
.await
{
Ok(page) => Some(page),
Expand All @@ -95,12 +117,18 @@ impl<P: KatanaProvider + Sync, R: Provider + Sync> Engine<P, R> {
};

if let Some(transactions_page) = transactions_page.clone() {
self.process_katana(transactions_page.transactions, head).await?;
self.process_katana(
transactions_page.transactions,
head,
transactions_page.cursor.block_number - 1,
)
.await?;


self.db.execute().await?;
}

let mut current_cursor = transactions_page.map(|t| t.cursor);
let mut current_cursor = transactions_page.clone().map(|t| t.cursor);

let mut backoff_delay = Duration::from_secs(1);
let max_backoff_delay = Duration::from_secs(60);
Expand Down Expand Up @@ -195,34 +223,72 @@ impl<P: KatanaProvider + Sync, R: Provider + Sync> Engine<P, R> {
) -> Result<TransactionsPageCursor> {
let transactions = self.provider.get_transactions(cursor.clone()).await?;

self.process_katana(transactions.transactions, cursor.block_number).await?;
self.process_katana(
transactions.transactions,
cursor.block_number,
transactions.cursor.block_number - 1,
)
.await?;
self.db.execute().await?;

Ok(transactions.cursor)
}

async fn get_block_timestamp(&self, block_number: u64) -> Result<u64> {
async fn get_block_metadata(&self, block_number: u64) -> Result<Block> {
match self.provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? {
MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp),
MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp),
MaybePendingBlockWithTxHashes::Block(block) => Ok(Block {
block_number: block.block_number,
parent_hash: block.parent_hash,
timestamp: block.timestamp,
}),
MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(Block {
block_number,
parent_hash: block.parent_hash,
timestamp: block.timestamp,
}),
}
}

async fn process_katana(
&mut self,
transactions: Vec<(Transaction, MaybePendingTransactionReceipt)>,
block_number: u64,
from: u64,
to: u64,
) -> Result<()> {
if let Some(ref block_tx) = self.block_tx {
block_tx.send(block_number).await?;
}
for block_number in from..=to {
if let Some(ref block_tx) = self.block_tx {
block_tx.send(block_number).await?;
}

let block_timestamp = self.get_block_timestamp(block_number).await?;
let block = self.get_block_metadata(block_number).await?;
self.process_block(&block).await?;
}

self.process_block(block_number, block_timestamp).await?;
self.db.set_head(block_number);
self.db.set_head(to);

for (transaction, receipt) in transactions {
let block_number = match &receipt {
MaybePendingTransactionReceipt::Receipt(receipt) => match receipt {
TransactionReceipt::Invoke(receipt) => receipt.block_number,
TransactionReceipt::L1Handler(receipt) => receipt.block_number,
TransactionReceipt::Declare(receipt) => receipt.block_number,
TransactionReceipt::Deploy(receipt) => receipt.block_number,
TransactionReceipt::DeployAccount(receipt) => receipt.block_number,
},
// If the receipt is pending, we can assume that the transaction
// block number is the heighest block number we have processed.
MaybePendingTransactionReceipt::PendingReceipt(_) => to,
};
let block_timestamp = match self.processed_blocks.get(&block_number) {
Some(block) => block.timestamp,
None => {
return Err(anyhow::anyhow!(
"block {} not found in processed blocks",
block_number
))
}
};

self.process_transaction_and_receipt(
&transaction,
Some(receipt),
Expand All @@ -239,11 +305,13 @@ impl<P: KatanaProvider + Sync, R: Provider + Sync> Engine<P, R> {
let block_number = match event.block_number {
Some(block_number) => block_number,
None => {
error!("event without block number");
return Ok(());
let error = anyhow::anyhow!("event has no block number");
error!("processing event: {}", error);

return Err(error);
}
};
let block_timestamp = self.get_block_timestamp(block_number).await?;
let block = self.get_block_metadata(block_number).await?;

if block_number > *last_block {
*last_block = block_number;
Expand All @@ -252,13 +320,13 @@ impl<P: KatanaProvider + Sync, R: Provider + Sync> Engine<P, R> {
block_tx.send(block_number).await?;
}

self.process_block(block_number, block_timestamp).await?;
self.process_block(&block).await?;

self.db.set_head(block_number);
}

let transaction = self.provider.get_transaction_by_hash(event.transaction_hash).await?;
self.process_transaction_and_receipt(&transaction, None, block_number, block_timestamp)
self.process_transaction_and_receipt(&transaction, None, block_number, block.timestamp)
.await?;

Ok(())
Expand All @@ -272,7 +340,6 @@ impl<P: KatanaProvider + Sync, R: Provider + Sync> Engine<P, R> {
block_timestamp: u64,
) -> Result<()> {
let transaction_hash = transaction.transaction_hash();
println!("Processing transaction: {:?}", transaction_hash);

let receipt = receipt.unwrap_or(
match self.provider.get_transaction_receipt(transaction_hash).await {
Expand Down Expand Up @@ -338,14 +405,15 @@ impl<P: KatanaProvider + Sync, R: Provider + Sync> Engine<P, R> {
Ok(())
}

async fn process_block(&mut self, block_number: u64, block_timestamp: u64) -> Result<()> {
async fn process_block(&mut self, block: &Block) -> Result<()> {
for processor in &self.processors.block {
processor
.process(&mut self.db, &self.world.provider, block_number, block_timestamp)
.process(&mut self.db, &self.world.provider, block.block_number, block.timestamp)
.await?;
}

info!(target: "torii_core::engine", block_number = %block_number, "Processed block");
self.processed_blocks.insert(block.block_number, block.clone());
info!(target: "torii_core::engine", block_number = %block.block_number, "Processed block");

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/torii/core/src/provider/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct GetTransactionsRequest {
pub struct TransactionsPageCursor {
pub block_number: u64,
pub transaction_index: u64,
pub chunk_size: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
Expand Down

0 comments on commit 8d14df7

Please sign in to comment.