Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): index whitelisted ERC20/ERC721 tokens #2277

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0e4a370
tmp
lambda-0x Aug 3, 2024
02ee483
mvp that reads and prints events
lambda-0x Aug 4, 2024
1375b32
add erc721 and update comment
lambda-0x Aug 4, 2024
be63326
add sql schema for erc
lambda-0x Aug 5, 2024
e0f80bb
add db logic
lambda-0x Aug 6, 2024
e46e8f6
remove token_uri from balances
lambda-0x Aug 7, 2024
8c959e3
add grpc endpoint (wip)
lambda-0x Aug 7, 2024
505c0bc
update schema + how we serialize u256
lambda-0x Aug 8, 2024
44bc339
comment out graphql changes
lambda-0x Aug 8, 2024
66bd2dc
tmp grpc
lambda-0x Aug 9, 2024
d229899
bunch of stuff
lambda-0x Aug 13, 2024
7cbb6b6
update schema and try to debug indexing issue
lambda-0x Aug 13, 2024
39b233b
Merge branch 'main' into torii-index-erc
lambda-0x Aug 13, 2024
16c100b
reenable world events
lambda-0x Aug 13, 2024
5728549
make raw data fetching work on graphql
lambda-0x Aug 13, 2024
307917c
update schema file (wip)
lambda-0x Aug 13, 2024
705b55a
feat: take contract address from cli and config file
lambda-0x Aug 14, 2024
62a4f34
change table schema and core implementation
lambda-0x Aug 18, 2024
13f183e
remove grpc changes
lambda-0x Aug 18, 2024
97defc4
update graphql implementation for new schema
lambda-0x Aug 18, 2024
570ee7a
Merge branch 'main' into torii-index-erc
lambda-0x Aug 18, 2024
c0ab69e
add ability to query erc transfer in graphql
lambda-0x Aug 19, 2024
0dca74d
fix formatting
lambda-0x Aug 19, 2024
ef16b65
fix lints
lambda-0x Aug 19, 2024
1240bdf
update schema and enable foreign key constraint
lambda-0x Aug 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 70 additions & 3 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//! for more info.

