Skip to content

Commit

Permalink
Merge pull request #5 from abhinavmsra/fix/create-listener
Browse files Browse the repository at this point in the history
fix: create listener service
  • Loading branch information
abhinavmsra authored Dec 7, 2024
2 parents b65880b + b33adfe commit 21f5ab0
Show file tree
Hide file tree
Showing 15 changed files with 347 additions and 6 deletions.
Empty file removed .env.example
Empty file.
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
[workspace]
resolver = '2'
resolver = "2"
members = [
'libs/indexer-db'
"libs/indexer-db",
"listener"
]

[workspace.dependencies]
sqlx = { version = '0.8', features = [ 'bigdecimal', 'chrono', 'postgres', 'runtime-tokio', 'tls-native-tls' ] }
alloy = { version = "0.7.2", features = ["full"] }
sqlx = { version = "0.8", features = [ "bigdecimal", "chrono", "postgres", "runtime-tokio", "tls-native-tls" ] }
tokio = { version = "1.41.1", features = ["full"] }

[profile.release]
lto = true
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ services:
POSTGRES_DB: indexer_development
POSTGRES_HOST_AUTH_METHOD: trust
PGDATA: /var/lib/postgresql/data/pgdata
ports:
- 127.0.0.1:5432:5432
volumes:
- ./pgdata:/var/lib/postgresql/data
3 changes: 3 additions & 0 deletions libs/indexer-db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ version = '0.0.2'
edition = '2021'

[dependencies]
alloy = { workspace = true }
serde = "1.0.215"
serde_json = "1.0.133"
sqlx = { workspace = true }
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Event-log store. UNLOGGED trades crash durability for faster writes,
-- which suits a table that can be re-populated by re-indexing the chain.
CREATE UNLOGGED TABLE IF NOT EXISTS evm_logs
(
    id                SERIAL PRIMARY KEY,
    -- NUMERIC so the full unsigned 64-bit block-number range fits.
    block_number      NUMERIC NOT NULL,
    -- Emitting contract address, raw bytes.
    address           BYTEA NOT NULL,
    transaction_hash  BYTEA NOT NULL,
    -- ABI-encoded event payload.
    data              BYTEA,
    -- First log topic (the event signature hash); NULL-able for anonymous events.
    event_signature   BYTEA,
    -- Remaining indexed topics, excluding the signature.
    topics            BYTEA[],

    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW()
);
23 changes: 23 additions & 0 deletions libs/indexer-db/migrations/20241205070014_create_chains.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- One row per indexed chain; `id` is the chain id itself, not a generated sequence.
CREATE TABLE IF NOT EXISTS evm_chains
(
    id                       BIGINT PRIMARY KEY,
    name                     TEXT NOT NULL UNIQUE,
    -- Indexing cursor: highest block already persisted to evm_logs.
    last_synced_block_number BIGINT NULL,
    -- Expected block time in seconds; the listener uses it to rate-limit polling.
    block_time               INTEGER NOT NULL,

    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW()
);

-- Trigger helper: stamp `updated_at` with the current time on every UPDATE.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_evm_chains_updated_at
    BEFORE UPDATE ON evm_chains
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();
2 changes: 2 additions & 0 deletions libs/indexer-db/src/entity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod evm_chains;
pub mod evm_logs;
42 changes: 42 additions & 0 deletions libs/indexer-db/src/entity/evm_chains.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use sqlx::{types::chrono, Executor, Postgres};

/// Row model for the `evm_chains` table.
#[derive(sqlx::FromRow, Debug)]
pub struct EvmChains {
    // Chain id, used as the table's primary key.
    pub id: i64,
    // Human-readable chain name (UNIQUE in the table).
    pub name: String,
    // NOTE(review): the migration declares this column `BIGINT NULL`, but the
    // field is non-optional; decoding a row where it is NULL will fail.
    // Consider `Option<i64>` — TODO confirm the column is always populated.
    pub last_synced_block_number: i64,
    // Expected block time in seconds (drives the listener's rate limiter).
    pub block_time: i32,
    pub created_at: chrono::NaiveDateTime,
    pub updated_at: chrono::NaiveDateTime,
}

