Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): index whitelisted ERC20/ERC721 tokens #2277

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0e4a370
tmp
lambda-0x Aug 3, 2024
02ee483
mvp that reads and prints events
lambda-0x Aug 4, 2024
1375b32
add erc721 and update comment
lambda-0x Aug 4, 2024
be63326
add sql schema for erc
lambda-0x Aug 5, 2024
e0f80bb
add db logic
lambda-0x Aug 6, 2024
e46e8f6
remove token_uri from balances
lambda-0x Aug 7, 2024
8c959e3
add grpc endpoint (wip)
lambda-0x Aug 7, 2024
505c0bc
update schema + how we serialize u256
lambda-0x Aug 8, 2024
44bc339
comment out graphql changes
lambda-0x Aug 8, 2024
66bd2dc
tmp grpc
lambda-0x Aug 9, 2024
d229899
bunch of stuff
lambda-0x Aug 13, 2024
7cbb6b6
update schema and try to debug indexing issue
lambda-0x Aug 13, 2024
39b233b
Merge branch 'main' into torii-index-erc
lambda-0x Aug 13, 2024
16c100b
reenable world events
lambda-0x Aug 13, 2024
5728549
make raw data fetching work on graphql
lambda-0x Aug 13, 2024
307917c
update schema file (wip)
lambda-0x Aug 13, 2024
705b55a
feat: take contract address from cli and config file
lambda-0x Aug 14, 2024
62a4f34
change table schema and core implementation
lambda-0x Aug 18, 2024
13f183e
remove grpc changes
lambda-0x Aug 18, 2024
97defc4
update graphql implementation for new schema
lambda-0x Aug 18, 2024
570ee7a
Merge branch 'main' into torii-index-erc
lambda-0x Aug 18, 2024
c0ab69e
add ability to query erc transfer in graphql
lambda-0x Aug 19, 2024
0dca74d
fix formatting
lambda-0x Aug 19, 2024
ef16b65
fix lints
lambda-0x Aug 19, 2024
1240bdf
update schema and enable foreign key constraint
lambda-0x Aug 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 70 additions & 3 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//! for more info.

use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -27,6 +28,9 @@
use tokio::sync::broadcast::Sender;
use tokio_stream::StreamExt;
use torii_core::engine::{Engine, EngineConfig, Processors};
use torii_core::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor;
use torii_core::processors::erc20_transfer::Erc20TransferProcessor;
use torii_core::processors::erc721_transfer::Erc721TransferProcessor;
use torii_core::processors::event_message::EventMessageProcessor;
use torii_core::processors::metadata_update::MetadataUpdateProcessor;
use torii_core::processors::register_model::RegisterModelProcessor;
Expand All @@ -37,7 +41,7 @@
use torii_core::processors::store_update_record::StoreUpdateRecordProcessor;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::Sql;
use torii_core::types::Model;
use torii_core::types::{ErcContract, ErcType, Model, ToriiConfig};
use torii_server::proxy::Proxy;
use tracing::{error, info};
use tracing_subscriber::{fmt, EnvFilter};
Expand Down Expand Up @@ -115,11 +119,39 @@
/// Enable indexing pending blocks
#[arg(long)]
index_pending: bool,

/// ERC contract addresses to index
#[arg(long, value_parser = parse_erc_contracts)]
#[arg(conflicts_with = "config")]
erc_contracts: Option<std::vec::Vec<ErcContract>>,

/// Configuration file
#[arg(long)]
config: Option<PathBuf>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();

let mut start_block = args.start_block;

let mut config = if let Some(path) = args.config {
ToriiConfig::load_from_path(&path)?
} else {
ToriiConfig::default()
};
lambda-0x marked this conversation as resolved.
Show resolved Hide resolved

if let Some(erc_contracts) = args.erc_contracts {
config.erc_contracts = erc_contracts;
}

for address in &config.erc_contracts {
if address.start_block < start_block {
start_block = address.start_block;
}
}
lambda-0x marked this conversation as resolved.
Show resolved Hide resolved

Check warning on line 154 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L136-L154

Added lines #L136 - L154 were not covered by tests
let filter_layer = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info,hyper_reverse_proxy=off"));

Expand Down Expand Up @@ -162,7 +194,12 @@
// Get world address
let world = WorldContractReader::new(args.world_address, &provider);

let db = Sql::new(pool.clone(), args.world_address).await?;
let erc_contracts = config
.erc_contracts
.iter()
.map(|contract| (contract.contract_address, contract.clone()))
.collect();
let db = Sql::new(pool.clone(), args.world_address, &erc_contracts).await?;