use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -27,6 +28,9 @@ use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio_stream::StreamExt;
use torii_core::engine::{Engine, EngineConfig, Processors};
use torii_core::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor;
use torii_core::processors::erc20_transfer::Erc20TransferProcessor;
use torii_core::processors::erc721_transfer::Erc721TransferProcessor;
use torii_core::processors::event_message::EventMessageProcessor;
use torii_core::processors::metadata_update::MetadataUpdateProcessor;
use torii_core::processors::register_model::RegisterModelProcessor;
Expand All @@ -37,7 +41,7 @@ use torii_core::processors::store_update_member::StoreUpdateMemberProcessor;
use torii_core::processors::store_update_record::StoreUpdateRecordProcessor;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::Sql;
use torii_core::types::Model;
use torii_core::types::{ErcContract, ErcType, Model, ToriiConfig};
use torii_server::proxy::Proxy;
use tracing::{error, info};
use tracing_subscriber::{fmt, EnvFilter};
Expand Down Expand Up @@ -115,11 +119,39 @@ struct Args {
/// Enable indexing pending blocks
#[arg(long)]
index_pending: bool,

/// ERC contract addresses to index
#[arg(long, value_parser = parse_erc_contracts)]
#[arg(conflicts_with = "config")]
erc_contracts: Option<std::vec::Vec<ErcContract>>,

/// Configuration file
#[arg(long)]
config: Option<PathBuf>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();

let mut start_block = args.start_block;

let mut config = if let Some(path) = args.config {
ToriiConfig::load_from_path(&path)?
} else {
ToriiConfig::default()
};
lambda-0x marked this conversation as resolved.
Show resolved Hide resolved

if let Some(erc_contracts) = args.erc_contracts {
config.erc_contracts = erc_contracts;
}

for address in &config.erc_contracts {
if address.start_block < start_block {
start_block = address.start_block;
}
}
lambda-0x marked this conversation as resolved.
Show resolved Hide resolved

let filter_layer = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info,hyper_reverse_proxy=off"));

Expand Down Expand Up @@ -162,7 +194,12 @@ async fn main() -> anyhow::Result<()> {
// Get world address
let world = WorldContractReader::new(args.world_address, &provider);

let db = Sql::new(pool.clone(), args.world_address).await?;
let erc_contracts = config
.erc_contracts
.iter()
.map(|contract| (contract.contract_address, contract.clone()))
.collect();
let db = Sql::new(pool.clone(), args.world_address, &erc_contracts).await?;
let processors = Processors {
event: vec![
Box::new(RegisterModelProcessor),
Expand All @@ -172,6 +209,9 @@ async fn main() -> anyhow::Result<()> {
Box::new(EventMessageProcessor),
Box::new(StoreUpdateRecordProcessor),
Box::new(StoreUpdateMemberProcessor),
Box::new(Erc20TransferProcessor),
Box::new(Erc20LegacyTransferProcessor),
Box::new(Erc721TransferProcessor),
],
transaction: vec![Box::new(StoreTransactionProcessor)],
..Processors::default()
Expand All @@ -185,13 +225,14 @@ async fn main() -> anyhow::Result<()> {
&provider,
processors,
EngineConfig {
start_block: args.start_block,
start_block,
events_chunk_size: args.events_chunk_size,
index_pending: args.index_pending,
..Default::default()
},
shutdown_tx.clone(),
Some(block_tx),
erc_contracts,
);

let shutdown_rx = shutdown_tx.subscribe();
Expand Down Expand Up @@ -286,3 +327,29 @@ async fn spawn_rebuilding_graphql_server(
}
}
}

// Parses clap cli argument which is expected to be in the format:
// - erc_type:address:start_block
// - address:start_block (erc_type defaults to ERC20)
fn parse_erc_contracts(s: &str) -> anyhow::Result<Vec<ErcContract>> {
let parts: Vec<&str> = s.split(',').collect();
let mut contracts = Vec::new();
for part in parts {
match part.split(':').collect::<Vec<&str>>().as_slice() {
[r#type, address, start_block] => {
let contract_address = Felt::from_str(address).unwrap();
let start_block = start_block.parse::<u64>()?;
let r#type = r#type.parse::<ErcType>()?;
contracts.push(ErcContract { contract_address, start_block, r#type });
}
[address, start_block] => {
let contract_address = Felt::from_str(address)?;
let start_block = start_block.parse::<u64>()?;
let r#type = ErcType::default();
contracts.push(ErcContract { contract_address, start_block, r#type });
}
_ => return Err(anyhow::anyhow!("Invalid ERC contract format")),
}
}
Ok(contracts)
}
lambda-0x marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 8 additions & 0 deletions bin/torii/torii.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Example configuration file for Torii
# erc_contracts = [
# { contract_address = "0x1234567890abcdef1234567890abcdef12345678", start_block = 0, type = "ERC20" },
# { contract_address = "0xabcdef1234567890abcdef1234567890abcdef12", start_block = 1, type = "ERC721" },
# ]
erc_contracts = [
{ type = "ERC20", contract_address = "0x07fc13cc1f43f0b0519f84df8bf13bea4d9fd5ce2d748c3baf27bf90a565f60a", start_block = 0 },
]
1 change: 1 addition & 0 deletions crates/katana/scripts/deploy-erc20.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
starkli deploy --account account.json --keystore signer.json --keystore-password "" 0x02a8846878b6ad1f54f6ba46f5f40e11cee755c677f130b2c4b60566c9003f1f 0x626c6f62 0x424c42 0x8 u256:10000000000 0xb3ff441a68610b30fd5e2abbf3a1548eb6ba6f3559f2862bf2dc757e5828c
1 change: 1 addition & 0 deletions crates/torii/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ thiserror.workspace = true
tokio = { version = "1.32.0", features = [ "sync" ], default-features = true }
tokio-stream = "0.1.11"
tokio-util = "0.7.7"
toml.workspace = true
tracing.workspace = true

[dev-dependencies]
Expand Down
107 changes: 82 additions & 25 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::time::Duration;

use anyhow::Result;
use dojo_world::contracts::world::WorldContractReader;
use futures_util::future::join_all;
use starknet::core::types::{
BlockId, BlockTag, Event, EventFilter, Felt, MaybePendingBlockWithTxHashes,
BlockId, BlockTag, Event, EventFilter, EventsPage, Felt, MaybePendingBlockWithTxHashes,
MaybePendingBlockWithTxs, ReceiptBlock, Transaction, TransactionReceipt,
TransactionReceiptWithBlockInfo,
};
Expand All @@ -18,6 +19,7 @@ use tracing::{error, info, trace, warn};

use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor};
use crate::sql::Sql;
use crate::types::ErcContract;

#[allow(missing_debug_implementations)]
pub struct Processors<P: Provider + Sync> {
Expand Down Expand Up @@ -62,6 +64,8 @@ pub struct Engine<P: Provider + Sync> {
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
// ERC tokens to index
tokens: HashMap<Felt, ErcContract>,
}

struct UnprocessedEvent {
Expand All @@ -78,8 +82,18 @@ impl<P: Provider + Sync> Engine<P> {
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
tokens: HashMap<Felt, ErcContract>,
) -> Self {
Self { world, db, provider: Box::new(provider), processors, config, shutdown_tx, block_tx }
Self {
world,
db,
provider: Box::new(provider),
processors,
config,
shutdown_tx,
block_tx,
tokens,
}
}

pub async fn start(&mut self) -> Result<()> {
Expand Down Expand Up @@ -221,29 +235,45 @@ impl<P: Provider + Sync> Engine<P> {
to: u64,
pending_block_tx: Option<Felt>,
) -> Result<Option<Felt>> {
// Process all blocks from current to latest.
let get_events = |token: Option<String>| {
self.provider.get_events(
EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
address: Some(self.world.address),
keys: None,
},
token,
self.config.events_chunk_size,
)
};
// Fetch all events and collect them using `get_events` method on provider

// handle next events pages
let mut events_pages = vec![get_events(None).await?];
let mut fetch_all_events_tasks = vec![];

let events_filter = EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
address: Some(self.world.address),
keys: None,
};

let world_events_pages =
get_all_events(&self.provider, events_filter, self.config.events_chunk_size);
fetch_all_events_tasks.push(world_events_pages);

for token in &self.tokens {
let events_filter = EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
address: Some(*token.0),
keys: None,
};

let erc20_events_pages =
get_all_events(&self.provider, events_filter, self.config.events_chunk_size);

fetch_all_events_tasks.push(erc20_events_pages);
}

while let Some(token) = &events_pages.last().unwrap().continuation_token {
events_pages.push(get_events(Some(token.clone())).await?);
// wait for all events tasks to complete
let task_result = join_all(fetch_all_events_tasks).await;

let mut events_pages = vec![];
for result in task_result {
events_pages.extend(result?);
}

// Transactions & blocks to process
let mut last_block = 0_u64;
let mut blocks = BTreeMap::new();

// Flatten events pages and events according to the pending block cursor
Expand Down Expand Up @@ -277,12 +307,10 @@ impl<P: Provider + Sync> Engine<P> {
}
};

// Keep track of last block number and fetch block timestamp
if block_number > last_block {
// Fetch block timestamp if not already fetched and inserts into the map
if !blocks.contains_key(&block_number) {
let block_timestamp = self.get_block_timestamp(block_number).await?;
blocks.insert(block_number, block_timestamp);

last_block = block_number;
}

// Then we skip all transactions until we reach the last pending processed
Expand Down Expand Up @@ -378,7 +406,9 @@ impl<P: Provider + Sync> Engine<P> {
let mut world_event = false;
if let Some(events) = events {
for (event_idx, event) in events.iter().enumerate() {
if event.from_address != self.world.address {
if event.from_address != self.world.address
&& !self.tokens.contains_key(&event.from_address)
{
continue;
}

Expand Down Expand Up @@ -501,3 +531,30 @@ impl<P: Provider + Sync> Engine<P> {
Ok(())
}
}

async fn get_all_events<P>(
provider: &P,
events_filter: EventFilter,
events_chunk_size: u64,
) -> Result<Vec<EventsPage>>
where
P: Provider + Sync,
{
let mut events_pages = Vec::new();
let mut continuation_token = None;

loop {
let events_page = provider
.get_events(events_filter.clone(), continuation_token.clone(), events_chunk_size)
.await?;

continuation_token = events_page.continuation_token.clone();
events_pages.push(events_page);

if continuation_token.is_none() {
break;
}
}

Ok(events_pages)
}
58 changes: 58 additions & 0 deletions crates/torii/core/src/processors/erc20_legacy_transfer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, TransactionReceiptWithBlockInfo, U256};
use starknet::providers::Provider;
use tracing::info;

use super::EventProcessor;
use crate::sql::Sql;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_legacy_transfer";

#[derive(Default, Debug)]
pub struct Erc20LegacyTransferProcessor;

#[async_trait]
impl<P> EventProcessor<P> for Erc20LegacyTransferProcessor
where
P: Provider + Send + Sync + std::fmt::Debug,
{
fn event_key(&self) -> String {
"Transfer".to_string()
}

fn validate(&self, event: &Event) -> bool {
// key: [hash(Transfer)]
// data: [from, to, value.0, value.1]
if event.keys.len() == 1 && event.data.len() == 4 {
return true;
}

false
}

async fn process(
&self,
world: &WorldContractReader<P>,
db: &mut Sql,
_block_number: u64,
_block_timestamp: u64,
_transaction_receipt: &TransactionReceiptWithBlockInfo,
_event_id: &str,
event: &Event,
) -> Result<(), Error> {
let token_address = event.from_address;
let from = event.data[0];
let to = event.data[1];

let value = U256Cainome::cairo_deserialize(&event.data, 2)?;
let value = U256::from_words(value.low, value.high);

db.handle_erc20_transfer(token_address, from, to, value, world.provider()).await?;
info!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer");

Ok(())
}
}
Loading
Loading