impl EvmChains {
    /// Loads the chain row whose primary key equals `id`.
    ///
    /// Returns `sqlx::Error::RowNotFound` when no such chain exists.
    pub async fn fetch_by_id<'c, E>(id: u64, connection: E) -> Result<EvmChains, sqlx::Error>
    where
        E: Executor<'c, Database = Postgres>,
    {
        sqlx::query_as::<_, EvmChains>("SELECT * FROM evm_chains WHERE id = $1")
            .bind(id as i64)
            .fetch_one(connection)
            .await
    }

    /// Persists a new `last_synced_block_number` for this chain and returns
    /// the updated row as stored by the database.
    pub async fn update_last_synced_block_number<'c, E>(
        &self,
        block_number: u64,
        connection: E,
    ) -> Result<EvmChains, sqlx::Error>
    where
        E: Executor<'c, Database = Postgres>,
    {
        let sql = "UPDATE evm_chains SET last_synced_block_number = $1 WHERE id = $2 RETURNING *";

        sqlx::query_as::<_, EvmChains>(sql)
            .bind(block_number as i64)
            .bind(self.id)
            .fetch_one(connection)
            .await
    }
}
63 changes: 63 additions & 0 deletions libs/indexer-db/src/entity/evm_logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use alloy::rpc::types::Log;
use sqlx::{
types::{chrono, BigDecimal},
Executor, Postgres,
};

/// Row model for the `evm_logs` table.
#[derive(sqlx::FromRow, Debug)]
pub struct EvmLogs {
    pub id: i32,
    // NUMERIC column: covers the full unsigned 64-bit block-number range.
    pub block_number: BigDecimal,
    // Emitting contract address, raw bytes.
    pub address: Vec<u8>,
    pub transaction_hash: Vec<u8>,
    // NOTE(review): `data`, `event_signature` and `topics` are nullable in the
    // migration but non-optional here; decoding a NULL will fail — confirm the
    // insert path always provides them, or switch these to `Option<_>`.
    pub data: Vec<u8>,
    // First log topic (the event's signature hash).
    pub event_signature: Vec<u8>,
    // Remaining indexed topics, excluding the signature.
    pub topics: Vec<Vec<u8>>,
    pub created_at: chrono::NaiveDateTime,
}