Check warning on line 202 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L197-L202

Added lines #L197 - L202 were not covered by tests
let processors = Processors {
event: vec![
Box::new(RegisterModelProcessor),
Expand All @@ -172,6 +209,9 @@
Box::new(EventMessageProcessor),
Box::new(StoreUpdateRecordProcessor),
Box::new(StoreUpdateMemberProcessor),
Box::new(Erc20TransferProcessor),
Box::new(Erc20LegacyTransferProcessor),
Box::new(Erc721TransferProcessor),

Check warning on line 214 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L212-L214

Added lines #L212 - L214 were not covered by tests
],
transaction: vec![Box::new(StoreTransactionProcessor)],
..Processors::default()
Expand All @@ -185,13 +225,14 @@
&provider,
processors,
EngineConfig {
start_block: args.start_block,
start_block,

Check warning on line 228 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L228

Added line #L228 was not covered by tests
events_chunk_size: args.events_chunk_size,
index_pending: args.index_pending,
..Default::default()
},
shutdown_tx.clone(),
Some(block_tx),
erc_contracts,

Check warning on line 235 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L235

Added line #L235 was not covered by tests
);

let shutdown_rx = shutdown_tx.subscribe();
Expand Down Expand Up @@ -286,3 +327,29 @@
}
}
}

