Skip to content

Commit

Permalink
feat: impl only index header mode by default (#31)
Browse files Browse the repository at this point in the history
The indexer now defaults to only indexing header, and allows switching
on indexing transactions when the tx flags is set.
  • Loading branch information
cwkang1998 authored Jan 22, 2025
1 parent 78c9370 commit f1c954f
Show file tree
Hide file tree
Showing 6 changed files with 500 additions and 30 deletions.
1 change: 1 addition & 0 deletions .env.test
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
DB_CONNECTION_STRING=postgres://postgres:postgres@localhost:5433/fossil_test
NODE_CONNECTION_STRING=http://x.x.x.x:x
INDEX_TRANSACTIONS=false # flag to index transactions
30 changes: 24 additions & 6 deletions src/indexer/batch_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{error, info, warn};
use crate::{
db::DbConnection,
repositories::{
block_header::insert_block_header_query,
block_header::{insert_block_header_only_query, insert_block_header_query},
index_metadata::{get_index_metadata, update_backfilling_block_number_query},
},
rpc,
Expand All @@ -27,6 +27,7 @@ pub struct BatchIndexConfig {
pub poll_interval: u32,
pub rpc_timeout: u32,
pub index_batch_size: u32,
pub should_index_txs: bool,
}

impl Default for BatchIndexConfig {
Expand All @@ -36,6 +37,7 @@ impl Default for BatchIndexConfig {
poll_interval: 10,
rpc_timeout: 300,
index_batch_size: 50,
should_index_txs: false,
}
}
}
Expand Down Expand Up @@ -139,10 +141,17 @@ impl BatchIndexer {
let rpc_block_headers_futures: Vec<_> = block_range
.iter()
.map(|block_number| {
task::spawn(rpc::get_full_block_by_number(
*block_number,
Some(timeout.into()),
))
if self.config.should_index_txs {
task::spawn(rpc::get_full_block_by_number(
*block_number,
Some(timeout.into()),
))
} else {
task::spawn(rpc::get_full_block_only_by_number(
*block_number,
Some(timeout.into()),
))
}
})
.collect();

Expand All @@ -169,7 +178,16 @@ impl BatchIndexer {
if !has_err {
let mut db_tx = self.db.pool.begin().await?;

insert_block_header_query(&mut db_tx, block_headers).await?;
if self.config.should_index_txs {
insert_block_header_query(&mut db_tx, block_headers).await?;
} else {
insert_block_header_only_query(
&mut db_tx,
block_headers.iter().map(|h| h.clone().into()).collect(),
)
.await?;
}

update_backfilling_block_number_query(&mut db_tx, starting_block).await?;

// Commit at the end
Expand Down
33 changes: 26 additions & 7 deletions src/indexer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ pub async fn main() -> Result<()> {
let db_conn_string =
env::var("DB_CONNECTION_STRING").context("DB_CONNECTION_STRING must be set")?;

let should_index_txs = env::var("INDEX_TRANSACTIONS")
.unwrap_or_else(|_| "false".to_string())
.parse::<bool>()
.context("INDEX_TRANSACTIONS must be set")?;

// Initialize tracing subscriber
fmt().init();

Expand All @@ -60,7 +65,7 @@ pub async fn main() -> Result<()> {
setup_ctrlc_handler(Arc::clone(&should_terminate))?;

// Start by checking and updating the current status in the db.
// let indexing_metadata = get_base_index_metadata(db.clone()).await?;
get_base_index_metadata(db.clone()).await?;
let router_terminator = Arc::clone(&should_terminate);

// Setup the router which allows us to query health status and operations
Expand All @@ -79,9 +84,16 @@ pub async fn main() -> Result<()> {
})?;

// Start the quick indexer
let quick_index_config = QuickIndexConfig::default();
let quick_indexer =
QuickIndexer::new(quick_index_config, db.clone(), should_terminate.clone()).await;
let quick_indexer = QuickIndexer::new(
QuickIndexConfig {
should_index_txs,
index_batch_size: 100, // larger size since we are not indexing txs
..Default::default()
},
db.clone(),
should_terminate.clone(),
)
.await;

let quick_indexer_handle = thread::Builder::new()
.name("[quick_index]".to_owned())
Expand All @@ -96,9 +108,16 @@ pub async fn main() -> Result<()> {
})?;

// Start the batch indexer
let batch_index_config = BatchIndexConfig::default();
let batch_indexer =
BatchIndexer::new(batch_index_config, db.clone(), should_terminate.clone()).await;
let batch_indexer = BatchIndexer::new(
BatchIndexConfig {
should_index_txs,
index_batch_size: 100, // larger size since we are not indexing txs
..Default::default()
},
db.clone(),
should_terminate.clone(),
)
.await;

let batch_indexer_handle = thread::Builder::new()
.name("[batch_index]".to_owned())
Expand Down
30 changes: 24 additions & 6 deletions src/indexer/quick_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{error, info, warn};
use crate::{
db::DbConnection,
repositories::{
block_header::insert_block_header_query,
block_header::{insert_block_header_only_query, insert_block_header_query},
index_metadata::{get_index_metadata, update_latest_quick_index_block_number_query},
},
rpc::{self},
Expand All @@ -26,6 +26,7 @@ pub struct QuickIndexConfig {
pub poll_interval: u32,
pub rpc_timeout: u32,
pub index_batch_size: u32,
pub should_index_txs: bool,
}

impl Default for QuickIndexConfig {
Expand All @@ -35,6 +36,7 @@ impl Default for QuickIndexConfig {
poll_interval: 10,
rpc_timeout: 300,
index_batch_size: 20,
should_index_txs: false,
}
}
}
Expand Down Expand Up @@ -130,10 +132,17 @@ impl QuickIndexer {
let rpc_block_headers_futures: Vec<_> = block_range
.iter()
.map(|block_number| {
task::spawn(rpc::get_full_block_by_number(
*block_number,
Some(timeout.into()),
))
if self.config.should_index_txs {
task::spawn(rpc::get_full_block_by_number(
*block_number,
Some(timeout.into()),
))
} else {
task::spawn(rpc::get_full_block_only_by_number(
*block_number,
Some(timeout.into()),
))
}
})
.collect();

Expand All @@ -160,7 +169,16 @@ impl QuickIndexer {
if !has_err {
let mut db_tx = self.db.pool.begin().await?;

insert_block_header_query(&mut db_tx, block_headers).await?;
if self.config.should_index_txs {
insert_block_header_query(&mut db_tx, block_headers).await?;
} else {
insert_block_header_only_query(
&mut db_tx,
block_headers.iter().map(|h| h.clone().into()).collect(),
)
.await?;
}

update_latest_quick_index_block_number_query(&mut db_tx, ending_block).await?;

// Commit at the end
Expand Down
Loading

0 comments on commit f1c954f

Please sign in to comment.