impl EvmLogs {
pub async fn create<'c, E>(log: Log, connection: E) -> Result<EvmLogs, sqlx::Error>
where
E: Executor<'c, Database = Postgres>,
{
let block_number: BigDecimal = log
.block_number
.ok_or_else(|| sqlx::Error::Decode("Missing block number".into()))?
.try_into()
.map_err(|_| sqlx::Error::Decode("Block number exceeds BigDecimal range".into()))?;

let transaction_hash = log
.transaction_hash
.ok_or_else(|| sqlx::Error::Decode("Missing transaction hash".into()))?
.to_vec();

let address = log.address().to_vec();

let event_signature: &[u8] = log.topics()[0].as_slice();

let topics: Vec<&[u8]> = log.topics()[1..]
.iter()
.map(|topic| -> &[u8] { topic.as_slice() })
.collect();

let log_data: Vec<u8> = log.inner.data.data.to_vec();

// Insert log into the database and return the inserted row
let query = r#"
INSERT INTO evm_logs (block_number, address, transaction_hash, event_signature, topics, data)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *
"#;

sqlx::query_as::<_, EvmLogs>(query)
.bind(block_number)
.bind(address)
.bind(transaction_hash)
.bind(event_signature)
.bind(topics)
.bind(log_data)
.fetch_one(connection)
.await
}
}
33 changes: 31 additions & 2 deletions libs/indexer-db/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,32 @@
pub fn greeting() {
println!("Hello world");
use std::env;

use sqlx::{
postgres::{PgConnectOptions, PgPoolOptions},
Pool, Postgres,
};

pub mod entity;

mod defaults {
pub const DATABASE_MAX_CONNECTIONS: &str = "5";
}

/// Builds a Postgres connection pool capped at `max_connections`.
///
/// Connection parameters come from `PgConnectOptions::new()`, i.e. the
/// standard libpq-style environment variables / defaults.
async fn create_pool(max_connections: u32) -> Result<Pool<Postgres>, sqlx::Error> {
    let options = PgConnectOptions::new();
    let builder = PgPoolOptions::new().max_connections(max_connections);

    builder.connect_with(options).await
}

pub async fn initialize_database() -> Result<Pool<Postgres>, sqlx::Error> {
let db_max_connections = env::var("DATABASE_MAX_CONNECTIONS")
.unwrap_or(String::from(defaults::DATABASE_MAX_CONNECTIONS))
.parse::<u32>()
.unwrap();

let pool = create_pool(db_max_connections).await.unwrap();

Ok(pool)
}
1 change: 1 addition & 0 deletions listener/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
RPC_URL=
12 changes: 12 additions & 0 deletions listener/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "listener"
version = "0.1.0"
edition = "2021"

[dependencies]
alloy = { workspace = true }
tokio = { workspace = true }

indexer-db = { path = '../libs/indexer-db', version = '0.0.2' }
tower = { version = "0.5.1", features = ["limit", "util"] }
sqlx = { workspace = true }
51 changes: 51 additions & 0 deletions listener/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::{env, str::FromStr, time::Duration};

use alloy::primitives::Address;
use indexer_db::{entity::evm_chains::EvmChains, initialize_database};
use service::ListenerService;
use tower::{Service, ServiceBuilder, ServiceExt};

mod service;

/// Listener entry point.
///
/// Reads configuration from the environment (`RPC_URL`, `CHAIN_ID`,
/// `CONTRACT_ADDRESSES` — a comma-separated address list), builds a
/// rate-limited `ListenerService`, and polls it forever.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rpc_url = env::var("RPC_URL").expect("Missing RPC_URL environment variable");
    let chain_id = env::var("CHAIN_ID")
        .expect("Missing CHAIN_ID environment variable")
        .parse::<u64>()
        .expect("Invalid chain id");

    // Fixed: this previously reported "Missing RPC_URL" for the wrong variable.
    let contract_addresses =
        env::var("CONTRACT_ADDRESSES").expect("Missing CONTRACT_ADDRESSES environment variable");

    let addresses_to_listen = contract_addresses.split(",").map(|address_str| {
        let address = address_str.trim();
        Address::from_str(address).unwrap_or_else(|_| panic!("Invalid address: {address_str:?}"))
    });

    // Propagate setup errors instead of unwrapping; main returns Result.
    let db_pool = initialize_database().await?;

    let evm_chain = EvmChains::fetch_by_id(chain_id, &db_pool).await?;

    // One poll per expected block time, enforced by tower's rate limiter.
    let mut service = ServiceBuilder::new()
        .rate_limit(1, Duration::from_secs(evm_chain.block_time as u64))
        .service(ListenerService {
            chain_id,
            db_pool,
            rpc_url,
            addresses: addresses_to_listen.collect(),
        });

    loop {
        if service.ready().await.is_ok() {
            match service.call(()).await {
                Ok(()) => {
                    println!("Indexed block");
                }
                Err(err) => {
                    eprintln!("Failed to index: {:?}", err);
                }
            }
        }
    }
}
98 changes: 98 additions & 0 deletions listener/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::{
error::Error,
future::Future,
pin::Pin,
task::{Context, Poll},
};

use alloy::{
eips::BlockNumberOrTag,
primitives::Address,
providers::{Provider, ProviderBuilder},
rpc::types::Filter,
};
use indexer_db::entity::{evm_chains::EvmChains, evm_logs::EvmLogs};
use sqlx::{Pool, Postgres};
use tower::Service;

