diff --git a/.env.example b/.env.example deleted file mode 100644 index e69de29..0000000 diff --git a/Cargo.toml b/Cargo.toml index e062cd2..79edd45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,9 @@ members = [ ] [workspace.dependencies] +alloy = { version = "0.7.2", features = ["full"] } sqlx = { version = "0.8", features = [ "bigdecimal", "chrono", "postgres", "runtime-tokio", "tls-native-tls" ] } +tokio = { version = "1.41.1", features = ["full"] } [profile.release] lto = true diff --git a/docker-compose.yml b/docker-compose.yml index 802a87d..6e0ba27 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,5 +22,7 @@ services: POSTGRES_DB: indexer_development POSTGRES_HOST_AUTH_METHOD: trust PGDATA: /var/lib/postgresql/data/pgdata + ports: + - 127.0.0.1:5432:5432 volumes: - ./pgdata:/var/lib/postgresql/data diff --git a/libs/indexer-db/Cargo.toml b/libs/indexer-db/Cargo.toml index c1e41ca..b276d43 100644 --- a/libs/indexer-db/Cargo.toml +++ b/libs/indexer-db/Cargo.toml @@ -5,4 +5,7 @@ version = '0.0.2' edition = '2021' [dependencies] +alloy = { workspace = true } +serde = "1.0.215" +serde_json = "1.0.133" sqlx = { workspace = true } diff --git a/libs/indexer-db/migrations/20241203080313_create_evm_logs_table.sql b/libs/indexer-db/migrations/20241203080313_create_evm_logs_table.sql index fa3869c..205fb49 100644 --- a/libs/indexer-db/migrations/20241203080313_create_evm_logs_table.sql +++ b/libs/indexer-db/migrations/20241203080313_create_evm_logs_table.sql @@ -1,8 +1,12 @@ CREATE UNLOGGED TABLE IF NOT EXISTS evm_logs ( - id integer PRIMARY KEY, - block_number numeric NOT NULL, - transaction_hash bytea NOT NULL, - address bytea NOT NULL, - data jsonb + id SERIAL PRIMARY KEY, + block_number NUMERIC NOT NULL, + address BYTEA NOT NULL, + transaction_hash BYTEA NOT NULL, + data BYTEA, + event_signature BYTEA, + topics BYTEA[], + + created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW() ); diff --git a/libs/indexer-db/migrations/20241205070014_create_chains.sql b/libs/indexer-db/migrations/20241205070014_create_chains.sql new file mode 100644 index 0000000..98d1128 --- /dev/null +++ b/libs/indexer-db/migrations/20241205070014_create_chains.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS evm_chains +( + id BIGINT PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + last_synced_block_number BIGINT NULL, + block_time INTEGER NOT NULL, + + created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW() +); + +CREATE OR REPLACE FUNCTION update_updated_at_column() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER update_evm_chains_updated_at +BEFORE UPDATE ON evm_chains +FOR EACH ROW +EXECUTE FUNCTION update_updated_at_column(); diff --git a/libs/indexer-db/src/entity.rs b/libs/indexer-db/src/entity.rs new file mode 100644 index 0000000..0f97a2d --- /dev/null +++ b/libs/indexer-db/src/entity.rs @@ -0,0 +1,2 @@ +pub mod evm_chains; +pub mod evm_logs; diff --git a/libs/indexer-db/src/entity/evm_chains.rs b/libs/indexer-db/src/entity/evm_chains.rs new file mode 100644 index 0000000..876a15c --- /dev/null +++ b/libs/indexer-db/src/entity/evm_chains.rs @@ -0,0 +1,42 @@ +use sqlx::{types::chrono, Executor, Postgres}; + +#[derive(sqlx::FromRow, Debug)] +pub struct EvmChains { + pub id: i64, + pub name: String, + pub last_synced_block_number: i64, + pub block_time: i32, + pub created_at: chrono::NaiveDateTime, + pub updated_at: chrono::NaiveDateTime, +} + +impl EvmChains { + pub async fn fetch_by_id<'c, E>(id: u64, connection: E) -> Result + where + E: Executor<'c, Database = Postgres>, + { + let query = "SELECT * FROM evm_chains WHERE id = $1"; + + sqlx::query_as::<_, EvmChains>(query) + .bind(id as i64) + .fetch_one(connection) + .await + } + + pub async fn update_last_synced_block_number<'c, E>( + &self, + block_number: u64, + connection: E, + ) -> Result + where + E: Executor<'c, Database = Postgres>, + { + let query = "UPDATE evm_chains SET last_synced_block_number = $1 WHERE id = $2 RETURNING *"; + + sqlx::query_as::<_, EvmChains>(query) + .bind(block_number as i64) + .bind(self.id) + .fetch_one(connection) + .await + } +} diff --git a/libs/indexer-db/src/entity/evm_logs.rs b/libs/indexer-db/src/entity/evm_logs.rs new file mode 100644 index 0000000..7e7c95b --- /dev/null +++ b/libs/indexer-db/src/entity/evm_logs.rs @@ -0,0 +1,63 @@ +use alloy::rpc::types::Log; +use sqlx::{ + types::{chrono, BigDecimal}, + Executor, Postgres, +}; + +#[derive(sqlx::FromRow, Debug)] +pub struct EvmLogs { + pub id: i32, + pub block_number: BigDecimal, + pub address: Vec, + pub transaction_hash: Vec, + pub data: Vec, + pub event_signature: Vec, + pub topics: Vec>, + pub created_at: chrono::NaiveDateTime, +} + +impl EvmLogs { + pub async fn create<'c, E>(log: Log, connection: E) -> Result + where + E: Executor<'c, Database = Postgres>, + { + let block_number: BigDecimal = log + .block_number + .ok_or_else(|| sqlx::Error::Decode("Missing block number".into()))? + .try_into() + .map_err(|_| sqlx::Error::Decode("Block number exceeds BigDecimal range".into()))?; + + let transaction_hash = log + .transaction_hash + .ok_or_else(|| sqlx::Error::Decode("Missing transaction hash".into()))? + .to_vec(); + + let address = log.address().to_vec(); + + let event_signature: &[u8] = log.topics()[0].as_slice(); + + let topics: Vec<&[u8]> = log.topics()[1..] + .iter() + .map(|topic| -> &[u8] { topic.as_slice() }) + .collect(); + + let log_data: Vec = log.inner.data.data.to_vec(); + + // Insert log into the database and return the inserted row + let query = r#" + INSERT INTO evm_logs (block_number, address, transaction_hash, event_signature, topics, data) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING * + "#; + + sqlx::query_as::<_, EvmLogs>(query) + .bind(block_number) + .bind(address) + .bind(transaction_hash) + .bind(event_signature) + .bind(topics) + .bind(log_data) + .fetch_one(connection) + .await + } +} diff --git a/libs/indexer-db/src/lib.rs b/libs/indexer-db/src/lib.rs index 8c898b2..b04159a 100644 --- a/libs/indexer-db/src/lib.rs +++ b/libs/indexer-db/src/lib.rs @@ -1,3 +1,32 @@ -pub fn greeting() { - println!("Hello world"); +use std::env; + +use sqlx::{ + postgres::{PgConnectOptions, PgPoolOptions}, + Pool, Postgres, +}; + +pub mod entity; + +mod defaults { + pub const DATABASE_MAX_CONNECTIONS: &str = "5"; +} + +async fn create_pool(max_connections: u32) -> Result, sqlx::Error> { + let conn = PgConnectOptions::new(); + + PgPoolOptions::new() + .max_connections(max_connections) + .connect_with(conn) + .await +} + +pub async fn initialize_database() -> Result, sqlx::Error> { + let db_max_connections = env::var("DATABASE_MAX_CONNECTIONS") + .unwrap_or(String::from(defaults::DATABASE_MAX_CONNECTIONS)) + .parse::() + .unwrap(); + + let pool = create_pool(db_max_connections).await.unwrap(); + + Ok(pool) } diff --git a/listener/.env.example b/listener/.env.example new file mode 100644 index 0000000..537eed2 --- /dev/null +++ b/listener/.env.example @@ -0,0 +1 @@ +RPC_URL= diff --git a/listener/Cargo.toml b/listener/Cargo.toml index 0c962db..b740a12 100644 --- a/listener/Cargo.toml +++ b/listener/Cargo.toml @@ -4,3 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] +alloy = { workspace = true } +tokio = { workspace = true } + +indexer-db = { path = '../libs/indexer-db', version = '0.0.2' } +tower = { version = "0.5.1", features = ["limit", "util"] } +sqlx = { workspace = true } diff --git a/listener/src/main.rs b/listener/src/main.rs index e7a11a9..30db8d0 100644 --- a/listener/src/main.rs +++ b/listener/src/main.rs @@ -1,3 +1,51 @@ -fn main() { - println!("Hello, world!"); +use std::{env, str::FromStr, time::Duration}; + +use alloy::primitives::Address; +use indexer_db::{entity::evm_chains::EvmChains, initialize_database}; +use service::ListenerService; +use tower::{Service, ServiceBuilder, ServiceExt}; + +mod service; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let rpc_url = env::var("RPC_URL").expect("Missing RPC_URL environment variable"); + let chain_id = env::var("CHAIN_ID") + .expect("Missing CHAIN_ID environment variable") + .parse::() + .expect("Invalid chain id"); + + let contract_addresses = + env::var("CONTRACT_ADDRESSES").expect("Missing RPC_URL environment variable"); + + let addresses_to_listen = contract_addresses.split(",").map(|address_str| { + let address = address_str.trim(); + Address::from_str(address).unwrap_or_else(|_| panic!("Invalid address: {address_str:?}")) + }); + + let db_pool = initialize_database().await.unwrap(); + + let evm_chain = EvmChains::fetch_by_id(chain_id, &db_pool).await?; + + let mut service = ServiceBuilder::new() + .rate_limit(1, Duration::from_secs(evm_chain.block_time as u64)) + .service(ListenerService { + chain_id, + db_pool, + rpc_url, + addresses: addresses_to_listen.collect(), + }); + + loop { + if service.ready().await.is_ok() { + match service.call(()).await { + Ok(()) => { + println!("Indexed block"); + } + Err(err) => { + eprintln!("Failed to indexed: {:?}", err); + } + } + } + } } diff --git a/listener/src/service.rs b/listener/src/service.rs new file mode 100644 index 0000000..44baacb --- /dev/null +++ b/listener/src/service.rs @@ -0,0 +1,98 @@ +use std::{ + error::Error, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use alloy::{ + eips::BlockNumberOrTag, + primitives::Address, + providers::{Provider, ProviderBuilder}, + rpc::types::Filter, +}; +use indexer_db::entity::{evm_chains::EvmChains, evm_logs::EvmLogs}; +use sqlx::{Pool, Postgres}; +use tower::Service; + +pub struct ListenerService { + pub chain_id: u64, + pub rpc_url: String, + pub addresses: Vec
, + pub db_pool: Pool, +} + +impl Service<()> for ListenerService { + type Response = (); + type Error = Box; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: ()) -> Self::Future { + let db_pool = self.db_pool.clone(); + let chain_id = self.chain_id; + let rpc_url = self.rpc_url.clone(); + let addresses = self.addresses.clone(); + + Box::pin(async move { fetch_and_save_logs(chain_id, rpc_url, db_pool, addresses).await }) + } +} + +pub async fn fetch_and_save_logs( + chain_id: u64, + rpc_url: String, + db_pool: Pool, + addresses: Vec
, +) -> Result<(), Box> { + let provider = ProviderBuilder::new().on_builtin(&rpc_url).await?; + let evm_chain = EvmChains::fetch_by_id(chain_id, &db_pool).await?; + let latest_block = provider.get_block_number().await?; + if latest_block == evm_chain.last_synced_block_number as u64 { + println!("Fully indexed"); + return Ok(()); + } + + let from_block_number = match evm_chain.last_synced_block_number as u64 { + 0 => 0, + block_number => block_number + 1_u64, + }; + + let to_block_number = match evm_chain.last_synced_block_number as u64 { + 0 => latest_block, + block_number => std::cmp::min(block_number + 25_u64, latest_block), + }; + + let filter = Filter::new() + .address(addresses) + .from_block(BlockNumberOrTag::Number(from_block_number)) + .to_block(BlockNumberOrTag::Number(to_block_number)); + + println!( + "Fetching blocks from {} to {}", + from_block_number, to_block_number + ); + + let logs = provider.get_logs(&filter).await?; + + let mut tx = db_pool.begin().await?; + for log in logs { + EvmLogs::create(log, &mut *tx).await.unwrap(); + } + + evm_chain + .update_last_synced_block_number(to_block_number, &mut *tx) + .await?; + + match tx.commit().await { + Ok(_) => println!( + "Saved logs for blocks: {} to {}", + from_block_number, to_block_number + ), + Err(err) => eprintln!("{}", err), + } + + Ok(()) +}