// Parses clap cli argument which is expected to be in the format:
// - erc_type:address:start_block
// - address:start_block (erc_type defaults to ERC20)
fn parse_erc_contracts(s: &str) -> anyhow::Result<Vec<ErcContract>> {
let parts: Vec<&str> = s.split(',').collect();
let mut contracts = Vec::new();
for part in parts {
match part.split(':').collect::<Vec<&str>>().as_slice() {
[r#type, address, start_block] => {
let contract_address = Felt::from_str(address).unwrap();
let start_block = start_block.parse::<u64>()?;
let r#type = r#type.parse::<ErcType>()?;
contracts.push(ErcContract { contract_address, start_block, r#type });

Check warning on line 343 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L334-L343

Added lines #L334 - L343 were not covered by tests
}
[address, start_block] => {
let contract_address = Felt::from_str(address)?;
let start_block = start_block.parse::<u64>()?;
let r#type = ErcType::default();
contracts.push(ErcContract { contract_address, start_block, r#type });

Check warning on line 349 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L345-L349

Added lines #L345 - L349 were not covered by tests
}
_ => return Err(anyhow::anyhow!("Invalid ERC contract format")),

Check warning on line 351 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L351

Added line #L351 was not covered by tests
}
}
Ok(contracts)
}

Check warning on line 355 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L354-L355

Added lines #L354 - L355 were not covered by tests
lambda-0x marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 8 additions & 0 deletions bin/torii/torii.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Example configuration file for Torii
# erc_contracts = [
# { contract_address = "0x1234567890abcdef1234567890abcdef12345678", start_block = 0, type = "ERC20" },
# { contract_address = "0xabcdef1234567890abcdef1234567890abcdef12", start_block = 1, type = "ERC721" },
# ]
erc_contracts = [
{ type = "ERC20", contract_address = "0x07fc13cc1f43f0b0519f84df8bf13bea4d9fd5ce2d748c3baf27bf90a565f60a", start_block = 0 },
]
1 change: 1 addition & 0 deletions crates/katana/scripts/deploy-erc20.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
starkli deploy --account account.json --keystore signer.json --keystore-password "" 0x02a8846878b6ad1f54f6ba46f5f40e11cee755c677f130b2c4b60566c9003f1f 0x626c6f62 0x424c42 0x8 u256:10000000000 0xb3ff441a68610b30fd5e2abbf3a1548eb6ba6f3559f2862bf2dc757e5828c
1 change: 1 addition & 0 deletions crates/torii/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ thiserror.workspace = true
tokio = { version = "1.32.0", features = [ "sync" ], default-features = true }
tokio-stream = "0.1.11"
tokio-util = "0.7.7"
toml.workspace = true
tracing.workspace = true

[dev-dependencies]
Expand Down
111 changes: 85 additions & 26 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::time::Duration;

use anyhow::Result;
use dojo_world::contracts::world::WorldContractReader;
use futures_util::future::join_all;
use starknet::core::types::{
BlockId, BlockTag, Event, EventFilter, Felt, MaybePendingBlockWithTxHashes,
BlockId, BlockTag, Event, EventFilter, EventsPage, Felt, MaybePendingBlockWithTxHashes,
MaybePendingBlockWithTxs, ReceiptBlock, Transaction, TransactionReceipt,
TransactionReceiptWithBlockInfo,
};
Expand All @@ -18,6 +20,7 @@

use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor};
use crate::sql::Sql;
use crate::types::ErcContract;

#[allow(missing_debug_implementations)]
pub struct Processors<P: Provider + Sync> {
Expand Down Expand Up @@ -62,6 +65,8 @@
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
// ERC tokens to index
tokens: HashMap<Felt, ErcContract>,
}

struct UnprocessedEvent {
Expand All @@ -70,6 +75,7 @@
}

impl<P: Provider + Sync> Engine<P> {
#[allow(clippy::too_many_arguments)]
pub fn new(
world: WorldContractReader<P>,
db: Sql,
Expand All @@ -78,8 +84,18 @@
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
tokens: HashMap<Felt, ErcContract>,
) -> Self {
Self { world, db, provider: Box::new(provider), processors, config, shutdown_tx, block_tx }
Self {
world,
db,
provider: Box::new(provider),
processors,
config,
shutdown_tx,
block_tx,
tokens,
}
}

pub async fn start(&mut self) -> Result<()> {
Expand Down Expand Up @@ -221,29 +237,45 @@
to: u64,
pending_block_tx: Option<Felt>,
) -> Result<Option<Felt>> {
// Process all blocks from current to latest.
let get_events = |token: Option<String>| {
self.provider.get_events(
EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
address: Some(self.world.address),
keys: None,
},
token,
self.config.events_chunk_size,
)
};
// Fetch all events and collect them using `get_events` method on provider

// handle next events pages
let mut events_pages = vec![get_events(None).await?];
let mut fetch_all_events_tasks = vec![];

let events_filter = EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
address: Some(self.world.address),
keys: None,
};

let world_events_pages =
get_all_events(&self.provider, events_filter, self.config.events_chunk_size);
fetch_all_events_tasks.push(world_events_pages);

for token in &self.tokens {
let events_filter = EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
address: Some(*token.0),
keys: None,
};

let erc20_events_pages =
get_all_events(&self.provider, events_filter, self.config.events_chunk_size);

fetch_all_events_tasks.push(erc20_events_pages);
}

Check warning on line 268 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L257-L268

Added lines #L257 - L268 were not covered by tests

while let Some(token) = &events_pages.last().unwrap().continuation_token {
events_pages.push(get_events(Some(token.clone())).await?);
// wait for all events tasks to complete
let task_result = join_all(fetch_all_events_tasks).await;

let mut events_pages = vec![];
for result in task_result {
events_pages.extend(result?);
}

// Transactions & blocks to process
let mut last_block = 0_u64;
let mut blocks = BTreeMap::new();

// Flatten events pages and events according to the pending block cursor
Expand Down Expand Up @@ -277,12 +309,10 @@
}
};

// Keep track of last block number and fetch block timestamp
if block_number > last_block {
// Fetch block timestamp if not already fetched and inserts into the map
if let Entry::Vacant(e) = blocks.entry(block_number) {
let block_timestamp = self.get_block_timestamp(block_number).await?;
blocks.insert(block_number, block_timestamp);

last_block = block_number;
e.insert(block_timestamp);
}

// Then we skip all transactions until we reach the last pending processed
Expand Down Expand Up @@ -378,7 +408,9 @@
let mut world_event = false;
if let Some(events) = events {
for (event_idx, event) in events.iter().enumerate() {
if event.from_address != self.world.address {
if event.from_address != self.world.address
&& !self.tokens.contains_key(&event.from_address)
{
continue;
}

Expand Down Expand Up @@ -501,3 +533,30 @@
Ok(())
}
}

async fn get_all_events<P>(
provider: &P,
events_filter: EventFilter,
events_chunk_size: u64,
) -> Result<Vec<EventsPage>>
where
P: Provider + Sync,
{
let mut events_pages = Vec::new();
let mut continuation_token = None;

loop {
let events_page = provider
.get_events(events_filter.clone(), continuation_token.clone(), events_chunk_size)
.await?;

continuation_token = events_page.continuation_token.clone();
events_pages.push(events_page);

if continuation_token.is_none() {
break;
}

Check warning on line 558 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L558

Added line #L558 was not covered by tests
}

Ok(events_pages)
}
Loading
Loading