/// Tower service that, on each call, fetches the next window of logs for
/// `chain_id` from the RPC endpoint and persists them through `db_pool`.
pub struct ListenerService {
    // Chain id; used to look up the cursor row in `evm_chains`.
    pub chain_id: u64,
    // RPC endpoint URL; a provider is rebuilt from it on every call.
    pub rpc_url: String,
    // Contract addresses whose logs are indexed.
    pub addresses: Vec<Address>,
    // Shared Postgres pool, cloned into each in-flight call.
    pub db_pool: Pool<Postgres>,
}

impl Service<()> for ListenerService {
    type Response = ();
    type Error = Box<dyn Error>;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Always ready; pacing is handled by the rate-limit layer wrapped around
    /// this service.
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Clones the service state into an owned future performing one
    /// fetch-and-save cycle.
    fn call(&mut self, _: ()) -> Self::Future {
        let (chain_id, rpc_url, addresses, db_pool) = (
            self.chain_id,
            self.rpc_url.clone(),
            self.addresses.clone(),
            self.db_pool.clone(),
        );

        Box::pin(async move { fetch_and_save_logs(chain_id, rpc_url, db_pool, addresses).await })
    }
}

/// Fetches logs for `addresses` in the next window (up to 25 blocks) after the
/// chain's last synced block, stores them in `evm_logs`, and advances
/// `last_synced_block_number` — all inside a single transaction.
pub async fn fetch_and_save_logs(
    chain_id: u64,
    rpc_url: String,
    db_pool: Pool<Postgres>,
    addresses: Vec<Address>,
) -> Result<(), Box<dyn Error>> {
    let provider = ProviderBuilder::new().on_builtin(&rpc_url).await?;
    let evm_chain = EvmChains::fetch_by_id(chain_id, &db_pool).await?;
    let latest_block = provider.get_block_number().await?;

    if latest_block == evm_chain.last_synced_block_number as u64 {
        println!("Fully indexed");
        return Ok(());
    }

    // 0 means "never synced": start at genesis and catch up to the head in one
    // request; otherwise resume just past the cursor, at most 25 blocks ahead.
    let from_block_number = match evm_chain.last_synced_block_number as u64 {
        0 => 0,
        block_number => block_number + 1_u64,
    };

    let to_block_number = match evm_chain.last_synced_block_number as u64 {
        0 => latest_block,
        block_number => std::cmp::min(block_number + 25_u64, latest_block),
    };

    let filter = Filter::new()
        .address(addresses)
        .from_block(BlockNumberOrTag::Number(from_block_number))
        .to_block(BlockNumberOrTag::Number(to_block_number));

    println!(
        "Fetching blocks from {} to {}",
        from_block_number, to_block_number
    );

    let logs = provider.get_logs(&filter).await?;

    // Logs and the cursor update commit atomically, or not at all.
    let mut tx = db_pool.begin().await?;
    for log in logs {
        // Fixed: was `.unwrap()`, which crashed the listener on any insert
        // failure; propagating rolls the transaction back instead.
        EvmLogs::create(log, &mut *tx).await?;
    }

    evm_chain
        .update_last_synced_block_number(to_block_number, &mut *tx)
        .await?;

    // Fixed: a failed commit was previously logged and swallowed while the
    // function still returned Ok; surface it so the caller sees the failure.
    tx.commit().await?;
    println!(
        "Saved logs for blocks: {} to {}",
        from_block_number, to_block_number
    );

    Ok(())
}
2 changes: 1 addition & 1 deletion nx.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"@monodon/rust"
],
"release": {
"projects": ["libs/indexer-db"],
"projects": ["libs/indexer-db", "listener"],
"projectsRelationship": "independent",
"changelog": {
"projectChangelogs": {
Expand Down

0 comments on commit 21f5ab0

Please sign in to comment.