Skip to content

Commit

Permalink
Torii indexer update db tables with event_id (#981)
Browse files Browse the repository at this point in the history
  • Loading branch information
broody authored Oct 5, 2023
1 parent 4c2b84f commit e96f08d
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 42 deletions.
16 changes: 10 additions & 6 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ where

process_block(self.db, self.provider, &self.processors.block, &block).await?;

for transaction in block.clone().transactions {
for (tx_idx, transaction) in block.clone().transactions.iter().enumerate() {
let invoke_transaction = match &transaction {
Transaction::Invoke(invoke_transaction) => invoke_transaction,
_ => continue,
Expand Down Expand Up @@ -170,15 +170,19 @@ where
continue;
}

let event_id = format!(
"0x{:064x}:0x{:04x}:0x{:04x}",
block.block_number, tx_idx, event_idx
);
process_event(
self.world,
self.db,
self.provider,
&self.processors.event,
&block,
&invoke_receipt,
&event_id,
event,
event_idx,
)
.await?;
}
Expand Down Expand Up @@ -232,16 +236,16 @@ async fn process_event<P: Provider + Sync>(
db: &mut Sql,
provider: &P,
processors: &[Box<dyn EventProcessor<P>>],
_block: &BlockWithTxs,
block: &BlockWithTxs,
invoke_receipt: &InvokeTransactionReceipt,
event_id: &str,
event: &Event,
event_idx: usize,
) -> Result<(), Box<dyn Error>> {
db.store_event(event, event_idx, invoke_receipt.transaction_hash);
db.store_event(event_id, event, invoke_receipt.transaction_hash);

for processor in processors {
if get_selector_from_name(&processor.event_key())? == event.keys[0] {
processor.process(world, db, provider, invoke_receipt, event, event_idx).await?;
processor.process(world, db, provider, block, invoke_receipt, event_id, event).await?;
}
}

Expand Down
5 changes: 4 additions & 1 deletion crates/torii/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ pub mod store_set_record;
#[async_trait]
pub trait EventProcessor<P: Provider + Sync> {
fn event_key(&self) -> String;

#[allow(clippy::too_many_arguments)]
async fn process(
&self,
world: &WorldContractReader<'_, P>,
db: &mut Sql,
provider: &P,
block: &BlockWithTxs,
invoke_receipt: &InvokeTransactionReceipt,
event_id: &str,
event: &Event,
event_idx: usize,
) -> Result<(), Error>;
}

Expand Down
5 changes: 3 additions & 2 deletions crates/torii/core/src/processors/register_model.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use starknet::core::types::{BlockId, BlockTag, Event, InvokeTransactionReceipt};
use starknet::core::types::{BlockId, BlockTag, BlockWithTxs, Event, InvokeTransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use torii_client::contract::world::WorldContractReader;
Expand All @@ -23,9 +23,10 @@ impl<P: Provider + Sync + 'static> EventProcessor<P> for RegisterModelProcessor
world: &WorldContractReader<'_, P>,
db: &mut Sql,
_provider: &P,
_block: &BlockWithTxs,
_invoke_receipt: &InvokeTransactionReceipt,
_event_id: &str,
event: &Event,
_event_idx: usize,
) -> Result<(), Error> {
let name = parse_cairo_short_string(&event.data[0])?;

Expand Down
5 changes: 3 additions & 2 deletions crates/torii/core/src/processors/register_system.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::manifest::System;
use starknet::core::types::{Event, InvokeTransactionReceipt};
use starknet::core::types::{BlockWithTxs, Event, InvokeTransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use torii_client::contract::world::WorldContractReader;
Expand All @@ -24,9 +24,10 @@ impl<P: Provider + Sync> EventProcessor<P> for RegisterSystemProcessor {
_world: &WorldContractReader<'_, P>,
db: &mut Sql,
_provider: &P,
_block: &BlockWithTxs,
_invoke_receipt: &InvokeTransactionReceipt,
_event_id: &str,
event: &Event,
_event_idx: usize,
) -> Result<(), Error> {
let name = parse_cairo_short_string(&event.data[0])?;

Expand Down
9 changes: 5 additions & 4 deletions crates/torii/core/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use starknet::core::types::{BlockId, BlockTag, Event, InvokeTransactionReceipt};
use starknet::core::types::{BlockId, BlockTag, BlockWithTxs, Event, InvokeTransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use starknet_crypto::FieldElement;
Expand All @@ -27,17 +27,18 @@ impl<P: Provider + Sync + 'static> EventProcessor<P> for StoreSetRecordProcessor
world: &WorldContractReader<'_, P>,
db: &mut Sql,
_provider: &P,
transaction_receipt: &InvokeTransactionReceipt,
_block: &BlockWithTxs,
_transaction_receipt: &InvokeTransactionReceipt,
event_id: &str,
event: &Event,
event_idx: usize,
) -> Result<(), Error> {
let name = parse_cairo_short_string(&event.data[MODEL_INDEX])?;
info!("store set record: {}", name);

let model = world.model(&name, BlockId::Tag(BlockTag::Pending)).await?;
let keys = values_at(&event.data, NUM_KEYS_INDEX)?;
let entity = model.entity(keys, BlockId::Tag(BlockTag::Pending)).await?;
db.set_entity(entity, transaction_receipt.block_number, event_idx).await?;
db.set_entity(entity, event_id).await?;
Ok(())
}
}
Expand Down
19 changes: 9 additions & 10 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Sql {
Ok(())
}

pub async fn set_entity(&mut self, entity: Ty, block_num: u64, event_idx: usize) -> Result<()> {
pub async fn set_entity(&mut self, entity: Ty, event_id: &str) -> Result<()> {
let keys = if let Ty::Struct(s) = &entity {
let mut keys = Vec::new();
for m in s.keys() {
Expand All @@ -155,15 +155,14 @@ impl Sql {

let keys_str = felts_sql_string(&keys);
self.query_queue.push(format!(
"INSERT INTO entities (id, keys, model_names) VALUES ('{}', '{}', '{}') ON \
CONFLICT(id) DO UPDATE SET model_names=excluded.model_names, \
updated_at=CURRENT_TIMESTAMP",
entity_id, keys_str, model_names
"INSERT INTO entities (id, keys, model_names, event_id) VALUES ('{}', '{}', '{}', \
'{}') ON CONFLICT(id) DO UPDATE SET model_names=excluded.model_names, \
updated_at=CURRENT_TIMESTAMP, event_id=excluded.event_id",
entity_id, keys_str, model_names, event_id
));

let path = vec![entity.name()];
let id = format!("0x{:064x}:0x{:04x}", block_num, event_idx);
self.build_set_entity_queries_recursive(path, &id, &entity_id, &entity);
self.build_set_entity_queries_recursive(path, event_id, &entity_id, &entity);

self.execute().await?;

Expand All @@ -177,6 +176,7 @@ impl Sql {
id: entity_id.clone(),
keys: keys_str,
model_names,
event_id: event_id.to_string(),
created_at,
updated_at: Utc::now(),
});
Expand Down Expand Up @@ -219,15 +219,14 @@ impl Sql {
self.query_queue.push(query);
}

pub fn store_event(&mut self, event: &Event, event_idx: usize, transaction_hash: FieldElement) {
pub fn store_event(&mut self, event_id: &str, event: &Event, transaction_hash: FieldElement) {
let keys_str = felts_sql_string(&event.keys);
let data_str = felts_sql_string(&event.data);

let id = format!("0x{:064x}:0x{:04x}", transaction_hash, event_idx);
let query = format!(
"INSERT OR IGNORE INTO events (id, keys, data, transaction_hash) VALUES ('{}', '{}', \
'{}', '{:#x}')",
id, keys_str, data_str, transaction_hash
event_id, keys_str, data_str, transaction_hash
);

self.query_queue.push(query);
Expand Down
11 changes: 7 additions & 4 deletions crates/torii/core/src/sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,25 @@ async fn test_load_from_remote() {
assert_eq!(packed_size, 1);
assert_eq!(unpacked_size, 2);

let event_id = format!("0x{:064x}:0x{:04x}:0x{:04x}", 0, 42, 69);
db.store_event(
&event_id,
&Event {
from_address: FieldElement::ONE,
keys: Vec::from([FieldElement::TWO]),
data: Vec::from([FieldElement::TWO, FieldElement::THREE]),
},
0,
FieldElement::THREE,
);

db.execute().await.unwrap();

let keys = format!("{:#x}/", FieldElement::TWO);
let query = format!("SELECT data, transaction_hash FROM events WHERE keys = '{}'", keys);
let (data, tx_hash): (String, String) = sqlx::query_as(&query).fetch_one(&pool).await.unwrap();
let query =
format!("SELECT keys, data, transaction_hash FROM events WHERE id = '{}'", event_id);
let (keys, data, tx_hash): (String, String, String) =
sqlx::query_as(&query).fetch_one(&pool).await.unwrap();

assert_eq!(keys, format!("{:#x}/", FieldElement::TWO));
assert_eq!(data, format!("{:#x}/{:#x}/", FieldElement::TWO, FieldElement::THREE));
assert_eq!(tx_hash, format!("{:#x}", FieldElement::THREE))
}
1 change: 1 addition & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl fmt::LowerHex for SQLFieldElement {
pub struct Entity {
pub id: String,
pub keys: String,
pub event_id: String,
pub model_names: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/graphql/src/object/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl Default for EntityObject {
(Name::new("id"), TypeData::Simple(TypeRef::named(TypeRef::ID))),
(Name::new("keys"), TypeData::Simple(TypeRef::named_list(TypeRef::STRING))),
(Name::new("modelNames"), TypeData::Simple(TypeRef::named(TypeRef::STRING))),
(Name::new("eventId"), TypeData::Simple(TypeRef::named(TypeRef::STRING))),
(
Name::new("createdAt"),
TypeData::Simple(TypeRef::named(GraphqlType::DateTime.to_string())),
Expand All @@ -52,6 +53,7 @@ impl EntityObject {
(Name::new("id"), Value::from(entity.id)),
(Name::new("keys"), Value::from(keys)),
(Name::new("modelNames"), Value::from(entity.model_names)),
(Name::new("eventId"), Value::from(entity.event_id)),
(
Name::new("createdAt"),
Value::from(entity.created_at.format("%Y-%m-%d %H:%M:%S").to_string()),
Expand Down
12 changes: 4 additions & 8 deletions crates/torii/graphql/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ pub async fn entity_fixtures(db: &mut Sql) {
},
],
}),
0,
0,
&format!("0x{:064x}:0x{:04x}:0x{:04x}", 0, 0, 0),
)
.await
.unwrap();
Expand Down Expand Up @@ -220,8 +219,7 @@ pub async fn entity_fixtures(db: &mut Sql) {
},
],
}),
0,
1,
&format!("0x{:064x}:0x{:04x}:0x{:04x}", 0, 0, 1),
)
.await
.unwrap();
Expand Down Expand Up @@ -258,8 +256,7 @@ pub async fn entity_fixtures(db: &mut Sql) {
},
],
}),
0,
2,
&format!("0x{:064x}:0x{:04x}:0x{:04x}", 0, 0, 2),
)
.await
.unwrap();
Expand Down Expand Up @@ -294,8 +291,7 @@ pub async fn entity_fixtures(db: &mut Sql) {
},
],
}),
0,
3,
&format!("0x{:064x}:0x{:04x}:0x{:04x}", 0, 0, 3),
)
.await
.unwrap();
Expand Down
8 changes: 3 additions & 5 deletions crates/torii/migrations/20230316154230_setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ CREATE INDEX idx_systems_created_at ON systems (created_at);
CREATE TABLE entities (
id TEXT NOT NULL PRIMARY KEY,
keys TEXT,
event_id TEXT NOT NULL,
model_names TEXT,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_entities_keys ON entities (keys);

CREATE INDEX idx_entities_keys_create_on ON entities (keys, created_at);
CREATE INDEX idx_entities_event_id ON entities (event_id);

CREATE TABLE events (
id TEXT NOT NULL PRIMARY KEY,
Expand All @@ -83,6 +83,4 @@ CREATE TABLE events (
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_events_keys ON events (keys);

CREATE INDEX idx_events_created_at ON events (created_at);
CREATE INDEX idx_events_keys ON events (keys);

0 comments on commit e96f08d

Please sign in to comment.